pyspark.sql.functions.session_window

pyspark.sql.functions.session_window(timeColumn, gapDuration)[source]

Generates session window given a timestamp specifying column. Session window is one of dynamic windows, which means the length of window is varying according to the given inputs. The length of session window is defined as “the timestamp of latest input of the session + gap duration”, so when the new inputs are bound to the current session window, the end time of session window can be expanded according to the new inputs. Windows can support microsecond precision. Windows in the order of months are not supported. For a streaming query, you may use the function current_timestamp to generate windows on processing time. gapDuration is provided as strings, e.g. ‘1 second’, ‘1 day 12 hours’, ‘2 minutes’. Valid interval strings are ‘week’, ‘day’, ‘hour’, ‘minute’, ‘second’, ‘millisecond’, ‘microsecond’. It could also be a Column which can be evaluated to gap duration dynamically based on the input row. The output column will be a struct called ‘session_window’ by default with the nested columns ‘start’ and ‘end’, where ‘start’ and ‘end’ will be of pyspark.sql.types.TimestampType.

New in version 3.2.0.

Parameters
timeColumnColumn

The column or the expression to use as the timestamp for windowing by time. The time column must be of TimestampType.

gapDurationColumn or str

A column or string specifying the timeout of the session. It could be static value, e.g. 10 minutes, 1 second, or an expression/UDF that specifies gap duration dynamically based on the input row.

Examples

>>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
>>> w = df.groupBy(session_window("date", "5 seconds")).agg(sum("val").alias("sum"))
>>> w.select(w.session_window.start.cast("string").alias("start"),
...          w.session_window.end.cast("string").alias("end"), "sum").collect()
[Row(start='2016-03-11 09:00:07', end='2016-03-11 09:00:12', sum=1)]
>>> w = df.groupBy(session_window("date", lit("5 seconds"))).agg(sum("val").alias("sum"))
>>> w.select(w.session_window.start.cast("string").alias("start"),
...          w.session_window.end.cast("string").alias("end"), "sum").collect()
[Row(start='2016-03-11 09:00:07', end='2016-03-11 09:00:12', sum=1)]