1 import os
2 import signal
3 import socket
4 import sys
5 import traceback
6 import multiprocessing
7 from ctypes import c_bool
8 from errno import EINTR, ECHILD
9 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
10 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
11 from pyspark.worker import main as worker_main
12 from pyspark.serializers import write_int
13
# Number of pool workers to launch: one per CPU as reported by
# multiprocessing, or 4 when the platform cannot report a CPU count.
try:
    POOLSIZE = multiprocessing.cpu_count()
except NotImplementedError:
    POOLSIZE = 4
18
# Shutdown flag, created at import time (before any fork) as a
# multiprocessing.Value. It starts False and is set to True when shutdown
# is requested.
exit_flag = multiprocessing.Value(c_bool, False)


def should_exit():
    """Return True once shutdown has been requested.

    Reads the current value of the module-level ``exit_flag``. The loops in
    this module poll this function to decide when to stop.
    """
    return exit_flag.value
25
26
28
def compute_real_exit_code(exit_code):
    """Convert a ``SystemExit.code`` value into an integer exit status.

    ``SystemExit.code`` may be ``None``, an integer, or any other object
    (typically a message string), whereas ``os._exit`` needs an integer.

    :param exit_code: the ``code`` attribute of a ``SystemExit`` (or any
        value standing in for one).
    :return: ``0`` when *exit_code* is ``None`` (a bare ``sys.exit()`` means
        success), *exit_code* itself when it is an integer, and ``1`` for
        anything else.
    """
    import numbers

    if exit_code is None:
        # sys.exit() / sys.exit(None) denotes a successful exit.
        return 0
    if isinstance(exit_code, numbers.Integral):
        return exit_code
    # Non-integer codes (e.g. an error message) denote failure.
    return 1
34
35
37
def worker(listen_sock):
    """Serve clients arriving on *listen_sock* until shutdown is requested.

    Every accepted connection is handed to a freshly forked child. The child
    runs ``worker_main`` over the connection and then terminates through
    ``os._exit``; the parent closes its copy of the connection and goes back
    to accepting. The function returns once ``should_exit()`` is true.

    :param listen_sock: listening socket to accept connections from.
    :raises EnvironmentError: if ``accept()`` or ``waitpid()`` fails with
        anything other than the errno values handled below.
    """
    # Point file descriptor 1 at stderr, and rebind sys.stdout too because
    # the Python-level object is distinct from the descriptor.
    os.dup2(2, 1)
    sys.stdout = sys.stderr

    def handle_sighup(*args):
        # SIGHUP is only expected after the exit flag has been set. Its
        # arrival interrupts a blocking accept() so the loop below can
        # re-check should_exit().
        assert should_exit()
    signal.signal(SIGHUP, handle_sighup)

    def handle_sigchld(*args):
        # Collect every child that has already exited, without blocking.
        while True:
            try:
                pid, status = os.waitpid(0, os.WNOHANG)
            except EnvironmentError as err:
                if err.errno == EINTR:
                    continue  # interrupted by a signal: start over
                if err.errno == ECHILD:
                    return    # no children left to wait for
                raise
            if (pid, status) == (0, 0):
                return        # children remain, but none has exited yet
    signal.signal(SIGCHLD, handle_sigchld)

    while not should_exit():
        # Block until a client connects or shutdown is requested. accept()
        # interrupted by a signal (EINTR) just re-checks the exit flag.
        sock = None
        while not should_exit() and sock is None:
            try:
                sock, _addr = listen_sock.accept()
            except EnvironmentError as err:
                if err.errno != EINTR:
                    raise

        if sock is None:
            continue

        if os.fork() != 0:
            # Parent: the child holds its own copy of the connection.
            sock.close()
            continue

        # Child: restore default signal handling and drop the listening
        # socket, since this process serves exactly one client.
        signal.signal(SIGHUP, SIG_DFL)
        signal.signal(SIGCHLD, SIG_DFL)
        listen_sock.close()

        # Separate file objects for reading and writing, each on its own
        # duplicate of the socket's descriptor.
        infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
        outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
        exit_code = 0
        try:
            worker_main(infile, outfile)
        except SystemExit as exc:
            exit_code = exc.code
        except Exception:
            # os._exit() in the finally clause would otherwise discard the
            # exception and report success; log it and exit non-zero.
            traceback.print_exc()
            exit_code = 1
        finally:
            outfile.flush()
            sock.close()
            os._exit(compute_real_exit_code(exit_code))
96
97
def launch_worker(listen_sock):
    """Fork one pool worker that serves clients from *listen_sock*.

    The parent returns immediately after the fork. The child runs
    ``worker(listen_sock)`` and always terminates through ``os._exit`` so
    that it can never fall back into the caller's code: status 0 after a
    clean shutdown, status 1 if the worker failed.

    :param listen_sock: listening socket passed on to ``worker``.
    """
    if os.fork() != 0:
        return  # parent process: nothing more to do

    status = 1
    try:
        worker(listen_sock)
        # worker() is only expected to return after shutdown was requested.
        assert should_exit()
        status = 0
    except Exception:
        traceback.print_exc()
    finally:
        # Guarantee the child exits here, whatever happened above.
        os._exit(status)
108
109
111
def manager():
    """Run the daemon's manager process.

    Steps, in order:

    1. Move into a new process group, so ``os.kill(0, ...)`` at the end
       addresses this process and the processes forked from it.
    2. Open a listening TCP socket on 127.0.0.1 with an OS-chosen port and
       report the port through ``write_int`` on stdout.
    3. Launch ``POOLSIZE`` workers sharing that socket, then close the
       manager's own copy of it.
    4. Install signal handlers and close stdout.
    5. Read stdin until end-of-file (or a read error), then set the exit
       flag and send SIGHUP to the process group.

    :raises RuntimeError: from the SIGCHLD handler when a child exits with a
        non-zero status before shutdown was requested.
    :raises EnvironmentError: if reading stdin or ``waitpid()`` fails with an
        errno that is not handled below.
    """
    os.setpgid(0, 0)

    # Loopback-only listening socket; port 0 lets the OS pick a free port.
    listen_sock = socket.socket(AF_INET, SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
    _, listen_port = listen_sock.getsockname()
    write_int(listen_port, sys.stdout)

    # Start the worker pool; only the workers keep the socket open.
    for _ in range(POOLSIZE):
        launch_worker(listen_sock)
    listen_sock.close()

    def shutdown():
        # Only an attribute of the module-level flag is mutated, so no
        # ``global`` declaration is needed.
        exit_flag.value = True

    # SIGTERM requests a graceful shutdown. SIGHUP is ignored here because
    # the manager itself is a target of the os.kill(0, SIGHUP) below.
    signal.signal(SIGTERM, lambda signum, frame: shutdown())
    signal.signal(SIGHUP, SIG_IGN)

    def handle_sigchld(*args):
        # Reap one exited child; a non-zero status before shutdown is
        # treated as a crashed worker.
        try:
            pid, status = os.waitpid(0, os.WNOHANG)
            if status != 0 and not should_exit():
                raise RuntimeError("worker crashed: %s, %s" % (pid, status))
        except EnvironmentError as err:
            if err.errno not in (ECHILD, EINTR):
                raise
    signal.signal(SIGCHLD, handle_sigchld)

    # Nothing further is written to stdout after the port number.
    sys.stdout.close()
    try:
        while not should_exit():
            try:
                # An empty read means stdin reached end-of-file, which is
                # the cue to shut down. The truthiness test works whether
                # os.read() returns str (Python 2) or bytes (Python 3).
                if not os.read(0, 512):
                    shutdown()
            except EnvironmentError as err:
                if err.errno != EINTR:
                    shutdown()
                    raise
    finally:
        signal.signal(SIGTERM, SIG_DFL)
        exit_flag.value = True
        # Wake the process group so blocked workers notice the exit flag.
        os.kill(0, SIGHUP)
162
163
# Start the manager only when this file is executed as a script.
if __name__ == '__main__':
    manager()
166