# -*- coding: utf-8 -*-
# copyright 2023 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""cubicweb-rodolf specific hooks and operations"""
import os
from datetime import datetime

from cubicweb.predicates import is_instance
from cubicweb.server.hook import Hook
from cubicweb_s3storage.storages import S3Storage

from cubicweb_rodolf.import_data import import_data
class S3StorageStartupHook(Hook):
    """Register an ``S3Storage`` for the ``data`` attribute of ``File``.

    The hook is bound to the ``server_startup`` and ``server_maintenance``
    events. The argument given to ``S3Storage`` is read from the
    ``RODOLF_S3_BUCKET`` environment variable and falls back to ``"rodolf"``
    when that variable is not set.
    """

    __regid__ = "rodolf.server-startup-hook"
    events = ("server_startup", "server_maintenance")

    def __call__(self):
        """Build the storage and attach it to the repository system source."""
        # NOTE(review): the value is presumably the S3 bucket name, as the
        # environment variable name suggests -- confirm against S3Storage.
        bucket = os.environ.get("RODOLF_S3_BUCKET", "rodolf")
        storage = S3Storage(bucket)
        self.repo.system_source.set_storage("File", "data", storage)
class EnqueueTaskHook(Hook):
    """Create an ``RqTask`` for each new ``ImportProcess`` and enqueue it.

    Selected for ``ImportProcess`` entities on the ``after_add_entity``
    event. The hook creates an ``RqTask`` named ``import_process``, links it
    to the entity through ``rq_task``, then enqueues ``import_data`` with the
    entity eid via the task's ``IRqJob`` adapter.
    """

    __regid__ = "rodolf.enqueue-task-hook"
    __select__ = Hook.__select__ & is_instance("ImportProcess")
    events = ("after_add_entity",)

    def __call__(self):
        """Create the task, link it to the entity and enqueue the import job."""
        # Pass the eid itself. Wrapping it in braces would build a
        # one-element set, and the title would render as "{123}".
        task_title = "import-process {eid} ({date})".format(
            eid=self.entity.eid,
            date=datetime.utcnow().strftime("%Y-%m-%d"),
        )
        rqtask = self._cw.create_entity(
            "RqTask", name="import_process", title=task_title
        )
        self.entity.cw_set(rq_task=rqtask)
        # NOTE(review): committing before enqueueing presumably makes the new
        # entities visible to the process that runs the job -- confirm.
        self._cw.commit()
        rqtask.cw_adapt_to("IRqJob").enqueue(
            import_data, import_process_eid=self.entity.eid
        )
        # NOTE(review): the second commit presumably persists whatever
        # ``enqueue`` recorded on the task -- confirm against the adapter.
        self._cw.commit()