# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-compound entity's classes

This module essentially provides adapters to handle a structure of composite
entities similarly to the container_ cube. The idea is to provide an API
allowing access to a root entity (the container) from any contained entities.

Assumptions:

* an entity may belong to one and only one container
* an entity may be both a container and contained

.. _container: https://www.cubicweb.org/project/cubicweb-container
"""

from functools import wraps
from warnings import warn

from cubicweb import neg_role
from cubicweb.rset import ResultSet
from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance, partial_relation_possible
from cubicweb.entities.adapters import ITreeAdapter


def skip_meta(func):
    """Decorator to set `skiprtypes` and `skipetypes` parameters as frozensets
    including "meta" objects that should be implicitly ignored.

    The decorated callable always receives both parameters as keyword
    arguments: `skiprtypes` is the caller's value extended with
    `META_RTYPES`, `WORKFLOW_RTYPES` and `SYSTEM_RTYPES`; `skipetypes` is the
    caller's value frozen as is.

    Only keyword arguments are inspected: passing `skiprtypes` or
    `skipetypes` positionally makes the wrapped call fail with a `TypeError`
    (the parameter would receive multiple values).
    """
    @wraps(func)
    def wrapped(*args, **kwargs):
        kwargs['skiprtypes'] = frozenset(
            set(kwargs.get('skiprtypes', ())) | META_RTYPES | WORKFLOW_RTYPES | SYSTEM_RTYPES)
        kwargs['skipetypes'] = frozenset(kwargs.get('skipetypes', ()))
        return func(*args, **kwargs)
    return wrapped


class CompositeGraph(object):
    """Represent a graph of entity types related through composite relations.

    A `CompositeGraph` can be used to iterate on schema objects through
    `parent_relations`/`child_relations` methods as well as on entities through
    `parent_related`/`child_related` methods.
    """

    @skip_meta
    def __init__(self, schema, skiprtypes=(), skipetypes=()):
        """Build a graph on `schema`, ignoring relation types in `skiprtypes`
        and entity types in `skipetypes` (both must be given as keyword
        arguments, see `skip_meta`).
        """
        self.schema = schema
        self.skiprtypes = skiprtypes
        self.skipetypes = skipetypes

    def parent_relations(self, etype):
        """Yield composite relation information items walking the graph
        upstream from `etype`.

        These items are `(rtype, role), parents` where `parents` is a list of
        possible parent entity types reachable through `(rtype, role)`
        relation.
        """
        return self._composite_relations(etype, topdown=False)

    def child_relations(self, etype):
        """Yield composite relation information items walking the graph
        downstream from `etype`.

        These items are `(rtype, role), children` where `children` is a list
        of possible child entity types reachable through `(rtype, role)`
        relation.
        """
        return self._composite_relations(etype, topdown=True)

    def _composite_relations(self, etype, topdown):
        """Yield `(rtype, role), etypes` values corresponding to arcs of the
        graph of compositely-related entity types reachable from a `etype`
        entity type. `etypes` is a list of possible entity types reachable
        through `(rtype, role)` relation "downstream" (resp. "upstream") if
        `topdown` is True (resp. False).

        Nothing is yielded when `etype` is not found in the schema.
        """
        try:
            eschema = self.schema[etype]
        except KeyError:
            return
        for rschema, teschemas, role in eschema.relation_definitions():
            if rschema.meta or rschema in self.skiprtypes:
                continue
            # Walking downstream, `etype` must hold the composite role itself;
            # walking upstream, the composite role is held by the other end.
            composite_role = role if topdown else neg_role(role)
            relation, children = (rschema.type, role), []
            for target in teschemas:
                if target in self.skipetypes:
                    continue
                rdef = rschema.role_rdef(eschema, target, role)
                if rdef.composite != composite_role:
                    continue
                children.append(target.type)
            if children:
                yield relation, children

    def parent_structure(self, etype, _visited=None):
        """Return the parent structure of the graph from `etype`.

        The structure is a dictionary mapping entity type in the graph with
        root `etype` to relation information allowing to walk the graph
        upstream from this entity type.

        Each value is itself a dictionary mapping `(rtype, role)` to the list
        of parent entity types reachable through that relation. `_visited` is
        internal state used to stop the recursion on cycles.
        """
        if _visited is None:
            _visited = set()
        structure = {}

        def update_structure(left, relation, right):
            structure.setdefault(left, {}).setdefault(relation, []).append(right)

        for (rtype, role), children in self.child_relations(etype):
            for child in sorted(children):
                # `role` is the parent's role; store the child's point of view.
                update_structure(child, (rtype, neg_role(role)), etype)
                if child in _visited:
                    continue
                _visited.add(child)
                # `.items()` works on both Python 2 and Python 3.
                for left, rels in self.parent_structure(child, _visited).items():
                    for relation, rights in rels.items():
                        for right in rights:
                            update_structure(left, relation, right)
        return structure

    def parent_related(self, entity):
        """Yield information items on entities related to `entity` through
        composite relations walking the graph upstream from `entity`.

        These items are tuples `(child, (rtype, role), parent)` where `role`
        is the role of `child` entity in `rtype` relation with `parent`.
        """
        return self._composite_related(entity, False)

    def child_related(self, entity):
        """Yield information items on entities related to `entity` through
        composite relations walking the graph downstream from `entity`.

        These items are tuples `(parent, (rtype, role), child)` where `role`
        is the role of `parent` entity in `rtype` relation with `child`.
        """
        return self._composite_related(entity, True)

    def _composite_related(self, entity, topdown, _visited=None):
        """Yield arcs of the graph of compositely-related entities reachable
        from `entity`.

        An "arc" is a tuple `(l_entity, (rtype, role), r_entity)` where
        `l_entity` is a "parent" (resp. "child") entity when `topdown` is True
        (resp. False) and, conversely, `r_entity` is the "child" (resp.
        "parent") entity. `role` is always the role of `l_entity` in `rtype`
        relation.

        `_visited` is internal state holding eids of entities already
        traversed: an arc leading to such an entity is still yielded but the
        entity is not traversed again.
        """
        if _visited is None:
            _visited = set()
        for (rtype, role), targettypes in self._composite_relations(
                entity.cw_etype, topdown):
            rset = entity.related(rtype, role=role, targettypes=targettypes)
            for target in rset.entities():
                yield entity, (rtype, role), target
                if target.eid in _visited:
                    continue
                _visited.add(target.eid)
                for x in self._composite_related(target, topdown, _visited):
                    yield x


def copy_entity(original, **attributes):
    """Return a copy of an entity.

    Only attributes and non-composite relations are copied (relying on
    `entity.copy_relations()` for the latter).

    Values given in `attributes` take precedence over the values read from
    `original`; the caller's mapping is not modified.
    """
    attrs = attributes.copy()
    original.complete()
    for rschema in original.e_schema.subject_relations():
        if rschema.final and not rschema.meta:
            attr = rschema.type
            attrs.setdefault(attr, original.cw_attr_value(attr))
    clone = original._cw.create_entity(original.cw_etype, **attrs)
    clone.copy_relations(original.eid)
    return clone


class IContainer(EntityAdapter):
    """Abstract adapter for entities which are a container root."""
    __abstract__ = True
    __regid__ = 'IContainer'

    @classmethod
    def build_class(cls, etype):
        """Return a subclass of this adapter selectable for `etype` entities."""
        selector = is_instance(etype)
        return type(etype + 'IContainer', (cls,),
                    {'__select__': selector})

    @property
    def container(self):
        """For the sake of consistency with `IContained`, return the container to which this
        entity belongs (i.e. the entity wrapped by this adapter).
        """
        return self.entity

    @property
    def parent(self):
        """Returns the direct parent entity : always None in the case of a container.
        """
        return None


class IClonableAdapter(EntityAdapter):
    """Adapter for entity cloning.

    Concrete classes should specify `rtype` (and possible `role`) class
    attribute to something like `clone_of` depending on application schema.
    """
    __regid__ = 'IClonable'
    __abstract__ = True
    __select__ = partial_relation_possible()

    rtype, role = None, 'object'  # relation between the clone and the original.
    skiprtypes = ()
    skipetypes = ()
    clone_relations = {}  # registered relation type and role of the original entity.

    @classmethod
    def __registered__(cls, *args):
        """Record `cls.rtype` -> `cls.role` in the `clone_relations` mapping
        shared by all subclasses, asserting that `rtype` was not already
        registered, then delegate to the parent class.
        """
        assert cls.rtype not in cls.clone_relations, '{} already registered for {}'.format(
            cls.__name__, cls.rtype)
        cls.clone_relations[cls.rtype] = cls.role
        return super(IClonableAdapter, cls).__registered__(*args)

    @property
    def graph(self):
        """The composite graph associated with this adapter."""
        return CompositeGraph(self._cw.vreg.schema,
                              skiprtypes=self.skiprtypes,
                              skipetypes=self.skipetypes)

    def clone_into(self, clone):
        """Recursively clone the container graph of this entity into `clone`.

        `clone` must be an entity of the same type as the adapted entity.
        """
        assert clone.cw_etype == self.entity.cw_etype, \
            "clone entity type {} does not match with original's {}".format(
                clone.cw_etype, self.entity.cw_etype)
        clone.copy_relations(self.entity.eid)
        # Map each original entity to its clone.
        clones = {self.entity: clone}
        for parent, (rtype, parent_role), child in self.graph.child_related(self.entity):
            # Name of the relation as seen from the child, whose role is the
            # opposite of `parent_role`.
            rel = rtype if parent_role == 'object' else 'reverse_' + rtype
            kwargs = {rel: clones[parent]}
            child_clone = clones.get(child)
            if child_clone is not None:
                # Child already cloned: only link it to this parent's clone.
                child_clone.cw_set(**kwargs)
            else:
                clones[child] = copy_entity(child, **kwargs)


class IContained(EntityAdapter):
    """Abstract adapter for entities which are part of a container.

    This default implementation is purely computational and doesn't rely on any additional data in
    the model, beside a relation to go up to the direct parent.
    """
    __abstract__ = True
    __regid__ = 'IContained'
    _classes = {}  # generated adapter classes, by entity type
    parent_relations = None  # set this to a set of (rtype, role) in concrete classes

    @classmethod
    def register_class(cls, vreg, etype, parent_relations):
        """Build the adapter class for `etype` and register it in `vreg`,
        unless a class was already generated for `etype` (in which case only
        its `parent_relations` get extended).
        """
        contained_cls = cls.build_class(etype, parent_relations)
        if contained_cls is not None:
            vreg.register(contained_cls)

    @classmethod
    def build_class(cls, etype, parent_relations):
        """Return a new adapter class for `etype` with `parent_relations` (an
        iterable of `(rtype, role)` tuples), or None if a class was already
        generated for `etype`, in which case `parent_relations` are merged
        into the existing class.

        The given `parent_relations` is copied, hence never modified.
        """
        # check given parent relations
        for parent_relation in parent_relations:
            assert (isinstance(parent_relation, tuple) and len(parent_relation) == 2 and
                    parent_relation[-1] in ('subject', 'object')), parent_relation
        # use already created class if any
        if etype in cls._classes:
            contained_cls = cls._classes[etype]
            # ensure registered class is the same at the one that would be generated
            assert contained_cls.__bases__ == (cls,)
            contained_cls.parent_relations |= set(parent_relations)
            return None
        # else generate one
        else:
            selector = is_instance(etype)
            # Store a private copy: the class attribute is extended in place
            # by later calls and must not alias the caller's collection.
            contained_cls = type(etype + 'IContained', (cls,),
                                 {'__select__': selector,
                                  'parent_relations': set(parent_relations)})
            cls._classes[etype] = contained_cls
            return contained_cls

    @property
    def container(self):
        """Return the container to which this entity belongs, or None."""
        parent_relation = self.parent_relation()
        if parent_relation is None:
            # not yet attached to a parent
            return None
        parent = self.entity.related(*parent_relation, entities=True)[0]
        if parent.cw_adapt_to('IContainer'):
            return parent
        # The parent is itself contained: go up one more level.
        return parent.cw_adapt_to('IContained').container

    @property
    def parent(self):
        """Returns the direct parent entity if any, else None (not yet linked to our parent).
        """
        parent_relation = self.parent_relation()
        if parent_relation is None:
            return None  # not yet linked to our parent
        parent_rset = self.entity.related(*parent_relation)
        if parent_rset:
            return parent_rset.one()
        return None

    def parent_relation(self):
        """Return the relation used to attach this entity to its parent, or None if no parent is
        set yet.

        The relation is the first `(rtype, role)` of `parent_relations`
        through which the entity has a related entity.
        """
        for parent_relation in self.parent_relations:
            parents = self.entity.related(*parent_relation)
            assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents)
            if parents:
                return parent_relation
        return None


@skip_meta
def structure_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the container structure with `etype` as root entity.

    This structure is a dictionary with entity types as keys and `(relation
    type, role)` as values. These entity types are reachable through composite
    relations from the root <etype>. Each key gives the name of an entity
    type, associated to a set of relation/role allowing to access to its
    parent (which may be the container or a contained).

    `skiprtypes` and `skipetypes` must be given as keyword arguments (see
    `skip_meta`).
    """
    graph = CompositeGraph(schema, skiprtypes=skiprtypes, skipetypes=skipetypes)
    # `.items()` works on both Python 2 and Python 3; `set(relinfo)` keeps the
    # `(rtype, role)` keys of the per-type mapping.
    return {child: set(relinfo)
            for child, relinfo in graph.parent_structure(etype).items()}


@skip_meta
def tree_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return a dictionary {etype: set((relation type, role))} which are
    reachable through composite relations from the root <etype>. Each key
    gives the name of an entity type, associated to a set of relation/role
    allowing to access to its children (which is expected to be a contained).

    An empty dictionary is returned, with a `RuntimeWarning`, when `etype` is
    not in `schema`. `skiprtypes` and `skipetypes` must be given as keyword
    arguments (see `skip_meta`).
    """
    contained = {}
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}
    candidates = [schema[etype]]
    while candidates:
        eschema = candidates.pop()
        assert eschema not in contained, (eschema.type, contained)
        contained[eschema.type] = set()
        for rschema, _, role in eschema.relation_definitions():
            if rschema.meta or rschema in skiprtypes:
                continue
            target_role = neg_role(role)
            # NOTE(review): this walks every definition of `rschema`, not only
            # the ones involving `eschema` -- confirm this is intended.
            # `.values()` works on both Python 2 and Python 3.
            for rdef in rschema.rdefs.values():
                if rdef.composite != role:
                    continue
                target = getattr(rdef, target_role)
                if target in skipetypes:
                    continue
                if target not in contained:
                    candidates.append(target)
                contained[eschema.type].add((rschema.type, role))
        # Drop duplicates so that each entity type is processed only once.
        candidates = list(set(candidates))
    return contained


class IContainedToITree(ITreeAdapter):
    """Map IContained adapters to ITree, additionally configured with a list of relations leading to
    contained's children
    """

    children_relations = None  # list of (relation type, role) to get entity's children

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a subclass of this adapter selectable for `etype` entities
        and using `children_relations` to find children.
        """
        selector = is_instance(etype)
        return type(etype + 'ITree', (cls,),
                    {'__select__': selector,
                     'children_relations': children_relations})

    @property
    def tree_relation(self):
        """Type of the relation linking the entity to its parent.

        Raises `TypeError` when the entity has no parent yet, since
        `IContained.parent_relation()` returns None in that case.
        """
        parent_relation = self.entity.cw_adapt_to('IContained').parent_relation()
        return parent_relation[0]

    @property
    def child_role(self):
        """Role of the entity in the relation linking it to its parent.

        Raises `TypeError` when the entity has no parent yet, since
        `IContained.parent_relation()` returns None in that case.
        """
        parent_relation = self.entity.cw_adapt_to('IContained').parent_relation()
        return parent_relation[1]

    def children_rql(self):
        """Return the RQL union of the queries fetching entities related
        through each of `children_relations`.
        """
        # XXX may have different shapes
        return ' UNION '.join('(%s)' % self.entity.cw_related_rql(rel, role)
                              for rel, role in self.children_relations)

    def _children(self, entities=True):
        """Return entities (or, if `entities` is false, a result set) related
        to the entity through any of `children_relations`.
        """
        if entities:
            res = []
        else:
            # Start from an empty result set to accumulate into.
            res = ResultSet([], '')
            res.req = self._cw
        for rel, role in self.children_relations:
            res += self.entity.related(rel, role, entities=entities)
        return res

    def different_type_children(self, entities=True):
        """Return children entities of different type as this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        res = self._children(entities)
        etype = self.entity.cw_etype
        if entities:
            return [e for e in res if e.cw_etype != etype]
        return res.filtered_rset(lambda x: x.cw_etype != etype, self.entity.cw_col)

    def same_type_children(self, entities=True):
        """Return children entities of the same type as this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        res = self._children(entities)
        etype = self.entity.cw_etype
        if entities:
            return [e for e in res if e.cw_etype == etype]
        return res.filtered_rset(lambda x: x.cw_etype == etype, self.entity.cw_col)

    def children(self, entities=True, sametype=False):
        """Return children entities.

        According to the `entities` parameter, return entity objects or the
        equivalent result set. If `sametype` is true, only children of the
        same type as this entity are returned.
        """
        if sametype:
            return self.same_type_children(entities)
        else:
            return self._children(entities)


class IContainerToITree(IContainedToITree):
    """Map IContainer adapters to ITree, similarly to :class:`IContainedToITree` but considering
    parent as None
    """

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a subclass of this adapter selectable for `etype` entities
        and using `children_relations` to find children.
        """
        selector = is_instance(etype)
        return type(etype + 'ITree', (cls,),
                    {'__select__': selector,
                     'children_relations': children_relations})

    def parent(self):
        """Return None: a container has no parent."""
        return None


def registration_callback(vreg):
    """Register all appobjects of this module into `vreg`."""
    vreg.register_all(globals().values(), __name__)
    if vreg.config.mode == 'test':
        # Reset the class-level registry of clone relations in test mode.
        IClonableAdapter.clone_relations.clear()