Source code for pyspark.sql.window

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from pyspark import since, SparkContext
from pyspark.sql.column import _to_seq, _to_java_column

__all__ = ["Window", "WindowSpec"]


def _to_java_cols(cols):
    """
    Convert the given column arguments with ``_to_seq`` and ``_to_java_column``.

    Callers collect their arguments with ``*cols``; when the only argument is
    a ``list``, that list is used as the columns, so ``f("a", "b")`` and
    ``f(["a", "b"])`` are treated alike.

    :param cols: sequence of column arguments, or a one-element sequence
        holding a list of them
    :return: whatever ``_to_seq`` returns for the active SparkContext, the
        selected columns and the ``_to_java_column`` converter
    """
    active_ctx = SparkContext._active_spark_context
    # Unwrap the "single list argument" calling convention.
    columns = cols[0] if len(cols) == 1 and isinstance(cols[0], list) else cols
    return _to_seq(active_ctx, columns, _to_java_column)


class Window(object):
    """
    Utility functions for defining window in DataFrames.

    For example:

    >>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    >>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    >>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
    >>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

    .. note:: Experimental

    .. versionadded:: 1.4
    """

    _JAVA_MIN_LONG = -(1 << 63)  # -9223372036854775808
    _JAVA_MAX_LONG = (1 << 63) - 1  # 9223372036854775807
    # Any boundary at or beyond these thresholds is treated as unbounded.
    # They are the narrower of Python's ``sys.maxsize`` range and the Java
    # long range.
    _PRECEDING_THRESHOLD = max(-sys.maxsize, _JAVA_MIN_LONG)
    _FOLLOWING_THRESHOLD = min(sys.maxsize, _JAVA_MAX_LONG)

    # Special frame boundary: unbounded on the preceding side.
    unboundedPreceding = _JAVA_MIN_LONG

    # Special frame boundary: unbounded on the following side.
    unboundedFollowing = _JAVA_MAX_LONG

    # Special frame boundary: the current row.
    currentRow = 0

    @staticmethod
    def _normalize_frame(start, end):
        """
        Map out-of-range frame boundaries onto the unbounded sentinels.

        :param start: requested frame start
        :param end: requested frame end
        :return: ``(start, end)`` where `start` is replaced by
            ``Window.unboundedPreceding`` if it is less than or equal to
            ``Window._PRECEDING_THRESHOLD`` and `end` is replaced by
            ``Window.unboundedFollowing`` if it is greater than or equal to
            ``Window._FOLLOWING_THRESHOLD``
        """
        if start <= Window._PRECEDING_THRESHOLD:
            start = Window.unboundedPreceding
        if end >= Window._FOLLOWING_THRESHOLD:
            end = Window.unboundedFollowing
        return start, end

    @staticmethod
    @since(1.4)
    def partitionBy(*cols):
        """
        Creates a :class:`WindowSpec` with the partitioning defined.

        :param cols: names of columns or expressions; a single list of them
            is also accepted
        """
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
        return WindowSpec(jspec)

    @staticmethod
    @since(1.4)
    def orderBy(*cols):
        """
        Creates a :class:`WindowSpec` with the ordering defined.

        :param cols: names of columns or expressions; a single list of them
            is also accepted
        """
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
        return WindowSpec(jspec)

    @staticmethod
    @since(2.1)
    def rowsBetween(start, end):
        """
        Creates a :class:`WindowSpec` with the frame boundaries defined,
        from `start` (inclusive) to `end` (inclusive).

        Both `start` and `end` are relative positions from the current row.
        For example, "0" means "current row", while "-1" means the row before
        the current row, and "5" means the fifth row after the current row.

        We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` to specify special boundary values, rather than using integral
        values directly.

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        start, end = Window._normalize_frame(start, end)
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
        return WindowSpec(jspec)

    @staticmethod
    @since(2.1)
    def rangeBetween(start, end):
        """
        Creates a :class:`WindowSpec` with the frame boundaries defined,
        from `start` (inclusive) to `end` (inclusive).

        Both `start` and `end` are relative from the current row. For example,
        "0" means "current row", while "-1" means one off before the current row,
        and "5" means the five off after the current row.

        We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` to specify special boundary values, rather than using integral
        values directly.

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        start, end = Window._normalize_frame(start, end)
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
        return WindowSpec(jspec)


class WindowSpec(object):
    """
    A window specification that defines the partitioning, ordering,
    and frame boundaries.

    Use the static methods in :class:`Window` to create a :class:`WindowSpec`.

    .. note:: Experimental

    .. versionadded:: 1.4
    """

    def __init__(self, jspec):
        # Underlying JVM-side window specification that every method delegates to.
        self._jspec = jspec

    @since(1.4)
    def partitionBy(self, *cols):
        """
        Returns a new :class:`WindowSpec` with the partitioning columns set.

        :param cols: names of columns or expressions
        """
        partition = self._jspec.partitionBy
        partitioned = partition(_to_java_cols(cols))
        return WindowSpec(partitioned)

    @since(1.4)
    def orderBy(self, *cols):
        """
        Returns a new :class:`WindowSpec` with the ordering columns set.

        :param cols: names of columns or expressions
        """
        order = self._jspec.orderBy
        ordered = order(_to_java_cols(cols))
        return WindowSpec(ordered)

    @since(1.4)
    def rowsBetween(self, start, end):
        """
        Returns a new :class:`WindowSpec` whose frame runs from `start`
        (inclusive) to `end` (inclusive).

        Both boundaries are row positions relative to the current row:
        "0" is the current row, "-1" is the row before it, and "5" is the
        fifth row after it.

        Prefer ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` over raw integral values when specifying
        special boundaries.

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        # Out-of-range boundaries collapse onto the unbounded sentinels.
        lower = Window.unboundedPreceding if start <= Window._PRECEDING_THRESHOLD else start
        upper = Window.unboundedFollowing if end >= Window._FOLLOWING_THRESHOLD else end
        return WindowSpec(self._jspec.rowsBetween(lower, upper))

    @since(1.4)
    def rangeBetween(self, start, end):
        """
        Returns a new :class:`WindowSpec` whose frame runs from `start`
        (inclusive) to `end` (inclusive).

        Both boundaries are relative from the current row: "0" is the
        current row, "-1" is one off before it, and "5" is five off after it.

        Prefer ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` over raw integral values when specifying
        special boundaries.

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        # Out-of-range boundaries collapse onto the unbounded sentinels.
        lower = Window.unboundedPreceding if start <= Window._PRECEDING_THRESHOLD else start
        upper = Window.unboundedFollowing if end >= Window._FOLLOWING_THRESHOLD else end
        return WindowSpec(self._jspec.rangeBetween(lower, upper))


def _test():
    """
    Run this module's doctests with a local SparkContext active.

    A ``SparkContext`` is created before the doctests run and stopped
    afterwards, even if the doctest run raises. The process exits with
    status -1 when any doctest fails.
    """
    import doctest
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        (failure_count, _) = doctest.testmod()
    finally:
        # Release the context instead of leaving it running until exit.
        sc.stop()
    if failure_count:
        # sys.exit rather than the site-injected ``exit`` helper, which is
        # not guaranteed to be defined in every interpreter setup.
        sys.exit(-1)


if __name__ == "__main__":
    _test()