#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" This class keeps track of different profilers on a per stage basis. Also this is used to create new profilers for the different stages. """
""" Create a new profiler using class `profiler_cls` """
""" Add a profiler for RDD `id` """ atexit.register(self.dump_profiles, self.profile_dump_path) else:
""" Dump the profile stats into directory `path` """
""" Print the profile stats to stdout """ # mark it as showed
""" PySpark supports custom profilers, this is to allow for different profilers to be used as well as outputting to different formats than what is provided in the BasicProfiler.
A custom profiler has to define or inherit the following methods: profile - will produce a system profile of some sort. stats - return the collected stats. dump - dumps the profiles to a path add - adds a profile to the existing accumulated profile
The profiler class is chosen when creating a SparkContext
Examples -------- >>> from pyspark import SparkConf, SparkContext >>> from pyspark import BasicProfiler >>> class MyCustomProfiler(BasicProfiler): ... def show(self, id): ... print("My custom profiles for RDD:%s" % id) ... >>> conf = SparkConf().set("spark.python.profile", "true") >>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler) >>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] >>> sc.parallelize(range(1000)).count() 1000 >>> sc.show_profiles() My custom profiles for RDD:1 My custom profiles for RDD:3 >>> sc.stop()
Notes ----- This API is a developer API. """
""" Do profiling on the function `func`""" raise NotImplementedError
""" Return the collected profiling stats (pstats.Stats)""" raise NotImplementedError
""" Print the profile stats to stdout, id is the RDD id """
""" Dump the profile into path, id is the RDD id """ os.makedirs(path)
"""PStatsParam is used to merge pstats.Stats"""
def zero(value):
def addInPlace(value1, value2):
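

# Illustrative example, not part of the upstream module: PStatsParam merges the
# per-partition profiles the same way pstats.Stats.add() folds one profiling run
# into another. A minimal, standard-library-only sketch of that merge:
def _merge_profiles_example():
    """Profile two calls separately, then fold them into a single Stats object."""

    def run_once():
        sum(i * i for i in range(10000))

    profiles = []
    for _ in range(2):
        pr = cProfile.Profile()
        pr.runcall(run_once)
        profiles.append(pstats.Stats(pr))

    # Same logic as PStatsParam.addInPlace: start from the first profile and
    # accumulate the rest with Stats.add()
    merged = profiles[0]
    for st in profiles[1:]:
        merged.add(st)
    return merged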
""" BasicProfiler is the default profiler, which is implemented based on cProfile and Accumulator """ # Creates a new accumulator for combining the profiles of different # partitions of a stage
""" Runs and profiles the method to_profile passed in. A profile object is returned. """ pr = cProfile.Profile() pr.runcall(func) st = pstats.Stats(pr) st.stream = None # make it picklable st.strip_dirs()
# Adds a new profile to the existing accumulated value self._accumulator.add(st)


if __name__ == "__main__":
    import doctest

    (failure_count, test_count) = doctest.testmod()
    if failure_count:
        sys.exit(-1)
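
# Usage sketch, kept as comments because it needs a running Spark installation.
# It assumes the standard "spark.python.profile" and "spark.python.profile.dump"
# configuration keys; with the dump directory set, the collector writes one
# profile file per RDD when the driver exits instead of printing to stdout.
#
#   from pyspark import SparkConf, SparkContext
#   conf = (SparkConf()
#           .set("spark.python.profile", "true")
#           .set("spark.python.profile.dump", "/tmp/pyspark_profiles"))
#   sc = SparkContext("local", "profiling-demo", conf=conf)
#   sc.parallelize(range(1000)).map(lambda x: x * x).count()
#   sc.stop()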