# HG changeset patch # User Alexandre Richardson <alexandre.richardson@logilab.fr> # Date 1711116169 -3600 # Fri Mar 22 15:02:49 2024 +0100 # Node ID 8eb2bf70e04c9c5d84371b5e601a9d8600757a03 # Parent f047ce9f5bf90c6048f38547638befcf49e4cea8 feat(schema): add a process function entity type related #45 diff --git a/cubicweb_rodolf/hooks.py b/cubicweb_rodolf/hooks.py --- a/cubicweb_rodolf/hooks.py +++ b/cubicweb_rodolf/hooks.py @@ -75,6 +75,43 @@ self.repo.system_source.set_storage("File", "data", storage) +class ProcessTypeStartupHook(Hook): + """Register Process Type entity""" + + __regid__ = "rodolf.process-type-register-hook" + events = ("server_startup", "server_maintenance") + + def __call__(self): + if "rodolf.appobject.processtype" not in self.repo.vreg: + self.error("No Rodolf ProcessType found") + return + + process_vreg = self.repo.vreg["rodolf.appobject.processtype"] + with self.repo.internal_cnx() as cnx: + for process_func_id in process_vreg.keys(): + try: + rset = cnx.find("ProcessType", regid=process_func_id) + except KeyError: + # this exception must be handle, because the hook is called + # *before* the migration can be executed. Therefore it's + # possible to be in a state where ProcessType does not exists + # yet. It should only happen here though. + self.error( + "ProcessType is not a known entity_type. It probably " + "means you need to run a migration." + ) + return + if not rset: + cnx.create_entity( + "ProcessType", regid=process_func_id, name=process_func_id + ) + else: + pf = rset.one() + if not pf.activated: + pf.cw_set(activated=True) + cnx.commit() + + class EnqueueTaskHook(Hook): __regid__ = "rodolf.enqueue-task-hook" __select__ = Hook.__select__ & is_instance("ImportProcess") diff --git a/cubicweb_rodolf/schema.py b/cubicweb_rodolf/schema.py --- a/cubicweb_rodolf/schema.py +++ b/cubicweb_rodolf/schema.py @@ -20,6 +20,7 @@ from yams.buildobjs import ( Boolean, EntityType, + RichString, String, SubjectRelation, ) @@ -27,6 +28,13 @@ from cubicweb.rdf import RDF_MIMETYPE_TO_FORMAT +class ProcessType(EntityType): + name = String(required=True, unique=True) + regid = String(required=True, unique=True) + description = RichString(fulltextindexed=True) + activated = Boolean(required=True, default=True) + + class ImportProcedure(EntityType): name = String() virtuoso_url = String(required=True) diff --git a/test/test_process_function.py b/test/test_process_function.py new file mode 100644 --- /dev/null +++ b/test/test_process_function.py @@ -0,0 +1,28 @@ +# copyright 2024 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact https://www.logilab.fr -- mailto:contact@logilab.fr +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + + +from cubicweb.devtools import testlib + + +class ProcessTypeCreation(testlib.CubicWebTC): + def test_process_function_creation(self): + with self.admin_access.cnx() as cnx: + process_types = cnx.find("ProcessType").entities() + ptypes_name = [pf.name for pf in process_types] + self.assertTrue(len(ptypes_name) >= 2) + self.assertIn("default", ptypes_name) + self.assertIn("default-dryrun", ptypes_name)