Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #
# Holds accumulators registered on the current machine, keyed by ID. This is then used to send # the local accumulator updates back to the driver program at the end of a task.
from pyspark.accumulators import _accumulatorRegistry # If this certain accumulator was deserialized, don't overwrite it. if aid in _accumulatorRegistry: return _accumulatorRegistry[aid] else: accum = Accumulator(aid, zero_value, accum_param) accum._deserialized = True _accumulatorRegistry[aid] = accum return accum
""" A shared variable that can be accumulated, i.e., has a commutative and associative "add" operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=` operator, but only the driver program is allowed to access its value, using `value`. Updates from the workers get propagated automatically to the driver program.
While :class:`SparkContext` supports accumulators for primitive data types like :class:`int` and :class:`float`, users can also define accumulators for custom types by providing a custom :py:class:`AccumulatorParam` object. Refer to its doctest for an example.
Examples -------- >>> a = sc.accumulator(1) >>> a.value 1 >>> a.value = 2 >>> a.value 2 >>> a += 5 >>> a.value 7 >>> sc.accumulator(1.0).value 1.0 >>> sc.accumulator(1j).value 1j >>> rdd = sc.parallelize([1,2,3]) >>> def f(x): ... global a ... a += x >>> rdd.foreach(f) >>> a.value 13 >>> b = sc.accumulator(0) >>> def g(x): ... b.add(x) >>> rdd.foreach(g) >>> b.value 6
>>> rdd.map(lambda x: a.value).collect() # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... Py4JJavaError: ...
>>> def h(x): ... global a ... a.value = 7 >>> rdd.foreach(h) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... Py4JJavaError: ...
>>> sc.accumulator([1.0, 2.0, 3.0]) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... TypeError: ... """
"""Create a new Accumulator with a given initial value and AccumulatorParam object"""
"""Custom serialization; saves the zero value from our AccumulatorParam"""
def value(self): """Get the accumulator's value; only usable in driver program""" raise RuntimeError("Accumulator.value cannot be accessed inside tasks")
def value(self, value): """Sets the accumulator's value; only usable in driver program""" raise RuntimeError("Accumulator.value cannot be accessed inside tasks")
"""Adds a term to this accumulator's value"""
"""The += operator; adds a term to this accumulator's value"""
return str(self._value)
return "Accumulator<id=%i, value=%s>" % (self.aid, self._value)
""" Helper object that defines how to accumulate values of a given type.
Examples -------- >>> from pyspark.accumulators import AccumulatorParam >>> class VectorAccumulatorParam(AccumulatorParam): ... def zero(self, value): ... return [0.0] * len(value) ... def addInPlace(self, val1, val2): ... for i in range(len(val1)): ... val1[i] += val2[i] ... return val1 >>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam()) >>> va.value [1.0, 2.0, 3.0] >>> def g(x): ... global va ... va += [x] * 3 >>> rdd = sc.parallelize([1,2,3]) >>> rdd.foreach(g) >>> va.value [7.0, 8.0, 9.0] """
""" Provide a "zero value" for the type, compatible in dimensions with the provided `value` (e.g., a zero vector) """ raise NotImplementedError
""" Add two values of the accumulator's data type, returning a new value; for efficiency, can also update `value1` in place and return it. """ raise NotImplementedError
""" An AccumulatorParam that uses the + operator to add values. Designed for simple types such as integers, floats, and lists. Requires the zero value for the underlying type as a parameter. """
# Singleton accumulator params for some standard types
""" This handler will keep polling updates from the same socket until the server is shut down. """
# Poll every 1 second for new data -- don't block in case of shutdown.
# Write a byte in acknowledgement
# we've authenticated, we can break out of the first loop now else: raise ValueError( "The value of the provided token to the AccumulatorServer is not correct.")
# first we keep polling till we've received the authentication token # now we've authenticated, don't need to check for the token anymore
""" A simple TCP server that intercepts shutdown() in order to interrupt our continuous polling on the handler. """
"""Start a TCP server to receive accumulator updates in a daemon thread, and return it"""
# The small batch size here ensures that we see multiple batches, # even in these small test examples: globs=globs, optionflags=doctest.ELLIPSIS) sys.exit(-1) |