Contributing to PySpark

There are many types of contribution, for example, helping other users, testing releases, reviewing changes, documentation contribution, bug reporting, JIRA maintenance, code changes, etc. These are documented at the general guidelines. This page focuses on PySpark and includes additional details specifically for PySpark.

Contributing by Testing Releases

Before the official release, PySpark release candidates are shared in the dev@spark.apache.org mailing list to vote on. This release candidates can be easily installed via pip. For example, in case of Spark 3.0.0 RC1, you can install as below:

pip install https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin/pyspark-3.0.0.tar.gz

The link for release files such as https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin can be found in the vote thread.

Testing and verifying users’ existing workloads against release candidates is one of the vital contributions to PySpark. It prevents breaking users’ existing workloads before the official release. When there is an issue such as a regression, correctness problem or performance degradation worth enough to drop the release candidate, usually the release candidate is dropped and the community focuses on fixing it to include in the next release candidate.

Contributing Documentation Changes

The release documentation is located under Spark’s docs directory. README.md describes the required dependencies and steps to generate the documentations. Usually, PySpark documentation is tested with the command below under the docs directory:

SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 bundle exec jekyll serve --watch

PySpark uses Sphinx to generate its release PySpark documentation. Therefore, if you want to build only PySpark documentation alone, you can build under python/docs directory by:

make html

It generates the corresponding HTMLs under python/docs/build/html.

Lastly, please make sure that the new APIs are documented by manually adding methods and/or classes at the corresponding RST files under python/docs/source/reference. Otherwise, they would not be documented in PySpark documentation.

Preparing to Contribute Code Changes

Before starting to work on codes in PySpark, it is recommended to read the general guidelines. Additionally, there are a couple of additional notes to keep in mind when contributing to codes in PySpark:

  • Be Pythonic

    See The Zen of Python.

  • Match APIs with Scala and Java sides

    Apache Spark is an unified engine that provides a consistent API layer. In general, the APIs are consistently supported across other languages.

  • PySpark-specific APIs can be accepted

    As long as they are Pythonic and do not conflict with other existent APIs, it is fine to raise a API request, for example, decorator usage of UDFs.

  • Adjust the corresponding type hints if you extend or modify public API

    See Contributing and Maintaining Type Hints for details.

If you are fixing pandas API on Spark (pyspark.pandas) package, please consider the design principles below:

  • Return pandas-on-Spark data structure for big data, and pandas data structure for small data

    Often developers face the question whether a particular function should return a pandas-on-Spark DataFrame/Series, or a pandas DataFrame/Series. The principle is: if the returned object can be large, use a pandas-on-Spark DataFrame/Series. If the data is bound to be small, use a pandas DataFrame/Series. For example, DataFrame.dtypes return a pandas Series, because the number of columns in a DataFrame is bounded and small, whereas DataFrame.head() or Series.unique() returns a pandas-on-Spark DataFrame/Series, because the resulting object can be large.

  • Provide discoverable APIs for common data science tasks

    At the risk of overgeneralization, there are two API design approaches: the first focuses on providing APIs for common tasks; the second starts with abstractions, and enables users to accomplish their tasks by composing primitives. While the world is not black and white, pandas takes more of the former approach, while Spark has taken more of the latter.

    One example is value count (count by some key column), one of the most common operations in data science. pandas DataFrame.value_counts() returns the result in sorted order, which in 90% of the cases is what users prefer when exploring data, whereas Spark’s does not sort, which is more desirable when building data pipelines, as users can accomplish the pandas behavior by adding an explicit orderBy.

    Similar to pandas, pandas API on Spark should also lean more towards the former, providing discoverable APIs for common data science tasks. In most cases, this principle is well taken care of by simply implementing pandas’ APIs. However, there will be circumstances in which pandas’ APIs don’t address a specific need, e.g. plotting for big data.

  • Guardrails to prevent users from shooting themselves in the foot

    Certain operations in pandas are prohibitively expensive as data scales, and we don’t want to give users the illusion that they can rely on such operations in pandas API on Spark. That is to say, methods implemented in pandas API on Spark should be safe to perform by default on large datasets. As a result, the following capabilities are not implemented in pandas API on Spark:

    • Capabilities that are fundamentally not parallelizable: e.g. imperatively looping over each element

    • Capabilities that require materializing the entire working set in a single node’s memory. This is why we do not implement pandas.DataFrame.to_xarray. Another example is the _repr_html_ call caps the total number of records shown to a maximum of 1000, to prevent users from blowing up their driver node simply by typing the name of the DataFrame in a notebook.

    A few exceptions, however, exist. One common pattern with “big data science” is that while the initial dataset is large, the working set becomes smaller as the analysis goes deeper. For example, data scientists often perform aggregation on datasets and want to then convert the aggregated dataset to some local data structure. To help data scientists, we offer the following:

    • DataFrame.to_pandas: returns a pandas DataFrame (pandas-on-Spark only)

    • DataFrame.to_numpy: returns a numpy array, works with both pandas and pandas API on Spark

    Note that it is clear from the names that these functions return some local data structure that would require materializing data in a single node’s memory. For these functions, we also explicitly document them with a warning note that the resulting data structure must be small.

Environment Setup

Prerequisite

PySpark development requires to build Spark that needs a proper JDK installed, etc. See Building Spark for more details.

Note that if you intend to contribute to Spark Connect in Python, buf version 1.24.0 is required, see Buf Installation for more details.

Conda

If you are using Conda, the development environment can be set as follows.

# Python 3.8+ is required
conda create --name pyspark-dev-env python=3.9
conda activate pyspark-dev-env
pip install --upgrade -r dev/requirements.txt

Once it is set up, make sure you switch to pyspark-dev-env before starting the development:

conda activate pyspark-dev-env

Now, you can start developing and running the tests.

pip

With Python 3.8+, pip can be used as below to install and set up the development environment.

pip install --upgrade -r dev/requirements.txt

Now, you can start developing and running the tests.

Contributing and Maintaining Type Hints

PySpark type hints are inlined, to take advantage of static type checking.

As a rule of thumb, only public API is annotated.

Annotations should, when possible:

  • Reflect expectations of the underlying JVM API, to help avoid type related failures outside Python interpreter.

  • In case of conflict between too broad (Any) and too narrow argument annotations, prefer the latter as one, as long as it is covering most of the typical use cases.

  • Indicate nonsensical combinations of arguments using @overload annotations. For example, to indicate that *Col and *Cols arguments are mutually exclusive:

    @overload
    def __init__(
        self,
        *,
        threshold: float = ...,
        inputCol: Optional[str] = ...,
        outputCol: Optional[str] = ...
    ) -> None: ...
    @overload
    def __init__(
        self,
        *,
        thresholds: Optional[List[float]] = ...,
        inputCols: Optional[List[str]] = ...,
        outputCols: Optional[List[str]] = ...
    ) -> None: ...
    
  • Be compatible with the current stable MyPy release.

Complex supporting type definitions, should be placed in dedicated _typing.pyi stubs. See for example pyspark.sql._typing.pyi.

Annotations can be validated using dev/lint-python script or by invoking mypy directly:

mypy --config python/mypy.ini python/pyspark

Code and Docstring Guide

Code Conventions

Please follow the style of the existing codebase as is, which is virtually PEP 8 with one exception: lines can be up to 100 characters in length, not 79.

Note that:

  • the method and variable names in PySpark are the similar case is threading library in Python itself where the APIs were inspired by Java. PySpark also follows camelCase for exposed APIs that match with Scala and Java.

  • In contrast, functions.py uses snake_case in order to make APIs SQL (and Python) friendly.

  • In addition, pandas-on-Spark (pyspark.pandas) also uses snake_case because this package is free from API consistency with other languages.

PySpark leverages linters such as pycodestyle and flake8, which dev/lint-python runs. Therefore, make sure to run that script to double check.

Docstring Conventions

PySpark follows NumPy documentation style.

Doctest Conventions

In general, doctests should be grouped logically by separating a newline.

For instance, the first block is for the statements for preparation, the second block is for using the function with a specific argument, and third block is for another argument. As a example, please refer DataFrame.rsub in pandas.

These blocks should be consistently separated in PySpark doctests, and more doctests should be added if the coverage of the doctests or the number of examples to show is not enough.

Contributing Error and Exception

To throw a standardized user-facing error or exception, developers should specify the error class and message parameters rather than an arbitrary error message.

Usage

  1. Check if an appropriate error class already exists in error_classes.py. If true, use the error class and skip to step 3.

  2. Add a new class to error_classes.py; keep in mind the invariants below.

  3. Check if the exception type already extends PySparkException. If true, skip to step 5.

  4. Mix PySparkException into the exception.

  5. Throw the exception with the error class and message parameters.

Before

Throw with arbitrary error message:

raise ValueError("Problem A because B")

After

error_classes.py

"PROBLEM_BECAUSE": {
  "message": ["Problem <problem> because <cause>"]
}

exceptions.py

class PySparkTestError(PySparkException):
    def __init__(self, error_class: str, message_parameters: Dict[str, str]):
        super().__init__(error_class=error_class, message_parameters=message_parameters)

    def getMessageParameters(self) -> Optional[Dict[str, str]]:
        return super().getMessageParameters()

Throw with error class and message parameters:

raise PySparkTestError("PROBLEM_BECAUSE", {"problem": "A", "cause": "B"})

Access fields

To access error fields, catch exceptions that extend PySparkException and access to error class with PySparkException.getErrorClass().

try:
    ...
except PySparkException as pe:
    if pe.getErrorClass() == "PROBLEM_BECAUSE":
        ...

Fields

Error class

Error classes are a succinct, human-readable representation of the error category.

An uncategorized errors can be assigned to a legacy error class with the prefix _LEGACY_ERROR_TEMP_ and an unused sequential number, for instance _LEGACY_ERROR_TEMP_0053.

Invariants:

  • Unique

  • Consistent across releases

  • Sorted alphabetically

Message

Error messages provide a descriptive, human-readable representation of the error. The message format accepts string parameters via the C-style printf syntax.

The quality of the error message should match the Apache Spark Error Message Guidelines

Invariants:

  • Unique