Package pyspark :: Package mllib :: Module stat
[frames] | no frames]

Source Code for Module pyspark.mllib.stat

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one or more 
  3  # contributor license agreements.  See the NOTICE file distributed with 
  4  # this work for additional information regarding copyright ownership. 
  5  # The ASF licenses this file to You under the Apache License, Version 2.0 
  6  # (the "License"); you may not use this file except in compliance with 
  7  # the License.  You may obtain a copy of the License at 
  8  # 
  9  #    http://www.apache.org/licenses/LICENSE-2.0 
 10  # 
 11  # Unless required by applicable law or agreed to in writing, software 
 12  # distributed under the License is distributed on an "AS IS" BASIS, 
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14  # See the License for the specific language governing permissions and 
 15  # limitations under the License. 
 16  # 
 18  """ 
 19  Python package for statistical functions in MLlib. 
 20  """ 
 22  from pyspark.mllib._common import \ 
 23      _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \ 
 24      _serialize_double, _serialize_double_vector, \ 
 25      _deserialize_double, _deserialize_double_matrix, _deserialize_double_vector 
class MultivariateStatisticalSummary(object):
    """
    Trait for multivariate statistical summary of a data matrix.

    Python-side handle around a Java summary object. Every statistic is
    fetched from the Java object at call time; vector-valued results are
    passed through C{_deserialize_double_vector} before being returned.
    """

    def __init__(self, sc, java_summary):
        """
        :param sc:  Spark context
        :param java_summary:  Handle to Java summary object
        """
        self._java_summary = java_summary
        self._sc = sc

    def __del__(self):
        # Detach the Java handle through the context's gateway once this
        # wrapper is collected.
        gateway = self._sc._gateway
        gateway.detach(self._java_summary)

    def mean(self):
        """Return the deserialized result of the Java summary's mean()."""
        serialized = self._java_summary.mean()
        return _deserialize_double_vector(serialized)

    def variance(self):
        """Return the deserialized result of the Java summary's variance()."""
        serialized = self._java_summary.variance()
        return _deserialize_double_vector(serialized)

    def count(self):
        """Return the Java summary's count() value as-is (no deserialization)."""
        summary = self._java_summary
        return summary.count()

    def numNonzeros(self):
        """Return the deserialized result of the Java summary's numNonzeros()."""
        serialized = self._java_summary.numNonzeros()
        return _deserialize_double_vector(serialized)

    def max(self):
        """Return the deserialized result of the Java summary's max()."""
        serialized = self._java_summary.max()
        return _deserialize_double_vector(serialized)

    def min(self):
        """Return the deserialized result of the Java summary's min()."""
        serialized = self._java_summary.min()
        return _deserialize_double_vector(serialized)
class Statistics(object):
    """
    Entry points for the MLlib statistical functions exposed to Python.

    Both methods are static and delegate the computation to
    C{PythonMLLibAPI} on the JVM reached through the input RDD's context.
    """

    @staticmethod
    def colStats(X):
        """
        Computes column-wise summary statistics for the input RDD[Vector].

        :param X:  RDD of Vectors whose columns are summarized
        :return:   a L{MultivariateStatisticalSummary} wrapping the Java result

        >>> from linalg import Vectors
        >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
        ...                       Vectors.dense([4, 5, 0,  3]),
        ...                       Vectors.dense([6, 7, 0,  8])])
        >>> cStats = Statistics.colStats(rdd)
        >>> cStats.mean()
        array([ 4.,  4.,  0.,  3.])
        >>> cStats.variance()
        array([  4.,  13.,   0.,  25.])
        >>> cStats.count()
        3L
        >>> cStats.numNonzeros()
        array([ 3.,  2.,  0.,  3.])
        >>> cStats.max()
        array([ 6.,  7.,  0.,  8.])
        >>> cStats.min()
        array([ 2.,  0.,  0., -2.])
        """
        sc = X.ctx
        Xser = _get_unmangled_double_vector_rdd(X)
        cStats = sc._jvm.PythonMLLibAPI().colStats(Xser._jrdd)
        return MultivariateStatisticalSummary(sc, cStats)

    @staticmethod
    def corr(x, y=None, method=None):
        """
        Compute the correlation (matrix) for the input RDD(s) using the
        specified method.
        Methods currently supported: I{pearson (default), spearman}.

        If a single RDD of Vectors is passed in, a correlation matrix
        comparing the columns in the input RDD is returned. Use C{method=}
        to specify the method to be used for single RDD input.
        If two RDDs of floats are passed in, a single float is returned.

        :param x:       RDD of Vectors (single-RDD form) or RDD of floats
        :param y:       optional second RDD of floats; must not be a string
        :param method:  name of the correlation method, passed through to
                        the JVM unchanged (C{None} selects the default)
        :raise TypeError:  if C{y} is a string (a method name passed
                           positionally), or if C{y} is omitted and C{x}
                           cannot be converted as an RDD of Vectors

        >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
        >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
        >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
        >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
        True
        >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
        True
        >>> Statistics.corr(x, y, "spearman")
        0.5
        >>> from math import isnan
        >>> isnan(Statistics.corr(x, zeros))
        True
        >>> from linalg import Vectors
        >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
        ...                       Vectors.dense([6, 7, 0,  8]), Vectors.dense([9, 0, 0, 1])])
        >>> pearsonCorr = Statistics.corr(rdd)
        >>> print str(pearsonCorr).replace('nan', 'NaN')
        [[ 1.          0.05564149         NaN  0.40047142]
         [ 0.05564149  1.                 NaN  0.91359586]
         [        NaN         NaN  1.                 NaN]
         [ 0.40047142  0.91359586         NaN  1.        ]]
        >>> spearmanCorr = Statistics.corr(rdd, method="spearman")
        >>> print str(spearmanCorr).replace('nan', 'NaN')
        [[ 1.          0.10540926         NaN  0.4       ]
         [ 0.10540926  1.                 NaN  0.9486833 ]
         [        NaN         NaN  1.                 NaN]
         [ 0.4         0.9486833          NaN  1.        ]]
        >>> try:
        ...     Statistics.corr(rdd, "spearman")
        ...     print "Method name as second argument without 'method=' shouldn't be allowed."
        ... except TypeError:
        ...     pass
        """
        sc = x.ctx
        # The second positional slot is reserved for an RDD. A string there
        # means the caller passed the method name without 'method=', so it is
        # rejected explicitly instead of being treated as data. isinstance()
        # is used so that str subclasses are rejected as well.
        if isinstance(y, str):
            raise TypeError("Use 'method=' to specify method name.")
        # Only an omitted y selects the single-RDD (matrix) form; test for
        # None explicitly rather than relying on the truthiness of y.
        if y is None:
            try:
                Xser = _get_unmangled_double_vector_rdd(x)
            except TypeError:
                raise TypeError("corr called on a single RDD not consisted of Vectors.")
            resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
            return _deserialize_double_matrix(resultMat)
        # Two RDDs of floats: the JVM call's result is returned unchanged.
        xSer = _get_unmangled_rdd(x, _serialize_double)
        ySer = _get_unmangled_rdd(y, _serialize_double)
        return sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
def _test():
    """
    Run this module's doctests against a local SparkContext.

    A copy of the module globals, extended with an C{sc} entry holding a
    C{SparkContext('local[4]', 'PythonTest', batchSize=2)}, is handed to
    C{doctest.testmod}. The context is stopped even if the doctest run itself
    raises, and the process exits with status -1 when any doctest failed.
    """
    import doctest
    import sys
    from pyspark import SparkContext
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    try:
        (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Always release the context, including when testmod raises.
        globs['sc'].stop()
    if failure_count:
        # sys.exit does not depend on the 'exit' helper injected by the site module.
        sys.exit(-1)
if __name__ == "__main__":
    # Executed as a script: run the module's doctests.
    _test()