write.stream {SparkR}R Documentation

Write the streaming SparkDataFrame to a data source.


The data source is specified by the source and a set of options (...). If source is not specified, the default data source configured by spark.sql.sources.default will be used.


write.stream(df, source = NULL, outputMode = NULL, ...)

## S4 method for signature 'SparkDataFrame'
  source = NULL,
  outputMode = NULL,
  partitionBy = NULL,
  trigger.processingTime = NULL,
  trigger.once = NULL,



a streaming SparkDataFrame.


a name for external data source.


one of 'append', 'complete', 'update'.


additional external data source specific named options.


a name or a list of names of columns to partition the output by on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme.


a processing time interval as a string, e.g. '5 seconds', '1 minute'. This is a trigger that runs a query periodically based on the processing time. If value is '0 seconds', the query will run as fast as possible, this is the default. Only one trigger can be set.


a logical, must be set to TRUE. This is a trigger that processes only one batch of data in a streaming query then terminates the query. Only one trigger can be set.


Additionally, outputMode specifies how data of a streaming SparkDataFrame is written to a output data source. There are three modes:


write.stream since 2.2.0


See Also


Other SparkDataFrame functions: SparkDataFrame-class, agg(), alias(), arrange(), as.data.frame(), attach,SparkDataFrame-method, broadcast(), cache(), checkpoint(), coalesce(), collect(), colnames(), coltypes(), createOrReplaceTempView(), crossJoin(), cube(), dapplyCollect(), dapply(), describe(), dim(), distinct(), dropDuplicates(), dropna(), drop(), dtypes(), exceptAll(), except(), explain(), filter(), first(), gapplyCollect(), gapply(), getNumPartitions(), group_by(), head(), hint(), histogram(), insertInto(), intersectAll(), intersect(), isLocal(), isStreaming(), join(), limit(), localCheckpoint(), merge(), mutate(), ncol(), nrow(), persist(), printSchema(), randomSplit(), rbind(), rename(), repartitionByRange(), repartition(), rollup(), sample(), saveAsTable(), schema(), selectExpr(), select(), showDF(), show(), storageLevel(), str(), subset(), summary(), take(), toJSON(), unionAll(), unionByName(), union(), unpersist(), withColumn(), withWatermark(), with(), write.df(), write.jdbc(), write.json(), write.orc(), write.parquet(), write.text()


## Not run: 
##D sparkR.session()
##D df <- read.stream("socket", host = "localhost", port = 9999)
##D isStreaming(df)
##D wordCounts <- count(group_by(df, "value"))
##D # console
##D q <- write.stream(wordCounts, "console", outputMode = "complete")
##D # text stream
##D q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
##D                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
##D # memory stream
##D q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
##D head(sql("SELECT * from outs"))
##D queryName(q)
##D stopQuery(q)
## End(Not run)

[Package SparkR version 3.2.4 Index]