Commit 0f3c7357 authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

[test] Don't run tasks in cubicweb test mode

Provides run_all_task() and get_tasks() methods instead.

This allow to make assertions before and after running tasks and/or introspect
task signatures.

Also, set CELERY_ALWAYS_EAGER and CELERY_EAGER_PROPAGATES_EXCEPTIONS to True by
default, this can be overriden in setUp() method.
parent a918fd0b25f4
......@@ -81,3 +81,14 @@ For instance, to start the above task in a dedicated queue named `myqueue`:
start_async_task(cnx, celery.signature('hi_there', args=('THERE',),
kwargs={'kw': 42}, queue='myqueue'))
Testing task based application
------------------------------
In CubicWeb test mode, tasks don't run automatically, use
`cubes.celerytask.entities.get_tasks()` to introspect them and
`cubes.celerytask.entities.run_all_tasks()` to run them.
Also, CELERY_ALWAYS_EAGER and CELERY_EAGER_PROPAGATES_EXCEPTIONS are set to
True by default.
......@@ -28,6 +28,21 @@ from cubicweb.server.hook import DataOperationMixIn, Operation
import cw_celerytask_helpers
_TEST_TASKS = {}
def get_tasks():
"""Return tasks to be run (for use in cubicweb test mode)"""
return _TEST_TASKS
def run_all_tasks():
"""Run all pending tasks (for use in cubicweb test mode)"""
results = {}
for task_eid, task in _TEST_TASKS.items():
results[task_eid] = task.delay()
return results
def start_async_task(cnx, name, *args, **kwargs):
task_name = six.text_type(celery.signature(name).task)
......@@ -39,10 +54,15 @@ def start_async_task(cnx, name, *args, **kwargs):
class StartCeleryTaskOp(DataOperationMixIn, Operation):
def postcommit_event(self):
for eid in self.get_data():
task = self.cnx.transaction_data.get('celerytask', {}).get(eid)
if task is not None:
task.delay()
global _TEST_TASKS
if self.cnx.vreg.config.mode == 'test':
# In test mode, task should run explicitelly with run_all_tasks()
_TEST_TASKS = self.cnx.transaction_data.get('celerytask', {})
else:
for eid in self.get_data():
task = self.cnx.transaction_data.get('celerytask', {}).get(eid)
if task is not None:
task.delay()
class CeleryTask(AnyEntity):
......@@ -202,3 +222,11 @@ class CeleryTaskAdapter(ICeleryTask):
return task_logs.read()
else:
return super(CeleryTaskAdapter, self).logs
def registration_callback(vreg):
vreg.register_all(six.itervalues(globals()), __name__)
if vreg.config.mode == 'test':
conf = celery.current_app.conf
conf['CELERY_ALWAYS_EAGER'] = True
conf['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True
......@@ -29,7 +29,8 @@ from celery.bin.worker import worker as celery_worker
from cubicweb.devtools import testlib
from cubes.celerytask.entities import start_async_task, StartCeleryTaskOp
from cubes.celerytask.entities import (start_async_task, StartCeleryTaskOp,
run_all_tasks)
from cubes.celerytask.ccplugin import CeleryMonitorCommand
import cw_celerytask_helpers
......@@ -71,6 +72,11 @@ class CeleryTaskTC(testlib.CubicWebTC):
worker = celery_worker(app)
worker.run_from_argv("worker", ['-P', 'solo', '-c', '1', '-l', 'info'])
def setUp(self):
super(CeleryTaskTC, self).setUp()
conf = celery.current_app.conf
conf.CELERY_ALWAYS_EAGER = False
def wait_async_task(self, cnx, task_id, timeout=5):
result = celery.result.AsyncResult(task_id)
start = time.time()
......@@ -89,6 +95,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
cwtask_eid = start_async_task(cnx, 'success', 42).eid
cnx.commit()
cwtask = cnx.entity_from_eid(cwtask_eid)
run_all_tasks()
self.wait_async_task(cnx, cwtask.task_id)
result = cwtask.cw_adapt_to('ICeleryTask').result
self.assertEqual(result.get(), 42)
......@@ -99,6 +106,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
with self.admin_access.repo_cnx() as cnx:
cwtask_eid = start_async_task(cnx, 'fail').eid
cnx.commit()
run_all_tasks()
cwtask = cnx.entity_from_eid(cwtask_eid)
self.wait_async_task(cnx, cwtask.task_id)
result = cwtask.cw_adapt_to('ICeleryTask').result
......@@ -118,6 +126,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
self.assertIsNone(run.result)
run.cw_adapt_to('IWorkflowable').fire_transition('start')
cnx.commit()
run_all_tasks()
run = cnx.entity_from_eid(run.eid)
self.wait_async_task(cnx, run.task_id)
......@@ -133,6 +142,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
cnx.transaction_data['celerytask'] = {42: task}
StartCeleryTaskOp.get_instance(cnx).add_data(42)
cnx.commit()
run_all_tasks()
result = self.wait_async_task(cnx, task_id)
self.assertEqual(result.get(), 10)
......@@ -140,6 +150,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
with self.admin_access.repo_cnx() as cnx:
cwtask_eid = start_async_task(cnx, 'log').eid
cnx.commit()
run_all_tasks()
cwtask = cnx.entity_from_eid(cwtask_eid)
self.wait_async_task(cnx, cwtask.task_id)
......@@ -169,6 +180,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
task = celery.chain(s("add", (2, 2)), s("add", (4,)))
cwtask = start_async_task(cnx, task)
cnx.commit()
run_all_tasks()
result = self.wait_async_task(cnx, cwtask.task_id)
self.assertEqual(result.get(), 8)
......@@ -184,6 +196,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
cwtask = start_async_task(cnx, task)
cnx.commit()
self.assertEqual(cwtask.task_name, u'celery.group')
run_all_tasks()
result = self.wait_async_task(cnx, cwtask.task_id)
# Group task return the latest subtask
self.assertEqual(result.get(), 8)
......@@ -203,6 +216,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
cwtask = start_async_task(cnx, task)
cnx.commit()
self.assertEqual(cwtask.task_name, u'celery.chord')
run_all_tasks()
result = self.wait_async_task(cnx, cwtask.task_id)
self.assertEqual(result.get(), 45)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment