Commit a3c78036 authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

CeleryTask: move sync task logic in sync_task_state

- strict policy on sql transaction (commit all or nothing for each task)
- make it work with on_monitor_start (used to synchronize task states when
  monitor starts in case of missing events)
- Use serializable() in "spawn" task so it force celery to use the json task
  serializer instead of pickle
- Don't always update task_name when creating subtasks outside of celerytask
  (eg. by using start_async_task), use a fixed identifier "<unknown>" as task
  name instead and only update these.
parent 926f4bed2f00
......@@ -22,6 +22,7 @@ import warnings
import celery
from celery.result import AsyncResult, from_serializable
from cubicweb import NoResultError
from cubicweb.entities import AnyEntity, fetch_config
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
......@@ -34,6 +35,7 @@ from cubes.celerytask import STATES, FINAL_STATES
_ = six.text_type
_TEST_TASKS = {}
UNKNOWN_TASK_NAME = six.text_type('<unknown>')
def get_tasks():
......@@ -217,14 +219,6 @@ class CeleryTaskAdapter(ICeleryTask):
__select__ = ICeleryTask.__select__ & is_instance('CeleryTask')
tr_map = {
'task-failed': 'fail',
'task-succeeded': 'finish',
'task-received': 'enqueue',
'task-started': 'start',
'task-revoked': 'fail',
}
def attach_task(self, task, seen, parent=None):
task_id = six.text_type(get_task_id(task))
if parent is None:
......@@ -293,7 +287,7 @@ class CeleryTaskAdapter(ICeleryTask):
self.info("create a CeleryTask for %s" % task_id)
cwtask = self._cw.create_entity(
'CeleryTask',
task_name=six.text_type(''),
task_name=UNKNOWN_TASK_NAME,
task_id=six.text_type(task_id))
if not cwtask.parent_task and self.entity is not cwtask:
self.info('Set %s parent_task to %s (%s)', cwtask.task_id,
......@@ -303,96 +297,77 @@ class CeleryTaskAdapter(ICeleryTask):
@staticmethod
def on_event(cnx, event):
# handle weaving of parent_task relations
if event['type'] in ('task-received', ):
# may create the cw task entity, if needed
rset = cnx.find('CeleryTask', task_id=event['uuid'])
if not rset: # and 'name' in event:
CeleryTaskAdapter.info("create CeleryTask for %s (%s)",
event['uuid'], event['name'])
entity = cnx.create_entity(
'CeleryTask',
task_id=six.text_type(event['uuid']),
task_name=six.text_type(event['name']),
)
cnx.commit()
else:
entity = rset.one()
CeleryTaskAdapter.info(
"using existing CeleryTask %s for %s (%s)",
entity, event['uuid'], event['name'])
if entity.task_name != event['name']:
if entity.task_name:
CeleryTaskAdapter.warning(
'<CeleryTask %s (task_id %s)> already has a name: %s',
entity.eid, entity.task_id, entity.task_name)
CeleryTaskAdapter.info('set %s name to %s', entity.task_id,
event['name'])
entity.cw_set(task_name=event['name'])
cnx.commit()
elif event['type'] == 'task-succeeded':
rset = cnx.find('CeleryTask', task_id=event['uuid'])
if rset:
asresult = AsyncResult(event['uuid'])
CeleryTaskAdapter.info(
"Task %s:\n asresult=%s\n event.result=%s",
event['uuid'], asresult.result, event['result'])
root = rset.one()
root.cw_adapt_to('ICeleryTask').attach_result(asresult)
cnx.commit()
else:
CeleryTaskAdapter.warning(
'Cannot find a cw entity for %s', event['uuid'])
# manage workflow
tr_map = CeleryTaskAdapter.tr_map
if event['type'] in tr_map:
rset = cnx.find('CeleryTask', task_id=event['uuid'])
if not rset:
CeleryTaskAdapter.warning(
'<CeleryTask (task_id %s)> not found in database; '
'cannot set it\'s state',
event['uuid'])
return
entity = rset.one()
transition = tr_map[event['type']]
CeleryTaskAdapter.fire_task_transition(
cnx,
entity, transition, event.get('exception'))
log = CeleryTaskAdapter
if event['type'] in ('task-failed', 'task-received', 'task-succeeded',
'task-revoked', 'task-started'):
try:
CeleryTaskAdapter.sync_task_state(cnx, event['uuid'],
task_name=event.get('name'))
except Exception:
log.exception('Unhandled exception while syncing task ',
'%s state', event['uuid'])
cnx.rollback()
@staticmethod
def on_monitor_start(cnx):
log = CeleryTaskAdapter
for task_eid, task_id in cnx.execute((
'Any T, TID WHERE '
'T is CeleryTask, T task_id TID, T in_state S, '
'S name in ("waiting", "queued", "running")'
)):
result = AsyncResult(task_id)
transition = {
STATES.SUCCESS: 'finish',
STATES.FAILURE: 'fail',
STATES.STARTED: 'running',
}.get(result.state)
if transition is not None:
CeleryTaskAdapter.fire_task_transition(
cnx,
cnx.entity_from_eid(task_eid),
transition, result.traceback)
try:
CeleryTaskAdapter.sync_task_state(cnx, task_id)
except Exception:
log.exception('Unhandled exception while syncing task '
'%s state', task_id)
cnx.rollback()
raise
@staticmethod
def fire_task_transition(cnx, task, transition, traceback):
def sync_task_state(cnx, task_id, task_name=None, commit=True):
log = CeleryTaskAdapter
log.info('Syncing task %s state', task_id)
task_id = six.text_type(task_id)
result = AsyncResult(task_id)
state = result.state
if state == 'PENDING':
log.info('Task %s state is unknown', task_id)
# unknown state
return
try:
wf = task.cw_adapt_to('IWorkflowable')
wf.fire_transition_if_possible(
transition, traceback)
task = cnx.find('CeleryTask', task_id=task_id).one()
except NoResultError:
task = cnx.create_entity('CeleryTask', task_id=task_id,
task_name=task_name or UNKNOWN_TASK_NAME)
log.info('Created <CeleryTask %s (task_id %s)>',
task.eid, task_id)
if task.task_name == UNKNOWN_TASK_NAME and task_name is not None:
log.info('Update <CeleryTask %s (task_id %s)> name to %s',
task.eid, task_id, task_name)
task.cw_set(task_name=six.text_type(task_name))
if result.ready():
task.cw_adapt_to('ICeleryTask').attach_result(result)
transition = {
STATES.SUCCESS: 'finish',
STATES.FAILURE: 'fail',
STATES.STARTED: 'running',
}.get(result.state)
if transition is not None:
log.info('<CeleryTask %s (task_id %s)> %s', task.eid, task_id,
transition)
task.cw_adapt_to('IWorkflowable').fire_transition_if_possible(
transition, result.traceback)
else:
log.info('<CeleryTask %s (task_id %s)> no transition found for '
'state %s', task.eid, task_id, result.state)
if commit:
cnx.commit()
CeleryTaskAdapter.info('<CeleryTask %s (task_id %s)> %s',
task.eid, task.task_id, transition)
except Exception as exc:
CeleryTaskAdapter.error(
'<CeleryTask %s (task_id %s)> failed to fire %s:\n%s',
task.eid, task.task_id, transition, exc)
cnx.rollback()
@property
def logs(self):
......
......@@ -62,5 +62,5 @@ def tsum(args):
@app.task(bind=True, name="spawn")
def spawn(self):
return chord((success.s(i) for i in range(10)), tsum.s() | add.s(-45))()
return chord((success.s(i) for i in range(10)), tsum.s() | add.s(-45))().serializable()
......@@ -16,6 +16,7 @@
"""cubicweb-celerytask automatic tests"""
import logging
import unittest
import celery.result
......@@ -170,6 +171,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
def test_workflow_subtasks(self):
with self.admin_access.repo_cnx() as cnx:
logging.getLogger('cubicweb.appobject').setLevel(logging.DEBUG)
s = celery.signature
task = s("spawn")
cwtask = start_async_task(cnx, task)
......@@ -179,18 +181,19 @@ class CeleryTaskTC(BaseCeleryTaskTC):
asresult = self.wait_async_task(cnx, cwtask.task_id)
result = celery.result.from_serializable(asresult.result)
self.assertEqual(0, result.get())
if False:
# require a running CeleryMonitor, and I don't know
# how to get one running while executing the tests...
children = cwtask.reverse_parent_task
names = [child.task_name for child in children]
self.assertEqual(10, len([x for x in names
if x == 'success']))
self.assertEqual(1, len([x for x in names
if x == 'tsum']))
self.assertEqual(1, len([x for x in names
if x == 'add']))
cwtask = cnx.entity_from_eid(cwtask.eid)
self.assertEqual(cwtask.cw_adapt_to('IWorkflowable').state, "done")
children = cwtask.reverse_parent_task
sync_task_state = cwtask.cw_adapt_to('ICeleryTask').sync_task_state
for task in children:
sync_task_state(cnx, task.task_id)
self.assertEqual(len(children), 11)
# XXX: cannot check children task names since they are not
# availables in broker
self.assertEqual(set([c.task_name for c in children]),
set(['<unknown>']))
self.assertEqual(set([c.cw_adapt_to('IWorkflowable').state
for c in children]), set(['done']))
if __name__ == '__main__':
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment