unittest_querier.py 88.8 KB
Newer Older
Adrien Di Mascio's avatar
Adrien Di Mascio committed
1
# -*- coding: iso-8859-1 -*-
# copyright 2003 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""unit tests for modules cubicweb.server.querier and cubicweb.server.ssplanner
"""
21

22
from contextlib import contextmanager
23
from datetime import date, datetime, timedelta, tzinfo
24
import unittest
Adrien Di Mascio's avatar
Adrien Di Mascio committed
25

26
27
import pytz

28
from six import PY2, integer_types, binary_type, text_type
Sylvain Thénault's avatar
Sylvain Thénault committed
29
30

from rql import BadRQLQuery
Sylvain Thénault's avatar
Sylvain Thénault committed
31
from rql.utils import register_function, FunctionDescr
32

33
from cubicweb import QueryError, Unauthorized, Binary, devtools
Sylvain Thénault's avatar
Sylvain Thénault committed
34
from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS, SQL_PREFIX
Adrien Di Mascio's avatar
Adrien Di Mascio committed
35
from cubicweb.server.utils import crypt_password
36
from cubicweb.server.querier import manual_build_descr, _make_description
37
from cubicweb.devtools.testlib import CubicWebTC
38
from cubicweb.devtools.repotest import tuplify, BaseQuerierTC
Adrien Di Mascio's avatar
Adrien Di Mascio committed
39

Sylvain Thénault's avatar
Sylvain Thénault committed
40

41
42
43
class FixedOffset(tzinfo):
    """Minimal ``tzinfo`` with a constant whole-hour UTC offset and no DST.

    :param hours: offset from UTC, in hours (defaults to 0)
    """

    def __init__(self, hours=0):
        self.hours = hours

    def utcoffset(self, dt):
        """Return the configured offset as a ``timedelta``; *dt* is ignored."""
        return timedelta(hours=self.hours)

    def dst(self, dt):
        """Return a zero ``timedelta``; *dt* is ignored."""
        return timedelta(0)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
51
52
53
54
55
56

# register priority/severity sorting registered procedure

class group_sort_value(FunctionDescr):
    # description of the GROUP_SORT_VALUE function: it is declared as
    # returning an 'Int' and as being supported by the sqlite backend only
    rtype = 'Int'
    supported_backends = ('sqlite',)
Sylvain Thénault's avatar
Sylvain Thénault committed
57

Adrien Di Mascio's avatar
Adrien Di Mascio committed
58
59
60
61
try:
    register_function(group_sort_value)
except AssertionError:
    # presumably raised when the function is already registered (e.g. another
    # test module registered it first) -- TODO confirm against rql.utils
    pass
Sylvain Thénault's avatar
Sylvain Thénault committed
62
63


Adrien Di Mascio's avatar
Adrien Di Mascio committed
64
def init_sqlite_connexion(cnx):
    """Install the ``GROUP_SORT_VALUE`` SQL function on a sqlite connection.

    :param cnx: connection exposing ``create_function(name, nargs, func)``
    """
    # built once per connection rather than on every SQL call
    sort_values = {"managers": "3", "users": "2", "guests": "1", "owners": "0"}

    def group_sort_value(text):
        # a group name missing from the mapping raises KeyError
        return sort_values[text]

    cnx.create_function("GROUP_SORT_VALUE", 1, group_sort_value)
Sylvain Thénault's avatar
Sylvain Thénault committed
70

Adrien Di Mascio's avatar
Adrien Di Mascio committed
71
72
73
# presumably each new sqlite connection is handed to the registered hooks, so
# GROUP_SORT_VALUE gets installed on all of them -- TODO confirm in sqlutils
SQL_CONNECT_HOOKS['sqlite'].append(init_sqlite_connexion)


74
def setUpClass(cls, *args):
    """Create the test repository used by the test cases of this module.

    Written as a plain function so that several test classes can wrap it
    with ``classmethod``.  The repository and its connection are stored in
    the module globals ``repo`` and ``cnx``; the repository is also set on
    ``cls.repo``.
    """
    global repo, cnx
    config = devtools.TestServerConfiguration('data', __file__)
    handler = devtools.get_test_db_handler(config)
    handler.build_db_cache()
    repo, cnx = handler.get_repo_and_cnx()
    cls.repo = repo
Adrien Di Mascio's avatar
Adrien Di Mascio committed
81

82
def tearDownClass(cls, *args):
    """Shut down the module-level repository and drop the ``repo`` / ``cnx``
    globals set by ``setUpClass``.
    """
    global repo, cnx
    repo.shutdown()
    del repo, cnx
Adrien Di Mascio's avatar
Adrien Di Mascio committed
86
87


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
class Variable:
    """Stand-in for a selected variable node, identified by its name."""

    def __init__(self, name):
        self.children = []
        self.name = name

    def get_type(self, solution, args=None):
        """Return the type bound to this variable's name in `solution`."""
        var_type = solution[self.name]
        return var_type

    def as_string(self):
        """Return the variable name."""
        return self.name

class Function:
    """Stand-in for a function node applied to a single variable."""

    def __init__(self, name, varname):
        argument = Variable(varname)
        self.children = [argument]
        self.name = name

    def get_type(self, solution, args=None):
        """Return 'Int', whatever `solution` and `args` are."""
        return 'Int'

105
class MakeDescriptionTC(unittest.TestCase):
    # exercises _make_description with the hand-written Function / Variable
    # stand-ins instead of a parsed syntax tree

    def test_known_values(self):
        solution = {'A': 'Int', 'B': 'CWUser'}
        selection = (Function('max', 'A'), Variable('B'))
        self.assertEqual(_make_description(selection, {}, solution),
                         ['Int', 'CWUser'])


Adrien Di Mascio's avatar
Adrien Di Mascio committed
112
class UtilsTC(BaseQuerierTC):
113
114
    setUpClass = classmethod(setUpClass)
    tearDownClass = classmethod(tearDownClass)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
115

Adrien Di Mascio's avatar
Adrien Di Mascio committed
116
    def test_preprocess_1(self):
        # RDEF related to the "owned_by" relation type is expected to keep
        # two solutions: CWAttribute and CWRelation
        with self.admin_access.cnx() as cnx:
            reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0]
            rqlst = self._prepare(cnx, 'Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s',
                                  {'x': reid})
            self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}],
                             rqlst.solutions)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
123

Adrien Di Mascio's avatar
Adrien Di Mascio committed
124
    def test_preprocess_2(self):
        with self.admin_access.cnx() as cnx:
            teid = cnx.execute("INSERT Tag X: X name 'tag'")[0][0]
            rqlst = self._prepare(cnx, 'Any X WHERE E eid %(x)s, E tags X', {'x': teid})
            # the query may be optimized, should keep only one solution
            # (any one, etype will be discarded)
            self.assertEqual(1, len(rqlst.solutions))
134
135
136

    def assertRQLEqual(self, expected, got):
        """Assert that two RQL strings are equal once parsed and serialized
        back to text, so formatting differences in the sources do not matter.
        """
        from rql import parse
        self.assertMultiLineEqual(text_type(parse(expected)),
                                  text_type(parse(got)))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
139

Adrien Di Mascio's avatar
Adrien Di Mascio committed
140
    def test_preprocess_security(self):
        # run as a member of the 'users' group: after preprocessing, the query
        # is expected to select from a sub-query made of four branches
        with self.user_groups_session('users') as cnx:
            plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN '
                                      'WHERE X is ET, ET name ETN')
            union = plan.rqlst
            plan.preprocess(union)
            self.assertEqual(len(union.children), 1)
            self.assertEqual(len(union.children[0].with_), 1)
            subq = union.children[0].with_[0].query
            self.assertEqual(len(subq.children), 4)
            self.assertEqual([t.as_string() for t in union.children[0].selection],
                             ['ETN', 'COUNT(X)'])
            self.assertEqual([t.as_string() for t in union.children[0].groupby],
                             ['ETN'])
            # sort the branches on their RQL text to get a stable order
            partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children))
            rql, solutions = partrqls[0]
            self.assertRQLEqual(rql,
                                'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))'
                                ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, '
                                '               X identity D, C is Division, D is Affaire))'
                                ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, '
                                '            X identity H, H is Affaire)))'
                                ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, '
                                '            X identity I, I is Affaire)))'
                                ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, '
                                '            X identity J, J is Affaire)))'
                                ', ET is CWEType, X is Affaire')
            self.assertEqual(solutions, [{'C': 'Division',
                                          'D': 'Affaire',
                                          'E': 'Note',
                                          'F': 'Societe',
                                          'G': 'SubDivision',
                                          'H': 'Affaire',
                                          'I': 'Affaire',
                                          'J': 'Affaire',
                                          'X': 'Affaire',
                                          'ET': 'CWEType', 'ETN': 'String'}])
            rql, solutions = partrqls[1]
            self.assertRQLEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, '
                                'X is IN(BaseTransition, Bookmark, CWAttribute, CWComputedRType, '
                                '        CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, '
                                '        CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, '
                                '        Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, '
                                '        Frozable, Note, Old, Personne, RQLExpression, Societe, State, SubDivision, '
                                '        SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)')
            # one solution per entity type, all sharing the same ET / ETN types
            etypes = ('BaseTransition', 'Bookmark', 'Card', 'Comment', 'Division',
                      'CWComputedRType', 'CWConstraint', 'CWConstraintType', 'CWEType',
                      'CWAttribute', 'CWGroup', 'CWRelation', 'CWPermission', 'CWProperty',
                      'CWRType', 'CWSource', 'CWUniqueTogetherConstraint', 'CWUser',
                      'Email', 'EmailPart', 'EmailThread', 'ExternalUri', 'File', 'Folder',
                      'Frozable', 'Note', 'Old', 'Personne', 'RQLExpression', 'Societe',
                      'State', 'SubDivision', 'SubWorkflowExitPoint', 'Tag', 'Transition',
                      'TrInfo', 'Workflow', 'WorkflowTransition')
            self.assertCountEqual(solutions,
                                  [{'X': etype, 'ETN': 'String', 'ET': 'CWEType'}
                                   for etype in etypes])
            rql, solutions = partrqls[2]
            self.assertEqual(rql,
                             'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), '
                             'ET is CWEType, X is EmailAddress')
            self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}])
            rql, solutions = partrqls[3]
            self.assertEqual(rql,
                             'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), '
                             'ET is CWEType, X is Basket')
            self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
234
235

    def test_preprocess_security_aggregat(self):
        # same shape as test_preprocess_security, with an aggregate on an
        # otherwise unrestricted variable
        with self.user_groups_session('users') as cnx:
            plan = self._prepare_plan(cnx, 'Any MAX(X)')
            union = plan.rqlst
            plan.preprocess(union)
            self.assertEqual(len(union.children), 1)
            self.assertEqual(len(union.children[0].with_), 1)
            subq = union.children[0].with_[0].query
            self.assertEqual(len(subq.children), 4)
            self.assertEqual([t.as_string() for t in union.children[0].selection],
                             ['MAX(X)'])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
246

Adrien Di Mascio's avatar
Adrien Di Mascio committed
247
    def test_preprocess_nonregr(self):
        # non-regression: a single solution is expected for this query
        with self.admin_access.cnx() as cnx:
            rqlst = self._prepare(cnx, 'Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI')
            self.assertEqual(len(rqlst.solutions), 1)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
251

Adrien Di Mascio's avatar
Adrien Di Mascio committed
252
253
    def test_build_description(self):
        # the result set description must give the type of the selected value,
        # whether it is an entity, a literal or a substituted argument
        rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self.admin_access._user.eid})
        self.assertEqual(rset.description[0][0], 'CWUser')
        # literals written in the query itself
        for rql, expected in (('Any 1', 'Int'),
                              ('Any TRUE', 'Boolean'),
                              ('Any "hop"', 'String'),
                              ('Any TODAY', 'Date'),
                              ('Any NOW', 'Datetime')):
            rset = self.qexecute(rql)
            self.assertEqual(rset.description[0][0], expected, rql)
        # values given through the query arguments
        substituted = [(1, 'Int')]
        if PY2:
            # `long` only exists on python 2
            substituted.append((long(1), 'Int'))  # noqa: F821
        substituted += [(True, 'Boolean'),
                        (1.0, 'Float'),
                        (datetime.now(), 'Datetime'),
                        ('str', 'String'),
                        (u'str', 'String')]
        for value, expected in substituted:
            rset = self.qexecute('Any %(x)s', {'x': value})
            self.assertEqual(rset.description[0][0], expected, repr(value))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
281

282
    def test_build_descr1(self):
        with self.admin_access.cnx() as cnx:
            rset = cnx.execute('(Any U,L WHERE U login L) UNION '
                               '(Any G,N WHERE G name N, G is CWGroup)')
            orig_length = len(rset)
            # put a bogus eid in the first row: manual_build_descr is expected
            # to drop that row from both the rows and the description
            rset.rows[0][0] = 9999999
            description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows)
            self.assertEqual(len(description), orig_length - 1)
            self.assertEqual(len(rset.rows), orig_length - 1)
            self.assertNotEqual(rset.rows[0][0], 9999999)
292
293

    def test_build_descr2(self):
        # second column is either NULL (no description) or a CWGroup
        rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION '
                             '(Any U,G WHERE U in_group G))')
        for _, y in rset.description:
            if y is not None:
                self.assertEqual(y, 'CWGroup')

    def test_build_descr3(self):
        # same check as test_build_descr2, on a top-level UNION
        rset = self.qexecute('(Any G,NULL WHERE G is CWGroup) UNION '
                             '(Any U,G WHERE U in_group G)')
        for _, y in rset.description:
            if y is not None:
                self.assertEqual(y, 'CWGroup')

Adrien Di Mascio's avatar
Adrien Di Mascio committed
307
308

class QuerierTC(BaseQuerierTC):
309
310
    setUpClass = classmethod(setUpClass)
    tearDownClass = classmethod(tearDownClass)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
311

312
313
314
315
316
317
318
319
320
321
322
    def setUp(self):
        # record the highest eid existing before the test runs; tearDown
        # deletes everything above it
        super(QuerierTC, self).setUp()
        with self.admin_access.cnx() as cnx:
            rset = cnx.execute('Any MAX(X)')
            self.maxeid = rset[0][0]

    def tearDown(self):
        # delete every entity whose eid is above the one recorded by setUp
        super(QuerierTC, self).tearDown()
        cleanup_rql = 'DELETE Any X WHERE X eid > %s' % self.maxeid
        with self.admin_access.cnx() as cnx:
            cnx.execute(cleanup_rql)
            cnx.commit()

Adrien Di Mascio's avatar
Adrien Di Mascio committed
323
324
    def test_unknown_eid(self):
        # should return an empty result set
        self.assertFalse(self.qexecute('Any X WHERE X eid 99999999'))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
326

327
328
    def test_typed_eid(self):
        # an eid given as a string must come back as an integer
        rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'})
        self.assertIsInstance(rset[0][0], integer_types)
331

332
    def test_bytes_storage(self):
        # bytes stored through a Binary must be read back unchanged, as a Binary
        feid = self.qexecute('INSERT File X: X data_name "foo.pdf", '
                             'X data_format "text/plain", X data %(data)s',
                             {'data': Binary(b"xxx")})[0][0]
        fdata = self.qexecute('Any D WHERE X data D, X eid %(x)s', {'x': feid})[0][0]
        self.assertIsInstance(fdata, Binary)
        self.assertEqual(fdata.getvalue(), b'xxx')
339

Adrien Di Mascio's avatar
Adrien Di Mascio committed
340
    # selection queries tests #################################################
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
341

Adrien Di Mascio's avatar
Adrien Di Mascio committed
342
    def test_select_1(self):
        rset = self.qexecute('Any X ORDERBY X WHERE X is CWGroup')
        result, descr = rset.rows, rset.description
        self.assertEqual(tuplify(result), [(2,), (3,), (4,), (5,)])
        self.assertEqual(descr, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
347

Adrien Di Mascio's avatar
Adrien Di Mascio committed
348
    def test_select_2(self):
        # ordering on the group name, ascending then descending
        rset = self.qexecute('Any X ORDERBY N WHERE X is CWGroup, X name N')
        self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
        self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
        rset = self.qexecute('Any X ORDERBY N DESC WHERE X is CWGroup, X name N')
        self.assertEqual(tuplify(rset.rows), [(5,), (4,), (3,), (2,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
354

Adrien Di Mascio's avatar
Adrien Di Mascio committed
355
    def test_select_3(self):
        rset = self.qexecute('Any N GROUPBY N WHERE X is CWGroup, X name N')
        result, descr = rset.rows, rset.description
        # no ORDERBY in the query: sort the rows in place before comparing
        result.sort()
        self.assertEqual(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)])
        self.assertEqual(descr, [('String',), ('String',), ('String',), ('String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
361

Adrien Di Mascio's avatar
Adrien Di Mascio committed
362
    def test_select_is(self):
        # the type name selected through `is` must match the description of X
        rset = self.qexecute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN')
        result, descr = rset.rows, rset.description
        self.assertEqual(result[0][1], descr[0][0])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
366

Adrien Di Mascio's avatar
Adrien Di Mascio committed
367
    def test_select_is_aggr(self):
        rset = self.qexecute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN')
        result, descr = rset.rows, rset.description
        self.assertEqual(descr[0][0], 'String')
        self.assertEqual(descr[0][1], 'Int')
        self.assertEqual(result[0][0], 'RQLExpression')  # XXX may change as schema evolve
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
373

Adrien Di Mascio's avatar
Adrien Di Mascio committed
374
    def test_select_groupby_orderby(self):
        rset = self.qexecute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N')
        self.assertEqual(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)])
        self.assertEqual(rset.description, [('String',), ('String',), ('String',), ('String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
378

Adrien Di Mascio's avatar
Adrien Di Mascio committed
379
    def test_select_complex_groupby(self):
        # no assertion on the results: both queries only have to execute
        self.qexecute('Any N GROUPBY N WHERE X name N')
        self.qexecute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
382

Adrien Di Mascio's avatar
Adrien Di Mascio committed
383
    def test_select_inlined_groupby(self):
        # no assertion on the results: the grouped query only has to execute
        seid = self.qexecute('State X WHERE X name "deactivated"')[0][0]
        self.qexecute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
386

387
    def test_select_groupby_funccall(self):
        # expects the two users to have been created during the current year
        rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY YEAR(CD) '
                             'WHERE X is CWUser, X creation_date CD')
        self.assertListEqual(rset.rows, [[date.today().year, 2]])

    def test_select_groupby_colnumber(self):
        # same as test_select_groupby_funccall, grouping by column number
        rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY 1 '
                             'WHERE X is CWUser, X creation_date CD')
        self.assertListEqual(rset.rows, [[date.today().year, 2]])

Adrien Di Mascio's avatar
Adrien Di Mascio committed
397
    def test_select_complex_orderby(self):
        rset1 = self.qexecute('Any N ORDERBY N WHERE X name N')
        self.assertEqual(sorted(rset1.rows), rset1.rows)
        # with OFFSET 1, the first row must be the second row of the full result
        rset = self.qexecute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N')
        self.assertEqual(rset.rows[0][0], rset1.rows[1][0])
        self.assertEqual(len(rset), 5)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
403

Adrien Di Mascio's avatar
Adrien Di Mascio committed
404
    def test_select_5(self):
        rset = self.qexecute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is CWGroup')
        self.assertEqual(tuplify(rset.rows),
                         [(2, 'guests',),
                          (3, 'managers',),
                          (4, 'owners',),
                          (5, 'users',)])
        self.assertEqual(rset.description,
                         [('CWGroup', 'String',),
                          ('CWGroup', 'String',),
                          ('CWGroup', 'String',),
                          ('CWGroup', 'String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
416

Adrien Di Mascio's avatar
Adrien Di Mascio committed
417
    def test_select_6(self):
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        # only check that a Personne is part of the description, not the
        # exact number of rows
        rset = self.qexecute('Any Y where X name TMP, Y nom in (TMP, "bidule")')
        self.assertIn(('Personne',), rset.description)
        rset = self.qexecute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")')
        self.assertIn(('Personne',), rset.description)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
424

Adrien Di Mascio's avatar
Adrien Di Mascio committed
425
    def test_select_not_attr(self):
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        # NOT on an attribute value
        rset = self.qexecute('Personne X WHERE NOT X nom "bidule"')
        self.assertEqual(len(rset.rows), 0, rset.rows)
        rset = self.qexecute('Personne X WHERE NOT X nom "bid"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        # NOT on a relation, once the only Personne is linked to a Societe
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Personne X WHERE NOT X travaille S')
        self.assertEqual(len(rset.rows), 0, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
435

Adrien Di Mascio's avatar
Adrien Di Mascio committed
436
    def test_select_is_in(self):
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        rset = self.qexecute("Any X WHERE X is IN (Personne, Societe)")
        self.assertEqual(len(rset), 2)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
441

Adrien Di Mascio's avatar
Adrien Di Mascio committed
442
    def test_select_not_rel(self):
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'autre'")
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        # only 'autre' is left without a `travaille` relation
        rset = self.qexecute('Personne X WHERE NOT X travaille S')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        rset = self.qexecute('Personne X WHERE NOT X travaille S, S nom "chouette"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
451

Adrien Di Mascio's avatar
Adrien Di Mascio committed
452
    def test_select_nonregr_inlined(self):
        self.qexecute("INSERT Note X: X para 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'autre'")
        self.qexecute("SET X ecrit_par P WHERE X para 'bidule', P nom 'chouette'")
        # the note is written by a Personne, so no CWUser should be returned
        rset = self.qexecute('Any U,T ORDERBY T DESC WHERE U is CWUser, '
                             'N ecrit_par U, N type T')
        self.assertEqual(len(rset.rows), 0)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
460

Adrien Di Mascio's avatar
Adrien Di Mascio committed
461
    def test_select_nonregr_edition_not(self):
        groupeids = {2, 3, 4}
        groupreadperms = {r[0] for r in self.qexecute('Any Y WHERE X name "CWGroup", '
                                                      'Y eid IN(2, 3, 4), X read_permission Y')}
        # eids among 2, 3, 4 which do not have the read permission on CWGroup
        expected = sorted(groupeids - groupreadperms)
        rset = self.qexecute('DISTINCT Any Y WHERE X is CWEType, X name "CWGroup", '
                             'Y eid IN(2, 3, 4), NOT X read_permission Y')
        self.assertEqual(sorted(r[0] for r in rset.rows), expected)
        # same query without the explicit type restriction on X
        rset = self.qexecute('DISTINCT Any Y WHERE X name "CWGroup", '
                             'Y eid IN(2, 3, 4), NOT X read_permission Y')
        self.assertEqual(sorted(r[0] for r in rset.rows), expected)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
471

Adrien Di Mascio's avatar
Adrien Di Mascio committed
472
    def test_select_outer_join(self):
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        peid2 = self.qexecute("INSERT Personne X: X nom 'autre'")[0][0]
        seid1 = self.qexecute("INSERT Societe X: X nom 'chouette'")[0][0]
        seid2 = self.qexecute("INSERT Societe X: X nom 'chouetos'")[0][0]
        # no relation yet: the optional side is always None
        rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
        self.assertEqual(rset.rows, [[peid1, None], [peid2, None]])
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
        self.assertEqual(rset.rows, [[peid1, seid1], [peid2, None]])
        # optional variable on the subject side
        rset = self.qexecute('Any S,X ORDERBY S WHERE X? travaille S')
        self.assertEqual(rset.rows, [[seid1, peid1], [seid2, None]])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
484

Adrien Di Mascio's avatar
Adrien Di Mascio committed
485
    def test_select_outer_join_optimized(self):
        # the optional relation must not filter out the entity given by eid
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        rset = self.qexecute('Any X WHERE X eid %(x)s, P? connait X', {'x': peid1})
        self.assertEqual(rset.rows, [[peid1]])
        rset = self.qexecute('Any X WHERE X eid %(x)s, X require_permission P?',
                             {'x': peid1})
        self.assertEqual(rset.rows, [[peid1]])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
492
493

    def test_select_left_outer_join(self):
        """Four groups are selected with an optional subject, restricted or not."""
        unrestricted = self.qexecute('DISTINCT Any G WHERE U? in_group G')
        self.assertEqual(len(unrestricted), 4)

        # same query, with the optional user restricted to the admin's eid
        admin_eid = self.admin_access._user.eid
        restricted = self.qexecute(
            'DISTINCT Any G WHERE U? in_group G, U eid %(x)s',
            {'x': admin_eid})
        self.assertEqual(len(restricted), 4)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
499
500

    def test_select_ambigous_outer_join(self):
        """Check optional variables on either side of the `tags` relation.

        One tag ('tag') is attached to the 'users' group, another one
        ('tagbis') is left untagged; both outer-join directions are then
        queried.
        """
        teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0]
        # the eid of this second tag is never needed: do not index the result
        self.qexecute("INSERT Tag X: X name 'tagbis'")
        geid = self.qexecute("CWGroup G WHERE G name 'users'")[0][0]
        self.qexecute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
                      {'g': geid, 't': teid})
        # optional tag: groups without tag must still be returned
        rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T? tags G, T name TN, G name GN")
        self.assertIn(['users', 'tag'], rset.rows)
        self.assertIn(['activated', None], rset.rows)
        # optional group: the tag which tags nothing must still be returned
        rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN")
        self.assertEqual(rset.rows, [[None, 'tagbis'], ['users', 'tag']])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
511

Adrien Di Mascio's avatar
Adrien Di Mascio committed
512
    def test_select_not_inline_rel(self):
        """Only the note without `ecrit_par` matches ``NOT X ecrit_par P``."""
        # two notes, only the one of type 'a' gets an author
        setup = (
            "INSERT Personne X: X nom 'bidule'",
            "INSERT Note X: X type 'a'",
            "INSERT Note X: X type 'b'",
            "SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'",
        )
        for rql in setup:
            self.qexecute(rql)
        rows = self.qexecute('Note X WHERE NOT X ecrit_par P').rows
        self.assertEqual(len(rows), 1, rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
519

Adrien Di Mascio's avatar
Adrien Di Mascio committed
520
    def test_select_not_unlinked_multiple_solutions(self):
        """Only the note nobody evaluates matches ``NOT Y evaluee X``."""
        # two notes, only the one of type 'a' is evaluated by 'bidule'
        setup = (
            "INSERT Personne X: X nom 'bidule'",
            "INSERT Note X: X type 'a'",
            "INSERT Note X: X type 'b'",
            "SET Y evaluee X WHERE X type 'a', Y nom 'bidule'",
        )
        for rql in setup:
            self.qexecute(rql)
        rows = self.qexecute('Note X WHERE NOT Y evaluee X').rows
        self.assertEqual(len(rows), 1, rows)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
527

528
    def test_select_date_extraction(self):
        """Each date extraction function returns the expected Int value."""
        birth = datetime(2001, 2, 3, 12, 13)
        self.qexecute("INSERT Personne X: X nom 'foo', X datenaiss %(d)s",
                      {'d': birth})
        # (RQL function name, value expected for the date inserted above)
        expectations = (
            ('YEAR', 2001),
            ('MONTH', 2),
            ('DAY', 3),
            ('HOUR', 12),
            ('MINUTE', 13),
            ('WEEKDAY', 6),
        )
        for function, expected in expectations:
            rql = 'Any %s(D) WHERE X is Personne, X datenaiss D' % function
            rset = self.qexecute(rql)
            self.assertEqual(len(rset.rows), 1)
            self.assertEqual(rset.rows[0][0], expected)
            self.assertEqual(rset.description, [('Int',)])
539

540
    def test_regexp_based_pattern_matching(self):
        """Check the REGEXP operator with an anchored and an unanchored pattern."""
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        # the eid of the second person is never compared: do not bind it
        self.qexecute("INSERT Personne X: X nom 'cidule'")
        # "^b" only matches 'bidule'
        rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "^b"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        self.assertEqual(rset.rows[0][0], peid1)
        # "idu" matches both names
        rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "idu"')
        self.assertEqual(len(rset.rows), 2, rset.rows)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
549
    def test_select_aggregat_count(self):
        """COUNT() gives a single row with a single Int column."""
        result = self.qexecute('Any COUNT(X)')
        rows = result.rows
        self.assertEqual(len(rows), 1)
        self.assertEqual(len(rows[0]), 1)
        self.assertEqual(result.description, [('Int',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
554

Adrien Di Mascio's avatar
Adrien Di Mascio committed
555
    def test_select_aggregat_sum(self):
        """SUM() gives a single row with a single Int column."""
        result = self.qexecute('Any SUM(O) WHERE X ordernum O')
        rows = result.rows
        self.assertEqual(len(rows), 1)
        self.assertEqual(len(rows[0]), 1)
        self.assertEqual(result.description, [('Int',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
560

Adrien Di Mascio's avatar
Adrien Di Mascio committed
561
    def test_select_aggregat_min(self):
        """MIN() gives one single-column row, described by the aggregated type."""
        # (query, type expected in the result set description)
        cases = (
            ('Any MIN(X) WHERE X is Personne', 'Personne'),
            ('Any MIN(O) WHERE X ordernum O', 'Int'),
        )
        for rql, etype in cases:
            result = self.qexecute(rql)
            self.assertEqual(len(result.rows), 1)
            self.assertEqual(len(result.rows[0]), 1)
            self.assertEqual(result.description, [(etype,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
570

Adrien Di Mascio's avatar
Adrien Di Mascio committed
571
    def test_select_aggregat_max(self):
        """MAX() gives one single-column row, described by the aggregated type."""
        # (query, type expected in the result set description)
        cases = (
            ('Any MAX(X) WHERE X is Personne', 'Personne'),
            ('Any MAX(O) WHERE X ordernum O', 'Int'),
        )
        for rql, etype in cases:
            result = self.qexecute(rql)
            self.assertEqual(len(result.rows), 1)
            self.assertEqual(len(result.rows[0]), 1)
            self.assertEqual(result.description, [(etype,)])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
580
581

    def test_select_custom_aggregat_concat_string(self):
        """GROUP_CONCAT() joins the group names with ', '."""
        result = self.qexecute('Any GROUP_CONCAT(N) WHERE X is CWGroup, X name N')
        self.assertTrue(result)
        # concatenation order is not checked: compare the sorted names
        names = result[0][0].split(', ')
        names.sort()
        self.assertEqual(names, ['guests', 'managers', 'owners', 'users'])

    def test_select_custom_regproc_limit_size(self):
        """TEXT_LIMIT_SIZE / LIMIT_SIZE cut the text to 3 chars and append '...'."""
        # plain text attribute
        truncated = self.qexecute(
            'Any TEXT_LIMIT_SIZE(N, 3) WHERE X is CWGroup, X name N, X name "managers"')
        self.assertTrue(truncated)
        self.assertEqual(truncated[0][0], 'man...')

        # HTML description: the expected value no longer contains the markup
        self.qexecute(
            "INSERT Basket X: X name 'bidule', X description '<b>hop hop</b>', X description_format 'text/html'")
        truncated = self.qexecute(
            'Any LIMIT_SIZE(D, DF, 3) WHERE X is Basket, X description D, X description_format DF')
        self.assertTrue(truncated)
        self.assertEqual(truncated[0][0], 'hop...')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
595
596

    def test_select_regproc_orderby(self):
        """GROUP_SORT_VALUE() can be used as an ORDERBY term."""
        managers = self.qexecute(
            'DISTINCT Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, X name "managers"')
        self.assertEqual(len(managers), 1)
        self.assertEqual(managers[0][1], 'managers')

        # groups the 'admin' user is not in; 'owners' is expected first
        others = self.qexecute(
            'Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, NOT U in_group X, U login "admin"')
        self.assertEqual(len(others), 3)
        self.assertEqual(others[0][1], 'owners')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
603

Adrien Di Mascio's avatar
Adrien Di Mascio committed
604
    def test_select_aggregat_sort(self):
        """A grouped COUNT() can be sorted on by its column index."""
        result = self.qexecute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G')
        rows = result.rows
        self.assertEqual(len(rows), 2)
        self.assertEqual(len(rows[0]), 2)
        self.assertEqual(result.description[0], ('CWGroup', 'Int',))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
609
610

    def test_select_aggregat_having(self):
        """HAVING filters on an aggregate; result sorted by count then name."""
        result = self.qexecute('Any N,COUNT(RDEF) GROUPBY N ORDERBY 2,N '
                               'WHERE RT name N, RDEF relation_type RT '
                               'HAVING COUNT(RDEF) > 10')
        # relation types with more than 10 definitions, with their count
        expected = [
            [u'description_format', 13],
            [u'description', 14],
            [u'name', 18],
            [u'created_by', 44],
            [u'creation_date', 44],
            [u'cw_source', 44],
            [u'cwuri', 44],
            [u'in_basket', 44],
            [u'is', 44],
            [u'is_instance_of', 44],
            [u'modification_date', 44],
            [u'owned_by', 44],
        ]
        self.assertListEqual(result.rows, expected)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
627
628
629

    def test_select_aggregat_having_dumb(self):
        """Grouping on a variable restricted by eid works together with HAVING."""
        # dumb but should not raise an error
        rql = ('Any U,COUNT(X) GROUPBY U '
               'WHERE U eid %(x)s, X owned_by U '
               'HAVING COUNT(X) > 10')
        rows = self.qexecute(rql, {'x': self.ueid}).rows
        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0][0], self.ueid)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
635

636
    def test_select_having_non_aggregat_1(self):
        """HAVING accepts a non-aggregate function such as YEAR()."""
        # both users are expected to have been created during the current year
        this_year = date.today().year
        rql = ('Any L WHERE X login L, X creation_date CD '
               'HAVING YEAR(CD) = %s') % this_year
        result = self.qexecute(rql)
        expected = [[u'admin'],
                    [u'anon']]
        self.assertListEqual(result.rows, expected)

    def test_select_having_non_aggregat_2(self):
        """HAVING may mix a non-aggregate test with an aggregate one using OR."""
        this_year = date.today().year
        rql = ('Any L GROUPBY L WHERE X login L, X in_group G, '
               'X creation_date CD HAVING YEAR(CD) = %s OR COUNT(G) > 1') % this_year
        result = self.qexecute(rql)
        expected = [[u'admin'],
                    [u'anon']]
        self.assertListEqual(result.rows, expected)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
651
    def test_select_complex_sort(self):
        """Sort on two terms with a LIMIT.

        Requires a sqlite version including the fix for
        http://www.sqlite.org/cvstrac/tktview?tn=3773
        """
        rset = self.qexecute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D')
        # rows are sorted again on the Python side before being compared
        ordered = sorted(rset.rows)
        self.assertEqual(tuplify(ordered), [(1,), (2,), (3,), (4,), (5,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
657

Adrien Di Mascio's avatar
Adrien Di Mascio committed
658
    def test_select_upper(self):
        """UPPER() upper-cases the login and is described as a String."""
        users = self.qexecute('Any X, UPPER(L) ORDERBY L WHERE X is CWUser, X login L')
        self.assertEqual(len(users.rows), 2)
        # rows are ordered by login: admin first, then anon
        for index, login in enumerate(('ADMIN', 'ANON')):
            self.assertEqual(users.rows[index][1], login)
            self.assertEqual(users.description[index], ('CWUser', 'String',))

        # same function, selecting the first user through its eid
        first_eid = users.rows[0][0]
        single = self.qexecute('Any UPPER(L) WHERE X eid %s, X login L' % first_eid)
        self.assertEqual(single.rows[0][0], 'ADMIN')
        self.assertEqual(single.description, [('String',)])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
669

670
671
    def test_select_float_abs(self):
        """ABS() on a float attribute, for a positive then a negative value."""
        for invoiced in (1.2, -1.2):
            affaire_eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s',
                                        {'i': invoiced})[0][0]
            result = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I',
                                   {'x': affaire_eid})
            # whatever the sign of the stored value, 1.2 is expected
            self.assertEqual(result.rows[0][0], 1.2)
679
680
681

    def test_select_int_abs(self):
        """ABS() on an integer attribute, for a positive then a negative value."""
        for duration in (12, -12):
            affaire_eid = self.qexecute('INSERT Affaire A: A duration %(d)s',
                                        {'d': duration})[0][0]
            result = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D',
                                   {'x': affaire_eid})
            # whatever the sign of the stored value, 12 is expected
            self.assertEqual(result.rows[0][0], 12)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
689
690

##     def test_select_simplified(self):
691
##         ueid = self.admin_access._user.eid
692
##         rset = self.qexecute('Any L WHERE %s login L'%ueid)
693
##         self.assertEqual(rset.rows[0][0], 'admin')
694
##         rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid})
695
##         self.assertEqual(rset.rows[0][0], 'admin')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
696

Adrien Di Mascio's avatar
Adrien Di Mascio committed
697
    def test_select_searchable_text_1(self):
        """Check `has_text`, with a substitution, a literal and negated."""
        # results of the INSERT queries are not needed: do not bind them
        self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        self.qexecute(u"INSERT Societe X: X nom 'bidle'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        # searched text given as a query argument
        rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidle'})
        self.assertEqual(len(rset.rows), 2, rset.rows)
        # searched text given as a literal
        rset = self.qexecute(u'Any N where N has_text "bidle"')
        self.assertEqual(len(rset.rows), 2, rset.rows)
        biduleeids = [r[0] for r in rset.rows]
        # the negated query must not return any of the entities found above
        rset = self.qexecute(u'Any N where NOT N has_text "bidle"')
        self.assertFalse([r[0] for r in rset.rows if r[0] in biduleeids])
        # duh?
        # the result of this last query is not checked, hence not bound
        self.qexecute('Any X WHERE X has_text %(text)s', {'text': u'a'})
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
710

Adrien Di Mascio's avatar
Adrien Di Mascio committed
711
    def test_select_searchable_text_2(self):
        """Check `has_text` combined with an entity type restriction."""
        # results of the INSERT queries are not needed: do not bind them
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Societe X: X nom 'bidule'")
        # a Personne and a Societe are both named 'bidule', one row is expected
        rset = self.qexecute('Personne N where N has_text "bidule"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
717

Adrien Di Mascio's avatar
Adrien Di Mascio committed
718
    def test_select_searchable_text_3(self):
        """Check `has_text` combined with an attribute restriction."""
        # results of the INSERT queries are not needed: do not bind them
        self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'")
        self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'")
        self.qexecute("INSERT Societe X: X nom 'bidule'")
        # three entities are named 'bidule', only one of them has sexe 'M'
        rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
724

Adrien Di Mascio's avatar
Adrien Di Mascio committed
725
    def test_select_multiple_searchable_text(self):
        """Two `has_text` restrictions on different variables in one query."""
        for rql in (u"INSERT Personne X: X nom 'bidle'",
                    "INSERT Societe X: X nom 'chouette', S travaille X",
                    u"INSERT Personne X: X nom 'bidle'"):
            self.qexecute(rql)
        texts = {'text': u'bidle', 'text2': u'chouette'}
        result = self.qexecute(
            'Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s',
            texts)
        self.assertEqual(len(result.rows), 1, result.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
734

Adrien Di Mascio's avatar
Adrien Di Mascio committed
735
    def test_select_no_descr(self):
        """With ``build_descr=0`` the result set description is an empty tuple."""
        result = self.qexecute('Any X WHERE X is CWGroup', build_descr=0)
        # the query has no ORDERBY: sort the rows before comparing them
        ordered = sorted(result.rows)
        self.assertEqual(tuplify(ordered), [(2,), (3,), (4,), (5,)])
        self.assertEqual(result.description, ())
Adrien Di Mascio's avatar
Adrien Di Mascio committed
740
741

    def test_select_limit_offset(self):
        """LIMIT alone, then LIMIT with OFFSET, on groups ordered by name."""
        first_page = self.qexecute('CWGroup X ORDERBY N LIMIT 2 WHERE X name N')
        self.assertEqual(tuplify(first_page.rows), [(2,), (3,)])
        self.assertEqual(first_page.description, [('CWGroup',), ('CWGroup',)])

        second_page = self.qexecute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N')
        self.assertEqual(tuplify(second_page.rows), [(4,), (5,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
747

748
    def test_select_symmetric(self):
        """`connait` queries give the same count from either side of the relation."""
        # four persons; 'chouette' is linked to 'bidule', 'machin' to 'chouette'
        setup = (
            "INSERT Personne X: X nom 'machin'",
            "INSERT Personne X: X nom 'bidule'",
            "INSERT Personne X: X nom 'chouette'",
            "INSERT Personne X: X nom 'trucmuche'",
            "SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'",
            "SET X connait Y WHERE X nom 'machin', Y nom 'chouette'",
        )
        for rql in setup:
            self.qexecute(rql)

        # (query, expected number of rows)
        expectations = (
            ('Any P WHERE P connait P2', 4),
            ('Any P WHERE NOT P connait P2', 1),  # trucmuche
            ('Any P WHERE P connait P2, P2 nom "bidule"', 1),
            ('Any P WHERE P2 connait P, P2 nom "bidule"', 1),
            ('Any P WHERE P connait P2, P2 nom "chouette"', 2),
            ('Any P WHERE P2 connait P, P2 nom "chouette"', 2),
        )
        for rql, count in expectations:
            rows = self.qexecute(rql).rows
            self.assertEqual(len(rows), count, rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
767

Adrien Di Mascio's avatar
Adrien Di Mascio committed
768
    def test_select_inline(self):
        """Select a note through `ecrit_par` and its author's name."""
        setup = (
            "INSERT Personne X: X nom 'bidule'",
            "INSERT Note X: X type 'a'",
            "SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'",
        )
        for rql in setup:
            self.qexecute(rql)
        rows = self.qexecute('Any N where N ecrit_par X, X nom "bidule"').rows
        self.assertEqual(len(rows), 1, rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
774

Adrien Di Mascio's avatar
Adrien Di Mascio committed
775
    def test_select_creation_date(self):
        """`creation_date` can be selected for a freshly inserted entity."""
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        dates = self.qexecute('Any D WHERE X nom "bidule", X creation_date D')
        # only the number of rows is checked, not the date itself
        self.assertEqual(len(dates.rows), 1)

    def test_select_or_relation(self):
        """OR between two relations selects persons matching either branch."""
        setup = (
            "INSERT Personne X: X nom 'bidule'",
            "INSERT Personne X: X nom 'chouette'",
            "INSERT Societe X: X nom 'logilab'",
            "INSERT Societe X: X nom 'caesium'",
            "SET P travaille S WHERE P nom 'bidule', S nom 'logilab'",
        )
        for rql in setup:
            self.qexecute(rql)

        # the very same query is run before and after adding the second link
        workers_rql = ('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
                       'S1 nom "logilab", S2 nom "caesium"')
        # only 'bidule' works somewhere for now
        self.assertEqual(len(self.qexecute(workers_rql).rows), 1)
        self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'")
        # each person now works for one of the two companies
        self.assertEqual(len(self.qexecute(workers_rql).rows), 2)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
793

Adrien Di Mascio's avatar
Adrien Di Mascio committed
794
    def test_select_or_sym_relation(self