pyspark.sql.PandasCogroupedOps.applyInPandas

PandasCogroupedOps.applyInPandas(func, schema)

Applies a function to each cogroup using pandas and returns the result as a DataFrame.

The function should take two pandas.DataFrames and return another pandas.DataFrame. For each side of the cogroup, all columns are passed together as a single pandas.DataFrame to the user function, and the returned pandas.DataFrames are combined into a single DataFrame.

The schema should be a StructType describing the schema of the returned pandas.DataFrame. The column labels of the returned pandas.DataFrame must either match the field names in the defined schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. The length of the returned pandas.DataFrame can be arbitrary.
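The two matching modes described above can be illustrated with plain pandas. This is a minimal sketch: the helper `labels_are_strings` is hypothetical and not part of PySpark; it only shows which kind of labels a returned frame carries.

```python
import pandas as pd

# String labels: matched against the schema's field names,
# so the column order in the returned frame does not matter.
by_name = pd.DataFrame({"v1": [1.0], "id": [1]})

# Integer labels (the default when no names are given): matched
# against the schema's fields by position, so order matters.
by_position = pd.DataFrame([[1, 1.0]])


def labels_are_strings(pdf):
    """Hypothetical helper: True if every column label is a str."""
    return all(isinstance(label, str) for label in pdf.columns)
```

Under a schema such as "id long, v1 double", the first frame would be matched by name and the second by position.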

New in version 3.0.0.

Parameters
func : function

a native Python function that either takes two pandas.DataFrames and returns a pandas.DataFrame, or takes one tuple (the grouping keys) and two pandas.DataFrames and returns a pandas.DataFrame.

schema : pyspark.sql.types.DataType or str

the return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string.

Notes

This function requires a full shuffle. All the data of a cogroup will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory.

If returning a new pandas.DataFrame constructed with a dictionary, it is recommended to explicitly index the columns by name to ensure the positions are correct, or alternatively use an OrderedDict. For example, pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a']) or pd.DataFrame(OrderedDict([('id', ids), ('a', data)])).
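As a sketch of a cogroup function that builds its result from a dictionary, the hypothetical `count_rows` below returns one summary row per cogroup, with the column order pinned through `columns=`. It assumes that one side of a cogroup may arrive as an empty frame when a key appears in only one of the two DataFrames.

```python
import pandas as pd


def count_rows(l, r):
    """Hypothetical cogroup function: one row of row counts per cogroup."""
    # Assumption: one side may be empty, so read the key from the side that has rows.
    source = l if len(l) > 0 else r
    ids = [source["id"].iloc[0]]
    return pd.DataFrame(
        {"id": ids, "n_left": [len(l)], "n_right": [len(r)]},
        columns=["id", "n_left", "n_right"],  # explicit column order
    )


# Local check with hand-built frames standing in for one cogroup.
left = pd.DataFrame({"time": [20000101, 20000102], "id": [1, 1], "v1": [1.0, 3.0]})
right = pd.DataFrame(columns=["time", "id", "v2"])  # empty right side
summary = count_rows(left, right)
```

A function like this could then be passed to applyInPandas with a matching schema, for example "id long, n_left long, n_right long".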

This API is experimental.

Examples

>>> import pandas as pd
>>> df1 = spark.createDataFrame(
...     [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
...     ("time", "id", "v1"))
>>> df2 = spark.createDataFrame(
...     [(20000101, 1, "x"), (20000101, 2, "y")],
...     ("time", "id", "v2"))
>>> def asof_join(l, r):
...     return pd.merge_asof(l, r, on="time", by="id")
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
...     asof_join, schema="time int, id int, v1 double, v2 string"
... ).show()  
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+
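Because the function passed to applyInPandas is an ordinary Python function, it can be exercised with plain pandas before running it on a cluster. The sketch below rebuilds the id == 1 cogroup by hand. The sort_values calls are an added assumption, not part of the example above: pd.merge_asof requires both inputs to be sorted by the "on" key, and sorting inside the function avoids relying on the order in which rows arrive.

```python
import pandas as pd


def asof_join(l, r):
    # Assumption: sort defensively, since merge_asof needs both sides ordered by "time".
    l = l.sort_values("time")
    r = r.sort_values("time")
    return pd.merge_asof(l, r, on="time", by="id")


# Hand-built stand-ins for the id == 1 cogroup, with the left rows out of order.
left = pd.DataFrame({"time": [20000102, 20000101], "id": [1, 1], "v1": [3.0, 1.0]})
right = pd.DataFrame({"time": [20000101], "id": [1], "v2": ["x"]})

result = asof_join(left, right)
```

The resulting columns are time, id, v1, v2, which matches the field names in the schema string used above.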

Alternatively, the user can define a function that takes three arguments. In this case, the grouping key(s) will be passed as the first argument and the data will be passed as the second and third arguments. The grouping key(s) will be passed as a tuple of numpy data types, e.g., numpy.int32 and numpy.float64. The data will still be passed in as two pandas.DataFrames containing all columns from the original Spark DataFrames.

>>> def asof_join(k, l, r):
...     if k == (1,):
...         return pd.merge_asof(l, r, on="time", by="id")
...     else:
...         return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
...     asof_join, "time int, id int, v1 double, v2 string").show()  
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
+--------+---+---+---+
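The key comparison in the three-argument form can also be checked locally. This sketch calls the same function with hand-built keys and frames. It assumes the key arrives as a one-element tuple holding a numpy integer (numpy.int64 is used here as a stand-in), which compares equal to the plain tuple (1,).

```python
import numpy as np
import pandas as pd


def asof_join(k, l, r):
    # k is a tuple of grouping-key values; a numpy integer compares equal to a Python int.
    if k == (1,):
        return pd.merge_asof(l, r, on="time", by="id")
    else:
        # Empty frame with the expected column labels for all other keys.
        return pd.DataFrame(columns=["time", "id", "v1", "v2"])


# Hand-built stand-ins for the two cogroups in the example.
l1 = pd.DataFrame({"time": [20000101, 20000102], "id": [1, 1], "v1": [1.0, 3.0]})
r1 = pd.DataFrame({"time": [20000101], "id": [1], "v2": ["x"]})
l2 = pd.DataFrame({"time": [20000101, 20000102], "id": [2, 2], "v1": [2.0, 4.0]})
r2 = pd.DataFrame({"time": [20000101], "id": [2], "v2": ["y"]})

kept = asof_join((np.int64(1),), l1, r1)     # key matches: joined rows
dropped = asof_join((np.int64(2),), l2, r2)  # key does not match: empty frame
```

Only the first call contributes rows, which is consistent with the two-row output shown above.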