unittest_querier.py 87.1 KB
Newer Older
Adrien Di Mascio's avatar
Adrien Di Mascio committed
1
# -*- coding: iso-8859-1 -*-
Sylvain Thénault's avatar
Sylvain Thénault committed
2
# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
3
4
5
6
7
8
9
10
11
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
12
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
13
14
15
16
17
18
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
Sylvain Thénault's avatar
Sylvain Thénault committed
19
"""unit tests for modules cubicweb.server.querier and cubicweb.server.ssplanner
Adrien Di Mascio's avatar
Adrien Di Mascio committed
20
"""
21

22
from datetime import date, datetime, timedelta, tzinfo
23
import unittest
Adrien Di Mascio's avatar
Adrien Di Mascio committed
24

25
26
import pytz

27
from six import PY2, integer_types, binary_type, text_type
Sylvain Thénault's avatar
Sylvain Thénault committed
28
29

from rql import BadRQLQuery
Sylvain Thénault's avatar
Sylvain Thénault committed
30
from rql.utils import register_function, FunctionDescr
31

32
from cubicweb import QueryError, Unauthorized, Binary, devtools
Sylvain Thénault's avatar
Sylvain Thénault committed
33
from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS, SQL_PREFIX
Adrien Di Mascio's avatar
Adrien Di Mascio committed
34
from cubicweb.server.utils import crypt_password
35
from cubicweb.server.querier import manual_build_descr, _make_description
36
from cubicweb.devtools.testlib import CubicWebTC
37
from cubicweb.devtools.repotest import tuplify, BaseQuerierTC
Adrien Di Mascio's avatar
Adrien Di Mascio committed
38

Sylvain Thénault's avatar
Sylvain Thénault committed
39

40
41
42
class FixedOffset(tzinfo):
    def __init__(self, hours=0):
        self.hours = hours
Sylvain Thénault's avatar
Sylvain Thénault committed
43

44
45
    def utcoffset(self, dt):
        return timedelta(hours=self.hours)
Sylvain Thénault's avatar
Sylvain Thénault committed
46

47
48
49
    def dst(self, dt):
        return timedelta(0)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
50
51
52
53
54
55

# register priority/severity sorting registered procedure

class group_sort_value(FunctionDescr):
    supported_backends = ('sqlite',)
    rtype = 'Int'
Sylvain Thénault's avatar
Sylvain Thénault committed
56

Adrien Di Mascio's avatar
Adrien Di Mascio committed
57
58
59
60
try:
    register_function(group_sort_value)
except AssertionError:
    pass
Sylvain Thénault's avatar
Sylvain Thénault committed
61
62


Adrien Di Mascio's avatar
Adrien Di Mascio committed
63
def init_sqlite_connexion(cnx):
Sylvain Thénault's avatar
Sylvain Thénault committed
64

Adrien Di Mascio's avatar
Adrien Di Mascio committed
65
    def group_sort_value(text):
Sylvain Thénault's avatar
Sylvain Thénault committed
66
67
        return {"managers": "3", "users": "2", "guests": "1", "owners": "0"}[text]

Adrien Di Mascio's avatar
Adrien Di Mascio committed
68
    cnx.create_function("GROUP_SORT_VALUE", 1, group_sort_value)
Sylvain Thénault's avatar
Sylvain Thénault committed
69

Adrien Di Mascio's avatar
Adrien Di Mascio committed
70
71
72
SQL_CONNECT_HOOKS['sqlite'].append(init_sqlite_connexion)


73
def setUpClass(cls, *args):
74
    global repo, cnx
75
76
    config = devtools.TestServerConfiguration('data', __file__)
    handler = devtools.get_test_db_handler(config)
77
78
    handler.build_db_cache()
    repo, cnx = handler.get_repo_and_cnx()
79
    cls.repo = repo
Adrien Di Mascio's avatar
Adrien Di Mascio committed
80

81
def tearDownClass(cls, *args):
82
83
84
    global repo, cnx
    repo.shutdown()
    del repo, cnx
Adrien Di Mascio's avatar
Adrien Di Mascio committed
85
86


87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
class Variable:
    def __init__(self, name):
        self.name = name
        self.children = []

    def get_type(self, solution, args=None):
        return solution[self.name]
    def as_string(self):
        return self.name

class Function:
    def __init__(self, name, varname):
        self.name = name
        self.children = [Variable(varname)]
    def get_type(self, solution, args=None):
        return 'Int'

104
class MakeDescriptionTC(unittest.TestCase):
105
106
107
108
109
110
    def test_known_values(self):
        solution = {'A': 'Int', 'B': 'CWUser'}
        self.assertEqual(_make_description((Function('max', 'A'), Variable('B')), {}, solution),
                          ['Int','CWUser'])


Adrien Di Mascio's avatar
Adrien Di Mascio committed
111
class UtilsTC(BaseQuerierTC):
112
113
    setUpClass = classmethod(setUpClass)
    tearDownClass = classmethod(tearDownClass)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
114

Adrien Di Mascio's avatar
Adrien Di Mascio committed
115
116
117
118
119
120
    def get_max_eid(self):
        # no need for cleanup here
        return None
    def cleanup(self):
        # no need for cleanup here
        pass
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
121

Adrien Di Mascio's avatar
Adrien Di Mascio committed
122
    def test_preprocess_1(self):
123
124
        with self.session.new_cnx() as cnx:
            reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0]
125
            rqlst = self._prepare(cnx, 'Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s',
126
127
128
                                  {'x': reid})
            self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}],
                             rqlst.solutions)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
129

Adrien Di Mascio's avatar
Adrien Di Mascio committed
130
    def test_preprocess_2(self):
131
132
133
134
135
136
137
138
139
        with self.session.new_cnx() as cnx:
            teid = cnx.execute("INSERT Tag X: X name 'tag'")[0][0]
            #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0]
            #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
            #             {'g': geid, 't': teid}, 'g')
            rqlst = self._prepare(cnx, 'Any X WHERE E eid %(x)s, E tags X', {'x': teid})
            # the query may be optimized, should keep only one solution
            # (any one, etype will be discarded)
            self.assertEqual(1, len(rqlst.solutions))
140
141
142

    def assertRQLEqual(self, expected, got):
        from rql import parse
143
144
        self.assertMultiLineEqual(text_type(parse(expected)),
                                  text_type(parse(got)))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
145

Adrien Di Mascio's avatar
Adrien Di Mascio committed
146
    def test_preprocess_security(self):
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
        s = self.user_groups_session('users')
        with s.new_cnx() as cnx:
            plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN '
                                      'WHERE X is ET, ET name ETN')
            union = plan.rqlst
            plan.preprocess(union)
            self.assertEqual(len(union.children), 1)
            self.assertEqual(len(union.children[0].with_), 1)
            subq = union.children[0].with_[0].query
            self.assertEqual(len(subq.children), 4)
            self.assertEqual([t.as_string() for t in union.children[0].selection],
                              ['ETN','COUNT(X)'])
            self.assertEqual([t.as_string() for t in union.children[0].groupby],
                              ['ETN'])
            partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children))
            rql, solutions = partrqls[0]
            self.assertRQLEqual(rql,
                                'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))'
                                ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, '
                                '               X identity D, C is Division, D is Affaire))'
                                ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, '
                                '            X identity H, H is Affaire)))'
                                ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, '
                                '            X identity I, I is Affaire)))'
                                ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, '
                                '            X identity J, J is Affaire)))'
                                ', ET is CWEType, X is Affaire')
            self.assertEqual(solutions, [{'C': 'Division',
                                           'D': 'Affaire',
                                           'E': 'Note',
                                           'F': 'Societe',
                                           'G': 'SubDivision',
                                           'H': 'Affaire',
                                           'I': 'Affaire',
                                           'J': 'Affaire',
                                           'X': 'Affaire',
                                           'ET': 'CWEType', 'ETN': 'String'}])
            rql, solutions = partrqls[1]
            self.assertRQLEqual(rql,  'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, '
186
187
188
                                'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWComputedRType, '
                                '        CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, '
                                '        CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, '
Julien Cristau's avatar
Julien Cristau committed
189
190
                                '        Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, '
                                '        Frozable, Note, Old, Personne, RQLExpression, Societe, State, SubDivision, '
191
                                '        SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)')
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
            self.assertCountEqual(solutions,
                                  [{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Division', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWCache', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWComputedRType', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWConstraint', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWConstraintType', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWEType', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWAttribute', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWGroup', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWRelation', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWPermission', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWProperty', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWRType', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWSource', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWUniqueTogetherConstraint', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWUser', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Email', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'EmailPart', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'EmailThread', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'ExternalUri', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'File', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Folder', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Frozable', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Note', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Old', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Personne', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'RQLExpression', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Societe', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'State', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'SubDivision', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'SubWorkflowExitPoint', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Tag', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Transition', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'TrInfo', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Workflow', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'WorkflowTransition', 'ETN': 'String', 'ET': 'CWEType'}])
232
233
234
235
236
237
238
239
240
241
            rql, solutions = partrqls[2]
            self.assertEqual(rql,
                             'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), '
                             'ET is CWEType, X is EmailAddress')
            self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}])
            rql, solutions = partrqls[3]
            self.assertEqual(rql,
                              'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), '
                              'ET is CWEType, X is Basket')
            self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
242
243

    def test_preprocess_security_aggregat(self):
244
245
246
247
248
249
250
251
252
253
254
        s = self.user_groups_session('users')
        with s.new_cnx() as cnx:
            plan = self._prepare_plan(cnx, 'Any MAX(X)')
            union = plan.rqlst
            plan.preprocess(union)
            self.assertEqual(len(union.children), 1)
            self.assertEqual(len(union.children[0].with_), 1)
            subq = union.children[0].with_[0].query
            self.assertEqual(len(subq.children), 4)
            self.assertEqual([t.as_string() for t in union.children[0].selection],
                              ['MAX(X)'])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
255

Adrien Di Mascio's avatar
Adrien Di Mascio committed
256
    def test_preprocess_nonregr(self):
257
258
259
        with self.session.new_cnx() as cnx:
            rqlst = self._prepare(cnx, 'Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI')
            self.assertEqual(len(rqlst.solutions), 1)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
260

Adrien Di Mascio's avatar
Adrien Di Mascio committed
261
262
    def test_build_description(self):
        # should return an empty result set
263
        rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid})
264
        self.assertEqual(rset.description[0][0], 'CWUser')
265
        rset = self.qexecute('Any 1')
266
        self.assertEqual(rset.description[0][0], 'Int')
267
        rset = self.qexecute('Any TRUE')
268
        self.assertEqual(rset.description[0][0], 'Boolean')
269
        rset = self.qexecute('Any "hop"')
270
        self.assertEqual(rset.description[0][0], 'String')
271
        rset = self.qexecute('Any TODAY')
272
        self.assertEqual(rset.description[0][0], 'Date')
273
        rset = self.qexecute('Any NOW')
274
        self.assertEqual(rset.description[0][0], 'Datetime')
275
        rset = self.qexecute('Any %(x)s', {'x': 1})
276
        self.assertEqual(rset.description[0][0], 'Int')
Samuel Trégouët's avatar
Samuel Trégouët committed
277
278
279
        if PY2:
            rset = self.qexecute('Any %(x)s', {'x': long(1)})
            self.assertEqual(rset.description[0][0], 'Int')
280
        rset = self.qexecute('Any %(x)s', {'x': True})
281
        self.assertEqual(rset.description[0][0], 'Boolean')
282
        rset = self.qexecute('Any %(x)s', {'x': 1.0})
283
        self.assertEqual(rset.description[0][0], 'Float')
284
        rset = self.qexecute('Any %(x)s', {'x': datetime.now()})
285
        self.assertEqual(rset.description[0][0], 'Datetime')
286
        rset = self.qexecute('Any %(x)s', {'x': 'str'})
287
        self.assertEqual(rset.description[0][0], 'String')
288
        rset = self.qexecute('Any %(x)s', {'x': u'str'})
289
        self.assertEqual(rset.description[0][0], 'String')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
290

291
    def test_build_descr1(self):
292
293
294
295
296
297
298
299
300
301
        with self.session.new_cnx() as cnx:
            rset = cnx.execute('(Any U,L WHERE U login L) UNION '
                               '(Any G,N WHERE G name N, G is CWGroup)')
            # rset.req = self.session
            orig_length = len(rset)
            rset.rows[0][0] = 9999999
            description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows)
            self.assertEqual(len(description), orig_length - 1)
            self.assertEqual(len(rset.rows), orig_length - 1)
            self.assertNotEqual(rset.rows[0][0], 9999999)
302
303

    def test_build_descr2(self):
304
305
        rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION '
                             '(Any U,G WHERE U in_group G))')
306
307
308
309
310
        for x, y in rset.description:
            if y is not None:
                self.assertEqual(y, 'CWGroup')

    def test_build_descr3(self):
311
312
        rset = self.qexecute('(Any G,NULL WHERE G is CWGroup) UNION '
                             '(Any U,G WHERE U in_group G)')
313
314
315
316
        for x, y in rset.description:
            if y is not None:
                self.assertEqual(y, 'CWGroup')

Adrien Di Mascio's avatar
Adrien Di Mascio committed
317
318

class QuerierTC(BaseQuerierTC):
319
320
    setUpClass = classmethod(setUpClass)
    tearDownClass = classmethod(tearDownClass)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
321
322
323

    def test_unknown_eid(self):
        # should return an empty result set
324
        self.assertFalse(self.qexecute('Any X WHERE X eid 99999999'))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
325

326
327
    def test_typed_eid(self):
        # should return an empty result set
328
        rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'})
Rémi Cardona's avatar
Rémi Cardona committed
329
        self.assertIsInstance(rset[0][0], integer_types)
330

331
    def test_bytes_storage(self):
332
333
        feid = self.qexecute('INSERT File X: X data_name "foo.pdf", '
                             'X data_format "text/plain", X data %(data)s',
Julien Cristau's avatar
Julien Cristau committed
334
                            {'data': Binary(b"xxx")})[0][0]
335
        fdata = self.qexecute('Any D WHERE X data D, X eid %(x)s', {'x': feid})[0][0]
336
        self.assertIsInstance(fdata, Binary)
Julien Cristau's avatar
Julien Cristau committed
337
        self.assertEqual(fdata.getvalue(), b'xxx')
338

Adrien Di Mascio's avatar
Adrien Di Mascio committed
339
    # selection queries tests #################################################
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
340

Adrien Di Mascio's avatar
Adrien Di Mascio committed
341
    def test_select_1(self):
342
        rset = self.qexecute('Any X ORDERBY X WHERE X is CWGroup')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
343
        result, descr = rset.rows, rset.description
344
        self.assertEqual(tuplify(result), [(2,), (3,), (4,), (5,)])
345
        self.assertEqual(descr, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
346

Adrien Di Mascio's avatar
Adrien Di Mascio committed
347
    def test_select_2(self):
348
        rset = self.qexecute('Any X ORDERBY N WHERE X is CWGroup, X name N')
349
        self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
350
        self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
351
        rset = self.qexecute('Any X ORDERBY N DESC WHERE X is CWGroup, X name N')
352
        self.assertEqual(tuplify(rset.rows), [(5,), (4,), (3,), (2,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
353

Adrien Di Mascio's avatar
Adrien Di Mascio committed
354
    def test_select_3(self):
355
        rset = self.qexecute('Any N GROUPBY N WHERE X is CWGroup, X name N')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
356
357
        result, descr = rset.rows, rset.description
        result.sort()
358
359
        self.assertEqual(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)])
        self.assertEqual(descr, [('String',), ('String',), ('String',), ('String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
360

Adrien Di Mascio's avatar
Adrien Di Mascio committed
361
    def test_select_is(self):
362
        rset = self.qexecute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
363
        result, descr = rset.rows, rset.description
364
        self.assertEqual(result[0][1], descr[0][0])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
365

Adrien Di Mascio's avatar
Adrien Di Mascio committed
366
    def test_select_is_aggr(self):
367
        rset = self.qexecute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
368
        result, descr = rset.rows, rset.description
369
370
        self.assertEqual(descr[0][0], 'String')
        self.assertEqual(descr[0][1], 'Int')
Rémi Cardona's avatar
Rémi Cardona committed
371
        self.assertEqual(result[0][0], 'RQLExpression') # XXX may change as schema evolve
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
372

Adrien Di Mascio's avatar
Adrien Di Mascio committed
373
    def test_select_groupby_orderby(self):
374
        rset = self.qexecute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N')
375
376
        self.assertEqual(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)])
        self.assertEqual(rset.description, [('String',), ('String',), ('String',), ('String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
377

Adrien Di Mascio's avatar
Adrien Di Mascio committed
378
    def test_select_complex_groupby(self):
379
380
        rset = self.qexecute('Any N GROUPBY N WHERE X name N')
        rset = self.qexecute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
381

Adrien Di Mascio's avatar
Adrien Di Mascio committed
382
    def test_select_inlined_groupby(self):
383
384
        seid = self.qexecute('State X WHERE X name "deactivated"')[0][0]
        rset = self.qexecute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
385

386
    def test_select_groupby_funccall(self):
387
388
        rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY YEAR(CD) '
                             'WHERE X is CWUser, X creation_date CD')
389
390
391
        self.assertListEqual(rset.rows, [[date.today().year, 2]])

    def test_select_groupby_colnumber(self):
392
393
        rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY 1 '
                             'WHERE X is CWUser, X creation_date CD')
394
395
        self.assertListEqual(rset.rows, [[date.today().year, 2]])

Adrien Di Mascio's avatar
Adrien Di Mascio committed
396
    def test_select_complex_orderby(self):
397
        rset1 = self.qexecute('Any N ORDERBY N WHERE X name N')
398
        self.assertEqual(sorted(rset1.rows), rset1.rows)
399
        rset = self.qexecute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N')
400
401
        self.assertEqual(rset.rows[0][0], rset1.rows[1][0])
        self.assertEqual(len(rset), 5)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
402

Adrien Di Mascio's avatar
Adrien Di Mascio committed
403
    def test_select_5(self):
404
405
406
407
408
409
410
411
412
413
414
        rset = self.qexecute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is CWGroup')
        self.assertEqual(tuplify(rset.rows),
                         [(2, 'guests',),
                          (3, 'managers',),
                          (4, 'owners',),
                          (5, 'users',)])
        self.assertEqual(rset.description,
                         [('CWGroup', 'String',),
                          ('CWGroup', 'String',),
                          ('CWGroup', 'String',),
                          ('CWGroup', 'String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
415

Adrien Di Mascio's avatar
Adrien Di Mascio committed
416
    def test_select_6(self):
417
418
        self.qexecute("INSERT Personne X: X nom 'bidule'")[0]
        rset = self.qexecute('Any Y where X name TMP, Y nom in (TMP, "bidule")')
419
        #self.assertEqual(rset.description, [('Personne',), ('Personne',)])
420
        self.assertIn(('Personne',), rset.description)
421
        rset = self.qexecute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")')
422
        self.assertIn(('Personne',), rset.description)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
423

Adrien Di Mascio's avatar
Adrien Di Mascio committed
424
    def test_select_not_attr(self):
425
426
427
        peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        seid = self.qexecute("INSERT Societe X: X nom 'chouette'")[0][0]
        rset = self.qexecute('Personne X WHERE NOT X nom "bidule"')
428
        self.assertEqual(len(rset.rows), 0, rset.rows)
429
        rset = self.qexecute('Personne X WHERE NOT X nom "bid"')
430
        self.assertEqual(len(rset.rows), 1, rset.rows)
431
432
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Personne X WHERE NOT X travaille S')
433
        self.assertEqual(len(rset.rows), 0, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
434

Adrien Di Mascio's avatar
Adrien Di Mascio committed
435
    def test_select_is_in(self):
436
437
438
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        self.assertEqual(len(self.qexecute("Any X WHERE X is IN (Personne, Societe)")),
Adrien Di Mascio's avatar
Adrien Di Mascio committed
439
                          2)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
440

Adrien Di Mascio's avatar
Adrien Di Mascio committed
441
    def test_select_not_rel(self):
442
443
444
445
446
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'autre'")
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Personne X WHERE NOT X travaille S')
447
        self.assertEqual(len(rset.rows), 1, rset.rows)
448
        rset = self.qexecute('Personne X WHERE NOT X travaille S, S nom "chouette"')
449
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
450

Adrien Di Mascio's avatar
Adrien Di Mascio committed
451
    def test_select_nonregr_inlined(self):
452
453
454
455
456
457
        self.qexecute("INSERT Note X: X para 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'autre'")
        self.qexecute("SET X ecrit_par P WHERE X para 'bidule', P nom 'chouette'")
        rset = self.qexecute('Any U,T ORDERBY T DESC WHERE U is CWUser, '
                             'N ecrit_par U, N type T')#, {'x': self.ueid})
458
        self.assertEqual(len(rset.rows), 0)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
459

Adrien Di Mascio's avatar
Adrien Di Mascio committed
460
    def test_select_nonregr_edition_not(self):
461
        groupeids = set((2, 3, 4))
462
463
464
465
        groupreadperms = set(r[0] for r in self.qexecute('Any Y WHERE X name "CWGroup", '
                                                         'Y eid IN(2, 3, 4), X read_permission Y'))
        rset = self.qexecute('DISTINCT Any Y WHERE X is CWEType, X name "CWGroup", '
                             'Y eid IN(2, 3, 4), NOT X read_permission Y')
466
        self.assertEqual(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
467
468
        rset = self.qexecute('DISTINCT Any Y WHERE X name "CWGroup", '
                             'Y eid IN(2, 3, 4), NOT X read_permission Y')
469
        self.assertEqual(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
470

Adrien Di Mascio's avatar
Adrien Di Mascio committed
471
    def test_select_outer_join(self):
472
473
474
475
476
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        peid2 = self.qexecute("INSERT Personne X: X nom 'autre'")[0][0]
        seid1 = self.qexecute("INSERT Societe X: X nom 'chouette'")[0][0]
        seid2 = self.qexecute("INSERT Societe X: X nom 'chouetos'")[0][0]
        rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
477
        self.assertEqual(rset.rows, [[peid1, None], [peid2, None]])
478
479
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
480
        self.assertEqual(rset.rows, [[peid1, seid1], [peid2, None]])
481
        rset = self.qexecute('Any S,X ORDERBY S WHERE X? travaille S')
482
        self.assertEqual(rset.rows, [[seid1, peid1], [seid2, None]])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
483

Adrien Di Mascio's avatar
Adrien Di Mascio committed
484
    def test_select_outer_join_optimized(self):
485
486
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        rset = self.qexecute('Any X WHERE X eid %(x)s, P? connait X', {'x':peid1})
487
        self.assertEqual(rset.rows, [[peid1]])
488
        rset = self.qexecute('Any X WHERE X eid %(x)s, X require_permission P?',
489
                            {'x':peid1})
490
        self.assertEqual(rset.rows, [[peid1]])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
491
492

    def test_select_left_outer_join(self):
493
        rset = self.qexecute('DISTINCT Any G WHERE U? in_group G')
494
        self.assertEqual(len(rset), 4)
495
        rset = self.qexecute('DISTINCT Any G WHERE U? in_group G, U eid %(x)s',
496
                            {'x': self.session.user.eid})
497
        self.assertEqual(len(rset), 4)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
498
499

    def test_select_ambigous_outer_join(self):
500
501
502
503
        teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0]
        self.qexecute("INSERT Tag X: X name 'tagbis'")[0][0]
        geid = self.qexecute("CWGroup G WHERE G name 'users'")[0][0]
        self.qexecute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
504
                     {'g': geid, 't': teid})
505
        rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T? tags G, T name TN, G name GN")
506
507
        self.assertIn(['users', 'tag'], rset.rows)
        self.assertIn(['activated', None], rset.rows)
508
        rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN")
509
        self.assertEqual(rset.rows, [[None, 'tagbis'], ['users', 'tag']])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
510

Adrien Di Mascio's avatar
Adrien Di Mascio committed
511
    def test_select_not_inline_rel(self):
512
513
514
515
516
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("INSERT Note X: X type 'b'")
        self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Note X WHERE NOT X ecrit_par P')
517
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
518

Adrien Di Mascio's avatar
Adrien Di Mascio committed
519
    def test_select_not_unlinked_multiple_solutions(self):
520
521
522
523
524
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("INSERT Note X: X type 'b'")
        self.qexecute("SET Y evaluee X WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Note X WHERE NOT Y evaluee X')
525
        self.assertEqual(len(rset.rows), 1, rset.rows)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
526

527
    def test_select_date_extraction(self):
528
        self.qexecute("INSERT Personne X: X nom 'foo', X datenaiss %(d)s",
529
530
                     {'d': datetime(2001, 2,3, 12,13)})
        test_data = [('YEAR', 2001), ('MONTH', 2), ('DAY', 3),
531
                     ('HOUR', 12), ('MINUTE', 13), ('WEEKDAY', 6)]
532
        for funcname, result in test_data:
533
            rset = self.qexecute('Any %s(D) WHERE X is Personne, X datenaiss D'
534
                                % funcname)
535
536
537
            self.assertEqual(len(rset.rows), 1)
            self.assertEqual(rset.rows[0][0], result)
            self.assertEqual(rset.description, [('Int',)])
538

539
    def test_regexp_based_pattern_matching(self):
540
541
542
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        peid2 = self.qexecute("INSERT Personne X: X nom 'cidule'")[0][0]
        rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "^b"')
543
544
        self.assertEqual(len(rset.rows), 1, rset.rows)
        self.assertEqual(rset.rows[0][0], peid1)
545
        rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "idu"')
546
547
        self.assertEqual(len(rset.rows), 2, rset.rows)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
548
    def test_select_aggregat_count(self):
549
        rset = self.qexecute('Any COUNT(X)')
550
551
552
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
553

Adrien Di Mascio's avatar
Adrien Di Mascio committed
554
    def test_select_aggregat_sum(self):
555
        rset = self.qexecute('Any SUM(O) WHERE X ordernum O')
556
557
558
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
559

Adrien Di Mascio's avatar
Adrien Di Mascio committed
560
    def test_select_aggregat_min(self):
561
        rset = self.qexecute('Any MIN(X) WHERE X is Personne')
562
563
564
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Personne',)])
565
        rset = self.qexecute('Any MIN(O) WHERE X ordernum O')
566
567
568
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
569

Adrien Di Mascio's avatar
Adrien Di Mascio committed
570
    def test_select_aggregat_max(self):
571
        rset = self.qexecute('Any MAX(X) WHERE X is Personne')
572
573
574
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Personne',)])
575
        rset = self.qexecute('Any MAX(O) WHERE X ordernum O')
576
577
578
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
579
580

    def test_select_custom_aggregat_concat_string(self):
581
        rset = self.qexecute('Any GROUP_CONCAT(N) WHERE X is CWGroup, X name N')
582
583
        self.assertTrue(rset)
        self.assertEqual(sorted(rset[0][0].split(', ')), ['guests', 'managers',
Adrien Di Mascio's avatar
Adrien Di Mascio committed
584
585
586
                                                             'owners', 'users'])

    def test_select_custom_regproc_limit_size(self):
587
        rset = self.qexecute('Any TEXT_LIMIT_SIZE(N, 3) WHERE X is CWGroup, X name N, X name "managers"')
588
589
        self.assertTrue(rset)
        self.assertEqual(rset[0][0], 'man...')
590
591
        self.qexecute("INSERT Basket X: X name 'bidule', X description '<b>hop hop</b>', X description_format 'text/html'")
        rset = self.qexecute('Any LIMIT_SIZE(D, DF, 3) WHERE X is Basket, X description D, X description_format DF')
592
593
        self.assertTrue(rset)
        self.assertEqual(rset[0][0], 'hop...')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
594
595

    def test_select_regproc_orderby(self):
596
        rset = self.qexecute('DISTINCT Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, X name "managers"')
597
598
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][1], 'managers')
599
        rset = self.qexecute('Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, NOT U in_group X, U login "admin"')
600
601
        self.assertEqual(len(rset), 3)
        self.assertEqual(rset[0][1], 'owners')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
602

Adrien Di Mascio's avatar
Adrien Di Mascio committed
603
    def test_select_aggregat_sort(self):
604
        rset = self.qexecute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G')
605
606
607
        self.assertEqual(len(rset.rows), 2)
        self.assertEqual(len(rset.rows[0]), 2)
        self.assertEqual(rset.description[0], ('CWGroup', 'Int',))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
608
609

    def test_select_aggregat_having(self):
610
        rset = self.qexecute('Any N,COUNT(RDEF) GROUPBY N ORDERBY 2,N '
Adrien Di Mascio's avatar
Adrien Di Mascio committed
611
612
                            'WHERE RT name N, RDEF relation_type RT '
                            'HAVING COUNT(RDEF) > 10')
613
        self.assertListEqual(rset.rows,
614
615
                              [[u'description_format', 13],
                               [u'description', 14],
Julien Cristau's avatar
Julien Cristau committed
616
                               [u'name', 19],
617
618
619
620
621
622
623
624
625
                               [u'created_by', 45],
                               [u'creation_date', 45],
                               [u'cw_source', 45],
                               [u'cwuri', 45],
                               [u'in_basket', 45],
                               [u'is', 45],
                               [u'is_instance_of', 45],
                               [u'modification_date', 45],
                               [u'owned_by', 45]])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
626
627
628

    def test_select_aggregat_having_dumb(self):
        # dumb but should not raise an error
629
        rset = self.qexecute('Any U,COUNT(X) GROUPBY U '
Adrien Di Mascio's avatar
Adrien Di Mascio committed
630
631
                            'WHERE U eid %(x)s, X owned_by U '
                            'HAVING COUNT(X) > 10', {'x': self.ueid})
632
633
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(rset.rows[0][0], self.ueid)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
634

635
    def test_select_having_non_aggregat_1(self):
636
        rset = self.qexecute('Any L WHERE X login L, X creation_date CD '
637
                            'HAVING YEAR(CD) = %s' % date.today().year)
638
        self.assertListEqual(rset.rows,
639
640
641
642
                              [[u'admin'],
                               [u'anon']])

    def test_select_having_non_aggregat_2(self):
643
        rset = self.qexecute('Any L GROUPBY L WHERE X login L, X in_group G, '
644
645
                            'X creation_date CD HAVING YEAR(CD) = %s OR COUNT(G) > 1'
                            % date.today().year)
646
        self.assertListEqual(rset.rows,
647
648
649
                              [[u'admin'],
                               [u'anon']])

Adrien Di Mascio's avatar
Adrien Di Mascio committed
650
    def test_select_complex_sort(self):
651
        """need sqlite including http://www.sqlite.org/cvstrac/tktview?tn=3773 fix"""
652
        rset = self.qexecute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
653
654
        result = rset.rows
        result.sort()
655
        self.assertEqual(tuplify(result), [(1,), (2,), (3,), (4,), (5,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
656

Adrien Di Mascio's avatar
Adrien Di Mascio committed
657
    def test_select_upper(self):
658
        rset = self.qexecute('Any X, UPPER(L) ORDERBY L WHERE X is CWUser, X login L')
659
660
661
662
663
        self.assertEqual(len(rset.rows), 2)
        self.assertEqual(rset.rows[0][1], 'ADMIN')
        self.assertEqual(rset.description[0], ('CWUser', 'String',))
        self.assertEqual(rset.rows[1][1], 'ANON')
        self.assertEqual(rset.description[1], ('CWUser', 'String',))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
664
        eid = rset.rows[0][0]
665
        rset = self.qexecute('Any UPPER(L) WHERE X eid %s, X login L'%eid)
666
667
        self.assertEqual(rset.rows[0][0], 'ADMIN')
        self.assertEqual(rset.description, [('String',)])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
668

669
670
    def test_select_float_abs(self):
        # test positive number
671
672
        eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s', {'i': 1.2})[0][0]
        rset = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I', {'x': eid})
Sylvain Thénault's avatar
Sylvain Thénault committed
673
        self.assertEqual(rset.rows[0][0], 1.2)
674
        # test negative number
675
676
        eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s', {'i': -1.2})[0][0]
        rset = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I', {'x': eid})
Sylvain Thénault's avatar
Sylvain Thénault committed
677
        self.assertEqual(rset.rows[0][0], 1.2)
678
679
680

    def test_select_int_abs(self):
        # test positive number
681
682
        eid = self.qexecute('INSERT Affaire A: A duration %(d)s', {'d': 12})[0][0]
        rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid})
Sylvain Thénault's avatar
Sylvain Thénault committed
683
        self.assertEqual(rset.rows[0][0], 12)
684
        # test negative number
685
686
        eid = self.qexecute('INSERT Affaire A: A duration %(d)s', {'d': -12})[0][0]
        rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid})
Sylvain Thénault's avatar
Sylvain Thénault committed
687
        self.assertEqual(rset.rows[0][0], 12)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
688
689
690

##     def test_select_simplified(self):
##         ueid = self.session.user.eid
691
##         rset = self.qexecute('Any L WHERE %s login L'%ueid)
692
##         self.assertEqual(rset.rows[0][0], 'admin')
693
##         rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid})
694
##         self.assertEqual(rset.rows[0][0], 'admin')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
695

Adrien Di Mascio's avatar
Adrien Di Mascio committed
696
    def test_select_searchable_text_1(self):
697
698
699
700
        rset = self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        rset = self.qexecute(u"INSERT Societe X: X nom 'bidle'")
        rset = self.qexecute("INSERT Societe X: X nom 'chouette'")
        rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidle'})
701
        self.assertEqual(len(rset.rows), 2, rset.rows)
702
        rset = self.qexecute(u'Any N where N has_text "bidle"')
703
        self.assertEqual(len(rset.rows), 2, rset.rows)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
704
        biduleeids = [r[0] for r in rset.rows]
705
        rset = self.qexecute(u'Any N where NOT N has_text "bidle"')
706
        self.assertFalse([r[0] for r in rset.rows if r[0] in biduleeids])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
707
        # duh?
708
        rset = self.qexecute('Any X WHERE X has_text %(text)s', {'text': u'a'})
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
709

Adrien Di Mascio's avatar
Adrien Di Mascio committed
710
    def test_select_searchable_text_2(self):
711
712
713
714
        rset = self.qexecute("INSERT Personne X: X nom 'bidule'")
        rset = self.qexecute("INSERT Personne X: X nom 'chouette'")
        rset = self.qexecute("INSERT Societe X: X nom 'bidule'")
        rset = self.qexecute('Personne N where N has_text "bidule"')
715
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
716

Adrien Di Mascio's avatar
Adrien Di Mascio committed
717
    def test_select_searchable_text_3(self):
718
719
720
721
        rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'")
        rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'")
        rset = self.qexecute("INSERT Societe X: X nom 'bidule'")
        rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"')
722
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
723

Adrien Di Mascio's avatar
Adrien Di Mascio committed
724
    def test_select_multiple_searchable_text(self):
725
726
727
728
        self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        self.qexecute("INSERT Societe X: X nom 'chouette', S travaille X")
        self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        rset = self.qexecute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s',
Adrien Di Mascio's avatar
Adrien Di Mascio committed
729
730
731
                            {'text': u'bidle',
                             'text2': u'chouette',}
                            )
732
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
733

Adrien Di Mascio's avatar
Adrien Di Mascio committed
734
    def test_select_no_descr(self):
735
        rset = self.qexecute('Any X WHERE X is CWGroup', build_descr=0)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
736
        rset.rows.sort()
737
        self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
738
        self.assertEqual(rset.description, ())
Adrien Di Mascio's avatar
Adrien Di Mascio committed
739
740

    def test_select_limit_offset(self):
741
        rset = self.qexecute('CWGroup X ORDERBY N LIMIT 2 WHERE X name N')
742
        self.assertEqual(tuplify(rset.rows), [(2,), (3,)])
743
        self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',)])
744
        rset = self.qexecute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N')
745
        self.assertEqual(tuplify(rset.rows), [(4,), (5,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
746

747
    def test_select_symmetric(self):
748
749
750
751
752
753
754
        self.qexecute("INSERT Personne X: X nom 'machin'")
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'trucmuche'")
        self.qexecute("SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'")
        self.qexecute("SET X connait Y WHERE X nom 'machin', Y nom 'chouette'")
        rset = self.qexecute('Any P WHERE P connait P2')
755
        self.assertEqual(len(rset.rows), 4, rset.rows)
756
        rset = self.qexecute('Any P WHERE NOT P connait P2')
757
        self.assertEqual(len(rset.rows), 1, rset.rows) # trucmuche
758
        rset = self.qexecute('Any P WHERE P connait P2, P2 nom "bidule"')
759
        self.assertEqual(len(rset.rows), 1, rset.rows)
760
        rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "bidule"')
761
        self.assertEqual(len(rset.rows), 1, rset.rows)
762
        rset = self.qexecute('Any P WHERE P connait P2, P2 nom "chouette"')
763
        self.assertEqual(len(rset.rows), 2, rset.rows)
764
        rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "chouette"')
765
        self.assertEqual(len(rset.rows), 2, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
766

Adrien Di Mascio's avatar
Adrien Di Mascio committed
767
    def test_select_inline(self):
768
769
770
771
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Any N where N ecrit_par X, X nom "bidule"')
772
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
773

Adrien Di Mascio's avatar
Adrien Di Mascio committed
774
    def test_select_creation_date(self):
775
776
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        rset = self.qexecute('Any D WHERE X nom "bidule", X creation_date D')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
777
778
779
        self.assertEqual(len(rset.rows), 1)

    def test_select_or_relation(self):
780
781
782
783
784
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Societe X: X nom 'logilab'")
        self.qexecute("INSERT Societe X: X nom 'caesium'")
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'")
785
786
        rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
                             'S1 nom "logilab", S2 nom "caesium"')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
787
        self.assertEqual(len(rset.rows), 1)
788
        self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'")
789
790
        rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
                             'S1 nom "logilab", S2 nom "caesium"')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
791
        self.assertEqual(len(rset.rows), 2)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
792

Adrien Di Mascio's avatar
Adrien Di Mascio committed
793
    def test_select_or_sym_relation(self):
794
795
796
797
798
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'truc'")
        self.qexecute("SET P connait S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('DISTINCT Any P WHERE S connait P, S nom "chouette"')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
799
        self.assertEqual(len(rset.rows), 1, rset.rows)
800
        rset = self.qexecute('DISTINCT Any P WHERE P connait S or S connait P, S nom "chouette"')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
801
        self.assertEqual(len(rset.rows), 1, rset.rows)
802
803
        self.qexecute("SET P connait S WHERE P nom 'chouette', S nom 'truc'")
        rset = self.qexecute('DISTINCT Any P WHERE S connait P, S nom "chouette"')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
804
        self.assertEqual(len(rset.rows), 2, rset.rows)
Aurelien Campeas's avatar