Source code for pyspark.ml.tuning

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import itertools
import numpy as np

from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasSeed
from pyspark.sql.functions import rand

__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel', 'TrainValidationSplit',
           'TrainValidationSplitModel']


[docs]class ParamGridBuilder(object): r""" Builder for a param grid used in grid search-based model selection. >>> from pyspark.ml.classification import LogisticRegression >>> lr = LogisticRegression() >>> output = ParamGridBuilder() \ ... .baseOn({lr.labelCol: 'l'}) \ ... .baseOn([lr.predictionCol, 'p']) \ ... .addGrid(lr.regParam, [1.0, 2.0]) \ ... .addGrid(lr.maxIter, [1, 5]) \ ... .build() >>> expected = [ ... {lr.regParam: 1.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, ... {lr.regParam: 2.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, ... {lr.regParam: 1.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}, ... {lr.regParam: 2.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}] >>> len(output) == len(expected) True >>> all([m in expected for m in output]) True .. versionadded:: 1.4.0 """ def __init__(self): self._param_grid = {}
[docs] @since("1.4.0") def addGrid(self, param, values): """ Sets the given parameters in this grid to fixed values. """ self._param_grid[param] = values return self
[docs] @since("1.4.0") def baseOn(self, *args): """ Sets the given parameters in this grid to fixed values. Accepts either a parameter dictionary or a list of (parameter, value) pairs. """ if isinstance(args[0], dict): self.baseOn(*args[0].items()) else: for (param, value) in args: self.addGrid(param, [value]) return self
[docs] @since("1.4.0") def build(self): """ Builds and returns all combinations of parameters specified by the param grid. """ keys = self._param_grid.keys() grid_values = self._param_grid.values() return [dict(zip(keys, prod)) for prod in itertools.product(*grid_values)]
class ValidatorParams(HasSeed): """ Common params for TrainValidationSplit and CrossValidator. """ estimator = Param(Params._dummy(), "estimator", "estimator to be cross-validated") estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps") evaluator = Param( Params._dummy(), "evaluator", "evaluator used to select hyper-parameters that maximize the validator metric") def setEstimator(self, value): """ Sets the value of :py:attr:`estimator`. """ return self._set(estimator=value) def getEstimator(self): """ Gets the value of estimator or its default value. """ return self.getOrDefault(self.estimator) def setEstimatorParamMaps(self, value): """ Sets the value of :py:attr:`estimatorParamMaps`. """ return self._set(estimatorParamMaps=value) def getEstimatorParamMaps(self): """ Gets the value of estimatorParamMaps or its default value. """ return self.getOrDefault(self.estimatorParamMaps) def setEvaluator(self, value): """ Sets the value of :py:attr:`evaluator`. """ return self._set(evaluator=value) def getEvaluator(self): """ Gets the value of evaluator or its default value. """ return self.getOrDefault(self.evaluator)
[docs]class CrossValidator(Estimator, ValidatorParams): """ K-fold cross validation performs model selection by splitting the dataset into a set of non-overlapping randomly partitioned folds which are used as separate training and test datasets e.g., with k=3 folds, K-fold cross validation will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. Each fold is used as the test set exactly once. >>> from pyspark.ml.classification import LogisticRegression >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator >>> from pyspark.ml.linalg import Vectors >>> dataset = spark.createDataFrame( ... [(Vectors.dense([0.0]), 0.0), ... (Vectors.dense([0.4]), 1.0), ... (Vectors.dense([0.5]), 0.0), ... (Vectors.dense([0.6]), 1.0), ... (Vectors.dense([1.0]), 1.0)] * 10, ... ["features", "label"]) >>> lr = LogisticRegression() >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() >>> evaluator = BinaryClassificationEvaluator() >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) >>> cvModel = cv.fit(dataset) >>> cvModel.avgMetrics[0] 0.5 >>> evaluator.evaluate(cvModel.transform(dataset)) 0.8333... .. versionadded:: 1.4.0 """ numFolds = Param(Params._dummy(), "numFolds", "number of folds for cross validation", typeConverter=TypeConverters.toInt) @keyword_only def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, seed=None): """ __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ seed=None) """ super(CrossValidator, self).__init__() self._setDefault(numFolds=3) kwargs = self._input_kwargs self._set(**kwargs)
[docs] @keyword_only @since("1.4.0") def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, seed=None): """ setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ seed=None): Sets params for cross validator. """ kwargs = self._input_kwargs return self._set(**kwargs)
[docs] @since("1.4.0") def setNumFolds(self, value): """ Sets the value of :py:attr:`numFolds`. """ return self._set(numFolds=value)
[docs] @since("1.4.0") def getNumFolds(self): """ Gets the value of numFolds or its default value. """ return self.getOrDefault(self.numFolds)
def _fit(self, dataset): est = self.getOrDefault(self.estimator) epm = self.getOrDefault(self.estimatorParamMaps) numModels = len(epm) eva = self.getOrDefault(self.evaluator) nFolds = self.getOrDefault(self.numFolds) seed = self.getOrDefault(self.seed) h = 1.0 / nFolds randCol = self.uid + "_rand" df = dataset.select("*", rand(seed).alias(randCol)) metrics = [0.0] * numModels for i in range(nFolds): validateLB = i * h validateUB = (i + 1) * h condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB) validation = df.filter(condition) train = df.filter(~condition) models = est.fit(train, epm) for j in range(numModels): model = models[j] # TODO: duplicate evaluator to take extra params from input metric = eva.evaluate(model.transform(validation, epm[j])) metrics[j] += metric/nFolds if eva.isLargerBetter(): bestIndex = np.argmax(metrics) else: bestIndex = np.argmin(metrics) bestModel = est.fit(dataset, epm[bestIndex]) return self._copyValues(CrossValidatorModel(bestModel, metrics))
[docs] @since("1.4.0") def copy(self, extra=None): """ Creates a copy of this instance with a randomly generated uid and some extra params. This copies creates a deep copy of the embedded paramMap, and copies the embedded and extra parameters over. :param extra: Extra parameters to copy to the new instance :return: Copy of this instance """ if extra is None: extra = dict() newCV = Params.copy(self, extra) if self.isSet(self.estimator): newCV.setEstimator(self.getEstimator().copy(extra)) # estimatorParamMaps remain the same if self.isSet(self.evaluator): newCV.setEvaluator(self.getEvaluator().copy(extra)) return newCV
[docs]class CrossValidatorModel(Model, ValidatorParams): """ CrossValidatorModel contains the model with the highest average cross-validation metric across folds and uses this model to transform input data. CrossValidatorModel also tracks the metrics for each param map evaluated. .. versionadded:: 1.4.0 """ def __init__(self, bestModel, avgMetrics=[]): super(CrossValidatorModel, self).__init__() #: best model from cross validation self.bestModel = bestModel #: Average cross-validation metrics for each paramMap in #: CrossValidator.estimatorParamMaps, in the corresponding order. self.avgMetrics = avgMetrics def _transform(self, dataset): return self.bestModel.transform(dataset)
[docs] @since("1.4.0") def copy(self, extra=None): """ Creates a copy of this instance with a randomly generated uid and some extra params. This copies the underlying bestModel, creates a deep copy of the embedded paramMap, and copies the embedded and extra parameters over. :param extra: Extra parameters to copy to the new instance :return: Copy of this instance """ if extra is None: extra = dict() bestModel = self.bestModel.copy(extra) avgMetrics = self.avgMetrics return CrossValidatorModel(bestModel, avgMetrics)
[docs]class TrainValidationSplit(Estimator, ValidatorParams): """ .. note:: Experimental Validation for hyper-parameter tuning. Randomly splits the input dataset into train and validation sets, and uses evaluation metric on the validation set to select the best model. Similar to :class:`CrossValidator`, but only splits the set once. >>> from pyspark.ml.classification import LogisticRegression >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator >>> from pyspark.ml.linalg import Vectors >>> dataset = spark.createDataFrame( ... [(Vectors.dense([0.0]), 0.0), ... (Vectors.dense([0.4]), 1.0), ... (Vectors.dense([0.5]), 0.0), ... (Vectors.dense([0.6]), 1.0), ... (Vectors.dense([1.0]), 1.0)] * 10, ... ["features", "label"]) >>> lr = LogisticRegression() >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() >>> evaluator = BinaryClassificationEvaluator() >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) >>> tvsModel = tvs.fit(dataset) >>> evaluator.evaluate(tvsModel.transform(dataset)) 0.8333... .. versionadded:: 2.0.0 """ trainRatio = Param(Params._dummy(), "trainRatio", "Param for ratio between train and\ validation data. Must be between 0 and 1.", typeConverter=TypeConverters.toFloat) @keyword_only def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, seed=None): """ __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ seed=None) """ super(TrainValidationSplit, self).__init__() self._setDefault(trainRatio=0.75) kwargs = self._input_kwargs self._set(**kwargs)
[docs] @since("2.0.0") @keyword_only def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, seed=None): """ setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ seed=None): Sets params for the train validation split. """ kwargs = self._input_kwargs return self._set(**kwargs)
[docs] @since("2.0.0") def setTrainRatio(self, value): """ Sets the value of :py:attr:`trainRatio`. """ return self._set(trainRatio=value)
[docs] @since("2.0.0") def getTrainRatio(self): """ Gets the value of trainRatio or its default value. """ return self.getOrDefault(self.trainRatio)
def _fit(self, dataset): est = self.getOrDefault(self.estimator) epm = self.getOrDefault(self.estimatorParamMaps) numModels = len(epm) eva = self.getOrDefault(self.evaluator) tRatio = self.getOrDefault(self.trainRatio) seed = self.getOrDefault(self.seed) randCol = self.uid + "_rand" df = dataset.select("*", rand(seed).alias(randCol)) metrics = [0.0] * numModels condition = (df[randCol] >= tRatio) validation = df.filter(condition) train = df.filter(~condition) models = est.fit(train, epm) for j in range(numModels): model = models[j] metric = eva.evaluate(model.transform(validation, epm[j])) metrics[j] += metric if eva.isLargerBetter(): bestIndex = np.argmax(metrics) else: bestIndex = np.argmin(metrics) bestModel = est.fit(dataset, epm[bestIndex]) return self._copyValues(TrainValidationSplitModel(bestModel, metrics))
[docs] @since("2.0.0") def copy(self, extra=None): """ Creates a copy of this instance with a randomly generated uid and some extra params. This copies creates a deep copy of the embedded paramMap, and copies the embedded and extra parameters over. :param extra: Extra parameters to copy to the new instance :return: Copy of this instance """ if extra is None: extra = dict() newTVS = Params.copy(self, extra) if self.isSet(self.estimator): newTVS.setEstimator(self.getEstimator().copy(extra)) # estimatorParamMaps remain the same if self.isSet(self.evaluator): newTVS.setEvaluator(self.getEvaluator().copy(extra)) return newTVS
[docs]class TrainValidationSplitModel(Model, ValidatorParams): """ .. note:: Experimental Model from train validation split. .. versionadded:: 2.0.0 """ def __init__(self, bestModel, validationMetrics=[]): super(TrainValidationSplitModel, self).__init__() #: best model from cross validation self.bestModel = bestModel #: evaluated validation metrics self.validationMetrics = validationMetrics def _transform(self, dataset): return self.bestModel.transform(dataset)
[docs] @since("2.0.0") def copy(self, extra=None): """ Creates a copy of this instance with a randomly generated uid and some extra params. This copies the underlying bestModel, creates a deep copy of the embedded paramMap, and copies the embedded and extra parameters over. And, this creates a shallow copy of the validationMetrics. :param extra: Extra parameters to copy to the new instance :return: Copy of this instance """ if extra is None: extra = dict() bestModel = self.bestModel.copy(extra) validationMetrics = list(self.validationMetrics) return TrainValidationSplitModel(bestModel, validationMetrics)
if __name__ == "__main__": import doctest from pyspark.sql import SparkSession globs = globals().copy() # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ .master("local[2]")\ .appName("ml.tuning tests")\ .getOrCreate() sc = spark.sparkContext globs['sc'] = sc globs['spark'] = spark (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)