Python User-defined Table Functions (UDTFs)

Spark 3.5 introduces the Python user-defined table function (UDTF), a new type of user-defined function. Unlike a scalar function, which returns a single result value from each call, a UDTF is invoked in the FROM clause of a query and returns an entire table as output. Each UDTF call can accept zero or more arguments. These arguments can be either scalar expressions or table arguments that represent entire input tables.

Implementing a Python UDTF

To implement a Python UDTF, you first need to define a class that implements the following methods:

from typing import Any, Iterator

class PythonUDTF:

    def __init__(self) -> None:
        """
        Initializes the user-defined table function (UDTF). This is optional.

        This method serves as the default constructor and is called once when the
        UDTF is instantiated on the executor side.

        Any class fields assigned in this method will be available for subsequent
        calls to the `eval` and `terminate` methods. This class instance will remain
        alive until all rows in the current partition have been consumed by the `eval`
        method.

        Notes
        -----
        - This method does not accept any extra arguments. Only the default
          constructor is supported.
        - You cannot create or reference the Spark session within the UDTF. Any
          attempt to do so will result in a serialization error.
        """
        ...

    def eval(self, *args: Any) -> Iterator[Any]:
        """
        Evaluates the function using the given input arguments.

        This method is required and must be implemented.

        Argument Mapping:
        - Each provided scalar expression maps to exactly one value in the
          `*args` list.
        - Each provided table argument maps to a pyspark.sql.Row object containing
          the columns in the order they appear in the provided input table,
          and with the names computed by the query analyzer.

        This method is called on every input row, and can produce zero or more
        output rows. Each element in the output tuple corresponds to one column
        specified in the return type of the UDTF.

        Parameters
        ----------
        *args : Any
            Arbitrary positional arguments representing the input to the UDTF.

        Yields
        ------
        tuple
            A tuple representing a single row in the UDTF result table.
            Yield as many times as needed to produce multiple rows.

        Notes
        -----
        - The result of the function must be a tuple representing a single row
          in the UDTF result table.
        - UDTFs currently do not accept keyword arguments during the function call.

        Examples
        --------
        eval that returns one row and one column for each input.

        >>> def eval(self, x: int):
        ...     yield (x, )

        eval that returns two rows and two columns for each input.

        >>> def eval(self, x: int, y: int):
        ...     yield (x + y, x - y)
        ...     yield (y + x, y - x)
        """
        ...

    def terminate(self) -> Iterator[Any]:
        """
        Called when the UDTF has processed all input rows.

        This method is optional to implement and is useful for performing any
        cleanup or finalization operations after the UDTF has finished processing
        all rows. It can also be used to yield additional rows if needed.
        Table functions that consume all rows in the entire input partition
        and then compute and return the entire output table can do so from
        this method as well (please be mindful of memory usage when doing
        this).

        Yields
        ------
        tuple
            A tuple representing a single row in the UDTF result table.
            Yield this if you want to return additional rows during termination.

        Examples
        --------
        >>> def terminate(self) -> Iterator[Any]:
        ...     yield "done", None
        """
        ...
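
The call order described in these docstrings can be traced in plain Python, without a Spark session. The sketch below is an illustration only: `run_partition` is a hypothetical helper that models one instance handling one partition (it is not Spark's executor code), and `TracingUDTF` is a made-up class that records which methods were called.

```python
from typing import Any, Iterable, Iterator, List, Tuple


class TracingUDTF:
    """Hypothetical UDTF class that records the order of method calls."""

    def __init__(self) -> None:
        self.trace: List[str] = ["__init__"]

    def eval(self, x: int) -> Iterator[Tuple[Any, ...]]:
        self.trace.append(f"eval({x})")
        yield (x, x * 2)

    def terminate(self) -> Iterator[Tuple[Any, ...]]:
        self.trace.append("terminate")
        yield (-1, -1)


def run_partition(udtf_cls: type, rows: Iterable[Tuple[Any, ...]]):
    """Assumed model: one instance per partition, eval per row, terminate once."""
    instance = udtf_cls()
    output: List[Tuple[Any, ...]] = []
    for row in rows:
        result = instance.eval(*row)
        if result is not None:  # eval may produce no rows at all
            output.extend(result)
    if hasattr(instance, "terminate"):  # terminate is optional
        output.extend(instance.terminate())
    return instance, output


instance, output = run_partition(TracingUDTF, [(1,), (2,)])
print(instance.trace)  # ['__init__', 'eval(1)', 'eval(2)', 'terminate']
print(output)          # [(1, 2), (2, 4), (-1, -1)]
```

The rows yielded by `terminate` are appended after all rows produced by `eval`, which matches the description of `terminate` as a place to emit additional rows once the input is exhausted.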

The return type of the UDTF defines the schema of the table it outputs. It must be either a StructType, for example StructType().add("c1", StringType()), or a DDL string representing a struct type, for example c1: string.
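
To make the link between the return type and the yielded tuples concrete, the sketch below pairs the column names of a DDL-style string with the elements of one output tuple. Both `column_names` and `label_row` are hypothetical helpers written for this illustration. Spark does its own schema parsing, and this naive split only handles flat `name: type` lists, not nested or parameterized types.

```python
from typing import Any, Dict, List, Tuple


def column_names(ddl: str) -> List[str]:
    """Naive parse of a flat 'name: type, name: type' string (illustration only)."""
    return [field.split(":")[0].strip() for field in ddl.split(",")]


def label_row(ddl: str, row: Tuple[Any, ...]) -> Dict[str, Any]:
    """Pair each tuple element with the column it would fill."""
    names = column_names(ddl)
    if len(names) != len(row):
        raise ValueError(f"expected {len(names)} columns, got {len(row)}")
    return dict(zip(names, row))


print(label_row("num: int, squared: int", (3, 9)))  # {'num': 3, 'squared': 9}
```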

Example of UDTF Class Implementation

Here is a simple example of a UDTF class implementation:

# Define the UDTF class and implement the required `eval` method.
class SquareNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
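
Because the class is ordinary Python until it is wrapped as a UDTF, `eval` can be called directly as a generator. That gives a quick way to check the row logic before Spark is involved. A minimal sketch, with the class repeated so the block runs on its own:

```python
class SquareNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)


# Collect the rows that a single call would produce.
rows = list(SquareNumbers().eval(1, 3))
print(rows)  # [(1, 1), (2, 4), (3, 9)]

# An empty range produces no rows, which is valid for a UDTF.
empty = list(SquareNumbers().eval(3, 1))
print(empty)  # []
```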

Instantiating a UDTF with the ``udtf`` Function

To make use of the UDTF, you’ll first need to instantiate it using the udtf() function:

from pyspark.sql.functions import lit, udtf

# Create a UDTF using the class definition and the `udtf` function.
square_num = udtf(SquareNumbers, returnType="num: int, squared: int")

# Invoke the UDTF in PySpark.
square_num(lit(1), lit(3)).show()
# +---+-------+
# |num|squared|
# +---+-------+
# |  1|      1|
# |  2|      4|
# |  3|      9|
# +---+-------+

Instantiating a UDTF with the ``udtf`` Decorator

An alternative way to create a UDTF is to apply the @udtf decorator directly to the class:

from pyspark.sql.functions import lit, udtf

# Define a UDTF using the `udtf` decorator directly on the class.
@udtf(returnType="num: int, squared: int")
class SquareNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)

# Invoke the UDTF in PySpark using the SquareNumbers class directly.
SquareNumbers(lit(1), lit(3)).show()
# +---+-------+
# |num|squared|
# +---+-------+
# |  1|      1|
# |  2|      4|
# |  3|      9|
# +---+-------+

For more detailed usage, please see udtf().

Registering and Using Python UDTFs in SQL

Python UDTFs can also be registered and used in SQL queries.

from pyspark.sql.functions import udtf

@udtf(returnType="word: string")
class WordSplitter:
    def eval(self, text: str):
        for word in text.split(" "):
            yield (word.strip(),)

# Register the UDTF for use in Spark SQL.
spark.udtf.register("split_words", WordSplitter)

# Example: Using the UDTF in SQL.
spark.sql("SELECT * FROM split_words('hello world')").show()
# +-----+
# | word|
# +-----+
# |hello|
# |world|
# +-----+

# Example: Using the UDTF with a lateral join in SQL.
# The lateral join allows us to reference the columns and aliases
# in the previous FROM clause items as inputs to the UDTF.
spark.sql(
    "SELECT * FROM VALUES ('Hello World'), ('Apache Spark') t(text), "
    "LATERAL split_words(text)"
).show()
# +------------+------+
# |        text|  word|
# +------------+------+
# | Hello World| Hello|
# | Hello World| World|
# |Apache Spark|Apache|
# |Apache Spark| Spark|
# +------------+------+
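
The lateral join result can be read as "for each input row, call the UDTF and prefix every output row with the input columns". The sketch below models that reading in plain Python. `lateral_join` is a hypothetical helper for illustration, not how Spark plans or executes the query.

```python
from typing import Iterator, List, Tuple


class WordSplitter:
    def eval(self, text: str) -> Iterator[Tuple[str]]:
        for word in text.split(" "):
            yield (word.strip(),)


def lateral_join(inputs: List[str]) -> List[Tuple[str, str]]:
    """Hypothetical model of `FROM t(text), LATERAL split_words(text)`."""
    splitter = WordSplitter()
    joined = []
    for text in inputs:
        # Each output row keeps the input column alongside the UDTF column.
        for (word,) in splitter.eval(text):
            joined.append((text, word))
    return joined


result = lateral_join(["Hello World", "Apache Spark"])
for row in result:
    print(row)
```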

Arrow Optimization

Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer data between Java and Python processes. Arrow optimization is disabled by default for Python UDTFs.

Arrow can improve performance when each input row generates a large result table from the UDTF.

To enable Arrow optimization, set the spark.sql.execution.pythonUDTF.arrow.enabled configuration to true. You can also enable it by specifying the useArrow parameter when declaring the UDTF.

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

For more details, please see Apache Arrow in PySpark.

TABLE Input Argument

Python UDTFs can also take a TABLE as an input argument, which can be combined with scalar input arguments. By default, only one TABLE argument is allowed, primarily for performance reasons. To allow more than one TABLE input argument, set the spark.sql.tvf.allowMultipleTableArguments.enabled configuration to true.

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+
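
With a TABLE argument, `eval` receives one row object per input row. The sketch below mimics that without Spark, using a plain dict as a stand-in for `pyspark.sql.Row` (an assumption made only so the block runs anywhere; both support `row["id"]` lookups).

```python
from typing import Any, Iterator, Mapping, Tuple


class FilterUDTF:
    def eval(self, row: Mapping[str, Any]) -> Iterator[Tuple[int]]:
        if row["id"] > 5:
            yield (row["id"],)


# Stand-in for TABLE(SELECT * FROM range(10)): one dict per input row.
input_table = [{"id": i} for i in range(10)]

udtf_instance = FilterUDTF()
result = [out for row in input_table for out in udtf_instance.eval(row)]
print(result)  # [(6,), (7,), (8,), (9,)]
```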

More Examples

A Python UDTF that expands date ranges into individual dates:

from datetime import datetime, timedelta
from pyspark.sql.functions import lit, udtf

@udtf(returnType="date: string")
class DateExpander:
    def eval(self, start_date: str, end_date: str):
        current = datetime.strptime(start_date, '%Y-%m-%d')
        end = datetime.strptime(end_date, '%Y-%m-%d')
        while current <= end:
            yield (current.strftime('%Y-%m-%d'),)
            current += timedelta(days=1)

DateExpander(lit("2023-02-25"), lit("2023-03-01")).show()
# +----------+
# |      date|
# +----------+
# |2023-02-25|
# |2023-02-26|
# |2023-02-27|
# |2023-02-28|
# |2023-03-01|
# +----------+

A Python UDTF with __init__ and terminate:

from pyspark.sql.functions import udtf

@udtf(returnType="cnt: int")
class CountUDTF:
    def __init__(self):
        # Initialize the counter to 0 when an instance of the class is created.
        self.count = 0

    def eval(self, x: int):
        # Increment the counter by 1 for each input value received.
        self.count += 1

    def terminate(self):
        # Yield the final count when the UDTF is done processing.
        yield self.count,

spark.udtf.register("count_udtf", CountUDTF)
spark.sql("SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)").show()
# +---+---+
# | id|cnt|
# +---+---+
# |  9| 10|
# +---+---+
spark.sql("SELECT * FROM range(0, 10, 1, 2), LATERAL count_udtf(id)").show()
# +---+---+
# | id|cnt|
# +---+---+
# |  4|  5|
# |  9|  5|
# +---+---+
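
The two results differ because each partition is processed by its own class instance, so each instance only counts the rows it sees. The sketch below reproduces the counts in plain Python. `split_evenly` and `counts_per_partition` are hypothetical helpers, and the contiguous, equal-sized split is an assumption chosen to mirror the output shown above.

```python
from typing import Iterator, List, Tuple


class CountUDTF:
    def __init__(self) -> None:
        self.count = 0

    def eval(self, x: int) -> None:
        # No rows are produced here; only the counter is updated.
        self.count += 1

    def terminate(self) -> Iterator[Tuple[int]]:
        yield (self.count,)


def split_evenly(values: List[int], n: int) -> List[List[int]]:
    """Split into contiguous chunks of near-equal size (assumed layout)."""
    size = max(1, -(-len(values) // n))  # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]


def counts_per_partition(values: List[int], n: int) -> List[Tuple[int]]:
    results: List[Tuple[int]] = []
    for partition in split_evenly(values, n):
        instance = CountUDTF()  # one instance per partition
        for x in partition:
            instance.eval(x)
        results.extend(instance.terminate())
    return results


print(counts_per_partition(list(range(10)), 1))  # [(10,)]
print(counts_per_partition(list(range(10)), 2))  # [(5,), (5,)]
```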