Apply lag function to columns of a Spark Streaming DataFrame

R/stream_operations.R

stream_lag

Description

Given a streaming Spark dataframe as input, this function will return another streaming dataframe that contains all columns in the input and column(s) that are shifted behind by the offset(s) specified in ... (see example)

Usage

stream_lag(x, cols, thresholds = NULL) 

Arguments

Arguments Description
x An object coercable to a Spark Streaming DataFrame.
cols A list of expressions for a single or multiple variables to create that will contain the value of a previous entry.
thresholds Optional named list of timestamp column(s) and corresponding time duration(s) for deterimining whether a previous record is sufficiently recent relative to the current record. If the any of the time difference(s) between the current and a previous record is greater than the maximal duration allowed, then the previous record is discarded and will not be part of the query result. The durations can be specified with numeric types (which will be interpreted as max difference allowed in number of milliseconds between 2 UNIX timestamps) or time duration strings such as “5s”, “5sec”, “5min”, “5hour”, etc. Any timestamp column in x that is not of timestamp of date Spark SQL types will be interepreted as number of milliseconds since the UNIX epoch.

Examples

library(sparklyr) 
sc <- spark_connect(master = "local", version = "2.2.0") 
streaming_path <- tempfile("days_df_") 
days_df <- tibble::tibble( 
  today = weekdays(as.Date(seq(7), origin = "1970-01-01")) 
) 
num_iters <- 7 
stream_generate_test( 
  df = days_df, 
  path = streaming_path, 
  distribution = rep(nrow(days_df), num_iters), 
  iterations = num_iters 
) 
stream_read_csv(sc, streaming_path) %>% 
  stream_lag(cols = c(yesterday = today ~ 1, two_days_ago = today ~ 2)) %>% 
  collect() %>% 
  print(n = 10L)