Package pyspark :: Package mllib :: Module regression
[frames] | [no frames]

Source Code for Module pyspark.mllib.regression

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one or more 
  3  # contributor license agreements.  See the NOTICE file distributed with 
  4  # this work for additional information regarding copyright ownership. 
  5  # The ASF licenses this file to You under the Apache License, Version 2.0 
  6  # (the "License"); you may not use this file except in compliance with 
  7  # the License.  You may obtain a copy of the License at 
  8  # 
  9  #    http://www.apache.org/licenses/LICENSE-2.0 
 10  # 
 11  # Unless required by applicable law or agreed to in writing, software 
 12  # distributed under the License is distributed on an "AS IS" BASIS, 
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14  # See the License for the specific language governing permissions and 
 15  # limitations under the License. 
 16  # 
 17   
 18  from numpy import array, ndarray 
 19  from pyspark import SparkContext 
 20  from pyspark.mllib._common import \ 
 21      _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ 
 22      _serialize_double_matrix, _deserialize_double_matrix, \ 
 23      _serialize_double_vector, _deserialize_double_vector, \ 
 24      _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ 
 25      _linear_predictor_typecheck, _have_scipy, _scipy_issparse 
 26  from pyspark.mllib.linalg import SparseVector, Vectors 
class LabeledPoint(object):

    """
    The features and labels of a data point.

    @param label: Label for this data point.
    @param features: Vector of features for this point (NumPy array, list,
                     pyspark.mllib.linalg.SparseVector, or scipy.sparse
                     column matrix)

    @raise TypeError: if C{features} is not one of the supported vector types.
    """

    def __init__(self, label, features):
        self.label = label
        # isinstance (rather than type(...) ==) also accepts subclasses of
        # ndarray / SparseVector; the dense ndarray check is first so the
        # common case short-circuits before the scipy probe.
        if isinstance(features, ndarray) or isinstance(features, SparseVector) \
                or (_have_scipy and _scipy_issparse(features)):
            self.features = features
        elif isinstance(features, list):
            # Plain Python lists are converted to a dense NumPy array.
            self.features = array(features)
        else:
            raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")

    def __str__(self):
        return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")"
class LinearModel(object):

    """A linear model that has a vector of coefficients and an intercept."""

    def __init__(self, weights, intercept):
        # Stored under private names and exposed read-only through the
        # properties below; subclasses read self._coeff / self._intercept.
        self._coeff = weights
        self._intercept = intercept

    @property
    def weights(self):
        """Vector of model coefficients."""
        return self._coeff

    @property
    def intercept(self):
        """Scalar intercept term of the model."""
        return self._intercept
class LinearRegressionModelBase(LinearModel):

    """A linear regression model.

    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
    True
    >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
    True
    """

    def predict(self, x):
        """Predict the value of the dependent variable given a vector x
        containing values for the independent variables.

        @param x: vector of independent-variable values (dense or sparse).
        @return: predicted value, dot(x, weights) + intercept.
        """
        _linear_predictor_typecheck(x, self._coeff)
        return _dot(x, self._coeff) + self._intercept
class LinearRegressionModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """
class LinearRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=1.0, regType=None, intercept=False):
        """
        Train a linear regression model on the given data.

        @param data: The training data.
        @param iterations: The number of iterations (default: 100).
        @param step: The step parameter used in SGD (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
               iteration.
        @param initialWeights: The initial weights (default: None).
        @param regParam: The regularizer parameter (default: 1.0).
        @param regType: The type of regularizer used for training our model.
               Allowed values: "l1" for using L1Updater,
                               "l2" for using SquaredL2Updater,
                               "none" for no regularizer.
               (default: "none")
        @param intercept: Boolean parameter which indicates the use
               or not of the augmented representation for
               training data (i.e. whether bias features
               are activated or not).
        """
        sc = data.context
        # A missing regType means "no regularization" on the Scala side.
        if regType is None:
            regType = "none"

        def train_f(d, i):
            return sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
                d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)

        return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
class LassoModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit with an
    l_1 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """
class LassoWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a Lasso regression model on the given data."""
        sc = data.context

        # Delegate the actual optimization to the JVM-side MLlib API.
        def train_f(d, i):
            return sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
                d._jrdd, iterations, step, regParam, miniBatchFraction, i)

        return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
class RidgeRegressionModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit with an
    l_2 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """
class RidgeRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a ridge regression model on the given data."""
        sc = data.context

        # Delegate the actual optimization to the JVM-side MLlib API.
        def sgd_train(d, i):
            return sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
                d._jrdd, iterations, step, regParam, miniBatchFraction, i)

        return _regression_train_wrapper(sc, sgd_train, RidgeRegressionModel, data, initialWeights)
def _test():
    """Run this module's doctests with a local SparkContext bound to `sc`."""
    import doctest
    import sys
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        # exit() is a site-module convenience that may be absent (python -S);
        # sys.exit is the reliable way to signal failure to the caller.
        sys.exit(-1)


if __name__ == "__main__":
    _test()