unittest_querier.py 86.9 KB
Newer Older
Adrien Di Mascio's avatar
Adrien Di Mascio committed
1
# -*- coding: iso-8859-1 -*-
Sylvain Thénault's avatar
Sylvain Thénault committed
2
# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
3
4
5
6
7
8
9
10
11
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
12
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
13
14
15
16
17
18
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
Sylvain Thénault's avatar
Sylvain Thénault committed
19
"""unit tests for modules cubicweb.server.querier and cubicweb.server.ssplanner
Adrien Di Mascio's avatar
Adrien Di Mascio committed
20
"""
21

22
from datetime import date, datetime, timedelta, tzinfo
23
import unittest
Adrien Di Mascio's avatar
Adrien Di Mascio committed
24

25
26
import pytz

27
from six import PY2, integer_types, binary_type, text_type
Sylvain Thénault's avatar
Sylvain Thénault committed
28
29

from rql import BadRQLQuery
Sylvain Thénault's avatar
Sylvain Thénault committed
30
from rql.utils import register_function, FunctionDescr
31

32
from cubicweb import QueryError, Unauthorized, Binary, devtools
Sylvain Thénault's avatar
Sylvain Thénault committed
33
from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS, SQL_PREFIX
Adrien Di Mascio's avatar
Adrien Di Mascio committed
34
from cubicweb.server.utils import crypt_password
35
from cubicweb.server.querier import manual_build_descr, _make_description
36
from cubicweb.devtools.testlib import CubicWebTC
37
from cubicweb.devtools.repotest import tuplify, BaseQuerierTC
Adrien Di Mascio's avatar
Adrien Di Mascio committed
38

Sylvain Thénault's avatar
Sylvain Thénault committed
39

40
41
42
class FixedOffset(tzinfo):
    def __init__(self, hours=0):
        self.hours = hours
Sylvain Thénault's avatar
Sylvain Thénault committed
43

44
45
    def utcoffset(self, dt):
        return timedelta(hours=self.hours)
Sylvain Thénault's avatar
Sylvain Thénault committed
46

47
48
49
    def dst(self, dt):
        return timedelta(0)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
50
51
52
53
54
55

# register priority/severity sorting registered procedure

class group_sort_value(FunctionDescr):
    supported_backends = ('sqlite',)
    rtype = 'Int'
Sylvain Thénault's avatar
Sylvain Thénault committed
56

Adrien Di Mascio's avatar
Adrien Di Mascio committed
57
58
59
60
try:
    register_function(group_sort_value)
except AssertionError:
    pass
Sylvain Thénault's avatar
Sylvain Thénault committed
61
62


Adrien Di Mascio's avatar
Adrien Di Mascio committed
63
def init_sqlite_connexion(cnx):
Sylvain Thénault's avatar
Sylvain Thénault committed
64

Adrien Di Mascio's avatar
Adrien Di Mascio committed
65
    def group_sort_value(text):
Sylvain Thénault's avatar
Sylvain Thénault committed
66
67
        return {"managers": "3", "users": "2", "guests": "1", "owners": "0"}[text]

Adrien Di Mascio's avatar
Adrien Di Mascio committed
68
    cnx.create_function("GROUP_SORT_VALUE", 1, group_sort_value)
Sylvain Thénault's avatar
Sylvain Thénault committed
69

Adrien Di Mascio's avatar
Adrien Di Mascio committed
70
71
72
SQL_CONNECT_HOOKS['sqlite'].append(init_sqlite_connexion)


73
def setUpClass(cls, *args):
74
    global repo, cnx
75
76
    config = devtools.TestServerConfiguration('data', __file__)
    handler = devtools.get_test_db_handler(config)
77
78
    handler.build_db_cache()
    repo, cnx = handler.get_repo_and_cnx()
79
    cls.repo = repo
Adrien Di Mascio's avatar
Adrien Di Mascio committed
80

81
def tearDownClass(cls, *args):
82
83
84
    global repo, cnx
    repo.shutdown()
    del repo, cnx
Adrien Di Mascio's avatar
Adrien Di Mascio committed
85
86


87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
class Variable:
    def __init__(self, name):
        self.name = name
        self.children = []

    def get_type(self, solution, args=None):
        return solution[self.name]
    def as_string(self):
        return self.name

class Function:
    def __init__(self, name, varname):
        self.name = name
        self.children = [Variable(varname)]
    def get_type(self, solution, args=None):
        return 'Int'

104
class MakeDescriptionTC(unittest.TestCase):
105
106
107
108
109
110
    def test_known_values(self):
        solution = {'A': 'Int', 'B': 'CWUser'}
        self.assertEqual(_make_description((Function('max', 'A'), Variable('B')), {}, solution),
                          ['Int','CWUser'])


Adrien Di Mascio's avatar
Adrien Di Mascio committed
111
class UtilsTC(BaseQuerierTC):
112
113
    setUpClass = classmethod(setUpClass)
    tearDownClass = classmethod(tearDownClass)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
114

Adrien Di Mascio's avatar
Adrien Di Mascio committed
115
116
117
118
119
120
    def get_max_eid(self):
        # no need for cleanup here
        return None
    def cleanup(self):
        # no need for cleanup here
        pass
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
121

Adrien Di Mascio's avatar
Adrien Di Mascio committed
122
    def test_preprocess_1(self):
123
        with self._access.cnx() as cnx:
124
            reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0]
125
            rqlst = self._prepare(cnx, 'Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s',
126
127
128
                                  {'x': reid})
            self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}],
                             rqlst.solutions)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
129

Adrien Di Mascio's avatar
Adrien Di Mascio committed
130
    def test_preprocess_2(self):
131
        with self._access.cnx() as cnx:
132
133
134
135
136
137
138
139
            teid = cnx.execute("INSERT Tag X: X name 'tag'")[0][0]
            #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0]
            #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
            #             {'g': geid, 't': teid}, 'g')
            rqlst = self._prepare(cnx, 'Any X WHERE E eid %(x)s, E tags X', {'x': teid})
            # the query may be optimized, should keep only one solution
            # (any one, etype will be discarded)
            self.assertEqual(1, len(rqlst.solutions))
140
141
142

    def assertRQLEqual(self, expected, got):
        from rql import parse
143
144
        self.assertMultiLineEqual(text_type(parse(expected)),
                                  text_type(parse(got)))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
145

Adrien Di Mascio's avatar
Adrien Di Mascio committed
146
    def test_preprocess_security(self):
147
        with self.user_groups_session('users') as cnx:
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
            plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN '
                                      'WHERE X is ET, ET name ETN')
            union = plan.rqlst
            plan.preprocess(union)
            self.assertEqual(len(union.children), 1)
            self.assertEqual(len(union.children[0].with_), 1)
            subq = union.children[0].with_[0].query
            self.assertEqual(len(subq.children), 4)
            self.assertEqual([t.as_string() for t in union.children[0].selection],
                              ['ETN','COUNT(X)'])
            self.assertEqual([t.as_string() for t in union.children[0].groupby],
                              ['ETN'])
            partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children))
            rql, solutions = partrqls[0]
            self.assertRQLEqual(rql,
                                'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))'
                                ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, '
                                '               X identity D, C is Division, D is Affaire))'
                                ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, '
                                '            X identity H, H is Affaire)))'
                                ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, '
                                '            X identity I, I is Affaire)))'
                                ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, '
                                '            X identity J, J is Affaire)))'
                                ', ET is CWEType, X is Affaire')
            self.assertEqual(solutions, [{'C': 'Division',
                                           'D': 'Affaire',
                                           'E': 'Note',
                                           'F': 'Societe',
                                           'G': 'SubDivision',
                                           'H': 'Affaire',
                                           'I': 'Affaire',
                                           'J': 'Affaire',
                                           'X': 'Affaire',
                                           'ET': 'CWEType', 'ETN': 'String'}])
            rql, solutions = partrqls[1]
            self.assertRQLEqual(rql,  'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, '
185
186
187
                                'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWComputedRType, '
                                '        CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, '
                                '        CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, '
Julien Cristau's avatar
Julien Cristau committed
188
189
                                '        Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, '
                                '        Frozable, Note, Old, Personne, RQLExpression, Societe, State, SubDivision, '
190
                                '        SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)')
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
            self.assertCountEqual(solutions,
                                  [{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Division', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWCache', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWComputedRType', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWConstraint', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWConstraintType', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWEType', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWAttribute', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWGroup', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWRelation', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWPermission', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWProperty', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWRType', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWSource', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWUniqueTogetherConstraint', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'CWUser', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Email', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'EmailPart', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'EmailThread', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'ExternalUri', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'File', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Folder', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Frozable', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Note', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Old', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Personne', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'RQLExpression', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Societe', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'State', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'SubDivision', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'SubWorkflowExitPoint', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Tag', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Transition', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'TrInfo', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'Workflow', 'ETN': 'String', 'ET': 'CWEType'},
                                   {'X': 'WorkflowTransition', 'ETN': 'String', 'ET': 'CWEType'}])
231
232
233
234
235
236
237
238
239
240
            rql, solutions = partrqls[2]
            self.assertEqual(rql,
                             'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), '
                             'ET is CWEType, X is EmailAddress')
            self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}])
            rql, solutions = partrqls[3]
            self.assertEqual(rql,
                              'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), '
                              'ET is CWEType, X is Basket')
            self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
241
242

    def test_preprocess_security_aggregat(self):
243
        with self.user_groups_session('users') as cnx:
244
245
246
247
248
249
250
251
252
            plan = self._prepare_plan(cnx, 'Any MAX(X)')
            union = plan.rqlst
            plan.preprocess(union)
            self.assertEqual(len(union.children), 1)
            self.assertEqual(len(union.children[0].with_), 1)
            subq = union.children[0].with_[0].query
            self.assertEqual(len(subq.children), 4)
            self.assertEqual([t.as_string() for t in union.children[0].selection],
                              ['MAX(X)'])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
253

Adrien Di Mascio's avatar
Adrien Di Mascio committed
254
    def test_preprocess_nonregr(self):
255
        with self._access.cnx() as cnx:
256
257
            rqlst = self._prepare(cnx, 'Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI')
            self.assertEqual(len(rqlst.solutions), 1)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
258

Adrien Di Mascio's avatar
Adrien Di Mascio committed
259
260
    def test_build_description(self):
        # should return an empty result set
261
        rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self._access._user.eid})
262
        self.assertEqual(rset.description[0][0], 'CWUser')
263
        rset = self.qexecute('Any 1')
264
        self.assertEqual(rset.description[0][0], 'Int')
265
        rset = self.qexecute('Any TRUE')
266
        self.assertEqual(rset.description[0][0], 'Boolean')
267
        rset = self.qexecute('Any "hop"')
268
        self.assertEqual(rset.description[0][0], 'String')
269
        rset = self.qexecute('Any TODAY')
270
        self.assertEqual(rset.description[0][0], 'Date')
271
        rset = self.qexecute('Any NOW')
272
        self.assertEqual(rset.description[0][0], 'Datetime')
273
        rset = self.qexecute('Any %(x)s', {'x': 1})
274
        self.assertEqual(rset.description[0][0], 'Int')
Samuel Trégouët's avatar
Samuel Trégouët committed
275
276
277
        if PY2:
            rset = self.qexecute('Any %(x)s', {'x': long(1)})
            self.assertEqual(rset.description[0][0], 'Int')
278
        rset = self.qexecute('Any %(x)s', {'x': True})
279
        self.assertEqual(rset.description[0][0], 'Boolean')
280
        rset = self.qexecute('Any %(x)s', {'x': 1.0})
281
        self.assertEqual(rset.description[0][0], 'Float')
282
        rset = self.qexecute('Any %(x)s', {'x': datetime.now()})
283
        self.assertEqual(rset.description[0][0], 'Datetime')
284
        rset = self.qexecute('Any %(x)s', {'x': 'str'})
285
        self.assertEqual(rset.description[0][0], 'String')
286
        rset = self.qexecute('Any %(x)s', {'x': u'str'})
287
        self.assertEqual(rset.description[0][0], 'String')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
288

289
    def test_build_descr1(self):
290
        with self._access.cnx() as cnx:
291
292
293
294
295
296
297
298
            rset = cnx.execute('(Any U,L WHERE U login L) UNION '
                               '(Any G,N WHERE G name N, G is CWGroup)')
            orig_length = len(rset)
            rset.rows[0][0] = 9999999
            description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows)
            self.assertEqual(len(description), orig_length - 1)
            self.assertEqual(len(rset.rows), orig_length - 1)
            self.assertNotEqual(rset.rows[0][0], 9999999)
299
300

    def test_build_descr2(self):
301
302
        rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION '
                             '(Any U,G WHERE U in_group G))')
303
304
305
306
307
        for x, y in rset.description:
            if y is not None:
                self.assertEqual(y, 'CWGroup')

    def test_build_descr3(self):
308
309
        rset = self.qexecute('(Any G,NULL WHERE G is CWGroup) UNION '
                             '(Any U,G WHERE U in_group G)')
310
311
312
313
        for x, y in rset.description:
            if y is not None:
                self.assertEqual(y, 'CWGroup')

Adrien Di Mascio's avatar
Adrien Di Mascio committed
314
315

class QuerierTC(BaseQuerierTC):
316
317
    setUpClass = classmethod(setUpClass)
    tearDownClass = classmethod(tearDownClass)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
318
319
320

    def test_unknown_eid(self):
        # should return an empty result set
321
        self.assertFalse(self.qexecute('Any X WHERE X eid 99999999'))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
322

323
324
    def test_typed_eid(self):
        # should return an empty result set
325
        rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'})
Rémi Cardona's avatar
Rémi Cardona committed
326
        self.assertIsInstance(rset[0][0], integer_types)
327

328
    def test_bytes_storage(self):
329
330
        feid = self.qexecute('INSERT File X: X data_name "foo.pdf", '
                             'X data_format "text/plain", X data %(data)s',
Julien Cristau's avatar
Julien Cristau committed
331
                            {'data': Binary(b"xxx")})[0][0]
332
        fdata = self.qexecute('Any D WHERE X data D, X eid %(x)s', {'x': feid})[0][0]
333
        self.assertIsInstance(fdata, Binary)
Julien Cristau's avatar
Julien Cristau committed
334
        self.assertEqual(fdata.getvalue(), b'xxx')
335

Adrien Di Mascio's avatar
Adrien Di Mascio committed
336
    # selection queries tests #################################################
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
337

Adrien Di Mascio's avatar
Adrien Di Mascio committed
338
    def test_select_1(self):
339
        rset = self.qexecute('Any X ORDERBY X WHERE X is CWGroup')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
340
        result, descr = rset.rows, rset.description
341
        self.assertEqual(tuplify(result), [(2,), (3,), (4,), (5,)])
342
        self.assertEqual(descr, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
343

Adrien Di Mascio's avatar
Adrien Di Mascio committed
344
    def test_select_2(self):
345
        rset = self.qexecute('Any X ORDERBY N WHERE X is CWGroup, X name N')
346
        self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
347
        self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
348
        rset = self.qexecute('Any X ORDERBY N DESC WHERE X is CWGroup, X name N')
349
        self.assertEqual(tuplify(rset.rows), [(5,), (4,), (3,), (2,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
350

Adrien Di Mascio's avatar
Adrien Di Mascio committed
351
    def test_select_3(self):
352
        rset = self.qexecute('Any N GROUPBY N WHERE X is CWGroup, X name N')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
353
354
        result, descr = rset.rows, rset.description
        result.sort()
355
356
        self.assertEqual(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)])
        self.assertEqual(descr, [('String',), ('String',), ('String',), ('String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
357

Adrien Di Mascio's avatar
Adrien Di Mascio committed
358
    def test_select_is(self):
359
        rset = self.qexecute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
360
        result, descr = rset.rows, rset.description
361
        self.assertEqual(result[0][1], descr[0][0])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
362

Adrien Di Mascio's avatar
Adrien Di Mascio committed
363
    def test_select_is_aggr(self):
364
        rset = self.qexecute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
365
        result, descr = rset.rows, rset.description
366
367
        self.assertEqual(descr[0][0], 'String')
        self.assertEqual(descr[0][1], 'Int')
Rémi Cardona's avatar
Rémi Cardona committed
368
        self.assertEqual(result[0][0], 'RQLExpression') # XXX may change as schema evolve
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
369

Adrien Di Mascio's avatar
Adrien Di Mascio committed
370
    def test_select_groupby_orderby(self):
371
        rset = self.qexecute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N')
372
373
        self.assertEqual(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)])
        self.assertEqual(rset.description, [('String',), ('String',), ('String',), ('String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
374

Adrien Di Mascio's avatar
Adrien Di Mascio committed
375
    def test_select_complex_groupby(self):
376
377
        rset = self.qexecute('Any N GROUPBY N WHERE X name N')
        rset = self.qexecute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
378

Adrien Di Mascio's avatar
Adrien Di Mascio committed
379
    def test_select_inlined_groupby(self):
380
381
        seid = self.qexecute('State X WHERE X name "deactivated"')[0][0]
        rset = self.qexecute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
382

383
    def test_select_groupby_funccall(self):
384
385
        rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY YEAR(CD) '
                             'WHERE X is CWUser, X creation_date CD')
386
387
388
        self.assertListEqual(rset.rows, [[date.today().year, 2]])

    def test_select_groupby_colnumber(self):
389
390
        rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY 1 '
                             'WHERE X is CWUser, X creation_date CD')
391
392
        self.assertListEqual(rset.rows, [[date.today().year, 2]])

Adrien Di Mascio's avatar
Adrien Di Mascio committed
393
    def test_select_complex_orderby(self):
394
        rset1 = self.qexecute('Any N ORDERBY N WHERE X name N')
395
        self.assertEqual(sorted(rset1.rows), rset1.rows)
396
        rset = self.qexecute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N')
397
398
        self.assertEqual(rset.rows[0][0], rset1.rows[1][0])
        self.assertEqual(len(rset), 5)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
399

Adrien Di Mascio's avatar
Adrien Di Mascio committed
400
    def test_select_5(self):
401
402
403
404
405
406
407
408
409
410
411
        rset = self.qexecute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is CWGroup')
        self.assertEqual(tuplify(rset.rows),
                         [(2, 'guests',),
                          (3, 'managers',),
                          (4, 'owners',),
                          (5, 'users',)])
        self.assertEqual(rset.description,
                         [('CWGroup', 'String',),
                          ('CWGroup', 'String',),
                          ('CWGroup', 'String',),
                          ('CWGroup', 'String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
412

Adrien Di Mascio's avatar
Adrien Di Mascio committed
413
    def test_select_6(self):
414
415
        self.qexecute("INSERT Personne X: X nom 'bidule'")[0]
        rset = self.qexecute('Any Y where X name TMP, Y nom in (TMP, "bidule")')
416
        #self.assertEqual(rset.description, [('Personne',), ('Personne',)])
417
        self.assertIn(('Personne',), rset.description)
418
        rset = self.qexecute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")')
419
        self.assertIn(('Personne',), rset.description)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
420

Adrien Di Mascio's avatar
Adrien Di Mascio committed
421
    def test_select_not_attr(self):
422
423
424
        peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        seid = self.qexecute("INSERT Societe X: X nom 'chouette'")[0][0]
        rset = self.qexecute('Personne X WHERE NOT X nom "bidule"')
425
        self.assertEqual(len(rset.rows), 0, rset.rows)
426
        rset = self.qexecute('Personne X WHERE NOT X nom "bid"')
427
        self.assertEqual(len(rset.rows), 1, rset.rows)
428
429
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Personne X WHERE NOT X travaille S')
430
        self.assertEqual(len(rset.rows), 0, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
431

Adrien Di Mascio's avatar
Adrien Di Mascio committed
432
    def test_select_is_in(self):
433
434
435
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        self.assertEqual(len(self.qexecute("Any X WHERE X is IN (Personne, Societe)")),
Adrien Di Mascio's avatar
Adrien Di Mascio committed
436
                          2)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
437

Adrien Di Mascio's avatar
Adrien Di Mascio committed
438
    def test_select_not_rel(self):
439
440
441
442
443
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'autre'")
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Personne X WHERE NOT X travaille S')
444
        self.assertEqual(len(rset.rows), 1, rset.rows)
445
        rset = self.qexecute('Personne X WHERE NOT X travaille S, S nom "chouette"')
446
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
447

Adrien Di Mascio's avatar
Adrien Di Mascio committed
448
    def test_select_nonregr_inlined(self):
449
450
451
452
453
454
        self.qexecute("INSERT Note X: X para 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'autre'")
        self.qexecute("SET X ecrit_par P WHERE X para 'bidule', P nom 'chouette'")
        rset = self.qexecute('Any U,T ORDERBY T DESC WHERE U is CWUser, '
                             'N ecrit_par U, N type T')#, {'x': self.ueid})
455
        self.assertEqual(len(rset.rows), 0)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
456

Adrien Di Mascio's avatar
Adrien Di Mascio committed
457
    def test_select_nonregr_edition_not(self):
458
        groupeids = set((2, 3, 4))
459
460
461
462
        groupreadperms = set(r[0] for r in self.qexecute('Any Y WHERE X name "CWGroup", '
                                                         'Y eid IN(2, 3, 4), X read_permission Y'))
        rset = self.qexecute('DISTINCT Any Y WHERE X is CWEType, X name "CWGroup", '
                             'Y eid IN(2, 3, 4), NOT X read_permission Y')
463
        self.assertEqual(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
464
465
        rset = self.qexecute('DISTINCT Any Y WHERE X name "CWGroup", '
                             'Y eid IN(2, 3, 4), NOT X read_permission Y')
466
        self.assertEqual(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
467

Adrien Di Mascio's avatar
Adrien Di Mascio committed
468
    def test_select_outer_join(self):
469
470
471
472
473
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        peid2 = self.qexecute("INSERT Personne X: X nom 'autre'")[0][0]
        seid1 = self.qexecute("INSERT Societe X: X nom 'chouette'")[0][0]
        seid2 = self.qexecute("INSERT Societe X: X nom 'chouetos'")[0][0]
        rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
474
        self.assertEqual(rset.rows, [[peid1, None], [peid2, None]])
475
476
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
477
        self.assertEqual(rset.rows, [[peid1, seid1], [peid2, None]])
478
        rset = self.qexecute('Any S,X ORDERBY S WHERE X? travaille S')
479
        self.assertEqual(rset.rows, [[seid1, peid1], [seid2, None]])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
480

Adrien Di Mascio's avatar
Adrien Di Mascio committed
481
    def test_select_outer_join_optimized(self):
482
483
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        rset = self.qexecute('Any X WHERE X eid %(x)s, P? connait X', {'x':peid1})
484
        self.assertEqual(rset.rows, [[peid1]])
485
        rset = self.qexecute('Any X WHERE X eid %(x)s, X require_permission P?',
486
                            {'x':peid1})
487
        self.assertEqual(rset.rows, [[peid1]])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
488
489

    def test_select_left_outer_join(self):
490
        rset = self.qexecute('DISTINCT Any G WHERE U? in_group G')
491
        self.assertEqual(len(rset), 4)
492
        rset = self.qexecute('DISTINCT Any G WHERE U? in_group G, U eid %(x)s',
493
                            {'x': self._access._user.eid})
494
        self.assertEqual(len(rset), 4)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
495
496

    def test_select_ambigous_outer_join(self):
497
498
499
500
        teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0]
        self.qexecute("INSERT Tag X: X name 'tagbis'")[0][0]
        geid = self.qexecute("CWGroup G WHERE G name 'users'")[0][0]
        self.qexecute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
501
                     {'g': geid, 't': teid})
502
        rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T? tags G, T name TN, G name GN")
503
504
        self.assertIn(['users', 'tag'], rset.rows)
        self.assertIn(['activated', None], rset.rows)
505
        rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN")
506
        self.assertEqual(rset.rows, [[None, 'tagbis'], ['users', 'tag']])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
507

Adrien Di Mascio's avatar
Adrien Di Mascio committed
508
    def test_select_not_inline_rel(self):
509
510
511
512
513
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("INSERT Note X: X type 'b'")
        self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Note X WHERE NOT X ecrit_par P')
514
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
515

Adrien Di Mascio's avatar
Adrien Di Mascio committed
516
    def test_select_not_unlinked_multiple_solutions(self):
517
518
519
520
521
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("INSERT Note X: X type 'b'")
        self.qexecute("SET Y evaluee X WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Note X WHERE NOT Y evaluee X')
522
        self.assertEqual(len(rset.rows), 1, rset.rows)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
523

524
    def test_select_date_extraction(self):
525
        self.qexecute("INSERT Personne X: X nom 'foo', X datenaiss %(d)s",
526
527
                     {'d': datetime(2001, 2,3, 12,13)})
        test_data = [('YEAR', 2001), ('MONTH', 2), ('DAY', 3),
528
                     ('HOUR', 12), ('MINUTE', 13), ('WEEKDAY', 6)]
529
        for funcname, result in test_data:
530
            rset = self.qexecute('Any %s(D) WHERE X is Personne, X datenaiss D'
531
                                % funcname)
532
533
534
            self.assertEqual(len(rset.rows), 1)
            self.assertEqual(rset.rows[0][0], result)
            self.assertEqual(rset.description, [('Int',)])
535

536
    def test_regexp_based_pattern_matching(self):
537
538
539
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        peid2 = self.qexecute("INSERT Personne X: X nom 'cidule'")[0][0]
        rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "^b"')
540
541
        self.assertEqual(len(rset.rows), 1, rset.rows)
        self.assertEqual(rset.rows[0][0], peid1)
542
        rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "idu"')
543
544
        self.assertEqual(len(rset.rows), 2, rset.rows)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
545
    def test_select_aggregat_count(self):
546
        rset = self.qexecute('Any COUNT(X)')
547
548
549
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
550

Adrien Di Mascio's avatar
Adrien Di Mascio committed
551
    def test_select_aggregat_sum(self):
552
        rset = self.qexecute('Any SUM(O) WHERE X ordernum O')
553
554
555
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
556

Adrien Di Mascio's avatar
Adrien Di Mascio committed
557
    def test_select_aggregat_min(self):
558
        rset = self.qexecute('Any MIN(X) WHERE X is Personne')
559
560
561
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Personne',)])
562
        rset = self.qexecute('Any MIN(O) WHERE X ordernum O')
563
564
565
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
566

Adrien Di Mascio's avatar
Adrien Di Mascio committed
567
    def test_select_aggregat_max(self):
568
        rset = self.qexecute('Any MAX(X) WHERE X is Personne')
569
570
571
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Personne',)])
572
        rset = self.qexecute('Any MAX(O) WHERE X ordernum O')
573
574
575
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
576
577

    def test_select_custom_aggregat_concat_string(self):
578
        rset = self.qexecute('Any GROUP_CONCAT(N) WHERE X is CWGroup, X name N')
579
580
        self.assertTrue(rset)
        self.assertEqual(sorted(rset[0][0].split(', ')), ['guests', 'managers',
Adrien Di Mascio's avatar
Adrien Di Mascio committed
581
582
583
                                                             'owners', 'users'])

    def test_select_custom_regproc_limit_size(self):
584
        rset = self.qexecute('Any TEXT_LIMIT_SIZE(N, 3) WHERE X is CWGroup, X name N, X name "managers"')
585
586
        self.assertTrue(rset)
        self.assertEqual(rset[0][0], 'man...')
587
588
        self.qexecute("INSERT Basket X: X name 'bidule', X description '<b>hop hop</b>', X description_format 'text/html'")
        rset = self.qexecute('Any LIMIT_SIZE(D, DF, 3) WHERE X is Basket, X description D, X description_format DF')
589
590
        self.assertTrue(rset)
        self.assertEqual(rset[0][0], 'hop...')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
591
592

    def test_select_regproc_orderby(self):
593
        rset = self.qexecute('DISTINCT Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, X name "managers"')
594
595
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][1], 'managers')
596
        rset = self.qexecute('Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, NOT U in_group X, U login "admin"')
597
598
        self.assertEqual(len(rset), 3)
        self.assertEqual(rset[0][1], 'owners')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
599

Adrien Di Mascio's avatar
Adrien Di Mascio committed
600
    def test_select_aggregat_sort(self):
601
        rset = self.qexecute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G')
602
603
604
        self.assertEqual(len(rset.rows), 2)
        self.assertEqual(len(rset.rows[0]), 2)
        self.assertEqual(rset.description[0], ('CWGroup', 'Int',))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
605
606

    def test_select_aggregat_having(self):
607
        rset = self.qexecute('Any N,COUNT(RDEF) GROUPBY N ORDERBY 2,N '
Adrien Di Mascio's avatar
Adrien Di Mascio committed
608
609
                            'WHERE RT name N, RDEF relation_type RT '
                            'HAVING COUNT(RDEF) > 10')
610
        self.assertListEqual(rset.rows,
611
612
                              [[u'description_format', 13],
                               [u'description', 14],
Julien Cristau's avatar
Julien Cristau committed
613
                               [u'name', 19],
614
615
616
617
618
619
620
621
622
                               [u'created_by', 45],
                               [u'creation_date', 45],
                               [u'cw_source', 45],
                               [u'cwuri', 45],
                               [u'in_basket', 45],
                               [u'is', 45],
                               [u'is_instance_of', 45],
                               [u'modification_date', 45],
                               [u'owned_by', 45]])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
623
624
625

    def test_select_aggregat_having_dumb(self):
        # dumb but should not raise an error
626
        rset = self.qexecute('Any U,COUNT(X) GROUPBY U '
Adrien Di Mascio's avatar
Adrien Di Mascio committed
627
628
                            'WHERE U eid %(x)s, X owned_by U '
                            'HAVING COUNT(X) > 10', {'x': self.ueid})
629
630
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(rset.rows[0][0], self.ueid)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
631

632
    def test_select_having_non_aggregat_1(self):
633
        rset = self.qexecute('Any L WHERE X login L, X creation_date CD '
634
                            'HAVING YEAR(CD) = %s' % date.today().year)
635
        self.assertListEqual(rset.rows,
636
637
638
639
                              [[u'admin'],
                               [u'anon']])

    def test_select_having_non_aggregat_2(self):
640
        rset = self.qexecute('Any L GROUPBY L WHERE X login L, X in_group G, '
641
642
                            'X creation_date CD HAVING YEAR(CD) = %s OR COUNT(G) > 1'
                            % date.today().year)
643
        self.assertListEqual(rset.rows,
644
645
646
                              [[u'admin'],
                               [u'anon']])

Adrien Di Mascio's avatar
Adrien Di Mascio committed
647
    def test_select_complex_sort(self):
648
        """need sqlite including http://www.sqlite.org/cvstrac/tktview?tn=3773 fix"""
649
        rset = self.qexecute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
650
651
        result = rset.rows
        result.sort()
652
        self.assertEqual(tuplify(result), [(1,), (2,), (3,), (4,), (5,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
653

Adrien Di Mascio's avatar
Adrien Di Mascio committed
654
    def test_select_upper(self):
655
        rset = self.qexecute('Any X, UPPER(L) ORDERBY L WHERE X is CWUser, X login L')
656
657
658
659
660
        self.assertEqual(len(rset.rows), 2)
        self.assertEqual(rset.rows[0][1], 'ADMIN')
        self.assertEqual(rset.description[0], ('CWUser', 'String',))
        self.assertEqual(rset.rows[1][1], 'ANON')
        self.assertEqual(rset.description[1], ('CWUser', 'String',))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
661
        eid = rset.rows[0][0]
662
        rset = self.qexecute('Any UPPER(L) WHERE X eid %s, X login L'%eid)
663
664
        self.assertEqual(rset.rows[0][0], 'ADMIN')
        self.assertEqual(rset.description, [('String',)])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
665

666
667
    def test_select_float_abs(self):
        # test positive number
668
669
        eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s', {'i': 1.2})[0][0]
        rset = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I', {'x': eid})
Sylvain Thénault's avatar
Sylvain Thénault committed
670
        self.assertEqual(rset.rows[0][0], 1.2)
671
        # test negative number
672
673
        eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s', {'i': -1.2})[0][0]
        rset = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I', {'x': eid})
Sylvain Thénault's avatar
Sylvain Thénault committed
674
        self.assertEqual(rset.rows[0][0], 1.2)
675
676
677

    def test_select_int_abs(self):
        # test positive number
678
679
        eid = self.qexecute('INSERT Affaire A: A duration %(d)s', {'d': 12})[0][0]
        rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid})
Sylvain Thénault's avatar
Sylvain Thénault committed
680
        self.assertEqual(rset.rows[0][0], 12)
681
        # test negative number
682
683
        eid = self.qexecute('INSERT Affaire A: A duration %(d)s', {'d': -12})[0][0]
        rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid})
Sylvain Thénault's avatar
Sylvain Thénault committed
684
        self.assertEqual(rset.rows[0][0], 12)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
685
686

##     def test_select_simplified(self):
687
##         ueid = self._access._user.eid
688
##         rset = self.qexecute('Any L WHERE %s login L'%ueid)
689
##         self.assertEqual(rset.rows[0][0], 'admin')
690
##         rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid})
691
##         self.assertEqual(rset.rows[0][0], 'admin')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
692

Adrien Di Mascio's avatar
Adrien Di Mascio committed
693
    def test_select_searchable_text_1(self):
694
695
696
697
        rset = self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        rset = self.qexecute(u"INSERT Societe X: X nom 'bidle'")
        rset = self.qexecute("INSERT Societe X: X nom 'chouette'")
        rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidle'})
698
        self.assertEqual(len(rset.rows), 2, rset.rows)
699
        rset = self.qexecute(u'Any N where N has_text "bidle"')
700
        self.assertEqual(len(rset.rows), 2, rset.rows)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
701
        biduleeids = [r[0] for r in rset.rows]
702
        rset = self.qexecute(u'Any N where NOT N has_text "bidle"')
703
        self.assertFalse([r[0] for r in rset.rows if r[0] in biduleeids])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
704
        # duh?
705
        rset = self.qexecute('Any X WHERE X has_text %(text)s', {'text': u'a'})
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
706

Adrien Di Mascio's avatar
Adrien Di Mascio committed
707
    def test_select_searchable_text_2(self):
708
709
710
711
        rset = self.qexecute("INSERT Personne X: X nom 'bidule'")
        rset = self.qexecute("INSERT Personne X: X nom 'chouette'")
        rset = self.qexecute("INSERT Societe X: X nom 'bidule'")
        rset = self.qexecute('Personne N where N has_text "bidule"')
712
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
713

Adrien Di Mascio's avatar
Adrien Di Mascio committed
714
    def test_select_searchable_text_3(self):
715
716
717
718
        rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'")
        rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'")
        rset = self.qexecute("INSERT Societe X: X nom 'bidule'")
        rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"')
719
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
720

Adrien Di Mascio's avatar
Adrien Di Mascio committed
721
    def test_select_multiple_searchable_text(self):
722
723
724
725
        self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        self.qexecute("INSERT Societe X: X nom 'chouette', S travaille X")
        self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        rset = self.qexecute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s',
Adrien Di Mascio's avatar
Adrien Di Mascio committed
726
727
728
                            {'text': u'bidle',
                             'text2': u'chouette',}
                            )
729
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
730

Adrien Di Mascio's avatar
Adrien Di Mascio committed
731
    def test_select_no_descr(self):
732
        rset = self.qexecute('Any X WHERE X is CWGroup', build_descr=0)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
733
        rset.rows.sort()
734
        self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
735
        self.assertEqual(rset.description, ())
Adrien Di Mascio's avatar
Adrien Di Mascio committed
736
737

    def test_select_limit_offset(self):
738
        rset = self.qexecute('CWGroup X ORDERBY N LIMIT 2 WHERE X name N')
739
        self.assertEqual(tuplify(rset.rows), [(2,), (3,)])
740
        self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',)])
741
        rset = self.qexecute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N')
742
        self.assertEqual(tuplify(rset.rows), [(4,), (5,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
743

744
    def test_select_symmetric(self):
745
746
747
748
749
750
751
        self.qexecute("INSERT Personne X: X nom 'machin'")
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'trucmuche'")
        self.qexecute("SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'")
        self.qexecute("SET X connait Y WHERE X nom 'machin', Y nom 'chouette'")
        rset = self.qexecute('Any P WHERE P connait P2')
752
        self.assertEqual(len(rset.rows), 4, rset.rows)
753
        rset = self.qexecute('Any P WHERE NOT P connait P2')
754
        self.assertEqual(len(rset.rows), 1, rset.rows) # trucmuche
755
        rset = self.qexecute('Any P WHERE P connait P2, P2 nom "bidule"')
756
        self.assertEqual(len(rset.rows), 1, rset.rows)
757
        rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "bidule"')
758
        self.assertEqual(len(rset.rows), 1, rset.rows)
759
        rset = self.qexecute('Any P WHERE P connait P2, P2 nom "chouette"')
760
        self.assertEqual(len(rset.rows), 2, rset.rows)
761
        rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "chouette"')
762
        self.assertEqual(len(rset.rows), 2, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
763

Adrien Di Mascio's avatar
Adrien Di Mascio committed
764
    def test_select_inline(self):
765
766
767
768
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Any N where N ecrit_par X, X nom "bidule"')
769
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
770

Adrien Di Mascio's avatar
Adrien Di Mascio committed
771
    def test_select_creation_date(self):
772
773
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        rset = self.qexecute('Any D WHERE X nom "bidule", X creation_date D')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
774
775
776
        self.assertEqual(len(rset.rows), 1)

    def test_select_or_relation(self):
777
778
779
780
781
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Societe X: X nom 'logilab'")
        self.qexecute("INSERT Societe X: X nom 'caesium'")
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'")
782
783
        rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
                             'S1 nom "logilab", S2 nom "caesium"')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
784
        self.assertEqual(len(rset.rows), 1)
785
        self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'")
786
787
        rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
                             'S1 nom "logilab", S2 nom "caesium"')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
788
        self.assertEqual(len(rset.rows), 2)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
789

Adrien Di Mascio's avatar
Adrien Di Mascio committed
790
    def test_select_or_sym_relation(self):
791
792
793
794
795
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'truc'")
        self.qexecute("SET P connait S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('DISTINCT Any P WHERE S connait P, S nom "chouette"')