Commit e553a145 authored by Florent Cayré's avatar Florent Cayré
Browse files

refactor entity fetch_rql method to use a RQL syntax tree instead of RQL strings ; closes #1585650

parent 163d25c9fdd2
......@@ -27,6 +27,9 @@ from logilab.common.deprecation import deprecated
from logilab.mtconverter import TransformData, TransformError, xml_escape
from rql.utils import rqlvar_maker
from rql.stmts import Select
from rql.nodes import (Not, VariableRef, Constant, make_relation,
Relation as RqlRelation)
from cubicweb import Unauthorized, typed_eid, neg_role
from cubicweb.rset import ResultSet
......@@ -175,30 +178,45 @@ class Entity(AppObject):
@classmethod
def fetch_rql(cls, user, restriction=None, fetchattrs=None, mainvar='X',
settype=True, ordermethod='fetch_order'):
"""return a rql to fetch all entities of the class type"""
# XXX update api and implementation to AST manipulation (see unrelated rql)
restrictions = restriction or []
st = cls.fetch_rqlst(user, mainvar=mainvar, fetchattrs=fetchattrs,
settype=settype, ordermethod=ordermethod)
rql = st.as_string()
if restriction:
# cannot use RQLRewriter API to insert 'X rtype %(x)s' restriction
warn('[3.14] fetch_rql: use of `restriction` parameter is '
'deprecated, please use fetch_rqlst and supply a syntax'
'tree with your restriction instead', DeprecationWarning)
insert = ' WHERE ' + ','.join(restriction)
if ' WHERE ' in rql:
select, where = rql.split(' WHERE ', 1)
rql = select + insert + ',' + where
else:
rql += insert
return rql
@classmethod
def fetch_rqlst(cls, user, select=None, mainvar='X', fetchattrs=None,
settype=True, ordermethod='fetch_order'):
if select is None:
select = Select()
mainvar = select.get_variable(mainvar)
select.add_selected(mainvar)
elif isinstance(mainvar, basestring):
assert mainvar in select.defined_vars
mainvar = select.get_variable(mainvar)
# eases string -> syntax tree test transition: please remove once stable
select._varmaker = rqlvar_maker(defined=select.defined_vars,
aliases=select.aliases, index=26)
if settype:
restrictions.append('%s is %s' % (mainvar, cls.__regid__))
select.add_type_restriction(mainvar, cls.__regid__)
if fetchattrs is None:
fetchattrs = cls.fetch_attrs
selection = [mainvar]
orderby = []
# start from 26 to avoid possible conflicts with X
# XXX not enough to be sure it'll be no conflicts
varmaker = rqlvar_maker(index=26)
cls._fetch_restrictions(mainvar, varmaker, fetchattrs, selection,
orderby, restrictions, user, ordermethod)
rql = 'Any %s' % ','.join(selection)
if orderby:
rql += ' ORDERBY %s' % ','.join(orderby)
rql += ' WHERE %s' % ', '.join(restrictions)
return rql
cls._fetch_restrictions(mainvar, select, fetchattrs, user, ordermethod)
return select
@classmethod
def _fetch_restrictions(cls, mainvar, varmaker, fetchattrs,
selection, orderby, restrictions, user,
ordermethod='fetch_order', visited=None):
def _fetch_restrictions(cls, mainvar, select, fetchattrs,
user, ordermethod='fetch_order', visited=None):
eschema = cls.e_schema
if visited is None:
visited = set((eschema.type,))
......@@ -218,23 +236,21 @@ class Entity(AppObject):
rdef = eschema.rdef(attr)
if not user.matching_groups(rdef.get_groups('read')):
continue
var = varmaker.next()
selection.append(var)
restriction = '%s %s %s' % (mainvar, attr, var)
restrictions.append(restriction)
if rschema.final or rdef.cardinality[0] in '?1':
var = select.make_variable()
select.add_selected(var)
rel = make_relation(mainvar, attr, (var,), VariableRef)
select.add_restriction(rel)
else:
cls.warning('bad relation %s specified in fetch attrs for %s',
attr, cls)
continue
if not rschema.final:
card = rdef.cardinality[0]
if card not in '?1':
cls.warning('bad relation %s specified in fetch attrs for %s',
attr, cls)
selection.pop()
restrictions.pop()
continue
# XXX we need outer join in case the relation is not mandatory
# (card == '?') *or if the entity is being added*, since in
# that case the relation may still be missing. As we miss this
# later information here, systematically add it.
restrictions[-1] += '?'
rel.change_optional('right')
targettypes = rschema.objects(eschema.type)
# XXX user._cw.vreg iiiirk
etypecls = user._cw.vreg['etypes'].etype_class(targettypes[0])
......@@ -244,14 +260,13 @@ class Entity(AppObject):
remove_ambiguous_rels(fetchattrs, targettypes, user._cw.vreg.schema)
else:
fetchattrs = etypecls.fetch_attrs
etypecls._fetch_restrictions(var, varmaker, fetchattrs,
selection, orderby, restrictions,
etypecls._fetch_restrictions(var, select, fetchattrs,
user, ordermethod, visited=visited)
if ordermethod is not None:
orderterm = getattr(cls, ordermethod)(attr, var)
orderterm = getattr(cls, ordermethod)(attr, var.name)
if orderterm:
orderby.append(orderterm)
return selection, orderby, restrictions
var, order = orderterm.split()
select.add_sort_var(select.get_variable(var), order=='ASC')
@classmethod
@cached
......@@ -742,6 +757,7 @@ class Entity(AppObject):
:param limit: resultset's maximum size
:param entities: if True, the entites are returned; if False, a result set is returned
"""
rtype = str(rtype)
try:
return self._cw_relation_cache(rtype, role, entities, limit)
except KeyError:
......@@ -757,50 +773,61 @@ class Entity(AppObject):
def cw_related_rql(self, rtype, role='subject', targettypes=None):
rschema = self._cw.vreg.schema[rtype]
select = Select()
mainvar, evar = select.get_variable('X'), select.get_variable('E')
select.add_selected(mainvar)
select.add_eid_restriction(evar, 'x', 'Substitute')
if role == 'subject':
restriction = 'E eid %%(x)s, E %s X' % rtype
rel = make_relation(evar, rtype, (mainvar,), VariableRef)
select.add_restriction(rel)
if targettypes is None:
targettypes = rschema.objects(self.e_schema)
else:
restriction += ', X is IN (%s)' % ','.join(targettypes)
card = greater_card(rschema, (self.e_schema,), targettypes, 0)
select.add_constant_restriction(mainvar, 'is',
targettypes, 'etype')
gcard = greater_card(rschema, (self.e_schema,), targettypes, 0)
else:
restriction = 'E eid %%(x)s, X %s E' % rtype
rel = make_relation(mainvar, rtype, (evar,), VariableRef)
select.add_restriction(rel)
if targettypes is None:
targettypes = rschema.subjects(self.e_schema)
else:
restriction += ', X is IN (%s)' % ','.join(targettypes)
card = greater_card(rschema, targettypes, (self.e_schema,), 1)
select.add_constant_restriction(mainvar, 'is', targettypes,
'String')
gcard = greater_card(rschema, targettypes, (self.e_schema,), 1)
etypecls = self._cw.vreg['etypes'].etype_class(targettypes[0])
if len(targettypes) > 1:
fetchattrs = self._cw.vreg['etypes'].fetch_attrs(targettypes)
# XXX we should fetch ambiguous relation objects too but not
# recurse on them in _fetch_restrictions; it is easier to remove
# them completely for now, as it would require an deeper api rewrite
# them completely for now, as it would require a deeper api rewrite
remove_ambiguous_rels(fetchattrs, targettypes, self._cw.vreg.schema)
else:
fetchattrs = etypecls.fetch_attrs
rql = etypecls.fetch_rql(self._cw.user, [restriction], fetchattrs,
settype=False)
etypecls.fetch_rqlst(self._cw.user, select, mainvar, fetchattrs,
settype=False)
# optimisation: remove ORDERBY if cardinality is 1 or ? (though
# greater_card return 1 for those both cases)
if card == '1':
if ' ORDERBY ' in rql:
rql = '%s WHERE %s' % (rql.split(' ORDERBY ', 1)[0],
rql.split(' WHERE ', 1)[1])
elif not ' ORDERBY ' in rql:
args = rql.split(' WHERE ', 1)
# if modification_date already retreived, we should use it instead
# of adding another variable for sort. This should be be problematic
# but it's actually with sqlserver, see ticket #694445
if 'X modification_date ' in args[1]:
var = args[1].split('X modification_date ', 1)[1].split(',', 1)[0]
args.insert(1, var.strip())
rql = '%s ORDERBY %s DESC WHERE %s' % tuple(args)
if gcard == '1':
select.remove_sort_terms()
elif not select.orderby:
# if modification_date is already retrieved, we use it instead
# of adding another variable for sorting. This should not be
# problematic, but it is with sqlserver, see ticket #694445
for rel in select.where.get_nodes(RqlRelation):
if (rel.r_type == 'modification_date'
and rel.children[0].variable == mainvar
and rel.children[1].operator == '='):
var = rel.children[1].children[0].variable
select.add_sort_var(var, asc=False)
break
else:
rql = '%s ORDERBY Z DESC WHERE X modification_date Z, %s' % \
tuple(args)
return rql
mdvar = select.make_variable()
rel = make_relation(mainvar, 'modification_date',
(mdvar,), VariableRef)
select.add_restriction(rel)
select.add_sort_var(mdvar, asc=False)
return select.as_string()
# generic vocabulary methods ##############################################
......@@ -817,33 +844,36 @@ class Entity(AppObject):
rtype = self._cw.vreg.schema.rschema(rtype)
rdef = rtype.role_rdef(self.e_schema, targettype, role)
rewriter = RQLRewriter(self._cw)
select = Select()
# initialize some variables according to the `role` of `self` in the
# relation:
# * variable for myself (`evar`) and searched entities (`searchvedvar`)
# * entity type of the subject (`subjtype`) and of the object
# (`objtype`) of the relation
# relation (variable names must respect constraints conventions):
# * variable for myself (`evar`)
# * variable for searched entities (`searchvedvar`)
if role == 'subject':
evar, searchedvar = 'S', 'O'
subjtype, objtype = self.e_schema, targettype
evar = subjvar = select.get_variable('S')
searchedvar = objvar = select.get_variable('O')
else:
searchedvar, evar = 'S', 'O'
objtype, subjtype = self.e_schema, targettype
searchedvar = subjvar = select.get_variable('S')
evar = objvar = select.get_variable('O')
select.add_selected(searchedvar)
# initialize some variables according to `self` existance
if rdef.role_cardinality(neg_role(role)) in '?1':
# if cardinality in '1?', we want a target entity which isn't
# already linked using this relation
if searchedvar == 'S':
restriction = ['NOT S %s ZZ' % rtype]
var = select.get_variable('ZZ') # XXX unname when tests pass
if role == 'subject':
rel = make_relation(var, rtype.type, (searchedvar,), VariableRef)
else:
restriction = ['NOT ZZ %s O' % rtype]
rel = make_relation(searchedvar, rtype.type, (var,), VariableRef)
select.add_restriction(Not(rel))
elif self.has_eid():
# elif we have an eid, we don't want a target entity which is
# already linked to ourself through this relation
restriction = ['NOT S %s O' % rtype]
else:
restriction = []
rel = make_relation(subjvar, rtype.type, (objvar,), VariableRef)
select.add_restriction(Not(rel))
if self.has_eid():
restriction += ['%s eid %%(x)s' % evar]
rel = make_relation(evar, 'eid', ('x', 'Substitute'), Constant)
select.add_restriction(rel)
args = {'x': self.eid}
if role == 'subject':
sec_check_args = {'fromeid': self.eid}
......@@ -853,12 +883,15 @@ class Entity(AppObject):
else:
args = {}
sec_check_args = {}
existant = searchedvar
# retreive entity class for targettype to compute base rql
existant = searchedvar.name
# undefine unused evar, or the type resolver will consider it
select.undefine_variable(evar)
# retrieve entity class for targettype to compute base rql
etypecls = self._cw.vreg['etypes'].etype_class(targettype)
rql = etypecls.fetch_rql(self._cw.user, restriction,
mainvar=searchedvar, ordermethod=ordermethod)
select = self._cw.vreg.parse(self._cw, rql, args).children[0]
etypecls.fetch_rqlst(self._cw.user, select, searchedvar,
ordermethod=ordermethod)
# from now on, we need variable type resolving
self._cw.vreg.solutions(self._cw, select, args)
# insert RQL expressions for schema constraints into the rql syntax tree
if vocabconstraints:
# RQLConstraint is a subclass for RQLVocabularyConstraint, so they
......@@ -868,12 +901,12 @@ class Entity(AppObject):
cstrcls = RQLConstraint
for cstr in rdef.constraints:
# consider constraint.mainvars to check if constraint apply
if isinstance(cstr, cstrcls) and searchedvar in cstr.mainvars:
if not self.has_eid() and evar in cstr.mainvars:
if isinstance(cstr, cstrcls) and searchedvar.name in cstr.mainvars:
if not self.has_eid() and evar.name in cstr.mainvars:
continue
# compute a varmap suitable to RQLRewriter.rewrite argument
varmap = dict((v, v) for v in 'SO' if v in select.defined_vars
and v in cstr.mainvars)
varmap = dict((v, v) for v in (searchedvar.name, evar.name)
if v in select.defined_vars and v in cstr.mainvars)
# rewrite constraint by constraint since we want a AND between
# expressions.
rewriter.rewrite(select, [(varmap, (cstr,))], select.solutions,
......@@ -883,13 +916,14 @@ class Entity(AppObject):
rqlexprs = rdef.get_rqlexprs('add')
if rqlexprs and not rdef.has_perm(self._cw, 'add', **sec_check_args):
# compute a varmap suitable to RQLRewriter.rewrite argument
varmap = dict((v, v) for v in 'SO' if v in select.defined_vars)
varmap = dict((v, v) for v in (searchedvar.name, evar.name)
if v in select.defined_vars)
# rewrite all expressions at once since we want a OR between them.
rewriter.rewrite(select, [(varmap, rqlexprs)], select.solutions,
args, existant)
# ensure we have an order defined
if not select.orderby:
select.add_sort_var(select.defined_vars[searchedvar])
select.add_sort_var(select.defined_vars[searchedvar.name])
# we're done, turn the rql syntax tree as a string
rql = select.as_string()
return rql, args
......
......@@ -459,8 +459,9 @@ class Repository(object):
def _build_user(self, session, eid):
"""return a CWUser entity for user with the given eid"""
cls = self.vreg['etypes'].etype_class('CWUser')
rql = cls.fetch_rql(session.user, ['X eid %(x)s'])
rset = session.execute(rql, {'x': eid})
st = cls.fetch_rqlst(session.user, ordermethod=None)
st.add_eid_restriction(st.get_variable('X'), 'x', 'Substitute')
rset = session.execute(st.as_string(), {'x': eid})
assert len(rset) == 1, rset
cwuser = rset.get_entity(0, 0)
# pylint: disable=W0104
......
......@@ -179,7 +179,7 @@ class EntityTC(CubicWebTC):
try:
# testing basic fetch_attrs attribute
self.assertEqual(Personne.fetch_rql(user),
'Any X,AA,AB,AC ORDERBY AA ASC '
'Any X,AA,AB,AC ORDERBY AA '
'WHERE X is Personne, X nom AA, X prenom AB, X modification_date AC')
# testing unknown attributes
Personne.fetch_attrs = ('bloug', 'beep')
......@@ -187,36 +187,36 @@ class EntityTC(CubicWebTC):
# testing one non final relation
Personne.fetch_attrs = ('nom', 'prenom', 'travaille')
self.assertEqual(Personne.fetch_rql(user),
'Any X,AA,AB,AC,AD ORDERBY AA ASC '
'Any X,AA,AB,AC,AD ORDERBY AA '
'WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD')
# testing two non final relations
Personne.fetch_attrs = ('nom', 'prenom', 'travaille', 'evaluee')
self.assertEqual(Personne.fetch_rql(user),
'Any X,AA,AB,AC,AD,AE ORDERBY AA ASC '
'Any X,AA,AB,AC,AD,AE ORDERBY AA '
'WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD, '
'X evaluee AE?')
# testing one non final relation with recursion
Personne.fetch_attrs = ('nom', 'prenom', 'travaille')
Societe.fetch_attrs = ('nom', 'evaluee')
self.assertEqual(Personne.fetch_rql(user),
'Any X,AA,AB,AC,AD,AE,AF ORDERBY AA ASC,AF DESC '
'Any X,AA,AB,AC,AD,AE,AF ORDERBY AA,AF DESC '
'WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD, '
'AC evaluee AE?, AE modification_date AF'
)
# testing symmetric relation
Personne.fetch_attrs = ('nom', 'connait')
self.assertEqual(Personne.fetch_rql(user), 'Any X,AA,AB ORDERBY AA ASC '
self.assertEqual(Personne.fetch_rql(user), 'Any X,AA,AB ORDERBY AA '
'WHERE X is Personne, X nom AA, X connait AB?')
# testing optional relation
peschema.subjrels['travaille'].rdef(peschema, seschema).cardinality = '?*'
Personne.fetch_attrs = ('nom', 'prenom', 'travaille')
Societe.fetch_attrs = ('nom',)
self.assertEqual(Personne.fetch_rql(user),
'Any X,AA,AB,AC,AD ORDERBY AA ASC WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD')
'Any X,AA,AB,AC,AD ORDERBY AA WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD')
# testing relation with cardinality > 1
peschema.subjrels['travaille'].rdef(peschema, seschema).cardinality = '**'
self.assertEqual(Personne.fetch_rql(user),
'Any X,AA,AB ORDERBY AA ASC WHERE X is Personne, X nom AA, X prenom AB')
'Any X,AA,AB ORDERBY AA WHERE X is Personne, X nom AA, X prenom AB')
# XXX test unauthorized attribute
finally:
# fetch_attrs restored by generic tearDown
......@@ -233,8 +233,14 @@ class EntityTC(CubicWebTC):
SubNote.fetch_attrs, SubNote.fetch_order = fetch_config(('type',))
p = self.request().create_entity('Personne', nom=u'pouet')
self.assertEqual(p.cw_related_rql('evaluee'),
'Any X,AA,AB ORDERBY AA ASC WHERE E eid %(x)s, E evaluee X, '
'X type AA, X modification_date AB')
'Any X,AA,AB ORDERBY AA WHERE E eid %(x)s, E evaluee X, '
'X type AA, X modification_date AB')
n = self.request().create_entity('Note')
self.assertEqual(n.cw_related_rql('evaluee', role='object',
targettypes=('Societe', 'Personne')),
"Any X,AA ORDERBY AB DESC WHERE E eid %(x)s, X evaluee E, "
"X is IN('Personne', 'Societe'), X nom AA, "
"X modification_date AB")
Personne.fetch_attrs, Personne.fetch_order = fetch_config(('nom', ))
# XXX
self.assertEqual(p.cw_related_rql('evaluee'),
......@@ -246,8 +252,8 @@ class EntityTC(CubicWebTC):
'Any X,AA ORDERBY AA DESC '
'WHERE E eid %(x)s, E tags X, X modification_date AA')
self.assertEqual(tag.cw_related_rql('tags', 'subject', ('Personne',)),
'Any X,AA,AB ORDERBY AA ASC '
'WHERE E eid %(x)s, E tags X, X is IN (Personne), X nom AA, '
'Any X,AA,AB ORDERBY AA '
'WHERE E eid %(x)s, E tags X, X is Personne, X nom AA, '
'X modification_date AB')
def test_related_rql_ambiguous_cant_use_fetch_order(self):
......@@ -274,7 +280,7 @@ class EntityTC(CubicWebTC):
user = self.request().user
rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
'WHERE NOT EXISTS(ZZ use_email O), S eid %(x)s, '
'WHERE NOT ZZ use_email O, S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC')
def test_unrelated_rql_security_1_user(self):
......@@ -284,37 +290,37 @@ class EntityTC(CubicWebTC):
user = req.user
rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
'WHERE NOT EXISTS(ZZ use_email O), S eid %(x)s, '
'WHERE NOT ZZ use_email O, S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC')
user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0)
rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
'WHERE NOT EXISTS(ZZ use_email O, ZZ is CWUser), S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC, A eid %(B)s, '
'EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)')
'WHERE NOT ZZ use_email O, S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, '
'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser')
def test_unrelated_rql_security_1_anon(self):
self.login('anon')
user = self.request().user
rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
'WHERE NOT EXISTS(ZZ use_email O, ZZ is CWUser), S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC, A eid %(B)s, '
'EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)')
'WHERE NOT ZZ use_email O, S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, '
'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser')
def test_unrelated_rql_security_2(self):
email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0]
self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA '
'WHERE NOT EXISTS(S use_email O), O eid %(x)s, S is CWUser, '
'WHERE NOT S use_email O, O eid %(x)s, S is CWUser, '
'S login AA, S firstname AB, S surname AC, S modification_date AD')
self.login('anon')
email = self.execute('Any X WHERE X eid %(x)s', {'x': email.eid}).get_entity(0, 0)
rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0]
self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA '
'WHERE NOT EXISTS(S use_email O), O eid %(x)s, S is CWUser, '
'WHERE NOT S use_email O, O eid %(x)s, S is CWUser, '
'S login AA, S firstname AB, S surname AC, S modification_date AD, '
'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)')
'AE eid %(AF)s, EXISTS(S identity AE, NOT AE in_group AG, AG name "guests", AG is CWGroup)')
def test_unrelated_rql_security_nonexistant(self):
self.login('anon')
......@@ -323,7 +329,7 @@ class EntityTC(CubicWebTC):
self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA '
'WHERE S is CWUser, '
'S login AA, S firstname AB, S surname AC, S modification_date AD, '
'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)')
'AE eid %(AF)s, EXISTS(S identity AE, NOT AE in_group AG, AG name "guests", AG is CWGroup)')
def test_unrelated_rql_constraints_creation_subject(self):
person = self.vreg['etypes'].etype_class('Personne')(self.request())
......@@ -338,14 +344,15 @@ class EntityTC(CubicWebTC):
self.assertEqual(
rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE '
'S is Personne, S nom AA, S prenom AB, S modification_date AC, '
'NOT (S connait A, A nom "toto"), A is Personne, EXISTS(S travaille B, B nom "tutu")')
'NOT (S connait AD, AD nom "toto"), AD is Personne, '
'EXISTS(S travaille AE, AE nom "tutu")')
def test_unrelated_rql_constraints_edition_subject(self):
person = self.request().create_entity('Personne', nom=u'sylvain')
rql = person.cw_unrelated_rql('connait', 'Personne', 'subject')[0]
self.assertEqual(
rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE '
'NOT EXISTS(S connait O), S eid %(x)s, O is Personne, '
'NOT S connait O, S eid %(x)s, O is Personne, '
'O nom AA, O prenom AB, O modification_date AC, '
'NOT S identity O')
......@@ -354,10 +361,10 @@ class EntityTC(CubicWebTC):
rql = person.cw_unrelated_rql('connait', 'Personne', 'object')[0]
self.assertEqual(
rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE '
'NOT EXISTS(S connait O), O eid %(x)s, S is Personne, '
'NOT S connait O, O eid %(x)s, S is Personne, '
'S nom AA, S prenom AB, S modification_date AC, '
'NOT S identity O, NOT (S connait A, A nom "toto"), '
'EXISTS(S travaille B, B nom "tutu")')
'NOT S identity O, NOT (S connait AD, AD nom "toto"), '
'EXISTS(S travaille AE, AE nom "tutu")')
def test_unrelated_base(self):
req = self.request()
......
......@@ -69,35 +69,35 @@ class URLPublisherTC(CubicWebTC):
self.assertEqual(ctrl, 'view')
self.assertEqual(len(rset), 1)
self.assertEqual(rset.description[0][0], 'CWUser')
self.assertEqual(rset.printable_rql(), 'Any X,AA,AB,AC,AD WHERE X login "admin", X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD')
self.assertEqual(rset.printable_rql(), 'Any X,AA,AB,AC,AD WHERE X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD, X login "admin"')
def test_rest_path_unique_attr(self):
ctrl, rset = self.process('cwuser/admin')
self.assertEqual(ctrl, 'view')
self.assertEqual(len(rset), 1)
self.assertEqual(rset.description[0][0], 'CWUser')
self.assertEqual(rset.printable_rql(), 'Any X,AA,AB,AC,AD WHERE X login "admin", X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD')
self.assertEqual(rset.printable_rql(), 'Any X,AA,AB,AC,AD WHERE X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD, X login "admin"')
def test_rest_path_eid(self):
ctrl, rset = self.process('cwuser/eid/%s' % self.user().eid)
self.assertEqual(ctrl, 'view')
self.assertEqual(len(rset), 1)
self.assertEqual(rset.description[0][0], 'CWUser')
self.assertEqual(rset.printable_rql(), 'Any X,AA,AB,AC,AD WHERE X eid %s, X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD' % rset[0][0])
self.assertEqual(rset.printable_rql(), 'Any X,AA,AB,AC,AD WHERE X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD, X eid %s' % rset[0][0])
def test_rest_path_non_ascii_paths(self):
ctrl, rset = self.process('CWUser/login/%C3%BFsa%C3%BFe')
self.assertEqual(ctrl, 'view')
self.assertEqual(len(rset), 1)
self.assertEqual(rset.description[0][0], 'CWUser')
self.assertEqual(rset.printable_rql(), u'Any X,AA,AB,AC,AD WHERE X login "\xffsa\xffe", X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD')
self.assertEqual(rset.printable_rql(), u'Any X,AA,AB,AC,AD WHERE X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD, X login "\xffsa\xffe"')
def test_rest_path_quoted_paths(self):
ctrl, rset = self.process('BlogEntry/title/hell%27o')
self.assertEqual(ctrl, 'view')
self.assertEqual(len(rset), 1)
self.assertEqual(rset.description[0][0], 'BlogEntry')
self.assertEqual(rset.printable_rql(), u'Any X,AA,AB,AC WHERE X title "hell\'o", X is BlogEntry, X creation_date AA, X title AB, X modification_date AC')
self.assertEqual(rset.printable_rql(), u'Any X,AA,AB,AC WHERE X is BlogEntry, X creation_date AA, X title AB, X modification_date AC, X title "hell\'o"')
def test_rest_path_errors(self):
self.assertRaises(NotFound, self.process, 'CWUser/eid/30000')
......
......@@ -195,16 +195,17 @@ class RestPathEvaluator(URLPathEvaluator):
return None, rset
def handle_etype_attr(self, req, cls, attrname, value):
rql = cls.fetch_rql(req.user, ['X %s %%(x)s' % (attrname)],
mainvar='X', ordermethod=None)
st = cls.fetch_rqlst(req.user, ordermethod=None)
st.add_constant_restriction(st.get_variable('X'), attrname,
'x', 'Substitute')
if attrname == 'eid':
try:
rset = req.execute(rql, {'x': typed_eid(value)})
rset = req.execute(st.as_string(), {'x': typed_eid(value)})
except (ValueError, TypeResolverException):
# conflicting eid/type
raise PathDontMatch()
else:
rset = req.execute(rql, {'x': value})
rset = req.execute(st.as_string(), {'x': value})
self.set_vid_for_rset(req, cls, rset)
return None, rset
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment