Commit 73f3b369 authored by Florent Cayré's avatar Florent Cayré
Browse files

[entity] restrict creation form field vocabulary using __linkto information (closes #1799997)

parent 3e51c2a577dd
......@@ -37,7 +37,8 @@ from cubicweb.rset import ResultSet
from cubicweb.selectors import yes
from cubicweb.appobject import AppObject
from cubicweb.req import _check_cw_unsafe
from cubicweb.schema import RQLVocabularyConstraint, RQLConstraint
from cubicweb.schema import (RQLVocabularyConstraint, RQLConstraint,
GeneratedConstraint)
from cubicweb.rqlrewrite import RQLRewriter
from cubicweb.uilib import soup2xhtml
......@@ -65,6 +66,85 @@ def can_use_rest_path(value):
return False
return True
def rel_vars(rel):
return ((isinstance(rel.children[0], VariableRef)
and rel.children[0].variable or None),
(isinstance(rel.children[1].children[0], VariableRef)
and rel.children[1].children[0].variable or None)
)
def rel_matches(rel, rtype, role, varname, operator='='):
if rel.r_type == rtype and rel.children[1].operator == operator:
same_role_var_idx = 0 if role == 'subject' else 1
variables = rel_vars(rel)
if variables[same_role_var_idx].name == varname:
return variables[1 - same_role_var_idx]
def build_cstr_with_linkto_infos(cstr, args, searchedvar, evar,
lt_infos, eidvars):
"""restrict vocabulary as much as possible in entity creation,
based on infos provided by __linkto form param.
Example based on following schema:
class works_in(RelationDefinition):
subject = 'CWUser'
object = 'Lab'
cardinality = '1*'
constraints = [RQLConstraint('S in_group G, O welcomes G')]
class welcomes(RelationDefinition):
subject = 'Lab'
object = 'CWGroup'
If you create a CWUser in the "scientists" CWGroup you can show
only the labs that welcome them using :
lt_infos = {('in_group', 'subject'): 321}
You get following restriction : 'O welcomes G, G eid 321'
"""
st = cstr.snippet_rqlst.copy()
# replace relations in ST by eid infos from linkto where possible
for (info_rtype, info_role), eids in lt_infos.iteritems():
eid = eids[0] # NOTE: we currently assume a pruned lt_info with only 1 eid
for rel in st.iget_nodes(RqlRelation):
targetvar = rel_matches(rel, info_rtype, info_role, evar.name)
if targetvar is not None:
if targetvar.name in eidvars:
rel.parent.remove(rel)
else:
eidrel = make_relation(
targetvar, 'eid', (targetvar.name, 'Substitute'),
Constant)
rel.parent.replace(rel, eidrel)
args[targetvar.name] = eid
eidvars.add(targetvar.name)
# if modified ST still contains evar references we must discard the
# constraint, otherwise evar is unknown in the final rql query which can
# lead to a SQL table cartesian product and multiple occurences of solutions
evarname = evar.name
for rel in st.iget_nodes(RqlRelation):
for variable in rel_vars(rel):
if variable and evarname == variable.name:
return
# else insert snippets into the global tree
return GeneratedConstraint(st, cstr.mainvars - set(evarname))
def pruned_lt_info(eschema, lt_infos):
pruned = {}
for (lt_rtype, lt_role), eids in lt_infos.iteritems():
# we can only use lt_infos describing relation with a cardinality
# of value 1 towards the linked entity
if not len(eids) == 1:
continue
lt_card = eschema.rdef(lt_rtype, lt_role).cardinality[
0 if lt_role == 'subject' else 1]
if lt_card not in '?1':
continue
pruned[(lt_rtype, lt_role)] = eids
return pruned
class Entity(AppObject):
"""an entity instance has e_schema automagically set on
......@@ -931,17 +1011,22 @@ class Entity(AppObject):
# generic vocabulary methods ##############################################
def cw_unrelated_rql(self, rtype, targettype, role, ordermethod=None,
vocabconstraints=True):
vocabconstraints=True, lt_infos={}):
"""build a rql to fetch `targettype` entities unrelated to this entity
using (rtype, role) relation.
Consider relation permissions so that returned entities may be actually
linked by `rtype`.
`lt_infos` are supplementary informations, usually coming from __linkto
parameter, that can help further restricting the results in case current
entity is not yet created. It is a dict describing entities the current
entity will be linked to, which keys are (rtype, role) tuples and values
are a list of eids.
"""
ordermethod = ordermethod or 'fetch_unrelated_order'
if isinstance(rtype, basestring):
rtype = self._cw.vreg.schema.rschema(rtype)
rdef = rtype.role_rdef(self.e_schema, targettype, role)
rschema = self._cw.vreg.schema.rschema(rtype)
rdef = rschema.role_rdef(self.e_schema, targettype, role)
rewriter = RQLRewriter(self._cw)
select = Select()
# initialize some variables according to the `role` of `self` in the
......@@ -955,20 +1040,20 @@ class Entity(AppObject):
searchedvar = subjvar = select.get_variable('S')
evar = objvar = select.get_variable('O')
select.add_selected(searchedvar)
# initialize some variables according to `self` existance
# initialize some variables according to `self` existence
if rdef.role_cardinality(neg_role(role)) in '?1':
# if cardinality in '1?', we want a target entity which isn't
# already linked using this relation
var = select.get_variable('ZZ') # XXX unname when tests pass
variable = select.make_variable()
if role == 'subject':
rel = make_relation(var, rtype.type, (searchedvar,), VariableRef)
rel = make_relation(variable, rtype, (searchedvar,), VariableRef)
else:
rel = make_relation(searchedvar, rtype.type, (var,), VariableRef)
rel = make_relation(searchedvar, rtype, (variable,), VariableRef)
select.add_restriction(Not(rel))
elif self.has_eid():
# elif we have an eid, we don't want a target entity which is
# already linked to ourself through this relation
rel = make_relation(subjvar, rtype.type, (objvar,), VariableRef)
rel = make_relation(subjvar, rtype, (objvar,), VariableRef)
select.add_restriction(Not(rel))
if self.has_eid():
rel = make_relation(evar, 'eid', ('x', 'Substitute'), Constant)
......@@ -998,11 +1083,23 @@ class Entity(AppObject):
cstrcls = RQLVocabularyConstraint
else:
cstrcls = RQLConstraint
lt_infos = pruned_lt_info(self.e_schema, lt_infos or {})
# if there are still lt_infos, use set to keep track of added eid
# relations (adding twice the same eid relation is incorrect RQL)
eidvars = set()
for cstr in rdef.constraints:
# consider constraint.mainvars to check if constraint apply
if isinstance(cstr, cstrcls) and searchedvar.name in cstr.mainvars:
if not self.has_eid() and evar.name in cstr.mainvars:
continue
if not self.has_eid():
if lt_infos:
# we can perhaps further restrict with linkto infos using
# a custom constraint built from cstr and lt_infos
cstr = build_cstr_with_linkto_infos(
cstr, args, searchedvar, evar, lt_infos, eidvars)
if cstr is None:
continue # could not build constraint -> discard
elif evar.name in cstr.mainvars:
continue
# compute a varmap suitable to RQLRewriter.rewrite argument
varmap = dict((v, v) for v in (searchedvar.name, evar.name)
if v in select.defined_vars and v in cstr.mainvars)
......@@ -1028,12 +1125,13 @@ class Entity(AppObject):
return rql, args
def unrelated(self, rtype, targettype, role='subject', limit=None,
ordermethod=None): # XXX .cw_unrelated
ordermethod=None, lt_infos={}): # XXX .cw_unrelated
"""return a result set of target type objects that may be related
by a given relation, with self as subject or object
"""
try:
rql, args = self.cw_unrelated_rql(rtype, targettype, role, ordermethod)
rql, args = self.cw_unrelated_rql(rtype, targettype, role,
ordermethod, lt_infos=lt_infos)
except Unauthorized:
return self._cw.empty_rset()
# XXX should be set in unrelated rql when manipulating the AST
......
......@@ -223,7 +223,7 @@ msgid ""
"<div>This schema of the data model <em>excludes</em> the meta-data, but you "
"can also display a <a href=\"%s\">complete schema with meta-data</a>.</div>"
msgstr ""
"<div>Ce schéma du modèle de données <em>exclue</em> les méta-données, mais "
"<div>Ce schéma du modèle de données <em>exclut</em> les méta-données, mais "
"vous pouvez afficher un <a href=\"%s\">schéma complet</a>.</div>"
msgid "<no relation>"
......
......@@ -850,23 +850,39 @@ class ERQLExpression(RQLExpression):
return self._check(session, **kwargs)
def vargraph(rqlst):
""" builds an adjacency graph of variables from the rql syntax tree, e.g:
Any O,S WHERE T subworkflow_exit S, T subworkflow WF, O state_of WF
=> {'WF': ['O', 'T'], 'S': ['T'], 'T': ['WF', 'S'], 'O': ['WF']}
"""
vargraph = {}
for relation in rqlst.get_nodes(nodes.Relation):
try:
rhsvarname = relation.children[1].children[0].variable.name
lhsvarname = relation.children[0].name
except AttributeError:
pass
else:
vargraph.setdefault(lhsvarname, []).append(rhsvarname)
vargraph.setdefault(rhsvarname, []).append(lhsvarname)
#vargraph[(lhsvarname, rhsvarname)] = relation.r_type
return vargraph
class GeneratedConstraint(object):
def __init__(self, rqlst, mainvars):
self.snippet_rqlst = rqlst
self.mainvars = mainvars
self.vargraph = vargraph(rqlst)
class RRQLExpression(RQLExpression):
def __init__(self, expression, mainvars=None, eid=None):
if mainvars is None:
mainvars = guess_rrqlexpr_mainvars(expression)
RQLExpression.__init__(self, expression, mainvars, eid)
# graph of links between variable, used by rql rewriter
self.vargraph = {}
for relation in self.rqlst.get_nodes(nodes.Relation):
try:
rhsvarname = relation.children[1].children[0].variable.name
lhsvarname = relation.children[0].name
except AttributeError:
pass
else:
self.vargraph.setdefault(lhsvarname, []).append(rhsvarname)
self.vargraph.setdefault(rhsvarname, []).append(lhsvarname)
#self.vargraph[(lhsvarname, rhsvarname)] = relation.r_type
self.vargraph = vargraph(self.rqlst)
@property
def full_rql(self):
......
......@@ -37,13 +37,19 @@ class Personne(EntityType):
# unittest_entity.py
RQLVocabularyConstraint('NOT (S connait P, P nom "toto")'),
RQLVocabularyConstraint('S travaille P, P nom "tutu"')])
actionnaire = SubjectRelation('Societe', cardinality='??',
constraints=[RQLConstraint('NOT EXISTS(O contrat_exclusif S)')])
dirige = SubjectRelation('Societe', cardinality='??',
constraints=[RQLConstraint('S actionnaire O')])
associe = SubjectRelation('Personne', cardinality='1*',
constraints=[RQLConstraint('S actionnaire SOC, O actionnaire SOC')])
class Societe(EntityType):
nom = String()
evaluee = SubjectRelation('Note')
fournit = SubjectRelation(('Service', 'Produit'), cardinality='1*')
contrat_exclusif = SubjectRelation('Personne', cardinality='??')
class Service(EntityType):
fabrique_par = SubjectRelation('Personne', cardinality='1*')
......
......@@ -25,7 +25,7 @@ from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.mttransforms import HAS_TAL
from cubicweb.entities import fetch_config
from cubicweb.uilib import soup2xhtml
from cubicweb.schema import RQLVocabularyConstraint
class EntityTC(CubicWebTC):
......@@ -239,7 +239,7 @@ class EntityTC(CubicWebTC):
self.assertEqual(n.cw_related_rql('evaluee', role='object',
targettypes=('Societe', 'Personne')),
"Any X,AA ORDERBY AB DESC WHERE E eid %(x)s, X evaluee E, "
"X is IN('Personne', 'Societe'), X nom AA, "
"X is IN(Personne, Societe), X nom AA, "
"X modification_date AB")
Personne.fetch_attrs, Personne.cw_fetch_order = fetch_config(('nom', ))
# XXX
......@@ -279,7 +279,7 @@ class EntityTC(CubicWebTC):
user = self.request().user
rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
'WHERE NOT ZZ use_email O, S eid %(x)s, '
'WHERE NOT A use_email O, S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC')
def test_unrelated_rql_security_1_user(self):
......@@ -289,23 +289,23 @@ class EntityTC(CubicWebTC):
user = req.user
rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
'WHERE NOT ZZ use_email O, S eid %(x)s, '
'WHERE NOT A use_email O, S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC')
user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0)
rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
'WHERE NOT ZZ use_email O, S eid %(x)s, '
'WHERE NOT A use_email O, S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, '
'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser')
'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), A is CWUser')
def test_unrelated_rql_security_1_anon(self):
self.login('anon')
user = self.request().user
rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
'WHERE NOT ZZ use_email O, S eid %(x)s, '
'WHERE NOT A use_email O, S eid %(x)s, '
'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, '
'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser')
'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), A is CWUser')
def test_unrelated_rql_security_2(self):
email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
......@@ -365,6 +365,76 @@ class EntityTC(CubicWebTC):
'NOT S identity O, NOT (S connait AD, AD nom "toto"), '
'EXISTS(S travaille AE, AE nom "tutu")')
def test_unrelated_rql_s_linkto_s(self):
req = self.request()
person = self.vreg['etypes'].etype_class('Personne')(req)
self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
soc = req.create_entity('Societe', nom=u'logilab')
lt_infos = {('actionnaire', 'subject'): [soc.eid]}
rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject',
lt_infos=lt_infos)
self.assertEqual(u'Any O ORDERBY O WHERE O is Personne, '
u'EXISTS(AA eid %(SOC)s, O actionnaire AA)', rql)
self.assertEqual({'SOC': soc.eid}, args)
def test_unrelated_rql_s_linkto_o(self):
req = self.request()
person = self.vreg['etypes'].etype_class('Personne')(req)
self.vreg['etypes'].etype_class('Societe').fetch_attrs = ()
soc = req.create_entity('Societe', nom=u'logilab')
lt_infos = {('contrat_exclusif', 'object'): [soc.eid]}
rql, args = person.cw_unrelated_rql('actionnaire', 'Societe', 'subject',
lt_infos=lt_infos)
self.assertEqual(u'Any O ORDERBY O WHERE NOT A actionnaire O, '
u'O is Societe, NOT EXISTS(O eid %(O)s), '
u'A is Personne', rql)
self.assertEqual({'O': soc.eid}, args)
def test_unrelated_rql_o_linkto_s(self):
req = self.request()
soc = self.vreg['etypes'].etype_class('Societe')(req)
self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
person = req.create_entity('Personne', nom=u'florent')
lt_infos = {('contrat_exclusif', 'subject'): [person.eid]}
rql, args = soc.cw_unrelated_rql('actionnaire', 'Personne', 'object',
lt_infos=lt_infos)
self.assertEqual(u'Any S ORDERBY S WHERE NOT S actionnaire A, '
u'S is Personne, NOT EXISTS(S eid %(S)s), '
u'A is Societe', rql)
self.assertEqual({'S': person.eid}, args)
def test_unrelated_rql_o_linkto_o(self):
req = self.request()
soc = self.vreg['etypes'].etype_class('Societe')(req)
self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
person = req.create_entity('Personne', nom=u'florent')
lt_infos = {('actionnaire', 'object'): [person.eid]}
rql, args = soc.cw_unrelated_rql('dirige', 'Personne', 'object',
lt_infos=lt_infos)
self.assertEqual(u'Any S ORDERBY S WHERE NOT S dirige A, '
u'S is Personne, EXISTS(S eid %(S)s), '
u'A is Societe', rql)
self.assertEqual({'S': person.eid}, args)
def test_unrelated_rql_s_linkto_s_no_info(self):
req = self.request()
person = self.vreg['etypes'].etype_class('Personne')(req)
self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
soc = req.create_entity('Societe', nom=u'logilab')
rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject')
self.assertEqual(u'Any O ORDERBY O WHERE O is Personne', rql)
self.assertEqual({}, args)
def test_unrelated_rql_s_linkto_s_unused_info(self):
req = self.request()
person = self.vreg['etypes'].etype_class('Personne')(req)
self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
other_p = req.create_entity('Personne', nom=u'titi')
lt_infos = {('dirige', 'subject'): [other_p.eid]}
rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject',
lt_infos=lt_infos)
self.assertEqual(u'Any O ORDERBY O WHERE O is Personne', rql)
def test_unrelated_base(self):
req = self.request()
p = req.create_entity('Personne', nom=u'di mascio', prenom=u'adrien')
......
......@@ -1107,8 +1107,8 @@ class RelationField(Field):
done = set()
res = []
entity = form.edited_entity
for entity in entity.unrelated(self.name, targettype,
self.role, limit).entities():
for entity in entity.unrelated(self.name, targettype, self.role, limit,
lt_infos=form.linked_to).entities():
if entity.eid in done:
continue
done.add(entity.eid)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment