Package pyspark :: Module serializers

Source Code for Module pyspark.serializers

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
PySpark supports custom serializers for transferring data; this can improve
performance.

By default, PySpark uses L{PickleSerializer} to serialize objects using Python's
C{cPickle} serializer, which can serialize nearly any Python object.
Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be
faster.

The serializer is chosen when creating L{SparkContext}:

>>> from pyspark.context import SparkContext
>>> from pyspark.serializers import MarshalSerializer
>>> sc = SparkContext('local', 'test', serializer=MarshalSerializer())
>>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.stop()

By default, PySpark serializes objects in batches; the batch size can be
controlled through SparkContext's C{batchSize} parameter
(the default size is 1024 objects):

>>> sc = SparkContext('local', 'test', batchSize=2)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)

Behind the scenes, this creates a JavaRDD with four partitions, each of
which contains two batches of two objects:

>>> rdd.glom().collect()
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> rdd._jrdd.count()
8L
>>> sc.stop()

A batch size of -1 means the batch size is unlimited, and a size of 1 disables
batching:

>>> sc = SparkContext('local', 'test', batchSize=1)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
>>> rdd.glom().collect()
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> rdd._jrdd.count()
16L
"""

import cPickle
from itertools import chain, izip, product
import marshal
import struct
from pyspark import cloudpickle


__all__ = ["PickleSerializer", "MarshalSerializer"]


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3


class Serializer(object):

    def dump_stream(self, iterator, stream):
        """
        Serialize an iterator of objects to the output stream.
        """
        raise NotImplementedError

    def load_stream(self, stream):
        """
        Return an iterator of deserialized objects from the input stream.
        """
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        return self.load_stream(stream)

    # Note: our notion of "equality" is that output generated by
    # equal serializers can be deserialized using the same serializer.

    # This default implementation handles the simple cases;
    # subclasses should override __eq__ as appropriate.

    def __eq__(self, other):
        return isinstance(other, self.__class__)

    def __ne__(self, other):
        return not self.__eq__(other)


class FramedSerializer(Serializer):
    """
    Serializer that writes objects as a stream of (length, data) pairs,
    where C{length} is a 32-bit integer and data is C{length} bytes.
    """

    def dump_stream(self, iterator, stream):
        for obj in iterator:
            self._write_with_length(obj, stream)

    def load_stream(self, stream):
        while True:
            try:
                yield self._read_with_length(stream)
            except EOFError:
                return

    def _write_with_length(self, obj, stream):
        serialized = self.dumps(obj)
        write_int(len(serialized), stream)
        stream.write(serialized)

    def _read_with_length(self, stream):
        length = read_int(stream)
        obj = stream.read(length)
        if obj == "":
            raise EOFError
        return self.loads(obj)

    def dumps(self, obj):
        """
        Serialize an object into a byte array.
        When batching is used, this will be called with an array of objects.
        """
        raise NotImplementedError

    def loads(self, obj):
        """
        Deserialize an object from a byte array.
        """
        raise NotImplementedError


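The (length, data) framing can be seen with an in-memory stream. The following is
an illustrative sketch, not part of the module: it uses C{io.BytesIO} as a stand-in
for the worker stream and assumes a Python 2 environment, matching the imports above.

>>> from io import BytesIO
>>> from pyspark.serializers import PickleSerializer, read_int
>>> buf = BytesIO()
>>> ser = PickleSerializer()                 # a concrete FramedSerializer
>>> ser.dump_stream(["a", "b"], buf)         # each object framed as (length, data)
>>> _ = buf.seek(0)
>>> length = read_int(buf)                   # 4-byte big-endian length prefix
>>> ser.loads(buf.read(length))              # cPickle payload of the first object
'a'
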
class BatchedSerializer(Serializer):
    """
    Serializes a stream of objects in batches by calling its wrapped
    Serializer with streams of objects.
    """

    UNLIMITED_BATCH_SIZE = -1

    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
        self.serializer = serializer
        self.batchSize = batchSize

    def _batched(self, iterator):
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
        else:
            items = []
            count = 0
            for item in iterator:
                items.append(item)
                count += 1
                if count == self.batchSize:
                    yield items
                    items = []
                    count = 0
            if items:
                yield items

    def dump_stream(self, iterator, stream):
        self.serializer.dump_stream(self._batched(iterator), stream)

    def load_stream(self, stream):
        return chain.from_iterable(self._load_stream_without_unbatching(stream))

    def _load_stream_without_unbatching(self, stream):
        return self.serializer.load_stream(stream)

    def __eq__(self, other):
        return isinstance(other, BatchedSerializer) and \
            other.serializer == self.serializer

    def __str__(self):
        return "BatchedSerializer<%s>" % str(self.serializer)


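As a rough sketch of the batching above (again using C{io.BytesIO} and not part of
the module), wrapping PickleSerializer in a BatchedSerializer writes whole lists to
the inner serializer, and load_stream flattens them back out:

>>> from io import BytesIO
>>> from pyspark.serializers import BatchedSerializer, PickleSerializer
>>> buf = BytesIO()
>>> batched = BatchedSerializer(PickleSerializer(), batchSize=2)
>>> batched.dump_stream(range(5), buf)
>>> _ = buf.seek(0)
>>> list(batched._load_stream_without_unbatching(buf))   # the raw batches
[[0, 1], [2, 3], [4]]
>>> _ = buf.seek(0)
>>> list(batched.load_stream(buf))                        # unbatched view
[0, 1, 2, 3, 4]
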
class CartesianDeserializer(FramedSerializer):
    """
    Deserializes the JavaRDD cartesian() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def load_stream(self, stream):
        key_stream = self.key_ser._load_stream_without_unbatching(stream)
        val_stream = self.val_ser._load_stream_without_unbatching(stream)
        key_is_batched = isinstance(self.key_ser, BatchedSerializer)
        val_is_batched = isinstance(self.val_ser, BatchedSerializer)
        for (keys, vals) in izip(key_stream, val_stream):
            keys = keys if key_is_batched else [keys]
            vals = vals if val_is_batched else [vals]
            for pair in product(keys, vals):
                yield pair

    def __eq__(self, other):
        return isinstance(other, CartesianDeserializer) and \
            self.key_ser == other.key_ser and self.val_ser == other.val_ser

    def __str__(self):
        return "CartesianDeserializer<%s, %s>" % \
            (str(self.key_ser), str(self.val_ser))


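Because load_stream above pulls one item from the key stream and then one from the
value stream over the same underlying stream, it consumes keys and values alternately.
This can be sketched with two plain (unbatched) serializers over one in-memory buffer;
an illustration only, not how the JVM side actually invokes it:

>>> from io import BytesIO
>>> from pyspark.serializers import CartesianDeserializer, PickleSerializer
>>> buf = BytesIO()
>>> ser = PickleSerializer()
>>> ser.dump_stream(["k1", "v1", "k2", "v2"], buf)   # key, value alternating
>>> _ = buf.seek(0)
>>> list(CartesianDeserializer(ser, ser).load_stream(buf))
[('k1', 'v1'), ('k2', 'v2')]
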
class NoOpSerializer(FramedSerializer):

    def loads(self, obj): return obj
    def dumps(self, obj): return obj


class PickleSerializer(FramedSerializer):
    """
    Serializes objects using Python's cPickle serializer:

        http://docs.python.org/2/library/pickle.html

    This serializer supports nearly any Python object, but may
    not be as fast as more specialized serializers.
    """

    def dumps(self, obj): return cPickle.dumps(obj, 2)
    loads = cPickle.loads


class CloudPickleSerializer(PickleSerializer):

    def dumps(self, obj): return cloudpickle.dumps(obj, 2)


class MarshalSerializer(FramedSerializer):
    """
    Serializes objects using Python's Marshal serializer:

        http://docs.python.org/2/library/marshal.html

    This serializer is faster than PickleSerializer but supports fewer datatypes.
    """

    dumps = marshal.dumps
    loads = marshal.loads


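The datatype limitation mentioned in the docstrings can be seen directly by calling
dumps/loads on the two serializers. A small hedged sketch, not part of the module,
assuming Python 2 (datetime is used here only as an example of a non-builtin type):

>>> import datetime
>>> from pyspark.serializers import MarshalSerializer, PickleSerializer
>>> data = {"name": "spark", "counts": [1, 2, 3]}
>>> MarshalSerializer().loads(MarshalSerializer().dumps(data)) == data
True
>>> PickleSerializer().loads(PickleSerializer().dumps(data)) == data
True
>>> d = datetime.date(2014, 1, 1)
>>> _ = PickleSerializer().dumps(d)            # pickle handles most objects
>>> MarshalSerializer().dumps(d)               # marshal only knows builtin types
Traceback (most recent call last):
    ...
ValueError: unmarshallable object
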
class UTF8Deserializer(Serializer):
    """
    Deserializes streams written by getBytes.
    """

    def loads(self, stream):
        length = read_int(stream)
        return stream.read(length).decode('utf8')

    def load_stream(self, stream):
        while True:
            try:
                yield self.loads(stream)
            except struct.error:
                return
            except EOFError:
                return


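Since UTF8Deserializer reads the same 4-byte length framing that write_with_length
(defined below) produces, a round trip can be sketched in memory. This is an
illustration under the same Python 2 / C{io.BytesIO} assumptions as above:

>>> from io import BytesIO
>>> from pyspark.serializers import UTF8Deserializer, write_with_length
>>> buf = BytesIO()
>>> for s in ["hello", "world"]:
...     write_with_length(s, buf)              # 4-byte length, then raw bytes
>>> _ = buf.seek(0)
>>> list(UTF8Deserializer().load_stream(buf))
[u'hello', u'world']
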
def read_long(stream):
    length = stream.read(8)
    if length == "":
        raise EOFError
    return struct.unpack("!q", length)[0]


def write_long(value, stream):
    stream.write(struct.pack("!q", value))


def pack_long(value):
    return struct.pack("!q", value)


def read_int(stream):
    length = stream.read(4)
    if length == "":
        raise EOFError
    return struct.unpack("!i", length)[0]


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def write_with_length(obj, stream):
    write_int(len(obj), stream)
    stream.write(obj)
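
These helpers define the wire framing used throughout the module: the "!i" and "!q"
struct formats are big-endian 32-bit and 64-bit integers. A small sketch of a round
trip through an in-memory stream, under the same Python 2 assumptions as the
examples above:

>>> from io import BytesIO
>>> from pyspark.serializers import write_int, read_int, write_long, read_long
>>> buf = BytesIO()
>>> write_int(7, buf)                  # packs to '\x00\x00\x00\x07'
>>> write_long(7, buf)                 # the same value as 8 bytes
>>> _ = buf.seek(0)
>>> read_int(buf), read_long(buf)
(7, 7)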