Module accumulators

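Accumulators are shared variables with a commutative and associative "add"
operation: tasks running on the cluster can only add to them, while the driver
program can read and update their value. The examples below walk through creating
and using accumulators with a local SparkContext.
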
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> a = sc.accumulator(1)
>>> a.value
1
>>> a.value = 2
>>> a.value
2
>>> a += 5
>>> a.value
7
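
Floating-point and complex accumulators work the same way:
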
>>> sc.accumulator(1.0).value
1.0
>>> sc.accumulator(1j).value
1j
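
Tasks running on the cluster can add to an accumulator with the += operator; the
updates from each task are merged back into the driver's copy:
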
>>> rdd = sc.parallelize([1,2,3])
>>> def f(x):
...     global a
...     a += x
>>> rdd.foreach(f)
>>> a.value
13
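
The add() method can be used instead of the += operator:
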
>>> b = sc.accumulator(0)
>>> def g(x):
...     b.add(x)
>>> rdd.foreach(g)
>>> b.value
6
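
Accumulators of other value types require a custom AccumulatorParam, which defines
the zero value for the type and how to add two values in place:
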
>>> from pyspark.accumulators import AccumulatorParam
>>> class VectorAccumulatorParam(AccumulatorParam):
...     def zero(self, value):
...         return [0.0] * len(value)
...     def addInPlace(self, val1, val2):
...         for i in range(len(val1)):
...             val1[i] += val2[i]
...         return val1
>>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
>>> va.value
[1.0, 2.0, 3.0]
>>> def g(x):
...     global va
...     va += [x] * 3
>>> rdd.foreach(g)
>>> va.value
[7.0, 8.0, 9.0]
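
Worker tasks can only add to an accumulator; reading or assigning its value from a
task fails, as does creating an accumulator of an unsupported type without an
AccumulatorParam:
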
>>> rdd.map(lambda x: a.value).collect() # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
Py4JJavaError:...
>>> def h(x):
...     global a
...     a.value = 7
>>> rdd.foreach(h) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
Py4JJavaError:...
>>> sc.accumulator([1.0, 2.0, 3.0]) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
Exception:...
Classes

  Accumulator
      A shared variable that can be accumulated, i.e., has a commutative and
      associative "add" operation.

  AccumulatorParam
      Helper object that defines how to accumulate values of a given type.

  AddingAccumulatorParam
      An AccumulatorParam that uses the + operator to add values.
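
As a sketch of how AccumulatorParam can be subclassed for other value types (in
addition to the vector example above), the following accumulates the set of
elements seen across tasks. SetAccumulatorParam and the variable seen are
illustrative names, and a running SparkContext sc is assumed as in the examples
above:

>>> from pyspark.accumulators import AccumulatorParam
>>> class SetAccumulatorParam(AccumulatorParam):
...     def zero(self, value):
...         return set()
...     def addInPlace(self, val1, val2):
...         val1 |= val2
...         return val1
>>> seen = sc.accumulator(set(), SetAccumulatorParam())
>>> sc.parallelize([1, 2, 2, 3]).foreach(lambda x: seen.add({x}))
>>> sorted(seen.value)
[1, 2, 3]
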
Variables
  pickleSer = PickleSerializer()
  INT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0)
  FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
  COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
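
INT_ACCUMULATOR_PARAM, FLOAT_ACCUMULATOR_PARAM, and COMPLEX_ACCUMULATOR_PARAM are
the defaults that SparkContext.accumulator() uses for int, float, and complex
initial values. For other simple types that support +, an AddingAccumulatorParam
can be passed explicitly; a minimal sketch that collects values into a list (the
variable name la is illustrative, and the same SparkContext sc is assumed):

>>> from pyspark.accumulators import AddingAccumulatorParam
>>> la = sc.accumulator([], AddingAccumulatorParam([]))
>>> sc.parallelize([1, 2, 3]).foreach(lambda x: la.add([x]))
>>> sorted(la.value)
[1, 2, 3]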