pyspark.sql.functions.udf

pyspark.sql.functions.udf(f=None, returnType=StringType(), *, useArrow=None)

Creates a user defined function (UDF).

New in version 1.3.0.

Changed in version 3.4.0: Supports Spark Connect.

Changed in version 4.0.0: Supports keyword-arguments.

Parameters
f : function, optional

Python function if used as a standalone function

Changed in version 4.1.0: Supports vectorized functions by specifying type hints.

returnType : pyspark.sql.types.DataType or str, optional

the return type of the user-defined function. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string. Defaults to StringType.

useArrow : bool, optional

whether to use Arrow to optimize the (de)serialization. When it is None, the Spark config “spark.sql.execution.pythonUDF.arrow.enabled” takes effect.
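The parameters above can be combined in a few ways. The following is a minimal sketch, not taken from the Spark sources: name_length and build_udfs are hypothetical names chosen for illustration, and the useArrow=True variant assumes PyArrow is installed.

```python
def name_length(s):
    """Return the length of a string, or None for a null input."""
    return None if s is None else len(s)


def build_udfs():
    # Imports are local so that name_length can be exercised without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # returnType given as a DataType object.
    by_object = udf(name_length, IntegerType())
    # returnType given as a DDL-formatted type string.
    by_ddl = udf(name_length, "int")
    # Explicitly request Arrow-optimized (de)serialization instead of
    # deferring to spark.sql.execution.pythonUDF.arrow.enabled.
    # Assumption: PyArrow is available in the environment.
    with_arrow = udf(name_length, returnType="int", useArrow=True)
    return by_object, by_ddl, with_arrow
```

With an active SparkSession, each returned UDF can be applied to a column in the usual way, for example df.select(by_ddl("name")).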

Notes

User-defined functions are considered deterministic by default. Due to optimization, duplicate invocations may be eliminated, or the function may even be invoked more times than it appears in the query. If your function is not deterministic, call asNondeterministic on the user-defined function. For example:

>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> import random
>>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()

User-defined functions do not support conditional expressions or short-circuiting in boolean expressions; every UDF call in such an expression ends up being executed internally. If a function can fail on special rows, the workaround is to incorporate the condition into the function itself.
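The workaround can be sketched as follows. This is an illustrative example rather than part of the Spark API: safe_inverse and demo are hypothetical names, and the sample rows are made up. Instead of wrapping the UDF call in a conditional such as when(col("x") != 0, ...), which does not prevent the UDF from running on the failing rows, the guard lives inside the function.

```python
def safe_inverse(x):
    """Return 1/x, or None when x is null or zero."""
    # The condition is part of the function, so it holds even if Spark
    # evaluates the UDF on every row.
    if x is None or x == 0:
        return None
    return 1.0 / x


def demo(spark):
    # Imports are local so that safe_inverse can be exercised without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    inverse = udf(safe_inverse, DoubleType())
    df = spark.createDataFrame([(4,), (0,), (None,)], "x int")
    return df.select("x", inverse("x").alias("inverse"))
```

Calling demo(spark).show() with an active SparkSession should yield a null in the inverse column for the zero and null rows instead of raising an error.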

Before version 4.0.0, user-defined functions did not take keyword arguments on the calling side.

Examples

>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> @udf
... def to_upper(s):
...     if s is not None:
...         return s.upper()
...
>>> @udf(returnType=IntegerType())
... def add_one(x):
...     if x is not None:
...         return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+

A UDF can be called with keyword arguments:

>>> from pyspark.sql.functions import col
>>> @udf(returnType=IntegerType())
... def calc(a, b):
...     return a + 10 * b
...
>>> spark.range(2).select(calc(b=col("id") * 10, a=col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
|                            0|
|                          101|
+-----------------------------+

Vectorized functions are supported by specifying type hints.

To be defined as a vectorized function, the function must meet the following requirements:

1. It has at least one argument; zero-argument functions are not supported.

2. Its type hints match one of the patterns of pandas UDFs or Arrow UDFs.

3. The useArrow argument is not explicitly set.

If a function does not meet these requirements, it is treated as a regular Python UDF or an Arrow-optimized Python UDF, depending on the useArrow argument, the spark.sql.execution.pythonUDF.arrow.enabled configuration, and the installed dependencies.
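The difference between the two outcomes can be sketched by defining the same calculation twice. This is an illustration under stated assumptions, not a description of Spark's internal dispatch: calc_row, calc_batch, and build_udfs are hypothetical names, pandas is assumed to be installed, and the vectorized behavior assumes Spark 4.1.0 or later.

```python
def calc_row(a, b):
    # No type hints, so requirement 2 is not met: this is handled as a
    # regular Python UDF, called once per row with plain Python values.
    return a + 10 * b


def build_udfs():
    # Imports are local so that calc_row can be exercised without Spark.
    import pandas as pd
    from pyspark.sql.functions import udf

    row_udf = udf(calc_row, "long")

    # Series-to-Series type hints, at least one argument, and no explicit
    # useArrow: per the requirements listed above, this is expected to be
    # treated as a vectorized pandas UDF operating on whole batches.
    def calc_batch(a: pd.Series, b: pd.Series) -> pd.Series:
        return a + 10 * b

    batch_udf = udf(calc_batch, "long")
    return row_udf, batch_udf
```

Both UDFs are applied to columns in the same way; only the unit of work passed to the Python function differs (single values versus pandas Series).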

For example, define a ‘Series to Series’ pandas UDF:

>>> from pyspark.sql.functions import col, udf, PandasUDFType
>>> import pandas as pd
>>> @udf(returnType=IntegerType())
... def pd_calc(a: pd.Series, b: pd.Series) -> pd.Series:
...     return a + 10 * b
...
>>> pd_calc.evalType == PandasUDFType.SCALAR
True
>>> spark.range(2).select(pd_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pd_calc(b => (id * 10), a => id)|
+--------------------------------+
|                               0|
|                             101|
+--------------------------------+

As another example, define an ‘Array to Array’ Arrow UDF:

>>> from pyspark.sql.functions import col, udf, ArrowUDFType
>>> import pyarrow as pa
>>> import pyarrow.compute  # ensures pa.compute is available
>>> @udf(returnType=IntegerType())
... def pa_calc(a: pa.Array, b: pa.Array) -> pa.Array:
...     return pa.compute.add(a, pa.compute.multiply(b, 10))
...
>>> pa_calc.evalType == ArrowUDFType.SCALAR
True
>>> spark.range(2).select(pa_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pa_calc(b => (id * 10), a => id)|
+--------------------------------+
|                               0|
|                             101|
+--------------------------------+