# -*- coding: iso-8859-1 -*-
# copyright 2003 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""unit tests for modules cubicweb.server.querier and cubicweb.server.ssplanner
"""
21

22
from contextlib import contextmanager
23
from datetime import date, datetime, timedelta, tzinfo
24
import unittest
Adrien Di Mascio's avatar
Adrien Di Mascio committed
25

26
27
import pytz

Sylvain Thénault's avatar
Sylvain Thénault committed
28
from rql import BadRQLQuery
Sylvain Thénault's avatar
Sylvain Thénault committed
29
from rql.utils import register_function, FunctionDescr
30

31
from cubicweb import QueryError, Unauthorized, Binary, devtools
Sylvain Thénault's avatar
Sylvain Thénault committed
32
from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS, SQL_PREFIX
Adrien Di Mascio's avatar
Adrien Di Mascio committed
33
from cubicweb.server.utils import crypt_password
34
from cubicweb.server.querier import manual_build_descr, _make_description
35
from cubicweb.devtools.testlib import CubicWebTC
36
from cubicweb.devtools.repotest import tuplify, BaseQuerierTC
Adrien Di Mascio's avatar
Adrien Di Mascio committed
37

Sylvain Thénault's avatar
Sylvain Thénault committed
38

39
40
41
class FixedOffset(tzinfo):
    """Time zone with a constant UTC offset expressed in whole hours.

    :param hours: offset from UTC, in hours (may be negative); defaults to 0.
    """

    def __init__(self, hours=0):
        self.hours = hours

    def utcoffset(self, dt):
        """Return the fixed offset as a timedelta, whatever `dt` is."""
        return timedelta(hours=self.hours)

    def dst(self, dt):
        """Return a zero timedelta: this zone never applies DST."""
        return timedelta(0)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
49
50
51
52
53
54

# register the GROUP_SORT_VALUE stored procedure used to sort groups by name

class group_sort_value(FunctionDescr):
    """Descriptor of the GROUP_SORT_VALUE RQL function."""
    # type of the value returned by the function
    rtype = 'Int'
    # database backends for which the function is declared
    supported_backends = ('sqlite',)
Sylvain Thénault's avatar
Sylvain Thénault committed
55

Adrien Di Mascio's avatar
Adrien Di Mascio committed
56
57
58
59
# NOTE(review): the AssertionError is presumably raised when the function has
# already been registered (e.g. this module imported twice) -- confirm against
# rql.utils.register_function; it is deliberately ignored here.
try:
    register_function(group_sort_value)
except AssertionError:
    pass
Sylvain Thénault's avatar
Sylvain Thénault committed
60
61


Adrien Di Mascio's avatar
Adrien Di Mascio committed
62
def init_sqlite_connexion(cnx):
    """Declare the GROUP_SORT_VALUE SQL function on a sqlite connection.

    :param cnx: connection object exposing ``create_function`` (sqlite3 API).
    """

    def group_sort_value(text):
        # unknown group names raise KeyError, exactly as a plain dict lookup
        return {"managers": "3", "users": "2", "guests": "1", "owners": "0"}[text]

    cnx.create_function("GROUP_SORT_VALUE", 1, group_sort_value)
Sylvain Thénault's avatar
Sylvain Thénault committed
68

Adrien Di Mascio's avatar
Adrien Di Mascio committed
69
70
71
# add init_sqlite_connexion to the hooks registered for the 'sqlite' backend
# (presumably called on each new connection -- see cubicweb.server.sqlutils)
SQL_CONNECT_HOOKS['sqlite'].append(init_sqlite_connexion)


72
def setUpClass(cls, *args):
    """Build the test database and expose the repository to the test class.

    Meant to be wrapped with ``classmethod`` by the test case classes below.
    The repository and connection are also kept in module globals so that
    ``tearDownClass`` can release them.
    """
    global repo, cnx
    config = devtools.TestServerConfiguration('data', __file__)
    handler = devtools.get_test_db_handler(config)
    handler.build_db_cache()
    repo, cnx = handler.get_repo_and_cnx()
    cls.repo = repo
Adrien Di Mascio's avatar
Adrien Di Mascio committed
79

80
def tearDownClass(cls, *args):
    """Shut the repository down and drop the module-level references to it."""
    global repo, cnx
    repo.shutdown()
    del repo, cnx
Adrien Di Mascio's avatar
Adrien Di Mascio committed
84
85


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
class Variable:
    """Bare-bones stand-in for a selected variable, used by MakeDescriptionTC."""

    def __init__(self, name):
        self.children = []
        self.name = name

    def get_type(self, solution, args=None):
        """Return the type recorded for this variable in `solution`."""
        return solution[self.name]

    def as_string(self):
        """Return the variable's name."""
        return self.name

class Function:
    """Bare-bones stand-in for a function applied to a single variable."""

    def __init__(self, name, varname):
        self.name = name
        # the only child is the variable the function is applied to
        self.children = [Variable(varname)]

    def get_type(self, solution, args=None):
        """Return 'Int' regardless of `solution` and `args`."""
        return 'Int'

103
class MakeDescriptionTC(unittest.TestCase):
    """Tests for ``_make_description`` using the stub nodes defined above."""

    def test_known_values(self):
        """A function yields its own type, a variable the type from the solution."""
        solution = {'A': 'Int', 'B': 'CWUser'}
        self.assertEqual(_make_description((Function('max', 'A'), Variable('B')), {}, solution),
                         ['Int', 'CWUser'])


Adrien Di Mascio's avatar
Adrien Di Mascio committed
110
class UtilsTC(BaseQuerierTC):
111
112
    # bind the module-level fixture functions as class-level fixtures
    setUpClass = classmethod(setUpClass)
    tearDownClass = classmethod(tearDownClass)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
113

Adrien Di Mascio's avatar
Adrien Di Mascio committed
114
    def test_preprocess_1(self):
        """Both CWAttribute and CWRelation solutions are kept for RDEF."""
        with self.admin_access.cnx() as cnx:
            reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0]
            rqlst = self._prepare(cnx, 'Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s',
                                  {'x': reid})
            self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}],
                             rqlst.solutions)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
121

Adrien Di Mascio's avatar
Adrien Di Mascio committed
122
    def test_preprocess_2(self):
        """An ambiguous `tags` object is reduced to a single solution."""
        with self.admin_access.cnx() as cnx:
            teid = cnx.execute("INSERT Tag X: X name 'tag'")[0][0]
            rqlst = self._prepare(cnx, 'Any X WHERE E eid %(x)s, E tags X', {'x': teid})
            # the query may be optimized, should keep only one solution
            # (any one, etype will be discarded)
            self.assertEqual(1, len(rqlst.solutions))
132
133
134

    def assertRQLEqual(self, expected, got):
        """Assert two RQL strings are equal once parsed and printed back.

        Comparing the re-serialized syntax trees makes the check insensitive
        to formatting differences between the two strings.
        """
        from rql import parse
        self.assertMultiLineEqual(str(parse(expected)),
                                  str(parse(got)))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
137

Adrien Di Mascio's avatar
Adrien Di Mascio committed
138
    def test_preprocess_security(self):
        """Security preprocessing splits the query into four sub-queries.

        Run as a member of the 'users' group, the aggregate query is rewritten
        as a single select over a WITH sub-query whose four children are checked
        one by one (sorted by their RQL string).
        """
        with self.user_groups_session('users') as cnx:
            plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN '
                                      'WHERE X is ET, ET name ETN')
            union = plan.rqlst
            plan.preprocess(union)
            self.assertEqual(len(union.children), 1)
            self.assertEqual(len(union.children[0].with_), 1)
            subq = union.children[0].with_[0].query
            self.assertEqual(len(subq.children), 4)
            self.assertEqual([t.as_string() for t in union.children[0].selection],
                             ['ETN', 'COUNT(X)'])
            self.assertEqual([t.as_string() for t in union.children[0].groupby],
                             ['ETN'])
            partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children))
            rql, solutions = partrqls[0]
            self.assertRQLEqual(rql,
                                'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))'
                                ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, '
                                '               X identity D, C is Division, D is Affaire))'
                                ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, '
                                '            X identity H, H is Affaire)))'
                                ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, '
                                '            X identity I, I is Affaire)))'
                                ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, '
                                '            X identity J, J is Affaire)))'
                                ', ET is CWEType, X is Affaire')
            self.assertEqual(solutions, [{'C': 'Division',
                                          'D': 'Affaire',
                                          'E': 'Note',
                                          'F': 'Societe',
                                          'G': 'SubDivision',
                                          'H': 'Affaire',
                                          'I': 'Affaire',
                                          'J': 'Affaire',
                                          'X': 'Affaire',
                                          'ET': 'CWEType', 'ETN': 'String'}])
            rql, solutions = partrqls[1]
            self.assertRQLEqual(rql,  'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, '
                                'X is IN(BaseTransition, Bookmark, CWAttribute, CWComputedRType, '
                                '        CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, '
                                '        CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, '
                                '        Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, '
                                '        Frozable, Note, Old, Personne, RQLExpression, Societe, State, SubDivision, '
                                '        SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)')
            # one solution per entity type listed in the IN() above; the
            # comparison is order-insensitive
            etypes = ('BaseTransition', 'Bookmark', 'Card', 'Comment', 'Division',
                      'CWComputedRType', 'CWConstraint', 'CWConstraintType', 'CWEType',
                      'CWAttribute', 'CWGroup', 'CWRelation', 'CWPermission', 'CWProperty',
                      'CWRType', 'CWSource', 'CWUniqueTogetherConstraint', 'CWUser',
                      'Email', 'EmailPart', 'EmailThread', 'ExternalUri', 'File', 'Folder',
                      'Frozable', 'Note', 'Old', 'Personne', 'RQLExpression', 'Societe',
                      'State', 'SubDivision', 'SubWorkflowExitPoint', 'Tag', 'Transition',
                      'TrInfo', 'Workflow', 'WorkflowTransition')
            self.assertCountEqual(solutions,
                                  [{'X': etype, 'ETN': 'String', 'ET': 'CWEType'}
                                   for etype in etypes])
            rql, solutions = partrqls[2]
            self.assertEqual(rql,
                             'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), '
                             'ET is CWEType, X is EmailAddress')
            self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}])
            rql, solutions = partrqls[3]
            self.assertEqual(rql,
                             'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), '
                             'ET is CWEType, X is Basket')
            self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
232
233

    def test_preprocess_security_aggregat(self):
        """'Any MAX(X)' is rewritten over a four-branch WITH sub-query."""
        with self.user_groups_session('users') as cnx:
            plan = self._prepare_plan(cnx, 'Any MAX(X)')
            union = plan.rqlst
            plan.preprocess(union)
            self.assertEqual(len(union.children), 1)
            self.assertEqual(len(union.children[0].with_), 1)
            subq = union.children[0].with_[0].query
            self.assertEqual(len(subq.children), 4)
            self.assertEqual([t.as_string() for t in union.children[0].selection],
                             ['MAX(X)'])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
244

Adrien Di Mascio's avatar
Adrien Di Mascio committed
245
    def test_preprocess_nonregr(self):
        """Non-regression: the NOT ecrit_par query keeps exactly one solution."""
        with self.admin_access.cnx() as cnx:
            rqlst = self._prepare(cnx, 'Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI')
            self.assertEqual(len(rqlst.solutions), 1)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
249

Adrien Di Mascio's avatar
Adrien Di Mascio committed
250
251
    def test_build_description(self):
        """Check the description type reported for entities, constants and arguments."""
        # entity selected by eid
        rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self.admin_access._user.eid})
        self.assertEqual(rset.description[0][0], 'CWUser')
        # literal constants
        rset = self.qexecute('Any 1')
        self.assertEqual(rset.description[0][0], 'Int')
        rset = self.qexecute('Any TRUE')
        self.assertEqual(rset.description[0][0], 'Boolean')
        rset = self.qexecute('Any "hop"')
        self.assertEqual(rset.description[0][0], 'String')
        rset = self.qexecute('Any TODAY')
        self.assertEqual(rset.description[0][0], 'Date')
        rset = self.qexecute('Any NOW')
        self.assertEqual(rset.description[0][0], 'Datetime')
        # values given as query arguments
        rset = self.qexecute('Any %(x)s', {'x': 1})
        self.assertEqual(rset.description[0][0], 'Int')
        rset = self.qexecute('Any %(x)s', {'x': True})
        self.assertEqual(rset.description[0][0], 'Boolean')
        rset = self.qexecute('Any %(x)s', {'x': 1.0})
        self.assertEqual(rset.description[0][0], 'Float')
        rset = self.qexecute('Any %(x)s', {'x': datetime.now()})
        self.assertEqual(rset.description[0][0], 'Datetime')
        rset = self.qexecute('Any %(x)s', {'x': 'str'})
        self.assertEqual(rset.description[0][0], 'String')
        rset = self.qexecute('Any %(x)s', {'x': u'str'})
        self.assertEqual(rset.description[0][0], 'String')
Adrien Di Mascio's avatar
Adrien Di Mascio committed
276

277
    def test_build_descr1(self):
        """A row whose eid was replaced by a bogus value is dropped.

        After ``manual_build_descr`` runs, both the description and the rows
        are one item shorter and the bogus eid is gone from the first row.
        """
        with self.admin_access.cnx() as cnx:
            rset = cnx.execute('(Any U,L WHERE U login L) UNION '
                               '(Any G,N WHERE G name N, G is CWGroup)')
            orig_length = len(rset)
            rset.rows[0][0] = 9999999
            description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows)
            self.assertEqual(len(description), orig_length - 1)
            self.assertEqual(len(rset.rows), orig_length - 1)
            self.assertNotEqual(rset.rows[0][0], 9999999)
287
288

    def test_build_descr2(self):
        """Second column of the WITH/UNION query is described as CWGroup or None."""
        rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION '
                             '(Any U,G WHERE U in_group G))')
        for _, y in rset.description:
            if y is not None:
                self.assertEqual(y, 'CWGroup')

    def test_build_descr3(self):
        """Second column of the top-level UNION is described as CWGroup or None."""
        rset = self.qexecute('(Any G,NULL WHERE G is CWGroup) UNION '
                             '(Any U,G WHERE U in_group G)')
        for _, y in rset.description:
            if y is not None:
                self.assertEqual(y, 'CWGroup')

Adrien Di Mascio's avatar
Adrien Di Mascio committed
302
303

class QuerierTC(BaseQuerierTC):
304
305
    # bind the module-level fixture functions as class-level fixtures
    setUpClass = classmethod(setUpClass)
    tearDownClass = classmethod(tearDownClass)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
306

307
308
309
310
311
312
313
314
315
316
317
    def setUp(self):
        """Remember the highest eid in the database before the test runs."""
        super(QuerierTC, self).setUp()
        with self.admin_access.cnx() as cnx:
            rset = cnx.execute('Any MAX(X)')
            self.maxeid = rset[0][0]

    def tearDown(self):
        """Delete every entity whose eid is above the one recorded by setUp."""
        super(QuerierTC, self).tearDown()
        cleanup_rql = 'DELETE Any X WHERE X eid > %s' % self.maxeid
        with self.admin_access.cnx() as cnx:
            cnx.execute(cleanup_rql)
            cnx.commit()

Adrien Di Mascio's avatar
Adrien Di Mascio committed
318
319
    def test_unknown_eid(self):
        """Selecting an eid that does not exist yields an empty result set."""
        self.assertFalse(self.qexecute('Any X WHERE X eid 99999999'))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
321

322
323
    def test_typed_eid(self):
        """An eid passed as a string argument comes back as an int."""
        rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'})
        self.assertIsInstance(rset[0][0], int)
326

327
    def test_bytes_storage(self):
        """Binary data stored on a File is read back as an equal Binary."""
        feid = self.qexecute('INSERT File X: X data_name "foo.pdf", '
                             'X data_format "text/plain", X data %(data)s',
                             {'data': Binary(b"xxx")})[0][0]
        fdata = self.qexecute('Any D WHERE X data D, X eid %(x)s', {'x': feid})[0][0]
        self.assertIsInstance(fdata, Binary)
        self.assertEqual(fdata.getvalue(), b'xxx')
334

Adrien Di Mascio's avatar
Adrien Di Mascio committed
335
    # selection queries tests #################################################
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
336

Adrien Di Mascio's avatar
Adrien Di Mascio committed
337
    def test_select_1(self):
        """Groups ordered by eid, with a CWGroup description for each row."""
        rset = self.qexecute('Any X ORDERBY X WHERE X is CWGroup')
        result, descr = rset.rows, rset.description
        self.assertEqual(tuplify(result), [(2,), (3,), (4,), (5,)])
        self.assertEqual(descr, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
342

Adrien Di Mascio's avatar
Adrien Di Mascio committed
343
    def test_select_2(self):
        """Groups ordered by name, ascending then descending."""
        rset = self.qexecute('Any X ORDERBY N WHERE X is CWGroup, X name N')
        self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
        self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
        rset = self.qexecute('Any X ORDERBY N DESC WHERE X is CWGroup, X name N')
        self.assertEqual(tuplify(rset.rows), [(5,), (4,), (3,), (2,)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
349

Adrien Di Mascio's avatar
Adrien Di Mascio committed
350
    def test_select_3(self):
        """Group names selected with GROUPBY; rows are sorted before comparing."""
        rset = self.qexecute('Any N GROUPBY N WHERE X is CWGroup, X name N')
        result, descr = rset.rows, rset.description
        # no ORDERBY in the query: sort in place to get a stable comparison
        result.sort()
        self.assertEqual(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)])
        self.assertEqual(descr, [('String',), ('String',), ('String',), ('String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
356

Adrien Di Mascio's avatar
Adrien Di Mascio committed
357
    def test_select_is(self):
        """The type name selected through `is` matches the row description."""
        rset = self.qexecute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN')
        result, descr = rset.rows, rset.description
        self.assertEqual(result[0][1], descr[0][0])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
361

Adrien Di Mascio's avatar
Adrien Di Mascio committed
362
    def test_select_is_aggr(self):
        """Entity count per type name, most populated type first."""
        rset = self.qexecute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN')
        result, descr = rset.rows, rset.description
        self.assertEqual(descr[0][0], 'String')
        self.assertEqual(descr[0][1], 'Int')
        self.assertEqual(result[0][0], 'RQLExpression')  # XXX may change as schema evolve
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
368

Adrien Di Mascio's avatar
Adrien Di Mascio committed
369
    def test_select_groupby_orderby(self):
        """GROUPBY combined with ORDERBY on the group name."""
        rset = self.qexecute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N')
        self.assertEqual(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)])
        self.assertEqual(rset.description, [('String',), ('String',), ('String',), ('String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
373

Adrien Di Mascio's avatar
Adrien Di Mascio committed
374
    def test_select_complex_groupby(self):
        """GROUPBY queries on an ambiguous `name` attribute run without error."""
        # smoke test: only checks that the queries execute
        self.qexecute('Any N GROUPBY N WHERE X name N')
        self.qexecute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D')
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
377

Adrien Di Mascio's avatar
Adrien Di Mascio committed
378
    def test_select_inlined_groupby(self):
        """GROUPBY involving the `in_state` relation runs without error."""
        seid = self.qexecute('State X WHERE X name "deactivated"')[0][0]
        # smoke test: only checks that the query executes
        self.qexecute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
381

382
    def test_select_groupby_funccall(self):
        """GROUPBY on a function call: the two users were created this year."""
        rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY YEAR(CD) '
                             'WHERE X is CWUser, X creation_date CD')
        self.assertListEqual(rset.rows, [[date.today().year, 2]])

    def test_select_groupby_colnumber(self):
        """GROUPBY on a column number gives the same result as the expression."""
        rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY 1 '
                             'WHERE X is CWUser, X creation_date CD')
        self.assertListEqual(rset.rows, [[date.today().year, 2]])

Adrien Di Mascio's avatar
Adrien Di Mascio committed
392
    def test_select_complex_orderby(self):
        """ORDERBY on an ambiguous attribute, alone and with LIMIT/OFFSET."""
        rset1 = self.qexecute('Any N ORDERBY N WHERE X name N')
        self.assertEqual(sorted(rset1.rows), rset1.rows)
        rset = self.qexecute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N')
        # OFFSET 1 skips the first row of the full ordered result
        self.assertEqual(rset.rows[0][0], rset1.rows[1][0])
        self.assertEqual(len(rset), 5)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
398

Adrien Di Mascio's avatar
Adrien Di Mascio committed
399
    def test_select_5(self):
        """Groups with their name, ordered by name, and matching descriptions."""
        rset = self.qexecute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is CWGroup')
        self.assertEqual(tuplify(rset.rows),
                         [(2, 'guests',),
                          (3, 'managers',),
                          (4, 'owners',),
                          (5, 'users',)])
        self.assertEqual(rset.description,
                         [('CWGroup', 'String',),
                          ('CWGroup', 'String',),
                          ('CWGroup', 'String',),
                          ('CWGroup', 'String',)])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
411

Adrien Di Mascio's avatar
Adrien Di Mascio committed
412
    def test_select_6(self):
        """`IN` mixing a variable and a constant, with and without DISTINCT."""
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        rset = self.qexecute('Any Y where X name TMP, Y nom in (TMP, "bidule")')
        self.assertIn(('Personne',), rset.description)
        rset = self.qexecute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")')
        self.assertIn(('Personne',), rset.description)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
419

Adrien Di Mascio's avatar
Adrien Di Mascio committed
420
    def test_select_not_attr(self):
        """NOT applied to an attribute value, then to a relation."""
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        rset = self.qexecute('Personne X WHERE NOT X nom "bidule"')
        self.assertEqual(len(rset.rows), 0, rset.rows)
        rset = self.qexecute('Personne X WHERE NOT X nom "bid"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Personne X WHERE NOT X travaille S')
        self.assertEqual(len(rset.rows), 0, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
430

Adrien Di Mascio's avatar
Adrien Di Mascio committed
431
    def test_select_is_in(self):
        """`is IN(...)` selects entities of any of the listed types."""
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        self.assertEqual(len(self.qexecute("Any X WHERE X is IN (Personne, Societe)")),
                         2)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
436

Adrien Di Mascio's avatar
Adrien Di Mascio committed
437
    def test_select_not_rel(self):
        """NOT on a relation: only the person without a company is returned."""
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'autre'")
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Personne X WHERE NOT X travaille S')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        rset = self.qexecute('Personne X WHERE NOT X travaille S, S nom "chouette"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
446

Adrien Di Mascio's avatar
Adrien Di Mascio committed
447
    def test_select_nonregr_inlined(self):
        """Non-regression: no CWUser is matched through `ecrit_par`."""
        self.qexecute("INSERT Note X: X para 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'autre'")
        self.qexecute("SET X ecrit_par P WHERE X para 'bidule', P nom 'chouette'")
        rset = self.qexecute('Any U,T ORDERBY T DESC WHERE U is CWUser, '
                             'N ecrit_par U, N type T')
        self.assertEqual(len(rset.rows), 0)
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
455

Adrien Di Mascio's avatar
Adrien Di Mascio committed
456
    def test_select_nonregr_edition_not(self):
        """Non-regression: NOT read_permission returns the complementary groups.

        The expected eids are those among 2, 3, 4 not linked to the CWGroup
        entity type through `read_permission`, with and without the explicit
        `X is CWEType` restriction.
        """
        groupeids = {2, 3, 4}
        groupreadperms = {r[0] for r in self.qexecute('Any Y WHERE X name "CWGroup", '
                                                      'Y eid IN(2, 3, 4), X read_permission Y')}
        rset = self.qexecute('DISTINCT Any Y WHERE X is CWEType, X name "CWGroup", '
                             'Y eid IN(2, 3, 4), NOT X read_permission Y')
        self.assertEqual(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
        rset = self.qexecute('DISTINCT Any Y WHERE X name "CWGroup", '
                             'Y eid IN(2, 3, 4), NOT X read_permission Y')
        self.assertEqual(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
466

Adrien Di Mascio's avatar
Adrien Di Mascio committed
467
    def test_select_outer_join(self):
        """Optional (`?`) relation on either side yields None for missing links."""
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        peid2 = self.qexecute("INSERT Personne X: X nom 'autre'")[0][0]
        seid1 = self.qexecute("INSERT Societe X: X nom 'chouette'")[0][0]
        seid2 = self.qexecute("INSERT Societe X: X nom 'chouetos'")[0][0]
        rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
        self.assertEqual(rset.rows, [[peid1, None], [peid2, None]])
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
        self.assertEqual(rset.rows, [[peid1, seid1], [peid2, None]])
        rset = self.qexecute('Any S,X ORDERBY S WHERE X? travaille S')
        self.assertEqual(rset.rows, [[seid1, peid1], [seid2, None]])
sylvain.thenault@logilab.fr's avatar
sylvain.thenault@logilab.fr committed
479

Adrien Di Mascio's avatar
Adrien Di Mascio committed
480
    def test_select_outer_join_optimized(self):
        """An unused optional relation does not filter out the selected entity."""
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        rset = self.qexecute('Any X WHERE X eid %(x)s, P? connait X', {'x': peid1})
        self.assertEqual(rset.rows, [[peid1]])
        rset = self.qexecute('Any X WHERE X eid %(x)s, X require_permission P?',
                             {'x': peid1})
        self.assertEqual(rset.rows, [[peid1]])
    def test_select_left_outer_join(self):
        # With an optional subject (``U?``) on `in_group`, the number of
        # distinct groups returned is expected to be 4, with or without an
        # extra eid restriction on the optional variable.
        rset = self.qexecute('DISTINCT Any G WHERE U? in_group G')
        self.assertEqual(len(rset), 4)
        rset = self.qexecute('DISTINCT Any G WHERE U? in_group G, U eid %(x)s',
                             {'x': self.admin_access._user.eid})
        self.assertEqual(len(rset), 4)
    def test_select_ambigous_outer_join(self):
        # Tag one group ('users') with one of two tags, then check optional
        # ends on either side of `tags`: untagged groups come back with a None
        # tag name, and the unused tag comes back with a None group name.
        teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0]
        # this second tag is deliberately left unlinked; its eid is not needed
        self.qexecute("INSERT Tag X: X name 'tagbis'")
        geid = self.qexecute("CWGroup G WHERE G name 'users'")[0][0]
        self.qexecute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
                      {'g': geid, 't': teid})
        rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T? tags G, T name TN, G name GN")
        self.assertIn(['users', 'tag'], rset.rows)
        self.assertIn(['activated', None], rset.rows)
        rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN")
        self.assertEqual(rset.rows, [[None, 'tagbis'], ['users', 'tag']])
    def test_select_not_inline_rel(self):
        # Two notes exist and only note 'a' gets an `ecrit_par` link, so the
        # NOT query is expected to return exactly one note.
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("INSERT Note X: X type 'b'")
        self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Note X WHERE NOT X ecrit_par P')
        self.assertEqual(len(rset.rows), 1, rset.rows)
    def test_select_not_unlinked_multiple_solutions(self):
        # Two notes exist and only note 'a' is the object of an `evaluee`
        # link, so the NOT query is expected to return exactly one note.
        # The subject variable Y is left untyped in the query.
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("INSERT Note X: X type 'b'")
        self.qexecute("SET Y evaluee X WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Note X WHERE NOT Y evaluee X')
        self.assertEqual(len(rset.rows), 1, rset.rows)
    def test_select_date_extraction(self):
        # Store a known datetime and check each date-part extraction function
        # returns the matching component, described as an 'Int'.
        self.qexecute("INSERT Personne X: X nom 'foo', X datenaiss %(d)s",
                      {'d': datetime(2001, 2, 3, 12, 13)})
        # 2001-02-03 is a Saturday; the expected WEEKDAY value for it is 6
        test_data = [('YEAR', 2001), ('MONTH', 2), ('DAY', 3),
                     ('HOUR', 12), ('MINUTE', 13), ('WEEKDAY', 6)]
        for funcname, result in test_data:
            rset = self.qexecute('Any %s(D) WHERE X is Personne, X datenaiss D'
                                 % funcname)
            self.assertEqual(len(rset.rows), 1)
            self.assertEqual(rset.rows[0][0], result)
            self.assertEqual(rset.description, [('Int',)])
    def test_regexp_based_pattern_matching(self):
        # Check the REGEXP operator with an anchored pattern (matches only
        # 'bidule') and an unanchored one (matches both names).
        peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
        # the second person's eid is never compared, only counted
        self.qexecute("INSERT Personne X: X nom 'cidule'")
        rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "^b"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        self.assertEqual(rset.rows[0][0], peid1)
        rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "idu"')
        self.assertEqual(len(rset.rows), 2, rset.rows)

    def test_select_aggregat_count(self):
        # COUNT over everything yields a single one-column row typed 'Int'.
        rset = self.qexecute('Any COUNT(X)')
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
    def test_select_aggregat_sum(self):
        # SUM over `ordernum` yields a single one-column row typed 'Int'.
        rset = self.qexecute('Any SUM(O) WHERE X ordernum O')
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
    def test_select_aggregat_min(self):
        # MIN keeps the type of what it aggregates: the entity type when
        # applied to an entity variable, 'Int' when applied to `ordernum`.
        rset = self.qexecute('Any MIN(X) WHERE X is Personne')
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Personne',)])
        rset = self.qexecute('Any MIN(O) WHERE X ordernum O')
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
    def test_select_aggregat_max(self):
        # MAX keeps the type of what it aggregates: the entity type when
        # applied to an entity variable, 'Int' when applied to `ordernum`.
        rset = self.qexecute('Any MAX(X) WHERE X is Personne')
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Personne',)])
        rset = self.qexecute('Any MAX(O) WHERE X ordernum O')
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(len(rset.rows[0]), 1)
        self.assertEqual(rset.description, [('Int',)])
    def test_select_custom_aggregat_concat_string(self):
        # GROUP_CONCAT joins the group names with ', '; the order inside the
        # concatenated string is not relied upon, hence the sort.
        rset = self.qexecute('Any GROUP_CONCAT(N) WHERE X is CWGroup, X name N')
        self.assertTrue(rset)
        self.assertEqual(sorted(rset[0][0].split(', ')),
                         ['guests', 'managers', 'owners', 'users'])

    def test_select_custom_regproc_limit_size(self):
        # TEXT_LIMIT_SIZE truncates plain text to the given length and appends
        # '...'; LIMIT_SIZE additionally takes the text format, and for the
        # 'text/html' description below the expected result has no markup.
        rset = self.qexecute('Any TEXT_LIMIT_SIZE(N, 3) '
                             'WHERE X is CWGroup, X name N, X name "managers"')
        self.assertTrue(rset)
        self.assertEqual(rset[0][0], 'man...')
        self.qexecute("INSERT Basket X: X name 'bidule', "
                      "X description '<b>hop hop</b>', "
                      "X description_format 'text/html'")
        rset = self.qexecute('Any LIMIT_SIZE(D, DF, 3) '
                             'WHERE X is Basket, X description D, X description_format DF')
        self.assertTrue(rset)
        self.assertEqual(rset[0][0], 'hop...')
    def test_select_regproc_orderby(self):
        # Use the GROUP_SORT_VALUE function as an ORDERBY term, first on a
        # single-row DISTINCT query, then on the groups 'admin' is not in.
        rset = self.qexecute('DISTINCT Any X,N ORDERBY GROUP_SORT_VALUE(N) '
                             'WHERE X is CWGroup, X name N, X name "managers"')
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][1], 'managers')
        rset = self.qexecute('Any X,N ORDERBY GROUP_SORT_VALUE(N) '
                             'WHERE X is CWGroup, X name N, NOT U in_group X, U login "admin"')
        self.assertEqual(len(rset), 3)
        # 'owners' is expected to sort first among the three remaining groups
        self.assertEqual(rset[0][1], 'owners')
    def test_select_aggregat_sort(self):
        # Group users per group and order on the aggregate by its position in
        # the selection (ORDERBY 2); two groups are expected to have users.
        rset = self.qexecute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G')
        self.assertEqual(len(rset.rows), 2)
        self.assertEqual(len(rset.rows[0]), 2)
        self.assertEqual(rset.description[0], ('CWGroup', 'Int',))
    def test_select_aggregat_having(self):
        # Count relation definitions per relation type name, keep those with
        # more than 10, ordered by count then name. The expected counts are
        # tied to the test schema and change whenever it does.
        rset = self.qexecute('Any N,COUNT(RDEF) GROUPBY N ORDERBY 2,N '
                             'WHERE RT name N, RDEF relation_type RT '
                             'HAVING COUNT(RDEF) > 10')
        self.assertListEqual(rset.rows,
                             [[u'description_format', 13],
                              [u'description', 14],
                              [u'name', 18],
                              [u'created_by', 44],
                              [u'creation_date', 44],
                              [u'cw_source', 44],
                              [u'cwuri', 44],
                              [u'in_basket', 44],
                              [u'is', 44],
                              [u'is_instance_of', 44],
                              [u'modification_date', 44],
                              [u'owned_by', 44]])
    def test_select_aggregat_having_dumb(self):
        # dumb but should not raise an error: GROUPBY on a variable that is
        # already pinned to a single eid
        rset = self.qexecute('Any U,COUNT(X) GROUPBY U '
                             'WHERE U eid %(x)s, X owned_by U '
                             'HAVING COUNT(X) > 10', {'x': self.ueid})
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(rset.rows[0][0], self.ueid)
    def test_select_having_non_aggregat_1(self):
        # HAVING used with a non-aggregate function (YEAR) and no GROUPBY.
        # Both expected users must have been created during the current year.
        rset = self.qexecute('Any L WHERE X login L, X creation_date CD '
                             'HAVING YEAR(CD) = %s' % date.today().year)
        self.assertListEqual(rset.rows,
                             [[u'admin'],
                              [u'anon']])

    def test_select_having_non_aggregat_2(self):
        # HAVING mixing a non-aggregate condition (YEAR) with an aggregate one
        # (COUNT) through OR, on a grouped query.
        rset = self.qexecute('Any L GROUPBY L WHERE X login L, X in_group G, '
                             'X creation_date CD HAVING YEAR(CD) = %s OR COUNT(G) > 1'
                             % date.today().year)
        self.assertListEqual(rset.rows,
                             [[u'admin'],
                              [u'anon']])

    def test_select_complex_sort(self):
        """need sqlite including http://www.sqlite.org/cvstrac/tktview?tn=3773 fix"""
        # order on two terms, one of which is not selected, together with LIMIT
        rset = self.qexecute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D')
        result = rset.rows
        # sorted in place before comparing, so the check does not depend on
        # the order the rows come back in
        result.sort()
        self.assertEqual(tuplify(result), [(1,), (2,), (3,), (4,), (5,)])
    def test_select_upper(self):
        # UPPER applied to logins: values are upper-cased and the column is
        # described as 'String', both in a two-column and a one-column select.
        rset = self.qexecute('Any X, UPPER(L) ORDERBY L WHERE X is CWUser, X login L')
        self.assertEqual(len(rset.rows), 2)
        self.assertEqual(rset.rows[0][1], 'ADMIN')
        self.assertEqual(rset.description[0], ('CWUser', 'String',))
        self.assertEqual(rset.rows[1][1], 'ANON')
        self.assertEqual(rset.description[1], ('CWUser', 'String',))
        # re-query the first user by eid, with the eid written into the query
        eid = rset.rows[0][0]
        rset = self.qexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
        self.assertEqual(rset.rows[0][0], 'ADMIN')
        self.assertEqual(rset.description, [('String',)])
    def test_select_float_abs(self):
        # ABS on a float attribute returns the magnitude for either sign.
        # test positive number
        eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s', {'i': 1.2})[0][0]
        rset = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I', {'x': eid})
        self.assertEqual(rset.rows[0][0], 1.2)
        # test negative number
        eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s', {'i': -1.2})[0][0]
        rset = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I', {'x': eid})
        self.assertEqual(rset.rows[0][0], 1.2)
    def test_select_int_abs(self):
        # ABS on an integer attribute returns the magnitude for either sign.
        # test positive number
        eid = self.qexecute('INSERT Affaire A: A duration %(d)s', {'d': 12})[0][0]
        rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid})
        self.assertEqual(rset.rows[0][0], 12)
        # test negative number
        eid = self.qexecute('INSERT Affaire A: A duration %(d)s', {'d': -12})[0][0]
        rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid})
        self.assertEqual(rset.rows[0][0], 12)
##     def test_select_simplified(self):
##         ueid = self.admin_access._user.eid
##         rset = self.qexecute('Any L WHERE %s login L'%ueid)
##         self.assertEqual(rset.rows[0][0], 'admin')
##         rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid})
##         self.assertEqual(rset.rows[0][0], 'admin')
    def test_select_searchable_text_1(self):
        # Full-text search through `has_text`: two entities are named 'bidle',
        # so both the parameterised and the literal query must find two rows,
        # and the negated query must return none of them.
        self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        self.qexecute(u"INSERT Societe X: X nom 'bidle'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidle'})
        self.assertEqual(len(rset.rows), 2, rset.rows)
        rset = self.qexecute(u'Any N where N has_text "bidle"')
        self.assertEqual(len(rset.rows), 2, rset.rows)
        biduleeids = {r[0] for r in rset.rows}
        rset = self.qexecute(u'Any N where NOT N has_text "bidle"')
        self.assertFalse([r[0] for r in rset.rows if r[0] in biduleeids])
        # duh?
        # the result is not checked: this only verifies the query executes
        self.qexecute('Any X WHERE X has_text %(text)s', {'text': u'a'})
    def test_select_searchable_text_2(self):
        # `has_text` combined with an entity type restriction: a Personne and
        # a Societe are both named 'bidule', only the Personne must match.
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Societe X: X nom 'bidule'")
        rset = self.qexecute('Personne N where N has_text "bidule"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
    def test_select_searchable_text_3(self):
        # `has_text` combined with an attribute restriction: three entities
        # are named 'bidule', only one of them has sexe 'M'.
        self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'")
        self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'")
        self.qexecute("INSERT Societe X: X nom 'bidule'")
        rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
    def test_select_multiple_searchable_text(self):
        # Two `has_text` restrictions on two different variables joined by
        # `travaille`. Exactly one row is expected.
        # NOTE(review): presumably only the first 'bidle' person gets linked,
        # since the second is inserted after the company -- confirm.
        self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        self.qexecute("INSERT Societe X: X nom 'chouette', S travaille X")
        self.qexecute(u"INSERT Personne X: X nom 'bidle'")
        rset = self.qexecute('Personne X WHERE X has_text %(text)s, X travaille S, '
                             'S has_text %(text2)s',
                             {'text': u'bidle',
                              'text2': u'chouette'})
        self.assertEqual(len(rset.rows), 1, rset.rows)
    def test_select_no_descr(self):
        # With build_descr=0 the rows are still returned but the result set
        # description is expected to be an empty tuple.
        rset = self.qexecute('Any X WHERE X is CWGroup', build_descr=0)
        # the query has no ORDERBY, so sort before comparing
        rset.rows.sort()
        self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
        self.assertEqual(rset.description, ())
    def test_select_limit_offset(self):
        # LIMIT alone returns the first two groups in name order; adding
        # OFFSET 2 returns the following two.
        rset = self.qexecute('CWGroup X ORDERBY N LIMIT 2 WHERE X name N')
        self.assertEqual(tuplify(rset.rows), [(2,), (3,)])
        self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',)])
        rset = self.qexecute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N')
        self.assertEqual(tuplify(rset.rows), [(4,), (5,)])
    def test_select_symmetric(self):
        # `connait` is queried from both ends: each link set one way must be
        # found whichever side the restricted variable is on.
        # Links created: chouette-bidule and machin-chouette; trucmuche has none.
        self.qexecute("INSERT Personne X: X nom 'machin'")
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'trucmuche'")
        self.qexecute("SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'")
        self.qexecute("SET X connait Y WHERE X nom 'machin', Y nom 'chouette'")
        rset = self.qexecute('Any P WHERE P connait P2')
        self.assertEqual(len(rset.rows), 4, rset.rows)
        rset = self.qexecute('Any P WHERE NOT P connait P2')
        self.assertEqual(len(rset.rows), 1, rset.rows)  # trucmuche
        # 'bidule' has a single acquaintance, whichever side it is put on
        rset = self.qexecute('Any P WHERE P connait P2, P2 nom "bidule"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "bidule"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        # 'chouette' has two acquaintances, whichever side it is put on
        rset = self.qexecute('Any P WHERE P connait P2, P2 nom "chouette"')
        self.assertEqual(len(rset.rows), 2, rset.rows)
        rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "chouette"')
        self.assertEqual(len(rset.rows), 2, rset.rows)
    def test_select_inline(self):
        # Follow `ecrit_par` from a note to its author, restricted on the
        # author's name; the single linked note must be found.
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X type 'a'")
        self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
        rset = self.qexecute('Any N where N ecrit_par X, X nom "bidule"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
    def test_select_creation_date(self):
        # A freshly inserted entity must expose a `creation_date` even though
        # the INSERT query did not set one.
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        rset = self.qexecute('Any D WHERE X nom "bidule", X creation_date D')
        self.assertEqual(len(rset.rows), 1)

    def test_select_or_relation(self):
        # OR between two occurrences of the same relation pointing at two
        # different companies: a person matches as soon as one branch holds.
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Societe X: X nom 'logilab'")
        self.qexecute("INSERT Societe X: X nom 'caesium'")
        rql = ('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
               'S1 nom "logilab", S2 nom "caesium"')
        # only 'bidule' works somewhere (first branch)
        self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'")
        rset = self.qexecute(rql)
        self.assertEqual(len(rset.rows), 1)
        # 'chouette' now matches through the second branch
        self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'")
        rset = self.qexecute(rql)
        self.assertEqual(len(rset.rows), 2)
    def test_select_or_sym_relation(self):
        # For `connait`, querying one direction and querying both directions
        # joined by OR are expected to return the same number of persons.
        # The OR keyword is written in lower case once and upper case once.
        self.qexecute("INSERT Personne X: X nom 'bidule'")
        self.qexecute("INSERT Personne X: X nom 'chouette'")
        self.qexecute("INSERT Personne X: X nom 'truc'")
        self.qexecute("SET P connait S WHERE P nom 'bidule', S nom 'chouette'")
        rset = self.qexecute('DISTINCT Any P WHERE S connait P, S nom "chouette"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        rset = self.qexecute('DISTINCT Any P WHERE P connait S or S connait P, S nom "chouette"')
        self.assertEqual(len(rset.rows), 1, rset.rows)
        # add a second link, this time with 'chouette' as the subject
        self.qexecute("SET P connait S WHERE P nom 'chouette', S nom 'truc'")
        rset = self.qexecute('DISTINCT Any P WHERE S connait P, S nom "chouette"')
        self.assertEqual(len(rset.rows), 2, rset.rows)
        rset = self.qexecute('DISTINCT Any P WHERE P connait S OR S connait P, S nom "chouette"')
        self.assertEqual(len(rset.rows), 2, rset.rows)
    def test_select_follow_relation(self):
        # Chain two relations (Affaire -concerne-> Societe -evaluee-> Note).
        # Only the first company is linked to an Affaire, so only its note
        # must be reachable.
        self.qexecute("INSERT Affaire X: X sujet 'cool'")
        self.qexecute("INSERT Societe X: X nom 'chouette'")
        self.qexecute("SET A concerne S WHERE A is Affaire, S is Societe")
        self.qexecute("INSERT Note X: X para 'truc'")
        self.qexecute("SET S evaluee N WHERE S is Societe, N is Note")
        # a second company with its own note, not linked to any Affaire
        self.qexecute("INSERT Societe X: X nom 'bidule'")
        self.qexecute("INSERT Note X: X para 'troc'")
        self.qexecute("SET S evaluee N WHERE S nom 'bidule', N para 'troc'")
        rset = self.qexecute('DISTINCT Any A,N WHERE A concerne S, S evaluee N')
        self.assertEqual(len(rset.rows), 1, rset.rows)

    def test_select_ordered_distinct_1(self):
        # A DISTINCT query ordered on a term that is not selected must be
        # rejected with BadRQLQuery.
        with self.assertRaises(BadRQLQuery):
            self.qexecute('DISTINCT Any S ORDERBY R '
                          'WHERE A is Affaire, A sujet S, A ref R')
    def test_select_ordered_distinct_2(self):
821
822
823
824
        self.qexecute("INSERT Affaire X: X sujet 'minor'")
        self.qexecute("INSERT Affaire X: X sujet 'zou'")
        self.qexecute("INSERT Affaire X: X sujet 'abcd'")
        rset = self.qexecute(