write.stream {SparkR}    R Documentation

Write the streaming SparkDataFrame to a data source.

Description

The data source is specified by the source and a set of options (...). If source is not specified, the default data source configured by spark.sql.sources.default will be used.
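
A minimal sketch (not part of the original page): when source is omitted, the format named by spark.sql.sources.default (parquet unless reconfigured) is used. The paths below are hypothetical placeholders.

  df <- read.stream("socket", host = "localhost", port = 9999)
  # no source given, so the file format from spark.sql.sources.default is used
  q <- write.stream(df, path = "/tmp/out", checkpointLocation = "/tmp/cp")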

Usage

write.stream(df, source = NULL, outputMode = NULL, ...)

## S4 method for signature 'SparkDataFrame'
write.stream(
  df,
  source = NULL,
  outputMode = NULL,
  partitionBy = NULL,
  trigger.processingTime = NULL,
  trigger.once = NULL,
  ...
)

Arguments

df

a streaming SparkDataFrame.

source

the name of the external data source.

outputMode

one of 'append', 'complete', 'update'.

...

additional named options specific to the external data source.

partitionBy

a name or a list of names of columns to partition the output by on the file system. If specified, the output is laid out on the file system similarly to Hive's partitioning scheme.

trigger.processingTime

a processing time interval as a string, e.g. '5 seconds', '1 minute'. This trigger runs the query periodically based on processing time. If the value is '0 seconds', the query runs as fast as possible; this is the default. Only one trigger can be set.

trigger.once

a logical; must be set to TRUE. This trigger processes only one batch of data in the streaming query and then terminates the query. Only one trigger can be set; a brief sketch of both trigger options follows this argument list.
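
Not part of the original page: a hedged sketch of the two trigger settings, using hypothetical output and checkpoint paths. Only one trigger may be passed in a single call.

  # process the data available right now in a single batch, then stop
  q <- write.stream(df, "parquet", path = "/tmp/out", checkpointLocation = "/tmp/cp",
                    trigger.once = TRUE)
  # alternatively, run a micro-batch every 30 seconds
  q <- write.stream(df, "parquet", path = "/tmp/out", checkpointLocation = "/tmp/cp",
                    trigger.processingTime = "30 seconds")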

Details

Additionally, outputMode specifies how the data of a streaming SparkDataFrame is written to an output data source. There are three modes:

append: only the new rows appended to the streaming SparkDataFrame since the last trigger are written out. This is the default.

complete: all rows of the streaming SparkDataFrame are written out every time there are updates. This mode can only be used on queries that contain aggregations.

update: only the rows that were updated since the last trigger are written out. If the query does not contain aggregations, this is equivalent to append mode.
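
A short illustration of the modes (an assumption-based sketch, reusing df and the aggregated wordCounts from the Examples section):

  # complete: re-emit the full aggregation result after every update
  q <- write.stream(wordCounts, "console", outputMode = "complete")
  # append: write only newly arrived rows; suited to non-aggregated streams
  q <- write.stream(df, "console", outputMode = "append")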

Note

write.stream since 2.2.0

experimental

See Also

read.stream

Other SparkDataFrame functions: SparkDataFrame-class, agg(), alias(), arrange(), as.data.frame(), attach,SparkDataFrame-method, broadcast(), cache(), checkpoint(), coalesce(), collect(), colnames(), coltypes(), createOrReplaceTempView(), crossJoin(), cube(), dapplyCollect(), dapply(), describe(), dim(), distinct(), dropDuplicates(), dropna(), drop(), dtypes(), exceptAll(), except(), explain(), filter(), first(), gapplyCollect(), gapply(), getNumPartitions(), group_by(), head(), hint(), histogram(), insertInto(), intersectAll(), intersect(), isLocal(), isStreaming(), join(), limit(), localCheckpoint(), merge(), mutate(), ncol(), nrow(), persist(), printSchema(), randomSplit(), rbind(), rename(), repartitionByRange(), repartition(), rollup(), sample(), saveAsTable(), schema(), selectExpr(), select(), showDF(), show(), storageLevel(), str(), subset(), summary(), take(), toJSON(), unionAll(), unionByName(), union(), unpersist(), withColumn(), withWatermark(), with(), write.df(), write.jdbc(), write.json(), write.orc(), write.parquet(), write.text()

Examples



## Not run: 
##D sparkR.session()
##D df <- read.stream("socket", host = "localhost", port = 9999)
##D isStreaming(df)
##D wordCounts <- count(group_by(df, "value"))
##D 
##D # console
##D q <- write.stream(wordCounts, "console", outputMode = "complete")
##D # text stream
##D q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
##D                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
##D # memory stream
##D q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
##D head(sql("SELECT * from outs"))
##D queryName(q)
##D 
##D stopQuery(q)
## End(Not run)




Package SparkR version 3.2.4