Commit 0c364ddd authored by David Douard's avatar David Douard
Browse files

[entities] try to weave 'parent_task' relations between CelereyTask entities (closes #16640666)

This is done by listening at the celery event bus for 'task-xxx' events (which
is normally done by the celery-monitor cwctl command).
parent 826cf4c405a0
......@@ -20,7 +20,7 @@ import six
import warnings
import celery
from celery.result import AsyncResult
from celery.result import AsyncResult, from_serializable
from cubicweb.entities import AnyEntity, fetch_config
from cubicweb.view import EntityAdapter
......@@ -177,6 +177,7 @@ class ICeleryTask(EntityAdapter):
else:
return 0.
@property
def state(self):
return self.result.state
......@@ -247,11 +248,101 @@ class CeleryTaskAdapter(ICeleryTask):
def task_name(self):
return self.entity.task_name
def attach_result(self, result):
def tree(result, seen=None):
if seen is None:
seen = set()
if result.parent:
for r in tree(result.parent, seen):
yield r
for child in result.children or []:
for r in tree(child, seen):
yield r
if isinstance(result, AsyncResult):
try:
rresult = from_serializable(result.result)
except:
CeleryTaskAdapter.info(
'Cannot deserialize task result %s: %s',
result, result.result)
else:
for r in tree(rresult, seen):
yield r
if result.task_id not in seen:
seen.add(result.task_id)
yield result
for asr in tree(result):
task_id = six.text_type(asr.id)
rset = self._cw.find('CeleryTask', task_id=task_id)
if rset:
cwtask = rset.one()
else:
self.info("create a CeleryTask for %s" % task_id)
cwtask = self._cw.create_entity(
'CeleryTask',
task_name=six.text_type(''),
task_id=six.text_type(task_id))
if not cwtask.parent_task and self.entity is not cwtask:
self.info('Set %s parent_task to %s (%s)', cwtask.task_id,
self.entity, self.entity.task_id)
cwtask.cw_set(parent_task=self.entity)
@staticmethod
def on_event(cnx, event):
# handle weaving of parent_task relations
if event['type'] in ('task-received', 'task-sent'):
# may create the cw task entity, if needed
rset = cnx.find('CeleryTask', task_id=event['uuid'])
if not rset: # and 'name' in event:
CeleryTaskAdapter.info("create CeleryTask for %s (%s)",
event['uuid'], event['name'])
entity = cnx.create_entity(
'CeleryTask',
task_id=six.text_type(event['uuid']),
task_name=six.text_type(event['name']),
)
cnx.commit()
else:
entity = rset.one()
CeleryTaskAdapter.info(
"using existing CeleryTask %s for %s (%s)",
entity, event['uuid'], event['name'])
if entity.task_name != event['name']:
if entity.task_name:
CeleryTaskAdapter.warning(
'<CeleryTask %s (task_id %s)> already has a name: %s',
entity.eid, entity.task_id, entity.task_name)
CeleryTaskAdapter.info('set %s name to %s', entity.task_id,
event['name'])
entity.cw_set(task_name=event['name'])
elif event['type'] == 'task-succeeded':
rset = cnx.find('CeleryTask', task_id=event['uuid'])
if rset:
asresult = AsyncResult(event['uuid'])
CeleryTaskAdapter.info(
"Task %s:\n asresult=%s\n event.result=%s",
event['uuid'], asresult.result, event['result'])
root = rset.one()
root.cw_adapt_to('ICeleryTask').attach_result(asresult)
else:
CeleryTaskAdapter.warning(
'Cannot find a cw entity for %s', event['uuid'])
# manage workflow
tr_map = CeleryTaskAdapter.tr_map
if event['type'] in tr_map:
entity = cnx.find('CeleryTask', task_id=event['uuid']).one()
rset = cnx.find('CeleryTask', task_id=event['uuid'])
if not rset:
CeleryTaskAdapter.warning(
'<CeleryTask (task_id %s)> not found in database; '
'cannot set it\'s state',
event['uuid'])
return
entity = rset.one()
transition = tr_map[event['type']]
entity.cw_adapt_to('IWorkflowable').fire_transition(
transition, event.get('exception'))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment