# copyright 2016-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-seda adapter classes for profile (schema) generation"""
17

18
from collections import defaultdict, namedtuple
19
from functools import partial
20
from itertools import chain
21

22
from lxml import etree
23
from pyxst.xml_struct import graph_nodes
24

25
26
from logilab.common import attrdict

27
28
from yams import BASE_TYPES

29
30
31
from cubicweb.predicates import is_instance
from cubicweb.view import EntityAdapter

32
33
from ..xsd import XSDM_MAPPING, JUMP_ELEMENTS
from ..xsd2yams import SKIP_ATTRS
34
from . import simplified_profile, wrap_dataobjects
35

36

37
38
39
# Local names of jumped elements that are treated as optional unless one of their children
# makes them mandatory (see the export adapter's `_process` method).
JUMPED_OPTIONAL_ELEMENTS = {
    "DataObjectPackage",
    "FileInfo",
    "PhysicalDimensions",
    "Coverage",
}
40
41


42
43
44
45
46
47
48
49
def substitute_xml_prefix(prefix_name, namespaces):
    """Given an XML prefixed name in the form `'ns:name'`, return the string `'{<ns_uri>}name'`
    where `<ns_uri>` is the URI for the namespace prefix found in `namespaces`.

    This new string is then suitable to build an LXML etree.Element object. A name that holds
    no `':'` separator is returned unchanged. Only the first `':'` is considered as the
    prefix separator.

    An `AssertionError` is raised if the prefix is not a key of `namespaces`.

    Example::

      >>> substitute_xml_prefix('xlink:href', {'xlink': 'http://www.w3.org/1999/xlink'})
      '{http://www.w3.org/1999/xlink}href'
      >>> substitute_xml_prefix('href', {})
      'href'

    """
    prefix, separator, name = prefix_name.partition(":")
    if not separator:
        # not a prefixed name, nothing to substitute
        return prefix_name
    assert prefix in namespaces, "Unknown namespace prefix: {0}".format(prefix)
    return "{{{0}}}{1}".format(namespaces[prefix], name)
60
61


62
63
64
65
66
67
68
69
def content_types(content_type):
    """Return an ordered tuple of content types from pyxst `textual_content_type` that may be None, a
    set or a string value.

    * a false value (None, empty set or empty string) gives an empty tuple,

    * a set gives a tuple of its members in sorted order,

    * any other value gives a one-item tuple, where 'IDREF' is replaced by 'NCName'.
    """
    if not content_type:
        return ()
    if isinstance(content_type, set):
        # always return a tuple, as documented, rather than the list built by sorted()
        return tuple(sorted(content_type))
    if content_type == "IDREF":
        content_type = "NCName"
    return (content_type,)


78
79
def _internal_reference(value):
    """Return True if the given value is a reference to an entity within the profile."""
80
81
82
83
84
    return getattr(value, "cw_etype", None) in (
        "SEDAArchiveUnit",
        "SEDABinaryDataObject",
        "SEDAPhysicalDataObject",
    )
85
86


87
88
89
90
91
92
93
94
def _concept_value(concept, language):
    """Return string value to be inserted in a SEDA export for the given concept.

    * `concept` may be None, in which case None will be returned

    * `language` is the language matching the exported format (one of 'seda-2', 'seda-1' or
      'seda-02')
    """
95
    assert language in ("seda-2", "seda-1", "seda-02")
96
97
    if concept is None:
        return None
98
    for code in (language, "seda", "en", "fr"):
99
100
101
102
        try:
            return concept.labels[code]
        except KeyError:
            continue
103
    return concept.label()
104
105


106
107
def xmlid(entity):
    """Return the ID/IDREF value of `entity`, as computed by its 'IXmlId' adapter."""
    adapter = entity.cw_adapt_to("IXmlId")
    return adapter.id()
109
110


111
def serialize(value, build_url):
    """Return typed `value` as an XSD string, or None when there is nothing to export.

    * None gives None,

    * an entity (any object with an `eid` attribute) gives `build_url(value)` for a
      'ConceptScheme', its 'seda-2' label for a 'Concept', its XML id for an internal
      reference and None for any other (intermediary) entity,

    * a boolean gives 'true' or 'false',

    * any other value must be a string and is returned as is.
    """
    if value is None:
        return None
    if not hasattr(value, "eid"):
        # plain value
        if isinstance(value, bool):
            return "true" if value else "false"
        assert isinstance(value, str), repr(value)
        return value
    etype = value.cw_etype
    if etype == "ConceptScheme":
        return build_url(value)
    if etype == "Concept":
        return _concept_value(value, "seda-2")
    # intermediary entities have no serialization
    return xmlid(value) if _internal_reference(value) else None
127
128


129
130
def integrity_cardinality(data_object):
    """Return the cardinality string obtained by combining the user cardinality of
    `data_object` with the one of each of its ancestors (as given by its 'ITreeBase' adapter).

    The minimum is the lowest minimum found, the maximum is the highest maximum found.
    Ancestors without `user_cardinality` attribute are ignored. The result is '1' when both
    bounds are 1, else '<min>..<max>' where an infinite maximum is written 'n'.
    """
    lowest, highest = minmax_cardinality(data_object.user_cardinality)
    tree_adapter = data_object.cw_adapt_to("ITreeBase")
    for ancestor in tree_adapter.iterancestors():
        try:
            ancestor_cardinality = ancestor.user_cardinality
        except AttributeError:
            # this ancestor holds no cardinality
            continue
        ancestor_min, ancestor_max = minmax_cardinality(ancestor_cardinality)
        lowest = min(ancestor_min, lowest)
        unbounded = (
            ancestor_max == graph_nodes.INFINITY or highest == graph_nodes.INFINITY
        )
        highest = graph_nodes.INFINITY if unbounded else max(ancestor_max, highest)
    if highest == graph_nodes.INFINITY:
        highest = "n"
    if lowest == highest == 1:
        return "1"
    return "{}..{}".format(lowest, highest)
148
149


150
def minmax_cardinality(string_cardinality, _allowed=("0..1", "0..n", "1", "1..n")):
    """Return the (minimum, maximum) pair matching a cardinality given as a string.

    `string_cardinality` must be one of the `_allowed` values (by default '0..1', '0..n', '1'
    or '1..n'), else an assertion error is raised. Minimum is 0 if the string starts with '0',
    else 1. Maximum is `graph_nodes.INFINITY` if the string ends with 'n', else 1.
    """
    assert string_cardinality in _allowed, "%s not allowed %s" % (
        string_cardinality,
        _allowed,
    )
    minimum = 0 if string_cardinality[0] == "0" else 1
    maximum = graph_nodes.INFINITY if string_cardinality[-1] == "n" else 1
    return minimum, maximum


169
def element_minmax_cardinality(occ, card_entity):
    """Return the (minimum, maximum) cardinality for the given pyxst occurrence and entity.

    The cardinality of the occurrence is overridden by the `user_cardinality` value of
    `card_entity` when it has one which is not None.
    """
    user_cardinality = getattr(card_entity, "user_cardinality", None)
    if user_cardinality is not None:
        return minmax_cardinality(user_cardinality)
    return occ.minimum, occ.maximum
179
180
181
182
183
184


def attribute_minimum_cardinality(occ, card_entity):
    """Return the minimum cardinality for the given pyxst attribute's occurrence.

    The minimum of the occurrence is overridden by the `user_cardinality` value of
    `card_entity` when it has one which is not None; that value must then be either '0..1' or
    '1'.
    """
    user_cardinality = getattr(card_entity, "user_cardinality", None)
    if user_cardinality is not None:
        minimum, _ = minmax_cardinality(user_cardinality, ("0..1", "1"))
        return minimum
    return occ.minimum
190
191


192
193
194
195
196
197
198
199
200
201
202
203
def iter_path_children(xselement, entity):
    """Iterate over the children of `entity` according to the definition of `xselement`.

    Each item is a (`path`, `target`) pair where `path` is the path definition leading to the
    target, and `target` is either a final value in case of an attribute or the related
    entities in case of a relation. Attributes whose value is None and relations without any
    related entity are skipped.
    """
    mapping = XSDM_MAPPING.iter_rtype_role(xselement.local_name)
    for rtype, role, path in mapping:
        if path[0][2] not in BASE_TYPES:
            # relation: yield the related entities, if any
            targets = entity.related(rtype, role, entities=True)
            if targets:
                yield path, targets
        elif getattr(entity, rtype) is not None:
            # entity attribute with a value
            yield path, getattr(entity, rtype)


209
210
211
212
213
214
215
216
217
218
219
220
221
class RNGMixin(object):
    """Mixin class providing some Relax NG schema generation helper methods.

    The methods rely on `self.element(tag, parent, attributes, text=...)` and
    `self.qname(tag)`, which are not defined here and must be provided by the class this mixin
    is combined with.
    """

    def rng_element_parent(self, parent, minimum, maximum=1):
        """Given a etree node and minimum/maximum cardinalities of a desired child element,
        return suitable parent node for it.

        This will be one of rng:optional, rng:zeroOrMore or rng:oneOrMore that will be created by
        this method or the given parent itself if minimum == maximum == 1.

        An `AssertionError` is raised for any other minimum/maximum combination.
        """
        if minimum == 1 and maximum == 1:
            return parent
        if minimum == 0 and maximum == 1:
            return self.element("rng:optional", parent)
        if minimum == 0 and maximum == graph_nodes.INFINITY:
            return self.element("rng:zeroOrMore", parent)
        if minimum == 1 and maximum == graph_nodes.INFINITY:
            return self.element("rng:oneOrMore", parent)
        # explicit raise rather than `assert False`, so that the error is still reported when
        # assertions are disabled (python -O) instead of silently returning None
        raise AssertionError(("unexpected min/max cardinality:", minimum, maximum))

    def rng_attribute_parent(self, parent, minimum):
        """Given a etree node and minimum cardinality of a desired attribute,
        return suitable parent node for it.

        This will be rng:optional that will be created by this method or the given parent itself if
        minimum == 1.
        """
        if minimum == 1:
            return parent
        return self.element("rng:optional", parent)

    def rng_value(
        self, element, qualified_datatype, fixed_value=None, default_value=None
    ):
        """Given a (etree) schema element, a data type (e.g. 'xsd:token') and an optional fixed
        value, add RNG declaration to the element to declare the datatype and fix the value if
        necessary.

        * if `fixed_value` is a tuple or a list, a rng:choice of rng:value is added,

        * if `fixed_value` is any other non-None value, a single rng:value is added,

        * else if `default_value` is not None, it is set as a:defaultValue attribute of
          `element` and a rng:data is added,

        * else a rng:data is added.
        """
        prefix, datatype = qualified_datatype.split(":")
        if prefix != "xsd":
            # XXX RelaxNG compatible version of custom types? this would allow
            # `type_attrs['datatypeLibrary'] = self.namespaces[prefix]`. In the mean time, turn
            # every custom type to string, supposing transfer are also checked against the original
            # schema (as agape v1 was doing).
            datatype = "string"
        type_attrs = {"type": datatype}
        if fixed_value is not None:
            if isinstance(fixed_value, (tuple, list)):
                choice = self.element("rng:choice", element)
                for value in fixed_value:
                    self.element("rng:value", choice, type_attrs, text=value)
            else:
                self.element("rng:value", element, type_attrs, text=fixed_value)
        elif default_value is not None:
            element.attrib[self.qname("a:defaultValue")] = default_value
            self.element("rng:data", element, type_attrs)
        else:
            self.element("rng:data", element, type_attrs)
269
270


271
class SEDA2ExportAdapter(EntityAdapter):
    """Abstract base class for export of SEDA profile.

    Concrete classes have to implement `dump_etree`, `init_transfer_element`,
    `jumped_element` and one `element_<node class name>` method per kind of schema node
    dispatched by `dispatch_occ`.
    """

    __abstract__ = True
    __select__ = is_instance("SEDAArchiveTransfer")
    encoding = "utf-8"
    content_type = "application/xml"
    # to be defined in concrete implementations
    namespaces = {}
    _root_attributes = {}

    @property
    def root_attributes(self):
        """Attributes for the root node of the generated schema.

        This is `_root_attributes`, completed by a 'seda:warnings' attribute whose value is
        empty if 'RNG' is in the transfer's `compat_list`, else 'rng-ambiguous'.
        """
        if self.entity.compat_list is None:
            # uncommitted transfer may occur during tests
            return self._root_attributes
        diag = "" if "RNG" in self.entity.compat_list else "rng-ambiguous"
        attributes = {"seda:warnings": diag}
        attributes.update(self._root_attributes)
        return attributes

    def dump(self, _encoding=None):
        """Return a schema string for the adapted SEDA profile.

        _encoding will be used as "encoding" argument of lxml's tostring, in
        order to retrieve a unicode string. This is useful for tests.
        """
        root = self.dump_etree()
        kwargs = {}
        if _encoding is None:
            # We only want the XML declaration at all if _encoding is not specified.
            kwargs["standalone"] = False
        kwargs["encoding"] = _encoding or self.encoding
        return etree.tostring(root, pretty_print=True, **kwargs)

    def dump_etree(self):
        """Return an XSD etree for the adapted SEDA profile."""
        raise NotImplementedError()

    def qname(self, tag):
        """Return `tag` with its namespace prefix, if any, replaced using `self.namespaces`."""
        return substitute_xml_prefix(tag, self.namespaces)

    def element(self, tag, parent=None, attributes=None, text=None):
        """Generic method to build a XSD element tag.

        Params:

        * `tag`, tag name of the element

        * `parent`, the parent etree node

        * `attributes`, dictionary of attributes - may contain a special 'documentation' attribute
          that will be added in a xsd:annotation node

        * `text`, textual content of the tag if any

        The given `attributes` dictionary is left untouched. The created etree node is returned.
        """
        # work on a copy: the dictionary may be shared by the caller among several calls (or be
        # a class attribute) and must not be altered by the substitutions below
        attributes = dict(attributes) if attributes else {}
        tag = self.qname(tag)
        documentation = attributes.pop("documentation", None)
        for attr, value in list(attributes.items()):
            newattr = substitute_xml_prefix(attr, self.namespaces)
            attributes[newattr] = value
            if newattr != attr:
                attributes.pop(attr)
        if parent is None:
            elt = etree.Element(tag, attributes, nsmap=self.namespaces)
        else:
            elt = etree.SubElement(parent, tag, attributes)
        if text is not None:
            elt.text = text
        if documentation:
            annot = self.element("xsd:annotation", elt)
            self.element("xsd:documentation", annot).text = documentation
        return elt

    def dispatch_occ(self, profile_element, occ, target_value, to_process, card_entity):
        """Call the `element_<name>` method matching the occurrence's target, where `<name>` is
        the lower-cased class name of `occ.target`.
        """
        callback = getattr(self, "element_" + occ.target.__class__.__name__.lower())
        callback(occ, profile_element, target_value, to_process, card_entity)

    def _dump(self, root):
        """Generate the profile below the `root` etree node, starting from the adapted entity.

        `to_process` maps a schema node to the list of (entity, etree element) pairs waiting to
        be expanded for that node; it is fed by `_process` and by the `element_*` callbacks.
        """
        entity = self.entity
        xselement = XSDM_MAPPING.root_xselement
        transfer_element = self.init_transfer_element(xselement, root, entity)
        to_process = defaultdict(list)
        to_process[xselement].append((entity, transfer_element))
        # first round to ensure we have necessary basic structure
        for xselement, etype, child_defs in XSDM_MAPPING:
            for entity, profile_element in to_process.pop(xselement, ()):
                assert etype == entity.cw_etype
                self._process(entity, profile_element, child_defs, to_process)
        # then process remaining elements
        while to_process:
            xselement = next(iter(to_process))
            entities_profiles = to_process.pop(xselement, ())
            if entities_profiles:
                try:
                    etype, child_defs = XSDM_MAPPING[xselement]
                except KeyError:
                    # element has no children
                    continue
                for entity, profile_element in entities_profiles:
                    assert etype == entity.cw_etype
                    self._process(entity, profile_element, child_defs, to_process)

        assert not to_process, to_process

    def _process(self, entity, profile_element, child_defs, to_process):
        """Generate the children of `profile_element` for `entity`, according to `child_defs`, a
        sequence of (occurrence, path) pairs.

        An empty path denotes a jumped element (one listed in JUMP_ELEMENTS): it is generated
        without target value and registered in `to_process` with the same entity.
        """
        for occ, path in child_defs:
            if not path:
                assert not isinstance(occ.target, graph_nodes.XMLAttribute)
                assert occ.target.local_name in JUMP_ELEMENTS, occ.target
                if occ.minimum == 0 and not any(iter_path_children(occ.target, entity)):
                    # element has no children, skip it
                    continue
                if occ.target.local_name in JUMPED_OPTIONAL_ELEMENTS:
                    # elements in JUMPED_OPTIONAL_ELEMENTS are jumped but have optional cardinality,
                    # so search in all its child elements, and mark it as mandatory if one of them
                    # is mandatory, else keep it optional
                    cardinality = "0..1"
                    for _path, target in iter_path_children(occ.target, entity):
                        if _path[0][2] in BASE_TYPES:
                            # special case of a mandatory attribute: parent element will be
                            # mandatory if some value is specified, else that's fine
                            if target is not None:
                                cardinality = "1"
                                break
                        elif any(te.user_cardinality == "1" for te in target):
                            cardinality = "1"
                            break
                else:
                    cardinality = None
                # jumped element: give None as target_value but register the generated element for
                # later processing
                self.dispatch_occ(
                    profile_element,
                    occ,
                    None,
                    to_process,
                    card_entity=attrdict({"user_cardinality": cardinality}),
                )
                to_process[occ.target].append(
                    (entity, self.jumped_element(profile_element))
                )
            else:
                for card_entity, target_value in _path_target_values(entity, path):
                    self.dispatch_occ(
                        profile_element,
                        occ,
                        target_value,
                        to_process,
                        card_entity=card_entity,
                    )

    def init_transfer_element(self, xselement, root, entity):
        """Initialize and return the XML element holding the ArchiveTransfer definition, as well as
        any other necessary global definitions.
        """
        raise NotImplementedError()

    def jumped_element(self, profile_element):
        """Return the last generated element, for insertion of its content."""
        raise NotImplementedError()

    @staticmethod
    def cwuri_url(entity):
        """Return "public" URI for the given entity.

        In a staticmethod to ease overriding in subclasses (eg saem).
        """
        return entity.cwuri

447

448
class SEDA2RelaxNGExport(RNGMixin, SEDA2ExportAdapter):
    """Abstract Adapter to build a Relax NG representation of a SEDA profile, using SEDA 2.1 specification.

    Concrete classes have to implement `postprocess_dataobjects`.
    """

    __regid__ = "SEDA-2.1.rng"
    __abstract__ = True

    namespaces = {
        None: "fr:gouv:culture:archivesdefrance:seda:v2.1",
        "seda": "fr:gouv:culture:archivesdefrance:seda:v2.1",
        "xml": "http://www.w3.org/XML/1998/namespace",
        "xsd": "http://www.w3.org/2001/XMLSchema",
        "xlink": "http://www.w3.org/1999/xlink",
        "rng": "http://relaxng.org/ns/structure/1.0",
        "a": "http://relaxng.org/ns/compatibility/annotations/1.0",
    }

    _root_attributes = {
        "ns": "fr:gouv:culture:archivesdefrance:seda:v2.1",
        "datatypeLibrary": "http://www.w3.org/2001/XMLSchema-datatypes",
    }

    def dump_etree(self):
        """Return a RelaxNG etree for the adapted SEDA profile."""
        root = self.element("rng:grammar", attributes=self.root_attributes)
        start = self.element("rng:start", root)
        # XXX http://lists.xml.org/archives/xml-dev/200206/msg01074.html ?
        # self.element('xsd:import', parent=root,
        #              attributes={'namespace': 'http://www.w3.org/1999/xlink',
        #                          'schemaLocation': 'http://www.w3.org/1999/xlink.xsd'})
        self._dump(start)

        # 'OpenType' definition: any number of elements of any name, each accepting any number
        # of attributes of any name
        open_type = self.element("rng:define", root, {"name": "OpenType"})
        open_elt = self._create_hierarchy(open_type, ["rng:zeroOrMore", "rng:element"])
        self.element("rng:anyName", open_elt)
        self._create_hierarchy(
            open_elt, ["rng:zeroOrMore", "rng:attribute", "rng:anyName"]
        )

        # add a 'text' node to empty rng:element to satisfy the RNG grammar
        namespaces = self.namespaces.copy()
        del namespaces[None]  # xpath engine don't want None prefix
        for element in root.xpath("//rng:element[not(*)]", namespaces=namespaces):
            self.element("rng:text", element)

        self.postprocess_dataobjects(root, namespaces)
        return root

    def postprocess_dataobjects(self, root, namespaces):
        """Insert rng:group node as parent of [Binary|Physical]DataObject node
        to avoid forcing an order among them
        """
        raise NotImplementedError()

    def init_transfer_element(self, xselement, root, entity):
        """Create and return the rng:element node for the transfer, below `root`.

        The node is named after `xselement` and documented by the entity's `user_annotation`.
        A rng:zeroOrMore / rng:attribute / rng:anyName / rng:except hierarchy holding two
        rng:nsName nodes (one bare, one with an empty 'ns') is inserted in it.
        """
        transfer_element = self.element(
            "rng:element",
            root,
            {"name": xselement.local_name, "documentation": entity.user_annotation},
        )
        exc = self._create_hierarchy(
            transfer_element,
            ["rng:zeroOrMore", "rng:attribute", "rng:anyName", "rng:except"],
        )
        self.element("rng:nsName", exc)
        self.element("rng:nsName", exc, {"ns": ""})
        return transfer_element

    def jumped_element(self, profile_element):
        """Return the last rng:element generated in `profile_element`, looking one level deeper
        if the last child is some wrapper node.
        """
        element = profile_element[-1]
        if element.tag != "{http://relaxng.org/ns/structure/1.0}element":
            # optional, zeroOrMore, etc.: should pick their child element
            element = element[-1]
            assert (
                element.tag == "{http://relaxng.org/ns/structure/1.0}element"
            ), element
        return element

    def element_alternative(
        self, occ, profile_element, target_value, to_process, card_entity
    ):
        """Generate a rng:choice node and register it for later processing."""
        parent_element = self._rng_element_parent(occ, card_entity, profile_element)
        target_element = self.element("rng:choice", parent_element)
        to_process[occ.target].append((target_value, target_element))

    def element_sequence(
        self, occ, profile_element, target_value, to_process, card_entity
    ):
        """Generate a rng:group node and register it for later processing."""
        parent_element = self._rng_element_parent(occ, card_entity, profile_element)
        target_element = self.element("rng:group", parent_element)  # XXX sequence
        to_process[occ.target].append((target_value, target_element))

    def element_xmlattribute(
        self, occ, profile_element, target_value, to_process, card_entity
    ):
        """Generate the rng:attribute definition for the occurrence's target attribute."""
        parent_element = self._rng_attribute_parent(occ, card_entity, profile_element)
        self._rng_attribute(
            occ.target, parent_element, serialize(target_value, self.cwuri_url)
        )

    def element_xmlelement(
        self, occ, profile_element, target_value, to_process, card_entity
    ):  # noqa
        """Generate the rng:element definition for the occurrence's target element.

        If `target_value` is an entity, the generated element is registered in `to_process`
        so that its own content is generated later.
        """
        parent_element = self._rng_element_parent(occ, card_entity, profile_element)
        xselement = occ.target
        attrs = {
            "documentation": getattr(card_entity, "user_annotation", None),
            "name": xselement.local_name,
        }
        if xselement.local_name == "Signature":
            element = self.element("rng:element", parent_element, attrs)
            self.element("rng:ref", element, {"name": "OpenType"})
        elif isinstance(
            occ, dict
        ):  # fake occurence introduced for some elements'content
            # target element has already been introduced: it is now given as profile_element
            self.fill_element(xselement, profile_element, target_value, card_entity)
        else:
            target_element = self.element("rng:element", parent_element, attrs)
            xstypes = content_types(xselement.textual_content_type)
            if xstypes:
                if len(xstypes) == 1:
                    parent_element = target_element
                else:
                    parent_element = self.element("rng:choice", target_element)
                for xstype in xstypes:
                    self.fill_element(
                        xselement,
                        parent_element,
                        target_value,
                        card_entity,
                        xstype=xstype,
                        copy_attributes=True,
                    )
            else:
                # target is a complex element
                if getattr(target_value, "eid", None):  # value is an entity
                    if target_value.cw_etype == "AuthorityRecord":
                        self.fill_organization_element(target_element, target_value)
                elif xselement.local_name in ("ArchivalAgency", "TransferringAgency"):
                    self.fill_organization_element(target_element, None)
                elif target_value is not None:
                    # explicit raise rather than `assert False`, so that an unexpected value
                    # is still reported when assertions are disabled (python -O)
                    raise AssertionError((xselement, target_value))
            if getattr(target_value, "eid", None):  # value is an entity
                to_process[xselement].append((target_value, target_element))

    def fill_element(
        self,
        xselement,
        profile_element,
        value,
        card_entity,  # noqa
        copy_attributes=False,
        xstype=None,
    ):
        """Insert in `profile_element` the attributes, then the content definition, of the
        `xselement` element.

        * `value` is the value for the element, if any: a final value, an entity or a
          tuple/list of those,

        * `card_entity` is the entity holding the cardinality; its `scheme` is read in the
          'KeywordReference' case,

        * `copy_attributes` tells whether the id, href, list* and scheme* attributes of
          `xselement` should be declared,

        * `xstype` is the type of the element's content, if any.
        """
        if xselement.local_name == "KeywordType":
            optional = self.element("rng:optional", profile_element)
            attr = self.element(
                "rng:attribute", attributes={"name": "listVersionID"}, parent=optional
            )
            if value:
                list_value = value.scheme.description or value.scheme.dc_title()
                attrs = {"type": xstype} if xstype else {}
                self.element("rng:value", attr, attrs, text=list_value)
            else:
                attr.attrib[self.qname("a:defaultValue")] = "edition 2009"

        elif xselement.local_name == "KeywordReference" and card_entity.scheme:
            self.concept_scheme_attribute(
                xselement, profile_element, card_entity.scheme
            )

        elif getattr(value, "cw_etype", None) == "Concept":
            self.concept_scheme_attribute(xselement, profile_element, value.scheme)

        elif copy_attributes:
            for attrname, occ in xselement.attributes.items():
                if attrname in ("id", "href") or attrname.startswith(
                    ("list", "scheme")
                ):
                    parent_element = self._rng_attribute_parent(
                        occ, None, profile_element
                    )
                    self._rng_attribute(occ.target, parent_element)
        # special case for KeywordReference content, the only known case where we want URL instead
        # of label of its concept value
        if value is not None and xselement.local_name == "KeywordReference":
            fixed_value = self.cwuri_url(value)
        elif isinstance(value, (tuple, list)):
            fixed_value = [serialize(val, self.cwuri_url) for val in value]
        else:
            fixed_value = serialize(value, self.cwuri_url)
        if fixed_value is not None:
            if _internal_reference(value):
                profile_element.attrib[self.qname("a:defaultValue")] = fixed_value
                # NOTE(review): `xstype` may be None here when called without explicit type,
                # which would give a None attribute value - confirm this can't happen
                self.element("rng:data", profile_element, {"type": xstype})
            else:
                # As there is a fixed value search for potential element
                # tag data and extract its type
                for elem in profile_element:
                    if elem.tag == "{http://relaxng.org/ns/structure/1.0}data":
                        xstype = elem.attrib.get("type")
                        # safe to remove while iterating since we stop right after
                        profile_element.remove(elem)
                        break
                attrs = {"type": xstype} if xstype else {}
                if isinstance(fixed_value, (tuple, list)):
                    choice = self.element("rng:choice", profile_element)
                    for val in fixed_value:
                        self.element("rng:value", choice, attrs, text=val)
                else:
                    self.element("rng:value", profile_element, attrs, text=fixed_value)
        elif xstype is not None:
            self.element("rng:data", profile_element, {"type": xstype})

    def concept_scheme_attribute(self, xselement, type_element, scheme):
        """Insert in `type_element` the attribute holding the concept scheme for `xselement`,
        with the scheme's URI as value.

        Nothing is done if `xselement_scheme_attribute` raises KeyError for this element.
        """
        try:
            scheme_attr = xselement_scheme_attribute(xselement)
        except KeyError:
            return
        scheme_attr = self.element(
            "rng:attribute", type_element, attributes={"name": scheme_attr}
        )
        self.element("rng:value", scheme_attr, text=self.cwuri_url(scheme))

    def fill_organization_element(self, parent_element, value):
        """Insert an 'Identifier' rng:element in `parent_element`, whose value is fixed to the
        URI of `value` if some is given.
        """
        target_element = self.element(
            "rng:element", parent_element, {"name": "Identifier"}
        )
        if value:
            self.element("rng:value", target_element, text=self.cwuri_url(value))

    def _rng_element_parent(self, occ, card_entity, profile_element):
        """Return the suitable parent node for an element, according to its cardinality."""
        minimum, maximum = element_minmax_cardinality(occ, card_entity)
        return self.rng_element_parent(profile_element, minimum, maximum)

    def _rng_attribute_parent(self, occ, card_entity, profile_element):
        """Return the suitable parent node for an attribute, according to its cardinality."""
        minimum = attribute_minimum_cardinality(occ, card_entity)
        return self.rng_element_parent(profile_element, minimum)

    def _rng_attribute(self, xselement, parent_element, value=None):
        """Insert in `parent_element` one rng:attribute per content type of the `xselement`
        attribute, wrapped in a rng:choice if there are several types.

        If `value` is given, it is fixed using rng:value, except for the 'id' attribute where
        it is set as xml:id attribute of an attribute typed as ID.
        """
        # NOTE(review): nothing is generated if the attribute has no textual content type
        xstypes = content_types(xselement.textual_content_type)
        if len(xstypes) > 1:
            parent_element = self.element("rng:choice", parent_element)
        for xstype in xstypes:
            attr_element = self.element(
                "rng:attribute", parent_element, {"name": xselement.local_name}
            )
            if value is not None:
                if xselement.local_name == "id":
                    attr_element.attrib[self.qname("xml:id")] = value
                    self.element("rng:data", attr_element, {"type": "ID"})
                else:
                    self.element(
                        "rng:value", attr_element, {"type": xstype}, text=value
                    )
            else:
                self.element("rng:data", attr_element, {"type": xstype})

    def _create_hierarchy(self, parent, tags):
        """Create nested nodes for `tags` below `parent` and return the innermost one."""
        for tag in tags:
            parent = self.element(tag, parent)
        return parent


713
714
715
716
717
718
719
720
721
722
723
724
725
726
def _safe_cardinality(entity):
    """Return entity's cardinality if some entity is given, else None."""
    if entity is None:
        return None
    return entity.user_cardinality


def _safe_concept_value(entity, concepts_language):
    """Return entity's targetted concept if some entity is given, else None."""
    if entity is None:
        return None
    return _concept_value(entity.concept, concepts_language)


727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
class SEDA22RelaxNGExport(SEDA2RelaxNGExport):
    """RelaxNG export selected when `wrap_dataobjects()` matches, on top of
    SEDA2RelaxNGExport's own selector.
    """

    __select__ = SEDA2RelaxNGExport.__select__ & wrap_dataobjects()

    def postprocess_dataobjects(self, root, namespaces):
        """Wrap each [Binary|Physical]DataObject definition into a DataObjectGroup element.

        Every rng:element child of DataObjectPackage named BinaryDataObject or
        PhysicalDataObject (or the rng:optional node holding such an element) is put
        into its own rng:zeroOrMore / rng:choice / rng:element[@name="DataObjectGroup"]
        chain, which is inserted right after the first child of the
        DataObjectPackage definition.

        `root` is the root node of the RNG tree (modified in place) and `namespaces`
        the prefix to URI mapping used to resolve the XPath queries. Nothing is done
        when no DataObjectPackage definition is found.
        """
        # start by looking for the rng:element node for DataObjectPackage or its parent rng:optional
        dops = root.xpath(
            "/rng:grammar/rng:start/rng:element/"
            'rng:element[@name="DataObjectPackage"]',
            namespaces=namespaces,
        )
        if not dops:
            dops = root.xpath(
                "/rng:grammar/rng:start/rng:element/"
                'rng:optional/rng:element[@name="DataObjectPackage"]',
                namespaces=namespaces,
            )
        if dops:
            assert len(dops) == 1
            dop = dops[0]
            nodes = dop.xpath(
                'rng:element[@name="BinaryDataObject" or @name="PhysicalDataObject"]',
                namespaces=namespaces,
            )
            opt_nodes = dop.xpath(
                'rng:optional[rng:element[@name="BinaryDataObject" or @name="PhysicalDataObject"]]',
                namespaces=namespaces,
            )
            for node in chain(nodes, opt_nodes):
                zeroormore = self.element("rng:zeroOrMore")
                choice = self.element("rng:choice", zeroormore)
                # insert after definition of dop's id attribute
                dop[0].addnext(zeroormore)
                # insert DataObjectGroup
                group = self.element(
                    "rng:element", choice, attributes={"name": "DataObjectGroup"}
                )
                # appending an lxml node which already has a parent moves it
                group.append(node)


class SEDA21RelaxNGExport(SEDA2RelaxNGExport):
    __select__ = SEDA2RelaxNGExport.__select__ & ~wrap_dataobjects()

    def postprocess_dataobjects(self, root, namespaces):
        """Gather [Binary|Physical]DataObject definitions under a single rng:group node,
        so that no order is enforced among them.
        """
        # the DataObjectPackage definition is either a direct rng:element child or held
        # by a rng:optional node
        packages = root.xpath(
            "/rng:grammar/rng:start/rng:element/"
            'rng:element[@name="DataObjectPackage"]',
            namespaces=namespaces,
        )
        if not packages:
            packages = root.xpath(
                "/rng:grammar/rng:start/rng:element/"
                'rng:optional/rng:element[@name="DataObjectPackage"]',
                namespaces=namespaces,
            )
        if not packages:
            return
        assert len(packages) == 1
        package = packages[0]
        mandatory = package.xpath(
            'rng:element[@name="BinaryDataObject" or @name="PhysicalDataObject"]',
            namespaces=namespaces,
        )
        optional = package.xpath(
            'rng:optional[rng:element[@name="BinaryDataObject" or @name="PhysicalDataObject"]]',
            namespaces=namespaces,
        )
        if not (mandatory or optional):
            return
        wrapper = self.element("rng:group")
        # the group goes right after the definition of the package's id attribute
        package[0].addnext(wrapper)
        for data_object in chain(mandatory, optional):
            wrapper.append(data_object)


class XAttr(
    namedtuple("_XAttr", ["name", "qualified_type", "cardinality", "fixed_value"])
):
    """Simple representation of an attribute element in a schema (RNG or XSD).

    Parameters:

    * `name`, the attribute's name,

    * `qualified_type`, its qualified type (e.g. 'xsd:string'),

    * `cardinality`, optional cardinality as string: '1' (required), '0..1' (optional, this is
      the default) or None (i.e. attribute is prohibited) - forced to '1' if some fixed value
      is provided,

    * `fixed_value`, optional fixed value for the attribute - a tuple or list holding a single
      item is replaced by this item, and any false value is normalized to None.

    """

    def __new__(cls, name, qualified_type, cardinality="0..1", fixed_value=None):
        assert cardinality in (None, "1", "0..1"), cardinality
        if fixed_value:
            # an attribute with a fixed value is always required
            cardinality = "1"
            if isinstance(fixed_value, (tuple, list)) and len(fixed_value) == 1:
                fixed_value = fixed_value[0]
        else:
            # normalize empty values ('', [], ...) to None
            fixed_value = None
        return super(XAttr, cls).__new__(
            cls, name, qualified_type, cardinality, fixed_value
        )
837
838


839
840
# Required `listVersionID` attributes whose value is fixed to a given edition.
LIST_VERSION_ID_2009 = XAttr(
    name="listVersionID",
    qualified_type="xsd:token",
    cardinality="1",
    fixed_value="edition 2009",
)
LIST_VERSION_ID_2011 = XAttr(
    name="listVersionID",
    qualified_type="xsd:token",
    cardinality="1",
    fixed_value="edition 2011",
)
841
842


843
class SEDA1XSDExport(SEDA2ExportAdapter):
844
845
846
    """Adapter to build an XSD representation of a simplified SEDA profile, using SEDA 1.0
    specification.

    The SEDA2XSDExport implementation may be driven by the SEDA 2.1 XSD model because it's used as
    the basis for the Yams model generation. We can't do the same thing with lower versions of SEDA,
    hence the limitation to simplified profiles, and a direct implementation of the export.
    """
851
852

    __regid__ = "SEDA-1.0.xsd"
853
    __select__ = SEDA2ExportAdapter.__select__ & simplified_profile()
854
855

    namespaces = {
856
857
858
859
860
861
862
863
864
865
        None: "fr:gouv:culture:archivesdefrance:seda:v1.0",
        "seda": "fr:gouv:culture:archivesdefrance:seda:v2.1",
        "xml": "http://www.w3.org/XML/1998/namespace",
        "xsd": "http://www.w3.org/2001/XMLSchema",
        "qdt": "fr:gouv:culture:archivesdefrance:seda:v1.0:QualifiedDataType:1",
        "udt": "urn:un:unece:uncefact:data:standard:UnqualifiedDataType:10",
        "clmDAFFileTypeCode": "urn:un:unece:uncefact:codelist:draft:DAF:fileTypeCode:2009-08-18",
        "clmIANACharacterSetCode": "urn:un:unece:uncefact:codelist:standard:IANA:CharacterSetCode:2007-05-14",
        "clmIANAMIMEMediaType": "urn:un:unece:uncefact:codelist:standard:IANA:MIMEMediaType:2008-11-12",
        "clm60133": "urn:un:unece:uncefact:codelist:standard:6:0133:40106",
866
    }
867
    _root_attributes = {
868
869
870
871
        "targetNamespace": "fr:gouv:culture:archivesdefrance:seda:v1.0",
        "attributeFormDefault": "unqualified",
        "elementFormDefault": "qualified",
        "version": "1.0",
872
873
    }

874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
    concepts_language = "seda-1"

    def element_schema(
        self,
        parent,
        name,
        xsd_type=None,
        fixed_value=None,
        default_value=None,
        cardinality="1",
        documentation=None,
        xsd_attributes=(),
        extra_attributes=None,
    ):
        """Append an xsd:element definition named `name` to `parent`.

        Parameters:

        * `xsd_type`, qualified type of the element; when None, the element is a complex type
          holding a xsd:sequence of children,

        * `fixed_value` / `default_value`, optional fixed or default value of the element (only
          one of them may be specified), stored as string,

        * `cardinality`, one of '0..1', '0..n', '1' or '1..n', translated into `minOccurs` /
          `maxOccurs` attributes,

        * `documentation`, optional text stored in the `documentation` attribute,

        * `xsd_attributes`, iterable of `XAttr` describing the element's attributes; for a typed
          element they are attached to a xsd:extension of `xsd_type`,

        * `extra_attributes`, optional dictionary of extra attributes for the xsd:element node.

        Return the xsd:sequence node under which children should be inserted when `xsd_type` is
        None, else None.
        """
        assert not (
            fixed_value and default_value
        ), "only one of fixed_value or default_value may be specified"
        attributes = {"name": name}
        if extra_attributes is not None:
            attributes.update(extra_attributes)
        if fixed_value is not None:
            attributes["fixed"] = str(fixed_value)
        elif default_value is not None:
            attributes["default"] = str(default_value)
        if xsd_type is not None and not xsd_attributes:
            # with attributes, the type is carried by the xsd:extension node instead
            attributes["type"] = xsd_type
        assert cardinality in ("0..1", "0..n", "1", "1..n")
        if cardinality != "1":
            if cardinality[0] == "0":
                attributes["minOccurs"] = "0"
            if cardinality[-1] == "n":
                attributes["maxOccurs"] = "unbounded"
        if documentation:
            attributes["documentation"] = documentation
        element = self.element("xsd:element", parent, attributes)
        children_parent = None
        if xsd_type is None:
            attributes_parent = self.element("xsd:complexType", element)
            children_parent = self.element("xsd:sequence", attributes_parent)
        elif xsd_attributes:
            ct = self.element("xsd:complexType", element)
            scontent = self.element("xsd:simpleContent", ct)
            attributes_parent = self.element(
                "xsd:extension", scontent, {"base": xsd_type}
            )
        # `attributes_parent` is only unset when there are no attributes to insert
        for xattr in xsd_attributes:
            self.attribute_schema(attributes_parent, xattr)
        return children_parent

923
    def attribute_schema(self, parent, xattr):
        """Append the xsd:attribute definition described by `xattr` to `parent`.

        The `use` of the attribute is derived from `xattr.cardinality`: 'prohibited' for None,
        'required' for '1', 'optional' otherwise.

        When `xattr.fixed_value` is a tuple or list, the attribute gets an anonymous simple type
        restricting xsd:token to the listed values. Otherwise it is typed by
        `xattr.qualified_type` and, if the fixed value is a string, fixed to this value.
        """
        enumerated = isinstance(xattr.fixed_value, (tuple, list))
        attrs = {"name": xattr.name}
        if xattr.cardinality is None:
            attrs["use"] = "prohibited"
        elif xattr.cardinality == "1":
            attrs["use"] = "required"
        else:
            attrs["use"] = "optional"
        if not enumerated:
            attrs["type"] = xattr.qualified_type
            if isinstance(xattr.fixed_value, str):
                attrs["fixed"] = xattr.fixed_value
        attribute_element = self.element("xsd:attribute", parent, attrs)
        if enumerated:
            type_element = self.element("xsd:simpleType", attribute_element)
            restriction_element = self.element(
                "xsd:restriction", type_element, {"base": "xsd:token"}
            )
            for value in xattr.fixed_value:
                self.element("xsd:enumeration", restriction_element, {"value": value})
943

944
945
946
947
    # business visit methods #######################################################################

    def dump_etree(self):
        """Return an XSD etree for the adapted SEDA profile.

        The returned node is the xsd:schema root, built with `self.root_attributes` and filled
        by `xsd_transfer` from `self.entity`.
        """
        root = self.element("xsd:schema", attributes=self.root_attributes)
        # The xsd:import of the XML namespace schema below is currently disabled:
        # self.element('xsd:import', parent=root,
        #              attributes={'namespace': 'http://www.w3.org/XML/1998/namespace',
        #                          'schemaLocation': 'http://www.w3.org/2001/xml.xsd'})
        self.xsd_transfer(root, self.entity)
        return root

    def xsd_transfer(self, parent, archive_transfer):
        """Append XSD elements for the archive transfer to the given parent node.

        The transfer node itself is built by `xsd_transfer_base`, then one archive definition
        is appended to it per item of `archive_transfer.archive_units`.
        """
        transfer_node = self.xsd_transfer_base(parent, archive_transfer)
        for archive_unit in archive_transfer.archive_units:
            self.xsd_archive(transfer_node, archive_unit)

    def xsd_archive(self, parent, archive_unit):
        """Append XSD elements for an archive to the given parent node.

        An `Archive` element is created for `archive_unit`, then filled in this order: archival
        agreement (from the unit's parent in the tree), description language, name, transferring
        agency archive identifier, content description, rules and children.
        """
        archive_node = self.element_schema(
            parent,
            "Archive",
            cardinality=archive_unit.user_cardinality,
            documentation=archive_unit.user_annotation,
            xsd_attributes=[XAttr("Id", "xsd:ID")],
            extra_attributes={"xml:id": xmlid(archive_unit)},
        )
        transfer = archive_unit.cw_adapt_to("ITreeBase").parent()
        self.xsd_archival_agreement(archive_node, transfer)
        # hard-coded description's language XXX fine, content language may be specified
        self.element_schema(
            archive_node,
            "DescriptionLanguage",
            "qdt:CodeLanguageType",
            fixed_value="fra",
            xsd_attributes=[LIST_VERSION_ID_2011],
        )
        name_entity = self.archive_unit_name(archive_unit)
        self.element_schema(
            archive_node,
            "Name",
            "udt:TextType",
            fixed_value=name_entity.title,
            documentation=name_entity.user_annotation,
            xsd_attributes=[XAttr("languageID", "xsd:language")],
        )
        content_entity = self.archive_unit_content(archive_unit)
        self.xsd_transferring_agency_archive_identifier(
            archive_node, content_entity, "TransferringAgencyArchiveIdentifier"
        )
        self.xsd_content_description(archive_node, content_entity)
        self.xsd_rules(archive_node, archive_unit)
        self.xsd_children(archive_node, archive_unit)

998
    archive_object_tag_name = "ArchiveObject"
999

1000
1001
    def xsd_archive_object(self, parent, archive_unit):
        """Append XSD elements for the archive object to the given parent node."""
1002
        ao_node = self.element_schema(
1003
1004
            parent,
            self.archive_object_tag_name,
1005
1006
            cardinality=archive_unit.user_cardinality,
            documentation=archive_unit.user_annotation,
1007
1008
            xsd_attributes=[XAttr("Id", "xsd:ID")],
            extra_attributes={"xml:id": xmlid(archive_unit)},
1009
        )
1010
        content_entity = self.archive_unit_content(archive_unit)
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
        self.element_schema(
            ao_node,
            "Name",
            "udt:TextType",
            fixed_value=content_entity.title.title,
            documentation=content_entity.title.user_annotation,
            xsd_attributes=[XAttr("languageID", "xsd:language")],
        )
        self.xsd_transferring_agency_archive_identifier(
            ao_node, content_entity, "TransferringAgencyObjectIdentifier"
        )
        if (
            self.__regid__.startswith("SEDA-1.0")
            or content_entity.start_date
            or content_entity.end_date
            or content_entity.description
            or content_entity.keywords
        ):