Package pyspark :: Package mllib :: Module regression
[frames] | no frames]

Source Code for Module pyspark.mllib.regression

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one or more 
  3  # contributor license agreements.  See the NOTICE file distributed with 
  4  # this work for additional information regarding copyright ownership. 
  5  # The ASF licenses this file to You under the Apache License, Version 2.0 
  6  # (the "License"); you may not use this file except in compliance with 
  7  # the License.  You may obtain a copy of the License at 
  8  # 
  9  #    http://www.apache.org/licenses/LICENSE-2.0 
 10  # 
 11  # Unless required by applicable law or agreed to in writing, software 
 12  # distributed under the License is distributed on an "AS IS" BASIS, 
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14  # See the License for the specific language governing permissions and 
 15  # limitations under the License. 
 16  # 
 17   
 18  from numpy import array, ndarray 
 19  from pyspark import SparkContext 
 20  from pyspark.mllib._common import \ 
 21      _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ 
 22      _serialize_double_matrix, _deserialize_double_matrix, \ 
 23      _serialize_double_vector, _deserialize_double_vector, \ 
 24      _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ 
 25      _linear_predictor_typecheck, _have_scipy, _scipy_issparse 
 26  from pyspark.mllib.linalg import SparseVector 
27 28 29 -class LabeledPoint(object):
30 """ 31 The features and labels of a data point. 32 33 @param label: Label for this data point. 34 @param features: Vector of features for this point (NumPy array, list, 35 pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix) 36 """
37 - def __init__(self, label, features):
38 self.label = label 39 if (type(features) == ndarray or type(features) == SparseVector 40 or (_have_scipy and _scipy_issparse(features))): 41 self.features = features 42 elif type(features) == list: 43 self.features = array(features) 44 else: 45 raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
46
47 48 -class LinearModel(object):
49 """A linear model that has a vector of coefficients and an intercept."""
50 - def __init__(self, weights, intercept):
51 self._coeff = weights 52 self._intercept = intercept
53 54 @property
55 - def weights(self):
56 return self._coeff
57 58 @property
59 - def intercept(self):
60 return self._intercept
61
62 63 -class LinearRegressionModelBase(LinearModel):
64 """A linear regression model. 65 66 >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1) 67 >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6 68 True 69 >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6 70 True 71 """
72 - def predict(self, x):
73 """Predict the value of the dependent variable given a vector x""" 74 """containing values for the independent variables.""" 75 _linear_predictor_typecheck(x, self._coeff) 76 return _dot(x, self._coeff) + self._intercept
77
78 79 -class LinearRegressionModel(LinearRegressionModelBase):
80 """A linear regression model derived from a least-squares fit. 81 82 >>> from pyspark.mllib.regression import LabeledPoint 83 >>> data = [ 84 ... LabeledPoint(0.0, [0.0]), 85 ... LabeledPoint(1.0, [1.0]), 86 ... LabeledPoint(3.0, [2.0]), 87 ... LabeledPoint(2.0, [3.0]) 88 ... ] 89 >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) 90 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 91 True 92 >>> abs(lrm.predict(array([1.0])) - 1) < 0.5 93 True 94 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 95 True 96 >>> data = [ 97 ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})), 98 ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})), 99 ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), 100 ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) 101 ... ] 102 >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) 103 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 104 True 105 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 106 True 107 """
108
109 110 -class LinearRegressionWithSGD(object):
111 @classmethod
112 - def train(cls, data, iterations=100, step=1.0, 113 miniBatchFraction=1.0, initialWeights=None):
114 """Train a linear regression model on the given data.""" 115 sc = data.context 116 train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD( 117 d._jrdd, iterations, step, miniBatchFraction, i) 118 return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
119
120 121 -class LassoModel(LinearRegressionModelBase):
122 """A linear regression model derived from a least-squares fit with an 123 l_1 penalty term. 124 125 >>> from pyspark.mllib.regression import LabeledPoint 126 >>> data = [ 127 ... LabeledPoint(0.0, [0.0]), 128 ... LabeledPoint(1.0, [1.0]), 129 ... LabeledPoint(3.0, [2.0]), 130 ... LabeledPoint(2.0, [3.0]) 131 ... ] 132 >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) 133 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 134 True 135 >>> abs(lrm.predict(array([1.0])) - 1) < 0.5 136 True 137 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 138 True 139 >>> data = [ 140 ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})), 141 ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})), 142 ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), 143 ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) 144 ... ] 145 >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) 146 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 147 True 148 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 149 True 150 """
151
152 153 -class LassoWithSGD(object):
154 @classmethod
155 - def train(cls, data, iterations=100, step=1.0, regParam=1.0, 156 miniBatchFraction=1.0, initialWeights=None):
157 """Train a Lasso regression model on the given data.""" 158 sc = data.context 159 train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD( 160 d._jrdd, iterations, step, regParam, miniBatchFraction, i) 161 return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
162
163 164 -class RidgeRegressionModel(LinearRegressionModelBase):
165 """A linear regression model derived from a least-squares fit with an 166 l_2 penalty term. 167 168 >>> from pyspark.mllib.regression import LabeledPoint 169 >>> data = [ 170 ... LabeledPoint(0.0, [0.0]), 171 ... LabeledPoint(1.0, [1.0]), 172 ... LabeledPoint(3.0, [2.0]), 173 ... LabeledPoint(2.0, [3.0]) 174 ... ] 175 >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) 176 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 177 True 178 >>> abs(lrm.predict(array([1.0])) - 1) < 0.5 179 True 180 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 181 True 182 >>> data = [ 183 ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})), 184 ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})), 185 ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), 186 ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) 187 ... ] 188 >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) 189 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 190 True 191 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 192 True 193 """
194
195 196 -class RidgeRegressionWithSGD(object):
197 @classmethod
198 - def train(cls, data, iterations=100, step=1.0, regParam=1.0, 199 miniBatchFraction=1.0, initialWeights=None):
200 """Train a ridge regression model on the given data.""" 201 sc = data.context 202 train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD( 203 d._jrdd, iterations, step, regParam, miniBatchFraction, i) 204 return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
205
206 207 -def _test():
208 import doctest 209 globs = globals().copy() 210 globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 211 (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) 212 globs['sc'].stop() 213 if failure_count: 214 exit(-1)
215 216 if __name__ == "__main__": 217 _test() 218