Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #
# Holds broadcasted data received from Java, keyed by its id.
from pyspark.broadcast import _broadcastRegistry if bid not in _broadcastRegistry: raise RuntimeError("Broadcast variable '%s' not loaded!" % bid) return _broadcastRegistry[bid]
""" A broadcast variable created with :meth:`SparkContext.broadcast`. Access its value through :attr:`value`.
Examples -------- >>> from pyspark.context import SparkContext >>> sc = SparkContext('local', 'test') >>> b = sc.broadcast([1, 2, 3, 4, 5]) >>> b.value [1, 2, 3, 4, 5] >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect() [1, 2, 3, 4, 5, 1, 2, 3, 4, 5] >>> b.unpersist()
>>> large_broadcast = sc.broadcast(range(10000)) """
sock_file=None): """ Should not be called directly by users -- use :meth:`SparkContext.broadcast` instead. """ # we're on the driver. We want the pickled data to end up in a file (maybe encrypted) # with encryption, we ask the jvm to do the encryption for us, we send it data # over a socket else: # no encryption, we can just write pickled data directly to the file from python else: # we're on an executor self._jbroadcast = None self._sc = None self._python_broadcast = None if sock_file is not None: # the jvm is doing decryption for us. Read the value # immediately from the sock_file self._value = self.load(sock_file) else: # the jvm just dumps the pickled data in path -- we'll unpickle lazily when # the value is requested assert(path is not None) self._path = path
except pickle.PickleError: raise except Exception as e: msg = "Could not serialize broadcast: %s: %s" \ % (e.__class__.__name__, str(e)) print_exec(sys.stderr) raise pickle.PicklingError(msg)
# "file" could also be a socket finally:
def value(self): """ Return the broadcasted value """ # we only need to decrypt it here when encryption is enabled and # if it's on the driver, since executor decryption is handled already else:
""" Delete cached copies of this broadcast on the executors. If the broadcast is used after this is called, it will need to be re-sent to each executor.
Parameters ---------- blocking : bool, optional Whether to block until unpersisting has completed """ raise RuntimeError("Broadcast can only be unpersisted in driver")
""" Destroy all data and metadata related to this broadcast variable. Use this with caution; once a broadcast variable has been destroyed, it cannot be used again.
.. versionchanged:: 3.0.0 Added optional argument `blocking` to specify whether to block until all blocks are deleted.
Parameters ---------- blocking : bool, optional Whether to block until destruction has completed """ raise RuntimeError("Broadcast can only be destroyed in driver")
raise RuntimeError("Broadcast can only be serialized in driver")
""" Thread-local registry for broadcast variables that have been pickled """
sys.exit(-1) |