Newer
Older
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-compound entity's classes
This module essentially provides adapters to handle a structure of composite
entities similarly to the container_ cube. The idea is to provide an API
allowing access to a root entity (the container) from any contained entities.
Assumptions:
* an entity may belong to one and only one container
* an entity may be both a container an contained
.. _container: https://www.cubicweb.org/project/cubicweb-container
"""
from functools import wraps
from warnings import warn
from cubicweb import neg_role
from cubicweb.rset import ResultSet
from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance, partial_relation_possible
from cubicweb.entities.adapters import ITreeAdapter

Denis Laxalde
committed
def skip_meta(func):
"""Decorator to set `skiprtypes` and `skipetypes` parameters as frozensets
including "meta" objects that should be implicitely ignored.

Denis Laxalde
committed
@wraps(func)
def wrapped(*args, **kwargs):
kwargs['skiprtypes'] = frozenset(
set(kwargs.get('skiprtypes', ())) | META_RTYPES
| WORKFLOW_RTYPES | SYSTEM_RTYPES)
kwargs['skipetypes'] = frozenset(kwargs.get('skipetypes', ()))
return func(*args, **kwargs)
return wrapped
class CompositeGraph(object):
"""Represent a graph of entity types related through composite relations.
A `CompositeGraph` can be used to iterate on schema objects through
`parent_relations`/`child_relations` methods as well as on entities through
`parent_related`/`child_related` methods.
@skip_meta
def __init__(self, schema, skiprtypes=(), skipetypes=()):
self.schema = schema
self.skiprtypes = skiprtypes
self.skipetypes = skipetypes
def parent_relations(self, etype):
"""Yield composite relation information items walking the graph
upstream from `etype`.
These items are `(rtype, role), parents` where `parents` is a list of
possible parent entity types reachable through `(rtype, role)`
relation.
"""
return self._composite_relations(etype, topdown=False)
def child_relations(self, etype):
"""Yield composite relation information items walking the graph
downstream from `etype`.
These items are `(rtype, role), children` where `children` is a list of
possible child entity types reachable through `(rtype, role)` relation.
"""
return self._composite_relations(etype, topdown=True)
def _composite_relations(self, etype, topdown):
"""Yield `(rtype, role), etypes` values corresponding to arcs of the
graph of compositely-related entity types reachable from a `etype`
entity type. `etypes` is a list of possible entity types reachable
through `(rtype, role)` relation "upstream" (resp. "downstream") if
`topdown` is True (resp. False).
"""
eschema = self.schema[etype]
return
for rschema, teschemas, role in eschema.relation_definitions():
if rschema.meta or rschema in self.skiprtypes:
composite_role = role if topdown else neg_role(role)
relation, children = (rschema.type, role), []
for target in teschemas:
if target in self.skipetypes:
continue
rdef = rschema.role_rdef(eschema, target, role)
if rdef.composite != composite_role:
continue
children.append(target.type)
if children:
yield relation, children
def parent_structure(self, etype, _visited=None):
"""Return the parent structure of the graph from `etype`.
The structure is a dictionary mapping entity type in the graph with
root `etype` to relation information allowing to walk the graph
upstream from this entity type.
"""
if _visited is None:
_visited = set()
structure = {}
def update_structure(left, relation, right):
structure.setdefault(left, {}).setdefault(relation, []).append(right)
for (rtype, role), children in self.child_relations(etype):
for child in sorted(children):
update_structure(child, (rtype, neg_role(role)), etype)
if child in _visited:
continue
_visited.add(child)
for left, rels in self.parent_structure(child, _visited).iteritems():
for relation, rights in rels.iteritems():
for right in rights:
update_structure(left, relation, right)
return structure
def topdown_definition(self, parent):
"""Return the container definition of the composite graph from
`parent` entity.
This "definition" is a dictionary mapping entity types of the
composite graph reachable from `parent` entity type to a set of
`(rtype, role)` where `role` refers to the upstream entity type in
`rtype` relation walking towards the top-level `parent` entity type.
"""
return self._definition(parent, True)
def bottomup_definition(self, parent):
"""Return the container definition of the composite graph from
`parent` entity.
This "definition" is a dictionary mapping entity types of the
composite graph reachable from `parent` entity type to a set of
`(rtype, role)` where `role` refers to the downstream entity type in
`rtype` relation walking downwards from the top-level `parent` entity
type.
"""
return self._definition(parent, False)
def _definition(self, parent, topdown, _visited=None):
"""Return a dict {etype: [(rtype, role)]} with structural relations of
the composite graph starting from `parent` entity type and walking the
graph either downstream or upstream depending on `topdown` being True
or False.
"""
if _visited is None:
_visited = set()
defn = {}
for (rtype, role), children in self.child_relations(parent):
for child in sorted(children):
defn.setdefault(child, set()).add((rtype, role if topdown else neg_role(role)))
if child in _visited:
continue
_visited.add(child)
for etype, relations in self._definition(
child, topdown, _visited=_visited).iteritems():
defn.setdefault(etype, set()).update(relations)
return defn
def parent_related(self, entity):
"""Yield information items on entities related to `entity` through
composite relations walking the graph upstream from `entity`.
These items are tuples `(child, (rtype, role), parent)` where `role` is
the role of `child` entity in `rtype` relation with `parent`.
"""
return self._composite_related(entity, False)
def child_related(self, entity):
"""Yield information items on entities related to `entity` through
composite relations walking the graph downstream from `entity`.
These items are tuples `(parent, (rtype, role), child)` where `role` is
the role of `parent` entity in `rtype` relation with `child`.
"""
return self._composite_related(entity, True)
def _composite_related(self, entity, topdown, _visited=None):
"""Yield arcs of the graph of compositely-related entities reachable
An "arc" is a tuple `(l_entity, (rtype, role), r_entity)` where
`l_entity` is a "parent" (resp. "child") entity when `topdown` is True
(resp. False) and, conversely, `r_entity` is the "child" (resp.
"parent") entity. `role` is always the role of `l_entity` in `rtype`
relation.
"""

Denis Laxalde
committed
if _visited is None:
_visited = set()
for (rtype, role), targettypes in self._composite_relations(
entity.cw_etype, topdown):
rset = entity.related(rtype, role=role, targettypes=targettypes)
for target in rset.entities():
yield entity, (rtype, role), target
if target.eid in _visited:

Denis Laxalde
committed
continue
_visited.add(target.eid)
for x in self._composite_related(target, topdown, _visited):
yield x
def copy_entity(original, **attributes):
"""Return a copy of an entity.
Only attributes and non-composite relations are copied (relying on
`entity.copy_relations()` for the latter).
"""
attrs = attributes.copy()
original.complete()
for rschema in original.e_schema.subject_relations():
if rschema.final and not rschema.meta:
attr = rschema.type
attrs.setdefault(attr, original.cw_attr_value(attr))
clone = original._cw.create_entity(original.cw_etype, **attrs)
clone.copy_relations(original.eid)
return clone
class IContainer(EntityAdapter):
"""Abstract adapter for entities which are a container root."""
__abstract__ = True
__regid__ = 'IContainer'
@classmethod
def build_class(cls, etype):
selector = is_instance(etype)
return type(etype + 'IContainer', (cls,),
{'__select__': selector})
@property
def container(self):
"""For the sake of consistency with `IContained`, return the container to which this entity
belongs (i.e. the entity wrapped by this adapter).
"""
return self.entity
@property
def parent(self):
"""Returns the direct parent entity : always None in the case of a container.
"""
return None
class IClonableAdapter(EntityAdapter):
"""Adapter for entity cloning.
Concrete classes should specify `rtype` (and possible `role`) class
attribute to something like `clone_of` depending on application schema.
"""
__regid__ = 'IClonable'
__abstract__ = True
__select__ = partial_relation_possible()
rtype, role = None, 'object' # relation between the clone and the original.
skiprtypes = ()
skipetypes = ()
clone_relations = {} # registered relation type and role of the original entity.
@classmethod
def __registered__(cls, *args):
assert cls.rtype not in cls.clone_relations, '{} already registered for {}'.format(
cls.__name__, cls.rtype)
cls.clone_relations[cls.rtype] = cls.role
return super(IClonableAdapter, cls).__registered__(*args)
@property
def graph(self):
"""The composite graph associated with this adapter."""
return CompositeGraph(self._cw.vreg.schema,
skiprtypes=self.skiprtypes,
skipetypes=self.skipetypes)
def clone_into(self, clone):
"""Recursivily clone the container graph of this entity into `clone`."""
assert clone.cw_etype == self.entity.cw_etype, \
"clone entity type {} does not match with original's {}".format(
clone.cw_etype, self.entity.cw_etype)
clone.copy_relations(self.entity.eid)
clones = {self.entity: clone}
for parent, (rtype, parent_role), child in self.graph.child_related(self.entity):
rel = rtype if parent_role == 'object' else 'reverse_' + rtype
kwargs = {rel: clones[parent]}
clone = clones.get(child)
if clone is not None:
clone.cw_set(**kwargs)
else:
clones[child] = copy_entity(child, **kwargs)
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
class IContained(EntityAdapter):
"""Abstract adapter for entities which are part of a container.
This default implementation is purely computational and doesn't rely on any additional data in
the model, beside a relation to go up to the direct parent.
"""
__abstract__ = True
__regid__ = 'IContained'
_classes = {}
parent_relations = None # set this to a set of (rtype, role) in concret classes
@classmethod
def register_class(cls, vreg, etype, parent_relations):
contained_cls = cls.build_class(etype, parent_relations)
if contained_cls is not None:
vreg.register(contained_cls)
@classmethod
def build_class(cls, etype, parent_relations):
# check given parent relations
for parent_relation in parent_relations:
assert (isinstance(parent_relation, tuple)
and len(parent_relation) == 2
and parent_relation[-1] in ('subject', 'object')), parent_relation
# use already created class if any
if etype in cls._classes:
contained_cls = cls._classes[etype]
# ensure registered class is the same at the one that would be generated
assert contained_cls.__bases__ == (cls,)
contained_cls.parent_relations |= parent_relations
return None
# else generate one
else:
selector = is_instance(etype)
contained_cls = type(etype + 'IContained', (cls,),
{'__select__': selector,
'parent_relations': parent_relations})
cls._classes[etype] = contained_cls
return contained_cls
@property
def container(self):
"""Return the container to which this entity belongs, or None."""
parent_relation = self.parent_relation()
if parent_relation is None:
# not yet attached to a parent
return None
parent = self.entity.related(*parent_relation, entities=True)[0]
if parent.cw_adapt_to('IContainer'):
return parent
return parent.cw_adapt_to('IContained').container
@property
def parent(self):
"""Returns the direct parent entity if any, else None (not yet linked to our parent).
"""
parent_relation = self.parent_relation()
if parent_relation is None:
return None # not yet linked to our parent
parent_rset = self.entity.related(*parent_relation)
if parent_rset:
return parent_rset.one()
return None
def parent_relation(self):
"""Return the relation used to attach this entity to its parent, or None if no parent is set
yet.
"""
for parent_relation in self.parent_relations:
parents = self.entity.related(*parent_relation)
assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents)
if parents:
return parent_relation
return None

Denis Laxalde
committed
@skip_meta
def structure_def(schema, etype, skiprtypes=(), skipetypes=()):
"""Return the container structure with `etype` as root entity.
This structure is a dictionary with entity types as keys and `(relation
type, role)` as values. These entity types are reachable through composite
relations from the root <etype>. Each key gives the name of an entity
type, associated to a list of relation/role allowing to access to its
parent (which may be the container or a contained).
"""
graph = CompositeGraph(schema, skiprtypes=skiprtypes, skipetypes=skipetypes)
return graph.bottomup_definition(etype)

Denis Laxalde
committed
@skip_meta
def tree_def(schema, etype, skiprtypes=(), skipetypes=()):
"""Return a dictionary {etype: [(relation type, role)]} which are
reachable through composite relations from the root <etype>. Each key
gives the name of an entity type, associated to a list of relation/role
allowing to access to its children (which is expected to be a contained).
"""
contained = {}
if etype not in schema:
# Would occur, e.g., during migration.
warn('%s not found in schema, cannot build a container' % etype,
RuntimeWarning)
return {}
candidates = [schema[etype]]
while candidates:
eschema = candidates.pop()
assert eschema not in contained, (eschema.type, contained)
contained[eschema.type] = set()
for rschema, _, role in eschema.relation_definitions():
if rschema.meta or rschema in skiprtypes:
continue
target_role = neg_role(role)
for rdef in rschema.rdefs.itervalues():
if rdef.composite != role:
continue
target = getattr(rdef, target_role)
if target in skipetypes:
continue
if target not in contained:
candidates.append(target)
contained[eschema.type].add((rschema.type, role))
candidates = list(set(candidates))
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
return contained
class IContainedToITree(ITreeAdapter):
"""Map IContained adapters to ITree, additionaly configured with a list of relations leading to
contained's children
"""
children_relations = None # list of (relation type, role) to get entity's children
@classmethod
def build_class(cls, etype, children_relations):
selector = is_instance(etype)
return type(etype + 'ITree', (cls,),
{'__select__': selector,
'children_relations': children_relations})
@property
def tree_relation(self):
parent_relation = self.entity.cw_adapt_to('IContained').parent_relation()
return parent_relation[0]
@property
def child_role(self):
parent_relation = self.entity.cw_adapt_to('IContained').parent_relation()
return parent_relation[1]
def children_rql(self):
# XXX may have different shapes
return ' UNION '.join('(%s)' % self.entity.cw_related_rql(rel, role)
for rel, role in self.children_relations)
def _children(self, entities=True):
if entities:
res = []
else:
res = ResultSet([], '')
res.req = self._cw
for rel, role in self.children_relations:
res += self.entity.related(rel, role, entities=entities)
return res
def different_type_children(self, entities=True):
"""Return children entities of different type as this entity.
According to the `entities` parameter, return entity objects or the
equivalent result set.
"""
res = self._children(entities)
etype = self.entity.cw_etype
if entities:
return [e for e in res if e.cw_etype != etype]
return res.filtered_rset(lambda x: x.cw_etype != etype, self.entity.cw_col)
def same_type_children(self, entities=True):
"""Return children entities of the same type as this entity.
According to the `entities` parameter, return entity objects or the
equivalent result set.
"""
res = self._children(entities)
etype = self.entity.cw_etype
if entities:
return [e for e in res if e.cw_etype == etype]
return res.filtered_rset(lambda x: x.cw_etype == etype, self.entity.cw_col)
def children(self, entities=True, sametype=False):
"""Return children entities.
According to the `entities` parameter, return entity objects or the
equivalent result set.
"""
if sametype:
return self.same_type_children(entities)
else:
return self._children(entities)
class IContainerToITree(IContainedToITree):
"""Map IContainer adapters to ITree, similarly to :class:`IContainedToITree` but considering
parent as None
"""
@classmethod
def build_class(cls, etype, children_relations):
selector = is_instance(etype)
return type(etype + 'ITree', (cls,),
{'__select__': selector,
'children_relations': children_relations})
def parent(self):
return None
def registration_callback(vreg):
vreg.register_all(globals().values(), __name__)
if vreg.config.mode == 'test':
IClonableAdapter.clone_relations.clear()