pyspark.sql.streaming.DataStreamWriter.foreachBatch

DataStreamWriter.foreachBatch(func: Callable[[DataFrame, int], None]) → DataStreamWriter[source]

Sets the output of the streaming query to be processed using the provided function. This is supported only the in the micro-batch execution modes (that is, when the trigger is not continuous). In every micro-batch, the provided function will be called in every micro-batch with (i) the output rows as a DataFrame and (ii) the batch identifier. The batchId can be used deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed to exactly same for the same batchId (assuming all operations are deterministic in the query).

New in version 2.4.0.

Changed in version 3.5.0: Supports Spark Connect.

Notes

This API is evolving. This function behaves differently in Spark Connect mode. See examples. In Connect, the provided function doesn’t have access to variables defined outside of it.

Examples

>>> import time
>>> df = spark.readStream.format("rate").load()
>>> my_value = -1
>>> def func(batch_df, batch_id):
...     global my_value
...     my_value = 100
...     batch_df.collect()
...
>>> q = df.writeStream.foreachBatch(func).start()
>>> time.sleep(3)
>>> q.stop()
>>> # if in Spark Connect, my_value = -1, else my_value = 100