91 lines
2.6 KiB
Python
91 lines
2.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
# vim: set ts=4 sw=4 et ai:
|
|
"""
|
|
Unit tests for cron
|
|
"""
|
|
|
|
import unittest
|
|
import os
|
|
import shutil
|
|
import time
|
|
import sys
|
|
|
|
from gluon.newcron import (Token, crondance, subprocess_count,
|
|
SimplePool, stopcron, reset)
|
|
from gluon.fileutils import create_app, write_file
|
|
|
|
|
|
# Name of the throwaway application used by the tests in this module.
test_app_name = '_test_cron'
# Folder of that application, expressed relative to the working directory.
appdir = os.path.join('applications', test_app_name)
|
|
|
|
def setUpModule():
    """Create the test application folder unless it already exists.

    When ``appdir`` is missing, the directory is made and then passed to
    ``create_app`` (presumably to scaffold a web2py application in it --
    confirm in gluon.fileutils).  An existing directory is left untouched.
    """
    if os.path.exists(appdir):
        return
    os.mkdir(appdir)
    create_app(appdir)
|
|
|
|
def tearDownModule():
    """Delete the test application's directory tree if it is present."""
    if not os.path.exists(appdir):
        return
    shutil.rmtree(appdir)
|
|
|
|
|
|
# Crontab line that test_2_crondance writes to <app>/cron/crontab; it points
# at the application's cron/test.py script.
TEST_CRONTAB = """@reboot peppe **applications/%s/cron/test.py
""" % test_app_name
# Script body written to cron/test.py by test_2_crondance: it dumps
# str(request) into <request.folder>/private/cron_req.  `request` is not
# defined in the script itself, so it presumably comes from the web2py
# environment the script is run in -- confirm against gluon.newcron.
# The body of the `with` statement must stay indented, otherwise the
# generated script is not valid Python and the target file is never written.
TEST_SCRIPT1 = """
from os.path import join as pjoin

with open(pjoin(request.folder, 'private', 'cron_req'), 'w') as f:
    f.write(str(request))
"""
# File that TEST_SCRIPT1 creates; test_2_crondance asserts that it exists.
TARGET = os.path.join(appdir, 'private', 'cron_req')
# Script body written to cron/test.py by test_3_SimplePool: it only sleeps
# for 13 seconds.
TEST_SCRIPT2 = """
import time

time.sleep(13)
"""
|
|
|
|
class TestCron(unittest.TestCase):
    """Tests for Token, crondance and SimplePool from gluon.newcron.

    The numeric prefixes in the method names presumably pin the execution
    order, since unittest sorts test methods by name.
    """

    @classmethod
    def tearDownClass(cls):
        # Remove any 'cron.master' file left behind, either inside the test
        # application folder or in the current working directory.
        leftovers = (os.path.join(appdir, 'cron.master'), 'cron.master')
        for path in leftovers:
            if not os.path.exists(path):
                continue
            os.unlink(path)

    def test_1_Token(self):
        # Expected sequence: the first acquire() returns the token's own
        # `now` value and the following release() is falsy; a second
        # acquire() right after returns None and its release() is truthy.
        token = Token(path=os.path.join(os.getcwd(), 'applications',
                                        test_app_name))
        self.assertEqual(token.acquire(), token.now)
        self.assertFalse(token.release())
        self.assertIsNone(token.acquire())
        self.assertTrue(token.release())

    def test_2_crondance(self):
        cron_dir = os.path.join(appdir, 'cron')
        # Install the crontab first, then the script it refers to.
        for filename, content in (('crontab', TEST_CRONTAB),
                                  ('test.py', TEST_SCRIPT1)):
            write_file(os.path.join(cron_dir, filename), content)
        # Drop any output from an earlier run so the final check cannot be
        # satisfied by a stale file.
        if os.path.exists(TARGET):
            os.unlink(TARGET)
        crondance(os.getcwd(), 'hard', startup=True, apps=[test_app_name])
        # must wait for the cron task, not very reliable
        time.sleep(1)
        while subprocess_count():
            time.sleep(1)
        time.sleep(1)
        self.assertTrue(os.path.exists(TARGET))

    def test_3_SimplePool(self):
        cron_dir = os.path.join(appdir, 'cron')
        write_file(os.path.join(cron_dir, 'test.py'), TEST_SCRIPT2)
        # The test needs web2py.py in the current working directory.
        web2py_script = os.path.join(os.getcwd(), 'web2py.py')
        self.assertTrue(os.path.exists(web2py_script))
        pool = SimplePool(1)
        command = [
            sys.executable, web2py_script,
            '--cron_job', '--no_banner', '--no_gui', '--plain',
            '-S', test_app_name,
            '-R', "applications/%s/cron/test.py" % test_app_name,
        ]
        # The first submission must be accepted and the one right after it
        # refused (presumably because the single slot is still busy with the
        # sleeping script -- confirm against SimplePool).
        self.assertTrue(pool(command))
        self.assertFalse(pool(None))
        time.sleep(1)
        # NOTE(review): stopcron()/reset() presumably stop the running job and
        # restore gluon.newcron module state -- confirm in gluon.newcron.
        stopcron()
        time.sleep(1)
        reset()
|