withWatermark {SparkR}    R Documentation



Description

Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in time before which we assume no more late data is going to arrive.


Usage

withWatermark(x, eventTime, delayThreshold)

## S4 method for signature 'SparkDataFrame,character,character'
withWatermark(x, eventTime, delayThreshold)



Arguments

x                 a streaming SparkDataFrame.

eventTime         a string specifying the name of the Column that contains the event time of the row.

delayThreshold    a string specifying the minimum delay to wait for late data to arrive, expressed as an interval (e.g. "1 minute" or "5 hours") relative to the latest record that has been processed. NOTE: This should not be negative.


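Details

Spark will use this watermark for several purposes: to know when a given time window aggregation can be finalized, and thus emitted, when using output modes that do not allow updates; and to minimize the amount of state that must be kept for ongoing aggregations.

The current watermark is computed by taking the MAX(eventTime) seen across all of the partitions in the query and subtracting the user-specified delayThreshold. Due to the cost of coordinating this value across partitions, the actual watermark used is only guaranteed to be at least delayThreshold behind the actual event time. In some cases, records that arrive more than delayThreshold late may still be processed.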
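The rule above can be sketched in plain R, without a Spark session. This is only a simplified, single-process model of the arithmetic: compute_watermark() and is_too_late() are hypothetical helpers written for illustration, not SparkR functions, and the timestamps are made-up sample values. A real streaming query may still process some records that this model would flag as late.

```r
# Simplified model of the watermark rule: max(eventTime) - delayThreshold.
# compute_watermark() and is_too_late() are hypothetical helpers, not part of SparkR.

compute_watermark <- function(event_times, delay_secs) {
  # Latest event time seen so far, shifted back by the delay threshold (in seconds).
  max(event_times) - delay_secs
}

is_too_late <- function(event_time, watermark) {
  # A record becomes a candidate for dropping once its event time is behind the watermark.
  event_time < watermark
}

# Event times observed so far (sample values, assumed for illustration).
seen <- as.POSIXct(c("2018-01-01 12:00:00",
                     "2018-01-01 12:07:00",
                     "2018-01-01 12:15:00"), tz = "UTC")

delay_secs <- 10 * 60  # corresponds to delayThreshold = "10 minutes"

wm <- compute_watermark(seen, delay_secs)
print(wm)  # 12:15:00 minus 10 minutes, i.e. 12:05:00 UTC

late_record    <- as.POSIXct("2018-01-01 12:03:00", tz = "UTC")
on_time_record <- as.POSIXct("2018-01-01 12:06:00", tz = "UTC")

print(is_too_late(late_record, wm))     # TRUE: 12:03 is behind the watermark
print(is_too_late(on_time_record, wm))  # FALSE: 12:06 is still ahead of it
```

Note that on_time_record arrives "out of order" (it is older than the latest event seen) yet is still accepted, because it falls within the 10-minute allowance.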


Value

a SparkDataFrame.


Note

withWatermark since 2.3.0

See Also

Other SparkDataFrame functions: SparkDataFrame-class, agg, alias, arrange, as.data.frame, attach,SparkDataFrame-method, broadcast, cache, checkpoint, coalesce, collect, colnames, coltypes, createOrReplaceTempView, crossJoin, cube, dapplyCollect, dapply, describe, dim, distinct, dropDuplicates, dropna, drop, dtypes, except, explain, filter, first, gapplyCollect, gapply, getNumPartitions, group_by, head, hint, histogram, insertInto, intersect, isLocal, isStreaming, join, limit, localCheckpoint, merge, mutate, ncol, nrow, persist, printSchema, randomSplit, rbind, registerTempTable, rename, repartition, rollup, sample, saveAsTable, schema, selectExpr, select, showDF, show, storageLevel, str, subset, summary, take, toJSON, unionByName, union, unpersist, withColumn, with, write.df, write.jdbc, write.json, write.orc, write.parquet, write.stream, write.text


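Examples

## Not run: 
sparkR.session()
schema <- structType(structField("time", "timestamp"), structField("value", "double"))
# jsonDir is assumed to hold the path of a directory containing JSON input files
df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
# Tolerate data arriving up to 10 minutes late, based on the "time" column
df <- withWatermark(df, "time", "10 minutes")
## End(Not run)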

[Package SparkR version 2.3.0 Index]