PySpark Usage Guide for Pandas with Apache Arrow

Apache Arrow in PySpark

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. This currently is most beneficial to Python users that work with Pandas/NumPy data. Its usage is not automatic and might require some minor changes to configuration or code to take full advantage and ensure compatibility. This guide will give a high-level description of how to use Arrow in Spark and highlight any differences when working with Arrow-enabled data.

Ensure PyArrow Installed

If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the SQL module with the command pip install pyspark[sql]. Otherwise, you must ensure that PyArrow is installed and available on all cluster nodes. The current supported version is 0.12.1. You can install using pip or conda from the conda-forge channel. See PyArrow installation for details.
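
As a quick sanity check, the snippet below (an illustrative sketch, not part of the installation itself) verifies that PyArrow is importable on the driver and prints its version so you can compare it against the supported release:

import pyarrow

# Confirm PyArrow is importable and check its version against the supported release
print(pyarrow.__version__)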

Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the call toPandas() and when creating a Spark DataFrame from a Pandas DataFrame with createDataFrame(pandas_df). To use Arrow when executing these calls, users need to first set the Spark configuration spark.sql.execution.arrow.pyspark.enabled to true. This is disabled by default.

In addition, optimizations enabled by spark.sql.execution.arrow.pyspark.enabled can automatically fall back to a non-Arrow implementation if an error occurs before the actual computation within Spark. This behavior is controlled by spark.sql.execution.arrow.pyspark.fallback.enabled.
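
For example, the following sketch enables Arrow and explicitly disables the fallback so that Arrow-related errors are raised instead of silently switching to the non-Arrow path (shown for illustration only; keep the fallback enabled if you prefer the automatic behavior):

# Enable Arrow and surface Arrow errors instead of falling back silently
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")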

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow, toPandas() results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data. Not all Spark data types are currently supported and an error can be raised if a column has an unsupported type, see Supported SQL Types. If an error occurs during createDataFrame(), Spark will fall back to create the DataFrame without Arrow.
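
For example, a minimal sketch that reuses the df from the example above and bounds how much data is collected to the driver:

# Collect only a bounded subset of rows to the driver rather than the full DataFrame
small_pdf = df.limit(100).toPandas()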

Pandas UDFs (a.k.a. Vectorized UDFs)

Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data. A Pandas UDF is defined using pandas_udf as a decorator or to wrap the function, and no additional configuration is required. The types of Pandas UDF covered below are Scalar, Scalar Iterator, Grouped Map, Grouped Aggregate, Map Iterator, and Cogrouped Map.

Scalar

Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such as select and withColumn. The Python function should take pandas.Series as inputs and return a pandas.Series of the same length. Internally, Spark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.

The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

Scalar Iterator

Scalar iterator (SCALAR_ITER) Pandas UDF is the same as the scalar Pandas UDF above except that the underlying Python function takes an iterator of batches as input instead of a single batch and, instead of returning a single output batch, it yields output batches or returns an iterator of output batches. It is useful when the UDF execution requires initializing some state, e.g., loading a machine learning model file to apply inference to every input batch.

The following example shows how to create scalar iterator Pandas UDFs:

import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with a single column that is not StructType,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long", PandasUDFType.SCALAR_ITER)
def plus_one(batch_iter):
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# When the UDF is called with more than one column,
# the input to the underlying function is an iterator of pd.Series tuples.
@pandas_udf("long", PandasUDFType.SCALAR_ITER)
def multiply_two_cols(batch_iter):
    for a, b in batch_iter:
        yield a * b

df.select(multiply_two_cols(col("x"), col("x"))).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

# When the UDF is called with a single column that is StructType,
# the input to the underlying function is an iterator of pd.DataFrame.
@pandas_udf("long", PandasUDFType.SCALAR_ITER)
def multiply_two_nested_cols(pdf_iter):
    for pdf in pdf_iter:
        yield pdf["a"] * pdf["b"]

df.select(
    multiply_two_nested_cols(
        struct(col("x").alias("a"), col("x").alias("b"))
    ).alias("y")
).show()
# +---+
# |  y|
# +---+
# |  1|
# |  4|
# |  9|
# +---+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long", PandasUDFType.SCALAR_ITER)
def plus_y(batch_iter):
    y = y_bc.value  # initialize state
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

Grouped Map

Grouped map Pandas UDFs are used with groupBy().apply() which implements the "split-apply-combine" pattern. Split-apply-combine consists of three steps:

- Split the data into groups by using DataFrame.groupBy.
- Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
- Combine the results into a new DataFrame.

To use groupBy().apply(), the user needs to define the following:

- A Python function that defines the computation for each group.
- A StructType object or a string that defines the schema of the output DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame on how to label columns when constructing a pandas.DataFrame.

Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory.

The following example shows how to use groupby().apply() to subtract the mean from each value in the group.

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.functions.pandas_udf and pyspark.sql.GroupedData.apply.

Grouped Aggregate

Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. Grouped aggregate Pandas UDFs are used with groupBy().agg() and pyspark.sql.Window. It defines an aggregation from one or more pandas.Series to a scalar value, where each pandas.Series represents a column within the group or window.

Note that this type of UDF does not support partial aggregation, and all data for a group or window will be loaded into memory. Also, only unbounded windows are currently supported with grouped aggregate Pandas UDFs.

The following example shows how to use this type of UDF to compute mean with groupBy and window operations:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.functions.pandas_udf.

Map Iterator

Map iterator Pandas UDFs are used to transform data with an iterator of batches. Map iterator Pandas UDFs can be used with pyspark.sql.DataFrame.mapInPandas. It defines a map function that transforms an iterator of pandas.DataFrame to another.

In contrast to a scalar Pandas UDF, it can return output of arbitrary length. It maps an iterator of pandas.DataFrames that represents the current DataFrame, using the map iterator UDF, and returns the result as a DataFrame.

The following example shows how to create map iterator Pandas UDFs:

import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

@pandas_udf(df.schema, PandasUDFType.MAP_ITER)
def filter_func(batch_iter):
    for pdf in batch_iter:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.functions.pandas_udf and pyspark.sql.DataFrame.mapInPandas.

Cogrouped Map

Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. They are used with groupBy().cogroup().apply() which consists of the following steps:

- Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
- Apply a function to each cogroup. The input of the function is two pandas.DataFrames (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
- Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().apply(), the user needs to define the following:

- A Python function that defines the computation for each cogroup.
- A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame on how to label columns when constructing a pandas.DataFrame.

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.

The following example shows how to use groupby().cogroup().apply() to perform an asof join between two datasets.

import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

@pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.functions.pandas_udf and pyspark.sql.CoGroupedData.apply.

Usage Notes

Supported SQL Types

Currently, all Spark SQL data types are supported by Arrow-based conversion except MapType, ArrayType of TimestampType, and nested StructType.
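
As an illustrative sketch (assuming an existing SparkSession named spark and the fallback configuration described earlier), converting a DataFrame with a MapType column will not use Arrow:

from pyspark.sql.functions import col, create_map, lit

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

# MapType is not supported by Arrow-based conversion, so this toPandas() call
# falls back to the non-Arrow code path (or raises an error if fallback is disabled)
df = spark.range(3).withColumn("m", create_map(lit("key"), col("id")))
pdf = df.toPandas()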

Setting Arrow Batch Size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow record batches can be adjusted by setting the conf spark.sql.execution.arrow.maxRecordsPerBatch to an integer that will determine the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition will be made into 1 or more record batches for processing.
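
For example, a sketch of lowering the batch size for a wide DataFrame (the exact value should be tuned for your workload):

# Reduce the maximum number of rows per Arrow record batch from the default of 10,000
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")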

Timestamp with Time Zone Semantics

Spark internally stores timestamps as UTC values, and timestamp data that is brought in without a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp data is exported or displayed in Spark, the session time zone is used to localize the timestamp values. The session time zone is set with the configuration spark.sql.session.timeZone and will default to the JVM system local time zone if not set. Pandas uses a datetime64 type with nanosecond resolution, datetime64[ns], with optional time zone on a per-column basis.

When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds and each column will be converted to the Spark session time zone then localized to that time zone, which removes the time zone and displays values as local time. This will occur when calling toPandas() or pandas_udf with timestamp columns.

When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This occurs when calling createDataFrame with a Pandas DataFrame or when returning a timestamp from a pandas_udf. These conversions are done automatically to ensure Spark will have data in the expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond values will be truncated.

Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is different from a Pandas timestamp. It is recommended to use Pandas time series functionality when working with timestamps in pandas_udfs to get the best performance; see the Pandas time series documentation for details.
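
As a brief sketch (assuming an existing SparkSession named spark), the session time zone determines how collected timestamp values are displayed:

import pandas as pd

# Set the session time zone used to localize timestamps on collection
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

pdf = pd.DataFrame({"ts": pd.to_datetime(["2020-01-01 00:00:00"])})
df = spark.createDataFrame(pdf)  # timestamps are converted to UTC microseconds internally
result_pdf = df.toPandas()       # values are localized to the session time zone and shown as local time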

Compatibility Setting for PyArrow >= 0.15.0 and Spark 2.3.x, 2.4.x

Since Arrow 0.15.0, a change in the binary IPC format requires an environment variable to be set for compatibility with previous versions of Arrow <= 0.14.1. This is only necessary for PySpark users on versions 2.3.x and 2.4.x who have manually upgraded PyArrow to 0.15.0. The following can be added to conf/spark-env.sh to use the legacy Arrow IPC format:

ARROW_PRE_0_15_IPC_FORMAT=1

This will instruct PyArrow >= 0.15.0 to use the legacy IPC format with the older Arrow Java that is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to a similar error as described in SPARK-29367 when running pandas_udfs or toPandas() with Arrow enabled. More information about the Arrow IPC change can be read on the Arrow 0.15.0 release blog.