
Source Code for Module pyspark.serializers

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
PySpark supports custom serializers for transferring data; this can improve
performance.

By default, PySpark uses L{PickleSerializer} to serialize objects using Python's
C{cPickle} serializer, which can serialize nearly any Python object.
Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be
faster.

The serializer is chosen when creating L{SparkContext}:

>>> from pyspark.context import SparkContext
>>> from pyspark.serializers import MarshalSerializer
>>> sc = SparkContext('local', 'test', serializer=MarshalSerializer())
>>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.stop()

By default, PySpark serializes objects in batches; the batch size can be
controlled through SparkContext's C{batchSize} parameter
(the default size is 1024 objects):

>>> sc = SparkContext('local', 'test', batchSize=2)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)

Behind the scenes, this creates a JavaRDD with four partitions, each of
which contains two batches of two objects:

>>> rdd.glom().collect()
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> rdd._jrdd.count()
8L
>>> sc.stop()

A batch size of -1 uses an unlimited batch size, and a size of 1 disables
batching:

>>> sc = SparkContext('local', 'test', batchSize=1)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
>>> rdd.glom().collect()
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> rdd._jrdd.count()
16L
"""

import cPickle
from itertools import chain, izip, product
import marshal
import struct
import sys
from pyspark import cloudpickle


__all__ = ["PickleSerializer", "MarshalSerializer"]


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3


class Serializer(object):

    def dump_stream(self, iterator, stream):
        """
        Serialize an iterator of objects to the output stream.
        """
        raise NotImplementedError

    def load_stream(self, stream):
        """
        Return an iterator of deserialized objects from the input stream.
        """
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        return self.load_stream(stream)

    # Note: our notion of "equality" is that output generated by
    # equal serializers can be deserialized using the same serializer.

    # This default implementation handles the simple cases;
    # subclasses should override __eq__ as appropriate.

    def __eq__(self, other):
        return isinstance(other, self.__class__)

    def __ne__(self, other):
        return not self.__eq__(other)


class FramedSerializer(Serializer):
    """
    Serializer that writes objects as a stream of (length, data) pairs,
    where C{length} is a 32-bit integer and data is C{length} bytes.
    """

    def __init__(self):
        # On Python 2.6, we can't write bytearrays to streams, so we need to convert them
        # to strings first. Check if the version number is that old.
        self._only_write_strings = sys.version_info[0:2] <= (2, 6)

    def dump_stream(self, iterator, stream):
        for obj in iterator:
            self._write_with_length(obj, stream)

    def load_stream(self, stream):
        while True:
            try:
                yield self._read_with_length(stream)
            except EOFError:
                return

    def _write_with_length(self, obj, stream):
        serialized = self.dumps(obj)
        write_int(len(serialized), stream)
        if self._only_write_strings:
            stream.write(str(serialized))
        else:
            stream.write(serialized)

    def _read_with_length(self, stream):
        length = read_int(stream)
        obj = stream.read(length)
        if obj == "":
            raise EOFError
        return self.loads(obj)

    def dumps(self, obj):
        """
        Serialize an object into a byte array.
        When batching is used, this will be called with an array of objects.
        """
        raise NotImplementedError

    def loads(self, obj):
        """
        Deserialize an object from a byte array.
        """
        raise NotImplementedError


class BatchedSerializer(Serializer):
    """
    Serializes a stream of objects in batches by calling its wrapped
    Serializer with streams of objects.
    """

    UNLIMITED_BATCH_SIZE = -1

    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
        self.serializer = serializer
        self.batchSize = batchSize

    def _batched(self, iterator):
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
        else:
            items = []
            count = 0
            for item in iterator:
                items.append(item)
                count += 1
                if count == self.batchSize:
                    yield items
                    items = []
                    count = 0
            if items:
                yield items

    def dump_stream(self, iterator, stream):
        self.serializer.dump_stream(self._batched(iterator), stream)

    def load_stream(self, stream):
        return chain.from_iterable(self._load_stream_without_unbatching(stream))

    def _load_stream_without_unbatching(self, stream):
        return self.serializer.load_stream(stream)

    def __eq__(self, other):
        return isinstance(other, BatchedSerializer) and \
            other.serializer == self.serializer

    def __str__(self):
        return "BatchedSerializer<%s>" % str(self.serializer)


class CartesianDeserializer(FramedSerializer):
    """
    Deserializes the JavaRDD cartesian() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def prepare_keys_values(self, stream):
        key_stream = self.key_ser._load_stream_without_unbatching(stream)
        val_stream = self.val_ser._load_stream_without_unbatching(stream)
        key_is_batched = isinstance(self.key_ser, BatchedSerializer)
        val_is_batched = isinstance(self.val_ser, BatchedSerializer)
        for (keys, vals) in izip(key_stream, val_stream):
            keys = keys if key_is_batched else [keys]
            vals = vals if val_is_batched else [vals]
            yield (keys, vals)

    def load_stream(self, stream):
        for (keys, vals) in self.prepare_keys_values(stream):
            for pair in product(keys, vals):
                yield pair

    def __eq__(self, other):
        return isinstance(other, CartesianDeserializer) and \
            self.key_ser == other.key_ser and self.val_ser == other.val_ser

    def __str__(self):
        return "CartesianDeserializer<%s, %s>" % \
            (str(self.key_ser), str(self.val_ser))


class PairDeserializer(CartesianDeserializer):
    """
    Deserializes the JavaRDD zip() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def load_stream(self, stream):
        for (keys, vals) in self.prepare_keys_values(stream):
            for pair in izip(keys, vals):
                yield pair

    def __eq__(self, other):
        return isinstance(other, PairDeserializer) and \
            self.key_ser == other.key_ser and self.val_ser == other.val_ser

    def __str__(self):
        return "PairDeserializer<%s, %s>" % \
            (str(self.key_ser), str(self.val_ser))


class NoOpSerializer(FramedSerializer):

    def loads(self, obj): return obj
    def dumps(self, obj): return obj


class PickleSerializer(FramedSerializer):
    """
    Serializes objects using Python's cPickle serializer:

    http://docs.python.org/2/library/pickle.html

    This serializer supports nearly any Python object, but may
    not be as fast as more specialized serializers.
    """

    def dumps(self, obj): return cPickle.dumps(obj, 2)
    loads = cPickle.loads


class CloudPickleSerializer(PickleSerializer):

    def dumps(self, obj): return cloudpickle.dumps(obj, 2)


class MarshalSerializer(FramedSerializer):
    """
    Serializes objects using Python's Marshal serializer:

    http://docs.python.org/2/library/marshal.html

    This serializer is faster than PickleSerializer but supports fewer datatypes.
    """

    dumps = marshal.dumps
    loads = marshal.loads


class UTF8Deserializer(Serializer):
    """
    Deserializes streams written by getBytes.
    """

    def loads(self, stream):
        length = read_int(stream)
        return stream.read(length).decode('utf8')

    def load_stream(self, stream):
        while True:
            try:
                yield self.loads(stream)
            except struct.error:
                return
            except EOFError:
                return


def read_long(stream):
    length = stream.read(8)
    if length == "":
        raise EOFError
    return struct.unpack("!q", length)[0]


def write_long(value, stream):
    stream.write(struct.pack("!q", value))


def pack_long(value):
    return struct.pack("!q", value)


def read_int(stream):
    length = stream.read(4)
    if length == "":
        raise EOFError
    return struct.unpack("!i", length)[0]


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def write_with_length(obj, stream):
    write_int(len(obj), stream)
    stream.write(obj)
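
The classes above compose: BatchedSerializer groups objects into lists, and a FramedSerializer subclass (such as PickleSerializer) writes each list as a single frame, a 4-byte big-endian length produced by write_int followed by the serialized bytes. The round-trip sketch below is not part of the module; it only assumes pyspark is importable under the Python 2 interpreter this source targets (note the cPickle and izip imports above):

>>> import cPickle
>>> from io import BytesIO
>>> from pyspark.serializers import (PickleSerializer, BatchedSerializer,
...                                  UTF8Deserializer, write_with_length, read_int)
>>> ser = BatchedSerializer(PickleSerializer(), 2)
>>> buf = BytesIO()
>>> ser.dump_stream(iter(range(5)), buf)   # written as batches [0, 1], [2, 3], [4]
>>> _ = buf.seek(0)
>>> list(ser.load_stream(buf))             # unbatched back into individual objects
[0, 1, 2, 3, 4]
>>> _ = buf.seek(0)
>>> read_int(buf) == len(cPickle.dumps([0, 1], 2))   # first frame length == pickled first batch
True

The same length-prefixed framing is what UTF8Deserializer reads, so bytes written with write_with_length can be decoded back as strings:

>>> buf = BytesIO()
>>> write_with_length("spark", buf)
>>> write_with_length("py", buf)
>>> _ = buf.seek(0)
>>> list(UTF8Deserializer().load_stream(buf))
[u'spark', u'py']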