pyspark.sql.GroupedData.agg#

GroupedData.agg(*exprs)[source]#

Compute aggregates and returns the result as a DataFrame.

The available aggregate functions can be:

  1. built-in aggregation functions, such as avg, max, min, sum, count

  2. group aggregate pandas UDFs, created with pyspark.sql.functions.pandas_udf()

    Note

    There is no partial aggregation with group aggregate UDFs, i.e., a full shuffle is required. Also, all the data of a group will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory.

If exprs is a single dict mapping from string to string, then the key is the column to perform aggregation on, and the value is the aggregate function.

Alternatively, exprs can also be a list of aggregate Column expressions.

New in version 1.3.0.

Changed in version 3.4.0: Supports Spark Connect.

Parameters
exprsdict

a dict mapping from column name (string) to aggregate functions (string), or a list of Column.

Notes

Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed in a single call to this function.

Examples

>>> from pyspark.sql import functions as sf
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
>>> df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  3|Alice|
|  5|  Bob|
| 10|  Bob|
+---+-----+

Group-by name, and count each group.

>>> df.groupBy(df.name)
GroupedData[grouping...: [name...], value: [age: bigint, name: string], type: GroupBy]
>>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+-----+--------+
| name|count(1)|
+-----+--------+
|Alice|       2|
|  Bob|       2|
+-----+--------+

Group-by name, and calculate the minimum age.

>>> df.groupBy(df.name).agg(sf.min(df.age)).sort("name").show()
+-----+--------+
| name|min(age)|
+-----+--------+
|Alice|       2|
|  Bob|       5|
+-----+--------+

Same as above but uses pandas UDF.

>>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  
... def min_udf(v):
...     return v.min()
...
>>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  
+-----+------------+
| name|min_udf(age)|
+-----+------------+
|Alice|           2|
|  Bob|           5|
+-----+------------+