pyspark.pandas.DataFrame.pandas_on_spark.apply_batch

pandas_on_spark.apply_batch(func: Callable[[…], pandas.core.frame.DataFrame], args: Tuple = (), **kwds: Any) → DataFrame

Apply a function that takes a pandas DataFrame and outputs a pandas DataFrame. The pandas DataFrame given to the function is a batch used internally.

See also Transform and apply a function.

Note

the func is unable to access the whole input frame. pandas-on-Spark internally splits the input frame into multiple batches and calls func against each batch, so func runs multiple times and never sees all rows at once. Therefore, operations such as global aggregations are impossible. See the example below.

>>> # This case does not return the length of the whole frame but of each
... # batch used internally.
... def length(pdf) -> ps.DataFrame[int]:
...     return pd.DataFrame([len(pdf)])
...
>>> df = ps.DataFrame({'A': range(1000)})
>>> df.pandas_on_spark.apply_batch(length)  
    c0
0   83
1   83
2   83
...
10  83
11  83
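To make the batching behavior concrete, the split can be mimicked in plain pandas. This is only an illustrative sketch: the batch size of 84 is arbitrary, and Spark's actual partitioning works differently.

```python
import pandas as pd

def length(pdf):
    # Same function as above: reports the length of the frame it receives.
    return pd.DataFrame([len(pdf)])

pdf = pd.DataFrame({'A': range(1000)})

# Roughly what apply_batch does: split the frame into batches and call the
# function once per batch, so no single call ever sees all 1000 rows.
batch_size = 84  # arbitrary for illustration
batches = [pdf.iloc[i:i + batch_size] for i in range(0, len(pdf), batch_size)]
result = pd.concat([length(b) for b in batches], ignore_index=True)
# Every value in result is the length of one batch, never 1000.
```

Summing the per-batch lengths recovers the total row count, which is why a global aggregation has to be done after apply_batch, not inside func.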

Note

this API executes the function once to infer the return type, which can be expensive, for instance, when the dataset is created after aggregations or sorting.

To avoid this, specify the return type in func, for instance, as below:

>>> def plus_one(x) -> ps.DataFrame[float, float]:
...     return x + 1

If the return type is specified, the output column names become c0, c1, c2 … cn. These names are positionally mapped to the returned DataFrame in func.

To specify the column names, you can assign them in a pandas-friendly style as below:

>>> def plus_one(x) -> ps.DataFrame["a": float, "b": float]:
...     return x + 1
>>> pdf = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
>>> def plus_one(x) -> ps.DataFrame[zip(pdf.dtypes, pdf.columns)]:
...     return x + 1

When the given function has the return type annotated, the original index of the DataFrame will be lost and a default index will be attached to the result DataFrame. Please be careful about configuring the default index. See also Default Index Type.
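The effect on the index can be sketched in plain pandas. This is an illustrative approximation, not the actual implementation: with an annotated return type, the batch results are stitched together under a fresh default index, so the original labels are discarded.

```python
import pandas as pd

def plus_one(pdf):
    return pdf + 1

# A frame with a meaningful, non-default index.
pdf = pd.DataFrame({'a': [1, 2, 3]}, index=['x', 'y', 'z'])

# With an annotated return type, apply_batch behaves roughly like this:
# the result is re-indexed with a fresh default index, so the labels
# 'x', 'y', 'z' do not survive.
result = plus_one(pdf).reset_index(drop=True)
# result.index is now a default RangeIndex: 0, 1, 2
```

If the original index matters, carry it along as an ordinary column before calling apply_batch.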

Parameters
func : function

Function to apply to each pandas frame.

args : tuple

Positional arguments to pass to func in addition to the frame.

**kwds

Additional keyword arguments to pass as keyword arguments to func.

Returns
DataFrame

See also

DataFrame.apply

For row/columnwise operations.

DataFrame.applymap

For elementwise operations.

DataFrame.aggregate

Only perform aggregating type operations.

DataFrame.transform

Only perform transforming type operations.

Series.pandas_on_spark.transform_batch

Transforms the Series batch by batch with a pandas function.

Examples

>>> df = ps.DataFrame([(1, 2), (3, 4), (5, 6)], columns=['A', 'B'])
>>> df
   A  B
0  1  2
1  3  4
2  5  6
>>> def query_func(pdf) -> ps.DataFrame[int, int]:
...     return pdf.query('A == 1')
>>> df.pandas_on_spark.apply_batch(query_func)
   c0  c1
0   1   2
>>> def query_func(pdf) -> ps.DataFrame["A": int, "B": int]:
...     return pdf.query('A == 1')
>>> df.pandas_on_spark.apply_batch(query_func)
   A  B
0  1  2

You can also omit the type hints so pandas-on-Spark infers the return schema as below:

>>> df.pandas_on_spark.apply_batch(lambda pdf: pdf.query('A == 1'))
   A  B
0  1  2

You can also specify extra arguments.

>>> def calculation(pdf, y, z) -> ps.DataFrame[int, int]:
...     return pdf ** y + z
>>> df.pandas_on_spark.apply_batch(calculation, args=(10,), z=20)
        c0        c1
0       21      1044
1    59069   1048596
2  9765645  60466196
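Since the operation above is element-wise, the same values can be reproduced batch-free in plain pandas, which is a handy way to sanity-check such a function before distributing it:

```python
import pandas as pd

def calculation(pdf, y, z):
    return pdf ** y + z

pdf = pd.DataFrame([(1, 2), (3, 4), (5, 6)], columns=['A', 'B'])

# y arrives positionally (as apply_batch passes args=(10,)),
# z as a keyword (as apply_batch forwards **kwds).
out = calculation(pdf, 10, z=20)
# out matches the apply_batch result above, e.g. 1 ** 10 + 20 == 21.
```

Element-wise functions like this are safe under batching because each row's result is independent of every other row.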

You can also use NumPy ufuncs and built-in functions as input.

>>> df.pandas_on_spark.apply_batch(np.add, args=(10,))
    A   B
0  11  12
1  13  14
2  15  16
>>> (df * -1).pandas_on_spark.apply_batch(abs)
   A  B
0  1  2
1  3  4
2  5  6