test_profile_generation.py 52.8 KB
Newer Older
1
# copyright 2016-2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
18
19
20
"""cubicweb-seda unit tests for XSD profile generation.

You may want to set the TEST_WRITE_SEDA_FILES environment variable to activate
writing of generated content back to the file-system.
"""
21

22
from doctest import Example
23
from itertools import chain, izip, repeat
24
25
import os
from os.path import basename, join
26
import unittest
27

28
from six import binary_type, text_type
29

30
from lxml import etree
31
from lxml.doctestcompare import LXMLOutputChecker
32

33
34
from logilab.common.decorators import cached

35
36
from cubicweb.devtools.testlib import CubicWebTC

37
from cubicweb_seda.xsd2yams import XSDMMapping
38
from cubicweb_seda.entities import profile_generation as pg
39

40
from cubicweb_seda import testutils
41

42
43

class XmlTestMixin(object):
Sylvain Thénault's avatar
Sylvain Thénault committed
44
    """Mixin class providing additional assertion methods for checking XML data."""
45
46
47
48
49
50
    NAMESPACES = {
        'xs': 'http://www.w3.org/2001/XMLSchema',
        'seda': 'fr:gouv:culture:archivesdefrance:seda:v2.0',
        'rng': 'http://relaxng.org/ns/structure/1.0',
        'a': 'http://relaxng.org/ns/compatibility/annotations/1.0',
    }
51
52
53
54
    # TBD in concret implementation
    schema_class = None
    schema_file = None
    adapter_id = None
55

56
57
58
59
    @classmethod
    @cached
    def schema(cls, xsd_filename):
        with open(xsd_filename) as stream:
60
            return cls.schema_class(etree.parse(stream))
61

62
63
64
    def xpath(self, element, expression):
        return element.xpath(expression, namespaces=self.NAMESPACES)

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
    def get_element(self, profile, name):
        """Return etree element definition for 'name' (there should be only one)"""
        elements = self.get_elements(profile, name)
        self.assertEqual(len(elements), 1)
        return elements[0]

    def get_elements(self, profile, name):
        """Return etree element definitions for 'name'"""
        raise NotImplementedError()

    def get_attribute(self, profile, name):
        """Return etree attribute definition for 'name' (there should be only one)"""
        attributes = self.get_attributes(profile, name)
        self.assertEqual(len(attributes), 1)
        return attributes[0]

    def get_attributes(self, profile, name):
        """Return etree attribute definitions for 'name'"""
        raise NotImplementedError()

85
    def profile_etree(self, transfer_entity, adapter_id=None):
86
        """Return etree representation of profile's XSD for the given transfer entity."""
87
        adapter = transfer_entity.cw_adapt_to(adapter_id or self.adapter_id)
88
        root = adapter.dump_etree()
89
90
91
92
93
        self.assertXmlValid(root)
        return root

    def assertXmlValid(self, root):
        """Validate an XML etree according to an XSD file (.xsd)."""
94
95
        schema = self.schema(self.datapath(self.schema_file))
        schema.assert_(root)
96

97
98
99
100
101
102
103
    def assertXmlEqual(self, actual, expected):
        """Check that both XML strings represent the same XML tree."""
        checker = LXMLOutputChecker()
        if not checker.check_output(expected, actual, 0):
            message = checker.output_difference(Example("", expected), actual, 0)
            self.fail(message)

104
105
    def check_xsd_profile(self, root, sample_file, **substitutions):
        """Check that the SEDA profile can be used to validate a sample XML document."""
106
107
108
109
110
        if os.environ.get('TEST_WRITE_SEDA_FILES'):
            fname = join('/tmp', basename(sample_file))
            with open(fname, 'w') as stream:
                stream.write(etree.tostring(root, pretty_print=True))
            print('Generated profile saved to {}'.format(fname))
111
112
113
114
115
        profile = self.schema_class(root)
        with open(sample_file) as f:
            sample_xml_string = f.read().format(**substitutions)
        profile.assert_(etree.fromstring(sample_xml_string))

116
117
118
119
120
121
122
123

class XMLSchemaTestMixin(XmlTestMixin):
    """Mixin extending XmlTestMixin with implementation of some assert methods for XMLSchema."""

    schema_class = etree.XMLSchema
    schema_file = 'XMLSchema.xsd'
    adapter_id = 'SEDA-2.0.xsd'

124
    def get_elements(self, profile, name):
Sylvain Thénault's avatar
Sylvain Thénault committed
125
        return self.xpath(profile, '//xs:element[@name="{0}"]'.format(name))
126
127

    def get_attributes(self, profile, name):
Sylvain Thénault's avatar
Sylvain Thénault committed
128
        return self.xpath(profile, '//xs:attribute[@name="{0}"]'.format(name))
129
130
131

    def assertElementDefinition(self, element, expected):
        edef = dict(element.attrib)
132
133
134
        types = self.xpath(element, 'xs:complexType/xs:simpleContent/xs:extension')
        assert len(types) <= 1
        if types:
135
136
137
138
139
140
141
            edef['type'] = types[0].attrib['base']
        self.assertEqual(edef, expected)

    def assertAttributeDefinition(self, element, expected):
        edef = dict(element.attrib)
        edef.setdefault('use', 'required')
        self.assertEqual(edef, expected)
142
143
144
145
146
147
148
149
150

    def assertXSDAttributes(self, element, expected_attributes):
        # attributes for regular elements
        attrs = [x.attrib for x in self.xpath(element, 'xs:complexType/xs:attribute')]
        # attributes for simple elements
        attrs += [x.attrib for x in self.xpath(
            element, 'xs:complexType/xs:simpleContent/xs:extension/xs:attribute')]
        self.assertEqual(sorted(attrs), expected_attributes)

151

152
153
154
155
156
157
158
159
class RelaxNGTestMixin(XmlTestMixin):
    """Mixin extending XmlTestMixin with implementation of some assert methods for RelaxNG."""

    schema_class = etree.RelaxNG
    schema_file = 'relaxng.rng'
    adapter_id = 'SEDA-2.0.rng'

    def get_elements(self, profile, name):
Sylvain Thénault's avatar
Sylvain Thénault committed
160
        return self.xpath(profile, '//rng:element[@name="{0}"]'.format(name))
161
162

    def get_attributes(self, profile, name):
Sylvain Thénault's avatar
Sylvain Thénault committed
163
        return self.xpath(profile, '//rng:attribute[@name="{0}"]'.format(name))
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220

    def assertElementDefinition(self, element, expected):
        edef = dict(element.attrib)  # {name: element name}
        if element.getparent().tag == '{http://relaxng.org/ns/structure/1.0}optional':
            edef['minOccurs'] = '0'
        elif element.getparent().tag == '{http://relaxng.org/ns/structure/1.0}oneOrMore':
            edef['maxOccurs'] = 'unbounded'
        elif element.getparent().tag == '{http://relaxng.org/ns/structure/1.0}zeroOrMore':
            edef['minOccurs'] = '0'
            edef['maxOccurs'] = 'unbounded'
        refs = self.xpath(element, 'rng:ref')
        if refs:
            assert len(refs) == 1
            edef['type'] = refs[0].attrib['name']
        self._element_fixed_value(edef, element)
        self.assertEqual(edef, expected)

    def assertAttributeDefinition(self, element, expected):
        edef = dict(element.attrib)  # {name: element name}
        if element.getparent().tag == '{http://relaxng.org/ns/structure/1.0}optional':
            edef['use'] = 'optional'
        else:
            edef['use'] = 'required'
        self._element_fixed_value(edef, element)
        self.assertEqual(edef, expected)

    def assertXSDAttributes(self, element, expected_attributes):
        adefs = []
        optattrs = self.xpath(element, 'rng:optional/rng:attribute')
        attrs = self.xpath(element, 'rng:attribute')
        for use, adef_element in chain(izip(repeat('optional'), optattrs),
                                       izip(repeat('required'), attrs)):
            adef = dict(adef_element.attrib)
            adef['use'] = use
            data_elements = self.xpath(adef_element, 'rng:data')
            if data_elements:
                assert len(data_elements) == 1
                adef['type'] = 'xsd:' + data_elements[0].attrib['type']
            self._element_fixed_value(adef, adef_element)
            adefs.append(adef)
        return sorted(adefs)

    def _element_fixed_value(self, edef, element):
        values = self.xpath(element, 'rng:value')
        if values:
            assert len(values) == 1
            value = values[0]
            edef['fixed'] = value.text
            if value.attrib.get('type'):
                edef['type'] = 'xsd:' + value.attrib['type']
        else:
            datatypes = self.xpath(element, 'rng:data')
            if datatypes:
                assert len(datatypes) == 1
                datatype = datatypes[0]
                edef['type'] = 'xsd:' + datatype.attrib['type']

221

222
class PathTargetValuesTC(CubicWebTC):
223

224
    def test_keyword_path(self):
225
        element_defs = iter(XSDMMapping('Keyword'))
226
        with self.admin_access.cnx() as cnx:
227
228
229
230
231
232
233
234
235
236
            create = cnx.create_entity
            kw = create('SEDAKeyword', user_cardinality=u'0..n')
            kt = create('SEDAKeywordType', seda_keyword_type_from=kw)

            edef = next(element_defs)
            # self.assertEqual(
            #  readable_edef(edef),
            #  ('Keyword', 'SEDAKeyword', [
            #      ('id', [('seda_id', 'object', 'SEDAid'),
            #              ('id', 'subject', 'String')]),
237
            #      ('KeywordContent', []),
238
239
240
241
242
243
244
            #      ('KeywordReference',
            #       [('seda_keyword_reference_from', 'object', 'SEDAKeywordReference'),
            #        ('seda_keyword_reference_to', 'subject', 'Concept')]),
            #      ('KeywordType', [('seda_keyword_type_from', 'object', 'SEDAKeywordType'),
            #                       ('seda_keyword_type_to', 'subject', 'Concept')]),
            #  ]))
            path = edef[-1][0][1]
245
            target_values = pg._path_target_values(kw, path)
246
247
            self.assertEqual(len(target_values), 1)
            target_value = target_values[0]
248
            self.assertEqual(target_value[0], None)
249
250
251
            self.assertEqual(target_value[1], None)

            path = edef[-1][2][1]
252
            target_values = pg._path_target_values(kw, path)
253
254
255
            self.assertEqual(len(target_values), 0)

            path = edef[-1][3][1]
256
            target_values = pg._path_target_values(kw, path)
257
258
259
260
            self.assertEqual(len(target_values), 1)
            target_value = target_values[0]
            self.assertEqual(target_value[0].eid, kt.eid)
            self.assertEqual(target_value[1], None)
261

262
            kt_scheme = testutils.scheme_for_rtype(cnx, 'seda_keyword_type_to', u'theme')
263
            kw_type = kt_scheme.reverse_in_scheme[0]
264
265
            kt.cw_set(seda_keyword_type_to=kw_type)
            path = edef[-1][3][1]
266
            target_values = pg._path_target_values(kw, path)
267
268
269
270
            self.assertEqual(len(target_values), 1)
            target_value = target_values[0]
            self.assertEqual(target_value[0].eid, kt.eid)
            self.assertEqual(target_value[1].eid, kw_type.eid)
271

272
273
274
            edef = next(element_defs)
            # self.assertEqual(
            #     readable_edef(edef),
275
            #     ('KeywordContent', 'SEDAKeyword', [
276
277
            #         ('KeywordContent', [('keyword_content', 'subject', 'String')]),
            #     ]))
278
            path = edef[-1][0][1]
279
            target_values = pg._path_target_values(kw, path)
280
281
282
283
284
            self.assertEqual(len(target_values), 1)
            target_value = target_values[0]
            self.assertEqual(target_value[0], None)
            self.assertEqual(target_value[1], None)

285
286
    def test_internal_reference(self):
        element_defs = iter(XSDMMapping('DataObjectReference'))
287
        with self.admin_access.cnx() as cnx:
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
            create = cnx.create_entity
            transfer = create('SEDAArchiveTransfer', title=u'test profile')
            unit, unit_alt, unit_alt_seq = testutils.create_archive_unit(
                transfer)
            bdo = cnx.create_entity('SEDABinaryDataObject',
                                    user_annotation=u'I am mandatory',
                                    seda_binary_data_object=transfer)
            ref = create('SEDADataObjectReference', seda_data_object_reference=unit_alt_seq,
                         seda_data_object_reference_id=bdo)

            edef = next(element_defs)
            # readable_edef(edef)
            # ('DataObjectReference',
            #  'SEDADataObjectReference',
            #  [('id', [('seda_id', 'object', 'SEDAid'), ('id', 'subject', 'String')]),
            #   ('DataObjectReferenceId',
            #    [('seda_data_object_reference_id',
            #      'subject',
            #      ('SEDABinaryDataObject', 'SEDAPhysicalDataObject'))])])

            path = edef[-1][1][1]
309
            target_values = pg._path_target_values(ref, path)
310
311
312
313
314
            self.assertEqual(len(target_values), 1)
            target_value = target_values[0]
            self.assertEqual(target_value[0], None)
            self.assertEqual(target_value[1].eid, bdo.eid)

315

316
class SEDA2RNGExportTC(RelaxNGTestMixin, CubicWebTC):
317

318
    def test_skipped_mandatory_simple(self):
319
        with self.admin_access.cnx() as cnx:
320
321
            profile = self.profile_etree(cnx.create_entity('SEDAArchiveTransfer',
                                                           title=u'test profile'))
322
323
324
325
326
327
328
            date = self.get_element(profile, 'Date')
            self.assertElementDefinition(date, {'name': 'Date',
                                                'type': 'xsd:dateTime'})
            self.assertXSDAttributes(date, [])
            identifier = self.get_element(profile, 'MessageIdentifier')
            self.assertElementDefinition(identifier, {'name': 'MessageIdentifier',
                                                      'type': 'xsd:token'})
329
            self.assertXSDAttributes(
330
                identifier,
331
332
333
334
335
336
337
                [{'name': 'schemeAgencyID', 'use': 'optional', 'type': 'xsd:token'},
                 {'name': 'schemeAgencyName', 'use': 'optional', 'type': 'xsd:string'},
                 {'name': 'schemeDataURI', 'use': 'optional', 'type': 'xsd:anyURI'},
                 {'name': 'schemeID', 'use': 'optional', 'type': 'xsd:token'},
                 {'name': 'schemeName', 'use': 'optional', 'type': 'xsd:string'},
                 {'name': 'schemeURI', 'use': 'optional', 'type': 'xsd:anyURI'},
                 {'name': 'schemeVersionID', 'use': 'optional', 'type': 'xsd:token'}])
338

339
    def test_skipped_mandatory_complex(self):
340
        with self.admin_access.cnx() as cnx:
341
            transfer = cnx.create_entity('SEDAArchiveTransfer', title=u'test profile')
342
            testutils.create_data_object(transfer, filename=u'fixed.txt')
343
            profile = self.profile_etree(transfer)
344
345
346
347
348
349
350
351
352
            fname = self.get_element(profile, 'Filename')
            self.assertElementDefinition(fname, {'name': 'Filename',
                                                 'fixed': 'fixed.txt',
                                                 'type': 'xsd:string'})
            self.assertXSDAttributes(fname, [])
            attachment = self.get_element(profile, 'Attachment')
            self.assertElementDefinition(attachment, {'name': 'Attachment',
                                                      'type': 'xsd:base64Binary'})
            self.assertXSDAttributes(attachment,
353
354
355
356
                                     [{'name': 'filename', 'use': 'optional',
                                       'type': 'xsd:string'},
                                      {'name': 'uri', 'use': 'optional',
                                       'type': 'xsd:anyURI'}])
357

358
    def test_fileinfo_card(self):
359
        with self.admin_access.cnx() as cnx:
360
            transfer = cnx.create_entity('SEDAArchiveTransfer', title=u'test profile')
361
            bdo = cnx.create_entity('SEDABinaryDataObject',
362
                                    user_annotation=u'I am mandatory',
363
364
365
                                    seda_binary_data_object=transfer)
            appname = cnx.create_entity('SEDACreatingApplicationName',
                                        seda_creating_application_name=bdo)
366

367
            profile = self.profile_etree(transfer)
368
            fileinfo = self.get_element(profile, 'FileInfo')
369
            self.assertElementDefinition(fileinfo, {'name': 'FileInfo'})
370

371
372
            appname.cw_set(user_cardinality=u'1')
            profile = self.profile_etree(transfer)
373
374
            fileinfo = self.get_element(profile, 'FileInfo')
            self.assertElementDefinition(fileinfo, {'name': 'FileInfo'})
375

376
377
378
            appname.cw_set(user_cardinality=u'0..1')
            bdo.cw_set(filename=u'fixed.txt')
            profile = self.profile_etree(transfer)
379
380
            fileinfo = self.get_element(profile, 'FileInfo')
            self.assertElementDefinition(fileinfo, {'name': 'FileInfo'})
381

382
    def test_data_object_package_card(self):
383
        with self.admin_access.cnx() as cnx:
384
            transfer = cnx.create_entity('SEDAArchiveTransfer', title=u'test profile')
385
            bdo = cnx.create_entity('SEDABinaryDataObject',
386
387
                                    user_annotation=u'I am mandatory',
                                    seda_binary_data_object=transfer)
388
389
390

            profile = self.profile_etree(transfer)
            fileinfo = self.get_element(profile, 'DataObjectPackage')
391
            self.assertElementDefinition(fileinfo, {'name': 'DataObjectPackage'})
392
393
394

            bdo.cw_set(user_cardinality=u'1')
            profile = self.profile_etree(transfer)
395
396
            dop = self.get_element(profile, 'DataObjectPackage')
            self.assertElementDefinition(dop, {'name': 'DataObjectPackage'})
397

398
    def test_object_package_group(self):
399
        with self.admin_access.cnx() as cnx:
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
            transfer = cnx.create_entity('SEDAArchiveTransfer', title=u'test profile')
            bdo = cnx.create_entity('SEDABinaryDataObject',
                                    user_annotation=u'I am number one',
                                    seda_binary_data_object=transfer)
            cnx.create_entity('SEDABinaryDataObject',
                              user_annotation=u'I am number two',
                              seda_binary_data_object=transfer)
            cnx.create_entity('SEDAPhysicalDataObject',
                              user_annotation=u'I am number three',
                              seda_physical_data_object=transfer)

            profile = self.profile_etree(transfer)
            dop = self.get_element(profile, 'DataObjectPackage')
            self.assertEqual(len(self.xpath(dop, './rng:group/*')), 3)

            # setting some cardinality to 1 will remove rng:optional parent of the DataObjectPackage
            # and BinaryDataObject nodes
            bdo.cw_set(user_cardinality=u'1')
            profile = self.profile_etree(transfer)
            dop = self.get_element(profile, 'DataObjectPackage')
            self.assertEqual(len(self.xpath(dop, './rng:group/*')), 3)

422
    def test_transfer_annotation(self):
423
        with self.admin_access.cnx() as cnx:
424
            profile = self.profile_etree(cnx.create_entity('SEDAArchiveTransfer',
425
                                                           title=u'test profile',
426
                                                           user_annotation=u'some description'))
427
            docs = self.xpath(profile, '///xs:documentation')
428
429
430
            self.assertEqual(len(docs), 1)
            self.assertEqual(docs[0].text, 'some description')

431
    def test_transfer_signature(self):
432
        with self.admin_access.cnx() as cnx:
433
434
435
            transfer = cnx.create_entity('SEDAArchiveTransfer', title=u'test profile')
            cnx.create_entity('SEDASignature', seda_signature=transfer)
            profile = self.profile_etree(transfer)
436
            signature = self.get_element(profile, 'Signature')
437
            self.assertElementDefinition(signature, {'name': 'Signature', 'type': 'OpenType'})
438
            self.assertOpenTypeIsDefined(profile)
439

440
    def test_keyword(self):
441
        with self.admin_access.cnx() as cnx:
442
443
            create = cnx.create_entity

444
            scheme = testutils.scheme_for_rtype(cnx, 'seda_keyword_type_to', u'theme')
445
446
447
            kw_type = scheme.reverse_in_scheme[0]

            transfer = create('SEDAArchiveTransfer', title=u'test profile')
448
449
            unit, unit_alt, unit_alt_seq = testutils.create_archive_unit(
                transfer, user_cardinality=u'0..n')
450

451
            create('SEDAKeyword', user_cardinality=u'0..n', seda_keyword=unit_alt_seq)
452
453
            kw = create('SEDAKeyword', seda_keyword=unit_alt_seq,
                        keyword_content=u'kwick')
454
            kwr_e = create('SEDAKeywordReference', seda_keyword_reference_from=kw)
455
456
457
458
459
            create('SEDAKeywordType', seda_keyword_type_from=kw,
                   seda_keyword_type_to=kw_type)

            profile = self.profile_etree(transfer)

460
461
462
463
464
465
466
467
468
469
470
471
472
            keywords = self.get_elements(profile, 'Keyword')
            self.assertElementDefinition(keywords[0], {'name': 'Keyword'})
            self.assertElementDefinition(keywords[1], {'name': 'Keyword',
                                                       'minOccurs': '0',
                                                       'maxOccurs': 'unbounded'})

            kwc = self.get_elements(profile, 'KeywordContent')
            self.assertElementDefinition(kwc[0], {'name': 'KeywordContent',
                                                  'type': 'xsd:string',
                                                  'fixed': 'kwick'})
            self.assertElementDefinition(kwc[1], {'name': 'KeywordContent',
                                                  'type': 'xsd:string'})

473
474
475
            kwt = self.get_element(profile, 'KeywordType')
            self.assertElementDefinition(kwt, {'name': 'KeywordType',
                                               'type': 'xsd:token',
476
                                               'fixed': 'theme'})
477
            self.assertXSDAttributes(
478
                kwt,
479
                [{'name': 'listVersionID', 'fixed': 'seda_keyword_type_to/None vocabulary'}])
480
481
            kwr = self.get_element(profile, 'KeywordReference')
            self.assertElementDefinition(kwr, {'name': 'KeywordReference',
482
                                               'type': 'xsd:token'})
483
            self.assertXSDAttributes(
484
                kwr,
485
486
487
488
489
490
491
492
493
494
                [{'name': 'schemeAgencyID', 'type': 'xsd:token', 'use': 'optional'},
                 {'name': 'schemeAgencyName', 'type': 'xsd:string', 'use': 'optional'},
                 {'name': 'schemeDataURI', 'type': 'xsd:anyURI', 'use': 'optional'},
                 {'name': 'schemeID', 'type': 'xsd:token', 'use': 'optional'},
                 {'name': 'schemeName', 'type': 'xsd:string', 'use': 'optional'},
                 {'name': 'schemeURI', 'type': 'xsd:anyURI', 'use': 'optional'},
                 {'name': 'schemeVersionID', 'type': 'xsd:token', 'use': 'optional'}])

            kwr_e.cw_set(seda_keyword_reference_to_scheme=scheme)
            profile = self.profile_etree(transfer)
495
496
            kwr = self.get_element(profile, 'KeywordReference')
            self.assertElementDefinition(kwr, {'name': 'KeywordReference',
497
                                               'type': 'xsd:token'})
498
            self.assertXSDAttributes(
499
                kwr,
500
                [{'name': 'schemeURI', 'fixed': scheme.cwuri}])
501
502
503

            kwr_e.cw_set(seda_keyword_reference_to=kw_type)
            profile = self.profile_etree(transfer)
504
505
506
            kwr = self.get_element(profile, 'KeywordReference')
            self.assertElementDefinition(kwr, {'name': 'KeywordReference',
                                               'type': 'xsd:token',
507
                                               'fixed': kw_type.cwuri})
508
            self.assertXSDAttributes(
509
                kwr,
510
                [{'name': 'schemeURI', 'fixed': scheme.cwuri}])
511

512
    def test_code_list(self):
513
        with self.admin_access.cnx() as cnx:
514
515
516
517
518
519
520
            transfer = cnx.create_entity('SEDAArchiveTransfer', title=u'test profile')
            scheme = cnx.create_entity('ConceptScheme', title=u'Keyword Types')
            cnx.create_entity('SEDAMimeTypeCodeListVersion',
                              seda_mime_type_code_list_version_from=transfer,
                              seda_mime_type_code_list_version_to=scheme)

            profile = self.profile_etree(transfer)
521
522
            mt_clv = self.get_element(profile, 'MimeTypeCodeListVersion')
            self.assertElementDefinition(mt_clv, {'name': 'MimeTypeCodeListVersion',
523
                                                  'fixed': scheme.cwuri,
524
                                                  'type': 'xsd:token'})
525
            # XXX also fix listSchemeURI ?
526
            self.assertXSDAttributes(
527
                mt_clv,
528
529
530
531
532
533
534
535
                [{'name': 'listAgencyID', 'use': 'optional', 'type': 'xsd:token'},
                 {'name': 'listAgencyName', 'use': 'optional', 'type': 'xsd:string'},
                 {'name': 'listID', 'use': 'optional', 'type': 'xsd:token'},
                 {'name': 'listName', 'use': 'optional', 'type': 'xsd:string'},
                 {'name': 'listSchemeURI', 'use': 'optional', 'type': 'xsd:anyURI'},
                 {'name': 'listURI', 'use': 'optional', 'type': 'xsd:anyURI'},
                 {'name': 'listVersionID', 'use': 'optional', 'type': 'xsd:token'}])

536
    def test_seda2_concept(self):
537
        with self.admin_access.cnx() as cnx:
538
539
540
            create = cnx.create_entity
            scheme = create('ConceptScheme', title=u'Digest algorithm')
            some_concept = scheme.add_concept(label=u'md5 algorithm', language_code=u'en')
541
542
543
544
            transfer = create('SEDAArchiveTransfer', title=u'test profile')
            cnx.create_entity('SEDAMessageDigestAlgorithmCodeListVersion',
                              seda_message_digest_algorithm_code_list_version_from=transfer,
                              seda_message_digest_algorithm_code_list_version_to=scheme)
545
            create('SEDABinaryDataObject', user_cardinality=u'0..n',
546
                   user_annotation=u'I am mandatory',
547
548
549
550
                   seda_binary_data_object=transfer,
                   seda_algorithm=some_concept)

            profile = self.profile_etree(transfer)
551
552
553
554
555
            algo = self.get_attribute(profile, 'algorithm')
            self.assertAttributeDefinition(algo, {'name': 'algorithm',
                                                  'use': 'required',
                                                  'type': 'xsd:token',
                                                  'fixed': 'md5 algorithm'})
556
557
558
559
560
561

            create('Label', label_of=some_concept, kind=u'preferred',
                   language_code=u'seda-2', label=u'md5')

            some_concept.cw_clear_all_caches()
            profile = self.profile_etree(transfer)
562
563
564
565
566
            algo = self.get_attribute(profile, 'algorithm')
            self.assertAttributeDefinition(algo, {'name': 'algorithm',
                                                  'use': 'required',
                                                  'type': 'xsd:token',
                                                  'fixed': 'md5'})
567

568
569
570
571
572
    def assertOpenTypeIsDefined(self, profile):
        open_types = self.xpath(profile, '//rng:define[@name="OpenType"]')
        self.assertEqual(len(open_types), 1)

    def test_data_duplicates(self):
573
        with self.admin_access.cnx() as cnx:
574
            transfer = cnx.create_entity('SEDAArchiveTransfer', title=u'test profile')
575
576
            unit, unit_alt, unit_alt_seq = testutils.create_archive_unit(
                transfer, user_cardinality=u'0..n')
577
578
579
580
581
            profile = self.profile_etree(transfer)
            title = self.get_element(profile, 'Title')
            self.assertEqual(len(self.xpath(title, 'rng:data')), 1)


582
class SEDAExportFuncTCMixIn(object):
583
584
585
    """Test that SEDA profile export works correctly."""

    def setup_database(self):
586
        with self.admin_access.cnx() as cnx:
587
588
            create = cnx.create_entity
            scheme = create('ConceptScheme', title=u'Keyword Types')
589
590
            # ensure we're able to export concept with unexpected language code
            some_concept = scheme.add_concept(label=u'md5', language_code=u'de')
591

592
            transfer = create('SEDAArchiveTransfer', title=u'test profile')
593
594
595
596
597
598
599
600
            create('SEDAMessageDigestAlgorithmCodeListVersion',
                   seda_message_digest_algorithm_code_list_version_from=transfer,
                   seda_message_digest_algorithm_code_list_version_to=scheme)
            create('SEDAFileFormatCodeListVersion',
                   seda_file_format_code_list_version_from=transfer)
            create('SEDAMimeTypeCodeListVersion',
                   user_cardinality=u'0..1',
                   seda_mime_type_code_list_version_from=transfer,
601
                   seda_mime_type_code_list_version_to=scheme)
602
603
604
            access_rule = create('SEDAAccessRule',
                                 user_cardinality=u'0..1',
                                 seda_access_rule=transfer)
605
606
            access_rule_seq = create('SEDASeqAccessRuleRule',
                                     reverse_seda_seq_access_rule_rule=access_rule)
607
            create('SEDAStartDate', user_cardinality=u'0..1', seda_start_date=access_rule_seq)
608
            # binary data object
609
            bdo = testutils.create_data_object(transfer, user_cardinality=u'0..n',
610
                                               seda_algorithm=some_concept)
611
612
613
614
615
616
            create('SEDAFormatLitteral',
                   user_cardinality=u'0..1',
                   seda_format_litteral=bdo)
            create('SEDAEncoding',
                   user_cardinality=u'0..1',
                   seda_encoding_from=bdo)
617
            create('SEDAUri', seda_uri=bdo.seda_alt_binary_data_object_attachment)
618
            # first level archive unit
619
620
            unit, unit_alt, unit_alt_seq = testutils.create_archive_unit(
                transfer, user_cardinality=u'0..n', user_annotation=u'Composant ISAD(G)')
621
            # sub archive unit
622
            # testutils.create_archive_unit(unit_alt_seq, user_cardinality=u'0..n')
623
            # management
624
            appraisal_rule = create('SEDAAppraisalRule', seda_appraisal_rule=unit_alt_seq)
625
626
            appraisal_rule_seq = create('SEDASeqAppraisalRuleRule',
                                        reverse_seda_seq_appraisal_rule_rule=appraisal_rule)
627
            create('SEDAStartDate', user_cardinality=u'0..1', seda_start_date=appraisal_rule_seq)
628
629
630
631
            access_rule = create('SEDAAccessRule', seda_access_rule=unit_alt_seq)
            create('SEDADisseminationRule', seda_dissemination_rule=unit_alt_seq)
            create('SEDAReuseRule', seda_reuse_rule=unit_alt_seq)
            create('SEDANeedAuthorization', seda_need_authorization=unit_alt_seq)
632
            # content
633
            kw = create('SEDAKeyword', user_cardinality=u'0..n', seda_keyword=unit_alt_seq)
634
            create('SEDAKeywordType', seda_keyword_type_from=kw)
635
            create('SEDAKeywordReference', seda_keyword_reference_from=kw)
636
637
            history_item = create('SEDACustodialHistoryItem',
                                  seda_custodial_history_item=unit_alt_seq)
638
639
640
            create('SEDAwhen',
                   user_cardinality=u'0..1',
                   seda_when=history_item)
641
            version_of = create('SEDAIsVersionOf', seda_is_version_of=unit_alt_seq)
642
643
            alt2 = create('SEDAAltIsVersionOfArchiveUnitRefId',
                          reverse_seda_alt_is_version_of_archive_unit_ref_id=version_of)
644
            create('SEDADataObjectReference', seda_data_object_reference=alt2)
645
            create('SEDAOriginatingAgencyArchiveUnitIdentifier',
646
                   seda_originating_agency_archive_unit_identifier=unit_alt_seq)
647
            create('SEDATransferringAgencyArchiveUnitIdentifier',
648
649
650
651
652
653
                   seda_transferring_agency_archive_unit_identifier=unit_alt_seq)
            create('SEDADescription', seda_description=unit_alt_seq)
            create('SEDALanguage', seda_language_from=unit_alt_seq)
            create('SEDADescriptionLanguage', seda_description_language_from=unit_alt_seq)
            create('SEDACreatedDate', seda_created_date=unit_alt_seq)
            create('SEDAEndDate', seda_end_date=unit_alt_seq)
654
655
656
            create('SEDADataObjectReference', user_cardinality=u'0..n',
                   seda_data_object_reference=unit_alt_seq,
                   seda_data_object_reference_id=bdo)
657

658
            cnx.commit()
659
        self.transfer_eid = transfer.eid
660
661
        self.bdo_eid = bdo.eid
        self.au_eid = unit.eid
662

663
664
    def test_profile1(self):
        """Check a minimal SEDA profile validating BV2.0_min.xml."""
665
        with self.admin_access.cnx() as cnx:
666
            mda_scheme = cnx.find('ConceptScheme', title=u'Keyword Types').one()
667
            transfer = cnx.entity_from_eid(self.transfer_eid)
668
669
            root = self.profile_etree(transfer)
        self.check_xsd_profile(root, self.datapath('BV2.0_min.xml'),
670
                               mda_scheme_url=mda_scheme.cwuri)
671
672
        # ensure jumped element without content are not there
        self.assertEqual(len(self.get_elements(root, 'Gps')), 0)
673
674
        # ensure element with skipped value are not there
        self.assertEqual(len(self.get_elements(root, 'TransactedDate')), 0)
675
676
677
        self.assertProfileDetails(root)


678
679
680
681
682
class SEDARNGExportFuncTC(SEDAExportFuncTCMixIn, RelaxNGTestMixin, CubicWebTC):

    def assertProfileDetails(self, root):
        # ensure profile's temporary id are exported in custom seda:profid attribute
        self.assertEqual(len(self.xpath(root, '//rng:attribute[@seda:profid]')), 2)
683
684
        for attrdef in self.xpath(root, '//xs:attribute[@seda:profid]'):
            self.assertEqual(attrdef[0]['type'], 'ID')
685
        # ensure they are properly referenced using 'default' attribute
686
        xmlid = pg.eid2xmlid(self.bdo_eid)
687
        references = self.xpath(root, '//rng:element[@a:defaultValue="{}"]'.format(xmlid))
688
689
        self.assertEqual(len(references), 1)
        self.assertEqual(references[0].attrib['name'], 'DataObjectReferenceId')
690
691
        for reference_id in self.xpath(root, '//rng:element[@name="DataObjectReferenceId"]'):
            self.assertEqual(reference_id[0].attrib['type'], 'NCName')
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
        # ensure optional id are properly reinjected
        references = self.xpath(root,
                                '//rng:element[@name="Keyword"]/rng:optional'
                                '/rng:attribute[@name="id"]')
        self.assertEqual(len(references), 1)
        # ensure custodial item content type is properly serialized
        chi = self.xpath(root, '//rng:element[@name="CustodialHistoryItem"]')
        self.assertEqual(len(chi), 1)
        self.assertXSDAttributes(
            chi[0],
            [{'name': 'when', 'use': 'optional', 'type': 'datedateTime'}])
        # ensure types union handling
        ddt = self.xpath(root, '//rng:element[@name="CreatedDate"]/rng:choice/rng:data')
        self.assertEqual(len(ddt), 2)
        self.assertEqual(set(elmt.attrib['type'] for elmt in ddt), set(['date', 'dateTime']))
707
708


709
class OldSEDAExportMixin(object):
710
    def setup_database(self):
711
        with self.admin_access.cnx() as cnx:
712
713
714
715
716
717
            create = cnx.create_entity

            concepts = {}
            for rtype, etype, value in [
                    ('seda_format_id_to', None, u'fmt/123'),
                    ('seda_encoding_to', None, u'6'),
718
                    ('seda_type_to', None, u'CDO'),
719
                    ('seda_description_level', None, u'file'),
720
                    ('seda_algorithm', 'SEDABinaryDataObject', u'md5'),
721
722
723
724
                    ('seda_rule', 'SEDASeqAppraisalRuleRule', u'P10Y'),
                    ('seda_rule', 'SEDASeqAccessRuleRule', u'AR038'),
                    ('seda_final_action', 'SEDAAppraisalRule', u'detruire'),
            ]:
725
                scheme = testutils.scheme_for_type(cnx, rtype, etype, value)
726
727
                concepts[value] = scheme.reverse_in_scheme[0]

728
729
730
            # ensure we're able to export concept with unexpected language code
            concepts['md5'].preferred_label[0].cw_set(language_code=u'de')

731
            agent = testutils.create_authority_record(cnx, u'bob')
732

733
734
735
736
737
738
739
740
741
742
            transfer = create('SEDAArchiveTransfer', title=u'my profile title &&',
                              simplified_profile=True)

            create('SEDAComment',
                   user_cardinality=u'1',
                   comment=u'my profile description &&',
                   seda_comment=transfer)

            create('SEDAAccessRule',  # XXX mandatory for seda 1.0
                   user_cardinality=u'1',
743
                   seda_access_rule=transfer,
744
745
                   seda_seq_access_rule_rule=create(
                       'SEDASeqAccessRuleRule', reverse_seda_start_date=create('SEDAStartDate')))
746
747
748
749
750
            appraisal_rule_rule = create('SEDASeqAppraisalRuleRule',
                                         seda_rule=concepts['P10Y'],
                                         user_annotation=u"C'est dans 10ans je m'en irai",
                                         reverse_seda_start_date=create('SEDAStartDate'))
            create('SEDAAppraisalRule',
751
                   user_cardinality=u'0..1',
752
753
754
755
756
                   seda_appraisal_rule=transfer,
                   seda_final_action=concepts['detruire'],
                   seda_seq_appraisal_rule_rule=appraisal_rule_rule,
                   user_annotation=u'detruire le document')

757
            _, _, unit_alt_seq = testutils.create_archive_unit(transfer,
758
                                                               user_cardinality=u'1..n')
759

760
            unit_alt_seq.cw_set(reverse_seda_start_date=create('SEDAStartDate',
761
                                                               user_cardinality=u'0..1'),
762
763
                                reverse_seda_end_date=create('SEDAEndDate',
                                                             user_cardinality=u'0..1'),
764
                                # XXX, value=date(2015, 2, 24)),
765
766
                                reverse_seda_description=create('SEDADescription',
                                                                user_cardinality=u'0..1'))
767
768
769

            kw = create('SEDAKeyword',
                        user_cardinality=u'0..n',
770
                        seda_keyword=unit_alt_seq)
771
772
773
774
            create('SEDAKeywordReference',
                   seda_keyword_reference_from=kw,
                   seda_keyword_reference_to=concepts['file'],
                   seda_keyword_reference_to_scheme=concepts['file'].scheme)
775
            create('SEDAKeywordType',
776
777
                   user_cardinality=u'0..1',
                   seda_keyword_type_from=kw)
778

779
780
781
            create('SEDAOriginatingAgency',
                   user_cardinality=u'0..1',
                   seda_originating_agency_from=unit_alt_seq,
782
783
                   seda_originating_agency_to=agent)

784
785
786
            create('SEDAType',
                   user_cardinality=u'0..1',
                   seda_type_from=unit_alt_seq,
787
                   seda_type_to=concepts['CDO'])
788

789
790
791
792
793
            create('SEDACustodialHistoryItem',
                   user_cardinality=u'0..1',
                   seda_custodial_history_item=unit_alt_seq,
                   reverse_seda_when=create('SEDAwhen',
                                            user_cardinality=u'0..1'))
794

795
            # Add sub archive unit
796
            _, _, subunit_alt_seq = testutils.create_archive_unit(unit_alt_seq,
797
                                                                  user_cardinality=u'1..n')
798
799

            create('SEDAAppraisalRule',
800
                   user_cardinality=u'0..1',
801
802
                   seda_appraisal_rule=subunit_alt_seq,
                   seda_seq_appraisal_rule_rule=create(
803
804
                       'SEDASeqAppraisalRuleRule',
                       reverse_seda_start_date=create('SEDAStartDate')))
805
806
807
808
809
810

            create('SEDAAccessRule',
                   user_cardinality=u'1',
                   user_annotation=u'restrict',
                   seda_access_rule=subunit_alt_seq,
                   seda_seq_access_rule_rule=create('SEDASeqAccessRuleRule',
811
                                                    reverse_seda_start_date=create('SEDAStartDate'),
812
813
814
                                                    seda_rule=concepts['AR038']))

            # Add minimal document to first level archive
815
816
817
            ref = create('SEDADataObjectReference',
                         user_cardinality=u'0..1',
                         seda_data_object_reference=unit_alt_seq)
818
            bdo = testutils.create_data_object(transfer, user_cardinality=u'0..n',
819
                                               filename=u'this_is_the_filename.pdf',
820
                                               seda_algorithm=concepts['md5'],
821
                                               reverse_seda_data_object_reference_id=ref)
822
823
824
825
826
827
828
829
830
831
832

            create('SEDAFormatId',
                   user_cardinality=u'1',
                   seda_format_id_from=bdo,
                   seda_format_id_to=concepts['fmt/123'])
            create('SEDAEncoding',
                   user_cardinality=u'1',
                   seda_encoding_from=bdo,
                   seda_encoding_to=concepts['6'])

            # Add another sub archive unit
833
            _, _, subunit2_alt_seq = testutils.create_archive_unit(unit_alt_seq,
834
                                                                   user_cardinality=u'1')
835
836
837
838

            cnx.commit()

        self.transfer_eid = transfer.eid
839
        self.bdo_eid = bdo.eid
840
        self.file_concept_eid = concepts['file'].eid
841
        self.agent_eid = agent.eid
842

843
    def _test_profile(self, adapter_id, expected_file):
844
        with self.admin_access.cnx() as cnx:
845
            transfer = cnx.entity_from_eid(self.transfer_eid)
846
847
848
            file_concept = cnx.entity_from_eid(self.file_concept_eid)
            agent = cnx.entity_from_eid(self.agent_eid)

849
850
            adapter = transfer.cw_adapt_to(adapter_id)
            generated_xsd = adapter.dump()
851
852
853
854
855
856

            if os.environ.get('TEST_WRITE_SEDA_FILES'):
                orig_content = generated_xsd
                for value, key in [(file_concept.cwuri, 'concept-uri'),
                                   (file_concept.scheme.cwuri, 'scheme-uri'),
                                   (text_type(agent.eid), 'agent-id'),
857
                                   (agent.dc_title(), 'agent-name')]:
858
859
860
861
862
                    orig_content = orig_content.replace(value, '%({})s'.format(key))
                with open(self.datapath(expected_file + '.new'), 'w') as stream:
                    stream.write(orig_content)
                print('Regenerated expected file as {}.new'.format(expected_file))

863
864
            root = etree.fromstring(generated_xsd)
            self.assertXmlValid(root)
865
866
            with open(self.datapath(expected_file)) as expected:
                self.assertXmlEqual(expected.read()
867
868
                                    % {'bdo-eid': binary_type(self.bdo_eid),
                                       'concept-uri': binary_type(file_concept.cwuri),
869
870
                                       'scheme-uri': binary_type(file_concept.scheme.cwuri),
                                       'agent-id': binary_type(agent.eid),
871
                                       'agent-name': binary_type(agent.dc_title())},
872
                                    generated_xsd)
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
            return adapter, root


class OldSEDAXSDExportTC(XMLSchemaTestMixin, OldSEDAExportMixin, CubicWebTC):

    def test_seda_1_0(self):
        self._test_profile('SEDA-1.0.xsd', 'seda_1_export.xsd')

    def test_seda_0_2(self):
        self._test_profile('SEDA-0.2.xsd', 'seda_02_export.xsd')

    def _test_profile(self, adapter_id, expected_file):
        adapter, root = super(OldSEDAXSDExportTC, self)._test_profile(adapter_id, expected_file)
        # ensure there is no element with @type but a complex type
        namespaces = adapter.namespaces.copy()
        namespaces.pop(None)
        dates = root.xpath('//xsd:element[@name="Date"]/xsd:complexType/xsd:sequence',
                           namespaces=namespaces)
        self.assertEqual(len(dates), 0)


class OldSEDARNGExportTC(RelaxNGTestMixin, OldSEDAExportMixin, CubicWebTC):

    def test_seda_1_0(self):
        self._test_profile('SEDA-1.0.rng', 'seda_1_export.rng')
898

899
900
901
    def test_seda_0_2_rng(self):
        self._test_profile('SEDA-0.2.rng', 'seda_02_export.rng')

902
903
    def test_seda_0_2_bordereau_ref(self):
        """Check a sample SEDA 0.2 profile validation."""
904
        with self.admin_access.cnx() as cnx:
905
906
907
908
909
910
            create = cnx.create_entity

            transfer = create('SEDAArchiveTransfer', title=u'test profile',
                              simplified_profile=True)
            create('SEDAComment', seda_comment=transfer)

911
            unit, unit_alt, unit_alt_seq = testutils.create_archive_unit(transfer)
912
913
            create('SEDAArchivalAgreement', seda_archival_agreement=transfer)
            create('SEDATransferringAgencyArchiveUnitIdentifier',
914
915
916
                   seda_transferring_agency_archive_unit_identifier=unit_alt_seq)
            create('SEDAStartDate', seda_start_date=unit_alt_seq)
            create('SEDAEndDate', seda_end_date=unit_alt_seq)
917
918
919
            appraisal_rule = create('SEDAAppraisalRule',
                                    user_cardinality=u'0..1',
                                    seda_appraisal_rule=unit_alt_seq)
920
921
922
            appraisal_rule_seq = create('SEDASeqAppraisalRuleRule',
                                        reverse_seda_seq_appraisal_rule_rule=appraisal_rule)
            create('SEDAStartDate', seda_start_date=appraisal_rule_seq)
923
924
925
            access_rule = create('SEDAAccessRule',
                                 user_cardinality=u'0..1',
                                 seda_access_rule=unit_alt_seq)
926
927
928
929
            access_rule_seq = create('SEDASeqAccessRuleRule',
                                     reverse_seda_seq_access_rule_rule=access_rule)
            create('SEDAStartDate', seda_start_date=access_rule_seq)

930
            subunit, subunit_alt, subunit_alt_seq = testutils.create_archive_unit(
931
                unit_alt_seq)
932
            create('SEDATransferringAgencyArchiveUnitIdentifier',
933
934
935
                   seda_transferring_agency_archive_unit_identifier=subunit_alt_seq)
            create('SEDAStartDate', seda_start_date=subunit_alt_seq)
            create('SEDAEndDate', seda_end_date=subunit_alt_seq)
936
            create('SEDADescription', seda_description=subunit_alt_seq)
937
            kw = create('SEDAKeyword', user_cardinality=u'0..n', seda_keyword=subunit_alt_seq)
938
            create('SEDAKeywordReference', seda_keyword_reference_from=kw)
939

940
            create('SEDASystemId', seda_system_id=subunit_alt_seq)
941

942
            bdo = testutils.create_data_object(transfer)
943
944
945
946
947
            create('SEDADataObjectReference',
                   seda_data_object_reference=subunit_alt_seq,
                   seda_data_object_reference_id=bdo)
            create('SEDAEncoding', seda_encoding_from=bdo)
            create('SEDAMimeType', seda_mime_type_from=bdo)
948
            create('SEDADateCreatedByApplication', seda_date_created_by_application=bdo)
949
950
951
952
953
954

            cnx.commit()

            root = self.profile_etree(transfer, 'SEDA-0.2.rng')
        self.check_xsd_profile(root, self.datapath('seda_02_bordereau_ref.xml'))