# Skip to content
# Snippets Groups Projects
# test_importprocedure.py 4.39 KiB
# Newer Older
# copyright 2024 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
import fakeredis
import rq


from cubicweb.devtools import testlib

from cubicweb_rodolf.import_data import launch_import_procedure

# Logger passed to launch_import_procedure by the tests in this module.
LOGGER = logging.getLogger("testing.rodolf.importdata")


class ImportProcedure(testlib.CubicWebTC):
    """Tests for launching an ``ImportProcedure`` in delta and in full mode."""

    def setUp(self):
        """Run the base test-case setup and create a fake Redis client.

        The fake client is installed as the ``rq`` connection inside the
        tests (see ``rq.Connection(self.fakeredis)``).
        """
        super().setUp()
        self.fakeredis = fakeredis.FakeStrictRedis()

    def test_only_delta_import(self):
        """
        Trying: create an ImportProcedure from 2 recipes (ir1, ir2).
        Launch it once, then mark the import process of ir1 as failed and
        the import process of ir2 as successful.

        Expected:
        - launching the procedure the first time must contain the 2 recipes
        - launching the procedure in delta mode must now only contain the
          failed recipe
        - launching the procedure in full mode must contain both recipes
        """
        with self.admin_access.cnx() as cnx, rq.Connection(self.fakeredis):
            ds1 = cnx.create_entity(
                "DataService",
                data_url="https://dbpedia.org/resource/Leonardo_da_Vinci",
                name="DBpedia Leo",
            )
            ir1 = cnx.create_entity(
                "ImportRecipe",
                name="IR1",
                dataservice=[ds1],
            )
            ds2 = cnx.create_entity(
                "DataService",
                data_url="https://dbpedia.org/resource/Virginia_Woolf",
                name="DBpedia Virgi",
            )
            ir2 = cnx.create_entity(
                "ImportRecipe",
                name="IR2",
                dataservice=[ds2],
            )
            importprocedure = cnx.create_entity(
                "ImportProcedure",
                virtuoso_url="https://sparql.poulet",
                virtuoso_password="loutre",
                import_recipes=[ir1, ir2],
            )
            cnx.commit()

            # Before any launch, both recipes are part of the delta recipes
            delta_recipes = list(importprocedure.delta_import_recipes)
            self.assertEqual(delta_recipes, [ir1, ir2])

            # First launch (delta mode): one task per recipe is expected
            nb_task_launched = launch_import_procedure(
                cnx, importprocedure, LOGGER, only_delta_import=True
            )
            self.assertEqual(nb_task_launched, 2)

            import_process1 = cnx.execute(
                "Any X WHERE X is ImportProcess, X import_recipe R, R name 'IR1'"
            ).one()
            import_process2 = cnx.execute(
                "Any X WHERE X is ImportProcess, X import_recipe R, R name 'IR2'"
            ).one()

            # Fake running the import processes through their workflow:
            # import_process1 fails and import_process2 succeeds
            wf1 = import_process1.cw_adapt_to("IWorkflowable")
            wf1.fire_transition("starts")
            wf2 = import_process2.cw_adapt_to("IWorkflowable")
            wf2.fire_transition("starts")
            cnx.commit()
            wf1.fire_transition("fails")
            wf2.fire_transition("success")
            cnx.commit()

            # Check that only the failed recipe is part of the delta recipes
            delta_recipes = list(importprocedure.delta_import_recipes)
            self.assertEqual(delta_recipes, [ir1])

            # Launching in delta mode creates only 1 task
            nb_task_launched = launch_import_procedure(
                cnx, importprocedure, LOGGER, only_delta_import=True
            )
            self.assertEqual(nb_task_launched, 1)

            # Launching in full mode creates 2 tasks
            nb_task_launched = launch_import_procedure(
                cnx, importprocedure, LOGGER, only_delta_import=False
            )
            self.assertEqual(nb_task_launched, 2)