Commit f166f146 authored by David Douard's avatar David Douard
Browse files

[ccplugin] make celery-monitor more robust (closes #16640842)

it may occur that a transition cannot be fired due to a hook triggered by a
previous transition. Especially the on_monitor_start() was prone to this kind
of error since it was doing all the transitions in a single transaction.
parent f5ff381b3beb
......@@ -61,9 +61,15 @@ class CeleryMonitorCommand(Command):
def on_event(self, event):
with self.repo.internal_cnx() as cnx:
for adapter in cnx.vreg['adapters']['ICeleryTask']:
adapter.on_event(cnx, event)
cnx.commit()
try:
for adapter in cnx.vreg['adapters']['ICeleryTask']:
adapter.on_event(cnx, event)
cnx.commit()
except Exception as exc:
self.error('<CeleryMonitorCommand> '
'Unexpected error on event %s:\n%s',
event, exc)
cnx.roolback()
def celery_monitor(self):
result_backend = celery.current_app.conf['CELERY_RESULT_BACKEND']
......
......@@ -22,7 +22,6 @@ import warnings
import celery
from celery.result import AsyncResult, from_serializable
from cubicweb import ValidationError
from cubicweb.entities import AnyEntity, fetch_config
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
......@@ -357,17 +356,8 @@ class CeleryTaskAdapter(ICeleryTask):
return
entity = rset.one()
transition = tr_map[event['type']]
try:
entity.cw_adapt_to('IWorkflowable').fire_transition(
transition, event.get('exception'))
CeleryTaskAdapter.info('<CeleryTask %s (task_id %s)> %s',
entity.eid, entity.task_id, transition)
cnx.commit()
except ValidationError as exc:
CeleryTaskAdapter.warning(
'<CeleryTask %s (task_id %s)> faild to fire %s:\n%s',
entity.eid, entity.task_id, transition, exc)
cnx.rollback()
CeleryTaskAdapter.fire_task_transition(
entity, transition, event.get('exception'))
@staticmethod
def on_monitor_start(cnx):
......@@ -383,10 +373,24 @@ class CeleryTaskAdapter(ICeleryTask):
STATES.STARTED: 'running',
}.get(result.state)
if transition is not None:
wf = cnx.entity_from_eid(task_eid).cw_adapt_to('IWorkflowable')
wf.fire_transition(transition, result.traceback)
CeleryTaskAdapter.info('<CeleryTask %s (task_id %s)> %s',
task_eid, task_id, transition)
CeleryTaskAdapter.fire_task_transition(
cnx.entity_from_eid(task_eid),
transition, result.traceback)
@staticmethod
def fire_task_transition(cnx, task, transition, traceback):
try:
wf = task.cw_adapt_to('IWorkflowable')
wf.fire_transition_if_possible(
transition, traceback)
cnx.commit()
CeleryTaskAdapter.info('<CeleryTask %s (task_id %s)> %s',
task.eid, task.task_id, transition)
except Exception as exc:
CeleryTaskAdapter.error(
'<CeleryTask %s (task_id %s)> failed to fire %s:\n%s',
task.eid, task.task_id, transition, exc)
cnx.rollback()
@property
def logs(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment