Package pyspark :: Package mllib :: Module util
[frames | no frames]

Source Code for Module pyspark.mllib.util

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one or more 
  3  # contributor license agreements.  See the NOTICE file distributed with 
  4  # this work for additional information regarding copyright ownership. 
  5  # The ASF licenses this file to You under the Apache License, Version 2.0 
  6  # (the "License"); you may not use this file except in compliance with 
  7  # the License.  You may obtain a copy of the License at 
  8  # 
  9  #    http://www.apache.org/licenses/LICENSE-2.0 
 10  # 
 11  # Unless required by applicable law or agreed to in writing, software 
 12  # distributed under the License is distributed on an "AS IS" BASIS, 
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14  # See the License for the specific language governing permissions and 
 15  # limitations under the License. 
 16  # 
 17   
 18  import numpy as np 
 19   
 20  from pyspark.mllib.linalg import Vectors, SparseVector 
 21  from pyspark.mllib.regression import LabeledPoint 
 22  from pyspark.mllib._common import _convert_vector 
class MLUtils(object):

    """
    Helper methods to load, save and pre-process data used in MLlib.
    """

    @staticmethod
    def _parse_libsvm_line(line, multiclass):
        """
        Parses a line in LIBSVM format into (label, indices, values).

        @param line: one text line of the form
                     "label index1:value1 index2:value2 ..."
        @param multiclass: if false, the label is binarized: values
                           greater than 0.5 become 1.0 and everything
                           else becomes 0.0
        @return: a tuple (label, indices, values) where label is a
                 float, indices is an int32 array of zero-based
                 feature indices and values is a float array
        """
        items = line.split(None)
        label = float(items[0])
        if not multiclass:
            label = 1.0 if label > 0.5 else 0.0
        entries = items[1:]
        indices = np.zeros(len(entries), dtype=np.int32)
        values = np.zeros(len(entries))
        for i, entry in enumerate(entries):
            index, value = entry.split(":")
            # LIBSVM indices are one-based; store them zero-based.
            indices[i] = int(index) - 1
            values[i] = float(value)
        return label, indices, values

    @staticmethod
    def _convert_labeled_point_to_libsvm(p):
        """
        Converts a LabeledPoint to a string in LIBSVM format.

        @param p: object with `label` and `features` attributes
        @return: "label index1:value1 ..." with one-based indices
        @raise TypeError: if the converted feature vector is neither
                          an ndarray nor a SparseVector
        """
        items = [str(p.label)]
        v = _convert_vector(p.features)
        if isinstance(v, np.ndarray):
            # Dense vector: every position is written out.
            for i, value in enumerate(v):
                items.append(str(i + 1) + ":" + str(value))
        elif isinstance(v, SparseVector):
            # Sparse vector: only the stored entries are written out.
            for index, value in zip(v.indices, v.values):
                items.append(str(index + 1) + ":" + str(value))
        else:
            # The original format string had no %s placeholder, so the
            # formatting itself failed instead of reporting the type.
            raise TypeError("_convert_labeled_point_to_libsvm needs either ndarray or SparseVector"
                            " but got %s" % type(v))
        return " ".join(items)

    @staticmethod
    def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None):
        """
        Loads labeled data in the LIBSVM format into an RDD of
        LabeledPoint. The LIBSVM format is a text-based format used by
        LIBSVM and LIBLINEAR. Each line represents a labeled sparse
        feature vector using the following format:

        label index1:value1 index2:value2 ...

        where the indices are one-based and in ascending order. This
        method parses each line into a LabeledPoint, where the feature
        indices are converted to zero-based.

        @param sc: Spark context
        @param path: file or directory path in any Hadoop-supported file
                     system URI
        @param multiclass: whether the input labels contain more than
                           two classes. If false, any label with value
                           greater than 0.5 will be mapped to 1.0, or
                           0.0 otherwise. So it works for both +1/-1 and
                           1/0 cases. If true, the double value parsed
                           directly from the label string will be used
                           as the label value.
        @param numFeatures: number of features, which will be determined
                            from the input data if a nonpositive value
                            is given. This is useful when the dataset is
                            already split into multiple files and you
                            want to load them separately, because some
                            features may not be present in certain
                            files, which leads to inconsistent feature
                            dimensions.
        @param minPartitions: min number of partitions
        @return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
        >>> tempFile.flush()
        >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
        >>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, True).collect()
        >>> tempFile.close()
        >>> examples[0].label
        1.0
        >>> examples[0].features.size
        6
        >>> print examples[0].features
        [0: 1.0, 2: 2.0, 4: 3.0]
        >>> examples[1].label
        0.0
        >>> examples[1].features.size
        6
        >>> print examples[1].features
        []
        >>> examples[2].label
        0.0
        >>> examples[2].features.size
        6
        >>> print examples[2].features
        [1: 4.0, 3: 5.0, 5: 6.0]
        >>> multiclass_examples[1].label
        -1.0
        """

        lines = sc.textFile(path, minPartitions)
        parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l, multiclass))
        if numFeatures <= 0:
            # The parsed RDD is traversed twice (once here, once below),
            # so keep it cached.
            parsed.cache()
            # Indices are ascending, so the last one is the largest in a
            # row. A row without features contributes -1 so that it adds
            # nothing to the dimension (a 0 here would count one feature
            # for data made only of label-only lines).
            numFeatures = parsed.map(lambda x: -1 if x[1].size == 0 else x[1][-1]).reduce(max) + 1
        return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2])))

    @staticmethod
    def saveAsLibSVMFile(data, dir):
        """
        Save labeled data in LIBSVM format.

        @param data: an RDD of LabeledPoint to be saved
        @param dir: directory to save the data

        >>> from tempfile import NamedTemporaryFile
        >>> from fileinput import input
        >>> from glob import glob
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0.0 1:1.01 2:2.02 3:3.03\\n1.1 1:1.23 3:4.56\\n'
        """
        # One LIBSVM-formatted text line per LabeledPoint.
        lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p))
        lines.saveAsTextFile(dir)
159
def _test():
    """
    Run this module's doctests against a local SparkContext.

    The context is made available to the doctests as `sc`. The process
    exits with status -1 if any doctest fails.
    """
    import doctest
    import sys
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    sc = SparkContext('local[2]', 'PythonTest', batchSize=2)
    globs['sc'] = sc
    try:
        failure_count = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)[0]
    finally:
        # Stop the context even if the doctest run itself raises.
        sc.stop()
    if failure_count:
        # sys.exit rather than the interactive helper exit() added by site.
        sys.exit(-1)
# Run the module's doctests when this file is executed as a script.
if __name__ == "__main__":
    _test()