#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Worker that receives input from Piped RDD.
"""
# 'resource' is a Unix specific module.
has_resource_module = True
try:
    import resource
except ImportError:
    has_resource_module = False
from pyspark.serializers import write_long, read_int, SpecialLengths, UTF8Deserializer, \
    PickleSerializer, BatchedSerializer
# worker can be used, so do not add path multiple times
# overwrite system packages
"""chain two functions together """
else:
arrow_return_type = to_arrow_type(return_type)
def verify_result_type(result):
    if not hasattr(result, "__len__"):
        pd_type = "Pandas.DataFrame" if type(return_type) == StructType else "Pandas.Series"
        raise TypeError("Return type of the user-defined function should be "
                        "{}, but is {}".format(pd_type, type(result)))
    return result
def verify_result_length(result, length):
    if len(result) != length:
        raise RuntimeError("Result vector from pandas_udf was not the required length: "
                           "expected %d, got %d" % (length, len(result)))
    return result
return lambda *a: (verify_result_length(
    verify_result_type(f(*a)), len(a[0])), arrow_return_type)
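# Illustrative sketch (hypothetical user function, not part of this module): a scalar
# pandas UDF receives pandas Series arguments and must return a result of the same
# length, which is exactly what verify_result_type and verify_result_length above enforce.
def _example_scalar_pandas_udf(s1, s2):
    # Element-wise sum; the result has len(s1) rows, so the length check passes.
    return s1 + s2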
arrow_return_type = to_arrow_type(return_type)
def verify_result_type(result):
    if not hasattr(result, "__len__"):
        pd_type = "Pandas.DataFrame" if type(return_type) == StructType else "Pandas.Series"
        raise TypeError("Return type of the user-defined function should be "
                        "{}, but is {}".format(pd_type, type(result)))
    return result
return lambda *iterator: map(
    lambda res: (res, arrow_return_type),
    map(verify_result_type, f(*iterator)))
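# Illustrative sketch (hypothetical user function, not part of this module): an iterator
# UDF consumes an iterator of batches and yields one result per batch; the wrapper above
# only type-checks each yielded result and pairs it with the Arrow return type.
def _example_scalar_iter_pandas_udf(batches):
    for s in batches:
        # One output Series per input batch, same length as the batch.
        yield s * 2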
def wrapped(left_key_series, left_value_series, right_key_series, right_value_series):
    import pandas as pd

    left_df = pd.concat(left_value_series, axis=1)
    right_df = pd.concat(right_value_series, axis=1)

    if len(argspec.args) == 2:
        result = f(left_df, right_df)
    elif len(argspec.args) == 3:
        key_series = left_key_series if not left_df.empty else right_key_series
        key = tuple(s[0] for s in key_series)
        result = f(key, left_df, right_df)
    if not isinstance(result, pd.DataFrame):
        raise TypeError("Return type of the user-defined function should be "
                        "pandas.DataFrame, but is {}".format(type(result)))
    if not len(result.columns) == len(return_type):
        raise RuntimeError(
            "Number of columns of the returned pandas.DataFrame "
            "doesn't match specified schema. "
            "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
    return result
return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), to_arrow_type(return_type))]
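# Illustrative sketch (hypothetical user functions, not part of this module): a cogrouped
# map UDF takes either (left_df, right_df) or (key, left_df, right_df), matching the
# argspec dispatch in wrapped above, and must return a pandas.DataFrame whose column
# count matches the declared schema. The join column "id" is an assumption for the example.
def _example_cogrouped_map_udf(left, right):
    return left.merge(right, on="id")


def _example_cogrouped_map_udf_with_key(key, left, right):
    # key is a tuple of the grouping values for this cogroup.
    return left.merge(right, on="id")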
def wrapped(key_series, value_series):
    import pandas as pd

    if len(argspec.args) == 1:
        result = f(pd.concat(value_series, axis=1))
    elif len(argspec.args) == 2:
        key = tuple(s[0] for s in key_series)
        result = f(key, pd.concat(value_series, axis=1))

    if not isinstance(result, pd.DataFrame):
        raise TypeError("Return type of the user-defined function should be "
                        "pandas.DataFrame, but is {}".format(type(result)))
    if not len(result.columns) == len(return_type):
        raise RuntimeError(
            "Number of columns of the returned pandas.DataFrame "
            "doesn't match specified schema. "
            "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
    return result
return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
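# Illustrative sketch (hypothetical user functions, not part of this module): a grouped
# map UDF takes either (pdf) or (key, pdf), which is why the argspec length is inspected
# in wrapped above, and must return a pandas.DataFrame matching the declared schema.
# The column name "v" is an assumption for the example.
def _example_grouped_map_udf(pdf):
    # Subtract the group mean from column "v".
    return pdf.assign(v=pdf.v - pdf.v.mean())


def _example_grouped_map_udf_with_key(key, pdf):
    # key is a tuple of the grouping values, e.g. (1,) for a single grouping column.
    return pdf.assign(v=pdf.v - pdf.v.mean())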
arrow_return_type = to_arrow_type(return_type)
def wrapped(*series):
    import pandas as pd
    result = f(*series)
    return pd.Series([result])
return lambda *a: (wrapped(*a), arrow_return_type)
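# Illustrative sketch (hypothetical user function, not part of this module): a grouped
# aggregate UDF reduces one or more Series to a single scalar, which wrapped above then
# boxes into a one-element pd.Series for the Arrow writer.
def _example_grouped_agg_udf(v):
    return v.mean()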
window_bound_types_str = runner_conf.get('pandas_window_bound_types')
window_bound_type = [t.strip().lower() for t in window_bound_types_str.split(',')][udf_index]
if window_bound_type == 'bounded':
    return wrap_bounded_window_agg_pandas_udf(f, return_type)
elif window_bound_type == 'unbounded':
    return wrap_unbounded_window_agg_pandas_udf(f, return_type)
else:
    raise RuntimeError("Invalid window bound type: {} ".format(window_bound_type))
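# Illustrative example (hypothetical conf value, not a real plan): with
# runner_conf['pandas_window_bound_types'] == 'unbounded, bounded' and udf_index == 1,
# the second UDF is treated as a bounded-window aggregation and dispatched to
# wrap_bounded_window_agg_pandas_udf above.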
# This is similar to grouped_agg_pandas_udf; the only difference
# is that window_agg_pandas_udf needs to repeat the return value
# to match the window length, whereas grouped_agg_pandas_udf just
# returns the scalar value.
arrow_return_type = to_arrow_type(return_type)
def wrapped(*series):
    import pandas as pd
    result = f(*series)
    return pd.Series([result]).repeat(len(series[0]))
return lambda *a: (wrapped(*a), arrow_return_type)
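# Illustrative example (hypothetical values): for an unbounded window over a partition of
# 3 rows, f(*series) yields one scalar, say 2.0, and wrapped above returns
# pd.Series([2.0]).repeat(3), so every row in the window receives the same aggregate value.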
arrow_return_type = to_arrow_type(return_type)
def wrapped(begin_index, end_index, *series):
    import pandas as pd
    result = []

    # Index operations are faster on np.ndarray, so we turn the
    # index series into np arrays here for performance.
    begin_array = begin_index.values
    end_array = end_index.values

    for i in range(len(begin_array)):
        # Note: Creating a slice from a series for each window is
        #       actually pretty expensive. However, there is no easy
        #       way to reduce the cost here.
        # Note: s.iloc[i : j] is about 30% faster than s[i: j], with
        #       the caveat that the created slices share the same
        #       memory with s. Therefore, users are not allowed to
        #       change the values of the input series inside the window
        #       function. It is rare that a user needs to modify the
        #       input series in the window function, so this is a
        #       reasonable restriction.
        # Note: Calling reset_index on the slices would increase the cost
        #       of creating the slices by about 100%. Therefore, for performance
        #       reasons we don't do it here.
        series_slices = [s.iloc[begin_array[i]: end_array[i]] for s in series]
        result.append(f(*series_slices))
    return pd.Series(result)
return lambda *a: (wrapped(*a), arrow_return_type)
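# Illustrative example (hypothetical values): with begin_array == [0, 0, 1] and
# end_array == [2, 3, 3], row i is evaluated on s.iloc[begin_array[i]:end_array[i]] for
# every input series, i.e. windows of rows [0, 2), [0, 3) and [1, 3), and the per-window
# results are collected into a single pd.Series of length 3.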
else:
func = chained_func
else:
    # make sure StopIterations raised in the user code are not ignored
    # when they are processed in a for loop; raise them as RuntimeErrors instead
    func = fail_on_stopiteration(chained_func)
# the last returnType will be the return type of the UDF
if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
    return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
    return arg_offsets, wrap_pandas_iter_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
    return arg_offsets, wrap_pandas_iter_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
    return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec)
elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
    argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
    return arg_offsets, wrap_cogrouped_map_pandas_udf(func, return_type, argspec)
elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
    return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF:
    return arg_offsets, wrap_window_agg_pandas_udf(func, return_type, runner_conf, udf_index)
else:
    raise ValueError("Unknown eval type: {}".format(eval_type))
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF):
# Load conf used for pandas_udf evaluation
num_conf = read_int(infile)
for i in range(num_conf):
    k = utf8_deserializer.loads(infile)
    v = utf8_deserializer.loads(infile)
    runner_conf[k] = v
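# Illustrative example (hypothetical values): after the loop above, runner_conf might hold
# {"spark.sql.session.timeZone": "UTC",
#  "spark.sql.execution.pandas.convertToArrowArraySafely": "true"},
# which the code below turns into timezone == "UTC" and safecheck == True.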
# NOTE: if timezone is set here, that implies respectSessionTimeZone is True
timezone = runner_conf.get("spark.sql.session.timeZone", None)
safecheck = runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely",
                            "false").lower() == 'true'
# Used by SQL_GROUPED_MAP_PANDAS_UDF and SQL_SCALAR_PANDAS_UDF when returning StructType
assign_cols_by_name = runner_conf.get(
    "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true")\
    .lower() == "true"
if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
    ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name)
else:
    # Scalar Pandas UDF handles struct type arguments as pandas DataFrames instead of
    # pandas Series. See SPARK-27240.
    df_for_struct = (eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
                     eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF or
                     eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF)
    ser = ArrowStreamPandasUDFSerializer(timezone, safecheck, assign_cols_by_name,
                                         df_for_struct)
else:
if is_scalar_iter:
    assert num_udfs == 1, "One SCALAR_ITER UDF expected here."
if is_map_iter:
    assert num_udfs == 1, "One MAP_ITER UDF expected here."
arg_offsets, udf = read_single_udf(
    pickleSer, infile, eval_type, runner_conf, udf_index=0)
def func(_, iterator):
    num_input_rows = 0

    def map_batch(batch):
        nonlocal num_input_rows

        udf_args = [batch[offset] for offset in arg_offsets]
        num_input_rows += len(udf_args[0])
        if len(udf_args) == 1:
            return udf_args[0]
        else:
            return tuple(udf_args)

    iterator = map(map_batch, iterator)
    result_iter = udf(iterator)

    num_output_rows = 0
    for result_batch, result_type in result_iter:
        num_output_rows += len(result_batch)
        # This assert is for Scalar Iterator UDF to fail fast.
        # The length of the entire input can only be explicitly known
        # by consuming the input iterator on the user side. Therefore,
        # it's very unlikely the output length is higher than the
        # input length.
        assert is_map_iter or num_output_rows <= num_input_rows, \
            "Pandas SCALAR_ITER UDF outputted more rows than input rows."
        yield (result_batch, result_type)

    if is_scalar_iter:
        try:
            next(iterator)
        except StopIteration:
            pass
        else:
            raise RuntimeError("pandas iterator UDF should exhaust the input "
                               "iterator.")

        if num_output_rows != num_input_rows:
            raise RuntimeError(
                "The length of output in Scalar iterator pandas UDF should be "
                "the same with the input's; however, the length of output was %d and the "
                "length of input was %d." % (num_output_rows, num_input_rows))
# profiling is not supported for UDF
return func, None, ser, ser
""" Helper function to extract the key and value indexes from arg_offsets for the grouped and cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
Parameters ---------- grouped_arg_offsets: list List containing the key and value indexes of columns of the DataFrames to be passed to the udf. It consists of n repeating groups where n is the number of DataFrames. Each group has the following format: group[0]: length of group group[1]: length of key indexes group[2.. group[1] +2]: key attributes group[group[1] +3 group[0]]: value attributes """ parsed = [] idx = 0 while idx < len(grouped_arg_offsets): offsets_len = grouped_arg_offsets[idx] idx += 1 offsets = grouped_arg_offsets[idx: idx + offsets_len] split_index = offsets[0] + 1 offset_keys = offsets[1: split_index] offset_values = offsets[split_index:] parsed.append([offset_keys, offset_values]) idx += offsets_len return parsed
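# Illustrative example (hypothetical offsets, not taken from a real plan):
# extract_key_value_indexes([4, 1, 0, 1, 2]) parses one group of length 4 with a single
# key index and returns [[[0], [1, 2]]], i.e. column 0 carries the grouping key and
# columns 1 and 2 carry the values.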
# We assume there is only one UDF here because grouped map doesn't
# support combining multiple UDFs.
assert num_udfs == 1
# See FlatMapGroupsInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
parsed_offsets = extract_key_value_indexes(arg_offsets)
# Create a function like this:
#   mapper a: f([a[0]], [a[0], a[1]])
def mapper(a):
    keys = [a[o] for o in parsed_offsets[0][0]]
    vals = [a[o] for o in parsed_offsets[0][1]]
    return f(keys, vals)

# We assume there is only one UDF here because cogrouped map doesn't
# support combining multiple UDFs.
assert num_udfs == 1
arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
parsed_offsets = extract_key_value_indexes(arg_offsets)
def mapper(a):
    df1_keys = [a[0][o] for o in parsed_offsets[0][0]]
    df1_vals = [a[0][o] for o in parsed_offsets[0][1]]
    df2_keys = [a[1][o] for o in parsed_offsets[1][0]]
    df2_vals = [a[1][o] for o in parsed_offsets[1][1]]
    return f(df1_keys, df1_vals, df2_keys, df2_vals)
else:
# In the special case of a single UDF this will return a single result rather
# than a tuple of results; this is the format that the JVM side expects.
else:
# profiling is not supported for UDF
sys.exit(-1)
"driver %s, PySpark cannot run with different minor versions. " + "Please check environment variables PYSPARK_PYTHON and " + "PYSPARK_DRIVER_PYTHON are correctly set.") % ("%d.%d" % sys.version_info[:2], version))
# read inputs only for a barrier task
# set up memory limits
# convert to bytes
except (resource.error, OSError, ValueError) as e:
    # not all systems support resource limits, so warn instead of failing
    lineno = getframeinfo(
        currentframe()).lineno + 1 if currentframe() is not None else 0
    print(warnings.formatwarning(
        "Failed to set memory limit: {0}".format(e),
        ResourceWarning,
        __file__,
        lineno
    ), file=sys.stderr)
# initialize global state
# Set the task context instance here, so we can get it by TaskContext.get for
# both TaskContext and BarrierTaskContext
else:
    # read inputs for TaskContext info
# fetch name of workdir
# fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
# fetch names and values of broadcast variables
# read the decrypted data from a server in the jvm
Broadcast(sock_file=broadcast_sock_file)
else:
else:
else:
finally:
else:
# Reset task context to None. This is a guard to avoid residual context when the
# worker is reused.
e.__cause__ = None
exc_info = "".join(traceback.format_exception(type(e), e, tb))
except IOError:
    # the JVM closed the socket
    pass
except BaseException:
    # Write the error to stderr if it happened while serializing
    print("PySpark worker failed with exception:", file=sys.stderr)
    print(traceback.format_exc(), file=sys.stderr)
finally:
# Mark the beginning of the accumulators section of the output
# check end of stream
else:
    # write a different value to tell the JVM to not reuse this worker
if __name__ == '__main__':
    # Read information about how to connect back to the JVM from the environment.
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
    (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
    # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
    write_int(os.getpid(), sock_file)
    sock_file.flush()
    main(sock_file, sock_file)