Commit 884c9fcb authored by David Douard's avatar David Douard
Browse files

[entities] improve workflow management of CeleryTasks (by celery-monitor)...

[entities] improve workflow management of CeleryTasks (by celery-monitor) robustness (related to #16640842)

the transition may fail (if for some race-like reasons it has already been fired).

Also ensure some CeleryTask changes are commited (celery-monitor)
since the transaction may be rolled back during the WF management part of the
on_event() method.
parent 3ae98145d1b4
......@@ -22,6 +22,7 @@ import warnings
import celery
from celery.result import AsyncResult, from_serializable
from cubicweb import ValidationError
from cubicweb.entities import AnyEntity, fetch_config
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
......@@ -329,6 +330,7 @@ class CeleryTaskAdapter(ICeleryTask):
CeleryTaskAdapter.info('set %s name to %s', entity.task_id,
event['name'])
entity.cw_set(task_name=event['name'])
cnx.commit()
elif event['type'] == 'task-succeeded':
rset = cnx.find('CeleryTask', task_id=event['uuid'])
......@@ -339,6 +341,7 @@ class CeleryTaskAdapter(ICeleryTask):
event['uuid'], asresult.result, event['result'])
root = rset.one()
root.cw_adapt_to('ICeleryTask').attach_result(asresult)
cnx.commit()
else:
CeleryTaskAdapter.warning(
'Cannot find a cw entity for %s', event['uuid'])
......@@ -355,10 +358,17 @@ class CeleryTaskAdapter(ICeleryTask):
return
entity = rset.one()
transition = tr_map[event['type']]
entity.cw_adapt_to('IWorkflowable').fire_transition(
transition, event.get('exception'))
CeleryTaskAdapter.info('<CeleryTask %s (task_id %s)> %s',
entity.eid, entity.task_id, transition)
try:
entity.cw_adapt_to('IWorkflowable').fire_transition(
transition, event.get('exception'))
CeleryTaskAdapter.info('<CeleryTask %s (task_id %s)> %s',
entity.eid, entity.task_id, transition)
cnx.commit()
except ValidationError as exc:
CeleryTaskAdapter.warning(
'<CeleryTask %s (task_id %s)> faild to fire %s:\n%s',
entity.eid, entity.task_id, transition, exc)
cnx.rollback()
@staticmethod
def on_monitor_start(cnx):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment