# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""unit tests for selectors mechanism"""
19

20
from operator import eq, lt, le, gt
21
22
from contextlib import contextmanager

23
from logilab.common.testlib import TestCase, unittest_main
24
from logilab.common.decorators import clear_cache
25

26
from cubicweb import Binary
27
from cubicweb.devtools.testlib import CubicWebTC
28
from cubicweb.predicates import (is_instance, adaptable, match_kwargs, match_user_groups,
29
30
31
                                 multi_lines_rset, score_entity, is_in_state,
                                 rql_condition, relation_possible, match_form_params,
                                 paginated_rset)
32
from cubicweb.entity import EntityAdapter
33
from cubicweb.web import action
34

35

36
class ImplementsTC(CubicWebTC):
    """Tests for the scores returned by the ``is_instance`` predicate."""

    def test_etype_priority(self):
        """Check score ordering: exact etype > ``adaptable`` > ``'Any'``."""
        with self.admin_access.web_request() as req:
            f = req.create_entity('FakeFile', data_name=u'hop.txt', data=Binary(b'hop'),
                                  data_format=u'text/plain')
            rset = f.as_rset()
            anyscore = is_instance('Any')(f.__class__, req, rset=rset)
            idownscore = adaptable('IDownloadable')(f.__class__, req, rset=rset)
            # assertGreater reports both operands on failure, so no custom
            # message is needed
            self.assertGreater(idownscore, anyscore)
            filescore = is_instance('FakeFile')(f.__class__, req, rset=rset)
            self.assertGreater(filescore, idownscore)

    def test_etype_inheritance_no_yams_inheritance(self):
        """``is_instance('Societe')`` must not select the 'Personne' class."""
        cls = self.vreg['etypes'].etype_class('Personne')
        with self.admin_access.web_request() as req:
            self.assertFalse(is_instance('Societe').score_class(cls, req))

    def test_yams_inheritance(self):
        """``is_instance('BaseTransition')`` scores 3 on the 'Transition' class."""
        cls = self.vreg['etypes'].etype_class('Transition')
        with self.admin_access.web_request() as req:
            self.assertEqual(is_instance('BaseTransition').score_class(cls, req),
                             3)

    def test_outer_join(self):
        """``is_instance`` scores 0 on the optional (``B?``) column of the rset.

        NOTE(review): presumably the anon user has no bookmark so the cell is
        empty -- confirm against the test database.
        """
        with self.admin_access.web_request() as req:
            rset = req.execute('Any U,B WHERE B? bookmarked_by U, U login "anon"')
            self.assertEqual(is_instance('Bookmark')(None, req, rset=rset, row=0, col=1),
                             0)
64

65

66
67
68
69
70
71
72
73
74
75
76
class WorkflowSelectorTC(CubicWebTC):
    """Tests for the ``is_in_state`` predicate on a 'StateFull' workflow."""

    def setUp(self):
        """Turn on debug mode, remembering the previous setting."""
        super(WorkflowSelectorTC, self).setUp()
        # enable debug mode so that state/transition names are validated on
        # the fly; keep the previous value so tearDown can restore it instead
        # of blindly forcing False
        self._saved_debugmode = self.vreg.config.debugmode
        self.vreg.config.debugmode = True

    def tearDown(self):
        """Restore the debug mode setting that was active before ``setUp``."""
        self.vreg.config.debugmode = self._saved_debugmode
        super(WorkflowSelectorTC, self).tearDown()

    def setup_database(self):
        """Create the 'wf_test' workflow: created -> validated -> abandoned."""
        with self.admin_access.shell() as shell:
            wf = shell.add_workflow("wf_test", 'StateFull', default=True)
            created = wf.add_state('created', initial=True)
            validated = wf.add_state('validated')
            abandoned = wf.add_state('abandoned')
            wf.add_transition('validate', created, validated, ('managers',))
            wf.add_transition('forsake', (created, validated,), abandoned, ('managers',))

    @contextmanager
    def statefull_stuff(self):
        """Yield ``(req, entity, rset, adapter)`` for a fresh 'StateFull' entity.

        The entity is committed and checked to be in the 'created' state
        before control is handed to the caller.
        """
        with self.admin_access.web_request() as req:
            wf_entity = req.create_entity('StateFull', name=u'')
            rset = wf_entity.as_rset()
            adapter = wf_entity.cw_adapt_to('IWorkflowable')
            req.cnx.commit()
            self.assertEqual(adapter.state, 'created')
            yield req, wf_entity, rset, adapter

    def test_is_in_state(self):
        """Follow the entity through each state and check ``is_in_state`` scores."""
        with self.statefull_stuff() as (req, wf_entity, rset, adapter):
            # initial state: only 'created' matches
            for state in ('created', 'validated', 'abandoned'):
                selector = is_in_state(state)
                self.assertEqual(selector(None, req, rset=rset),
                                 1 if state == 'created' else 0)

            adapter.fire_transition('validate')
            req.cnx.commit()
            wf_entity.cw_clear_all_caches()
            self.assertEqual(adapter.state, 'validated')

            # drop the rset's cached entity so the predicate sees the new state
            clear_cache(rset, 'get_entity')

            selector = is_in_state('created')
            self.assertEqual(selector(None, req, rset=rset), 0)
            selector = is_in_state('validated')
            self.assertEqual(selector(None, req, rset=rset), 1)
            selector = is_in_state('validated', 'abandoned')
            self.assertEqual(selector(None, req, rset=rset), 1)
            selector = is_in_state('abandoned')
            self.assertEqual(selector(None, req, rset=rset), 0)

            adapter.fire_transition('forsake')
            req.cnx.commit()
            wf_entity.cw_clear_all_caches()
            self.assertEqual(adapter.state, 'abandoned')

            clear_cache(rset, 'get_entity')

            selector = is_in_state('created')
            self.assertEqual(selector(None, req, rset=rset), 0)
            selector = is_in_state('validated')
            self.assertEqual(selector(None, req, rset=rset), 0)
            selector = is_in_state('validated', 'abandoned')
            self.assertEqual(selector(None, req, rset=rset), 1)
            self.assertEqual(adapter.state, 'abandoned')
            self.assertEqual(selector(None, req, rset=rset), 1)
            # mirror the 'validated' phase: the single-state selector for the
            # current state was never exercised here
            selector = is_in_state('abandoned')
            self.assertEqual(selector(None, req, rset=rset), 1)

    def test_is_in_state_unvalid_names(self):
        """Unknown state names raise ``ValueError`` naming each unknown state once."""
        with self.statefull_stuff() as (req, wf_entity, rset, adapter):
            selector = is_in_state("unknown")
            with self.assertRaises(ValueError) as cm:
                selector(None, req, rset=rset)
            self.assertEqual(str(cm.exception),
                             "wf_test: unknown state(s): unknown")
            # 'weird' is given twice but is expected only once in the message,
            # and the known 'created' state is not reported
            selector = is_in_state("weird", "unknown", "created", "weird")
            with self.assertRaises(ValueError) as cm:
                selector(None, req, rset=rset)
            self.assertEqual(str(cm.exception),
                             "wf_test: unknown state(s): unknown,weird")
145
146


147
148
149
class RelationPossibleTC(CubicWebTC):
    """Tests for the ``relation_possible`` predicate."""

    def test_rqlst_1(self):
        """Score is 1 when filtering variable X of a simple CWUser query."""
        with self.admin_access.web_request() as req:
            selector = relation_possible('in_group')
            select = self.vreg.parse(req, 'Any X WHERE X is CWUser').children[0]
            score = selector(None, req, rset=1,
                             select=select, filtered_variable=select.defined_vars['X'])
            self.assertEqual(score, 1)

    def test_rqlst_2(self):
        """Score is 1 for variable X of a query that also involves CWGroup."""
        with self.admin_access.web_request() as req:
            selector = relation_possible('in_group')
            select = self.vreg.parse(req, 'Any 1, COUNT(X) WHERE X is CWUser, X creation_date XD, '
                                     'Y creation_date YD, Y is CWGroup '
                                     'HAVING DAY(XD)=DAY(YD)').children[0]
            score = selector(None, req, rset=1,
                             select=select, filtered_variable=select.defined_vars['X'])
            self.assertEqual(score, 1)

    def test_ambiguous(self):
        """Strict checking of an ambiguous relation scores 0 instead of crashing."""
        # Ambiguous relations are :
        # (Service, fabrique_par, Personne) and (Produit, fabrique_par, Usine)
        # There used to be a crash here with a bad rdef choice in the strict
        # checking case.
        selector = relation_possible('fabrique_par', role='object',
                                     target_etype='Personne', strict=True)
        with self.admin_access.web_request() as req:
            usine = req.create_entity('Usine', lieu=u'here')
            score = selector(None, req, rset=usine.as_rset())
            self.assertEqual(score, 0)

179

180
class MatchUserGroupsTC(CubicWebTC):
    """Tests for the ``match_user_groups`` predicate."""

    def test_owners_group(self):
        """tests usage of 'owners' group with match_user_groups"""
        class SomeAction(action.Action):
            __regid__ = 'yo'
            category = 'foo'
            __select__ = match_user_groups('owners')

        self.vreg._loadedmods[__name__] = {}
        self.vreg.register(SomeAction)
        # from here on the action is registered: whatever fails below, the
        # finally clause must unregister it
        try:
            SomeAction.__registered__(self.vreg['actions'])
            self.assertIn(SomeAction, self.vreg['actions']['yo'])
            with self.admin_access.web_request() as req:
                self.create_user(req, 'john')
            # login as a simple user
            john_access = self.new_access('john')
            with john_access.web_request() as req:
                # it should not be possible to use SomeAction on objects
                # john does not own
                rset = req.execute('Any G WHERE G is CWGroup, G name "managers"')
                self.assertNotIn('yo', dict(self.pactions(req, rset)))
                # insert a new card, and check that we can use SomeAction on our object
                req.execute('INSERT Card C: C title "zoubidou"')
                req.cnx.commit()
            with john_access.web_request() as req:
                rset = req.execute('Card C WHERE C title "zoubidou"')
                self.assertIn('yo', dict(self.pactions(req, rset)))
            # make sure even managers can't use the action
            with self.admin_access.web_request() as req:
                rset = req.execute('Card C WHERE C title "zoubidou"')
                self.assertNotIn('yo', dict(self.pactions(req, rset)))
        finally:
            del self.vreg[SomeAction.__registry__][SomeAction.__regid__]
212

213

214
215
216
217
218
219
class MultiLinesRsetTC(CubicWebTC):
    """Tests for the ``multi_lines_rset`` predicate."""

    def setup_database(self):
        """Add two extra groups to the database."""
        with self.admin_access.web_request() as req:
            req.execute('INSERT CWGroup G: G name "group1"')
            req.execute('INSERT CWGroup G: G name "group2"')
            req.cnx.commit()

    def test_default_op_in_selector(self):
        """Without an explicit operator, only the exact row count scores 1."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any G WHERE G is CWGroup')
            expected = len(rset)
            selector = multi_lines_rset(expected)
            self.assertEqual(selector(None, req, rset=rset), 1)
            self.assertEqual(selector(None, req, None), 0)
            selector = multi_lines_rset(expected + 1)
            self.assertEqual(selector(None, req, rset=rset), 0)
            self.assertEqual(selector(None, req, None), 0)
            selector = multi_lines_rset(expected - 1)
            self.assertEqual(selector(None, req, rset=rset), 0)
            self.assertEqual(selector(None, req, None), 0)

    def test_without_rset(self):
        """The predicate scores 0 whenever ``None`` is passed as the rset."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any G WHERE G is CWGroup')
            expected = len(rset)
            for nb_lines in (expected, expected + 1, expected - 1):
                selector = multi_lines_rset(nb_lines)
                self.assertEqual(selector(None, req, None), 0)

    def test_with_operators(self):
        """Check the score for each (row count, comparison operator) pair."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any G WHERE G is CWGroup')
            expected = len(rset)

            # Format: (row count given to the predicate, operator, expected score)
            testdata = (
                (expected, eq, 1),
                (expected + 1, eq, 0),
                (expected - 1, eq, 0),
                (expected, le, 1),
                (expected + 1, le, 1),
                (expected - 1, le, 0),
                (expected - 1, gt, 1),
                (expected, gt, 0),
                (expected + 1, gt, 0),
                (expected + 1, lt, 1),
                (expected, lt, 0),
                (expected - 1, lt, 0),
            )

            # distinct loop names: the original rebound ``expected`` here,
            # shadowing the row count computed above
            for nb_lines, op, score in testdata:
                selector = multi_lines_rset(nb_lines, op)
                with self.subTest(expected=nb_lines, operator=op):
                    self.assertEqual(selector(None, req, rset=rset), score)
269
270
271


class MatchKwargsTC(TestCase):
    """Tests for the ``match_kwargs`` predicate."""

    def test_match_kwargs_default(self):
        """Default mode: score is 2 only when both 'a' and 'b' are given."""
        selector = match_kwargs({'a', 'b'})
        self.assertEqual(selector(None, None, a=1, b=2), 2)
        self.assertEqual(selector(None, None, a=1), 0)
        self.assertEqual(selector(None, None, c=1), 0)
        self.assertEqual(selector(None, None, a=1, c=1), 0)

    def test_match_kwargs_any(self):
        """``mode='any'``: score is the number of expected names actually given."""
        selector = match_kwargs({'a', 'b'}, mode='any')
        self.assertEqual(selector(None, None, a=1, b=2), 2)
        self.assertEqual(selector(None, None, a=1), 1)
        self.assertEqual(selector(None, None, c=1), 0)
        self.assertEqual(selector(None, None, a=1, c=1), 1)

287

288
class ScoreEntityTC(CubicWebTC):
    """Tests for the ``score_entity`` and ``rql_condition`` predicates."""

    def test_intscore_entity_selector(self):
        """Check how ``score_entity`` turns the scorer's return value into a score."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any E WHERE E eid 1')
            # None scores 0, non-integer truthy values score 1
            selector = score_entity(lambda x: None)
            self.assertEqual(selector(None, req, rset=rset), 0)
            selector = score_entity(lambda x: "something")
            self.assertEqual(selector(None, req, rset=rset), 1)
            selector = score_entity(lambda x: object)
            self.assertEqual(selector(None, req, rset=rset), 1)
            # two-row rset: 10 per row gives 20 by default, 10 with mode='any'
            rset = req.execute('Any G LIMIT 2 WHERE G is CWGroup')
            selector = score_entity(lambda x: 10)
            self.assertEqual(selector(None, req, rset=rset), 20)
            selector = score_entity(lambda x: 10, mode='any')
            self.assertEqual(selector(None, req, rset=rset), 10)

    def test_rql_condition_entity(self):
        """``rql_condition`` scores 1 given an rset or an entity, 0 given neither."""
        with self.admin_access.web_request() as req:
            selector = rql_condition('X identity U')
            rset = req.user.as_rset()
            self.assertEqual(selector(None, req, rset=rset), 1)
            self.assertEqual(selector(None, req, entity=req.user), 1)
            self.assertEqual(selector(None, req), 0)

    def test_rql_condition_user(self):
        """With ``user_condition=True`` the condition is checked for the request's user."""
        with self.admin_access.web_request() as req:
            selector = rql_condition('U login "admin"', user_condition=True)
            self.assertEqual(selector(None, req), 1)
            selector = rql_condition('U login "toto"', user_condition=True)
            self.assertEqual(selector(None, req), 0)
319

320
321
322
323
324
325
326
327
328
329
330

class AdaptablePredicateTC(CubicWebTC):
    """Tests for the ``adaptable`` predicate."""

    def test_multiple_entity_types_rset(self):
        """``adaptable`` selects an rset mixing two types that each have an adapter."""

        class CWUserIWhatever(EntityAdapter):
            __regid__ = 'IWhatever'
            __select__ = is_instance('CWUser')

        class CWGroupIWhatever(EntityAdapter):
            __regid__ = 'IWhatever'
            __select__ = is_instance('CWGroup')

        # the two adapters are only registered for the duration of the block
        with self.temporary_appobjects(CWUserIWhatever, CWGroupIWhatever):
            with self.admin_access.web_request() as req:
                selector = adaptable('IWhatever')
                rset = req.execute('Any X WHERE X is IN(CWGroup, CWUser)')
                self.assertTrue(selector(None, req, rset=rset))
335

336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430

class MatchFormParamsTC(CubicWebTC):
    """tests for match_form_params predicate"""

    def test_keyonly_match(self):
        """test standard usage: ``match_form_params('param1', 'param2')``

        ``param1`` and ``param2`` must be specified in request's form.
        """
        web_request = self.admin_access.web_request
        vid_selector = match_form_params('vid')
        vid_subvid_selector = match_form_params('vid', 'subvid')
        # no parameter => KO,KO
        with web_request() as req:
            self.assertEqual(vid_selector(None, req), 0)
            self.assertEqual(vid_subvid_selector(None, req), 0)
        # one expected parameter found => OK,KO
        with web_request(vid='foo') as req:
            self.assertEqual(vid_selector(None, req), 1)
            self.assertEqual(vid_subvid_selector(None, req), 0)
        # all expected parameters found => OK,OK
        with web_request(vid='foo', subvid='bar') as req:
            self.assertEqual(vid_selector(None, req), 1)
            self.assertEqual(vid_subvid_selector(None, req), 2)

    def test_keyvalue_match_one_parameter(self):
        """test dict usage: ``match_form_params(param1=value1)``

        ``param1`` must be specified in the request's form and its value
        must be ``value1``.
        """
        web_request = self.admin_access.web_request
        # selector built from a single named parameter
        vid_selector = match_form_params(vid='foo')
        # no parameter => should fail
        with web_request() as req:
            self.assertEqual(vid_selector(None, req), 0)
        # expected parameter found with expected value => OK
        with web_request(vid='foo', subvid='bar') as req:
            self.assertEqual(vid_selector(None, req), 1)
        # expected parameter found but value is incorrect => KO
        with web_request(vid='bar') as req:
            self.assertEqual(vid_selector(None, req), 0)

    def test_keyvalue_match_two_parameters(self):
        """test dict usage: ``match_form_params(param1=value1, param2=value2)``

        ``param1`` and ``param2`` must be specified in the request's form and
        their respective value must be ``value1`` and ``value2``.
        """
        web_request = self.admin_access.web_request
        vid_subvid_selector = match_form_params(vid='list', subvid='tsearch')
        # missing one expected parameter => KO
        with web_request(vid='list') as req:
            self.assertEqual(vid_subvid_selector(None, req), 0)
        # expected parameters found but values are incorrect => KO
        with web_request(vid='list', subvid='foo') as req:
            self.assertEqual(vid_subvid_selector(None, req), 0)
        # expected parameters found and values are correct => OK
        with web_request(vid='list', subvid='tsearch') as req:
            self.assertEqual(vid_subvid_selector(None, req), 2)

    def test_keyvalue_multiple_match(self):
        """test dict usage with multiple values

        i.e. as in ``match_form_params(param1=('value1', 'value2'))``

        ``param1`` must be specified in the request's form and its value
        must be either ``value1`` or ``value2``.
        """
        web_request = self.admin_access.web_request
        vid_subvid_selector = match_form_params(vid='list', subvid=('tsearch', 'listitem'))
        # expected parameters found and values correct => OK
        with web_request(vid='list', subvid='tsearch') as req:
            self.assertEqual(vid_subvid_selector(None, req), 2)
        with web_request(vid='list', subvid='listitem') as req:
            self.assertEqual(vid_subvid_selector(None, req), 2)
        # expected parameters found but values are incorrect => KO
        with web_request(vid='list', subvid='foo') as req:
            self.assertEqual(vid_subvid_selector(None, req), 0)

    def test_invalid_calls(self):
        """checks invalid calls raise a ValueError"""
        # mixing named and positional arguments should fail
        with self.assertRaises(ValueError) as cm:
            match_form_params('list', x='1', y='2')
        self.assertEqual(str(cm.exception),
                         "match_form_params() can't be called with both "
                         "positional and named arguments")
        # using a dict as first and unique argument should fail
        with self.assertRaises(ValueError) as cm:
            match_form_params({'x': 1})
        self.assertEqual(str(cm.exception),
                         "match_form_params() positional arguments must be strings")

431

432
433
434
435
436
class PaginatedTC(CubicWebTC):
    """tests for paginated_rset predicate"""

    def setup_database(self):
        """Create 30 extra groups named ``group0`` .. ``group29``."""
        with self.admin_access.repo_cnx() as cnx:
            for i in range(30):
                cnx.create_entity('CWGroup', name=u"group%d" % i)
            cnx.commit()

    def test_paginated_rset(self):
        """Check the ``paginated_rset`` score for several ``page_size`` form values."""
        default_nb_pages = 1
        web_request = self.admin_access.web_request
        with web_request() as req:
            rset = req.execute('Any G WHERE G is CWGroup')
        # NOTE(review): presumably the 30 groups created above plus 4 groups
        # already present in the test database -- confirm
        self.assertEqual(len(rset), 34)
        # page sizes smaller than the 34-row rset => selected
        with web_request(vid='list', page_size='10') as req:
            self.assertEqual(paginated_rset()(None, req, rset), default_nb_pages)
        with web_request(vid='list', page_size='20') as req:
            self.assertEqual(paginated_rset()(None, req, rset), default_nb_pages)
        # page size larger than the rset => not selected
        with web_request(vid='list', page_size='50') as req:
            self.assertEqual(paginated_rset()(None, req, rset), 0)
        # page_size values that are not plain integers => not selected
        with web_request(vid='list', page_size='10/') as req:
            self.assertEqual(paginated_rset()(None, req, rset), 0)
        with web_request(vid='list', page_size='.1') as req:
            self.assertEqual(paginated_rset()(None, req, rset), 0)
        with web_request(vid='list', page_size='not_an_int') as req:
            self.assertEqual(paginated_rset()(None, req, rset), 0)
459

460

461
462
# run the tests with logilab's runner when this file is executed as a script
if __name__ == '__main__':
    unittest_main()