better cron limits number of threads, thanks P.Pastori

This commit is contained in:
mdipierro
2019-05-22 21:44:42 -07:00
parent f258cbecee
commit 94af61e732
5 changed files with 269 additions and 156 deletions

View File

@@ -556,11 +556,9 @@ def wsgibase(environ, responder):
return wsgibase(new_environ, responder)
if global_settings.web2py_crontype == 'soft':
# FIXME: calling softcron crondance in a new thread at every
# request is not a good idea in a long running process
cmd_opts = global_settings.cmd_options
newcron.softcron(global_settings.applications_parent,
apps=cmd_opts and cmd_opts.crontabs).start()
apps=cmd_opts and cmd_opts.crontabs)
return http_response.to(responder, env=env)

View File

@@ -5,6 +5,7 @@
| This file is part of the web2py Web Framework
| Created by Attila Csipa <web2py@csipa.in.rs>
| Modified by Massimo Di Pierro <mdipierro@cs.depaul.edu>
| Worker, SoftWorker and SimplePool added by Paolo Pastori
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
Cron-style interface
@@ -24,24 +25,23 @@ import shlex
from gluon import fileutils
from gluon._compat import to_bytes, pickle
from pydal.contrib import portalocker
from gluon.settings import global_settings
logger_name = 'web2py.cron'
_cron_stopping = False
_stopping = False
def reset():
global _cron_stopping
_cron_stopping = False
global _stopping
_stopping = False
_cron_subprocs_lock = threading.RLock()
_cron_subprocs = []
_subprocs_lock = threading.RLock()
_subprocs = []
def subprocess_count():
with _cron_subprocs_lock:
return len(_cron_subprocs)
with _subprocs_lock:
return len(_subprocs)
def absolute_path_link(path):
@@ -59,11 +59,11 @@ def absolute_path_link(path):
def stopcron():
"""Graceful shutdown of cron"""
global _cron_stopping
_cron_stopping = True
global _stopping
_stopping = True
while subprocess_count():
with _cron_subprocs_lock:
proc = _cron_subprocs.pop()
with _subprocs_lock:
proc = _subprocs.pop()
if proc.poll() is None:
try:
proc.terminate()
@@ -71,17 +71,9 @@ def stopcron():
getLogger(logger_name).exception('error in stopcron')
class extcron(threading.Thread):
def __init__(self, applications_parent, apps=None):
threading.Thread.__init__(self)
self.setDaemon(False)
self.path = applications_parent
self.apps = apps
def run(self):
getLogger(logger_name).debug('external cron invocation')
crondance(self.path, 'external', startup=False, apps=self.apps)
def extcron(applications_parent, apps=None):
getLogger(logger_name).debug('external cron invocation')
crondance(applications_parent, 'external', startup=False, apps=apps)
class hardcron(threading.Thread):
@@ -96,7 +88,7 @@ class hardcron(threading.Thread):
crondance(self.path, 'hard', startup=True, apps=self.apps)
def launch(self):
if not _cron_stopping:
if not _stopping:
self.logger.debug('hard cron invocation')
crondance(self.path, 'hard', startup=False, apps=self.apps)
@@ -104,23 +96,19 @@ class hardcron(threading.Thread):
self.logger = getLogger(logger_name)
self.logger.info('hard cron daemon started')
s = sched.scheduler(time.time, time.sleep)
while not _cron_stopping:
while not _stopping:
now = time.time()
s.enter(60 - now % 60, 1, self.launch, ())
s.run()
class softcron(threading.Thread):
def __init__(self, applications_parent, apps=None):
threading.Thread.__init__(self)
self.path = applications_parent
self.apps = apps
def run(self):
if not _cron_stopping:
getLogger(logger_name).debug('soft cron invocation')
crondance(self.path, 'soft', startup=False, apps=self.apps)
def softcron(applications_parent, apps=None):
logger = getLogger(logger_name)
try:
if not _dancer((applications_parent, apps)):
logger.warning('no thread available for soft crondance')
except Exception:
logger.exception('error executing soft crondance')
class Token(object):
@@ -257,36 +245,144 @@ def parsecronline(line):
return task
class cronlauncher(threading.Thread):
class Worker(threading.Thread):
def __init__(self, cmd):
def __init__(self, pool):
threading.Thread.__init__(self)
self.cmd = cmd
self.setDaemon(True)
self.pool = pool
self.run_lock = threading.Lock()
self.run_lock.acquire()
self.payload = None
def run(self):
import subprocess
logger = getLogger(logger_name)
proc = subprocess.Popen(self.cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
with _cron_subprocs_lock:
_cron_subprocs.append(proc)
(stdoutdata, stderrdata) = proc.communicate()
try:
with _cron_subprocs_lock:
_cron_subprocs.remove(proc)
except ValueError:
pass
if proc.returncode != 0:
logger.warning('%r call returned code %s:\n%s\n%s',
' '.join(self.cmd), proc.returncode, stdoutdata, stderrdata)
else:
logger.debug('%r call returned success:\n%s',
' '.join(self.cmd), stdoutdata)
logger.info('Worker %s: started', self.name)
while True:
try:
with self.run_lock: # waiting for run_lock.release()
cmd = ' '.join(self.payload)
logger.debug('Worker %s: now calling %r', self.name, cmd)
import subprocess
proc = subprocess.Popen(self.payload,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
with _subprocs_lock:
_subprocs.append(proc)
stdoutdata, stderrdata = proc.communicate()
try:
with _subprocs_lock:
_subprocs.remove(proc)
except ValueError:
pass
if proc.returncode != 0:
logger.warning('Worker %s: %r call returned code %s:\n%s\n%s',
self.name, cmd, proc.returncode, stdoutdata, stderrdata)
else:
logger.debug('Worker %s: %r call returned success:\n%s',
self.name, cmd, stdoutdata)
finally:
self.run_lock.acquire()
self.pool.stop(self)
def crondance(applications_parent, ctype='soft', startup=False, apps=None):
class SoftWorker(threading.Thread):
def __init__(self, pool):
threading.Thread.__init__(self)
self.setDaemon(True)
self.pool = pool
self.run_lock = threading.Lock()
self.run_lock.acquire()
self.payload = None
def run(self):
logger = getLogger(logger_name)
logger.info('SoftWorker %s: started', self.name)
while True:
try:
with self.run_lock: # waiting for run_lock.release()
getLogger(logger_name).debug('soft cron invocation')
applications_parent, apps = self.payload
crondance(applications_parent, 'soft', startup=False, apps=apps)
finally:
self.run_lock.acquire()
self.pool.stop(self)
class SimplePool(object):
"""
Very simple thread pool,
(re)uses a maximum number of threads to launch cron tasks.
Pool size can be incremented after initialization,
this allows delayed configuration of a global instance
for the case you do not want to use lazy initialization.
"""
def __init__(self, size, worker_cls=Worker):
"""
Create the pool setting initial size.
Notice that no thread is created until the instance is called.
"""
self.size = size
self.worker_cls = worker_cls
self.lock = threading.RLock()
self.idle = list()
self.running = set()
def grow(self, size):
if size > self.size:
self.size = size
def start(self, t):
with self.lock:
try:
self.idle.remove(t)
except ValueError:
pass
self.running.add(t)
def stop(self, t):
with self.lock:
self.idle.append(t)
try:
self.running.remove(t)
except KeyError:
pass
def __call__(self, payload):
"""
Pass payload to a thread for immediate execution.
Returns a boolean indicating if a thread is available.
"""
with self.lock:
if len(self.running) == self.size:
# no worker available
return False
idle_num = len(self.idle)
if idle_num:
# use an existing (idle) thread
t = self.idle.pop(0)
else:
# create a new thread
t = self.worker_cls(self)
self.start(t)
t.payload = payload
t.run_lock.release()
if not idle_num:
t.start()
return True
_dancer = SimplePool(5, worker_cls=SoftWorker)
_launcher = SimplePool(5)
def crondance(applications_parent, ctype='hard', startup=False, apps=None):
"""
Does the periodic job of cron service: read the crontab(s) and launch
the various commands.
@@ -296,96 +392,94 @@ def crondance(applications_parent, ctype='soft', startup=False, apps=None):
cronmaster = token.acquire(startup=startup)
if not cronmaster:
return
now_s = time.localtime()
checks = (('min', now_s.tm_min),
('hr', now_s.tm_hour),
('mon', now_s.tm_mon),
('dom', now_s.tm_mday),
('dow', (now_s.tm_wday + 1) % 7))
try:
now_s = time.localtime()
checks = (('min', now_s.tm_min),
('hr', now_s.tm_hour),
('mon', now_s.tm_mon),
('dom', now_s.tm_mday),
('dow', (now_s.tm_wday + 1) % 7))
logger = getLogger(logger_name)
logger = getLogger(logger_name)
if not apps:
apps = [x for x in os.listdir(apppath)
if os.path.isdir(os.path.join(apppath, x))]
if not apps:
apps = [x for x in os.listdir(apppath)
if os.path.isdir(os.path.join(apppath, x))]
full_apath_links = set()
full_apath_links = set()
if sys.executable.lower().endswith('pythonservice.exe'):
_python_exe = os.path.join(sys.exec_prefix, 'python.exe')
else:
_python_exe = sys.executable
base_commands = [_python_exe]
w2p_path = fileutils.abspath('web2py.py', gluon=True)
if os.path.exists(w2p_path):
base_commands.append(w2p_path)
if applications_parent != global_settings.gluon_parent:
base_commands.extend(('-f', applications_parent))
base_commands.extend(('--cron_job', '--no_banner', '--no_gui', '--plain'))
for app in apps:
if _cron_stopping:
break
apath = os.path.join(apppath, app)
# if app is a symbolic link to other app, skip it
full_apath_link = absolute_path_link(apath)
if full_apath_link in full_apath_links:
continue
if sys.executable.lower().endswith('pythonservice.exe'):
_python_exe = os.path.join(sys.exec_prefix, 'python.exe')
else:
full_apath_links.add(full_apath_link)
_python_exe = sys.executable
base_commands = [_python_exe]
w2p_path = fileutils.abspath('web2py.py', gluon=True)
if os.path.exists(w2p_path):
base_commands.append(w2p_path)
base_commands.extend(('--cron_job', '--no_banner', '--no_gui', '--plain'))
cronpath = os.path.join(apath, 'cron')
crontab = os.path.join(cronpath, 'crontab')
if not os.path.exists(crontab):
continue
try:
cronlines = [line.strip() for line in fileutils.readlines_file(crontab, 'rt')]
lines = [line for line in cronlines if line and not line.startswith('#')]
tasks = [parsecronline(cline) for cline in lines]
except Exception as e:
logger.error('crontab read error %s', e)
continue
for task in tasks:
if _cron_stopping:
for app in apps:
if _stopping:
break
if not task:
continue
task_min = task.get('min', [])
if not startup and task_min == [-1]:
continue
citems = [(k in task and not v in task[k]) for k, v in checks]
if task_min != [-1] and reduce(lambda a, b: a or b, citems):
continue
apath = os.path.join(apppath, app)
logger.info('%s cron: %s executing %r in %s at %s',
ctype, app, task.get('cmd'),
os.getcwd(), datetime.datetime.now())
action = models = False
command = task['cmd']
if command.startswith('**'):
action = True
command = command[2:]
elif command.startswith('*'):
action = models = True
command = command[1:]
if action:
commands = base_commands[:]
if command.endswith('.py'):
commands.extend(('-S', app, '-R', command))
else:
commands.extend(('-S', app + '/' + command))
if models:
commands.append('-M')
# if app is a symbolic link to other app, skip it
full_apath_link = absolute_path_link(apath)
if full_apath_link in full_apath_links:
continue
else:
commands = shlex.split(command)
full_apath_links.add(full_apath_link)
cronpath = os.path.join(apath, 'cron')
crontab = os.path.join(cronpath, 'crontab')
if not os.path.exists(crontab):
continue
try:
# FIXME: using a new thread every time there is a task to
# launch is not a good idea in a long running process
cronlauncher(commands).start()
except Exception:
logger.exception('error starting %r', task['cmd'])
token.release()
cronlines = [line.strip() for line in fileutils.readlines_file(crontab, 'rt')]
lines = [line for line in cronlines if line and not line.startswith('#')]
tasks = [parsecronline(cline) for cline in lines]
except Exception as e:
logger.error('crontab read error %s', e)
continue
for task in tasks:
if _stopping:
break
if not task:
continue
task_min = task.get('min', [])
if not startup and task_min == [-1]:
continue
citems = [(k in task and not v in task[k]) for k, v in checks]
if task_min != [-1] and reduce(lambda a, b: a or b, citems):
continue
logger.info('%s cron: %s executing %r in %s at %s',
ctype, app, task.get('cmd'),
os.getcwd(), datetime.datetime.now())
action = models = False
command = task['cmd']
if command.startswith('**'):
action = True
command = command[2:]
elif command.startswith('*'):
action = models = True
command = command[1:]
if action:
commands = base_commands[:]
if command.endswith('.py'):
commands.extend(('-S', app, '-R', command))
else:
commands.extend(('-S', app + '/' + command))
if models:
commands.append('-M')
else:
commands = shlex.split(command)
try:
if not _launcher(commands):
logger.warning('no thread available, cannot execute %r', task['cmd'])
except Exception:
logger.exception('error executing %r', task['cmd'])
finally:
token.release()

View File

@@ -8,8 +8,10 @@ import unittest
import os
import shutil
import time
import sys
from gluon.newcron import Token, crondance, subprocess_count
from gluon.newcron import (Token, crondance, subprocess_count,
SimplePool, stopcron, reset)
from gluon.fileutils import create_app, write_file
@@ -28,13 +30,18 @@ def tearDownModule():
TEST_CRONTAB = """@reboot peppe **applications/%s/cron/test.py
""" % test_app_name
TEST_SCRIPT = """
TEST_SCRIPT1 = """
from os.path import join as pjoin
with open(pjoin(request.folder, 'private', 'cron_req'), 'w') as f:
f.write(str(request))
"""
TARGET = os.path.join(appdir, 'private', 'cron_req')
TEST_SCRIPT2 = """
import time
time.sleep(13)
"""
class TestCron(unittest.TestCase):
@@ -44,7 +51,7 @@ class TestCron(unittest.TestCase):
if os.path.exists(master):
os.unlink(master)
def test_Token(self):
def test_1_Token(self):
app_path = os.path.join(os.getcwd(), 'applications', test_app_name)
t = Token(path=app_path)
self.assertEqual(t.acquire(), t.now)
@@ -52,10 +59,10 @@ class TestCron(unittest.TestCase):
self.assertIsNone(t.acquire())
self.assertTrue(t.release())
def test_crondance(self):
def test_2_crondance(self):
base = os.path.join(appdir, 'cron')
write_file(os.path.join(base, 'crontab'), TEST_CRONTAB)
write_file(os.path.join(base, 'test.py'), TEST_SCRIPT)
write_file(os.path.join(base, 'test.py'), TEST_SCRIPT1)
if os.path.exists(TARGET):
os.unlink(TARGET)
crondance(os.getcwd(), 'hard', startup=True, apps=[test_app_name])
@@ -65,3 +72,19 @@ class TestCron(unittest.TestCase):
time.sleep(1)
time.sleep(1)
self.assertTrue(os.path.exists(TARGET))
def test_3_SimplePool(self):
base = os.path.join(appdir, 'cron')
write_file(os.path.join(base, 'test.py'), TEST_SCRIPT2)
w2p_path = os.path.join(os.getcwd(), 'web2py.py')
self.assertTrue(os.path.exists(w2p_path))
launcher = SimplePool(1)
cmd1 = [sys.executable, w2p_path,
'--cron_job', '--no_banner', '--no_gui', '--plain',
'-S', test_app_name, '-R', "applications/%s/cron/test.py" % test_app_name]
self.assertTrue(launcher(cmd1))
self.assertFalse(launcher(None))
time.sleep(1)
stopcron()
time.sleep(1)
reset()

View File

@@ -496,7 +496,7 @@ class web2pyDialog(object):
self.update_schedulers()
# softcron is stopped with HttpServer, thus if starting again
# need to reset newcron._cron_stopping to re-enable cron
# need to reset newcron._stopping to re-enable cron
if self.options.soft_cron:
newcron.reset()
@@ -746,11 +746,9 @@ def start():
if options.cron_run:
# run cron (extcron) and exit
logger.debug('Starting extcron...')
logger.debug('Running extcron...')
global_settings.web2py_crontype = 'external'
extcron = newcron.extcron(options.folder, apps=options.crontabs)
extcron.start()
extcron.join()
newcron.extcron(options.folder, apps=options.crontabs)
return
if not options.with_scheduler and options.schedulers: