Commit b0eb28cb authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

celery-monitor: fix syncing tasks for each ICeleryTask

When a cube inherit from CeleryTaskAdapter, celery-monitor was running
sync_task_state for each of them.

Replace this by adding a `sync_task_state()` function that first get or create
a CeleryTask, then adapt it to ICeleryTask and call `sync_state()` method.
parent 545f37e2801d
......@@ -25,6 +25,7 @@ import redis.exceptions
from cubicweb.toolsutils import Command
from cubicweb.server.serverconfig import ServerConfiguration
from cubicweb.cwctl import CWCTL
from cubes.celerytask.entities import sync_task_state
from cw_celerytask_helpers.monitor import MONITOR_KEY
from cw_celerytask_helpers.utils import get_redis_client
......@@ -109,21 +110,19 @@ class CeleryMonitorCommand(Command):
continue
payload = json.loads(data.decode())
task_id, task_name = payload['task_id'], payload['task_name']
for adapter in cnx.vreg['adapters']['ICeleryTask']:
try:
adapter.sync_task_state(cnx, task_id,
task_name=task_name)
except Exception:
adapter.exception('Unhandled exception while syncing '
'task <Task %s (%s)>', task_id,
task_name)
cnx.rollback()
if test:
# we should not hide exceptions in tests
raise
else:
# success, drop item from PENDING_KEY
client.lrem(PENDING_KEY, data, num=1)
try:
sync_task_state(cnx, task_id, task_name)
except Exception:
logger.exception('Unhandled exception while syncing '
'task <Task %s (%s)>', task_id,
task_name)
cnx.rollback()
if test:
# we should not hide exceptions in tests
raise
else:
# success, drop item from PENDING_KEY
client.lrem(PENDING_KEY, data, num=1)
CWCTL.register(CeleryMonitorCommand)
......@@ -68,6 +68,23 @@ def run_all_tasks(cnx=None):
return results
def sync_task_state(cnx, task_id, task_name):
log = CeleryTaskAdapter
task_id = six.text_type(task_id)
result = AsyncResult(task_id)
if result.state == 'PENDING':
log.info('Task %s state is unknown', task_id)
return
try:
task = cnx.find('CeleryTask', task_id=task_id).one()
except NoResultError:
task = cnx.create_entity('CeleryTask', task_id=task_id,
task_name=task_name or UNKNOWN_TASK_NAME)
log.info('Created <CeleryTask %s (task_id %s)>',
task.eid, task_id)
task.cw_adapt_to('ICeleryTask').sync_state(task_id, task_name)
def start_async_task(cnx, task, *args, **kwargs):
"""Create and start a new task
......@@ -154,10 +171,9 @@ class ICeleryTask(EntityAdapter):
"""
return celery.signature(name, args=args, kwargs=kwargs)
@staticmethod
def sync_task_state(cnx, task_id, task_name=None):
def sync_state(self, task_id, task_name):
"""Triggered by celery-monitor"""
pass
raise NotImplementedError
@property
def task_id(self):
......@@ -302,33 +318,16 @@ class CeleryTaskAdapter(ICeleryTask):
self.entity, self.entity.task_id)
cwtask.cw_set(parent_task=self.entity)
@staticmethod
def sync_task_state(cnx, task_id, task_name=None, commit=True):
log = CeleryTaskAdapter
log.info('Syncing task %s state', task_id)
task_id = six.text_type(task_id)
result = AsyncResult(task_id)
state = result.state
if state == 'PENDING':
log.info('Task %s state is unknown', task_id)
# unknown state
return
try:
task = cnx.find('CeleryTask', task_id=task_id).one()
except NoResultError:
task = cnx.create_entity('CeleryTask', task_id=task_id,
task_name=task_name or UNKNOWN_TASK_NAME)
log.info('Created <CeleryTask %s (task_id %s)>',
task.eid, task_id)
if task.task_name == UNKNOWN_TASK_NAME and task_name is not None:
log.info('Update <CeleryTask %s (task_id %s)> name to %s',
task.eid, task_id, task_name)
task.cw_set(task_name=six.text_type(task_name))
def sync_state(self, task_id, task_name, commit=True):
if (self.entity.task_name == UNKNOWN_TASK_NAME
and task_name is not None):
self.info('Update <CeleryTask %s (task_id %s)> name to %s',
self.entity.eid, task_id, task_name)
self.entity.cw_set(task_name=six.text_type(task_name))
result = self.result
if result.ready():
task.cw_adapt_to('ICeleryTask').attach_result(result)
self.attach_result(result)
transition = {
STATES.SUCCESS: 'finish',
......@@ -338,16 +337,16 @@ class CeleryTaskAdapter(ICeleryTask):
'PROGRESS': 'start',
}.get(result.state)
if transition is not None:
log.info('<CeleryTask %s (task_id %s)> %s', task.eid, task_id,
transition)
task.cw_adapt_to('IWorkflowable').fire_transition_if_possible(
transition, result.traceback)
self.info('<CeleryTask %s (task_id %s)> %s', self.entity.eid,
task_id, transition)
wf = self.entity.cw_adapt_to('IWorkflowable')
wf.fire_transition_if_possible(transition, result.traceback)
else:
log.info('<CeleryTask %s (task_id %s)> no transition found for '
'state %s', task.eid, task_id, result.state)
self.info('<CeleryTask %s (task_id %s)> no transition found for '
'state %s', self.entity.eid, task_id, result.state)
if commit:
cnx.commit()
self._cw.commit()
@property
def logs(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment