Package pyspark :: Module broadcast
[frames | no frames]

Source Code for Module pyspark.broadcast

 1  # 
 2  # Licensed to the Apache Software Foundation (ASF) under one or more 
 3  # contributor license agreements.  See the NOTICE file distributed with 
 4  # this work for additional information regarding copyright ownership. 
 5  # The ASF licenses this file to You under the Apache License, Version 2.0 
 6  # (the "License"); you may not use this file except in compliance with 
 7  # the License.  You may obtain a copy of the License at 
 8  # 
 9  #    http://www.apache.org/licenses/LICENSE-2.0 
10  # 
11  # Unless required by applicable law or agreed to in writing, software 
12  # distributed under the License is distributed on an "AS IS" BASIS, 
13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14  # See the License for the specific language governing permissions and 
15  # limitations under the License. 
16  # 
17   
18  """ 
19  >>> from pyspark.context import SparkContext 
20  >>> sc = SparkContext('local', 'test') 
21  >>> b = sc.broadcast([1, 2, 3, 4, 5]) 
22  >>> b.value 
23  [1, 2, 3, 4, 5] 
24  >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect() 
25  [1, 2, 3, 4, 5, 1, 2, 3, 4, 5] 
26  >>> b.unpersist() 
27   
28  >>> large_broadcast = sc.broadcast(list(range(10000))) 
29  """ 
30  import os 
31   
32  from pyspark.serializers import CompressedSerializer, PickleSerializer 
33   
34  # Holds broadcasted data received from Java, keyed by its id. 
35  _broadcastRegistry = {} 
36   
37   
def _from_id(bid):
    """
    Return the entry stored under C{bid} in C{_broadcastRegistry}.

    Raises a plain C{Exception} when no entry with that id is present.
    """
    # The registry is imported through the package path at call time
    # instead of being read from this module's own globals.
    from pyspark.broadcast import _broadcastRegistry as registry
    if bid in registry:
        return registry[bid]
    raise Exception("Broadcast variable '%s' not loaded!" % bid)
43 44
class Broadcast(object):

    """
    A broadcast variable created with
    L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.
    Access its value through C{.value}.

    When the object is built with a C{path}, the value is not stored at
    construction time; it is read from that file the first time C{.value}
    is accessed and then cached on the instance.
    """

    def __init__(self, bid, value, java_broadcast=None,
                 pickle_registry=None, path=None):
        """
        Should not be called directly by users -- use
        L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}
        instead.

        @param bid: identifier of this broadcast variable.
        @param value: the broadcast value; only stored when C{path} is
            C{None}.
        @param java_broadcast: object whose C{unpersist(blocking)} method
            is invoked by L{unpersist}.
        @param pickle_registry: collection this object adds itself to
            (via C{.add}) whenever it is pickled.
        @param path: optional file from which the value is loaded lazily.
        """
        self.bid = bid
        if path is None:
            # With a backing file the attribute is deliberately left unset
            # so that the first access falls through to __getattr__.
            self.value = value
        self._jbroadcast = java_broadcast
        self._pickle_registry = pickle_registry
        self.path = path

    def unpersist(self, blocking=False):
        """
        Call C{unpersist(blocking)} on the underlying Java broadcast object
        and remove the backing file, if this broadcast has one.

        @param blocking: passed through unchanged to the Java object.
        """
        self._jbroadcast.unpersist(blocking)
        # A broadcast built without a path has no file to delete;
        # os.unlink(None) would raise TypeError.
        if self.path is not None:
            os.unlink(self.path)

    def __reduce__(self):
        """
        Pickle support: record this object in the pickle registry and
        serialize it as a call to C{_from_id(bid)} rather than by value.
        """
        self._pickle_registry.add(self)
        return (_from_id, (self.bid, ))

    def __getattr__(self, item):
        """
        Lazily load C{value} from C{path} on first access.

        Only invoked when normal attribute lookup fails. Any attribute
        other than C{value}, or C{value} without a backing file, raises
        C{AttributeError}.
        """
        if item == 'value' and self.path is not None:
            ser = CompressedSerializer(PickleSerializer())
            # The data is compressed pickle bytes, so open in binary mode.
            # The first item must be pulled while the file is still open,
            # and the context manager closes the handle afterwards.
            with open(self.path, 'rb') as stream:
                value = next(ser.load_stream(stream))
            # Cache on the instance so later lookups bypass __getattr__.
            self.value = value
            return value

        raise AttributeError(item)
if __name__ == "__main__":
    # Executed as a script: run the doctest examples found in this module.
    from doctest import testmod
    testmod()