# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-compound entity's classes
This module essentially provides adapters to handle a structure of composite
entities similarly to the container_ cube. The idea is to provide an API
allowing access to a root entity (the container) from any contained entities.
Assumptions:
* an entity may belong to one and only one container
* an entity may be both a container an contained
.. _container: https://www.cubicweb.org/project/cubicweb-container
"""
from functools import wraps
from warnings import warn
from cubicweb import neg_role
from cubicweb.rset import ResultSet
from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance, partial_relation_possible
from cubicweb.entities.adapters import ITreeAdapter

def skip_meta(func):
    """Decorator to set `skiprtypes` and `skipetypes` parameters as frozensets
    including "meta" objects that should be implicitly ignored.

    The decorated callable always receives both parameters as keyword
    arguments:

    * `skiprtypes` is the value given by the caller (empty by default)
      extended with `META_RTYPES`, `WORKFLOW_RTYPES` and `SYSTEM_RTYPES`;
    * `skipetypes` is the value given by the caller (empty by default),
      converted to a frozenset.

    Only keyword arguments are inspected by the wrapper, hence callers have
    to pass `skiprtypes` and `skipetypes` by keyword.
    """
    @wraps(func)
    def wrapped(*args, **kwargs):
        # Whatever the caller asked to skip, meta/workflow/system relation
        # types are always skipped as well.
        kwargs['skiprtypes'] = frozenset(
            set(kwargs.get('skiprtypes', ())) | META_RTYPES
            | WORKFLOW_RTYPES | SYSTEM_RTYPES)
        kwargs['skipetypes'] = frozenset(kwargs.get('skipetypes', ()))
        return func(*args, **kwargs)
    return wrapped
class CompositeGraph(object):
    """Represent a graph of entity types related through composite relations.

    A `CompositeGraph` can be used to iterate on schema objects through
    `parent_relations`/`child_relations` methods as well as on entities through
    `parent_related`/`child_related` methods.
    """

    @skip_meta
    def __init__(self, schema, skiprtypes=(), skipetypes=()):
        """Build the graph for `schema`.

        `skiprtypes` and `skipetypes` (to be passed by keyword, they are
        post-processed by the `skip_meta` decorator) are the relation types
        and entity types to ignore when walking the graph.
        """
        self.schema = schema
        self.skiprtypes = skiprtypes
        self.skipetypes = skipetypes

    def parent_relations(self, etype):
        """Yield composite relation information items walking the graph
        upstream from `etype`.

        These items are `(rtype, role), parents` where `parents` is a list of
        possible parent entity types reachable through `(rtype, role)`
        relation.
        """
        return self._composite_relations(etype, topdown=False)

    def child_relations(self, etype):
        """Yield composite relation information items walking the graph
        downstream from `etype`.

        These items are `(rtype, role), children` where `children` is a list of
        possible child entity types reachable through `(rtype, role)` relation.
        """
        return self._composite_relations(etype, topdown=True)

    def _composite_relations(self, etype, topdown):
        """Yield `(rtype, role), etypes` values corresponding to arcs of the
        graph of compositely-related entity types reachable from a `etype`
        entity type. `etypes` is a list of possible entity types reachable
        through `(rtype, role)` relation "downstream" (resp. "upstream") if
        `topdown` is True (resp. False).

        Nothing is yielded when `etype` is not found in the schema.
        """
        if etype not in self.schema:
            # Unknown entity type (`tree_def` notes this would occur, e.g.,
            # during migration): there is nothing to walk.
            return
        eschema = self.schema[etype]
        for rschema, teschemas, role in eschema.relation_definitions():
            if rschema.meta or rschema in self.skiprtypes:
                continue
            # `role` is the role of `etype` in the relation: walking down,
            # `etype` has to be the composite; walking up, the target has.
            composite_role = role if topdown else neg_role(role)
            relation, children = (rschema.type, role), []
            for target in teschemas:
                if target in self.skipetypes:
                    continue
                rdef = rschema.role_rdef(eschema, target, role)
                if rdef.composite != composite_role:
                    continue
                children.append(target.type)
            if children:
                yield relation, children

    def parent_structure(self, etype, _visited=None):
        """Return the parent structure of the graph from `etype`.

        The structure is a dictionary mapping entity type in the graph with
        root `etype` to relation information allowing to walk the graph
        upstream from this entity type: each value is itself a dictionary
        mapping `(rtype, role)` to the list of parent entity types.

        `_visited` is an internal accumulator of already explored entity
        types, used to stop the recursion on cycles.
        """
        if _visited is None:
            _visited = set()
        structure = {}

        def update_structure(left, relation, right):
            structure.setdefault(left, {}).setdefault(relation, []).append(right)

        for (rtype, role), children in self.child_relations(etype):
            for child in sorted(children):
                # Seen from the child, the role in the relation is reversed.
                update_structure(child, (rtype, neg_role(role)), etype)
                if child in _visited:
                    continue
                _visited.add(child)
                # Merge the structure found below `child` into ours.
                for left, rels in self.parent_structure(child, _visited).items():
                    for relation, rights in rels.items():
                        for right in rights:
                            update_structure(left, relation, right)
        return structure

    def parent_related(self, entity):
        """Yield information items on entities related to `entity` through
        composite relations walking the graph upstream from `entity`.

        These items are tuples `(child, (rtype, role), parent)` where `role` is
        the role of `child` entity in `rtype` relation with `parent`.
        """
        return self._composite_related(entity, False)

    def child_related(self, entity):
        """Yield information items on entities related to `entity` through
        composite relations walking the graph downstream from `entity`.

        These items are tuples `(parent, (rtype, role), child)` where `role` is
        the role of `parent` entity in `rtype` relation with `child`.
        """
        return self._composite_related(entity, True)

    def _composite_related(self, entity, topdown, _visited=None):
        """Yield arcs of the graph of compositely-related entities reachable
        from `entity`.

        An "arc" is a tuple `(l_entity, (rtype, role), r_entity)` where
        `l_entity` is a "parent" (resp. "child") entity when `topdown` is True
        (resp. False) and, conversely, `r_entity` is the "child" (resp.
        "parent") entity. `role` is always the role of `l_entity` in `rtype`
        relation.

        `_visited` is an internal accumulator of the eids already explored;
        an arc leading to an already explored entity is still yielded but
        the walk does not go through that entity again.
        """
        if _visited is None:
            _visited = set()
        for (rtype, role), targettypes in self._composite_relations(
                entity.cw_etype, topdown):
            rset = entity.related(rtype, role=role, targettypes=targettypes)
            for target in rset.entities():
                yield entity, (rtype, role), target
                if target.eid in _visited:
                    continue
                _visited.add(target.eid)
                for x in self._composite_related(target, topdown, _visited):
                    yield x
def copy_entity(original, **attributes):
    """Create and return a new entity duplicating `original`.

    The new entity has the type of `original` and is created with the value
    of every final, non-meta subject relation (i.e. attribute) of `original`,
    unless a value for it is given in `attributes`, which takes precedence.
    Other relations are then handled by calling `copy_relations()` on the
    new entity with the eid of `original`; composite relations are not
    copied.
    """
    values = dict(attributes)
    original.complete()
    for rschema in original.e_schema.subject_relations():
        if not rschema.final or rschema.meta:
            continue
        name = rschema.type
        # Values explicitly given by the caller are kept as is.
        values.setdefault(name, original.cw_attr_value(name))
    duplicate = original._cw.create_entity(original.cw_etype, **values)
    duplicate.copy_relations(original.eid)
    return duplicate
class IContainer(EntityAdapter):
    """Abstract adapter for entities which are a container root."""
    __abstract__ = True
    __regid__ = 'IContainer'

    @classmethod
    def build_class(cls, etype):
        """Return a new subclass of this adapter, named `<etype>IContainer`
        and selectable for instances of `etype`.
        """
        namespace = {'__select__': is_instance(etype)}
        return type(etype + 'IContainer', (cls,), namespace)

    @property
    def container(self):
        """The container this entity belongs to, that is the adapted entity
        itself (this property exists for consistency with `IContained`).
        """
        return self.entity

    @property
    def parent(self):
        """The direct parent entity: always None for a container."""
        return None
class IClonableAdapter(EntityAdapter):
    """Adapter for entity cloning.

    Concrete classes should specify `rtype` (and possible `role`) class
    attribute to something like `clone_of` depending on application schema.
    """
    __regid__ = 'IClonable'
    __abstract__ = True
    __select__ = partial_relation_possible()
    # Relation between the clone and the original.
    rtype = None
    role = 'object'
    skiprtypes = ()
    skipetypes = ()
    # Registered relation type and role of the original entity.
    clone_relations = {}

    @classmethod
    def __registered__(cls, *args):
        """Record `rtype`/`role` of this class in `clone_relations`, checking
        that no other class was registered for the same relation type.
        """
        registry = cls.clone_relations
        assert cls.rtype not in registry, '{} already registered for {}'.format(
            cls.__name__, cls.rtype)
        registry[cls.rtype] = cls.role
        return super(IClonableAdapter, cls).__registered__(*args)

    @property
    def graph(self):
        """A `CompositeGraph` on the registry schema, built with the
        `skiprtypes` and `skipetypes` of this adapter.
        """
        schema = self._cw.vreg.schema
        return CompositeGraph(schema, skiprtypes=self.skiprtypes,
                              skipetypes=self.skipetypes)

    def clone_into(self, clone):
        """Recursively clone the container graph of this entity into `clone`.

        `clone` must have the same entity type as the adapted entity.
        """
        original = self.entity
        assert clone.cw_etype == original.cw_etype, \
            "clone entity type {} does not match with original's {}".format(
                clone.cw_etype, original.cw_etype)
        clone.copy_relations(original.eid)
        # Map each original entity to its clone.
        clones = {original: clone}
        for parent, (rtype, parent_role), child in self.graph.child_related(original):
            # `parent_role` is the role of the parent: name the relation as
            # seen from the child.
            if parent_role == 'object':
                rel = rtype
            else:
                rel = 'reverse_' + rtype
            kwargs = {rel: clones[parent]}
            existing = clones.get(child)
            if existing is None:
                clones[child] = copy_entity(child, **kwargs)
            else:
                # Child already cloned: only link it to this parent's clone.
                existing.cw_set(**kwargs)
class IContained(EntityAdapter):
    """Abstract adapter for entities which are part of a container.

    This default implementation is purely computational and doesn't rely on
    any additional data in the model, beside a relation to go up to the direct
    parent.
    """
    __abstract__ = True
    __regid__ = 'IContained'
    # Classes generated by `build_class`, by entity type.
    _classes = {}
    parent_relations = None  # set this to a set of (rtype, role) in concrete classes

    @classmethod
    def register_class(cls, vreg, etype, parent_relations):
        """Build the adapter class for `etype` and register it in `vreg`,
        unless `build_class` returned None.
        """
        contained_cls = cls.build_class(etype, parent_relations)
        if contained_cls is None:
            return
        vreg.register(contained_cls)

    @classmethod
    def build_class(cls, etype, parent_relations):
        """Return a new adapter class for `etype` with given
        `parent_relations`.

        If a class was already generated for `etype`, its `parent_relations`
        are extended with the given ones and None is returned.
        """
        # check given parent relations
        for parent_relation in parent_relations:
            assert (isinstance(parent_relation, tuple)
                    and len(parent_relation) == 2
                    and parent_relation[-1] in ('subject', 'object')), parent_relation
        known_cls = cls._classes.get(etype)
        if known_cls is not None:
            # ensure the known class is the same as the one that would be
            # generated, then simply extend it
            assert known_cls.__bases__ == (cls,)
            known_cls.parent_relations |= parent_relations
            return None
        namespace = {'__select__': is_instance(etype),
                     'parent_relations': parent_relations}
        new_cls = type(etype + 'IContained', (cls,), namespace)
        cls._classes[etype] = new_cls
        return new_cls

    @property
    def container(self):
        """The container to which this entity belongs, or None."""
        relation = self.parent_relation()
        if relation is None:
            # not yet attached to a parent
            return None
        parent = self.entity.related(*relation, entities=True)[0]
        if not parent.cw_adapt_to('IContainer'):
            # the parent is itself contained: go up one level
            return parent.cw_adapt_to('IContained').container
        return parent

    @property
    def parent(self):
        """The direct parent entity if any, else None (not yet linked to our
        parent).
        """
        relation = self.parent_relation()
        if relation is None:
            return None  # not yet linked to our parent
        rset = self.entity.related(*relation)
        return rset.one() if rset else None

    def parent_relation(self):
        """Return the `(rtype, role)` relation used to attach this entity to
        its parent, or None if no parent is set yet.
        """
        for candidate in self.parent_relations:
            parents = self.entity.related(*candidate)
            assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents)
            if parents:
                return candidate
        return None

@skip_meta
def structure_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the container structure with `etype` as root entity.

    This structure is a dictionary with entity types as keys and sets of
    `(relation type, role)` as values. These entity types are reachable
    through composite relations from the root <etype>. Each key gives the name
    of an entity type, associated to the relation/role pairs allowing to
    access to its parent (which may be the container or a contained).

    `skiprtypes` and `skipetypes` (to be passed by keyword) are the relation
    types and entity types to ignore.
    """
    graph = CompositeGraph(schema, skiprtypes=skiprtypes, skipetypes=skipetypes)
    # Only keep the `(rtype, role)` keys of the parent structure, dropping the
    # lists of parent entity types.
    return {child: set(relinfo)
            for child, relinfo in graph.parent_structure(etype).items()}

@skip_meta
def tree_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return a dictionary {etype: set((relation type, role))} of entity types
    which are reachable through composite relations from the root <etype>.
    Each key gives the name of an entity type, associated to the set of
    relation/role allowing to access to its children (which is expected to be
    a contained).

    An empty dictionary is returned, after emitting a `RuntimeWarning`, when
    `etype` is not found in `schema`. `skiprtypes` and `skipetypes` (to be
    passed by keyword) are the relation types and entity types to ignore.
    """
    contained = {}
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}
    candidates = [schema[etype]]
    while candidates:
        eschema = candidates.pop()
        assert eschema not in contained, (eschema.type, contained)
        contained[eschema.type] = set()
        for rschema, _, role in eschema.relation_definitions():
            if rschema.meta or rschema in skiprtypes:
                continue
            target_role = neg_role(role)
            # NOTE(review): presumably `rschema.rdefs` holds the definitions
            # of this relation for every entity type, not only `eschema` --
            # confirm walking all of them is intended.
            for rdef in rschema.rdefs.values():
                if rdef.composite != role:
                    continue
                target = getattr(rdef, target_role)
                if target in skipetypes:
                    continue
                if target not in contained:
                    candidates.append(target)
                contained[eschema.type].add((rschema.type, role))
        # A type may have been queued several times (reachable through
        # several relations): drop duplicates so that each is handled once.
        candidates = list(set(candidates))
    return contained
class IContainedToITree(ITreeAdapter):
    """Map IContained adapters to ITree, additionally configured with a list
    of relations leading to contained's children.
    """
    children_relations = None  # list of (relation type, role) to get entity's children

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a new subclass of this adapter, named `<etype>ITree`,
        selectable for instances of `etype` and using `children_relations`.
        """
        namespace = {'__select__': is_instance(etype),
                     'children_relations': children_relations}
        return type(etype + 'ITree', (cls,), namespace)

    @property
    def tree_relation(self):
        """Relation type of the parent relation given by `IContained`."""
        return self.entity.cw_adapt_to('IContained').parent_relation()[0]

    @property
    def child_role(self):
        """Role of the parent relation given by `IContained`."""
        return self.entity.cw_adapt_to('IContained').parent_relation()[1]

    def children_rql(self):
        """Return the union of the "related" RQL queries of the entity for
        each of `children_relations`.
        """
        # XXX may have different shapes
        queries = ('(%s)' % self.entity.cw_related_rql(rel, role)
                   for rel, role in self.children_relations)
        return ' UNION '.join(queries)

    def _children(self, entities=True):
        """Return what is related to the entity through `children_relations`,
        as a list of entities if `entities` is true else as a result set.
        """
        if entities:
            found = []
        else:
            # start from an empty result set bound to our request
            found = ResultSet([], '')
            found.req = self._cw
        for rel, role in self.children_relations:
            found += self.entity.related(rel, role, entities=entities)
        return found

    def different_type_children(self, entities=True):
        """Return children entities of different type as this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        found = self._children(entities)
        etype = self.entity.cw_etype
        if not entities:
            return found.filtered_rset(lambda x: x.cw_etype != etype, self.entity.cw_col)
        return [child for child in found if child.cw_etype != etype]

    def same_type_children(self, entities=True):
        """Return children entities of the same type as this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        found = self._children(entities)
        etype = self.entity.cw_etype
        if not entities:
            return found.filtered_rset(lambda x: x.cw_etype == etype, self.entity.cw_col)
        return [child for child in found if child.cw_etype == etype]

    def children(self, entities=True, sametype=False):
        """Return children entities, restricted to those of the same type as
        this entity if `sametype` is true.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        if not sametype:
            return self._children(entities)
        return self.same_type_children(entities)
class IContainerToITree(IContainedToITree):
    """Map IContainer adapters to ITree, similarly to
    :class:`IContainedToITree` but considering parent as None.
    """

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a new subclass of this adapter, named `<etype>ITree`,
        selectable for instances of `etype` and using `children_relations`.
        """
        namespace = {'__select__': is_instance(etype),
                     'children_relations': children_relations}
        return type(etype + 'ITree', (cls,), namespace)

    def parent(self):
        """Return None."""
        return None
def registration_callback(vreg):
    """Register every object of this module into `vreg`; when the
    configuration mode is 'test', also empty
    `IClonableAdapter.clone_relations`.
    """
    module_objects = globals().values()
    vreg.register_all(module_objects, __name__)
    if vreg.config.mode != 'test':
        return
    IClonableAdapter.clone_relations.clear()