
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from functools import partial, reduce
from typing import Any, Callable, Iterator, List, Optional, Tuple, Union, cast, no_type_check

import pandas as pd
from pandas.api.types import is_hashable, is_list_like  # type: ignore[attr-defined]

from pyspark.sql import functions as F, Column as PySparkColumn, Window
from pyspark.sql.types import DataType
from pyspark.sql.utils import get_column_class

# For running doctests and reference resolution in PyCharm.
from pyspark import pandas as ps
from pyspark.pandas._typing import Label, Name, Scalar
from pyspark.pandas.exceptions import PandasNotImplementedError
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.indexes.base import Index
from pyspark.pandas.missing.indexes import MissingPandasLikeMultiIndex
from pyspark.pandas.series import Series, first_series
from pyspark.pandas.utils import (
    compare_disallow_null,
    is_name_like_tuple,
    name_like_string,
    scol_for,
    verify_temp_column_name,
    validate_index_loc,
)
from pyspark.pandas.internal import (
    InternalField,
    InternalFrame,
    NATURAL_ORDER_COLUMN_NAME,
    SPARK_INDEX_NAME_FORMAT,
)


class MultiIndex(Index):
    """
    pandas-on-Spark MultiIndex that corresponds to pandas MultiIndex logically. This might hold
    Spark Column internally.

    Parameters
    ----------
    levels : sequence of arrays
        The unique labels for each level.
    codes : sequence of arrays
        Integers for each level designating which label at each location.
    sortorder : optional int
        Level of sortedness (must be lexicographically sorted by that level).
    names : optional sequence of objects
        Names for each of the index levels. (name is accepted for compat).
    copy : bool, default False
        Copy the meta-data.
    verify_integrity : bool, default True
        Check that the levels/codes are consistent and valid.

    See Also
    --------
    MultiIndex.from_arrays : Convert list of arrays to MultiIndex.
    MultiIndex.from_product : Create a MultiIndex from the cartesian product of iterables.
    MultiIndex.from_tuples : Convert list of tuples to a MultiIndex.
    MultiIndex.from_frame : Make a MultiIndex from a DataFrame.
    Index : A single-level Index.

    Examples
    --------
    >>> ps.DataFrame({'a': ['a', 'b', 'c']}, index=[[1, 2, 3], [4, 5, 6]]).index  # doctest: +SKIP
    MultiIndex([(1, 4),
                (2, 5),
                (3, 6)],
               )

    >>> ps.DataFrame({'a': [1, 2, 3]}, index=[list('abc'), list('def')]).index  # doctest: +SKIP
    MultiIndex([('a', 'd'),
                ('b', 'e'),
                ('c', 'f')],
               )
    """

    @no_type_check
    def __new__(
        cls,
        levels=None,
        codes=None,
        sortorder=None,
        names=None,
        dtype=None,
        copy=False,
        name=None,
        verify_integrity: bool = True,
    ) -> "MultiIndex":
        pidx = pd.MultiIndex(
            levels=levels,
            codes=codes,
            sortorder=sortorder,
            names=names,
            dtype=dtype,
            copy=copy,
            name=name,
            verify_integrity=verify_integrity,
        )
        return ps.from_pandas(pidx)

    @property
    def _internal(self) -> InternalFrame:
        internal = self._psdf._internal
        scol = F.struct(*internal.index_spark_columns)
        return internal.copy(
            column_labels=[None],
            data_spark_columns=[scol],
            data_fields=[None],
            column_label_names=None,
        )

    @property
    def _column_label(self) -> Optional[Label]:
        return None

    def __abs__(self) -> "MultiIndex":
        raise TypeError("cannot perform __abs__ with this index type: MultiIndex")

    def _with_new_scol(
        self, scol: PySparkColumn, *, field: Optional[InternalField] = None
    ) -> "MultiIndex":
        raise NotImplementedError("Not supported for type MultiIndex")

    @no_type_check
    def any(self, *args, **kwargs) -> None:
        raise TypeError("cannot perform any with this index type: MultiIndex")

    @no_type_check
    def all(self, *args, **kwargs) -> None:
        raise TypeError("cannot perform all with this index type: MultiIndex")
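
    # A minimal illustration of the `_internal` property above: the index levels
    # stay as separate Spark columns, while series-like operations see them as one
    # struct-typed column built with F.struct. Assuming a local SparkSession bound
    # to `spark` (illustrative names only), the same shape on a plain DataFrame:
    #
    #   sdf = spark.createDataFrame([("a", 1), ("b", 2)], "lvl1 string, lvl2 long")
    #   sdf.select(F.struct("lvl1", "lvl2").alias("index_struct")).printSchema()
    #
    # which prints roughly:
    #
    #   root
    #    |-- index_struct: struct (nullable = false)
    #    |    |-- lvl1: string (nullable = true)
    #    |    |-- lvl2: long (nullable = true)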

    @staticmethod
    def from_tuples(
        tuples: List[Tuple],
        sortorder: Optional[int] = None,
        names: Optional[List[Name]] = None,
    ) -> "MultiIndex":
        """
        Convert list of tuples to MultiIndex.

        Parameters
        ----------
        tuples : list / sequence of tuple-likes
            Each tuple is the index of one row/column.
        sortorder : int or None
            Level of sortedness (must be lexicographically sorted by that level).
        names : list / sequence of str, optional
            Names for the levels in the index.

        Returns
        -------
        index : MultiIndex

        Examples
        --------
        >>> tuples = [(1, 'red'), (1, 'blue'),
        ...           (2, 'red'), (2, 'blue')]
        >>> ps.MultiIndex.from_tuples(tuples, names=('number', 'color'))  # doctest: +SKIP
        MultiIndex([(1,  'red'),
                    (1, 'blue'),
                    (2,  'red'),
                    (2, 'blue')],
                   names=['number', 'color'])
        """
        return cast(
            MultiIndex,
            ps.from_pandas(
                pd.MultiIndex.from_tuples(tuples=tuples, sortorder=sortorder, names=names)
            ),
        )

    @staticmethod
    def from_arrays(
        arrays: List[List],
        sortorder: Optional[int] = None,
        names: Optional[List[Name]] = None,
    ) -> "MultiIndex":
        """
        Convert arrays to MultiIndex.

        Parameters
        ----------
        arrays : list / sequence of array-likes
            Each array-like gives one level's value for each data point.
            len(arrays) is the number of levels.
        sortorder : int or None
            Level of sortedness (must be lexicographically sorted by that level).
        names : list / sequence of str, optional
            Names for the levels in the index.

        Returns
        -------
        index : MultiIndex

        Examples
        --------
        >>> arrays = [[1, 1, 2, 2], ['red', 'blue', 'red', 'blue']]
        >>> ps.MultiIndex.from_arrays(arrays, names=('number', 'color'))  # doctest: +SKIP
        MultiIndex([(1,  'red'),
                    (1, 'blue'),
                    (2,  'red'),
                    (2, 'blue')],
                   names=['number', 'color'])
        """
        return cast(
            MultiIndex,
            ps.from_pandas(
                pd.MultiIndex.from_arrays(arrays=arrays, sortorder=sortorder, names=names)
            ),
        )

    @staticmethod
    def from_product(
        iterables: List[List],
        sortorder: Optional[int] = None,
        names: Optional[List[Name]] = None,
    ) -> "MultiIndex":
        """
        Make a MultiIndex from the cartesian product of multiple iterables.

        Parameters
        ----------
        iterables : list / sequence of iterables
            Each iterable has unique labels for each level of the index.
        sortorder : int or None
            Level of sortedness (must be lexicographically sorted by that level).
        names : list / sequence of str, optional
            Names for the levels in the index.

        Returns
        -------
        index : MultiIndex

        See Also
        --------
        MultiIndex.from_arrays : Convert list of arrays to MultiIndex.
        MultiIndex.from_tuples : Convert list of tuples to MultiIndex.

        Examples
        --------
        >>> numbers = [0, 1, 2]
        >>> colors = ['green', 'purple']
        >>> ps.MultiIndex.from_product([numbers, colors],
        ...                            names=['number', 'color'])  # doctest: +SKIP
        MultiIndex([(0,  'green'),
                    (0, 'purple'),
                    (1,  'green'),
                    (1, 'purple'),
                    (2,  'green'),
                    (2, 'purple')],
                   names=['number', 'color'])
        """
        return cast(
            MultiIndex,
            ps.from_pandas(
                pd.MultiIndex.from_product(iterables=iterables, sortorder=sortorder, names=names)
            ),
        )

    @staticmethod
    def from_frame(df: DataFrame, names: Optional[List[Name]] = None) -> "MultiIndex":
        """
        Make a MultiIndex from a DataFrame.

        Parameters
        ----------
        df : DataFrame
            DataFrame to be converted to MultiIndex.
        names : list-like, optional
            If no names are provided, use the column names, or tuple of column
            names if the column is a MultiIndex. If a sequence, overwrite
            names with the given sequence.

        Returns
        -------
        MultiIndex
            The MultiIndex representation of the given DataFrame.

        See Also
        --------
        MultiIndex.from_arrays : Convert list of arrays to MultiIndex.
        MultiIndex.from_tuples : Convert list of tuples to MultiIndex.
        MultiIndex.from_product : Make a MultiIndex from cartesian product of iterables.

        Examples
        --------
        >>> df = ps.DataFrame([['HI', 'Temp'], ['HI', 'Precip'],
        ...                    ['NJ', 'Temp'], ['NJ', 'Precip']],
        ...                   columns=['a', 'b'])
        >>> df  # doctest: +SKIP
              a       b
        0    HI    Temp
        1    HI  Precip
        2    NJ    Temp
        3    NJ  Precip

        >>> ps.MultiIndex.from_frame(df)  # doctest: +SKIP
        MultiIndex([('HI',   'Temp'),
                    ('HI', 'Precip'),
                    ('NJ',   'Temp'),
                    ('NJ', 'Precip')],
                   names=['a', 'b'])

        Using explicit names, instead of the column names

        >>> ps.MultiIndex.from_frame(df, names=['state', 'observation'])  # doctest: +SKIP
        MultiIndex([('HI',   'Temp'),
                    ('HI', 'Precip'),
                    ('NJ',   'Temp'),
                    ('NJ', 'Precip')],
                   names=['state', 'observation'])
        """
        if not isinstance(df, DataFrame):
            raise TypeError("Input must be a DataFrame")
        sdf = df._to_spark()

        if names is None:
            names = df._internal.column_labels
        elif not is_list_like(names):
            raise TypeError("Names should be list-like for a MultiIndex")
        else:
            names = [name if is_name_like_tuple(name) else (name,) for name in names]

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[scol_for(sdf, col) for col in sdf.columns],
            index_names=names,
        )
        return cast(MultiIndex, DataFrame(internal).index)

    @property
    def name(self) -> Name:
        raise PandasNotImplementedError(class_name="pd.MultiIndex", property_name="name")

    @name.setter
    def name(self, name: Name) -> None:
        raise PandasNotImplementedError(class_name="pd.MultiIndex", property_name="name")

    @property
    def dtypes(self) -> pd.Series:
        """Return the dtypes as a Series for the underlying MultiIndex.

        .. versionadded:: 3.3.0

        Returns
        -------
        pd.Series
            The data type of each level.

        Examples
        --------
        >>> psmidx = ps.MultiIndex.from_arrays(
        ...     [[0, 1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 3, 4, 5, 6, 7, 8, 9]],
        ...     names=("zero", "one"),
        ... )
        >>> psmidx.dtypes
        zero    int64
        one     int64
        dtype: object
        """
        return pd.Series(
            [field.dtype for field in self._internal.index_fields],
            index=pd.Index(
                [name if len(name) > 1 else name[0] for name in self._internal.index_names]
            ),
        )

    def _verify_for_rename(self, name: List[Name]) -> List[Label]:  # type: ignore[override]
        if is_list_like(name):
            if self._internal.index_level != len(name):
                raise ValueError(
                    "Length of new names must be {}, got {}".format(
                        self._internal.index_level, len(name)
                    )
                )
            if any(not is_hashable(n) for n in name):
                raise TypeError("MultiIndex.name must be a hashable type")
            return [n if is_name_like_tuple(n) else (n,) for n in name]
        else:
            raise TypeError("Must pass list-like as `names`.")

    def swaplevel(self, i: int = -2, j: int = -1) -> "MultiIndex":
        """
        Swap level i with level j.
        Calling this method does not change the ordering of the values.

        Parameters
        ----------
        i : int, str, default -2
            First level of index to be swapped. Can pass level name as string.
            Parameter types can be mixed.
        j : int, str, default -1
            Second level of index to be swapped. Can pass level name as string.
            Parameter types can be mixed.

        Returns
        -------
        MultiIndex
            A new MultiIndex.

        Examples
        --------
        >>> midx = ps.MultiIndex.from_arrays([['a', 'b'], [1, 2]], names=['word', 'number'])
        >>> midx  # doctest: +SKIP
        MultiIndex([('a', 1),
                    ('b', 2)],
                   names=['word', 'number'])

        >>> midx.swaplevel(0, 1)  # doctest: +SKIP
        MultiIndex([(1, 'a'),
                    (2, 'b')],
                   names=['number', 'word'])

        >>> midx.swaplevel('number', 'word')  # doctest: +SKIP
        MultiIndex([(1, 'a'),
                    (2, 'b')],
                   names=['number', 'word'])
        """
        for index in (i, j):
            if not isinstance(index, int) and index not in self.names:
                raise KeyError("Level %s not found" % index)

        i = i if isinstance(i, int) else self.names.index(i)
        j = j if isinstance(j, int) else self.names.index(j)

        for index in (i, j):
            if index >= len(self.names) or index < -len(self.names):
                raise IndexError(
                    "Too many levels: Index has only %s levels, "
                    "%s is not a valid level number" % (len(self.names), index)
                )

        index_map = list(
            zip(
                self._internal.index_spark_columns,
                self._internal.index_names,
                self._internal.index_fields,
            )
        )
        index_map[i], index_map[j] = index_map[j], index_map[i]
        index_spark_columns, index_names, index_fields = zip(*index_map)
        internal = self._internal.copy(
            index_spark_columns=list(index_spark_columns),
            index_names=list(index_names),
            index_fields=list(index_fields),
            column_labels=[],
            data_spark_columns=[],
            data_fields=[],
        )
        return cast(MultiIndex, DataFrame(internal).index)

    @property
    def levshape(self) -> Tuple[int, ...]:
        """
        A tuple with the length of each level.

        Examples
        --------
        >>> midx = ps.MultiIndex.from_tuples([('a', 'x'), ('b', 'y'), ('c', 'z')])
        >>> midx  # doctest: +SKIP
        MultiIndex([('a', 'x'),
                    ('b', 'y'),
                    ('c', 'z')],
                   )

        >>> midx.levshape
        (3, 3)
        """
        result = self._internal.spark_frame.agg(
            *(F.countDistinct(c) for c in self._internal.index_spark_columns)
        ).collect()[0]
        return tuple(result)

    @staticmethod
    def _comparator_for_monotonic_increasing(
        data_type: DataType,
    ) -> Callable[
        [PySparkColumn, PySparkColumn, Callable[[PySparkColumn, PySparkColumn], PySparkColumn]],
        PySparkColumn,
    ]:
        return compare_disallow_null

    def _is_monotonic(self, order: str) -> bool:
        if order == "increasing":
            return self._is_monotonic_increasing().all()
        else:
            return self._is_monotonic_decreasing().all()

    def _is_monotonic_increasing(self) -> Series:
        window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-1, -1)

        cond = F.lit(True)
        has_not_null = F.lit(True)
        Column = get_column_class()
        for scol in self._internal.index_spark_columns[::-1]:
            data_type = self._internal.spark_type_for(scol)
            prev = F.lag(scol, 1).over(window)
            compare = MultiIndex._comparator_for_monotonic_increasing(data_type)
            # Since pandas 1.1.4, null value is not allowed at any levels of MultiIndex.
            # Therefore, we should check `has_not_null` over all levels.
            has_not_null = has_not_null & scol.isNotNull()
            cond = F.when(scol.eqNullSafe(prev), cond).otherwise(compare(scol, prev, Column.__gt__))

        cond = has_not_null & (prev.isNull() | cond)

        cond_name = verify_temp_column_name(
            self._internal.spark_frame.select(self._internal.index_spark_columns),
            "__is_monotonic_increasing_cond__",
        )

        sdf = self._internal.spark_frame.select(
            self._internal.index_spark_columns + [cond.alias(cond_name)]
        )

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[
                scol_for(sdf, col) for col in self._internal.index_spark_column_names
            ],
            index_names=self._internal.index_names,
            index_fields=self._internal.index_fields,
        )

        return first_series(DataFrame(internal))

    @staticmethod
    def _comparator_for_monotonic_decreasing(
        data_type: DataType,
    ) -> Callable[
        [PySparkColumn, PySparkColumn, Callable[[PySparkColumn, PySparkColumn], PySparkColumn]],
        PySparkColumn,
    ]:
        return compare_disallow_null

    def _is_monotonic_decreasing(self) -> Series:
        window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-1, -1)

        cond = F.lit(True)
        has_not_null = F.lit(True)
        Column = get_column_class()
        for scol in self._internal.index_spark_columns[::-1]:
            data_type = self._internal.spark_type_for(scol)
            prev = F.lag(scol, 1).over(window)
            compare = MultiIndex._comparator_for_monotonic_decreasing(data_type)
            # Since pandas 1.1.4, null value is not allowed at any levels of MultiIndex.
            # Therefore, we should check `has_not_null` over all levels.
            has_not_null = has_not_null & scol.isNotNull()
            cond = F.when(scol.eqNullSafe(prev), cond).otherwise(compare(scol, prev, Column.__lt__))

        cond = has_not_null & (prev.isNull() | cond)

        cond_name = verify_temp_column_name(
            self._internal.spark_frame.select(self._internal.index_spark_columns),
            "__is_monotonic_decreasing_cond__",
        )

        sdf = self._internal.spark_frame.select(
            self._internal.index_spark_columns + [cond.alias(cond_name)]
        )

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[
                scol_for(sdf, col) for col in self._internal.index_spark_column_names
            ],
            index_names=self._internal.index_names,
            index_fields=self._internal.index_fields,
        )

        return first_series(DataFrame(internal))
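
    # A sketch of the monotonicity logic above, under assumed illustrative names:
    # levels are walked from the least-significant level backwards; each row is
    # compared to the previous row via F.lag over the natural row order, and ties
    # at a level defer to the comparison accumulated from the later levels. On a
    # bare Spark DataFrame the per-level building block looks like:
    #
    #   w = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-1, -1)
    #   prev = F.lag(F.col("level0"), 1).over(w)
    #   nondecreasing = prev.isNull() | (F.col("level0") >= prev)
    #
    # On top of that, `has_not_null` rejects any row with a null level value,
    # since pandas >= 1.1.4 treats a MultiIndex containing nulls as non-monotonic.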

    def to_frame(  # type: ignore[override]
        self, index: bool = True, name: Optional[List[Name]] = None
    ) -> DataFrame:
        """
        Create a DataFrame with the levels of the MultiIndex as columns.
        Column ordering is determined by the DataFrame constructor with data as
        a dict.

        Parameters
        ----------
        index : boolean, default True
            Set the index of the returned DataFrame as the original MultiIndex.
        name : list / sequence of strings, optional
            The passed names should substitute index level names.

        Returns
        -------
        DataFrame : a DataFrame containing the original MultiIndex data.

        See Also
        --------
        DataFrame

        Examples
        --------
        >>> tuples = [(1, 'red'), (1, 'blue'),
        ...           (2, 'red'), (2, 'blue')]
        >>> idx = ps.MultiIndex.from_tuples(tuples, names=('number', 'color'))
        >>> idx  # doctest: +SKIP
        MultiIndex([(1,  'red'),
                    (1, 'blue'),
                    (2,  'red'),
                    (2, 'blue')],
                   names=['number', 'color'])
        >>> idx.to_frame()  # doctest: +NORMALIZE_WHITESPACE
                      number  color
        number color
        1      red         1    red
               blue        1   blue
        2      red         2    red
               blue        2   blue

        By default, the original Index is reused. To enforce a new Index:

        >>> idx.to_frame(index=False)
           number  color
        0       1    red
        1       1   blue
        2       2    red
        3       2   blue

        To override the name of the resulting column, specify `name`:

        >>> idx.to_frame(name=['n', 'c'])  # doctest: +NORMALIZE_WHITESPACE
                      n     c
        number color
        1      red    1   red
               blue   1  blue
        2      red    2   red
               blue   2  blue
        """
        if name is None:
            name = [
                name if name is not None else (i,)
                for i, name in enumerate(self._internal.index_names)
            ]
        elif is_list_like(name):
            if len(name) != self._internal.index_level:
                raise ValueError("'name' should have same length as number of levels on index.")
            name = [n if is_name_like_tuple(n) else (n,) for n in name]
        else:
            raise TypeError("'name' must be a list / sequence of column names.")

        return self._to_frame(index=index, names=name)

    def to_pandas(self) -> pd.MultiIndex:
        """
        Return a pandas MultiIndex.

        .. note:: This method should only be used if the resulting pandas object is expected
                  to be small, as all the data is loaded into the driver's memory.

        Examples
        --------
        >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
        ...                   columns=['dogs', 'cats'],
        ...                   index=[list('abcd'), list('efgh')])
        >>> df['dogs'].index.to_pandas()  # doctest: +SKIP
        MultiIndex([('a', 'e'),
                    ('b', 'f'),
                    ('c', 'g'),
                    ('d', 'h')],
                   )
        """
        # TODO: We might need to handle internal state change.
        # So far, we don't have any functions to change the internal state of MultiIndex except for
        # series-like operations. In that case, it creates a new Index object instead of MultiIndex.
        return cast(pd.MultiIndex, super().to_pandas())

    def _to_pandas(self) -> pd.MultiIndex:
        """
        Same as `to_pandas()`, without issuing the advice log for internal usage.
        """
        return cast(pd.MultiIndex, super()._to_pandas())

    def nunique(self, dropna: bool = True, approx: bool = False, rsd: float = 0.05) -> int:
        raise NotImplementedError("nunique is not defined for MultiIndex")

    # TODO: add 'name' parameter after pd.MultiIndex.name is implemented

    def copy(self, deep: Optional[bool] = None) -> "MultiIndex":  # type: ignore[override]
        """
        Make a copy of this object.

        Parameters
        ----------
        deep : None
            This parameter is not supported; it is just a dummy parameter to match pandas.

        Examples
        --------
        >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
        ...                   columns=['dogs', 'cats'],
        ...                   index=[list('abcd'), list('efgh')])
        >>> df['dogs'].index  # doctest: +SKIP
        MultiIndex([('a', 'e'),
                    ('b', 'f'),
                    ('c', 'g'),
                    ('d', 'h')],
                   )

        Copy index

        >>> df.index.copy()  # doctest: +SKIP
        MultiIndex([('a', 'e'),
                    ('b', 'f'),
                    ('c', 'g'),
                    ('d', 'h')],
                   )
        """
        return cast(MultiIndex, super().copy(deep=deep))

    def symmetric_difference(  # type: ignore[override]
        self,
        other: Index,
        result_name: Optional[List[Name]] = None,
        sort: Optional[bool] = None,
    ) -> "MultiIndex":
        """
        Compute the symmetric difference of two MultiIndex objects.

        Parameters
        ----------
        other : Index or array-like
        result_name : list
        sort : True or None, default None
            Whether to sort the resulting index.
            * True : Attempt to sort the result.
            * None : Do not sort the result.

        Returns
        -------
        symmetric_difference : MultiIndex

        Notes
        -----
        ``symmetric_difference`` contains elements that appear in either
        ``idx1`` or ``idx2`` but not both. Equivalent to the Index created by
        ``idx1.difference(idx2) | idx2.difference(idx1)`` with duplicates
        dropped.

        Examples
        --------
        >>> midx1 = pd.MultiIndex([['lama', 'cow', 'falcon'],
        ...                        ['speed', 'weight', 'length']],
        ...                       [[0, 0, 0, 1, 1, 1, 2, 2, 2],
        ...                        [0, 0, 0, 0, 1, 2, 0, 1, 2]])
        >>> midx2 = pd.MultiIndex([['pandas-on-Spark', 'cow', 'falcon'],
        ...                        ['speed', 'weight', 'length']],
        ...                       [[0, 0, 0, 1, 1, 1, 2, 2, 2],
        ...                        [0, 0, 0, 0, 1, 2, 0, 1, 2]])
        >>> s1 = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3],
        ...                index=midx1)
        >>> s2 = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3],
        ...                index=midx2)

        >>> s1.index.symmetric_difference(s2.index)  # doctest: +SKIP
        MultiIndex([('pandas-on-Spark', 'speed'),
                    (           'lama', 'speed')],
                   )

        You can set names of the result Index.

        >>> s1.index.symmetric_difference(s2.index, result_name=['a', 'b'])  # doctest: +SKIP
        MultiIndex([('pandas-on-Spark', 'speed'),
                    (           'lama', 'speed')],
                   names=['a', 'b'])

        You can set sort to `True`, if you want to sort the resulting index.

        >>> s1.index.symmetric_difference(s2.index, sort=True)  # doctest: +SKIP
        MultiIndex([('pandas-on-Spark', 'speed'),
                    (           'lama', 'speed')],
                   )

        You can also use the ``^`` operator:

        >>> s1.index ^ s2.index  # doctest: +SKIP
        MultiIndex([('pandas-on-Spark', 'speed'),
                    (           'lama', 'speed')],
                   )
        """
        if type(self) != type(other):
            raise NotImplementedError(
                "Doesn't support symmetric_difference between Index & MultiIndex for now"
            )

        sdf_self = self._psdf._internal.spark_frame.select(self._internal.index_spark_columns)
        sdf_other = other._psdf._internal.spark_frame.select(other._internal.index_spark_columns)

        sdf_symdiff = sdf_self.union(sdf_other).subtract(sdf_self.intersect(sdf_other))

        if sort:
            sdf_symdiff = sdf_symdiff.sort(*self._internal.index_spark_column_names)

        internal = InternalFrame(
            spark_frame=sdf_symdiff,
            index_spark_columns=[
                scol_for(sdf_symdiff, col) for col in self._internal.index_spark_column_names
            ],
            index_names=self._internal.index_names,
            index_fields=self._internal.index_fields,
        )
        result = cast(MultiIndex, DataFrame(internal).index)

        if result_name:
            result.names = result_name

        return result
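
    # The set algebra behind `symmetric_difference` above, reduced to plain Spark
    # DataFrames (a simplified view; the real method also restores index metadata
    # through InternalFrame):
    #
    #   sym_diff = sdf_self.union(sdf_other).subtract(sdf_self.intersect(sdf_other))
    #
    # i.e. (A union B) minus (A intersect B). Spark's `subtract` and `intersect`
    # behave like SQL EXCEPT/INTERSECT and return distinct rows, which is why the
    # result comes out deduplicated.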

    # TODO: ADD error parameter
    def drop(self, codes: List[Any], level: Optional[Union[int, Name]] = None) -> "MultiIndex":
        """
        Make new MultiIndex with passed list of labels deleted

        Parameters
        ----------
        codes : array-like
            Must be a list of tuples
        level : int or level name, default None

        Returns
        -------
        dropped : MultiIndex

        Examples
        --------
        >>> index = ps.MultiIndex.from_tuples([('a', 'x'), ('b', 'y'), ('c', 'z')])
        >>> index  # doctest: +SKIP
        MultiIndex([('a', 'x'),
                    ('b', 'y'),
                    ('c', 'z')],
                   )

        >>> index.drop(['a'])  # doctest: +SKIP
        MultiIndex([('b', 'y'),
                    ('c', 'z')],
                   )

        >>> index.drop(['x', 'y'], level=1)  # doctest: +SKIP
        MultiIndex([('c', 'z')],
                   )
        """
        internal = self._internal.resolved_copy
        sdf = internal.spark_frame
        index_scols = internal.index_spark_columns
        if level is None:
            scol = index_scols[0]
        elif isinstance(level, int):
            scol = index_scols[level]
        else:
            scol = None
            for index_spark_column, index_name in zip(
                internal.index_spark_columns, internal.index_names
            ):
                if not isinstance(level, tuple):
                    level = (level,)
                if level == index_name:
                    if scol is not None:
                        raise ValueError(
                            "The name {} occurs multiple times, use a level number".format(
                                name_like_string(level)
                            )
                        )
                    scol = index_spark_column
            if scol is None:
                raise KeyError("Level {} not found".format(name_like_string(level)))
        sdf = sdf[~scol.isin(codes)]

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[scol_for(sdf, col) for col in internal.index_spark_column_names],
            index_names=internal.index_names,
            index_fields=internal.index_fields,
            column_labels=[],
            data_spark_columns=[],
            data_fields=[],
        )
        return cast(MultiIndex, DataFrame(internal).index)
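
    # Once `drop` above has resolved the requested level to a Spark column `scol`,
    # the deletion itself is a single negated `isin` filter over the index frame:
    #
    #   sdf = sdf[~scol.isin(codes)]
    #
    # Note that this filter also excludes rows whose value at that level is null,
    # because a null comparison result is treated as false by Spark filters.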

    def drop_duplicates(self, keep: Union[bool, str] = "first") -> "MultiIndex":
        """
        Return MultiIndex with duplicate values removed.

        Parameters
        ----------
        keep : {'first', 'last', ``False``}, default 'first'
            Method to handle dropping duplicates:
            - 'first' : Drop duplicates except for the first occurrence.
            - 'last' : Drop duplicates except for the last occurrence.
            - ``False`` : Drop all duplicates.

        Returns
        -------
        deduplicated : MultiIndex

        See Also
        --------
        Series.drop_duplicates : Equivalent method on Series.
        DataFrame.drop_duplicates : Equivalent method on DataFrame.

        Examples
        --------
        Generate a MultiIndex with duplicate values.

        >>> arrays = [[1, 2, 3, 1, 2], ["red", "blue", "black", "red", "blue"]]
        >>> midx = ps.MultiIndex.from_arrays(arrays, names=("number", "color"))
        >>> midx
        MultiIndex([(1,   'red'),
                    (2,  'blue'),
                    (3, 'black'),
                    (1,   'red'),
                    (2,  'blue')],
                   names=['number', 'color'])

        >>> midx.drop_duplicates()
        MultiIndex([(1,   'red'),
                    (2,  'blue'),
                    (3, 'black')],
                   names=['number', 'color'])

        >>> midx.drop_duplicates(keep='first')
        MultiIndex([(1,   'red'),
                    (2,  'blue'),
                    (3, 'black')],
                   names=['number', 'color'])

        >>> midx.drop_duplicates(keep='last')
        MultiIndex([(3, 'black'),
                    (1,   'red'),
                    (2,  'blue')],
                   names=['number', 'color'])

        >>> midx.drop_duplicates(keep=False)
        MultiIndex([(3, 'black')],
                   names=['number', 'color'])
        """
        with ps.option_context("compute.default_index_type", "distributed"):
            # The attached index caused by `reset_index` below is used for sorting only,
            # and it will be dropped soon,
            # so we enforce "distributed" default index type
            psdf = self.to_frame().reset_index(drop=True)
        return ps.MultiIndex.from_frame(psdf.drop_duplicates(keep=keep).sort_index())

    def argmax(self) -> None:
        raise TypeError("reduction operation 'argmax' not allowed for this dtype")

    def argmin(self) -> None:
        raise TypeError("reduction operation 'argmin' not allowed for this dtype")

    def asof(self, label: Any) -> None:
        raise NotImplementedError(
            "only the default get_loc method is currently supported for MultiIndex"
        )

    @property
    def is_all_dates(self) -> bool:
        """
        is_all_dates always returns False for MultiIndex

        Examples
        --------
        >>> from datetime import datetime

        >>> idx = ps.MultiIndex.from_tuples(
        ...     [(datetime(2019, 1, 1, 0, 0, 0), datetime(2019, 1, 1, 0, 0, 0)),
        ...      (datetime(2019, 1, 1, 0, 0, 0), datetime(2019, 1, 1, 0, 0, 0))])
        >>> idx  # doctest: +SKIP
        MultiIndex([('2019-01-01', '2019-01-01'),
                    ('2019-01-01', '2019-01-01')],
                   )

        >>> idx.is_all_dates
        False
        """
        return False

    def __getattr__(self, item: str) -> Any:
        if hasattr(MissingPandasLikeMultiIndex, item):
            property_or_func = getattr(MissingPandasLikeMultiIndex, item)
            if isinstance(property_or_func, property):
                return property_or_func.fget(self)
            else:
                return partial(property_or_func, self)
        raise AttributeError("'MultiIndex' object has no attribute '{}'".format(item))

    def _get_level_number(self, level: Union[int, Name]) -> int:
        """
        Return the level number if a valid level is given.
        """
        count = self.names.count(level)
        if (count > 1) and not isinstance(level, int):
            raise ValueError("The name %s occurs multiple times, use a level number" % level)
        if level in self.names:
            level = self.names.index(level)
        elif isinstance(level, int):
            nlevels = self.nlevels
            if level >= nlevels:
                raise IndexError(
                    "Too many levels: Index has only %d "
                    "levels, %d is not a valid level number" % (nlevels, level)
                )
            if level < 0:
                if (level + nlevels) < 0:
                    raise IndexError(
                        "Too many levels: Index has only %d levels, "
                        "not %d" % (nlevels, level + 1)
                    )
                level = level + nlevels
        else:
            raise KeyError("Level %s not found" % str(level))

        return level

    def get_level_values(self, level: Union[int, Name]) -> Index:
        """
        Return vector of label values for requested level,
        equal to the length of the index.

        Parameters
        ----------
        level : int or str
            ``level`` is either the integer position of the level in the
            MultiIndex, or the name of the level.

        Returns
        -------
        values : Index
            Values is a level of this MultiIndex converted to
            a single :class:`Index` (or subclass thereof).

        Examples
        --------
        Create a MultiIndex:

        >>> mi = ps.MultiIndex.from_tuples([('x', 'a'), ('x', 'b'), ('y', 'a')])
        >>> mi.names = ['level_1', 'level_2']

        Get level values by supplying level as either integer or name:

        >>> mi.get_level_values(0)
        Index(['x', 'x', 'y'], dtype='object', name='level_1')

        >>> mi.get_level_values('level_2')
        Index(['a', 'b', 'a'], dtype='object', name='level_2')
        """
        level = self._get_level_number(level)
        index_scol = self._internal.index_spark_columns[level]
        index_name = self._internal.index_names[level]
        index_field = self._internal.index_fields[level]
        internal = self._internal.copy(
            index_spark_columns=[index_scol],
            index_names=[index_name],
            index_fields=[index_field],
            column_labels=[],
            data_spark_columns=[],
            data_fields=[],
        )
        return DataFrame(internal).index

    def insert(self, loc: int, item: Any) -> Index:
        """
        Make new MultiIndex inserting new item at location.

        Follows Python list.append semantics for negative values.

        .. versionchanged:: 3.4.0
           Raise IndexError when loc is out of bounds to follow Pandas 1.4+ behavior

        Parameters
        ----------
        loc : int
        item : object

        Returns
        -------
        new_index : MultiIndex

        Examples
        --------
        >>> psmidx = ps.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")])
        >>> psmidx.insert(3, ("h", "j"))  # doctest: +SKIP
        MultiIndex([('a', 'x'),
                    ('b', 'y'),
                    ('c', 'z'),
                    ('h', 'j')],
                   )

        For negative values

        >>> psmidx.insert(-2, ("h", "j"))  # doctest: +SKIP
        MultiIndex([('a', 'x'),
                    ('h', 'j'),
                    ('b', 'y'),
                    ('c', 'z')],
                   )
        """
        validate_index_loc(self, loc)
        loc = loc + len(self) if loc < 0 else loc

        index_name: List[Label] = [(name,) for name in self._internal.index_spark_column_names]
        sdf_before = self.to_frame(name=index_name)[:loc]._to_spark()
        sdf_middle = Index([item]).to_frame(name=index_name)._to_spark()
        sdf_after = self.to_frame(name=index_name)[loc:]._to_spark()
        sdf = sdf_before.union(sdf_middle).union(sdf_after)

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[
                scol_for(sdf, col) for col in self._internal.index_spark_column_names
            ],
            index_names=self._internal.index_names,
            index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
        )
        return DataFrame(internal).index
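
    # A simplified view of `insert` above: the index frame is split into the rows
    # before `loc`, a one-row frame built from `item`, and the rows from `loc` on,
    # then the three Spark frames are concatenated positionally with `union`
    # (illustrative names below, not the actual helpers):
    #
    #   before = index_frame[:loc]
    #   middle = one_row_frame(item)
    #   after = index_frame[loc:]
    #   result = before.union(middle).union(after)
    #
    # A negative `loc` is shifted by len(self) first, as the docstring notes.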

    def item(self) -> Tuple[Scalar, ...]:
        """
        Return the first element of the underlying data as a python tuple.

        Returns
        -------
        tuple
            The first element of MultiIndex.

        Raises
        ------
        ValueError
            If the data is not length-1.

        Examples
        --------
        >>> psmidx = ps.MultiIndex.from_tuples([('a', 'x')])
        >>> psmidx.item()
        ('a', 'x')
        """
        return self._psdf.head(2)._to_internal_pandas().index.item()

    def intersection(self, other: Union[DataFrame, Series, Index, List]) -> "MultiIndex":
        """
        Form the intersection of two Index objects.

        This returns a new Index with elements common to the index and `other`.

        Parameters
        ----------
        other : Index or array-like

        Returns
        -------
        intersection : MultiIndex

        Examples
        --------
        >>> midx1 = ps.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")])
        >>> midx2 = ps.MultiIndex.from_tuples([("c", "z"), ("d", "w")])
        >>> midx1.intersection(midx2).sort_values()  # doctest: +SKIP
        MultiIndex([('c', 'z')],
                   )
        """
        if isinstance(other, Series) or not is_list_like(other):
            raise TypeError("other must be a MultiIndex or a list of tuples")
        elif isinstance(other, DataFrame):
            raise ValueError("Index data must be 1-dimensional")
        elif isinstance(other, MultiIndex):
            spark_frame_other = other.to_frame()._to_spark()
            keep_name = self.names == other.names
        elif isinstance(other, Index):
            # Always returns an empty MultiIndex if `other` is Index.
            return cast(MultiIndex, self.to_frame().head(0).index)
        elif not all(isinstance(item, tuple) for item in other):
            raise TypeError("other must be a MultiIndex or a list of tuples")
        else:
            other = MultiIndex.from_tuples(list(other))
            spark_frame_other = cast(MultiIndex, other).to_frame()._to_spark()
            keep_name = True

        index_fields = self._index_fields_for_union_like(other, func_name="intersection")

        default_name: List[Name] = [SPARK_INDEX_NAME_FORMAT(i) for i in range(self.nlevels)]
        spark_frame_self = self.to_frame(name=default_name)._to_spark()
        spark_frame_intersected = spark_frame_self.intersect(spark_frame_other)
        if keep_name:
            index_names = self._internal.index_names
        else:
            index_names = None

        internal = InternalFrame(
            spark_frame=spark_frame_intersected,
            index_spark_columns=[
                scol_for(spark_frame_intersected, cast(str, col)) for col in default_name
            ],
            index_names=index_names,
            index_fields=index_fields,
        )
        return cast(MultiIndex, DataFrame(internal).index)

    def equal_levels(self, other: "MultiIndex") -> bool:
        """
        Return True if the levels of both MultiIndex objects are the same

        .. versionadded:: 3.3.0

        Examples
        --------
        >>> psmidx1 = ps.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")])
        >>> psmidx2 = ps.MultiIndex.from_tuples([("b", "y"), ("a", "x"), ("c", "z")])
        >>> psmidx1.equal_levels(psmidx2)
        True

        >>> psmidx2 = ps.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "j")])
        >>> psmidx1.equal_levels(psmidx2)
        False
        """
        nlevels = self.nlevels
        if nlevels != other.nlevels:
            return False

        self_sdf = self._internal.spark_frame
        other_sdf = other._internal.spark_frame
        subtract_list = []
        for nlevel in range(nlevels):
            self_index_scol = self._internal.index_spark_columns[nlevel]
            other_index_scol = other._internal.index_spark_columns[nlevel]
            self_subtract_other = self_sdf.select(self_index_scol).subtract(
                other_sdf.select(other_index_scol)
            )
            subtract_list.append(self_subtract_other)

        unioned_subtracts = reduce(lambda x, y: x.union(y), subtract_list)
        return len(unioned_subtracts.head(1)) == 0
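
    # `equal_levels` above works level by level: for each level, the distinct
    # values of self's level that are missing from other's level are collected
    # with `subtract`, all per-level leftovers are unioned, and the method returns
    # True exactly when that union is empty (after the cheap nlevels pre-check).
    # For a single level of two plain Spark DataFrames the probe reduces to:
    #
    #   leftover = sdf_self.select("level0").subtract(sdf_other.select("level0"))
    #   is_subset = len(leftover.head(1)) == 0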

    @property
    def hasnans(self) -> bool:
        raise NotImplementedError("hasnans is not defined for MultiIndex")

    @property
    def inferred_type(self) -> str:
        """
        Return a string of the type inferred from the values.
        """
        # Always returns "mixed" for MultiIndex
        return "mixed"

    @property
    def asi8(self) -> None:
        """
        Integer representation of the values.
        """
        # Always returns None for MultiIndex
        return None

    def factorize(
        self, sort: bool = True, na_sentinel: Optional[int] = -1
    ) -> Tuple["MultiIndex", pd.Index]:
        return MissingPandasLikeMultiIndex.factorize(self, sort=sort, na_sentinel=na_sentinel)

    def __iter__(self) -> Iterator:
        return MissingPandasLikeMultiIndex.__iter__(self)

    def map(
        self,
        mapper: Union[dict, Callable[[Any], Any], pd.Series] = None,
        na_action: Optional[str] = None,
    ) -> "Index":
        return MissingPandasLikeMultiIndex.map(self, mapper, na_action)


def _test() -> None:
    import os
    import doctest
    import sys
    import numpy
    from pyspark.sql import SparkSession
    import pyspark.pandas.indexes.multi

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.pandas.indexes.multi.__dict__.copy()
    globs["np"] = numpy
    globs["ps"] = pyspark.pandas
    spark = (
        SparkSession.builder.master("local[4]")
        .appName("pyspark.pandas.indexes.multi tests")
        .getOrCreate()
    )
    (failure_count, test_count) = doctest.testmod(
        pyspark.pandas.indexes.multi,
        globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()