Merge pull request #2269 from bmiklautz/spawn_v2

Fix scheduler issue on linux related to multiprocessing
This commit is contained in:
mdipierro
2019-10-27 21:27:04 -07:00
committed by GitHub

View File

@@ -596,12 +596,12 @@ class Scheduler(threading.Thread):
utc_time(bool): do all datetime calculations assuming UTC as the
timezone. Remember to pass `start_time` and `stop_time` to tasks
accordingly
use_spawn(bool): use spawn for subprocess (only useable with python3)
"""
def __init__(self, db, tasks=None, migrate=True,
worker_name=None, group_names=None, heartbeat=HEARTBEAT,
max_empty_runs=0, discard_results=False, utc_time=False):
max_empty_runs=0, discard_results=False, utc_time=False, use_spawn=False):
threading.Thread.__init__(self)
self.setDaemon(True)
@@ -639,6 +639,7 @@ class Scheduler(threading.Thread):
current._scheduler = self
self.define_tables(db, migrate=migrate)
self.use_spawn = use_spawn
def execute(self, task):
"""Start the background process.
@@ -649,10 +650,19 @@ class Scheduler(threading.Thread):
Returns:
a `TaskReport` object
"""
outq = multiprocessing.Queue()
retq = multiprocessing.Queue(maxsize=1)
self.process = p = \
multiprocessing.Process(target=executor, args=(retq, task, outq))
outq = None
retq = None
if (self.use_spawn and not PY2):
ctx = multiprocessing.get_context('spawn')
outq = ctx.Queue()
retq = ctx.Queue(maxsize=1)
sel.process = p = ctx.Process(target=executor, args=(retq, task, outq))
else:
outq = multiprocessing.Queue()
retq = multiprocessing.Queue(maxsize=1)
self.process = p = \
multiprocessing.Process(target=executor, args=(retq, task, outq))
self.process_queues = (retq, outq)
logger.debug(' task starting')