Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #
# SystemExit's code can be integer or string, but os._exit only accepts integers if isinstance(exit_code, numbers.Integral): return exit_code else: return 1
""" Called by a worker process after the fork(). """ signal.signal(SIGHUP, SIG_DFL) signal.signal(SIGCHLD, SIG_DFL) signal.signal(SIGTERM, SIG_DFL) # restore the handler for SIGINT, # it's useful for debugging (show the stacktrace before exit) signal.signal(SIGINT, signal.default_int_handler)
# Read the socket using fdopen instead of socket.makefile() because the latter # seems to be very slow; note that we need to dup() the file descriptor because # otherwise writes also cause a seek that makes us miss data on the read side. buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536)) infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size) outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)
if not authenticated: client_secret = UTF8Deserializer().loads(infile) if os.environ["PYTHON_WORKER_FACTORY_SECRET"] == client_secret: write_with_length("ok".encode("utf-8"), outfile) outfile.flush() else: write_with_length("err".encode("utf-8"), outfile) outfile.flush() sock.close() return 1
exit_code = 0 try: worker_main(infile, outfile) except SystemExit as exc: exit_code = compute_real_exit_code(exc.code) finally: try: outfile.flush() except Exception: pass return exit_code
# Create a new process group to corral our children
# Create a listening socket on the AF_INET loopback interface
# re-open stdin/stdout in 'wb' mode
# Send SIGHUP to notify workers of shutdown
# Initialization complete if ex[0] == EINTR: continue else: raise
# Spark told us to exit by closing stdin except OSError: pass # process already died
except OSError as e: if e.errno == EINTR: continue raise
# Launch a worker process except OSError as e: if e.errno in (EAGAIN, EINTR): time.sleep(1) pid = os.fork() # error here will shutdown daemon else: outfile = sock.makefile(mode='wb') write_int(e.errno, outfile) # Signal that the fork failed outfile.flush() outfile.close() sock.close() continue
# in child process listen_sock.close()
# It should close the standard input in the child process so that # Python native function executions stay intact. # # Note that if we just close the standard input (file descriptor 0), # the lowest file descriptor (file descriptor 0) will be allocated, # later when other file descriptors should happen to open. # # Therefore, here we redirects it to '/dev/null' by duplicating # another file descriptor for '/dev/null' to the standard input (0). # See SPARK-26175. devnull = open(os.devnull, 'r') os.dup2(devnull.fileno(), 0) devnull.close()
try: # Acknowledge that the fork was successful outfile = sock.makefile(mode="wb") write_int(os.getpid(), outfile) outfile.flush() outfile.close() authenticated = False while True: code = worker(sock, authenticated) if code == 0: authenticated = True if not reuse or code: # wait for closing try: while sock.recv(1024): pass except Exception: pass break gc.collect() except: traceback.print_exc() os._exit(1) else: os._exit(0) else:
finally:
|