diff --git a/gluon/main.py b/gluon/main.py index e54c16aa..9d049344 100644 --- a/gluon/main.py +++ b/gluon/main.py @@ -556,11 +556,9 @@ def wsgibase(environ, responder): return wsgibase(new_environ, responder) if global_settings.web2py_crontype == 'soft': - # FIXME: calling softcron crondance in a new thread at every - # request is not a good idea in a long running process cmd_opts = global_settings.cmd_options newcron.softcron(global_settings.applications_parent, - apps=cmd_opts and cmd_opts.crontabs).start() + apps=cmd_opts and cmd_opts.crontabs) return http_response.to(responder, env=env) diff --git a/gluon/newcron.py b/gluon/newcron.py index b5f11293..a5888ad5 100644 --- a/gluon/newcron.py +++ b/gluon/newcron.py @@ -5,6 +5,7 @@ | This file is part of the web2py Web Framework | Created by Attila Csipa | Modified by Massimo Di Pierro +| Worker, SoftWorker and SimplePool added by Paolo Pastori | License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) Cron-style interface @@ -24,24 +25,23 @@ import shlex from gluon import fileutils from gluon._compat import to_bytes, pickle from pydal.contrib import portalocker -from gluon.settings import global_settings logger_name = 'web2py.cron' -_cron_stopping = False +_stopping = False def reset(): - global _cron_stopping - _cron_stopping = False + global _stopping + _stopping = False -_cron_subprocs_lock = threading.RLock() -_cron_subprocs = [] +_subprocs_lock = threading.RLock() +_subprocs = [] def subprocess_count(): - with _cron_subprocs_lock: - return len(_cron_subprocs) + with _subprocs_lock: + return len(_subprocs) def absolute_path_link(path): @@ -59,11 +59,11 @@ def absolute_path_link(path): def stopcron(): """Graceful shutdown of cron""" - global _cron_stopping - _cron_stopping = True + global _stopping + _stopping = True while subprocess_count(): - with _cron_subprocs_lock: - proc = _cron_subprocs.pop() + with _subprocs_lock: + proc = _subprocs.pop() if proc.poll() is None: try: proc.terminate() @@ -71,17 +71,9 @@ def stopcron(): getLogger(logger_name).exception('error in stopcron') -class extcron(threading.Thread): - - def __init__(self, applications_parent, apps=None): - threading.Thread.__init__(self) - self.setDaemon(False) - self.path = applications_parent - self.apps = apps - - def run(self): - getLogger(logger_name).debug('external cron invocation') - crondance(self.path, 'external', startup=False, apps=self.apps) +def extcron(applications_parent, apps=None): + getLogger(logger_name).debug('external cron invocation') + crondance(applications_parent, 'external', startup=False, apps=apps) class hardcron(threading.Thread): @@ -96,7 +88,7 @@ class hardcron(threading.Thread): crondance(self.path, 'hard', startup=True, apps=self.apps) def launch(self): - if not _cron_stopping: + if not _stopping: self.logger.debug('hard cron invocation') crondance(self.path, 'hard', startup=False, apps=self.apps) @@ -104,23 +96,19 @@ class hardcron(threading.Thread): self.logger = getLogger(logger_name) self.logger.info('hard cron daemon started') s = sched.scheduler(time.time, time.sleep) - while not _cron_stopping: + while not _stopping: now = time.time() s.enter(60 - now % 60, 1, self.launch, ()) s.run() -class softcron(threading.Thread): - - def __init__(self, applications_parent, apps=None): - threading.Thread.__init__(self) - self.path = applications_parent - self.apps = apps - - def run(self): - if not _cron_stopping: - getLogger(logger_name).debug('soft cron invocation') - crondance(self.path, 'soft', startup=False, apps=self.apps) +def softcron(applications_parent, apps=None): + logger = getLogger(logger_name) + try: + if not _dancer((applications_parent, apps)): + logger.warning('no thread available for soft crondance') + except Exception: + logger.exception('error executing soft crondance') class Token(object): @@ -257,36 +245,144 @@ def parsecronline(line): return task -class cronlauncher(threading.Thread): +class Worker(threading.Thread): - def __init__(self, cmd): + def __init__(self, pool): threading.Thread.__init__(self) - self.cmd = cmd + self.setDaemon(True) + self.pool = pool + self.run_lock = threading.Lock() + self.run_lock.acquire() + self.payload = None def run(self): - import subprocess logger = getLogger(logger_name) - proc = subprocess.Popen(self.cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - with _cron_subprocs_lock: - _cron_subprocs.append(proc) - (stdoutdata, stderrdata) = proc.communicate() - try: - with _cron_subprocs_lock: - _cron_subprocs.remove(proc) - except ValueError: - pass - if proc.returncode != 0: - logger.warning('%r call returned code %s:\n%s\n%s', - ' '.join(self.cmd), proc.returncode, stdoutdata, stderrdata) - else: - logger.debug('%r call returned success:\n%s', - ' '.join(self.cmd), stdoutdata) + logger.info('Worker %s: started', self.name) + while True: + try: + with self.run_lock: # waiting for run_lock.release() + cmd = ' '.join(self.payload) + logger.debug('Worker %s: now calling %r', self.name, cmd) + import subprocess + proc = subprocess.Popen(self.payload, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + with _subprocs_lock: + _subprocs.append(proc) + stdoutdata, stderrdata = proc.communicate() + try: + with _subprocs_lock: + _subprocs.remove(proc) + except ValueError: + pass + if proc.returncode != 0: + logger.warning('Worker %s: %r call returned code %s:\n%s\n%s', + self.name, cmd, proc.returncode, stdoutdata, stderrdata) + else: + logger.debug('Worker %s: %r call returned success:\n%s', + self.name, cmd, stdoutdata) + finally: + self.run_lock.acquire() + self.pool.stop(self) -def crondance(applications_parent, ctype='soft', startup=False, apps=None): +class SoftWorker(threading.Thread): + + def __init__(self, pool): + threading.Thread.__init__(self) + self.setDaemon(True) + self.pool = pool + self.run_lock = threading.Lock() + self.run_lock.acquire() + self.payload = None + + def run(self): + logger = getLogger(logger_name) + logger.info('SoftWorker %s: started', self.name) + while True: + try: + with self.run_lock: # waiting for run_lock.release() + getLogger(logger_name).debug('soft cron invocation') + applications_parent, apps = self.payload + crondance(applications_parent, 'soft', startup=False, apps=apps) + finally: + self.run_lock.acquire() + self.pool.stop(self) + + +class SimplePool(object): + """ + Very simple thread pool, + (re)uses a maximum number of threads to launch cron tasks. + + Pool size can be incremented after initialization, + this allows delayed configuration of a global instance + for the case you do not want to use lazy initialization. + """ + + def __init__(self, size, worker_cls=Worker): + """ + Create the pool setting initial size. + + Notice that no thread is created until the instance is called. + """ + self.size = size + self.worker_cls = worker_cls + self.lock = threading.RLock() + self.idle = list() + self.running = set() + + def grow(self, size): + if size > self.size: + self.size = size + + def start(self, t): + with self.lock: + try: + self.idle.remove(t) + except ValueError: + pass + self.running.add(t) + + def stop(self, t): + with self.lock: + self.idle.append(t) + try: + self.running.remove(t) + except KeyError: + pass + + def __call__(self, payload): + """ + Pass payload to a thread for immediate execution. + + Returns a boolean indicating if a thread is available. + """ + with self.lock: + if len(self.running) == self.size: + # no worker available + return False + idle_num = len(self.idle) + if idle_num: + # use an existing (idle) thread + t = self.idle.pop(0) + else: + # create a new thread + t = self.worker_cls(self) + self.start(t) + t.payload = payload + t.run_lock.release() + if not idle_num: + t.start() + return True + + +_dancer = SimplePool(5, worker_cls=SoftWorker) + +_launcher = SimplePool(5) + +def crondance(applications_parent, ctype='hard', startup=False, apps=None): """ Does the periodic job of cron service: read the crontab(s) and launch the various commands. @@ -296,96 +392,94 @@ def crondance(applications_parent, ctype='soft', startup=False, apps=None): cronmaster = token.acquire(startup=startup) if not cronmaster: return - now_s = time.localtime() - checks = (('min', now_s.tm_min), - ('hr', now_s.tm_hour), - ('mon', now_s.tm_mon), - ('dom', now_s.tm_mday), - ('dow', (now_s.tm_wday + 1) % 7)) + try: + now_s = time.localtime() + checks = (('min', now_s.tm_min), + ('hr', now_s.tm_hour), + ('mon', now_s.tm_mon), + ('dom', now_s.tm_mday), + ('dow', (now_s.tm_wday + 1) % 7)) + logger = getLogger(logger_name) - logger = getLogger(logger_name) + if not apps: + apps = [x for x in os.listdir(apppath) + if os.path.isdir(os.path.join(apppath, x))] - if not apps: - apps = [x for x in os.listdir(apppath) - if os.path.isdir(os.path.join(apppath, x))] + full_apath_links = set() - full_apath_links = set() - - if sys.executable.lower().endswith('pythonservice.exe'): - _python_exe = os.path.join(sys.exec_prefix, 'python.exe') - else: - _python_exe = sys.executable - base_commands = [_python_exe] - w2p_path = fileutils.abspath('web2py.py', gluon=True) - if os.path.exists(w2p_path): - base_commands.append(w2p_path) - if applications_parent != global_settings.gluon_parent: - base_commands.extend(('-f', applications_parent)) - base_commands.extend(('--cron_job', '--no_banner', '--no_gui', '--plain')) - - for app in apps: - if _cron_stopping: - break - apath = os.path.join(apppath, app) - - # if app is a symbolic link to other app, skip it - full_apath_link = absolute_path_link(apath) - if full_apath_link in full_apath_links: - continue + if sys.executable.lower().endswith('pythonservice.exe'): + _python_exe = os.path.join(sys.exec_prefix, 'python.exe') else: - full_apath_links.add(full_apath_link) + _python_exe = sys.executable + base_commands = [_python_exe] + w2p_path = fileutils.abspath('web2py.py', gluon=True) + if os.path.exists(w2p_path): + base_commands.append(w2p_path) + base_commands.extend(('--cron_job', '--no_banner', '--no_gui', '--plain')) - cronpath = os.path.join(apath, 'cron') - crontab = os.path.join(cronpath, 'crontab') - if not os.path.exists(crontab): - continue - try: - cronlines = [line.strip() for line in fileutils.readlines_file(crontab, 'rt')] - lines = [line for line in cronlines if line and not line.startswith('#')] - tasks = [parsecronline(cline) for cline in lines] - except Exception as e: - logger.error('crontab read error %s', e) - continue - - for task in tasks: - if _cron_stopping: + for app in apps: + if _stopping: break - if not task: - continue - task_min = task.get('min', []) - if not startup and task_min == [-1]: - continue - citems = [(k in task and not v in task[k]) for k, v in checks] - if task_min != [-1] and reduce(lambda a, b: a or b, citems): - continue + apath = os.path.join(apppath, app) - logger.info('%s cron: %s executing %r in %s at %s', - ctype, app, task.get('cmd'), - os.getcwd(), datetime.datetime.now()) - action = models = False - command = task['cmd'] - if command.startswith('**'): - action = True - command = command[2:] - elif command.startswith('*'): - action = models = True - command = command[1:] - - if action: - commands = base_commands[:] - if command.endswith('.py'): - commands.extend(('-S', app, '-R', command)) - else: - commands.extend(('-S', app + '/' + command)) - if models: - commands.append('-M') + # if app is a symbolic link to other app, skip it + full_apath_link = absolute_path_link(apath) + if full_apath_link in full_apath_links: + continue else: - commands = shlex.split(command) + full_apath_links.add(full_apath_link) + cronpath = os.path.join(apath, 'cron') + crontab = os.path.join(cronpath, 'crontab') + if not os.path.exists(crontab): + continue try: - # FIXME: using a new thread every time there is a task to - # launch is not a good idea in a long running process - cronlauncher(commands).start() - except Exception: - logger.exception('error starting %r', task['cmd']) - token.release() + cronlines = [line.strip() for line in fileutils.readlines_file(crontab, 'rt')] + lines = [line for line in cronlines if line and not line.startswith('#')] + tasks = [parsecronline(cline) for cline in lines] + except Exception as e: + logger.error('crontab read error %s', e) + continue + + for task in tasks: + if _stopping: + break + if not task: + continue + task_min = task.get('min', []) + if not startup and task_min == [-1]: + continue + citems = [(k in task and not v in task[k]) for k, v in checks] + if task_min != [-1] and reduce(lambda a, b: a or b, citems): + continue + + logger.info('%s cron: %s executing %r in %s at %s', + ctype, app, task.get('cmd'), + os.getcwd(), datetime.datetime.now()) + action = models = False + command = task['cmd'] + if command.startswith('**'): + action = True + command = command[2:] + elif command.startswith('*'): + action = models = True + command = command[1:] + + if action: + commands = base_commands[:] + if command.endswith('.py'): + commands.extend(('-S', app, '-R', command)) + else: + commands.extend(('-S', app + '/' + command)) + if models: + commands.append('-M') + else: + commands = shlex.split(command) + + try: + if not _launcher(commands): + logger.warning('no thread available, cannot execute %r', task['cmd']) + except Exception: + logger.exception('error executing %r', task['cmd']) + finally: + token.release() diff --git a/gluon/packages/dal b/gluon/packages/dal index 66a00fa3..1fd32c51 160000 --- a/gluon/packages/dal +++ b/gluon/packages/dal @@ -1 +1 @@ -Subproject commit 66a00fa39b7d9a643395f5070501cdc9a1483348 +Subproject commit 1fd32c51338abce26b3c328887cfb513c9a2ce6f diff --git a/gluon/tests/test_cron.py b/gluon/tests/test_cron.py index c25f3ce5..b31b7f78 100644 --- a/gluon/tests/test_cron.py +++ b/gluon/tests/test_cron.py @@ -8,8 +8,10 @@ import unittest import os import shutil import time +import sys -from gluon.newcron import Token, crondance, subprocess_count +from gluon.newcron import (Token, crondance, subprocess_count, + SimplePool, stopcron, reset) from gluon.fileutils import create_app, write_file @@ -28,13 +30,18 @@ def tearDownModule(): TEST_CRONTAB = """@reboot peppe **applications/%s/cron/test.py """ % test_app_name -TEST_SCRIPT = """ +TEST_SCRIPT1 = """ from os.path import join as pjoin with open(pjoin(request.folder, 'private', 'cron_req'), 'w') as f: f.write(str(request)) """ TARGET = os.path.join(appdir, 'private', 'cron_req') +TEST_SCRIPT2 = """ +import time + +time.sleep(13) +""" class TestCron(unittest.TestCase): @@ -44,7 +51,7 @@ class TestCron(unittest.TestCase): if os.path.exists(master): os.unlink(master) - def test_Token(self): + def test_1_Token(self): app_path = os.path.join(os.getcwd(), 'applications', test_app_name) t = Token(path=app_path) self.assertEqual(t.acquire(), t.now) @@ -52,10 +59,10 @@ class TestCron(unittest.TestCase): self.assertIsNone(t.acquire()) self.assertTrue(t.release()) - def test_crondance(self): + def test_2_crondance(self): base = os.path.join(appdir, 'cron') write_file(os.path.join(base, 'crontab'), TEST_CRONTAB) - write_file(os.path.join(base, 'test.py'), TEST_SCRIPT) + write_file(os.path.join(base, 'test.py'), TEST_SCRIPT1) if os.path.exists(TARGET): os.unlink(TARGET) crondance(os.getcwd(), 'hard', startup=True, apps=[test_app_name]) @@ -65,3 +72,19 @@ class TestCron(unittest.TestCase): time.sleep(1) time.sleep(1) self.assertTrue(os.path.exists(TARGET)) + + def test_3_SimplePool(self): + base = os.path.join(appdir, 'cron') + write_file(os.path.join(base, 'test.py'), TEST_SCRIPT2) + w2p_path = os.path.join(os.getcwd(), 'web2py.py') + self.assertTrue(os.path.exists(w2p_path)) + launcher = SimplePool(1) + cmd1 = [sys.executable, w2p_path, + '--cron_job', '--no_banner', '--no_gui', '--plain', + '-S', test_app_name, '-R', "applications/%s/cron/test.py" % test_app_name] + self.assertTrue(launcher(cmd1)) + self.assertFalse(launcher(None)) + time.sleep(1) + stopcron() + time.sleep(1) + reset() diff --git a/gluon/widget.py b/gluon/widget.py index 4cfca0d2..c5944420 100644 --- a/gluon/widget.py +++ b/gluon/widget.py @@ -496,7 +496,7 @@ class web2pyDialog(object): self.update_schedulers() # softcron is stopped with HttpServer, thus if starting again - # need to reset newcron._cron_stopping to re-enable cron + # need to reset newcron._stopping to re-enable cron if self.options.soft_cron: newcron.reset() @@ -746,11 +746,9 @@ def start(): if options.cron_run: # run cron (extcron) and exit - logger.debug('Starting extcron...') + logger.debug('Running extcron...') global_settings.web2py_crontype = 'external' - extcron = newcron.extcron(options.folder, apps=options.crontabs) - extcron.start() - extcron.join() + newcron.extcron(options.folder, apps=options.crontabs) return if not options.with_scheduler and options.schedulers: