pyspark.sql.functions.udf

pyspark.sql.functions.udf(f=None, returnType=StringType(), *, useArrow=None)

Creates a user defined function (UDF).

New in version 1.3.0.

Changed in version 3.4.0: Supports Spark Connect.

Changed in version 4.0.0: Supports keyword arguments.

Parameters
f : function, optional
    Python function if used as a standalone function.

    Changed in version 4.1.0: Supports vectorized functions by specifying type hints.
returnType : pyspark.sql.types.DataType or str, optional
    The return type of the user-defined function. The value can be either a
    pyspark.sql.types.DataType object or a DDL-formatted type string. Defaults
    to StringType. (A short sketch of both forms follows the parameter list.)
useArrow : bool, optional
    Whether to use Arrow to optimize the (de)serialization. When it is None,
    the Spark config "spark.sql.execution.pythonUDF.arrow.enabled" takes
    effect. (A short sketch of both opt-in paths follows the parameter list.)
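
A minimal sketch of the two interchangeable spellings of returnType described
above, using "int" and "array<string>" as examples of DDL-formatted type
strings (the variable names here are illustrative):

>>> from pyspark.sql.types import IntegerType
>>> # A DataType object and its DDL string form define the same return type.
>>> slen_obj = udf(lambda s: len(s), IntegerType())
>>> slen_ddl = udf(lambda s: len(s), "int")
>>> # DDL strings also cover complex types.
>>> split_words = udf(lambda s: s.split(), "array<string>")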
 
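A minimal sketch of the two ways to opt in to Arrow optimization, using the
useArrow argument and the config key from the parameter description above:

>>> # Per-UDF opt-in: overrides the session default.
>>> arrow_len = udf(lambda s: len(s), "int", useArrow=True)
>>> # Session-wide default, consulted only when useArrow is None.
>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")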
Notes

The user-defined functions are considered deterministic by default. Due to
optimization, duplicate invocations may be eliminated or the function may even
be invoked more times than it is present in the query. If your function is not
deterministic, call asNondeterministic on the user defined function. E.g.:

>>> from pyspark.sql.types import IntegerType
>>> import random
>>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()

The user-defined functions do not support conditional expressions or short
circuiting in boolean expressions; the function and its arguments end up being
evaluated internally in all cases. If the function can fail on special rows,
the workaround is to incorporate the condition into the function itself (a
sketch of this workaround follows the examples below).

Prior to version 4.0.0, user-defined functions did not take keyword arguments
on the calling side.

Examples

>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> @udf
... def to_upper(s):
...     if s is not None:
...         return s.upper()
...
>>> @udf(returnType=IntegerType())
... def add_one(x):
...     if x is not None:
...         return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+

UDF can use keyword arguments:

>>> from pyspark.sql.functions import col
>>> @udf(returnType=IntegerType())
... def calc(a, b):
...     return a + 10 * b
...
>>> spark.range(2).select(calc(b=col("id") * 10, a=col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
|                            0|
|                          101|
+-----------------------------+

Vectorized functions are supported by specifying type hints.

To define a vectorized function, the function should meet the following requirements:

1. It has at least one argument; zero-argument functions are not supported.
2. Its type hints match one of the patterns of pandas UDFs and arrow UDFs.
3. The useArrow argument is not explicitly set.

If a function does not meet these requirements, it is treated as a vanilla
Python UDF or an arrow-optimized Python UDF (depending on the useArrow
argument, the configuration spark.sql.execution.pythonUDF.arrow.enabled, and
which dependencies are installed).

For example, define a 'Series to Series' type pandas UDF:

>>> from pyspark.sql.functions import udf, PandasUDFType
>>> import pandas as pd
>>> @udf(returnType=IntegerType())
... def pd_calc(a: pd.Series, b: pd.Series) -> pd.Series:
...     return a + 10 * b
...
>>> pd_calc.evalType == PandasUDFType.SCALAR
True
>>> spark.range(2).select(pd_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pd_calc(b => (id * 10), a => id)|
+--------------------------------+
|                               0|
|                             101|
+--------------------------------+

For another example, define an 'Array to Array' type arrow UDF:

>>> from pyspark.sql.functions import udf, ArrowUDFType
>>> import pyarrow as pa
>>> @udf(returnType=IntegerType())
... def pa_calc(a: pa.Array, b: pa.Array) -> pa.Array:
...     return pa.compute.add(a, pa.compute.multiply(b, 10))
...
>>> pa_calc.evalType == ArrowUDFType.SCALAR
True
>>> spark.range(2).select(pa_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pa_calc(b => (id * 10), a => id)|
+--------------------------------+
|                               0|
|                             101|
+--------------------------------+
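
As noted above, UDF arguments do not short circuit, so Spark may evaluate a
UDF on rows that a surrounding conditional expression was meant to exclude. A
minimal sketch of the recommended workaround, moving the guard inside the
function (safe_inverse is an illustrative name, not part of the API):

>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import DoubleType
>>> @udf(returnType=DoubleType())
... def safe_inverse(x):
...     # The guard lives inside the UDF, so the function is safe even if
...     # Spark evaluates it on rows a WHEN/OTHERWISE was meant to skip.
...     if x is None or x == 0:
...         return None
...     return 1.0 / x
...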