Remove sleep and switch to pipe for IPC (#28561)
Use a simple multiprocessing pipe to delay exiting the parent process until after the child has been doubly forked. Using a simple IPC to allow the forked process to start avoids the control node waiting unnecessarily for lightly loaded systems.pull/4420/head
parent
c1589c33c4
commit
4947936176
|
@ -18,6 +18,7 @@ import traceback
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
import syslog
|
import syslog
|
||||||
|
import multiprocessing
|
||||||
|
|
||||||
from ansible.module_utils._text import to_text
|
from ansible.module_utils._text import to_text
|
||||||
|
|
||||||
|
@ -26,6 +27,9 @@ PY3 = sys.version_info[0] == 3
|
||||||
syslog.openlog('ansible-%s' % os.path.basename(__file__))
|
syslog.openlog('ansible-%s' % os.path.basename(__file__))
|
||||||
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:]))
|
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:]))
|
||||||
|
|
||||||
|
# pipe for communication between forked process and parent
|
||||||
|
ipc_watcher, ipc_notifier = multiprocessing.Pipe()
|
||||||
|
|
||||||
|
|
||||||
def notice(msg):
|
def notice(msg):
|
||||||
syslog.syslog(syslog.LOG_NOTICE, msg)
|
syslog.syslog(syslog.LOG_NOTICE, msg)
|
||||||
|
@ -129,6 +133,11 @@ def _run_module(wrapped_cmd, jid, job_path):
|
||||||
jobfile = open(tmp_job_path, "w")
|
jobfile = open(tmp_job_path, "w")
|
||||||
result = {}
|
result = {}
|
||||||
|
|
||||||
|
# signal grandchild process started and isolated from being terminated
|
||||||
|
# by the connection being closed sending a signal to the job group
|
||||||
|
ipc_notifier.send(True)
|
||||||
|
ipc_notifier.close()
|
||||||
|
|
||||||
outdata = ''
|
outdata = ''
|
||||||
filtered_outdata = ''
|
filtered_outdata = ''
|
||||||
stderr = ''
|
stderr = ''
|
||||||
|
@ -140,6 +149,7 @@ def _run_module(wrapped_cmd, jid, job_path):
|
||||||
if interpreter:
|
if interpreter:
|
||||||
cmd = interpreter + cmd
|
cmd = interpreter + cmd
|
||||||
script = subprocess.Popen(cmd, shell=False, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
script = subprocess.Popen(cmd, shell=False, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
|
||||||
(outdata, stderr) = script.communicate()
|
(outdata, stderr) = script.communicate()
|
||||||
if PY3:
|
if PY3:
|
||||||
outdata = outdata.decode('utf-8', 'surrogateescape')
|
outdata = outdata.decode('utf-8', 'surrogateescape')
|
||||||
|
@ -241,15 +251,32 @@ if __name__ == '__main__':
|
||||||
# to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile)
|
# to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile)
|
||||||
# this probably could be done with some IPC later. Modules should always read
|
# this probably could be done with some IPC later. Modules should always read
|
||||||
# the argsfile at the very first start of their execution anyway
|
# the argsfile at the very first start of their execution anyway
|
||||||
|
|
||||||
|
# close off notifier handle in grandparent, probably unnecessary as
|
||||||
|
# this process doesn't hang around long enough
|
||||||
|
ipc_notifier.close()
|
||||||
|
|
||||||
|
# allow waiting up to 2.5 seconds in total should be long enough for worst
|
||||||
|
# loaded environment in practice.
|
||||||
|
retries = 25
|
||||||
|
while retries > 0:
|
||||||
|
if ipc_watcher.poll(0.1):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
retries = retries - 1
|
||||||
|
continue
|
||||||
|
|
||||||
notice("Return async_wrapper task started.")
|
notice("Return async_wrapper task started.")
|
||||||
print(json.dumps({"started": 1, "finished": 0, "ansible_job_id": jid, "results_file": job_path,
|
print(json.dumps({"started": 1, "finished": 0, "ansible_job_id": jid, "results_file": job_path,
|
||||||
"_ansible_suppress_tmpdir_delete": not preserve_tmp}))
|
"_ansible_suppress_tmpdir_delete": not preserve_tmp}))
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
time.sleep(1)
|
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
else:
|
else:
|
||||||
# The actual wrapper process
|
# The actual wrapper process
|
||||||
|
|
||||||
|
# close off the receiving end of the pipe from child process
|
||||||
|
ipc_watcher.close()
|
||||||
|
|
||||||
# Daemonize, so we keep on running
|
# Daemonize, so we keep on running
|
||||||
daemonize_self()
|
daemonize_self()
|
||||||
|
|
||||||
|
@ -258,6 +285,10 @@ if __name__ == '__main__':
|
||||||
|
|
||||||
sub_pid = os.fork()
|
sub_pid = os.fork()
|
||||||
if sub_pid:
|
if sub_pid:
|
||||||
|
# close off inherited pipe handles
|
||||||
|
ipc_watcher.close()
|
||||||
|
ipc_notifier.close()
|
||||||
|
|
||||||
# the parent stops the process after the time limit
|
# the parent stops the process after the time limit
|
||||||
remaining = int(time_limit)
|
remaining = int(time_limit)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue