pyspark.mllib package

Submodules

pyspark.mllib.classification module

class pyspark.mllib.classification.LogisticRegressionModel(weights, intercept)[source]

Bases: pyspark.mllib.classification.LinearBinaryClassificationModel

A linear binary classification model derived from logistic regression.

>>> data = [
...     LabeledPoint(0.0, [0.0, 1.0]),
...     LabeledPoint(1.0, [1.0, 0.0]),
... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
>>> lrm.predict([1.0, 0.0])
1
>>> lrm.predict([0.0, 1.0])
0
>>> lrm.predict(sc.parallelize([[1.0, 0.0], [0.0, 1.0]])).collect()
[1, 0]
>>> lrm.clearThreshold()
>>> lrm.predict([0.0, 1.0])
0.123...
>>> sparse_data = [
...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
>>> lrm.predict(array([0.0, 1.0]))
1
>>> lrm.predict(array([1.0, 0.0]))
0
>>> lrm.predict(SparseVector(2, {1: 1.0}))
1
>>> lrm.predict(SparseVector(2, {0: 1.0}))
0
predict(x)[source]

Predict values for a single data point or an RDD of points using the trained model.
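
As a rough illustration of how a thresholded linear classifier turns a feature vector into a label, the sketch below uses plain Python rather than the MLlib API. The helper name, the weights, the intercept, and the 0.5 default threshold are all assumptions made for the example. Passing `threshold=None` mimics the effect of clearThreshold() in the example above by returning the raw score.

```python
import math

def logistic_predict(weights, intercept, x, threshold=0.5):
    """Hypothetical helper: score one dense point with a logistic model."""
    # Linear margin: w . x + b
    margin = sum(w * xi for w, xi in zip(weights, x)) + intercept
    # The logistic (sigmoid) function maps the margin to a value in (0, 1)
    prob = 1.0 / (1.0 + math.exp(-margin))
    if threshold is None:
        return prob  # raw probability, as after clearing the threshold
    return 1 if prob > threshold else 0

weights, intercept = [2.0, -2.0], 0.0  # made-up parameters, not trained values
print(logistic_predict(weights, intercept, [1.0, 0.0]))        # 1
print(logistic_predict(weights, intercept, [0.0, 1.0]))        # 0
print(logistic_predict(weights, intercept, [0.0, 1.0], None))  # a probability below 0.5
```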

class pyspark.mllib.classification.LogisticRegressionWithSGD[source]

Bases: object

classmethod train(data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=0.01, regType='l2', intercept=False)[source]

Train a logistic regression model on the given data.

Parameters:
  • data – The training data, an RDD of LabeledPoint.
  • iterations – The number of iterations (default: 100).
  • step – The step parameter used in SGD (default: 1.0).
  • miniBatchFraction – Fraction of data to be used for each SGD iteration (default: 1.0).
  • initialWeights – The initial weights (default: None).
  • regParam – The regularizer parameter (default: 0.01).
  • regType

    The type of regularizer used for training our model.

    Allowed values:
    • “l1” for using L1 regularization
    • “l2” for using L2 regularization
    • None for no regularization

    (default: “l2”)

  • intercept – Boolean parameter indicating whether to use the augmented representation for training data (i.e. whether bias features are activated) (default: False).
class pyspark.mllib.classification.LogisticRegressionWithLBFGS[source]

Bases: object

classmethod train(data, iterations=100, initialWeights=None, regParam=0.01, regType='l2', intercept=False, corrections=10, tolerance=0.0001)[source]

Train a logistic regression model on the given data.

Parameters:
  • data – The training data, an RDD of LabeledPoint.
  • iterations – The number of iterations (default: 100).
  • initialWeights – The initial weights (default: None).
  • regParam – The regularizer parameter (default: 0.01).
  • regType

    The type of regularizer used for training our model.

    Allowed values:
    • “l1” for using L1 regularization
    • “l2” for using L2 regularization
    • None for no regularization

    (default: “l2”)

  • intercept – Boolean parameter indicating whether to use the augmented representation for training data (i.e. whether bias features are activated) (default: False).
  • corrections – The number of corrections used in the LBFGS update (default: 10).
  • tolerance – The convergence tolerance of iterations for L-BFGS (default: 1e-4).
>>> data = [
...     LabeledPoint(0.0, [0.0, 1.0]),
...     LabeledPoint(1.0, [1.0, 0.0]),
... ]
>>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data))
>>> lrm.predict([1.0, 0.0])
1
>>> lrm.predict([0.0, 1.0])
0
class pyspark.mllib.classification.SVMModel(weights, intercept)[source]

Bases: pyspark.mllib.classification.LinearBinaryClassificationModel

A support vector machine.

>>> data = [
...     LabeledPoint(0.0, [0.0]),
...     LabeledPoint(1.0, [1.0]),
...     LabeledPoint(1.0, [2.0]),
...     LabeledPoint(1.0, [3.0])
... ]
>>> svm = SVMWithSGD.train(sc.parallelize(data))
>>> svm.predict([1.0])
1
>>> svm.predict(sc.parallelize([[1.0]])).collect()
[1]
>>> svm.clearThreshold()
>>> svm.predict(array([1.0]))
1.25...
>>> sparse_data = [
...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
>>> svm.predict(SparseVector(2, {1: 1.0}))
1
>>> svm.predict(SparseVector(2, {0: -1.0}))
0
predict(x)[source]

Predict values for a single data point or an RDD of points using the trained model.

class pyspark.mllib.classification.SVMWithSGD[source]

Bases: object

classmethod train(data, iterations=100, step=1.0, regParam=0.01, miniBatchFraction=1.0, initialWeights=None, regType='l2', intercept=False)[source]

Train a support vector machine on the given data.

Parameters:
  • data – The training data, an RDD of LabeledPoint.
  • iterations – The number of iterations (default: 100).
  • step – The step parameter used in SGD (default: 1.0).
  • regParam – The regularizer parameter (default: 0.01).
  • miniBatchFraction – Fraction of data to be used for each SGD iteration (default: 1.0).
  • initialWeights – The initial weights (default: None).
  • regType

    The type of regularizer used for training our model.

    Allowed values:
    • “l1” for using L1 regularization
    • “l2” for using L2 regularization
    • None for no regularization

    (default: “l2”)

  • intercept – Boolean parameter indicating whether to use the augmented representation for training data (i.e. whether bias features are activated) (default: False).
class pyspark.mllib.classification.NaiveBayesModel(labels, pi, theta)[source]

Bases: object

Model for Naive Bayes classifiers.

Contains two parameters:

  • pi – vector of logs of class priors (dimension C)
  • theta – matrix of logs of class conditional probabilities (C x D)

>>> data = [
...     LabeledPoint(0.0, [0.0, 0.0]),
...     LabeledPoint(0.0, [0.0, 1.0]),
...     LabeledPoint(1.0, [1.0, 0.0]),
... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
>>> model.predict(sc.parallelize([[1.0, 0.0]])).collect()
[1.0]
>>> sparse_data = [
...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
...     LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
... ]
>>> model = NaiveBayes.train(sc.parallelize(sparse_data))
>>> model.predict(SparseVector(2, {1: 1.0}))
0.0
>>> model.predict(SparseVector(2, {0: 1.0}))
1.0
predict(x)[source]

Return the most likely class for a data vector or an RDD of vectors.
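
Given the log priors pi and the log conditional probabilities theta described above, picking the most likely class amounts to maximizing pi[c] + theta[c] . x over the classes c. The following is a minimal pure-Python sketch of that rule. The helper name and the parameter values are made up for illustration and are not the output of NaiveBayes.train.

```python
import math

def nb_predict(labels, pi, theta, x):
    """Hypothetical helper: return the label maximizing pi[c] + theta[c] . x."""
    scores = [
        pi_c + sum(t * xi for t, xi in zip(theta_c, x))
        for pi_c, theta_c in zip(pi, theta)
    ]
    best = max(range(len(scores)), key=lambda c: scores[c])
    return labels[best]

labels = [0.0, 1.0]
pi = [math.log(2.0 / 3.0), math.log(1.0 / 3.0)]   # illustrative log priors
theta = [
    [math.log(0.25), math.log(0.75)],             # log P(feature | class 0)
    [math.log(0.75), math.log(0.25)],             # log P(feature | class 1)
]
print(nb_predict(labels, pi, theta, [0.0, 1.0]))  # 0.0
print(nb_predict(labels, pi, theta, [1.0, 0.0]))  # 1.0
```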

class pyspark.mllib.classification.NaiveBayes[source]

Bases: object

classmethod train(data, lambda_=1.0)[source]

Train a Naive Bayes model given an RDD of (label, features) vectors.

This is the Multinomial NB (http://tinyurl.com/lsdw6p) which can handle all kinds of discrete data. For example, by converting documents into TF-IDF vectors, it can be used for document classification. By making every vector a 0-1 vector, it can also be used as Bernoulli NB (http://tinyurl.com/p7c96j6).

Parameters:
  • data – RDD of LabeledPoint.
  • lambda_ – The smoothing parameter (default: 1.0).

pyspark.mllib.clustering module

class pyspark.mllib.clustering.KMeansModel(centers)[source]

Bases: object

A clustering model derived from the k-means method.

>>> from numpy import array
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
>>> model = KMeans.train(
...     sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
>>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
True
>>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
True
>>> model = KMeans.train(sc.parallelize(data), 2)
>>> sparse_data = [
...     SparseVector(3, {1: 1.0}),
...     SparseVector(3, {1: 1.1}),
...     SparseVector(3, {2: 1.0}),
...     SparseVector(3, {2: 1.1})
... ]
>>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
>>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
True
>>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
True
>>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
True
>>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
True
>>> type(model.clusterCenters)
<type 'list'>
clusterCenters[source]

Get the cluster centers, represented as a list of NumPy arrays.

predict(x)[source]

Find the cluster to which x belongs in this model.
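
Conceptually, assigning a point to a cluster means finding the center closest to it. The sketch below shows this with squared Euclidean distance in plain Python. The helper name and the two centers are assumptions for illustration, not values produced by KMeans.train.

```python
def nearest_center(centers, x):
    """Hypothetical helper: index of the center closest to x."""
    best_index, best_dist = 0, float("inf")
    for i, center in enumerate(centers):
        # Squared Euclidean distance between x and this center
        dist = sum((c - xi) ** 2 for c, xi in zip(center, x))
        if dist < best_dist:
            best_index, best_dist = i, dist
    return best_index

centers = [[0.5, 0.5], [8.5, 8.5]]  # made-up cluster centers
print(nearest_center(centers, [0.0, 0.0]) == nearest_center(centers, [1.0, 1.0]))  # True
print(nearest_center(centers, [9.0, 8.0]))  # 1
```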

class pyspark.mllib.clustering.KMeans[source]

Bases: object

classmethod train(rdd, k, maxIterations=100, runs=1, initializationMode='k-means||')[source]

Train a k-means clustering model.

pyspark.mllib.feature module

Python package for feature extraction and transformation in MLlib.

class pyspark.mllib.feature.Normalizer(p=2.0)[source]

Bases: pyspark.mllib.feature.VectorTransformer

:: Experimental

Normalizes samples individually to unit Lp norm.

For any 1 <= p < float('inf'), normalizes samples using sum(abs(vector)^p)^(1/p) as norm.

For p = float('inf'), max(abs(vector)) will be used as norm for normalization.

>>> v = Vectors.dense(range(3))
>>> nor = Normalizer(1)
>>> nor.transform(v)
DenseVector([0.0, 0.3333, 0.6667])
>>> rdd = sc.parallelize([v])
>>> nor.transform(rdd).collect()
[DenseVector([0.0, 0.3333, 0.6667])]
>>> nor2 = Normalizer(float("inf"))
>>> nor2.transform(v)
DenseVector([0.0, 0.5, 1.0])
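
The two norm definitions above can be reproduced in a few lines of plain Python. This is only a sketch of the arithmetic, with a hypothetical helper name. It does not use the MLlib classes, and it returns a zero-norm input unchanged.

```python
def lp_normalize(vector, p=2.0):
    """Hypothetical helper: scale a list of numbers to unit Lp norm."""
    if p == float("inf"):
        norm = max(abs(v) for v in vector)
    else:
        norm = sum(abs(v) ** p for v in vector) ** (1.0 / p)
    if norm == 0:
        return list(vector)  # zero-norm input is returned as-is
    return [float(v) / norm for v in vector]

v = [0, 1, 2]
print(lp_normalize(v, 1))             # approximately [0.0, 0.3333, 0.6667]
print(lp_normalize(v, float("inf")))  # [0.0, 0.5, 1.0]
```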
transform(vector)[source]

Applies unit length normalization on a vector.

Parameters: vector – Vector or RDD of vectors to be normalized.
Returns: Normalized vector. If the norm of the input is zero, the input vector is returned.
class pyspark.mllib.feature.StandardScalerModel(java_model)[source]

Bases: pyspark.mllib.feature.JavaVectorTransformer

:: Experimental

Represents a StandardScaler model that can transform vectors.

transform(vector)[source]

Applies standardization transformation on a vector.

Parameters: vector – Vector or RDD of Vector to be standardized.
Returns: Standardized vector. If the variance of a column is zero, it will return default 0.0 for the column with zero variance.
class pyspark.mllib.feature.StandardScaler(withMean=False, withStd=True)[source]

Bases: object

:: Experimental

Standardizes features by removing the mean and scaling to unit variance using column summary statistics on the samples in the training set.

>>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
>>> dataset = sc.parallelize(vs)
>>> standardizer = StandardScaler(True, True)
>>> model = standardizer.fit(dataset)
>>> result = model.transform(dataset)
>>> for r in result.collect(): r
DenseVector([-0.7071, 0.7071, -0.7071])
DenseVector([0.7071, -0.7071, 0.7071])
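
To make the arithmetic behind the example above concrete, here is a small pure-Python sketch of column-wise standardization. It assumes the sample (n - 1) variance, which is what reproduces the values shown. The function names are hypothetical and not part of MLlib.

```python
import math

def fit_standard_scaler(rows):
    """Hypothetical helper: per-column mean and sample standard deviation."""
    n = len(rows)
    dims = len(rows[0])
    means = [sum(r[j] for r in rows) / float(n) for j in range(dims)]
    # Assumption: unbiased variance, dividing by n - 1 (requires n >= 2)
    stds = [
        math.sqrt(sum((r[j] - means[j]) ** 2 for r in rows) / float(n - 1))
        for j in range(dims)
    ]
    return means, stds

def standardize(row, means, stds, with_mean=True, with_std=True):
    out = []
    for x, m, s in zip(row, means, stds):
        if with_std and s == 0.0:
            out.append(0.0)  # zero-variance column maps to 0.0
            continue
        value = x - m if with_mean else x
        out.append(value / s if with_std else value)
    return out

rows = [[-2.0, 2.3, 0.0], [3.8, 0.0, 1.9]]
means, stds = fit_standard_scaler(rows)
for r in rows:
    print([round(v, 4) for v in standardize(r, means, stds)])
```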
fit(dataset)[source]

Computes the mean and variance and stores as a model to be used for later scaling.

Parameters: dataset – The data used to compute the mean and variance to build the transformation model.
Returns: a StandardScalerModel
class pyspark.mllib.feature.HashingTF(numFeatures=1048576)[source]

Bases: object

:: Experimental

Maps a sequence of terms to their term frequencies using the hashing trick.

Note: the terms must be hashable (they cannot be dict/set/list...).

>>> htf = HashingTF(100)
>>> doc = "a a b b c d".split(" ")
>>> htf.transform(doc)
SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
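
The hashing trick itself is simple: each term is mapped to a bucket by hashing it and taking the remainder modulo the number of features, and the counts per bucket become the term frequencies. The sketch below uses Python's built-in hash() as an assumption. The exact indices will differ from the example above, and in Python 3 they may vary between runs because string hashing is randomized.

```python
def hashing_tf(terms, num_features=1 << 20):
    """Hypothetical helper: map terms to {bucket index: term frequency}."""
    freq = {}
    for term in terms:
        # Python's % always yields a non-negative index for a positive modulus
        index = hash(term) % num_features
        freq[index] = freq.get(index, 0.0) + 1.0
    return freq

doc = "a a b b c d".split(" ")
tf = hashing_tf(doc, 100)
print(sum(tf.values()))  # 6.0, one count per term occurrence
```

Distinct terms can collide in the same bucket, so a larger number of features trades memory for fewer collisions.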
indexOf(term)[source]

Returns the index of the input term.

transform(document)[source]

Transforms the input document (list of terms) to a term frequency vector, or transforms an RDD of documents to an RDD of term frequency vectors.

class pyspark.mllib.feature.IDFModel(java_model)[source]

Bases: pyspark.mllib.feature.JavaVectorTransformer

Represents an IDF model that can transform term frequency vectors.

transform(dataset)[source]

Transforms term frequency (TF) vectors to TF-IDF vectors.

If minDocFreq was set for the IDF calculation, the terms which occur in fewer than minDocFreq documents will have an entry of 0.

Parameters: dataset – an RDD of term frequency vectors
Returns: an RDD of TF-IDF vectors
class pyspark.mllib.feature.IDF(minDocFreq=0)[source]

Bases: object

:: Experimental

Inverse document frequency (IDF).

The standard formulation is used: idf = log((m + 1) / (d(t) + 1)), where m is the total number of documents and d(t) is the number of documents that contain term t.

This implementation supports filtering out terms which do not appear in a minimum number of documents (controlled by the variable minDocFreq). For terms that are not in at least minDocFreq documents, the IDF is found as 0, resulting in TF-IDFs of 0.

>>> n = 4
>>> freqs = [Vectors.sparse(n, (1, 3), (1.0, 2.0)),
...          Vectors.dense([0.0, 1.0, 2.0, 3.0]),
...          Vectors.sparse(n, [1], [1.0])]
>>> data = sc.parallelize(freqs)
>>> idf = IDF()
>>> model = idf.fit(data)
>>> tfidf = model.transform(data)
>>> for r in tfidf.collect(): r
SparseVector(4, {1: 0.0, 3: 0.5754})
DenseVector([0.0, 0.0, 1.3863, 0.863])
SparseVector(4, {1: 0.0})
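
The values in the example above follow directly from the formula idf = log((m + 1) / (d(t) + 1)). The sketch below recomputes them in plain Python on dense lists. The function names are hypothetical, and counting a document as containing a term when its frequency is greater than zero is an assumption of this sketch.

```python
import math

def fit_idf(docs, min_doc_freq=0):
    """Hypothetical helper: IDF weight per term from dense TF rows."""
    m = len(docs)
    n = len(docs[0])
    # d(t): number of documents in which term t has a positive frequency
    doc_freq = [sum(1 for d in docs if d[j] > 0) for j in range(n)]
    return [
        math.log((m + 1.0) / (df + 1.0)) if df >= min_doc_freq else 0.0
        for df in doc_freq
    ]

def tf_idf(doc, idf):
    return [tf * w for tf, w in zip(doc, idf)]

docs = [
    [0.0, 1.0, 0.0, 2.0],
    [0.0, 1.0, 2.0, 3.0],
    [0.0, 1.0, 0.0, 0.0],
]
idf = fit_idf(docs)
for d in docs:
    print([round(v, 4) for v in tf_idf(d, idf)])
```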
fit(dataset)[source]

Computes the inverse document frequency.

Parameters: dataset – an RDD of term frequency vectors
class pyspark.mllib.feature.Word2Vec[source]

Bases: object

Word2Vec creates vector representation of words in a text corpus. The algorithm first constructs a vocabulary from the corpus and then learns vector representation of words in the vocabulary. The vector representation can be used as features in natural language processing and machine learning algorithms.

We use the skip-gram model in our implementation and the hierarchical softmax method to train the model. The variable names in the implementation match the original C implementation.

For the original C implementation, see https://code.google.com/p/word2vec/. For research papers, see Efficient Estimation of Word Representations in Vector Space and Distributed Representations of Words and Phrases and their Compositionality.

>>> sentence = "a b " * 100 + "a c " * 10
>>> localDoc = [sentence, sentence]
>>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
>>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
>>> syms = model.findSynonyms("a", 2)
>>> [s[0] for s in syms]
[u'b', u'c']
>>> vec = model.transform("a")
>>> syms = model.findSynonyms(vec, 2)
>>> [s[0] for s in syms]
[u'b', u'c']
fit(data)[source]

Computes the vector representation of each word in vocabulary.

Parameters: data – Training data, an RDD of lists of strings.
Returns: Word2VecModel instance
setLearningRate(learningRate)[source]

Sets initial learning rate (default: 0.025).

setNumIterations(numIterations)[source]

Sets number of iterations (default: 1), which should be smaller than or equal to the number of partitions.

setNumPartitions(numPartitions)[source]

Sets number of partitions (default: 1). Use a small number for accuracy.

setSeed(seed)[source]

Sets random seed.

setVectorSize(vectorSize)[source]

Sets vector size (default: 100).

class pyspark.mllib.feature.Word2VecModel(java_model)[source]

Bases: pyspark.mllib.feature.JavaVectorTransformer

Class for the Word2Vec model.

findSynonyms(word, num)[source]

Find synonyms of a word.

Parameters:
  • word – a word or a vector representation of word
  • num – number of synonyms to find
Returns:

array of (word, cosineSimilarity)

Note: local use only
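
Since the result pairs each word with a cosine similarity, the ranking can be pictured as follows. This is a pure-Python sketch over a hypothetical in-memory dictionary of word vectors. It handles only a word query and does not reflect how the model stores or searches its vectors internally.

```python
import math

def cosine_similarity(u, v):
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

def find_synonyms(vectors, word, num):
    """Hypothetical helper: top `num` words by cosine similarity to `word`."""
    target = vectors[word]
    scored = [
        (w, cosine_similarity(target, vec))
        for w, vec in vectors.items()
        if w != word  # the query word itself is not a synonym
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:num]

vectors = {"a": [1.0, 0.0], "b": [0.9, 0.1], "c": [0.0, 1.0]}  # made-up vectors
print([w for w, _ in find_synonyms(vectors, "a", 2)])  # ['b', 'c']
```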

transform(word)[source]

Transforms a word to its vector representation

Note: local use only

Parameters: word – a word
Returns: vector representation of word(s)

pyspark.mllib.linalg module

MLlib utilities for linear algebra. For dense vectors, MLlib uses the NumPy array type, so you can simply pass NumPy arrays around. For sparse vectors, users can construct a SparseVector object from MLlib or pass SciPy scipy.sparse column vectors if SciPy is available in their environment.

class pyspark.mllib.linalg.Vector[source]

Bases: object

toArray()[source]

Convert the vector into a numpy.ndarray.

Returns: numpy.ndarray

class pyspark.mllib.linalg.DenseVector(ar)[source]

Bases: pyspark.mllib.linalg.Vector

A dense vector represented by a value array.

dot(other)[source]

Compute the dot product of two vectors. The other operand may be a NumPy array, list, SparseVector, or SciPy sparse vector; a target NumPy array may be either 1- or 2-dimensional. Equivalent to calling numpy.dot on the two vectors.

>>> dense = DenseVector(array.array('d', [1., 2.]))
>>> dense.dot(dense)
5.0
>>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
4.0
>>> dense.dot(range(1, 3))
5.0
>>> dense.dot(np.array(range(1, 3)))
5.0
>>> dense.dot([1.,])
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
>>> dense.dot(np.reshape([1., 2., 3., 4.], (2, 2), order='F'))
array([  5.,  11.])
>>> dense.dot(np.reshape([1., 2., 3.], (3, 1), order='F'))
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
squared_distance(other)[source]

Squared distance of two Vectors.

>>> dense1 = DenseVector(array.array('d', [1., 2.]))
>>> dense1.squared_distance(dense1)
0.0
>>> dense2 = np.array([2., 1.])
>>> dense1.squared_distance(dense2)
2.0
>>> dense3 = [2., 1.]
>>> dense1.squared_distance(dense3)
2.0
>>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
>>> dense1.squared_distance(sparse1)
2.0
>>> dense1.squared_distance([1.,])
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
>>> dense1.squared_distance(SparseVector(1, [0,], [1.,]))
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
toArray()[source]
class pyspark.mllib.linalg.SparseVector(size, *args)[source]

Bases: pyspark.mllib.linalg.Vector

A simple sparse vector class for passing data to MLlib. Users may alternatively pass SciPy’s scipy.sparse data types.

dot(other)[source]

Dot product with a SparseVector or 1- or 2-dimensional NumPy array.

>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.dot(a)
25.0
>>> a.dot(array.array('d', [1., 2., 3., 4.]))
22.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.dot(b)
0.0
>>> a.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
array([ 22.,  22.])
>>> a.dot([1., 2., 3.])
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
>>> a.dot(np.array([1., 2.]))
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
>>> a.dot(DenseVector([1., 2.]))
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
>>> a.dot(np.zeros((3, 2)))
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
squared_distance(other)[source]

Squared distance from a SparseVector or 1-dimensional NumPy array.

>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.squared_distance(a)
0.0
>>> a.squared_distance(array.array('d', [1., 2., 3., 4.]))
11.0
>>> a.squared_distance(np.array([1., 2., 3., 4.]))
11.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.squared_distance(b)
30.0
>>> b.squared_distance(a)
30.0
>>> b.squared_distance([1., 2.])
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
>>> b.squared_distance(SparseVector(3, [1,], [1.0,]))
Traceback (most recent call last):
    ...
AssertionError: dimension mismatch
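
For intuition, the dot product and squared distance between a sparse vector and a dense one only need the stored (index, value) pairs. The following pure-Python sketch recomputes two of the results shown above. The function names and the tuple-style arguments are assumptions made for the example, not MLlib API.

```python
def sparse_dot(size, indices, values, dense):
    """Hypothetical helper: dot product of a sparse vector with a dense list."""
    assert len(dense) == size, "dimension mismatch"
    # Only the stored (non-zero) entries contribute to the sum
    return sum(v * dense[i] for i, v in zip(indices, values))

def sparse_squared_distance(size, indices, values, dense):
    """Hypothetical helper: squared Euclidean distance to a dense list."""
    assert len(dense) == size, "dimension mismatch"
    stored = dict(zip(indices, values))
    return sum((stored.get(i, 0.0) - x) ** 2 for i, x in enumerate(dense))

dense = [1.0, 2.0, 3.0, 4.0]
print(sparse_dot(4, [1, 3], [3.0, 4.0], dense))               # 22.0
print(sparse_squared_distance(4, [1, 3], [3.0, 4.0], dense))  # 11.0
```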
toArray()[source]

Returns a copy of this SparseVector as a 1-dimensional NumPy array.

class pyspark.mllib.linalg.Vectors[source]

Bases: object

Factory methods for working with vectors. Note that dense vectors are simply represented as NumPy array objects, so there is no need to convert them for use in MLlib. For sparse vectors, the factory methods in this class create an MLlib-compatible type, or users can pass in SciPy’s scipy.sparse column vectors.

static dense(elements)[source]

Create a dense vector of 64-bit floats from a Python list. Always returns a NumPy array.

>>> Vectors.dense([1, 2, 3])
DenseVector([1.0, 2.0, 3.0])
static sparse(size, *args)[source]

Create a sparse vector, using either a dictionary, a list of (index, value) pairs, or two separate arrays of indices and values (sorted by index).

Parameters:
  • size – Size of the vector.
  • args – Non-zero entries, as a dictionary, list of tuples, or two sorted lists containing indices and values.
>>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
(4,[1,3],[1.0,5.5])
>>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
(4,[1,3],[1.0,5.5])
>>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
(4,[1,3],[1.0,5.5])
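
The three accepted argument forms all describe the same data. As an illustration only, the sketch below normalizes each form into a (size, indices, values) triple in plain Python. The helper name is hypothetical and the real factory may validate its input differently.

```python
def parse_sparse_args(size, *args):
    """Hypothetical helper: normalize the three input forms to one triple."""
    if len(args) == 1:
        pairs = args[0]
        if isinstance(pairs, dict):
            pairs = pairs.items()
        pairs = sorted(pairs)  # sort (index, value) pairs by index
        indices = [i for i, _ in pairs]
        values = [float(v) for _, v in pairs]
    else:
        # Two parallel sequences: indices (already sorted) and values
        indices = list(args[0])
        values = [float(v) for v in args[1]]
    assert all(0 <= i < size for i in indices), "index out of range"
    return size, indices, values

print(parse_sparse_args(4, {1: 1.0, 3: 5.5}))
print(parse_sparse_args(4, [(1, 1.0), (3, 5.5)]))
print(parse_sparse_args(4, [1, 3], [1.0, 5.5]))
```

All three calls print the same triple, (4, [1, 3], [1.0, 5.5]).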
static stringify(vector)[source]

Converts a vector into a string, which can be recognized by Vectors.parse().

>>> Vectors.stringify(Vectors.sparse(2, [1], [1.0]))
'(2,[1],[1.0])'
>>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
'[0.0,1.0]'
class pyspark.mllib.linalg.DenseMatrix(numRows, numCols, values)[source]

Bases: pyspark.mllib.linalg.Matrix

Column-major dense matrix.

toArray()[source]

Return a numpy.ndarray.

>>> m = DenseMatrix(2, 2, range(4))
>>> m.toArray()
array([[ 0.,  2.],
       [ 1.,  3.]])
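
Column-major storage means the flat values list is read one column at a time, so the entry at row r and column c sits at position c * numRows + r. A small pure-Python sketch of that indexing rule, using a hypothetical helper name, reproduces the layout shown above.

```python
def column_major_to_rows(num_rows, num_cols, values):
    """Hypothetical helper: rebuild row lists from column-major values."""
    values = list(values)
    assert len(values) == num_rows * num_cols
    return [
        [float(values[c * num_rows + r]) for c in range(num_cols)]
        for r in range(num_rows)
    ]

print(column_major_to_rows(2, 2, range(4)))  # [[0.0, 2.0], [1.0, 3.0]]
```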
class pyspark.mllib.linalg.Matrices[source]

Bases: object

static dense(numRows, numCols, values)[source]

Create a DenseMatrix

pyspark.mllib.random module

Python package for random data generation.

class pyspark.mllib.random.RandomRDDs[source]

Bases: object

Generator methods for creating RDDs comprised of i.i.d. samples from some distribution.

static normalRDD(sc, size, numPartitions=None, seed=None)[source]

Generates an RDD comprised of i.i.d. samples from the standard normal distribution.

To transform the distribution in the generated RDD from standard normal to some other normal N(mean, sigma^2), use RandomRDDs.normalRDD(sc, n, p, seed).map(lambda v: mean + sigma * v)

Parameters:
  • sc – SparkContext used to create the RDD.
  • size – Size of the RDD.
  • numPartitions – Number of partitions in the RDD (default: sc.defaultParallelism).
  • seed – Random seed (default: a random long integer).
Returns:

RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).

>>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
>>> stats = x.stats()
>>> stats.count()
1000L
>>> abs(stats.mean() - 0.0) < 0.1
True
>>> abs(stats.stdev() - 1.0) < 0.1
True
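
The mean + sigma * v mapping mentioned above can be checked locally without Spark. The sketch below uses Python's random module as a stand-in for the generated RDD. The helper name, the seed, and the target mean and sigma are arbitrary choices for illustration.

```python
import math
import random

def rescale_normal(samples, mean, sigma):
    """Hypothetical helper: map N(0, 1) samples to N(mean, sigma^2)."""
    return [mean + sigma * v for v in samples]

rng = random.Random(1)  # local stand-in for RandomRDDs.normalRDD
standard = [rng.gauss(0.0, 1.0) for _ in range(10000)]
shifted = rescale_normal(standard, 5.0, 2.0)

sample_mean = sum(shifted) / len(shifted)
sample_std = math.sqrt(sum((v - sample_mean) ** 2 for v in shifted) / len(shifted))
print(abs(sample_mean - 5.0) < 0.2)  # True
print(abs(sample_std - 2.0) < 0.2)   # True
```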
static normalVectorRDD(sc, *a, **kw)[source]

Generates an RDD comprised of vectors containing i.i.d. samples drawn from the standard normal distribution.

Parameters:
  • sc – SparkContext used to create the RDD.
  • numRows – Number of Vectors in the RDD.
  • numCols – Number of elements in each Vector.
  • numPartitions – Number of partitions in the RDD (default: sc.defaultParallelism).
  • seed – Random seed (default: a random long integer).
Returns:

RDD of Vector with vectors containing i.i.d. samples ~ N(0.0, 1.0).

>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - 0.0) < 0.1
True
>>> abs(mat.std() - 1.0) < 0.1
True
static poissonRDD(sc, mean, size, numPartitions=None, seed=None)[source]

Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.

Parameters:
  • sc – SparkContext used to create the RDD.
  • mean – Mean, or lambda, for the Poisson distribution.
  • size – Size of the RDD.
  • numPartitions – Number of partitions in the RDD (default: sc.defaultParallelism).
  • seed – Random seed (default: a random long integer).
Returns:

RDD of float comprised of i.i.d. samples ~ Pois(mean).

>>> mean = 100.0
>>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
>>> stats = x.stats()
>>> stats.count()
1000L
>>> abs(stats.mean() - mean) < 0.5
True
>>> from math import sqrt
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
static poissonVectorRDD(sc, *a, **kw)[source]

Generates an RDD comprised of vectors containing i.i.d. samples drawn from the Poisson distribution with the input mean.

Parameters:
  • sc – SparkContext used to create the RDD.
  • mean – Mean, or lambda, for the Poisson distribution.
  • numRows – Number of Vectors in the RDD.
  • numCols – Number of elements in each Vector.
  • numPartitions – Number of partitions in the RDD (default: sc.defaultParallelism)
  • seed – Random seed (default: a random long integer).
Returns:

RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).

>>> import numpy as np
>>> mean = 100.0
>>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
>>> mat = np.mat(rdd.collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - mean) < 0.5
True
>>> from math import sqrt
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
static uniformRDD(sc, size, numPartitions=None, seed=None)[source]

Generates an RDD comprised of i.i.d. samples from the uniform distribution U(0.0, 1.0).

To transform the distribution in the generated RDD from U(0.0, 1.0) to U(a, b), use RandomRDDs.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)

Parameters:
  • sc – SparkContext used to create the RDD.
  • size – Size of the RDD.
  • numPartitions – Number of partitions in the RDD (default: sc.defaultParallelism).
  • seed – Random seed (default: a random long integer).
Returns:

RDD of float comprised of i.i.d. samples ~ U(0.0, 1.0).

>>> x = RandomRDDs.uniformRDD(sc, 100).collect()
>>> len(x)
100
>>> max(x) <= 1.0 and min(x) >= 0.0
True
>>> RandomRDDs.uniformRDD(sc, 100, 4).getNumPartitions()
4
>>> parts = RandomRDDs.uniformRDD(sc, 100, seed=4).getNumPartitions()
>>> parts == sc.defaultParallelism
True
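
The a + (b - a) * v mapping for uniform samples works the same way on any collection of values in [0.0, 1.0]. In the sketch below a local list stands in for the RDD. The helper name and the bounds -1.0 and 3.0 are arbitrary example values.

```python
import random

def rescale_uniform(samples, a, b):
    """Hypothetical helper: map U(0, 1) samples to U(a, b)."""
    return [a + (b - a) * v for v in samples]

rng = random.Random(4)  # local stand-in for RandomRDDs.uniformRDD
unit = [rng.random() for _ in range(100)]
scaled = rescale_uniform(unit, -1.0, 3.0)
print(min(scaled) >= -1.0 and max(scaled) <= 3.0)  # True
```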
static uniformVectorRDD(sc, *a, **kw)[source]

Generates an RDD comprised of vectors containing i.i.d. samples drawn from the uniform distribution U(0.0, 1.0).

Parameters:
  • sc – SparkContext used to create the RDD.
  • numRows – Number of Vectors in the RDD.
  • numCols – Number of elements in each Vector.
  • numPartitions – Number of partitions in the RDD.
  • seed – Seed for the RNG that generates the seed for the generator in each partition.
Returns:

RDD of Vector with vectors containing i.i.d. samples ~ U(0.0, 1.0).

>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
>>> mat.shape
(10, 10)
>>> mat.max() <= 1.0 and mat.min() >= 0.0
True
>>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
4

pyspark.mllib.recommendation module

class pyspark.mllib.recommendation.MatrixFactorizationModel(java_model)[source]

Bases: pyspark.mllib.common.JavaModelWrapper

A matrix factorization model trained by regularized alternating least-squares.

>>> r1 = (1, 1, 1.0)
>>> r2 = (1, 2, 2.0)
>>> r3 = (2, 1, 2.0)
>>> ratings = sc.parallelize([r1, r2, r3])
>>> model = ALS.trainImplicit(ratings, 1, seed=10)
>>> model.predict(2,2)
0.4473...
>>> testset = sc.parallelize([(1, 2), (1, 1)])
>>> model = ALS.train(ratings, 1, seed=10)
>>> model.predictAll(testset).collect()
[Rating(user=1, product=1, rating=1.0471...), Rating(user=1, product=2, rating=1.9679...)]
>>> model = ALS.train(ratings, 4, seed=10)
>>> model.userFeatures().collect()
[(2, array('d', [...])), (1, array('d', [...]))]
>>> first_user = model.userFeatures().take(1)[0]
>>> latents = first_user[1]
>>> len(latents) == 4
True
>>> model.productFeatures().collect()
[(2, array('d', [...])), (1, array('d', [...]))]
>>> first_product = model.productFeatures().take(1)[0]
>>> latents = first_product[1]
>>> len(latents) == 4
True
>>> model = ALS.train(ratings, 1, nonnegative=True, seed=10)
>>> model.predict(2,2)
3.735...
>>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10)
>>> model.predict(2,2)
0.4473...
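
In a matrix factorization model, a predicted rating is conventionally the dot product of the user's latent factor vector with the product's latent factor vector. The sketch below illustrates that idea with made-up factors held in plain dictionaries. It is an assumption about the general technique rather than a description of this class's internals.

```python
def predict_rating(user_features, product_features, user, product):
    """Hypothetical helper: dot product of user and product latent factors."""
    u = user_features[user]
    p = product_features[product]
    return sum(a * b for a, b in zip(u, p))

# Made-up rank-2 factors, keyed by user id and product id
user_features = {1: [1.0, 0.5], 2: [0.5, 1.0]}
product_features = {1: [1.0, 0.0], 2: [0.0, 2.0]}
print(predict_rating(user_features, product_features, 2, 2))  # 2.0
print(predict_rating(user_features, product_features, 1, 1))  # 1.0
```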
predict(user, product)[source]
predictAll(user_product)[source]
productFeatures()[source]
userFeatures()[source]
class pyspark.mllib.recommendation.ALS[source]

Bases: object

classmethod train(ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False, seed=None)[source]
classmethod trainImplicit(ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01, nonnegative=False, seed=None)[source]
class pyspark.mllib.recommendation.Rating[source]

Bases: collections.Rating

Represents a (user, product, rating) tuple.

>>> r = Rating(1, 2, 5.0)
>>> (r.user, r.product, r.rating)
(1, 2, 5.0)
>>> (r[0], r[1], r[2])
(1, 2, 5.0)

pyspark.mllib.regression module

class pyspark.mllib.regression.LabeledPoint(label, features)[source]

Bases: object

The features and labels of a data point.

Parameters:
  • label – Label for this data point.
  • features – Vector of features for this point (NumPy array, list, pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
class pyspark.mllib.regression.LinearModel(weights, intercept)[source]

Bases: object

A linear model that has a vector of coefficients and an intercept.

intercept[source]
weights[source]
class pyspark.mllib.regression.LinearRegressionModel(weights, intercept)[source]

Bases: pyspark.mllib.regression.LinearRegressionModelBase

A linear regression model derived from a least-squares fit.

>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [
...     LabeledPoint(0.0, [0.0]),
...     LabeledPoint(1.0, [1.0]),
...     LabeledPoint(3.0, [2.0]),
...     LabeledPoint(2.0, [3.0])
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> data = [
...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
class pyspark.mllib.regression.RidgeRegressionModel(weights, intercept)[source]

Bases: pyspark.mllib.regression.LinearRegressionModelBase

A linear regression model derived from a least-squares fit with an l_2 penalty term.

>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [
...     LabeledPoint(0.0, [0.0]),
...     LabeledPoint(1.0, [1.0]),
...     LabeledPoint(3.0, [2.0]),
...     LabeledPoint(2.0, [3.0])
... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> data = [
...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
class pyspark.mllib.regression.LinearRegressionWithSGD[source]

Bases: object

classmethod train(data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=0.0, regType=None, intercept=False)[source]

Train a linear regression model on the given data.

Parameters:
  • data – The training data.
  • iterations – The number of iterations (default: 100).
  • step – The step parameter used in SGD (default: 1.0).
  • miniBatchFraction – Fraction of data to be used for each SGD iteration (default: 1.0).
  • initialWeights – The initial weights (default: None).
  • regParam – The regularizer parameter (default: 0.0).
  • regType

    The type of regularizer used for training our model.

    Allowed values:
    • “l1” for using L1 regularization (lasso),
    • “l2” for using L2 regularization (ridge),
    • None for no regularization

    (default: None)

  • intercept – Boolean parameter indicating whether to use the augmented representation for training data (i.e. whether bias features are activated) (default: False).
class pyspark.mllib.regression.LassoWithSGD[source]

Bases: object

classmethod train(data, iterations=100, step=1.0, regParam=0.01, miniBatchFraction=1.0, initialWeights=None)[source]

Train a Lasso regression model on the given data.

class pyspark.mllib.regression.RidgeRegressionWithSGD[source]

Bases: object

classmethod train(data, iterations=100, step=1.0, regParam=0.01, miniBatchFraction=1.0, initialWeights=None)[source]

Train a ridge regression model on the given data.

pyspark.mllib.stat module

Python package for statistical functions in MLlib.

class pyspark.mllib.stat.MultivariateStatisticalSummary(java_model)[source]

Bases: pyspark.mllib.common.JavaModelWrapper

Trait for multivariate statistical summary of a data matrix.

count()[source]
max()[source]
mean()[source]
min()[source]
numNonzeros()[source]
variance()[source]
class pyspark.mllib.stat.ChiSqTestResult(java_model)[source]

Bases: pyspark.mllib.common.JavaModelWrapper

:: Experimental

Object containing the test results for the chi-squared hypothesis test.

degreesOfFreedom[source]

Returns the degree(s) of freedom of the hypothesis test. The return type is a number (e.g. int or float) or a tuple of numbers.

method[source]

Name of the test method.

nullHypothesis[source]

Null hypothesis of the test.

pValue[source]

The probability of obtaining a test statistic result at least as extreme as the one that was actually observed, assuming that the null hypothesis is true.

statistic[source]

Test statistic.

class pyspark.mllib.stat.Statistics[source]

Bases: object

static chiSqTest(observed, expected=None)[source]

:: Experimental

If observed is a Vector, conduct Pearson’s chi-squared goodness of fit test of the observed data against the expected distribution, or against the uniform distribution (by default), with each category having an expected frequency of 1 / len(observed). (Note: observed cannot contain negative values.)

If observed is a Matrix, conduct Pearson’s independence test on the input contingency matrix, which cannot contain negative entries or columns or rows that sum up to 0.

If observed is an RDD of LabeledPoint, conduct Pearson’s independence test for every feature against the label across the input RDD. For each feature, the (feature, label) pairs are converted into a contingency matrix for which the chi-squared statistic is computed. All label and feature values must be categorical.

Parameters:
  • observed – A vector containing the observed categorical counts/relative frequencies, or the contingency matrix (containing either counts or relative frequencies), or an RDD of LabeledPoint containing the labeled dataset with categorical features. Real-valued features will be treated as categorical for each distinct value.
  • expected – Vector containing the expected categorical counts/relative frequencies. expected is rescaled if the expected sum differs from the observed sum.
Returns:

ChiSqTestResult object containing the test statistic, degrees of freedom, p-value, the method used, and the null hypothesis.

>>> from pyspark.mllib.linalg import Vectors, Matrices
>>> observed = Vectors.dense([4, 6, 5])
>>> pearson = Statistics.chiSqTest(observed)
>>> print pearson.statistic
0.4
>>> pearson.degreesOfFreedom
2
>>> print round(pearson.pValue, 4)
0.8187
>>> pearson.method
u'pearson'
>>> pearson.nullHypothesis
u'observed follows the same distribution as expected.'
>>> observed = Vectors.dense([21, 38, 43, 80])
>>> expected = Vectors.dense([3, 5, 7, 20])
>>> pearson = Statistics.chiSqTest(observed, expected)
>>> print round(pearson.pValue, 4)
0.0027
>>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
>>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
>>> print round(chi.statistic, 4)
21.9958
>>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
...         LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
...         LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
...         LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
...         LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
...         LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
>>> rdd = sc.parallelize(data, 4)
>>> chi = Statistics.chiSqTest(rdd)
>>> print chi[0].statistic
0.75
>>> print chi[1].statistic
1.5
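The statistics in the doctest above can be reproduced by hand. The following pure-Python sketch, which does not use Spark, computes Pearson's statistic for the goodness of fit case (including the rescaling of expected) and for the contingency matrix case. The function names are hypothetical, and the p-value shortcut applies only to 2 degrees of freedom.

```python
import math


def chi_sq_goodness_of_fit(observed, expected=None):
    """Pearson statistic and degrees of freedom for observed vs. expected counts."""
    n = len(observed)
    total = float(sum(observed))
    if expected is None:
        expected = [1.0 / n] * n  # uniform distribution by default
    scale = total / sum(expected)  # rescale expected to the observed total
    statistic = sum((o - e * scale) ** 2 / (e * scale)
                    for o, e in zip(observed, expected))
    return statistic, n - 1


def chi_sq_independence(rows):
    """Pearson statistic and degrees of freedom for a contingency table (list of rows)."""
    row_sums = [sum(r) for r in rows]
    col_sums = [sum(c) for c in zip(*rows)]
    total = float(sum(row_sums))
    statistic = 0.0
    for i, row in enumerate(rows):
        for j, o in enumerate(row):
            e = row_sums[i] * col_sums[j] / total  # expected count under independence
            statistic += (o - e) ** 2 / e
    return statistic, (len(rows) - 1) * (len(col_sums) - 1)


gof_stat, gof_df = chi_sq_goodness_of_fit([4, 6, 5])
# With 2 degrees of freedom the chi-squared survival function is exp(-x / 2).
gof_p = math.exp(-gof_stat / 2.0)

scaled_stat, _ = chi_sq_goodness_of_fit([21, 38, 43, 80], [3, 5, 7, 20])

# Matrices.dense(3, 4, data) is column-major, so rebuild the three rows by striding.
data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
table = [data[i::3] for i in range(3)]
ind_stat, ind_df = chi_sq_independence(table)
```

Here gof_stat, gof_df and gof_p correspond to the 0.4, 2 and 0.8187 shown above, and ind_stat corresponds to 21.9958.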
static colStats(rdd)[source]

Computes column-wise summary statistics for the input RDD[Vector].

Parameters:
  • rdd – an RDD[Vector] for which column-wise summary statistics are to be computed.
Returns:

MultivariateStatisticalSummary object containing column-wise summary statistics.

>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
...                       Vectors.dense([4, 5, 0,  3]),
...                       Vectors.dense([6, 7, 0,  8])])
>>> cStats = Statistics.colStats(rdd)
>>> cStats.mean()
array([ 4.,  4.,  0.,  3.])
>>> cStats.variance()
array([  4.,  13.,   0.,  25.])
>>> cStats.count()
3L
>>> cStats.numNonzeros()
array([ 3.,  2.,  0.,  3.])
>>> cStats.max()
array([ 6.,  7.,  0.,  8.])
>>> cStats.min()
array([ 2.,  0.,  0., -2.])
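As a cross-check of the values above, this small sketch computes the same column-wise summaries on a plain list of rows, without Spark. The helper name col_stats and the dictionary layout are hypothetical, and it assumes at least two rows; the variance is the unbiased sample variance (dividing by n - 1), which is what reproduces the output shown.

```python
def col_stats(rows):
    """Hypothetical helper: column-wise summary of a list of equal-length rows."""
    n = len(rows)
    cols = list(zip(*rows))  # transpose rows into columns
    mean = [sum(c) / float(n) for c in cols]
    # Unbiased sample variance: squared deviations divided by n - 1.
    variance = [sum((v - m) ** 2 for v in c) / (n - 1)
                for c, m in zip(cols, mean)]
    return {
        "count": n,
        "mean": mean,
        "variance": variance,
        "numNonzeros": [sum(1 for v in c if v != 0) for c in cols],
        "max": [max(c) for c in cols],
        "min": [min(c) for c in cols],
    }


summary = col_stats([[2, 0, 0, -2], [4, 5, 0, 3], [6, 7, 0, 8]])
```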
static corr(x, y=None, method=None)[source]

Compute the correlation (matrix) for the input RDD(s) using the specified method. Methods currently supported: pearson (default), spearman.

If a single RDD of Vectors is passed in, a correlation matrix comparing the columns in the input RDD is returned. Use method= to specify the method to be used for single RDD input. If two RDDs of floats are passed in, a single float is returned.

Parameters:
  • x – an RDD of Vectors for which the correlation matrix is to be computed, or an RDD of floats of the same cardinality as y when y is specified.
  • y – an RDD of floats of the same cardinality as x.
  • method – String specifying the method to use for computing correlation. Supported: pearson (default), spearman
Returns:

Correlation matrix comparing columns in x, or a single float when y is specified.

>>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
>>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
>>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
>>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
True
>>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
True
>>> Statistics.corr(x, y, "spearman")
0.5
>>> from math import isnan
>>> isnan(Statistics.corr(x, zeros))
True
>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
...                       Vectors.dense([6, 7, 0,  8]), Vectors.dense([9, 0, 0, 1])])
>>> pearsonCorr = Statistics.corr(rdd)
>>> print str(pearsonCorr).replace('nan', 'NaN')
[[ 1.          0.05564149         NaN  0.40047142]
 [ 0.05564149  1.                 NaN  0.91359586]
 [        NaN         NaN  1.                 NaN]
 [ 0.40047142  0.91359586         NaN  1.        ]]
>>> spearmanCorr = Statistics.corr(rdd, method="spearman")
>>> print str(spearmanCorr).replace('nan', 'NaN')
[[ 1.          0.10540926         NaN  0.4       ]
 [ 0.10540926  1.                 NaN  0.9486833 ]
 [        NaN         NaN  1.                 NaN]
 [ 0.4         0.9486833          NaN  1.        ]]
>>> try:
...     Statistics.corr(rdd, "spearman")
...     print "Method name as second argument without 'method=' shouldn't be allowed."
... except TypeError:
...     pass
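The two supported methods can be sketched in pure Python for the two-sequence case. This is an illustration on local lists rather than RDDs, with hypothetical function names; Spearman is computed as the Pearson correlation of the ranks, and a constant input yields NaN, as in the doctest above.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences; NaN if either is constant."""
    n = len(xs)
    mx, my = sum(xs) / float(n), sum(ys) / float(n)
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    syy = sum((y - my) ** 2 for y in ys)
    if sxx == 0 or syy == 0:
        return float("nan")
    return cov / math.sqrt(sxx * syy)


def ranks(values):
    """1-based ranks, with tied values sharing their average rank."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        # Extend j over the run of values tied with position i.
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg = (i + j) / 2.0 + 1.0
        for k in range(i, j + 1):
            result[order[k]] = avg
        i = j + 1
    return result


def spearman(xs, ys):
    """Spearman correlation: Pearson correlation of the ranks."""
    return pearson(ranks(xs), ranks(ys))


x = [1.0, 0.0, -2.0]
y = [4.0, 5.0, 3.0]
r_pearson = pearson(x, y)
r_spearman = spearman(x, y)
r_constant = pearson(x, [0.0, 0.0, 0.0])
```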

pyspark.mllib.tree module

class pyspark.mllib.tree.DecisionTreeModel(java_model)[source]

Bases: pyspark.mllib.common.JavaModelWrapper

A decision tree model for classification or regression.

EXPERIMENTAL: This is an experimental API.
It will probably be modified in future.
depth()[source]
numNodes()[source]
predict(x)[source]

Predict the label of one or more examples.

Parameters:
  • x – Data point (feature vector), or an RDD of data points (feature vectors).
toDebugString()[source]

Returns a string representation of the full model.

class pyspark.mllib.tree.DecisionTree[source]

Bases: object

Learning algorithm for a decision tree model for classification or regression.

EXPERIMENTAL: This is an experimental API.
It will probably be modified in future.
classmethod trainClassifier(data, numClasses, categoricalFeaturesInfo, impurity='gini', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0)[source]

Train a DecisionTreeModel for classification.

Parameters:
  • data – Training data: RDD of LabeledPoint. Labels are integers {0,1,...,numClasses-1}.
  • numClasses – Number of classes for classification.
  • categoricalFeaturesInfo – Map from categorical feature index to number of categories. Any feature not in this map is treated as continuous.
  • impurity – Supported values: “entropy” or “gini”
  • maxDepth – Max depth of tree. E.g., depth 0 means 1 leaf node. Depth 1 means 1 internal node + 2 leaf nodes.
  • maxBins – Number of bins used for finding splits at each node.
  • minInstancesPerNode – Min number of instances required at child nodes to create the parent split
  • minInfoGain – Min info gain required to create a split
Returns:

DecisionTreeModel

Example usage:

>>> from numpy import array
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import DecisionTree
>>>
>>> data = [
...     LabeledPoint(0.0, [0.0]),
...     LabeledPoint(1.0, [1.0]),
...     LabeledPoint(1.0, [2.0]),
...     LabeledPoint(1.0, [3.0])
... ]
>>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
>>> print model,  # it already has newline
DecisionTreeModel classifier of depth 1 with 3 nodes
>>> print model.toDebugString(),  # it already has newline
DecisionTreeModel classifier of depth 1 with 3 nodes
  If (feature 0 <= 0.0)
   Predict: 0.0
  Else (feature 0 > 0.0)
   Predict: 1.0
>>> model.predict(array([1.0]))
1.0
>>> model.predict(array([0.0]))
0.0
>>> rdd = sc.parallelize([[1.0], [0.0]])
>>> model.predict(rdd).collect()
[1.0, 0.0]
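To show why the tree above splits at feature 0 <= 0.0, the sketch below scores candidate thresholds by weighted Gini impurity. It is a simplification under stated assumptions: it searches every observed value of a single feature exhaustively, whereas MLlib bins continuous features (maxBins) and also applies minInstancesPerNode and minInfoGain. The function names are hypothetical.

```python
def gini(labels):
    """Gini impurity of a list of class labels."""
    n = float(len(labels))
    if n == 0:
        return 0.0
    return 1.0 - sum((labels.count(c) / n) ** 2 for c in set(labels))


def best_split(points):
    """Hypothetical helper: find the threshold on a single feature that
    minimises the weighted Gini impurity of the two children.

    points is a list of (label, feature_value) pairs.
    """
    best = None
    # The largest value is skipped because it would leave the right child empty.
    for threshold in sorted(set(x for _, x in points))[:-1]:
        left = [y for y, x in points if x <= threshold]
        right = [y for y, x in points if x > threshold]
        weighted = (len(left) * gini(left) + len(right) * gini(right)) / len(points)
        if best is None or weighted < best[1]:
            best = (threshold, weighted)
    return best


# Same four points as the example above, as (label, feature 0) pairs.
threshold, impurity = best_split([(0.0, 0.0), (1.0, 1.0), (1.0, 2.0), (1.0, 3.0)])
```

For this data the chosen threshold is 0.0 and both children are pure, which agrees with the depth-1 tree printed by toDebugString().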
classmethod trainRegressor(data, categoricalFeaturesInfo, impurity='variance', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0)[source]

Train a DecisionTreeModel for regression.

Parameters:
  • data – Training data: RDD of LabeledPoint. Labels are real numbers.
  • categoricalFeaturesInfo – Map from categorical feature index to number of categories. Any feature not in this map is treated as continuous.
  • impurity – Supported values: “variance”
  • maxDepth – Max depth of tree. E.g., depth 0 means 1 leaf node. Depth 1 means 1 internal node + 2 leaf nodes.
  • maxBins – Number of bins used for finding splits at each node.
  • minInstancesPerNode – Min number of instances required at child nodes to create the parent split
  • minInfoGain – Min info gain required to create a split
Returns:

DecisionTreeModel

Example usage:

>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import DecisionTree
>>> from pyspark.mllib.linalg import SparseVector
>>>
>>> sparse_data = [
...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>>
>>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
>>> model.predict(SparseVector(2, {1: 1.0}))
1.0
>>> model.predict(SparseVector(2, {1: 0.0}))
0.0
>>> rdd = sc.parallelize([[0.0, 1.0], [0.0, 0.0]])
>>> model.predict(rdd).collect()
[1.0, 0.0]
class pyspark.mllib.tree.RandomForestModel(java_model)[source]

Bases: pyspark.mllib.common.JavaModelWrapper

Represents a random forest model.

EXPERIMENTAL: This is an experimental API.
It will probably be modified in future.
numTrees()[source]

Get number of trees in forest.

predict(x)[source]

Predict values for a single data point or an RDD of points using the model trained.

toDebugString()[source]

Returns a string representation of the full model.

totalNumNodes()[source]

Get total number of nodes, summed over all trees in the forest.

class pyspark.mllib.tree.RandomForest[source]

Bases: object

Learning algorithm for a random forest model for classification or regression.

EXPERIMENTAL: This is an experimental API.
It will probably be modified in future.
supportedFeatureSubsetStrategies = ('auto', 'all', 'sqrt', 'log2', 'onethird')
classmethod trainClassifier(data, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy='auto', impurity='gini', maxDepth=4, maxBins=32, seed=None)[source]

Method to train a random forest model for binary or multiclass classification.

Parameters:
  • data – Training dataset: RDD of LabeledPoint. Labels should take values {0, 1, ..., numClasses-1}.
  • numClasses – number of classes for classification.
  • categoricalFeaturesInfo – Map storing arity of categorical features. E.g., an entry (n -> k) indicates that feature n is categorical with k categories indexed from 0: {0, 1, ..., k-1}.
  • numTrees – Number of trees in the random forest.
  • featureSubsetStrategy – Number of features to consider for splits at each node. Supported: “auto” (default), “all”, “sqrt”, “log2”, “onethird”. If “auto” is set, this parameter is set based on numTrees: if numTrees == 1, set to “all”; if numTrees > 1 (forest) set to “sqrt”.
  • impurity – Criterion used for information gain calculation. Supported values: “gini” (recommended) or “entropy”.
  • maxDepth – Maximum depth of the tree. E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 4)
  • maxBins – maximum number of bins used for splitting features (default: 32)
  • seed – Random seed for bootstrapping and choosing feature subsets.
Returns:

RandomForestModel that can be used for prediction

Example usage:

>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import RandomForest
>>>
>>> data = [
...     LabeledPoint(0.0, [0.0]),
...     LabeledPoint(0.0, [1.0]),
...     LabeledPoint(1.0, [2.0]),
...     LabeledPoint(1.0, [3.0])
... ]
>>> model = RandomForest.trainClassifier(sc.parallelize(data), 2, {}, 3, seed=42)
>>> model.numTrees()
3
>>> model.totalNumNodes()
7
>>> print model,
TreeEnsembleModel classifier with 3 trees
>>> print model.toDebugString(),
TreeEnsembleModel classifier with 3 trees

  Tree 0:
    Predict: 1.0
  Tree 1:
    If (feature 0 <= 1.0)
     Predict: 0.0
    Else (feature 0 > 1.0)
     Predict: 1.0
  Tree 2:
    If (feature 0 <= 1.0)
     Predict: 0.0
    Else (feature 0 > 1.0)
     Predict: 1.0
>>> model.predict([2.0])
1.0
>>> model.predict([0.0])
0.0
>>> rdd = sc.parallelize([[3.0], [1.0]])
>>> model.predict(rdd).collect()
[1.0, 0.0]
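The predictions above are consistent with a majority vote over the three trees printed by toDebugString(). The sketch below hard-codes those trees as plain functions and counts votes; treating the aggregation as a simple majority vote is an assumption of this illustration, and forest_predict is a hypothetical name.

```python
from collections import Counter

# The three trees printed by toDebugString() above, written as plain functions.
trees = [
    lambda x: 1.0,                            # Tree 0: a single leaf
    lambda x: 0.0 if x[0] <= 1.0 else 1.0,    # Tree 1
    lambda x: 0.0 if x[0] <= 1.0 else 1.0,    # Tree 2
]


def forest_predict(x):
    """Hypothetical helper: return the label predicted by the most trees."""
    votes = Counter(tree(x) for tree in trees)
    return votes.most_common(1)[0][0]


predictions = [forest_predict(p) for p in ([2.0], [0.0], [3.0], [1.0])]
```

For [0.0] and [1.0], Tree 0 votes 1.0 but is outvoted by the other two trees, so the forest returns 0.0.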
classmethod trainRegressor(data, categoricalFeaturesInfo, numTrees, featureSubsetStrategy='auto', impurity='variance', maxDepth=4, maxBins=32, seed=None)[source]

Method to train a random forest model for regression.

Parameters:
  • data – Training dataset: RDD of LabeledPoint. Labels are real numbers.
  • categoricalFeaturesInfo – Map storing arity of categorical features. E.g., an entry (n -> k) indicates that feature n is categorical with k categories indexed from 0: {0, 1, ..., k-1}.
  • numTrees – Number of trees in the random forest.
  • featureSubsetStrategy – Number of features to consider for splits at each node. Supported: “auto” (default), “all”, “sqrt”, “log2”, “onethird”. If “auto” is set, this parameter is set based on numTrees: if numTrees == 1, set to “all”; if numTrees > 1 (forest) set to “onethird” for regression.
  • impurity – Criterion used for information gain calculation. Supported values: “variance”.
  • maxDepth – Maximum depth of the tree. E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 4)
  • maxBins – maximum number of bins used for splitting features (default: 32)
  • seed – Random seed for bootstrapping and choosing feature subsets.
Returns:

RandomForestModel that can be used for prediction

Example usage:

>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import RandomForest
>>> from pyspark.mllib.linalg import SparseVector
>>>
>>> sparse_data = [
...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>>
>>> model = RandomForest.trainRegressor(sc.parallelize(sparse_data), {}, 2, seed=42)
>>> model.numTrees()
2
>>> model.totalNumNodes()
4
>>> model.predict(SparseVector(2, {1: 1.0}))
1.0
>>> model.predict(SparseVector(2, {0: 1.0}))
0.5
>>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
>>> model.predict(rdd).collect()
[1.0, 0.5]
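The "auto" rule for featureSubsetStrategy described in the parameter lists of trainClassifier and trainRegressor can be written out as a small function. This is a sketch only: the name features_per_node is hypothetical, and rounding each count up to a whole number is an assumption that the text above does not confirm.

```python
import math


def features_per_node(strategy, num_features, num_trees, is_classification):
    """Hypothetical helper: resolve featureSubsetStrategy to a feature count."""
    if strategy == "auto":
        if num_trees == 1:
            strategy = "all"
        else:
            # Forests: "sqrt" for classification, "onethird" for regression.
            strategy = "sqrt" if is_classification else "onethird"
    if strategy == "all":
        return num_features
    if strategy == "sqrt":
        return int(math.ceil(math.sqrt(num_features)))   # rounding up is assumed
    if strategy == "log2":
        return max(1, int(math.ceil(math.log2(num_features))))
    if strategy == "onethird":
        return int(math.ceil(num_features / 3.0))
    raise ValueError("unsupported featureSubsetStrategy: %r" % strategy)


counts = {
    "classification forest": features_per_node("auto", 100, 10, True),
    "regression forest": features_per_node("auto", 100, 10, False),
    "single tree": features_per_node("auto", 100, 1, True),
}
```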

pyspark.mllib.util module

class pyspark.mllib.util.MLUtils[source]

Bases: object

Helper methods to load, save and pre-process data used in MLlib.

static loadLabeledPoints(sc, path, minPartitions=None)[source]

Load labeled points saved using RDD.saveAsTextFile.

Parameters:
  • sc – Spark context
  • path – file or directory path in any Hadoop-supported file system URI
  • minPartitions – min number of partitions
Returns:

labeled data stored as an RDD of LabeledPoint

>>> from tempfile import NamedTemporaryFile
>>> from pyspark.mllib.util import MLUtils
>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])),
...             LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.close()
>>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
>>> MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
[LabeledPoint(1.1, (3,[0,2],[-1.23,4.56e-07])), LabeledPoint(0.0, [1.01,2.02,3.03])]
static loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None, multiclass=None)[source]

Loads labeled data in the LIBSVM format into an RDD of LabeledPoint. The LIBSVM format is a text-based format used by LIBSVM and LIBLINEAR. Each line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 ...

where the indices are one-based and in ascending order. This method parses each line into a LabeledPoint, where the feature indices are converted to zero-based.

Parameters:
  • sc – Spark context
  • path – file or directory path in any Hadoop-supported file system URI
  • numFeatures – number of features, which will be determined from the input data if a nonpositive value is given. This is useful when the dataset is already split into multiple files and you want to load them separately, because some features may not be present in certain files, which leads to inconsistent feature dimensions.
  • minPartitions – min number of partitions
Returns:

labeled data stored as an RDD of LabeledPoint

>>> from tempfile import NamedTemporaryFile
>>> from pyspark.mllib.util import MLUtils
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\n-1\n-1 2:4.0 4:5.0 6:6.0")
>>> tempFile.flush()
>>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
>>> tempFile.close()
>>> type(examples[0]) == LabeledPoint
True
>>> print examples[0]
(1.0,(6,[0,2,4],[1.0,2.0,3.0]))
>>> type(examples[1]) == LabeledPoint
True
>>> print examples[1]
(-1.0,(6,[],[]))
>>> type(examples[2]) == LabeledPoint
True
>>> print examples[2]
(-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
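The line format and the one-based to zero-based index conversion can be illustrated without Spark. The parser below is a minimal sketch with a hypothetical name, not the code MLlib uses; it handles the same three lines as the doctest above and infers the feature count from the largest index seen.

```python
def parse_libsvm_line(line):
    """Hypothetical helper: split one LIBSVM line into (label, indices, values).

    Indices in the file are one-based; the returned indices are zero-based.
    """
    tokens = line.split()
    label = float(tokens[0])
    indices, values = [], []
    for item in tokens[1:]:
        index, value = item.split(":")
        indices.append(int(index) - 1)
        values.append(float(value))
    return label, indices, values


text = "+1 1:1.0 3:2.0 5:3.0\n-1\n-1 2:4.0 4:5.0 6:6.0"
parsed = [parse_libsvm_line(line) for line in text.splitlines()]
# With numFeatures unspecified, the vector size is one more than the largest index.
num_features = max(i for _, idx, _ in parsed for i in idx) + 1
```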
static saveAsLibSVMFile(data, dir)[source]

Save labeled data in LIBSVM format.

Parameters:
  • data – an RDD of LabeledPoint to be saved
  • dir – directory to save the data
>>> from tempfile import NamedTemporaryFile
>>> from fileinput import input
>>> from glob import glob
>>> from pyspark.mllib.util import MLUtils
>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])),
...             LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.close()
>>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
>>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0.0 1:1.01 2:2.02 3:3.03\n1.1 1:1.23 3:4.56\n'
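The output lines above follow the same "label index:value" layout with one-based indices. The following sketch formats the two example points in pure Python; to_libsvm_line is a hypothetical helper that takes zero-based (index, value) pairs, and it is not the routine MLlib uses.

```python
def to_libsvm_line(label, entries):
    """Hypothetical helper: format a label and zero-based (index, value)
    pairs as one LIBSVM line with one-based, ascending indices.
    """
    items = ["%d:%s" % (i + 1, v) for i, v in sorted(entries)]
    return " ".join([str(label)] + items)


lines = sorted([
    to_libsvm_line(1.1, [(0, 1.23), (2, 4.56)]),               # sparse point
    to_libsvm_line(0.0, list(enumerate([1.01, 2.02, 3.03]))),  # dense point
])
```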