pyspark.sql.GroupedData.applyInPandasWithState

GroupedData.applyInPandasWithState(func: PandasGroupedMapFunctionWithState, outputStructType: Union[pyspark.sql.types.StructType, str], stateStructType: Union[pyspark.sql.types.StructType, str], outputMode: str, timeoutConf: str) → pyspark.sql.dataframe.DataFrame

Applies the given function to each group of data, while maintaining a user-defined per-group state. The result DataFrame will contain the flattened records returned by the function.

For a streaming DataFrame, the function will be invoked first for all input groups and then for all timed-out states, for which the input data is empty. Updates to each group’s state will be saved across invocations.
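When a timeout is configured, the state's hasTimedOut flag lets the function distinguish a timed-out invocation (empty input) from a normal one. A minimal sketch, assuming the state keeps a single running count per group (the column names are illustrative, not part of the API):

import pandas as pd

def session_fn(key, pdf_iter, state):
    if state.hasTimedOut:
        # Timed-out invocation: no new input for this group.
        # Emit a final record and drop the state.
        (count,) = state.get
        state.remove()
        yield pd.DataFrame({"id": [key[0]], "count": [count], "expired": [True]})
    else:
        count = state.get[0] if state.exists else 0
        for pdf in pdf_iter:
            count += len(pdf)
        state.update((count,))
        yield pd.DataFrame({"id": [key[0]], "count": [count], "expired": [False]})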

The function should take parameters (key, Iterator[pandas.DataFrame], state) and return another Iterator[pandas.DataFrame]. The grouping key(s) will be passed as a tuple of numpy data types, e.g., numpy.int32 and numpy.float64. The state will be passed as pyspark.sql.streaming.state.GroupState.

For each group, all columns are passed together as a pandas.DataFrame to the user function, and the returned pandas.DataFrames from all invocations are combined into a DataFrame. Note that the user function should not make assumptions about the number of elements in the iterator; to process all of the data, it must iterate over every element. Conversely, the user function is not required to exhaust the iterator if it only needs to read part of the data.
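For example, a function may process the group chunk by chunk and emit one output frame per input frame instead of materializing the whole group; a sketch (the "id" column is illustrative, and the state is left untouched):

def echo_fn(key, pdf_iter, state):
    # The grouping key arrives as a tuple of numpy scalars, e.g. (numpy.int64(1),).
    group_id = key[0]
    for pdf in pdf_iter:
        # One output pandas.DataFrame per input chunk; sizes may differ.
        yield pdf.assign(id=group_id)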

The outputStructType should be a StructType describing the schema of all elements in the returned value, which are pandas.DataFrames. The column labels of all returned pandas.DataFrames must either match the field names in the defined schema if specified as strings, or match the field data types by position if not strings, e.g., integer indices.
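For instance, with outputStructType="id long, countAsString string", either of the following returned frames would match the schema; a sketch:

import pandas as pd

# Columns matched by name against the declared fields.
pd.DataFrame({"id": [1], "countAsString": ["3"]})

# Columns matched by position (integer labels), in field order.
pd.DataFrame({0: [1], 1: ["3"]})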

The stateStructType should be a StructType describing the schema of the user-defined state. The value of the state is presented as a tuple, and updates to the state must also be performed with a tuple. The corresponding Python types for DataType are supported. Please refer to the page https://spark.apache.org/docs/latest/sql-ref-datatypes.html (Python tab).
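For example, with stateStructType="len long" the state value is a one-element tuple; a sketch of reading and updating it inside the user function:

if state.exists:
    (current_len,) = state.get        # the stored value, as a tuple matching the schema
else:
    current_len = 0
state.update((current_len + 1,))      # updates must also be passed as a tuple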

The size of each pandas.DataFrame in both the input and output can be arbitrary. The number of pandas.DataFrame in both the input and output can also be arbitrary.

New in version 3.4.0.

Changed in version 3.5.0: Supports Spark Connect.

Parameters
func : function

a Python native function to be called on every group. It should take parameters (key, Iterator[pandas.DataFrame], state) and return Iterator[pandas.DataFrame]. Note that the type of the key is tuple and the type of the state is pyspark.sql.streaming.state.GroupState.

outputStructType : pyspark.sql.types.StructType or str

the schema of the output records. The value can be either a pyspark.sql.types.StructType object or a DDL-formatted type string.

stateStructType : pyspark.sql.types.StructType or str

the schema of the user-defined state. The value can be either a pyspark.sql.types.StructType object or a DDL-formatted type string.

outputMode : str

the output mode of the function.

timeoutConf : str

timeout configuration for groups that do not receive data for a while. Valid values are defined in pyspark.sql.streaming.state.GroupStateTimeout.
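For example, with GroupStateTimeout.ProcessingTimeTimeout the user function is expected to arm the timeout itself, so that the group is invoked again with empty input once no data has arrived for the configured duration; a sketch with an arbitrary one-minute duration:

import pandas as pd

# Valid values include GroupStateTimeout.NoTimeout,
# GroupStateTimeout.ProcessingTimeTimeout and GroupStateTimeout.EventTimeTimeout.

def fn(key, pdf_iter, state):
    count = state.get[0] if state.exists else 0
    for pdf in pdf_iter:
        count += len(pdf)
    state.update((count,))
    # Re-invoke this group (with state.hasTimedOut set) after 60 seconds
    # without new data; only valid under ProcessingTimeTimeout.
    state.setTimeoutDuration(60 * 1000)
    yield pd.DataFrame({"id": [key[0]], "count": [count]})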

Notes

This function requires a full shuffle.

This API is experimental.

Examples

>>> import pandas as pd  
>>> from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
>>> def count_fn(key, pdf_iter, state):
...     assert isinstance(state, GroupState)
...     total_len = 0
...     for pdf in pdf_iter:
...         total_len += len(pdf)
...     state.update((total_len,))
...     yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})
...
>>> df.groupby("id").applyInPandasWithState(
...     count_fn, outputStructType="id long, countAsString string",
...     stateStructType="len long", outputMode="Update",
...     timeoutConf=GroupStateTimeout.NoTimeout)
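The returned DataFrame is typically attached to a streaming sink; a hedged sketch (the console sink is illustrative):

>>> result = df.groupby("id").applyInPandasWithState(
...     count_fn, "id long, countAsString string",
...     "len long", "Update", GroupStateTimeout.NoTimeout)
>>> query = result.writeStream.outputMode("update").format("console").start()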