Commit c4e38da6 authored by Philippe Pepiot

celery-monitor: requeue all pending tasks

It can happen, for instance in case of a "worker lost" error, that task
events are not pushed to the "MONITOR_KEY" list in redis.
The proposed solution is to regularly check the status of pending tasks.
Since the use case is almost the same, this replaces the old "requeue"
behavior for failed sync_task_state() calls, which was based on a
"PENDING_KEY" list in redis.

So, every 10 minutes (it was 1 minute before, but 10 minutes seems more
reasonable), check the status of all pending tasks created within the
visibility_timeout, since tasks outside this range can no longer have their
state stored in the broker.
parent c6ec94bc9f69
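As a reading aid, the cutoff used by the new requeue() boils down to the
snippet below (a minimal standalone sketch reading the same old-style
BROKER_TRANSPORT_OPTIONS setting as the patch; the 3600 s fallback matches
Celery's default visibility timeout of one hour for the redis transport):

```python
import datetime

import celery

# Read the redis transport's visibility_timeout, falling back to the
# one-hour default when it is not configured explicitly.
try:
    timeout = celery.current_app.conf[
        'BROKER_TRANSPORT_OPTIONS']['visibility_timeout']
except KeyError:
    timeout = 3600

# Tasks created before this date can no longer have their state stored
# in the broker, so requeuing them would be pointless.
cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=timeout)
```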
@@ -15,11 +15,13 @@
 # You should have received a copy of the GNU Lesser General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
+import datetime
 import json
 import logging
 import os
 import time
+import celery
 import redis.exceptions
 from cubicweb.toolsutils import Command
@@ -32,8 +34,7 @@ from cw_celerytask_helpers.utils import get_redis_client
 logger = logging.getLogger(__name__)
-PENDING_KEY = ':'.join([MONITOR_KEY, 'pending'])
-REQUEUE_TIMEOUT = 60
+REQUEUE_TIMEOUT = 600
 class CeleryMonitorCommand(Command):
@@ -83,9 +84,28 @@ class CeleryMonitorCommand(Command):
                 break
     @staticmethod
-    def requeue(client):
-        while client.rpoplpush(PENDING_KEY, MONITOR_KEY):
-            pass
+    def requeue(cnx, client):
+        """Requeue pending tasks in case we have missed some events.
+        For instance, in case of "worker lost", we don't have task events.
+        Only requeue tasks within the "visibility_timeout" (default 1h), since
+        tasks status is lost after this period.
+        """
+        try:
+            timeout = celery.current_app.conf[
+                'BROKER_TRANSPORT_OPTIONS']['visibility_timeout']
+        except KeyError:
+            timeout = 3600
+        date = datetime.datetime.utcnow() - datetime.timedelta(seconds=timeout)
+        for task_id, in cnx.execute(
+            'Any T ORDERBY C WHERE X is CeleryTask, X task_id T, '
+            'X in_state ST, ST name IN ("waiting", "queued", "running"), '
+            'X creation_date C, X creation_date > %(d)s', {'d': date},
+            build_descr=False
+        ):
+            client.lpush(
+                MONITOR_KEY,
+                json.dumps({'task_id': task_id, 'task_name': None}))
     @staticmethod
     def loop(cnx, timeout=None):
@@ -95,20 +115,19 @@
         test = (cnx.repo.config.mode == "test")
         requeue_timer = timer = time.time()
         while True:
-            # pop item from MONITOR_KEY and push it to PENDING_KEY
-            data = client.brpoplpush(MONITOR_KEY, PENDING_KEY, timeout=1)
+            # pop item from MONITOR_KEY
+            data = client.brpop(MONITOR_KEY, timeout=1)
             if data is None:
                 now = time.time()
                 if timeout is not None and abs(timer - now) > timeout:
                     break
                 if abs(requeue_timer - time.time()) > REQUEUE_TIMEOUT:
                     # no items left in MONITOR_KEY and we reached the
-                    # REQUEUE_TIMEOUT requeue failed items from PENDING_KEY to
-                    # MONITOR_KEY
-                    CeleryMonitorCommand.requeue(client)
+                    # REQUEUE_TIMEOUT, requeue tasks in MONITOR_KEY
+                    CeleryMonitorCommand.requeue(cnx, client)
                     requeue_timer = time.time()
                 continue
-            payload = json.loads(data.decode())
+            payload = json.loads(data[1].decode())
             task_id, task_name = payload['task_id'], payload['task_name']
             try:
                 sync_task_state(cnx, task_id, task_name)
@@ -120,9 +139,6 @@
                 if test:
                     # we should not hide exceptions in tests
                     raise
-            else:
-                # success, drop item from PENDING_KEY
-                client.lrem(PENDING_KEY, data, num=1)
 CWCTL.register(CeleryMonitorCommand)
@@ -18,6 +18,8 @@
 """cubicweb-celerytask automatic tests"""
 import collections
+import datetime
+import json
 import logging
 import time
 import os.path
@@ -29,10 +31,12 @@ import mock
 from cubicweb.devtools import testlib # noqa
 from cubicweb_celerytask.ccplugin import CeleryMonitorCommand
 from cubicweb_celerytask.entities import (start_async_task, StartCeleryTaskOp,
                                           run_all_tasks)
 from cubicweb_celerytask.testutils import BaseCeleryTaskTC
+from cw_celerytask_helpers.monitor import MONITOR_KEY
+from cw_celerytask_helpers.utils import get_redis_client
 from cw_celerytask_helpers.filelogger import get_log_filename
@@ -263,6 +267,21 @@ class CeleryTaskTC(BaseCeleryTaskTC):
         self.assertEqual(set(task_ids), set(args[0]))
         self.assertEqual({'terminate': True, 'signal': 'SIGKILL'}, kwargs)
+    def test_celery_monitor_requeue(self):
+        with self.admin_access.cnx() as cnx:
+            t1 = start_async_task(cnx, "success", 42)
+            t2 = start_async_task(cnx, "success", 42)
+            t1.cw_set(creation_date=(
+                datetime.datetime.utcnow() - datetime.timedelta(hours=2)))
+            cnx.commit()
+            rdb = get_redis_client()
+            self.assertEqual(rdb.lrange(MONITOR_KEY, 0, -1), [])
+            CeleryMonitorCommand.requeue(cnx, rdb)
+            self.assertEqual(
+                [json.loads(data.decode())
+                 for data in rdb.lrange(MONITOR_KEY, 0, -1)],
+                [{'task_id': t2.task_id, 'task_name': None}])
 class StartAsyncTaskTC(testlib.CubicWebTC):