Commit 9d705705 authored by Philippe Pepiot's avatar Philippe Pepiot
Migrate task logs from redis and database to logs files

Having logs stored in redis then in database took too much memory in redis and
storage in database.
Using files is far simplier, but it require to have a shared file system (nfs)
when the worker and the cubiweb instance (the reader) are not in the same

Use the new cw_celerytask_helpers filelogger instead of redislogger.
Logs are stored in celerytask-log-dir directory in gzip with a predictible
filename based on task_id (which is unique).

Drop task_logs attribute from CeleryTask and update tests accordingly.
celery-monitor don't copy anymore from redis to database when the task is
parent 31147960ad58
......@@ -27,7 +27,7 @@ from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
from cubicweb.server.hook import DataOperationMixIn, Operation
from cw_celerytask_helpers import redislogger as loghelper
from cw_celerytask_helpers.filelogger import get_task_logs
from cubes.celerytask import STATES, FINAL_STATES
......@@ -193,7 +193,7 @@ class ICeleryTask(EntityAdapter):
def logs(self):
return loghelper.get_task_logs(self.task_id) or b''
return get_task_logs(self.task_id) or b''
def result(self):
......@@ -352,15 +352,6 @@ class CeleryTaskAdapter(ICeleryTask):
if commit:
def logs(self):
task_logs = self.entity.task_logs
if task_logs is not None:
return super(CeleryTaskAdapter, self).logs
def state(self):
db_state = self.entity.cw_adapt_to('IWorkflowable').state
......@@ -24,33 +24,11 @@ import os.path
from celery import current_app
import celery.task.control
from cubicweb import ConfigurationError, Binary
from cubicweb.predicates import on_fire_transition, is_instance
from cubicweb import ConfigurationError
from cubicweb.predicates import is_instance
from cubicweb.server.hook import Hook, DataOperationMixIn, Operation
from cw_celerytask_helpers.redislogger import flush_task_logs
class FlushCeleryTaskLogsOp(DataOperationMixIn, Operation):
def postcommit_event(self):
for task_id in self.get_data():
class CeleryTaskFinishedHook(Hook):
__regid__ = 'celerytask.celerytask_finished'
__select__ = (Hook.__select__ &
on_fire_transition('CeleryTask', ('finish', 'fail')))
events = ('after_add_entity',)
def __call__(self):
if current_app.conf.CELERY_ALWAYS_EAGER:
entity = self.entity.for_entity
logs = Binary(entity.cw_adapt_to('ICeleryTask').logs)
from cw_celerytask_helpers.filelogger import flush_task_logs
class DeleteCeleryTaskOp(DataOperationMixIn, Operation):
......@@ -114,13 +114,6 @@ msgctxt "CeleryTask"
msgid "task_id"
msgstr "id"
msgid "task_logs"
msgstr "logs"
msgctxt "CeleryTask"
msgid "task_logs"
msgstr "logs"
msgid "task_name"
msgstr "name"
......@@ -114,13 +114,6 @@ msgctxt "CeleryTask"
msgid "task_id"
msgstr "identifiant"
msgid "task_logs"
msgstr "journal d'exécution"
msgctxt "CeleryTask"
msgid "task_logs"
msgstr "journal d'exécution"
msgid "task_name"
msgstr "nom"
from cubes.celerytask.migration.utils import migrate_task_logs_to_bfss
drop_attribute('CeleryTask', 'task_logs')
# -*- coding: utf-8 -*-
# copyright 2018 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact --
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <>.
import gzip
from logilab.common.shellutils import ProgressBar
from cw_celerytask_helpers.filelogger import get_log_filename
from cw_celerytask_helpers.redislogger import get_task_logs, flush_task_logs
def migrate_task_logs_to_bfss(cnx):
"""Migrate logs from redis and from database to logs files in
to_flush = set()
rset = cnx.execute('Any X, T WHERE X is CeleryTask, X task_id T')
pb = ProgressBar(len(rset))
for eid, task_id in rset:
entity = cnx.entity_from_eid(eid)
logs = get_task_logs(task_id)
if logs is not None:
if entity.task_logs is not None:
logs =
if logs is not None:
fname = get_log_filename(task_id)
with, 'wb') as f:
for task_id in to_flush:
......@@ -18,7 +18,7 @@
"""cubicweb-celerytask schema"""
from yams.buildobjs import RelationDefinition, String, Bytes
from yams.buildobjs import RelationDefinition, String
from cubicweb.schema import WorkflowableEntityType
......@@ -26,7 +26,6 @@ from cubicweb.schema import WorkflowableEntityType
class CeleryTask(WorkflowableEntityType):
task_id = String(maxsize=40, required=False, indexed=True, unique=True)
task_name = String(required=True)
task_logs = Bytes()
class parent_task(RelationDefinition):
......@@ -10,7 +10,8 @@ import six
from celery import current_app as app, chord
from celery.utils.log import get_task_logger
from cw_celerytask_helpers.redislogger import redirect_stdouts, get_redis_client
from cw_celerytask_helpers.utils import get_redis_client
from cw_celerytask_helpers import redirect_stdouts
cw_logger = logging.getLogger('cubes.tasks')
dummy_logger = logging.getLogger('dummy')
......@@ -21,6 +21,7 @@ import collections
import logging
import unittest
import time
import os.path
import six
import celery
......@@ -33,7 +34,8 @@ from cubes.celerytask.entities import (start_async_task, StartCeleryTaskOp,
from cubes.celerytask.testutils import BaseCeleryTaskTC
import cw_celerytask_helpers.redislogger as loghelper
from cw_celerytask_helpers.utils import get_redis_client
from cw_celerytask_helpers.filelogger import get_log_filename
def wait_until(func, timeout=10, retry=1):
......@@ -85,7 +87,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
self.wait_async_task(cnx, cwtask.task_id)
wf = cwtask.cw_adapt_to('IWorkflowable')
self.assertEqual(wf.state, 'failed')
logs ='utf8')
logs = cwtask.cw_adapt_to('ICeleryTask').logs.decode('utf-8')
if six.PY2:
(u"""raise RuntimeError(u'Cette tâche a échoué'."""
......@@ -118,15 +120,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
cwtask = cnx.entity_from_eid(cwtask_eid)
self.wait_async_task(cnx, cwtask.task_id)
# logs should be flushed from redis to database
redis_logs = loghelper.get_task_logs(cwtask.task_id)
task_logs =
logs = cwtask.cw_adapt_to('ICeleryTask').logs
self.assertEqual(task_logs, logs)
self.assertIn(b'out should be in logs', logs)
self.assertIn(b'err should be in logs', logs)
self.assertIn(b'cw warning should be in logs', logs)
......@@ -139,7 +133,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
self.assertIn(b'raise Exception("oops")', logs)
def test_task_deleted(self):
rdb = loghelper.get_redis_client()
rdb = get_redis_client()
with self.admin_access.cnx() as cnx:
# this 'buggy_task_revoked' key is used simulate the 'revoke' since
# it's not handled by celery in threaded solo mode that we use in
......@@ -157,8 +151,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
rdb.set('buggy_task_revoked', 'yes')
revoke.assert_called_once_with([task.task_id], signal='SIGKILL',
@unittest.skipIf(celery.VERSION.major == 3, "not supported with celery 3")
def test_workflow_chain(self):
......@@ -50,6 +50,10 @@ class BaseCeleryTaskTC(testlib.CubicWebTC):
conf.CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
conf.CELERY_IMPORTS = ('cw_celerytask_helpers.helpers', 'tasks')
# this is required since we use a non-cubicweb worker, so the startup
# hook setting CUBICWEB_CELERYTASK_LOGDIR won't run.
cls.config.appdatahome, 'logs')
import tasks # noqa
cls.worker = multiprocessing.Process(target=cls.start_worker)
