Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

# 

# Licensed to the Apache Software Foundation (ASF) under one or more 

# contributor license agreements. See the NOTICE file distributed with 

# this work for additional information regarding copyright ownership. 

# The ASF licenses this file to You under the Apache License, Version 2.0 

# (the "License"); you may not use this file except in compliance with 

# the License. You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

 

import numbers 

import os 

import signal 

import select 

import socket 

import sys 

import traceback 

import time 

import gc 

from errno import EINTR, EAGAIN 

from socket import AF_INET, SOCK_STREAM, SOMAXCONN 

from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT 

 

from pyspark.worker import main as worker_main 

from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer 

 

 

def compute_real_exit_code(exit_code):
    """Normalize a SystemExit code into something os._exit accepts.

    ``SystemExit.code`` may be an integer, a string, or None; ``os._exit``
    takes integers only, so any non-integral value collapses to 1.
    """
    return exit_code if isinstance(exit_code, numbers.Integral) else 1

 

 

def worker(sock, authenticated):
    """
    Called by a worker process after the fork().

    Resets inherited signal dispositions, wraps the socket in buffered
    file objects, optionally performs the shared-secret handshake, then
    hands control to ``worker_main``.  Returns the worker's exit code.
    """
    # Undo the daemon's signal setup; the child should behave like a
    # normal process.  SIGINT gets the default Python handler so a
    # stacktrace is printed when debugging.
    for sig in (SIGHUP, SIGCHLD, SIGTERM):
        signal.signal(sig, SIG_DFL)
    signal.signal(SIGINT, signal.default_int_handler)

    # Read the socket using fdopen instead of socket.makefile() because the
    # latter seems to be very slow; note that we need to dup() the file
    # descriptor because otherwise writes also cause a seek that makes us
    # miss data on the read side.
    bufsize = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
    infile = os.fdopen(os.dup(sock.fileno()), "rb", bufsize)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", bufsize)

    if not authenticated:
        # Shared-secret handshake: the client must echo the factory secret.
        client_secret = UTF8Deserializer().loads(infile)
        if os.environ["PYTHON_WORKER_FACTORY_SECRET"] != client_secret:
            write_with_length("err".encode("utf-8"), outfile)
            outfile.flush()
            sock.close()
            return 1
        write_with_length("ok".encode("utf-8"), outfile)
        outfile.flush()

    exit_code = 0
    try:
        worker_main(infile, outfile)
    except SystemExit as exc:
        exit_code = compute_real_exit_code(exc.code)
    finally:
        # Best-effort flush; the peer may already have gone away.
        try:
            outfile.flush()
        except Exception:
            pass
    return exit_code

 

 

def manager():
    """Run the daemon loop: fork a worker per incoming connection.

    Publishes a loopback listening port on stdout, then multiplexes
    between (a) stdin, on which the JVM writes pids of workers to kill
    (and whose EOF means "shut down"), and (b) the listening socket,
    each accepted connection of which triggers a fork() that runs
    ``worker()`` in the child.  Never returns normally; exits via
    ``shutdown``.
    """
    # Create a new process group to corral our children
    os.setpgid(0, 0)

    # Create a listening socket on the AF_INET loopback interface
    listen_sock = socket.socket(AF_INET, SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(max(1024, SOMAXCONN))
    listen_host, listen_port = listen_sock.getsockname()

    # Re-open stdin/stdout in binary mode with tiny buffers; the JVM
    # reads our port from stdout and writes worker pids to our stdin.
    stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4)
    stdout_bin = os.fdopen(sys.stdout.fileno(), 'wb', 4)
    write_int(listen_port, stdout_bin)
    stdout_bin.flush()

    def shutdown(code):
        # Restore the default SIGTERM handler so we are not re-entered,
        # then notify the whole process group before exiting.
        signal.signal(SIGTERM, SIG_DFL)
        # Send SIGHUP to notify workers of shutdown
        os.kill(0, SIGHUP)
        sys.exit(code)

    def handle_sigterm(*args):
        shutdown(1)

    signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
    signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
    signal.signal(SIGCHLD, SIG_IGN)

    reuse = os.environ.get("SPARK_REUSE_WORKER")

    # Initialization complete
    try:
        while True:
            try:
                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
            except OSError as ex:
                # BUGFIX: in Python 3 ``select.error`` is ``OSError`` and
                # is not subscriptable, so the old ``ex[0] == EINTR`` check
                # raised TypeError on an interrupted select; use ``.errno``.
                if ex.errno == EINTR:
                    continue
                else:
                    raise

            if 0 in ready_fds:
                try:
                    worker_pid = read_int(stdin_bin)
                except EOFError:
                    # Spark told us to exit by closing stdin
                    shutdown(0)
                try:
                    os.kill(worker_pid, signal.SIGKILL)
                except OSError:
                    pass  # process already died

            if listen_sock in ready_fds:
                try:
                    sock, _ = listen_sock.accept()
                except OSError as e:
                    if e.errno == EINTR:
                        continue
                    raise

                # Launch a worker process
                try:
                    pid = os.fork()
                except OSError as e:
                    if e.errno in (EAGAIN, EINTR):
                        # Transient failure: back off and retry once.
                        time.sleep(1)
                        pid = os.fork()  # error here will shutdown daemon
                    else:
                        outfile = sock.makefile(mode='wb')
                        write_int(e.errno, outfile)  # Signal that the fork failed
                        outfile.flush()
                        outfile.close()
                        sock.close()
                        continue

                if pid == 0:
                    # in child process
                    listen_sock.close()

                    # It should close the standard input in the child process so that
                    # Python native function executions stay intact.
                    #
                    # Note that if we just close the standard input (file descriptor 0),
                    # the lowest file descriptor (file descriptor 0) will be allocated,
                    # later when other file descriptors should happen to open.
                    #
                    # Therefore, here we redirects it to '/dev/null' by duplicating
                    # another file descriptor for '/dev/null' to the standard input (0).
                    # See SPARK-26175.
                    devnull = open(os.devnull, 'r')
                    os.dup2(devnull.fileno(), 0)
                    devnull.close()

                    try:
                        # Acknowledge that the fork was successful
                        outfile = sock.makefile(mode="wb")
                        write_int(os.getpid(), outfile)
                        outfile.flush()
                        outfile.close()
                        authenticated = False
                        while True:
                            code = worker(sock, authenticated)
                            if code == 0:
                                authenticated = True
                            if not reuse or code:
                                # wait for closing
                                try:
                                    while sock.recv(1024):
                                        pass
                                except Exception:
                                    pass
                                break
                            gc.collect()
                    except:
                        traceback.print_exc()
                        os._exit(1)
                    else:
                        os._exit(0)
                else:
                    # in parent: the child owns the connection now
                    sock.close()

    finally:
        shutdown(1)

 

 

# Entry point: the JVM launches this module as a script to start the daemon.
if __name__ == '__main__':
    manager()