Group rows based on multiple consecutive streams of time

I have a large time series data set covering several years in increments of 15 minutes. A small subset looks like this:

uniqueid     time
a            2014-04-30 23:30:00 
a            2014-04-30 23:45:00
a            2014-05-01 00:00:00
a            2014-05-01 00:15:00
a            2014-05-12 13:45:00
a            2014-05-12 14:00:00
b            2014-05-12 13:45:00
b            2014-05-12 14:00:00
b            2014-05-12 14:30:00

To reproduce the above:

time<-c("2014-04-30 23:30:00","2014-04-30 23:45:00","2014-05-01 00:00:00","2014-05-01 00:15:00",
    "2014-05-12 13:45:00","2014-05-12 14:00:00","2014-05-12 13:45:00","2014-05-12 14:00:00",
    "2014-05-12 14:30:00")

uniqueid<-c("a","a","a","a","a","a","b","b","b")
mydf<-data.frame(uniqueid,time)

The data comes in streams of consecutive 15-minute increments, and I would like to identify each such stream, or "flow". For example, for id a the four rows from 30.04.14 23:30 to 01.05.14 00:15 form one flow of 4 rows; because there is no row at 01.05.14 00:30 following 01.05.14 00:15, the stream breaks there and the next row starts a new flow. The time column is in POSIX format.

Flows should also be kept separate per uniqueid; a row of a followed by a row of b never belongs to the same flow, even if the times happen to be 15 minutes apart. The flows should be numbered consecutively across the whole data set (so b's first flow continues the numbering after a's last flow), and for each flow I need the number of rows it contains.

The output I am looking for is:

uniqueid    flow     number_rows
a           1        4
a           2        2
b           3        2
b           4        1

I have looked at date/time packages (e.g., lubridate), but as I am fairly new to R and the data set is large, I have not found a workable approach yet.

Any pointers in the right direction would be greatly appreciated. Thanks!


Using data.table, you could do this as follows:

library(data.table)
res<-setDT(mydf)[, list(number_rows=.N,flow=.GRP),
                 by=.(uniqueid,cumsum(as.numeric(difftime(time,shift(time,1L,type="lag",fill=0),unit="mins"))>15))][,cumsum:=NULL]
print(res)

   uniqueid number_rows flow
1:        a           4    1
2:        a           2    2
3:        b           2    3
4:        b           1    4
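
To see what the grouping expression inside by= produces, you can evaluate it step by step on the sample data (a quick sketch of my own, not part of the answer; it reuses data.table's shift()):

# minutes since the previous row; fill=0 shifts in the epoch, making the first gap huge
gaps <- as.numeric(difftime(mydf$time,
                            shift(mydf$time, 1L, type="lag", fill=0),
                            unit="mins"))

# TRUE at every break of more than 15 minutes; the cumulative sum is
# therefore constant within a flow
cumsum(gaps > 15)
# [1] 1 1 1 1 2 2 2 2 3

Note that b's first two rows land in group 2 together with a's last two; pairing the cumsum with uniqueid in by= is what splits them into separate flows.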

Note that this assumes the time column is already of class POSIXct, i.e. that the data was created like this:

time<-as.POSIXct(c("2014-04-30 23:30:00","2014-04-30 23:45:00","2014-05-01 00:00:00","2014-05-01 00:15:00",
        "2014-05-12 13:45:00","2014-05-12 14:00:00","2014-05-12 13:45:00","2014-05-12 14:00:00",
        "2014-05-12 14:30:00"))


uniqueid<-c("a","a","a","a","a","a","b","b","b")
mydf<-data.frame(uniqueid,time)

Assuming the data is sorted by uniqueid and time, you can detect where a flow breaks: within a flow, consecutive rows are exactly 15 minutes apart.

Wherever the gap is not 15 minutes, diff(time) != 15 is TRUE; taking the cumsum of these TRUE values increments a flow id at every break, which we can then group on together with uniqueid:

library(dplyr)
mydf$time <- as.POSIXct(mydf$time, format = "%Y-%m-%d %H:%M:%S")
# convert the time column to POSIXct class so that we can apply the diff function correctly
mydf %>% group_by(uniqueid, flow = 1 + cumsum(c(F, diff(time) != 15))) %>% 
         summarize(num_rows = n())

# Source: local data frame [4 x 3]
# Groups: uniqueid [?]
# 
#   uniqueid  flow num_rows
#     <fctr> <dbl>    <int>
# 1        a     1        4
# 2        a     2        2
# 3        b     3        2
# 4        b     4        1
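
If you want to inspect the flow ids before collapsing, the same expression can go through mutate() first (my own quick sketch of the same idiom, not part of the answer):

# adds the per-row flow id as a column instead of grouping on it
mydf %>% mutate(flow = 1 + cumsum(c(FALSE, diff(time) != 15)))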

Speed matters for data this size, so here is a base R solution, benchmarked against the data.table and dplyr answers above.

# estimated size of data, years x days x hours x 15mins x uniqueids
5*365*24*4*1000 # = approx 180M

# make data with posixct and characters of 180M rows, mydf is approx 2.5GB in memory
time<-rep(as.POSIXct(c("2014-04-30 23:30:00","2014-04-30 23:45:00","2014-05-01 00:00:00","2014-05-01 00:15:00",
        "2014-05-12 13:45:00","2014-05-12 14:00:00","2014-05-12 13:45:00","2014-05-12 14:00:00",
        "2014-05-12 14:30:00")),times = 20000000)

uniqueid<-rep(as.character(c("a","a","a","a","a","a","b","b","b")),times = 20000000)

mydf<-data.frame(uniqueid,time = time)
rm(time,uniqueid);gc()

Base R:

# assumes uniqueid values are grouped and ordered, and that a row of a
# followed by a row of b never forms a 15-minute "flow"
starttime <- Sys.time()

# find failed flows
mydf$diff <- c(0,diff(mydf$time))
mydf$flowstop <- mydf$diff != 15

# give each flow an id
mydf$flowid <- cumsum(mydf$flowstop)

# clean up vars
mydf$time <- mydf$diff <- mydf$flowstop <- NULL

# find flow length
mydfrle <- rle(mydf$flowid)

# get uniqueid/flowid pairs (unique() is too slow)
mydf <- mydf[!duplicated(mydf$flowid), ]

# append rle and remove separate var
mydf$number_rows <- mydfrle$lengths
rm(mydfrle)

print(Sys.time()-starttime)
# Time difference of 30.39437 secs

data.table:

library(data.table)
starttime <- Sys.time()
res<-setDT(mydf)[, list(number_rows=.N,flow=.GRP),
                 by=.(uniqueid,cumsum(as.numeric(difftime(time,shift(time,1L,type="lag",fill=0),unit="mins"))>15))][,cumsum:=NULL]
print(Sys.time()-starttime)
# Time difference of 57.08156 secs

dplyr:

library(dplyr)
starttime <- Sys.time()
mydf %>% group_by(uniqueid, flow = 1 + cumsum(c(F, diff(time) != 15))) %>% 
  summarize(num_rows = n())
print(Sys.time()-starttime)
# too long, did not finish after a few minutes

Note that all three approaches assume the data is already ordered by uniqueid and time; if it is not, sort it first with order(), which adds to the runtimes above (see the sketch below).
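
For completeness, one possible way to do that sort (my addition, not included in the timings above):

# order by id first, then by time within id
mydf <- mydf[order(mydf$uniqueid, mydf$time), ]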

On data this size, base R comes out clearly fastest, with data.table second and dplyr not finishing. I would go with base R here.


Another option: having both the "id" and "time" columns ordered, we can build all groups in a single pass by creating a logical vector that is TRUE wherever either "id" changes or the gap in "time" exceeds 15 minutes.

With

id = as.character(mydf$uniqueid)
tm = mydf$time

find where "id" changes:

id_gr = c(TRUE, id[-1] != id[-length(id)])

and where the gap in "time" exceeds 15 minutes:

tm_gr = c(0, difftime(tm[-1], tm[-length(tm)], unit = "mins")) > 15

and merge the two:

gr = id_gr | tm_gr

which flags every row where either "id" changes or the "time" gap exceeds 15 minutes. And to get the result:

tab = tabulate(cumsum(gr))  ## basically, the only operation per group -- 'n by group'
data.frame(id = id[gr], flow = seq_along(tab), n = tab)
#  id flow n
#1  a    1 4
#2  a    2 2
#3  b    3 2
#4  b    4 1
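
As a side note, here is the tabulate(cumsum(gr)) idea on a tiny standalone vector (my own illustration):

gr <- c(TRUE, FALSE, FALSE, TRUE, FALSE, TRUE)  # TRUE marks the start of a group
cumsum(gr)            # 1 1 1 2 2 3  -- a group id for every row
tabulate(cumsum(gr))  # 3 2 1        -- number of rows per group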

On a larger scale:

set.seed(1821); nid = 1e4         
dat = replicate(nid, as.POSIXct("2016-07-07 12:00:00 EEST") + 
                     cumsum(sample(c(1, 5, 10, 15, 20, 30, 45, 60, 90, 120, 150, 200, 250, 300), sample(5e2:1e3, 1), TRUE)*60),
                simplify = FALSE)
names(dat) = make.unique(rep_len(letters, nid))
dat = data.frame(id = rep(names(dat), lengths(dat)), time = do.call(c, dat))

system.time({
    id = as.character(dat$id); tm = dat$time
    id_gr = c(TRUE, id[-1] != id[-length(id)])
    tm_gr = c(0, difftime(tm[-1], tm[-length(tm)], unit = "mins")) > 15
    gr = id_gr | tm_gr
    tab = tabulate(cumsum(gr))
    ans1 = data.frame(id = id[gr], flow = seq_along(tab), n = tab)
})
# user  system elapsed 
#  1.44    0.19    1.66

For comparison, MikeyMike's data.table answer:

library(data.table)
dat2 = copy(dat)
system.time({
    ans2 = setDT(dat2)[, list(flow = .GRP, n = .N),
                       by = .(id, cumsum(as.numeric(difftime(time,
                                  shift(time, 1L, type = "lag", fill = 0),
                                  unit = "mins")) > 15))][, cumsum := NULL]
})
# user  system elapsed 
# 3.95    0.22    4.26

identical(as.data.table(ans1), ans2)
#[1] TRUE
