entities.py 12 KB
Newer Older
Philippe Pepiot's avatar
Philippe Pepiot committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# -*- coding: utf-8 -*-
# copyright 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""cubicweb-celerytask entity's classes"""
Philippe Pepiot's avatar
Philippe Pepiot committed
19
20
21
import six

import celery
22
from celery.result import AsyncResult, result_from_tuple
Philippe Pepiot's avatar
Philippe Pepiot committed
23

24
from cubicweb import NoResultError
25
from cubicweb.entities import AnyEntity, fetch_config
Philippe Pepiot's avatar
Philippe Pepiot committed
26
27
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
28
from cubicweb.server.hook import DataOperationMixIn, Operation
Philippe Pepiot's avatar
Philippe Pepiot committed
29

30
from cw_celerytask_helpers.filelogger import get_task_logs
Philippe Pepiot's avatar
Philippe Pepiot committed
31

Philippe Pepiot's avatar
Philippe Pepiot committed
32
from cubicweb_celerytask import STATES, FINAL_STATES
33

34
_ = six.text_type
35

36
_TEST_TASKS = {}
37
UNKNOWN_TASK_NAME = six.text_type('<unknown>')
38
39
40
41


def get_tasks():
    """Return tasks to be run (for use in cubicweb test mode)"""
42
    return _TEST_TASKS.copy()
43
44


45
def run_all_tasks(cnx):
46
47
    """Run all pending tasks (for use in cubicweb test mode)"""
    results = {}
48
49
50
51
    # run all tasks and gather results.
    # Tasks can create other tasks, so run them until there is no one left.
    while _TEST_TASKS:
        task_eid = list(_TEST_TASKS)[0]
52
53
54
55
        task = _TEST_TASKS.pop(task_eid)
        # Ensure current task id is in the scope of the current test
        if task.id is not None and not cnx.execute(
            'Any X WHERE X is CeleryTask, X task_id %(task_id)s',
56
            {'task_id': task.freeze().id}
57
        ):
58
            continue
59
        results[task_eid] = task.delay()
60

Philippe Pepiot's avatar
Philippe Pepiot committed
61
    if celery.current_app.conf.task_always_eager:
62
63
64
65
66
67
        for task_eid, result in results.items():
            wf = cnx.entity_from_eid(task_eid).cw_adapt_to('IWorkflowable')
            transition = {
                STATES.SUCCESS: 'finish',
                STATES.FAILURE: 'fail',
            }[result.state]
68
69
70
71
            comment = result.traceback
            if comment is not None and not isinstance(comment, six.text_type):
                comment = comment.decode('utf-8')
            wf.fire_transition(transition, comment)
72
73
    return results

Philippe Pepiot's avatar
Philippe Pepiot committed
74

75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def sync_task_state(cnx, task_id, task_name):
    log = CeleryTaskAdapter
    task_id = six.text_type(task_id)
    result = AsyncResult(task_id)
    if result.state == 'PENDING':
        log.info('Task %s state is unknown', task_id)
        return
    try:
        task = cnx.find('CeleryTask', task_id=task_id).one()
    except NoResultError:
        task = cnx.create_entity('CeleryTask', task_id=task_id,
                                 task_name=task_name or UNKNOWN_TASK_NAME)
        log.info('Created <CeleryTask %s (task_id %s)>',
                 task.eid, task_id)
    task.cw_adapt_to('ICeleryTask').sync_state(task_id, task_name)


92
93
94
95
96
97
def start_async_task(cnx, task, *args, **kwargs):
    """Create and start a new task

    `task` can be either a task name, a task object or a task signature
    """
    task_name = six.text_type(celery.signature(task).task)
98
    entity = cnx.create_entity('CeleryTask', task_name=task_name)
99
    entity.cw_adapt_to('ICeleryTask').start(task, *args, **kwargs)
Philippe Pepiot's avatar
Philippe Pepiot committed
100
101
102
    return entity


103
def task_in_backend(task_id):
104
    app = celery.current_app
Philippe Pepiot's avatar
Philippe Pepiot committed
105
    if app.conf.task_always_eager:
106
107
108
109
        return False
    else:
        backend = app.backend
        return backend.get(backend.get_key_for_task(task_id)) is not None
110
111


112
113
114
class StartCeleryTaskOp(DataOperationMixIn, Operation):

    def postcommit_event(self):
115
116
        global _TEST_TASKS
        if self.cnx.vreg.config.mode == 'test':
Philippe Pepiot's avatar
Philippe Pepiot committed
117
            # In test mode, task should run explicitly with run_all_tasks()
118
            _TEST_TASKS.update(self.cnx.transaction_data.get('celerytask', {}))
119
120
121
122
123
        else:
            for eid in self.get_data():
                task = self.cnx.transaction_data.get('celerytask', {}).get(eid)
                if task is not None:
                    task.delay()
124
125


126
127
128
129
130
131
132
class CeleryTask(AnyEntity):
    __regid__ = 'CeleryTask'
    fetch_attrs, cw_fetch_order = fetch_config(('task_name',))

    def dc_title(self):
        return self.task_name

133
134
135
    def dc_long_title(self):
        adapted = self.cw_adapt_to('ICeleryTask')
        state, finished = adapted.state, adapted.finished
136
        title = self.task_name or self._cw._('subtask')
137
138
139
        if finished:
            title = '%s (%s)' % (title, self._cw._(state))
        return title
140

141
142
143
144
145
146
    @property
    def progress(self):
        yield self.cw_adapt_to('ICeleryTask').progress
        for subtask in self.reverse_parent_task:
            yield subtask.progress

147
148
149
150
151
152
153
    @property
    def parent_tasks(self):
        yield self
        for task in self.parent_task:
            for ptask in task.parent_tasks:
                yield ptask

154
155
156
157
158
159
    def child_tasks(self):
        yield self
        for task in self.reverse_parent_task:
            for ctask in task.child_tasks():
                yield ctask

160

Philippe Pepiot's avatar
Philippe Pepiot committed
161
162
163
164
165
166
167
168
class ICeleryTask(EntityAdapter):
    __regid__ = 'ICeleryTask'
    __abstract__ = True

    def start(self, name, *args, **kwargs):
        eid = self.entity.eid
        task = self.get_task(name, *args, **kwargs)
        self._cw.transaction_data.setdefault('celerytask', {})[eid] = task
169
        StartCeleryTaskOp.get_instance(self._cw).add_data(eid)
Philippe Pepiot's avatar
Philippe Pepiot committed
170
171
172
173

    def get_task(self, name, *args, **kwargs):
        """Should return a celery task / signature or None

Arthur Lutz's avatar
Arthur Lutz committed
174
        This method is run in a precommit event
Philippe Pepiot's avatar
Philippe Pepiot committed
175
176
177
        """
        return celery.signature(name, args=args, kwargs=kwargs)

178
    def sync_state(self, task_id, task_name):
Philippe Pepiot's avatar
Philippe Pepiot committed
179
        """Triggered by celery-monitor"""
180
        raise NotImplementedError
Philippe Pepiot's avatar
Philippe Pepiot committed
181
182
183
184
185

    @property
    def task_id(self):
        raise NotImplementedError

186
187
188
189
    @property
    def task_name(self):
        raise NotImplementedError

190
191
192
193
    def revoke(self, terminate=True, signal='SIGKILL'):
        return celery.task.control.revoke(
            [self.task_id], terminate=terminate, signal=signal)

Philippe Pepiot's avatar
Philippe Pepiot committed
194
195
    @property
    def logs(self):
196
        return get_task_logs(self.task_id) or b''
Philippe Pepiot's avatar
Philippe Pepiot committed
197
198
199
200
201

    @property
    def result(self):
        return AsyncResult(self.task_id)

Laura Médioni's avatar
Laura Médioni committed
202
203
    @property
    def progress(self):
Philippe Pepiot's avatar
Philippe Pepiot committed
204
        if celery.current_app.conf.task_always_eager:
205
            return 1.
206
        result = self.result
207
        if result.info and 'progress' in result.info:
Laura Médioni's avatar
Laura Médioni committed
208
            return result.info['progress']
209
210
211
212
        elif self.entity.reverse_parent_task:
            children = self.entity.reverse_parent_task
            return sum(child.cw_adapt_to('ICeleryTask').progress
                       for child in children) / len(children)
213
        elif result.state == STATES.SUCCESS:
Laura Médioni's avatar
Laura Médioni committed
214
215
216
217
218
219
            return 1.
        else:
            return 0.

    @property
    def state(self):
220
        return self.result.state
Laura Médioni's avatar
Laura Médioni committed
221
222
223

    @property
    def finished(self):
224
        return self.state in FINAL_STATES
Laura Médioni's avatar
Laura Médioni committed
225

Philippe Pepiot's avatar
Philippe Pepiot committed
226
227
228
229
230
231

class CeleryTaskAdapter(ICeleryTask):
    """Base adapter that store task call args in the transaction"""

    __select__ = ICeleryTask.__select__ & is_instance('CeleryTask')

232
    def attach_task(self, task, seen, parent=None):
Philippe Pepiot's avatar
Philippe Pepiot committed
233
        task_id = six.text_type(task.freeze().id)
234
235
236
237
238
        if parent is None:
            parent = self.entity
        if self.entity.task_id is None:
            self.entity.cw_set(task_id=task_id)
        elif task_id not in seen:
239
            task_name = six.text_type(task.task)
240
241
            parent = self._cw.create_entity('CeleryTask',
                                            task_id=six.text_type(task_id),
242
                                            task_name=task_name,
243
244
                                            parent_task=parent)
        seen.add(task_id)
Philippe Pepiot's avatar
Philippe Pepiot committed
245
        if task.name in ('celery.chain', 'celery.group'):
246
247
            for subtask in task.tasks:
                self.attach_task(subtask, seen, parent)
248
249
        if task.name == 'celery.chord':
            self.attach_task(task.body, seen, parent)
Philippe Pepiot's avatar
Philippe Pepiot committed
250
251
            for subtask in task.tasks.tasks:
                self.attach_task(subtask, seen, parent)
Philippe Pepiot's avatar
Philippe Pepiot committed
252
253

    def get_task(self, name, *args, **kwargs):
254
        task = super(CeleryTaskAdapter, self).get_task(
Philippe Pepiot's avatar
Philippe Pepiot committed
255
            name, *args, **kwargs)
256
257
        self.attach_task(task, set())
        return task
Philippe Pepiot's avatar
Philippe Pepiot committed
258
259
260
261
262

    @property
    def task_id(self):
        return self.entity.task_id

263
264
265
266
    @property
    def task_name(self):
        return self.entity.task_name

267
268
269
270
271
    def revoke(self, terminate=True, signal='SIGKILL'):
        to_revoke = set([e.task_id for e in self.entity.child_tasks()])
        return celery.task.control.revoke(
            list(to_revoke), terminate=terminate, signal=signal)

272
273
274
275
276
277
278
279
280
281
282
283
    def attach_result(self, result):
        def tree(result, seen=None):
            if seen is None:
                seen = set()
            if result.parent:
                for r in tree(result.parent, seen):
                    yield r
            for child in result.children or []:
                for r in tree(child, seen):
                    yield r

            if isinstance(result, AsyncResult):
284
285
286
                rresult = result.result
                if (isinstance(rresult, dict)
                        and "celerytask_subtasks" in rresult):
287
                    subtasks = result_from_tuple(
288
289
                        rresult["celerytask_subtasks"])
                    for r in tree(subtasks, seen):
290
291
292
293
294
295
296
297
                        yield r

                if result.task_id not in seen:
                    seen.add(result.task_id)
                yield result

        for asr in tree(result):
            task_id = six.text_type(asr.id)
298
299
300
            try:
                cwtask = self._cw.find('CeleryTask', task_id=task_id).one()
            except NoResultError:
301
302
                cwtask = self._cw.create_entity(
                    'CeleryTask',
303
                    task_name=UNKNOWN_TASK_NAME,
304
                    task_id=six.text_type(task_id))
305
306
                self.info("Create <CeleryTask %s (task_id %s)>",
                          cwtask.eid, task_id)
307
308
309
310
311
            if not cwtask.parent_task and self.entity is not cwtask:
                self.info('Set %s parent_task to %s (%s)', cwtask.task_id,
                          self.entity, self.entity.task_id)
                cwtask.cw_set(parent_task=self.entity)

312
313
314
315
316
317
    def sync_state(self, task_id, task_name, commit=True):
        if (self.entity.task_name == UNKNOWN_TASK_NAME
                and task_name is not None):
            self.info('Update <CeleryTask %s (task_id %s)> name to %s',
                      self.entity.eid, task_id, task_name)
            self.entity.cw_set(task_name=six.text_type(task_name))
318

319
        result = self.result
320
        if result.ready():
321
            self.attach_result(result)
322
323
324
325

        transition = {
            STATES.SUCCESS: 'finish',
            STATES.FAILURE: 'fail',
326
            STATES.STARTED: 'start',
327
            STATES.REVOKED: 'fail',
328
            'PROGRESS': 'start',
329
330
        }.get(result.state)
        if transition is not None:
331
332
333
334
            self.info('<CeleryTask %s (task_id %s)> %s', self.entity.eid,
                      task_id, transition)
            wf = self.entity.cw_adapt_to('IWorkflowable')
            wf.fire_transition_if_possible(transition, result.traceback)
335
        else:
336
337
            self.info('<CeleryTask %s (task_id %s)> no transition found for '
                      'state %s', self.entity.eid, task_id, result.state)
338
339

        if commit:
340
            self._cw.commit()
341

342
343
344
    @property
    def state(self):
        db_state = self.entity.cw_adapt_to('IWorkflowable').state
345
346
347
348
349
350
        db_final_state_map = {'done': STATES.SUCCESS, 'failed': STATES.FAILURE}
        if db_state in db_final_state_map:
            return db_final_state_map[db_state]
        elif task_in_backend(self.task_id):
            return super(CeleryTaskAdapter, self).state
        return _('unknown state')