Source code for pyspark.ml.regression

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.ml.util import keyword_only
from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import *
from pyspark.mllib.common import inherit_doc


__all__ = ['DecisionTreeRegressor', 'DecisionTreeRegressionModel', 'GBTRegressor',
           'GBTRegressionModel', 'LinearRegression', 'LinearRegressionModel',
           'RandomForestRegressor', 'RandomForestRegressionModel']


@inherit_doc
[docs]class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter, HasRegParam, HasTol): """ Linear regression. The learning objective is to minimize the squared error, with regularization. The specific squared error loss function used is: L = 1/2n ||A weights - y||^2^ This support multiple types of regularization: - none (a.k.a. ordinary least squares) - L2 (ridge regression) - L1 (Lasso) - L2 + L1 (elastic net) >>> from pyspark.mllib.linalg import Vectors >>> df = sqlContext.createDataFrame([ ... (1.0, Vectors.dense(1.0)), ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"]) >>> lr = LinearRegression(maxIter=5, regParam=0.0) >>> model = lr.fit(df) >>> test0 = sqlContext.createDataFrame([(Vectors.dense(-1.0),)], ["features"]) >>> model.transform(test0).head().prediction -1.0 >>> model.weights DenseVector([1.0]) >>> model.intercept 0.0 >>> test1 = sqlContext.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"]) >>> model.transform(test1).head().prediction 1.0 >>> lr.setParams("vector") Traceback (most recent call last): ... TypeError: Method setParams forces keyword arguments. """ # a placeholder to make it appear in the generated doc elasticNetParam = \ Param(Params._dummy(), "elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, " + "the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.") @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6) """ super(LinearRegression, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.regression.LinearRegression", self.uid) #: param for the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty # is an L2 penalty. For alpha = 1, it is an L1 penalty. self.elasticNetParam = \ Param(self, "elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty " + "is an L2 penalty. For alpha = 1, it is an L1 penalty.") self._setDefault(maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6) kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only
[docs] def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6) Sets params for linear regression. """ kwargs = self.setParams._input_kwargs return self._set(**kwargs)
def _create_model(self, java_model): return LinearRegressionModel(java_model)
[docs] def setElasticNetParam(self, value): """ Sets the value of :py:attr:`elasticNetParam`. """ self._paramMap[self.elasticNetParam] = value return self
[docs] def getElasticNetParam(self): """ Gets the value of elasticNetParam or its default value. """ return self.getOrDefault(self.elasticNetParam)
[docs]class LinearRegressionModel(JavaModel): """ Model fitted by LinearRegression. """ @property
[docs] def weights(self): """ Model weights. """ return self._call_java("weights")
@property
[docs] def intercept(self): """ Model intercept. """ return self._call_java("intercept")
class TreeRegressorParams(object): """ Private class to track supported impurity measures. """ supportedImpurities = ["variance"] class RandomForestParams(object): """ Private class to track supported random forest parameters. """ supportedFeatureSubsetStrategies = ["auto", "all", "onethird", "sqrt", "log2"] class GBTParams(object): """ Private class to track supported GBT params. """ supportedLossTypes = ["squared", "absolute"] @inherit_doc
[docs]class DecisionTreeRegressor(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, DecisionTreeParams, HasCheckpointInterval): """ `http://en.wikipedia.org/wiki/Decision_tree_learning Decision tree` learning algorithm for regression. It supports both continuous and categorical features. >>> from pyspark.mllib.linalg import Vectors >>> df = sqlContext.createDataFrame([ ... (1.0, Vectors.dense(1.0)), ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"]) >>> dt = DecisionTreeRegressor(maxDepth=2) >>> model = dt.fit(df) >>> model.depth 1 >>> model.numNodes 3 >>> test0 = sqlContext.createDataFrame([(Vectors.dense(-1.0),)], ["features"]) >>> model.transform(test0).head().prediction 0.0 >>> test1 = sqlContext.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"]) >>> model.transform(test1).head().prediction 1.0 """ # a placeholder to make it appear in the generated doc impurity = Param(Params._dummy(), "impurity", "Criterion used for information gain calculation (case-insensitive). " + "Supported options: " + ", ".join(TreeRegressorParams.supportedImpurities)) @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance"): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance") """ super(DecisionTreeRegressor, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.regression.DecisionTreeRegressor", self.uid) #: param for Criterion used for information gain calculation (case-insensitive). self.impurity = \ Param(self, "impurity", "Criterion used for information gain calculation (case-insensitive). " + "Supported options: " + ", ".join(TreeRegressorParams.supportedImpurities)) self._setDefault(maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance") kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only
[docs] def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance"): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance") Sets params for the DecisionTreeRegressor. """ kwargs = self.setParams._input_kwargs return self._set(**kwargs)
def _create_model(self, java_model): return DecisionTreeRegressionModel(java_model)
[docs] def setImpurity(self, value): """ Sets the value of :py:attr:`impurity`. """ self._paramMap[self.impurity] = value return self
[docs] def getImpurity(self): """ Gets the value of impurity or its default value. """ return self.getOrDefault(self.impurity)
@inherit_doc class DecisionTreeModel(JavaModel): @property def numNodes(self): """Return number of nodes of the decision tree.""" return self._call_java("numNodes") @property def depth(self): """Return depth of the decision tree.""" return self._call_java("depth") def __repr__(self): return self._call_java("toString") @inherit_doc class TreeEnsembleModels(JavaModel): @property def treeWeights(self): """Return the weights for each tree""" return list(self._call_java("javaTreeWeights")) def __repr__(self): return self._call_java("toString") @inherit_doc
[docs]class DecisionTreeRegressionModel(DecisionTreeModel): """ Model fitted by DecisionTreeRegressor. """
@inherit_doc
[docs]class RandomForestRegressor(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasSeed, DecisionTreeParams, HasCheckpointInterval): """ `http://en.wikipedia.org/wiki/Random_forest Random Forest` learning algorithm for regression. It supports both continuous and categorical features. >>> from numpy import allclose >>> from pyspark.mllib.linalg import Vectors >>> df = sqlContext.createDataFrame([ ... (1.0, Vectors.dense(1.0)), ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"]) >>> rf = RandomForestRegressor(numTrees=2, maxDepth=2, seed=42) >>> model = rf.fit(df) >>> allclose(model.treeWeights, [1.0, 1.0]) True >>> test0 = sqlContext.createDataFrame([(Vectors.dense(-1.0),)], ["features"]) >>> model.transform(test0).head().prediction 0.0 >>> test1 = sqlContext.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"]) >>> model.transform(test1).head().prediction 0.5 """ # a placeholder to make it appear in the generated doc impurity = Param(Params._dummy(), "impurity", "Criterion used for information gain calculation (case-insensitive). " + "Supported options: " + ", ".join(TreeRegressorParams.supportedImpurities)) subsamplingRate = Param(Params._dummy(), "subsamplingRate", "Fraction of the training data used for learning each decision tree, " + "in range (0, 1].") numTrees = Param(Params._dummy(), "numTrees", "Number of trees to train (>= 1)") featureSubsetStrategy = \ Param(Params._dummy(), "featureSubsetStrategy", "The number of features to consider for splits at each tree node. Supported " + "options: " + ", ".join(RandomForestParams.supportedFeatureSubsetStrategies)) @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", numTrees=20, featureSubsetStrategy="auto", seed=None): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \ impurity="variance", numTrees=20, \ featureSubsetStrategy="auto", seed=None) """ super(RandomForestRegressor, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.regression.RandomForestRegressor", self.uid) #: param for Criterion used for information gain calculation (case-insensitive). self.impurity = \ Param(self, "impurity", "Criterion used for information gain calculation (case-insensitive). " + "Supported options: " + ", ".join(TreeRegressorParams.supportedImpurities)) #: param for Fraction of the training data used for learning each decision tree, # in range (0, 1] self.subsamplingRate = Param(self, "subsamplingRate", "Fraction of the training data used for learning each " + "decision tree, in range (0, 1].") #: param for Number of trees to train (>= 1) self.numTrees = Param(self, "numTrees", "Number of trees to train (>= 1)") #: param for The number of features to consider for splits at each tree node self.featureSubsetStrategy = \ Param(self, "featureSubsetStrategy", "The number of features to consider for splits at each tree node. Supported " + "options: " + ", ".join(RandomForestParams.supportedFeatureSubsetStrategies)) self._setDefault(maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=None, impurity="variance", numTrees=20, featureSubsetStrategy="auto") kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only
[docs] def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=None, impurity="variance", numTrees=20, featureSubsetStrategy="auto"): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=None, \ impurity="variance", numTrees=20, featureSubsetStrategy="auto") Sets params for linear regression. """ kwargs = self.setParams._input_kwargs return self._set(**kwargs)
def _create_model(self, java_model): return RandomForestRegressionModel(java_model)
[docs] def setImpurity(self, value): """ Sets the value of :py:attr:`impurity`. """ self._paramMap[self.impurity] = value return self
[docs] def getImpurity(self): """ Gets the value of impurity or its default value. """ return self.getOrDefault(self.impurity)
[docs] def setSubsamplingRate(self, value): """ Sets the value of :py:attr:`subsamplingRate`. """ self._paramMap[self.subsamplingRate] = value return self
[docs] def getSubsamplingRate(self): """ Gets the value of subsamplingRate or its default value. """ return self.getOrDefault(self.subsamplingRate)
[docs] def setNumTrees(self, value): """ Sets the value of :py:attr:`numTrees`. """ self._paramMap[self.numTrees] = value return self
[docs] def getNumTrees(self): """ Gets the value of numTrees or its default value. """ return self.getOrDefault(self.numTrees)
[docs] def setFeatureSubsetStrategy(self, value): """ Sets the value of :py:attr:`featureSubsetStrategy`. """ self._paramMap[self.featureSubsetStrategy] = value return self
[docs] def getFeatureSubsetStrategy(self): """ Gets the value of featureSubsetStrategy or its default value. """ return self.getOrDefault(self.featureSubsetStrategy)
[docs]class RandomForestRegressionModel(TreeEnsembleModels): """ Model fitted by RandomForestRegressor. """
@inherit_doc
[docs]class GBTRegressor(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter, DecisionTreeParams, HasCheckpointInterval): """ `http://en.wikipedia.org/wiki/Gradient_boosting Gradient-Boosted Trees (GBTs)` learning algorithm for regression. It supports both continuous and categorical features. >>> from numpy import allclose >>> from pyspark.mllib.linalg import Vectors >>> df = sqlContext.createDataFrame([ ... (1.0, Vectors.dense(1.0)), ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"]) >>> gbt = GBTRegressor(maxIter=5, maxDepth=2) >>> model = gbt.fit(df) >>> allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1]) True >>> test0 = sqlContext.createDataFrame([(Vectors.dense(-1.0),)], ["features"]) >>> model.transform(test0).head().prediction 0.0 >>> test1 = sqlContext.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"]) >>> model.transform(test1).head().prediction 1.0 """ # a placeholder to make it appear in the generated doc lossType = Param(Params._dummy(), "lossType", "Loss function which GBT tries to minimize (case-insensitive). " + "Supported options: " + ", ".join(GBTParams.supportedLossTypes)) subsamplingRate = Param(Params._dummy(), "subsamplingRate", "Fraction of the training data used for learning each decision tree, " + "in range (0, 1].") stepSize = Param(Params._dummy(), "stepSize", "Step size (a.k.a. learning rate) in interval (0, 1] for shrinking the " + "contribution of each estimator") @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \ lossType="squared", maxIter=20, stepSize=0.1) """ super(GBTRegressor, self).__init__() self._java_obj = self._new_java_obj("org.apache.spark.ml.regression.GBTRegressor", self.uid) #: param for Loss function which GBT tries to minimize (case-insensitive). self.lossType = Param(self, "lossType", "Loss function which GBT tries to minimize (case-insensitive). " + "Supported options: " + ", ".join(GBTParams.supportedLossTypes)) #: Fraction of the training data used for learning each decision tree, in range (0, 1]. self.subsamplingRate = Param(self, "subsamplingRate", "Fraction of the training data used for learning each " + "decision tree, in range (0, 1].") #: Step size (a.k.a. learning rate) in interval (0, 1] for shrinking the contribution of # each estimator self.stepSize = Param(self, "stepSize", "Step size (a.k.a. learning rate) in interval (0, 1] for shrinking " + "the contribution of each estimator") self._setDefault(maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1) kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only
[docs] def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \ lossType="squared", maxIter=20, stepSize=0.1) Sets params for Gradient Boosted Tree Regression. """ kwargs = self.setParams._input_kwargs return self._set(**kwargs)
def _create_model(self, java_model): return GBTRegressionModel(java_model)
[docs] def setLossType(self, value): """ Sets the value of :py:attr:`lossType`. """ self._paramMap[self.lossType] = value return self
[docs] def getLossType(self): """ Gets the value of lossType or its default value. """ return self.getOrDefault(self.lossType)
[docs] def setSubsamplingRate(self, value): """ Sets the value of :py:attr:`subsamplingRate`. """ self._paramMap[self.subsamplingRate] = value return self
[docs] def getSubsamplingRate(self): """ Gets the value of subsamplingRate or its default value. """ return self.getOrDefault(self.subsamplingRate)
[docs] def setStepSize(self, value): """ Sets the value of :py:attr:`stepSize`. """ self._paramMap[self.stepSize] = value return self
[docs] def getStepSize(self): """ Gets the value of stepSize or its default value. """ return self.getOrDefault(self.stepSize)
[docs]class GBTRegressionModel(TreeEnsembleModels): """ Model fitted by GBTRegressor. """
if __name__ == "__main__": import doctest from pyspark.context import SparkContext from pyspark.sql import SQLContext globs = globals().copy() # The small batch size here ensures that we see multiple batches, # even in these small test examples: sc = SparkContext("local[2]", "ml.regression tests") sqlContext = SQLContext(sc) globs['sc'] = sc globs['sqlContext'] = sqlContext (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) sc.stop() if failure_count: exit(-1)