
Source Code for Module pyspark.daemon

import os
import signal
import socket
import sys
import traceback
import multiprocessing
from ctypes import c_bool
from errno import EINTR, ECHILD
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
from pyspark.worker import main as worker_main
from pyspark.serializers import write_int

try:
    POOLSIZE = multiprocessing.cpu_count()
except NotImplementedError:
    POOLSIZE = 4

exit_flag = multiprocessing.Value(c_bool, False)

def should_exit():
    global exit_flag
    return exit_flag.value


def compute_real_exit_code(exit_code):
    # SystemExit's code can be integer or string, but os._exit only accepts integers
    import numbers
    if isinstance(exit_code, numbers.Integral):
        return exit_code
    else:
        return 1


def worker(listen_sock):
    # Redirect stdout to stderr
    os.dup2(2, 1)
    sys.stdout = sys.stderr  # The sys.stdout object is different from file descriptor 1

    # Manager sends SIGHUP to request termination of workers in the pool
    def handle_sighup(*args):
        assert should_exit()
    signal.signal(SIGHUP, handle_sighup)

    # Cleanup zombie children
    def handle_sigchld(*args):
        pid = status = None
        try:
            while (pid, status) != (0, 0):
                pid, status = os.waitpid(0, os.WNOHANG)
        except EnvironmentError as err:
            if err.errno == EINTR:
                # retry
                handle_sigchld()
            elif err.errno != ECHILD:
                raise
    signal.signal(SIGCHLD, handle_sigchld)

    # Handle clients
    while not should_exit():
        # Wait until a client arrives or we have to exit
        sock = None
        while not should_exit() and sock is None:
            try:
                sock, addr = listen_sock.accept()
            except EnvironmentError as err:
                if err.errno != EINTR:
                    raise

        if sock is not None:
            # Fork a child to handle the client.
            # The client is handled in the child so that the manager
            # never receives SIGCHLD unless a worker crashes.
            if os.fork() == 0:
                # Leave the worker pool
                signal.signal(SIGHUP, SIG_DFL)
                signal.signal(SIGCHLD, SIG_DFL)
                listen_sock.close()
                # Read the socket using fdopen instead of socket.makefile() because the latter
                # seems to be very slow; note that we need to dup() the file descriptor because
                # otherwise writes also cause a seek that makes us miss data on the read side.
                infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
                outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
                exit_code = 0
                try:
                    worker_main(infile, outfile)
                except SystemExit as exc:
                    exit_code = exc.code
                finally:
                    outfile.flush()
                    sock.close()
                os._exit(compute_real_exit_code(exit_code))
            else:
                sock.close()


def launch_worker(listen_sock):
    if os.fork() == 0:
        try:
            worker(listen_sock)
        except Exception as err:
            traceback.print_exc()
            os._exit(1)
        else:
            assert should_exit()
            os._exit(0)


def manager():
    # Create a new process group to corral our children
    os.setpgid(0, 0)

    # Create a listening socket on the AF_INET loopback interface
    listen_sock = socket.socket(AF_INET, SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
    listen_host, listen_port = listen_sock.getsockname()
    write_int(listen_port, sys.stdout)

    # Launch initial worker pool
    for idx in range(POOLSIZE):
        launch_worker(listen_sock)
    listen_sock.close()

    def shutdown():
        global exit_flag
        exit_flag.value = True

    # Gracefully exit on SIGTERM, don't die on SIGHUP
    signal.signal(SIGTERM, lambda signum, frame: shutdown())
    signal.signal(SIGHUP, SIG_IGN)

    # Cleanup zombie children
    def handle_sigchld(*args):
        try:
            pid, status = os.waitpid(0, os.WNOHANG)
            if status != 0 and not should_exit():
                raise RuntimeError("worker crashed: %s, %s" % (pid, status))
        except EnvironmentError as err:
            if err.errno not in (ECHILD, EINTR):
                raise
    signal.signal(SIGCHLD, handle_sigchld)

    # Initialization complete
    sys.stdout.close()
    try:
        while not should_exit():
            try:
                # Spark tells us to exit by closing stdin
                if os.read(0, 512) == '':
                    shutdown()
            except EnvironmentError as err:
                if err.errno != EINTR:
                    shutdown()
                    raise
    finally:
        signal.signal(SIGTERM, SIG_DFL)
        exit_flag.value = True
        # Send SIGHUP to notify workers of shutdown
        os.kill(0, SIGHUP)


if __name__ == '__main__':
    manager()
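
For reference, here is a minimal sketch of how a parent process could drive this daemon, based on the handshake visible in manager(): the daemon announces its listening port on stdout via write_int() and treats a closed stdin as the shutdown request. The launcher below is hypothetical and not part of Spark; it assumes pyspark.daemon is importable in the child process and that write_int() frames the port as a 4-byte big-endian integer (struct format "!i"), as in pyspark.serializers.

    import socket
    import struct
    import subprocess
    import sys

    # Hypothetical launcher: start the daemon, read the port it announces on
    # stdout, connect one client, then shut the pool down.
    proc = subprocess.Popen([sys.executable, "-m", "pyspark.daemon"],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    # manager() writes the listening port before closing its stdout; assuming
    # a 4-byte big-endian frame, unpack it with "!i".
    port = struct.unpack("!i", proc.stdout.read(4))[0]

    # Each connection accepted on this port is served by a forked copy of a
    # pool worker, which runs worker_main(infile, outfile) over the socket.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("127.0.0.1", port))
    # ... speak the PySpark worker protocol over sock here ...
    sock.close()

    # Closing the daemon's stdin is the shutdown signal the manager polls for.
    proc.stdin.close()
    proc.wait()

Shutdown can also be requested by sending SIGTERM to the manager, which sets the shared exit_flag and then SIGHUPs its process group so the pool workers exit.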