# copyright 2016-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-seda unit tests for XSD profile generation.

You may want to set the TEST_WRITE_SEDA_FILES environment variable to activate
writing of generated content back to the file-system.
"""
21

22
import io
23
from doctest import Example
Denis Laxalde's avatar
Denis Laxalde committed
24
from itertools import chain, repeat
25
26
import os
from os.path import basename, join
27
import unittest
28

29

30
from lxml import etree
31
from lxml.doctestcompare import LXMLOutputChecker
32

33
34
from logilab.common.decorators import cached

35
36
from cubicweb.devtools.testlib import CubicWebTC

37
from cubicweb_seda.xsd2yams import XSDMMapping
38
from cubicweb_seda.entities import profile_generation as pg
39

40
from cubicweb_seda import testutils
41

42
43

class XmlTestMixin(object):
    """Mixin class providing additional assertion methods for checking XML data.

    Concrete mixins must set ``schema_class``, ``schema_file`` and
    ``adapter_id`` and implement ``get_elements`` / ``get_attributes``.
    The host test case must provide the usual ``unittest.TestCase`` assertion
    methods as well as a ``datapath()`` method (used by ``assertXmlValid``).
    """

    # Prefix -> namespace URI mapping handed to every XPath evaluation.
    NAMESPACES = {
        "xs": "http://www.w3.org/2001/XMLSchema",
        "seda": "fr:gouv:culture:archivesdefrance:seda:v2.0",
        "rng": "http://relaxng.org/ns/structure/1.0",
        "a": "http://relaxng.org/ns/compatibility/annotations/1.0",
    }
    # To be defined in concrete implementations.
    schema_class = None
    schema_file = None
    adapter_id = None

    @classmethod
    @cached
    def schema(cls, xsd_filename):
        """Return a ``schema_class`` validator built from the given schema file."""
        # Binary mode: let the XML parser honour the encoding declared in the
        # document instead of decoding with the platform's default encoding.
        with open(xsd_filename, "rb") as stream:
            return cls.schema_class(etree.parse(stream))

    def xpath(self, element, expression):
        """Evaluate `expression` on `element` using the ``NAMESPACES`` prefixes."""
        return element.xpath(expression, namespaces=self.NAMESPACES)

    def get_element(self, profile, name):
        """Return etree element definition for 'name' (there should be only one)"""
        elements = self.get_elements(profile, name)
        self.assertEqual(len(elements), 1)
        return elements[0]

    def get_elements(self, profile, name):
        """Return etree element definitions for 'name'"""
        raise NotImplementedError()

    def get_attribute(self, profile, name):
        """Return etree attribute definition for 'name' (there should be only one)"""
        attributes = self.get_attributes(profile, name)
        self.assertEqual(len(attributes), 1)
        return attributes[0]

    def get_attributes(self, profile, name):
        """Return etree attribute definitions for 'name'"""
        raise NotImplementedError()

    def profile_etree(self, transfer_entity, adapter_id=None):
        """Return etree representation of profile's XSD for the given transfer entity.

        The entity is adapted with `adapter_id` (default: the class level
        ``adapter_id``) and the dumped tree is validated before being returned.
        """
        adapter = transfer_entity.cw_adapt_to(adapter_id or self.adapter_id)
        root = adapter.dump_etree()
        self.assertXmlValid(root)
        return root

    def assertXmlValid(self, root):
        """Validate an XML etree according to the class' ``schema_file``."""
        schema = self.schema(self.datapath(self.schema_file))
        schema.assert_(root)

    def assertXmlEqual(self, actual, expected):
        """Check that both XML strings represent the same XML tree."""
        checker = LXMLOutputChecker()
        if not checker.check_output(expected, actual, 0):
            message = checker.output_difference(Example("", expected), actual, 0)
            self.fail(message)

    def check_xsd_profile(self, root, sample_file, **substitutions):
        """Check that the SEDA profile can be used to validate a sample XML document.

        `sample_file` is read as a ``str.format`` template and rendered with
        `substitutions` before being validated against the profile `root`.
        """
        if os.environ.get("TEST_WRITE_SEDA_FILES"):
            # Debugging aid: dump the generated profile next to other temp
            # files, under the sample's base name.
            fname = join("/tmp", basename(sample_file))
            with io.open(fname, "w", encoding="utf-8") as stream:
                stream.write(etree.tostring(root, encoding=str, pretty_print=True))
            print("Generated profile saved to {}".format(fname))
        profile = self.schema_class(root)
        # The rendered sample is encoded as UTF-8 below, so decode it as UTF-8
        # too rather than relying on the locale's preferred encoding.
        with io.open(sample_file, encoding="utf-8") as f:
            sample_xml_string = f.read().format(**substitutions)
        profile.assert_(etree.fromstring(sample_xml_string.encode("utf-8")))
117

118
119
120
121
122

class XMLSchemaTestMixin(XmlTestMixin):
    """Mixin extending XmlTestMixin with implementation of some assert methods for XMLSchema."""

    schema_class = etree.XMLSchema
    schema_file = "XMLSchema.xsd"
    adapter_id = "SEDA-2.1.xsd"

    def get_elements(self, profile, name):
        """Return every ``xs:element`` definition named `name` in `profile`."""
        return self.xpath(profile, '//xs:element[@name="{0}"]'.format(name))

    def get_attributes(self, profile, name):
        """Return every ``xs:attribute`` definition named `name` in `profile`."""
        return self.xpath(profile, '//xs:attribute[@name="{0}"]'.format(name))

    def assertElementDefinition(self, element, expected):
        """Check that the XSD definition of `element` matches the `expected` dict.

        The definition is made of the element's XML attributes, plus a "type"
        key taken from the ``base`` of its simple content extension, if any.
        """
        edef = dict(element.attrib)
        types = self.xpath(element, "xs:complexType/xs:simpleContent/xs:extension")
        self.assertLessEqual(len(types), 1)
        if types:
            edef["type"] = types[0].attrib["base"]
        self.assertEqual(edef, expected)

    def assertAttributeDefinition(self, element, expected):
        """Check that the XSD attribute definition matches the `expected` dict.

        A missing ``use`` is reported as "required".
        """
        edef = dict(element.attrib)
        edef.setdefault("use", "required")
        self.assertEqual(edef, expected)

    def assertXSDAttributes(self, element, expected_attributes):
        """Check attribute definitions of `element` against `expected_attributes`.

        `expected_attributes` is a list of dicts; comparison does not depend
        on the order of either list.
        """
        attribute_elements = (
            # attributes for regular elements
            self.xpath(element, "xs:complexType/xs:attribute")
            # attributes for simple elements
            + self.xpath(
                element, "xs:complexType/xs:simpleContent/xs:extension/xs:attribute"
            )
        )
        actual = [dict(x.attrib) for x in attribute_elements]

        def sort_key(adef):
            # Mappings are not orderable on Python 3: sort on their items.
            return sorted(adef.items())

        self.assertEqual(
            sorted(actual, key=sort_key),
            sorted(expected_attributes, key=sort_key),
        )

157

158
159
160
161
class RelaxNGTestMixin(XmlTestMixin):
    """Mixin extending XmlTestMixin with implementation of some assert methods for RelaxNG."""

    schema_class = etree.RelaxNG
    schema_file = "relaxng.rng"
    adapter_id = "SEDA-2.1.rng"

    def get_elements(self, profile, name):
        """Return every ``rng:element`` definition named `name` in `profile`."""
        return self.xpath(profile, '//rng:element[@name="{0}"]'.format(name))

    def get_attributes(self, profile, name):
        """Return every ``rng:attribute`` definition named `name` in `profile`."""
        return self.xpath(profile, '//rng:attribute[@name="{0}"]'.format(name))

    def _rng_tag(self, local_name):
        """Return the fully qualified (``{namespace}name``) RelaxNG tag."""
        return "{%s}%s" % (self.NAMESPACES["rng"], local_name)

    def assertElementDefinition(self, element, expected):
        """Check that the RNG definition of `element` matches the `expected` dict.

        The RNG structure is translated to XSD-like keys so expectations look
        the same for both formats: the wrapping parent gives
        ``minOccurs`` / ``maxOccurs``, a ``rng:ref`` child gives ``type`` and
        ``rng:value`` / ``rng:data`` children give ``fixed`` / ``type``.
        """
        edef = dict(element.attrib)  # {name: element name}
        parent_tag = element.getparent().tag
        if parent_tag == self._rng_tag("optional"):
            edef["minOccurs"] = "0"
        elif parent_tag == self._rng_tag("oneOrMore"):
            edef["maxOccurs"] = "unbounded"
        elif parent_tag == self._rng_tag("zeroOrMore"):
            edef["minOccurs"] = "0"
            edef["maxOccurs"] = "unbounded"
        refs = self.xpath(element, "rng:ref")
        if refs:
            self.assertEqual(len(refs), 1)
            edef["type"] = refs[0].attrib["name"]
        self._element_fixed_value(edef, element)
        self.assertEqual(edef, expected)

    def assertAttributeDefinition(self, element, expected):
        """Check that the RNG attribute definition matches the `expected` dict.

        ``use`` is "optional" when the attribute is wrapped in
        ``rng:optional``, "required" otherwise.
        """
        edef = dict(element.attrib)  # {name: element name}
        is_optional = element.getparent().tag == self._rng_tag("optional")
        edef["use"] = "optional" if is_optional else "required"
        self._element_fixed_value(edef, element)
        self.assertEqual(edef, expected)

    def assertXSDAttributes(self, element, expected_attributes):
        """Return the sorted attribute definitions found on `element`.

        NOTE(review): despite its name, this method never looks at
        `expected_attributes` and asserts nothing about it; it only collects
        and returns the definitions. Behaviour is kept as is because
        enforcing the comparison would change the outcome of existing
        callers -- to be confirmed and fixed together with their expected
        values.
        """
        adefs = []
        optattrs = self.xpath(element, "rng:optional/rng:attribute")
        attrs = self.xpath(element, "rng:attribute")
        for use, adef_element in chain(
            zip(repeat("optional"), optattrs), zip(repeat("required"), attrs)
        ):
            adef = dict(adef_element.attrib)
            adef["use"] = use
            data_elements = self.xpath(adef_element, "rng:data")
            if data_elements:
                self.assertEqual(len(data_elements), 1)
                adef["type"] = "xsd:" + data_elements[0].attrib["type"]
            self._element_fixed_value(adef, adef_element)
            adefs.append(adef)
        return sorted(adefs, key=lambda d: list(d.items()))

    def _element_fixed_value(self, edef, element):
        """Update `edef` in place with ``fixed`` / ``type`` found under `element`.

        A ``rng:value`` child gives the fixed value (and the type, if it has
        one); otherwise a ``rng:data`` child gives the type.
        """
        values = self.xpath(element, "rng:value")
        if values:
            self.assertEqual(len(values), 1)
            value = values[0]
            edef["fixed"] = value.text
            if value.attrib.get("type"):
                edef["type"] = "xsd:" + value.attrib["type"]
        else:
            datatypes = self.xpath(element, "rng:data")
            if datatypes:
                self.assertEqual(len(datatypes), 1)
                edef["type"] = "xsd:" + datatypes[0].attrib["type"]
231

232

233
class PathTargetValuesTC(CubicWebTC):
    """Tests for ``_path_target_values`` and ``integrity_cardinality``."""

    def _single_target_value(self, entity, path):
        """Return the only item `_path_target_values` yields for `path`."""
        target_values = pg._path_target_values(entity, path)
        self.assertEqual(len(target_values), 1)
        return target_values[0]

    def test_keyword_path(self):
        """Follow the paths of the 'Keyword' mapping from a SEDAKeyword."""
        element_defs = iter(XSDMMapping("Keyword"))
        with self.admin_access.cnx() as cnx:
            create = cnx.create_entity
            kw = create("SEDAKeyword", user_cardinality=u"0..n")
            kt = create("SEDAKeywordType", seda_keyword_type_from=kw)

            edef = next(element_defs)
            # Expected shape of this definition, kept for reference:
            # self.assertEqual(
            #  readable_edef(edef),
            #  ('Keyword', 'SEDAKeyword', [
            #      ('id', [('seda_id', 'object', 'SEDAid'),
            #              ('id', 'subject', 'String')]),
            #      ('KeywordContent', []),
            #      ('KeywordReference',
            #       [('seda_keyword_reference_from', 'object', 'SEDAKeywordReference'),
            #        ('seda_keyword_reference_to', 'subject', 'Concept')]),
            #      ('KeywordType', [('seda_keyword_type_from', 'object', 'SEDAKeywordType'),
            #                       ('seda_keyword_type_to', 'subject', 'Concept')]),
            #  ]))
            path = edef[-1][0][1]
            target_value = self._single_target_value(kw, path)
            self.assertIsNone(target_value[0])
            self.assertIsNone(target_value[1])

            path = edef[-1][2][1]
            target_values = pg._path_target_values(kw, path)
            self.assertEqual(len(target_values), 0)

            path = edef[-1][3][1]
            target_value = self._single_target_value(kw, path)
            self.assertEqual(target_value[0].eid, kt.eid)
            self.assertIsNone(target_value[1])

            # Once the keyword type is bound to a concept, that concept shows
            # up as second item of the target value.
            kt_scheme = testutils.scheme_for_rtype(
                cnx, "seda_keyword_type_to", u"theme"
            )
            kw_type = kt_scheme.reverse_in_scheme[0]
            kt.cw_set(seda_keyword_type_to=kw_type)
            path = edef[-1][3][1]
            target_value = self._single_target_value(kw, path)
            self.assertEqual(target_value[0].eid, kt.eid)
            self.assertEqual(target_value[1].eid, kw_type.eid)

            edef = next(element_defs)
            # Expected shape of this definition, kept for reference:
            # self.assertEqual(
            #     readable_edef(edef),
            #     ('KeywordContent', 'SEDAKeyword', [
            #         ('KeywordContent', [('keyword_content', 'subject', 'String')]),
            #     ]))
            path = edef[-1][0][1]
            target_value = self._single_target_value(kw, path)
            self.assertIsNone(target_value[0])
            self.assertIsNone(target_value[1])

    def test_internal_reference(self):
        """Follow the DataObjectReferenceId path down to the referenced object."""
        element_defs = iter(XSDMMapping("DataObjectReference"))
        with self.admin_access.cnx() as cnx:
            create = cnx.create_entity
            transfer = create("SEDAArchiveTransfer", title=u"test profile")
            _, _, unit_alt_seq = testutils.create_archive_unit(transfer)
            bdo = cnx.create_entity(
                "SEDABinaryDataObject",
                user_annotation=u"I am mandatory",
                seda_binary_data_object=transfer,
            )
            ref = create(
                "SEDADataObjectReference",
                seda_data_object_reference=unit_alt_seq,
                seda_data_object_reference_id=bdo,
            )

            edef = next(element_defs)
            # Expected shape of this definition, kept for reference:
            # readable_edef(edef)
            # ('DataObjectReference',
            #  'SEDADataObjectReference',
            #  [('id', [('seda_id', 'object', 'SEDAid'), ('id', 'subject', 'String')]),
            #   ('DataObjectReferenceId',
            #    [('seda_data_object_reference_id',
            #      'subject',
            #      ('SEDABinaryDataObject', 'SEDAPhysicalDataObject'))])])

            path = edef[-1][1][1]
            target_value = self._single_target_value(ref, path)
            self.assertIsNone(target_value[0])
            self.assertEqual(target_value[1].eid, bdo.eid)

    def test_integrity_cardinality(self):
        """Check `integrity_cardinality` for data objects in various contexts."""
        with self.admin_access.cnx() as cnx:
            # Data objects attached directly to the transfer.
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            bdo = cnx.create_entity(
                "SEDABinaryDataObject",
                user_annotation=u"I am mandatory",
                seda_binary_data_object=transfer,
            )
            self.assertEqual(pg.integrity_cardinality(bdo), "1")
            bdo = cnx.create_entity(
                "SEDABinaryDataObject",
                user_cardinality=u"0..1",
                user_annotation=u"opt",
                seda_binary_data_object=transfer,
            )
            self.assertEqual(pg.integrity_cardinality(bdo), "0..1")

            # Data objects attached to (possibly nested) archive units.
            _, _, unit_alt_seq = testutils.create_archive_unit(None, cnx=cnx)
            bdo = testutils.create_data_object(unit_alt_seq)
            self.assertEqual(pg.integrity_cardinality(bdo), "0..n")

            _, _, unit_alt_seq = testutils.create_archive_unit(
                None, cnx=cnx, user_cardinality=u"1..n"
            )
            bdo = testutils.create_data_object(unit_alt_seq, user_cardinality=u"0..1")
            self.assertEqual(pg.integrity_cardinality(bdo), "0..n")

            _, _, unit_alt_seq = testutils.create_archive_unit(
                None, cnx=cnx, user_cardinality=u"1..n"
            )
            _, _, unit_alt_seq2 = testutils.create_archive_unit(
                unit_alt_seq, cnx=cnx, user_cardinality=u"0..n"
            )
            bdo = testutils.create_data_object(unit_alt_seq2, user_cardinality=u"1")
            self.assertEqual(pg.integrity_cardinality(bdo), "0..n")

            _, _, unit_alt_seq = testutils.create_archive_unit(
                None, cnx=cnx, user_cardinality=u"1"
            )
            _, _, unit_alt_seq2 = testutils.create_archive_unit(
                unit_alt_seq, cnx=cnx, user_cardinality=u"1..n"
            )
            bdo = testutils.create_data_object(unit_alt_seq2, user_cardinality=u"0..1")
            self.assertEqual(pg.integrity_cardinality(bdo), "0..n")

            _, _, unit_alt_seq = testutils.create_archive_unit(
                None, cnx=cnx, user_cardinality=u"0..1"
            )
            _, _, unit_alt_seq2 = testutils.create_archive_unit(
                unit_alt_seq, cnx=cnx, user_cardinality=u"1"
            )
            bdo = testutils.create_data_object(unit_alt_seq2, user_cardinality=u"1..n")
            self.assertEqual(pg.integrity_cardinality(bdo), "0..n")
384

385

386
class SEDA2RNGExportTC(RelaxNGTestMixin, CubicWebTC):
387
    def test_skipped_mandatory_simple(self):
        """Check Date and MessageIdentifier definitions of a bare transfer profile."""
        with self.admin_access.cnx() as cnx:
            profile = self.profile_etree(
                cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            )

            date = self.get_element(profile, "Date")
            self.assertElementDefinition(date, {"name": "Date", "type": "xsd:dateTime"})
            self.assertXSDAttributes(date, [])

            identifier = self.get_element(profile, "MessageIdentifier")
            self.assertElementDefinition(
                identifier, {"name": "MessageIdentifier", "type": "xsd:token"}
            )
            self.assertXSDAttributes(
                identifier,
                [
                    {"name": "schemeAgencyID", "use": "optional", "type": "xsd:token"},
                    {
                        "name": "schemeAgencyName",
                        "use": "optional",
                        "type": "xsd:string",
                    },
                    {"name": "schemeDataURI", "use": "optional", "type": "xsd:anyURI"},
                    {"name": "schemeID", "use": "optional", "type": "xsd:token"},
                    {"name": "schemeName", "use": "optional", "type": "xsd:string"},
                    {"name": "schemeURI", "use": "optional", "type": "xsd:anyURI"},
                    {"name": "schemeVersionID", "use": "optional", "type": "xsd:token"},
                ],
            )
415

416
    def test_skipped_mandatory_complex(self):
        """Check Filename and Attachment definitions of a data object with a fixed filename."""
        with self.admin_access.cnx() as cnx:
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            testutils.create_data_object(transfer, filename=u"fixed.txt")
            profile = self.profile_etree(transfer)

            fname = self.get_element(profile, "Filename")
            self.assertElementDefinition(
                fname, {"name": "Filename", "fixed": "fixed.txt", "type": "xsd:string"}
            )
            self.assertXSDAttributes(fname, [])

            attachment = self.get_element(profile, "Attachment")
            self.assertElementDefinition(
                attachment, {"name": "Attachment", "type": "xsd:base64Binary"}
            )
            self.assertXSDAttributes(
                attachment,
                [
                    {"name": "filename", "use": "optional", "type": "xsd:string"},
                    {"name": "uri", "use": "optional", "type": "xsd:anyURI"},
                ],
            )
437

438
    def test_fileinfo_card(self):
        """FileInfo definition is the same whatever the cardinality of its content."""
        with self.admin_access.cnx() as cnx:
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            bdo = cnx.create_entity(
                "SEDABinaryDataObject",
                user_annotation=u"I am mandatory",
                seda_binary_data_object=transfer,
            )
            appname = cnx.create_entity(
                "SEDACreatingApplicationName", seda_creating_application_name=bdo
            )

            def check_fileinfo():
                # Regenerate the profile: it has to reflect the latest changes.
                profile = self.profile_etree(transfer)
                fileinfo = self.get_element(profile, "FileInfo")
                self.assertElementDefinition(fileinfo, {"name": "FileInfo"})

            check_fileinfo()

            appname.cw_set(user_cardinality=u"1")
            check_fileinfo()

            appname.cw_set(user_cardinality=u"0..1")
            bdo.cw_set(filename=u"fixed.txt")
            check_fileinfo()
464

465
    def test_data_object_package_card(self):
        """DataObjectPackage definition does not depend on the data object's cardinality."""
        with self.admin_access.cnx() as cnx:
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            bdo = cnx.create_entity(
                "SEDABinaryDataObject",
                user_annotation=u"I am mandatory",
                seda_binary_data_object=transfer,
            )

            profile = self.profile_etree(transfer)
            dop = self.get_element(profile, "DataObjectPackage")
            self.assertElementDefinition(dop, {"name": "DataObjectPackage"})

            bdo.cw_set(user_cardinality=u"1")
            profile = self.profile_etree(transfer)
            dop = self.get_element(profile, "DataObjectPackage")
            self.assertElementDefinition(dop, {"name": "DataObjectPackage"})
482

483
    def test_object_package_group(self):
        """The three data objects are children of DataObjectPackage's rng:group."""
        group_children = "./rng:group/*"
        with self.admin_access.cnx() as cnx:
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            bdo = cnx.create_entity(
                "SEDABinaryDataObject",
                user_annotation=u"I am number one",
                seda_binary_data_object=transfer,
            )
            cnx.create_entity(
                "SEDABinaryDataObject",
                user_annotation=u"I am number two",
                seda_binary_data_object=transfer,
            )
            cnx.create_entity(
                "SEDAPhysicalDataObject",
                user_annotation=u"I am number three",
                seda_physical_data_object=transfer,
            )

            profile = self.profile_etree(transfer)
            dop = self.get_element(profile, "DataObjectPackage")
            self.assertEqual(len(self.xpath(dop, group_children)), 3)

            # setting some cardinality to 1 will remove rng:optional parent of the DataObjectPackage
            # and BinaryDataObject nodes
            bdo.cw_set(user_cardinality=u"1")
            profile = self.profile_etree(transfer)
            dop = self.get_element(profile, "DataObjectPackage")
            self.assertEqual(len(self.xpath(dop, group_children)), 3)

    def test_object_data_object_group(self):
        """With `wrap_dataobjects`, data objects are children of DataObjectGroup."""
        group_children = (
            './rng:zeroOrMore/rng:choice/rng:element[@name="DataObjectGroup"]/*'
        )
        with self.admin_access.cnx() as cnx:
            transfer = cnx.create_entity(
                "SEDAArchiveTransfer", title=u"test profile", wrap_dataobjects=True
            )
            bdo = cnx.create_entity(
                "SEDABinaryDataObject",
                user_annotation=u"I am number one",
                seda_binary_data_object=transfer,
            )
            cnx.create_entity(
                "SEDABinaryDataObject",
                user_annotation=u"I am number two",
                seda_binary_data_object=transfer,
            )
            cnx.create_entity(
                "SEDAPhysicalDataObject",
                user_annotation=u"I am number three",
                seda_physical_data_object=transfer,
            )

            profile = self.profile_etree(transfer)
            dop = self.get_element(profile, "DataObjectPackage")
            self.assertEqual(len(self.xpath(dop, group_children)), 3)

            # setting some cardinality to 1 will remove rng:optional parent of the DataObjectPackage
            # and BinaryDataObject nodes
            bdo.cw_set(user_cardinality=u"1")
            profile = self.profile_etree(transfer)
            dop = self.get_element(profile, "DataObjectPackage")
            self.assertEqual(len(self.xpath(dop, group_children)), 3)
547

548
    def test_transfer_annotation(self):
        """The transfer's user annotation is exported as the only xs:documentation."""
        with self.admin_access.cnx() as cnx:
            profile = self.profile_etree(
                cnx.create_entity(
                    "SEDAArchiveTransfer",
                    title=u"test profile",
                    user_annotation=u"some description",
                )
            )
            # '//' is the standard descendant abbreviation; the former '///'
            # is not valid XPath 1.0 and only worked thanks to the leniency
            # of the underlying parser.
            docs = self.xpath(profile, "//xs:documentation")
            self.assertEqual(len(docs), 1)
            self.assertEqual(docs[0].text, "some description")
560

561
    def test_transfer_signature(self):
        """A transfer with a signature exports a Signature element of type OpenType."""
        with self.admin_access.cnx() as cnx:
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            cnx.create_entity("SEDASignature", seda_signature=transfer)
            profile = self.profile_etree(transfer)

            signature = self.get_element(profile, "Signature")
            self.assertElementDefinition(
                signature, {"name": "Signature", "type": "OpenType"}
            )
            self.assertOpenTypeIsDefined(profile)
571

572
    def test_keyword(self):
        """Check Keyword, KeywordContent, KeywordType and KeywordReference definitions.

        KeywordReference is checked three times: unbound, bound to a concept
        scheme, then bound to a concept of that scheme.
        """
        with self.admin_access.cnx() as cnx:
            create = cnx.create_entity

            scheme = testutils.scheme_for_rtype(cnx, "seda_keyword_type_to", u"theme")
            kw_type = scheme.reverse_in_scheme[0]

            transfer = create("SEDAArchiveTransfer", title=u"test profile")
            _, _, unit_alt_seq = testutils.create_archive_unit(
                transfer, user_cardinality=u"0..n"
            )

            kw = create(
                "SEDAKeyword", seda_keyword=unit_alt_seq, keyword_content=u"kwick"
            )
            kwr_e = create("SEDAKeywordReference", seda_keyword_reference_from=kw)
            create(
                "SEDAKeywordType",
                seda_keyword_type_from=kw,
                seda_keyword_type_to=kw_type,
            )

            profile = self.profile_etree(transfer)

            keywords = self.get_elements(profile, "Keyword")
            self.assertElementDefinition(keywords[0], {"name": "Keyword"})

            kwc = self.get_elements(profile, "KeywordContent")
            self.assertElementDefinition(
                kwc[0],
                {"name": "KeywordContent", "type": "xsd:string", "fixed": "kwick"},
            )

            kwt = self.get_element(profile, "KeywordType")
            self.assertElementDefinition(
                kwt, {"name": "KeywordType", "type": "xsd:token", "fixed": "theme"}
            )
            self.assertXSDAttributes(
                kwt,
                [
                    {
                        "name": "listVersionID",
                        "fixed": "seda_keyword_type_to/None vocabulary",
                    }
                ],
            )

            # Unbound reference: free token, every scheme* attribute optional.
            kwr = self.get_element(profile, "KeywordReference")
            self.assertElementDefinition(
                kwr, {"name": "KeywordReference", "type": "xsd:token"}
            )
            self.assertXSDAttributes(
                kwr,
                [
                    {"name": "schemeAgencyID", "type": "xsd:token", "use": "optional"},
                    {
                        "name": "schemeAgencyName",
                        "type": "xsd:string",
                        "use": "optional",
                    },
                    {"name": "schemeDataURI", "type": "xsd:anyURI", "use": "optional"},
                    {"name": "schemeID", "type": "xsd:token", "use": "optional"},
                    {"name": "schemeName", "type": "xsd:string", "use": "optional"},
                    {"name": "schemeURI", "type": "xsd:anyURI", "use": "optional"},
                    {"name": "schemeVersionID", "type": "xsd:token", "use": "optional"},
                ],
            )

            # Reference bound to a scheme: schemeURI is fixed to its URI.
            kwr_e.cw_set(seda_keyword_reference_to_scheme=scheme)
            profile = self.profile_etree(transfer)
            kwr = self.get_element(profile, "KeywordReference")
            self.assertElementDefinition(
                kwr, {"name": "KeywordReference", "type": "xsd:token"}
            )
            self.assertXSDAttributes(
                kwr, [{"name": "schemeURI", "fixed": scheme.cwuri}]
            )

            # Reference bound to a concept: element value is fixed as well.
            kwr_e.cw_set(seda_keyword_reference_to=kw_type)
            profile = self.profile_etree(transfer)
            kwr = self.get_element(profile, "KeywordReference")
            self.assertElementDefinition(
                kwr,
                {
                    "name": "KeywordReference",
                    "type": "xsd:token",
                    "fixed": kw_type.cwuri,
                },
            )
            self.assertXSDAttributes(
                kwr, [{"name": "schemeURI", "fixed": scheme.cwuri}]
            )
663

664
    def test_code_list(self):
        """MimeTypeCodeListVersion is fixed to the URI of the selected concept scheme."""
        with self.admin_access.cnx() as cnx:
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            scheme = cnx.create_entity("ConceptScheme", title=u"Keyword Types")
            cnx.create_entity(
                "SEDAMimeTypeCodeListVersion",
                seda_mime_type_code_list_version_from=transfer,
                seda_mime_type_code_list_version_to=scheme,
            )

            profile = self.profile_etree(transfer)

            mt_clv = self.get_element(profile, "MimeTypeCodeListVersion")
            self.assertElementDefinition(
                mt_clv,
                {
                    "name": "MimeTypeCodeListVersion",
                    "fixed": scheme.cwuri,
                    "type": "xsd:token",
                },
            )
            # XXX also fix listSchemeURI ?
            self.assertXSDAttributes(
                mt_clv,
                [
                    {"name": "listAgencyID", "use": "optional", "type": "xsd:token"},
                    {"name": "listAgencyName", "use": "optional", "type": "xsd:string"},
                    {"name": "listID", "use": "optional", "type": "xsd:token"},
                    {"name": "listName", "use": "optional", "type": "xsd:string"},
                    {"name": "listSchemeURI", "use": "optional", "type": "xsd:anyURI"},
                    {"name": "listURI", "use": "optional", "type": "xsd:anyURI"},
                    {"name": "listVersionID", "use": "optional", "type": "xsd:token"},
                ],
            )
697

698
699
    def test_multiple_concepts(self):
        with self.admin_access.cnx() as cnx:
700
701
702
703
704
705
706
707
708
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            scheme = testutils.scheme_for_type(
                cnx, None, None, u"application/msword", u"application/pdf"
            )
            cnx.create_entity(
                "SEDAMimeTypeCodeListVersion",
                seda_mime_type_code_list_version_from=transfer,
                seda_mime_type_code_list_version_to=scheme,
            )
709
            bdo = testutils.create_data_object(transfer)
710
711
712
            bdo.mime_type.cw_set(
                user_cardinality=u"1", seda_mime_type_to=scheme.reverse_in_scheme
            )
713
714

            profile = self.profile_etree(transfer)
715
716
717
718
719
720
721
722
            mt = self.get_element(profile, "MimeType")
            self.assertEqual(
                "\n".join(
                    etree.tostring(mt, encoding=str, pretty_print=True).splitlines()[
                        1:-1
                    ]
                ),
                """\
723
724
725
  <rng:choice>
    <rng:value type="token">application/msword</rng:value>
    <rng:value type="token">application/pdf</rng:value>
726
727
  </rng:choice>""",
            )
728

729
    def test_seda2_concept(self):
        """Check which concept label is exported as the ``algorithm`` value.

        With only an english label on the concept, that label is the fixed
        value of the attribute; once a preferred label with the ``seda-2``
        language code is added, this one is expected instead.
        """

        def expected_definition(fixed_value):
            # attribute definition expected for the given fixed value
            return {
                "name": "algorithm",
                "use": "required",
                "type": "xsd:token",
                "fixed": fixed_value,
            }

        with self.admin_access.cnx() as cnx:
            new_entity = cnx.create_entity
            scheme = new_entity("ConceptScheme", title=u"Digest algorithm")
            md5_concept = scheme.add_concept(
                label=u"md5 algorithm", language_code=u"en"
            )
            transfer = new_entity("SEDAArchiveTransfer", title=u"test profile")
            new_entity(
                "SEDAMessageDigestAlgorithmCodeListVersion",
                seda_message_digest_algorithm_code_list_version_from=transfer,
                seda_message_digest_algorithm_code_list_version_to=scheme,
            )
            new_entity(
                "SEDABinaryDataObject",
                user_cardinality=u"0..n",
                user_annotation=u"I am mandatory",
                seda_binary_data_object=transfer,
                seda_algorithm=md5_concept,
            )

            exported = self.profile_etree(transfer)
            algorithm = self.get_attribute(exported, "algorithm")
            self.assertAttributeDefinition(
                algorithm, expected_definition("md5 algorithm")
            )

            new_entity(
                "Label",
                label_of=md5_concept,
                kind=u"preferred",
                language_code=u"seda-2",
                label=u"md5",
            )

            # drop cached data on the concept before exporting the profile again
            md5_concept.cw_clear_all_caches()
            exported = self.profile_etree(transfer)
            algorithm = self.get_attribute(exported, "algorithm")
            self.assertAttributeDefinition(algorithm, expected_definition("md5"))
782

783
784
785
786
787
    def assertOpenTypeIsDefined(self, profile):
        """Assert that `profile` holds exactly one ``OpenType`` RNG definition."""
        definitions = self.xpath(profile, '//rng:define[@name="OpenType"]')
        definition_count = len(definitions)
        self.assertEqual(definition_count, 1)

    def test_data_duplicates(self):
        """Check that the exported ``Title`` element has a single ``rng:data``."""
        with self.admin_access.cnx() as cnx:
            transfer = cnx.create_entity("SEDAArchiveTransfer", title=u"test profile")
            # the created entities are not used further, only their creation matters
            _unit, _unit_alt, _unit_alt_seq = testutils.create_archive_unit(
                transfer, user_cardinality=u"0..n"
            )
            exported = self.profile_etree(transfer)
            title_element = self.get_element(exported, "Title")
            data_nodes = self.xpath(title_element, "rng:data")
            self.assertEqual(len(data_nodes), 1)
796
797


798
class SEDAExportFuncTCMixIn(object):
    """Functional checks of a SEDA profile export against a sample message.

    `setup_database` builds the profile under test. `profile_etree`,
    `check_xsd_profile`, `datapath`, `get_elements` and
    `assertProfileDetails` are called but not defined here: they must come
    from the classes this mixin is combined with.
    """

    def setup_database(self):
        """Create the profile under test and record the identifiers tests use.

        Sets `transfer_eid`, `bdo_eid`, `bdo_xmlid` and `au_eid` on the test
        case once the data has been committed.
        """
        with self.admin_access.cnx() as cnx:
            new_entity = cnx.create_entity
            scheme = new_entity("ConceptScheme", title=u"Keyword Types")
            # ensure we're able to export concept with unexpected language code
            md5_concept = scheme.add_concept(label=u"md5", language_code=u"de")
            hash_md5_concept = scheme.add_concept(
                label=u"hash/md5", language_code=u"de"
            )

            transfer = new_entity("SEDAArchiveTransfer", title=u"test profile")
            new_entity(
                "SEDAMessageDigestAlgorithmCodeListVersion",
                seda_message_digest_algorithm_code_list_version_from=transfer,
                seda_message_digest_algorithm_code_list_version_to=scheme,
            )
            new_entity(
                "SEDAFileFormatCodeListVersion",
                seda_file_format_code_list_version_from=transfer,
            )
            new_entity(
                "SEDAMimeTypeCodeListVersion",
                user_cardinality=u"0..1",
                seda_mime_type_code_list_version_from=transfer,
                seda_mime_type_code_list_version_to=scheme,
            )
            transfer_access_rule = new_entity(
                "SEDAAccessRule", user_cardinality=u"0..1", seda_access_rule=transfer
            )
            transfer_access_rule_seq = new_entity(
                "SEDASeqAccessRuleRule",
                reverse_seda_seq_access_rule_rule=transfer_access_rule,
            )
            new_entity(
                "SEDAStartDate",
                user_cardinality=u"0..1",
                seda_start_date=transfer_access_rule_seq,
            )

            # binary data object
            bdo = testutils.create_data_object(
                transfer, user_cardinality=u"0..n", seda_algorithm=md5_concept
            )
            bdo.mime_type.cw_set(seda_mime_type_to=[md5_concept, hash_md5_concept])
            new_entity(
                "SEDAFormatLitteral", user_cardinality=u"0..1", seda_format_litteral=bdo
            )
            new_entity("SEDAEncoding", user_cardinality=u"0..1", seda_encoding_from=bdo)
            new_entity("SEDAUri", seda_uri=bdo.seda_alt_binary_data_object_attachment)

            # first level archive unit
            unit, _unit_alt, unit_alt_seq = testutils.create_archive_unit(
                transfer, user_cardinality=u"0..n", user_annotation=u"Composant ISAD(G)"
            )

            # management rules
            appraisal_rule = new_entity(
                "SEDAAppraisalRule", seda_appraisal_rule=unit_alt_seq
            )
            appraisal_rule_seq = new_entity(
                "SEDASeqAppraisalRuleRule",
                reverse_seda_seq_appraisal_rule_rule=appraisal_rule,
            )
            new_entity(
                "SEDAStartDate",
                user_cardinality=u"0..1",
                seda_start_date=appraisal_rule_seq,
            )
            # remaining rules only need to be attached to the archive unit
            for etype, rtype in (
                ("SEDAAccessRule", "seda_access_rule"),
                ("SEDADisseminationRule", "seda_dissemination_rule"),
                ("SEDAReuseRule", "seda_reuse_rule"),
                ("SEDANeedAuthorization", "seda_need_authorization"),
            ):
                new_entity(etype, **{rtype: unit_alt_seq})

            # content
            keyword = new_entity(
                "SEDAKeyword", user_cardinality=u"0..n", seda_keyword=unit_alt_seq
            )
            new_entity("SEDAKeywordType", seda_keyword_type_from=keyword)
            new_entity("SEDAKeywordReference", seda_keyword_reference_from=keyword)
            history_item = new_entity(
                "SEDACustodialHistoryItem", seda_custodial_history_item=unit_alt_seq
            )
            new_entity("SEDAwhen", user_cardinality=u"0..1", seda_when=history_item)
            version_of = new_entity("SEDAIsVersionOf", seda_is_version_of=unit_alt_seq)
            version_of_alt = new_entity(
                "SEDAAltIsVersionOfArchiveUnitRefId",
                reverse_seda_alt_is_version_of_archive_unit_ref_id=version_of,
            )
            new_entity(
                "SEDADataObjectReference", seda_data_object_reference=version_of_alt
            )
            # plain content elements, created in this exact order
            for etype, rtype in (
                (
                    "SEDAOriginatingAgencyArchiveUnitIdentifier",
                    "seda_originating_agency_archive_unit_identifier",
                ),
                (
                    "SEDATransferringAgencyArchiveUnitIdentifier",
                    "seda_transferring_agency_archive_unit_identifier",
                ),
                ("SEDADescription", "seda_description"),
                ("SEDALanguage", "seda_language_from"),
                ("SEDADescriptionLanguage", "seda_description_language_from"),
                ("SEDACreatedDate", "seda_created_date"),
                ("SEDAEndDate", "seda_end_date"),
            ):
                new_entity(etype, **{rtype: unit_alt_seq})
            new_entity(
                "SEDADataObjectReference",
                user_cardinality=u"0..n",
                seda_data_object_reference=unit_alt_seq,
                seda_data_object_reference_id=bdo,
            )

            cnx.commit()
        self.transfer_eid = transfer.eid
        self.bdo_eid = bdo.eid
        self.bdo_xmlid = bdo.cw_adapt_to("IXmlId").id()
        self.au_eid = unit.eid

    def test_profile1(self):
        """Check a minimal SEDA profile validating BV2.0_min.xml."""
        with self.admin_access.cnx() as cnx:
            mda_scheme = cnx.find("ConceptScheme", title=u"Keyword Types").one()
            root = self.profile_etree(cnx.entity_from_eid(self.transfer_eid))
        self.check_xsd_profile(
            root, self.datapath("BV2.0_min.xml"), mda_scheme_url=mda_scheme.cwuri
        )
        # ensure jumped element without content are not there
        gps_elements = self.get_elements(root, "Gps")
        self.assertEqual(len(gps_elements), 0)
        # ensure element with skipped value are not there
        transacted_dates = self.get_elements(root, "TransactedDate")
        self.assertEqual(len(transacted_dates), 0)
        self.assertProfileDetails(root)


929
930
class SEDARNGExportFuncTC(SEDAExportFuncTCMixIn, RelaxNGTestMixin, CubicWebTC):
    """Run the profile export functional tests on the RelaxNG output."""

    def assertProfileDetails(self, root):
        """Check details of the exported tree beyond sample validation.

        `root` is the exported profile tree, as handed over by
        `test_profile1`.
        """
        # ensure profile's temporary id are exported in xml:id attribute
        id_attributes = self.xpath(root, "//rng:attribute[@xml:id]")
        self.assertEqual(len(id_attributes), 2)
        # NOTE(review): this query uses the `xs:` prefix while every other
        # query here targets `rng:` nodes, so it presumably matches nothing and
        # the assertion below never runs -- confirm, then fix or remove.
        for attrdef in self.xpath(root, "//xs:attribute[@xml:id]"):
            self.assertEqual(attrdef[0]["type"], "ID")

        # ensure they are properly referenced using 'default' attribute
        default_value_query = '//rng:element[@a:defaultValue="{}"]'.format(
            self.bdo_xmlid
        )
        referencing = self.xpath(root, default_value_query)
        self.assertEqual(len(referencing), 1)
        self.assertEqual(referencing[0].attrib["name"], "DataObjectReferenceId")
        reference_ids = self.xpath(
            root, '//rng:element[@name="DataObjectReferenceId"]'
        )
        for reference_id in reference_ids:
            self.assertEqual(reference_id[0].attrib["type"], "NCName")

        # ensure optional id are properly reinjected
        optional_ids = self.xpath(
            root,
            '//rng:element[@name="Keyword"]/rng:optional/rng:attribute[@name="id"]',
        )
        self.assertEqual(len(optional_ids), 1)

        # ensure custodial item content type is properly serialized
        history_items = self.xpath(
            root, '//rng:element[@name="CustodialHistoryItem"]'
        )
        self.assertEqual(len(history_items), 1)
        expected_when = {"name": "when", "use": "optional", "type": "datedateTime"}
        self.assertXSDAttributes(history_items[0], [expected_when])

        # ensure types union handling
        created_date_types = self.xpath(
            root, '//rng:element[@name="CreatedDate"]/rng:choice/rng:data'
        )
        self.assertEqual(len(created_date_types), 2)
        self.assertEqual(
            {node.attrib["type"] for node in created_date_types},
            {"date", "dateTime"},
        )
963
964


965
class OldSEDAExportMixin(object):
966
    def setup_database(self):
967
        with self.admin_access.cnx() as cnx:
968
969
970
            create = cnx.create_entity

            concepts = {}
971
            for rtype, etype, labels in [
972
973
974
975
976
977
978
979
                ("file_category", None, [u"fmt/123", u"fmt/987"]),
                ("seda_encoding_to", None, [u"6"]),
                ("seda_type_to", None, [u"CDO"]),
                ("seda_description_level", None, [u"file"]),
                ("seda_algorithm", "SEDABinaryDataObject", [u"md5"]),
                ("seda_rule", "SEDASeqAppraisalRuleRule", [u"P10Y"]),
                ("seda_rule", "SEDASeqAccessRuleRule", [u"AR038"]),
                ("seda_final_action", "SEDAAppraisalRule", [u"detruire"]),
980
            ]:
981
982
983
984
985
986
                scheme = testutils.scheme_for_type(cnx, rtype, etype, *labels)
                if len(labels) == 1:
                    concepts[labels[0]] = scheme.reverse_in_scheme[0]
                else:
                    for concept in scheme.reverse_in_scheme:
                        concepts[concept.label()] = concept
987

988
            # ensure we're able to export concept with unexpected language code
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
            concepts["md5"].preferred_label[0].cw_set(language_code=u"de")

            agent = testutils.create_authority_record(cnx, u"bob")

            transfer = create(
                "SEDAArchiveTransfer",
                title=u"my profile title &&",
                simplified_profile=True,
            )

            create(
                "SEDAComment",
                user_cardinality=u"1",
                comment=u"my profile description &&",
                seda_comment=transfer,
            )

            create(
                "SEDAAccessRule",  # XXX mandatory for seda 1.0
                user_cardinality=u"1",
                seda_access_rule=transfer,
                seda_seq_access_rule_rule=create(
                    "SEDASeqAccessRuleRule",
                    reverse_seda_start_date=create("SEDAStartDate"),
                ),
            )
            appraisal_rule_rule = create(
                "SEDASeqAppraisalRuleRule",
                seda_rule=concepts["P10Y"],
                user_annotation=u"C'est dans 10ans je m'en irai",
                reverse_seda_start_date=create("SEDAStartDate"),
            )
            create(
                "SEDAAppraisalRule",
                user_cardinality=u"0..1",
                seda_appraisal_rule=transfer,
                seda_final_action=concepts["detruire"],
                seda_seq_appraisal_rule_rule=appraisal_rule_rule,
                user_annotation=u"detruire le document",
            )

            unit, _, unit_alt_seq = testutils.create_archive_unit(
                transfer, user_cardinality=u"1..n"
            )

            unit_alt_seq.cw_set(
                reverse_seda_start_date=create(
                    "SEDAStartDate", user_cardinality=u"0..1"
                ),
                reverse_seda_end_date=create("SEDAEndDate", user_cardinality=u"0..1"),
                # XXX, value=date(2015, 2, 24)),
                reverse_seda_description=create(
                    "SEDADescription", user_cardinality=u"0..1"
                ),
            )

            kw = create(
                "SEDAKeyword", user_cardinality=u"0..n", seda_keyword=unit_alt_seq
            )
            create(
                "SEDAKeywordReference",
                seda_keyword_reference_from=kw,
                seda_keyword_reference_to=concepts["file"],
                seda_keyword_reference_to_scheme=concepts["file"].scheme,
            )
            create(
                "SEDAKeywordType", user_cardinality=u"0..1", seda_keyword_type_from=kw
            )

            create(
                "SEDAOriginatingAgency",
                user_cardinality=u"0..1",
                seda_originating_agency_from=unit_alt_seq,
                seda_originating_agency_to=agent,
            )

            create(
                "SEDAType",
                user_cardinality=u"0..1",
                seda_type_from=unit_alt_seq,
                seda_type_to=concepts["CDO"],
            )

            chi = create(
                "SEDACustodialHistoryItem",
                user_cardinality=u"0..1",
                seda_custodial_history_item=unit_alt_seq,
                reverse_seda_when=create("SEDAwhen", user_cardinality=u"0..1"),
            )
1078

1079
1080
            # Add a sub archive unit
            subunit2, _, subunit2_alt_seq = testutils.create_archive_unit(
1081
1082
                unit_alt_seq, user_cardinality=u"1"
            )
1083
1084

            # Add another sub archive unit
1085
            subunit1, _, subunit_alt_seq = testutils.create_archive_unit(
1086
1087
1088