# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr -- mailto:contact@logilab.fr # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Lesser General Public License as published by the Free # Software Foundation, either version 2.1 of the License, or (at your option) # any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License along # with this program. If not, see <http://www.gnu.org/licenses/>. """cubicweb-compound entity's classes This module essentially provides adapters to handle a structure of composite entities similarly to the container_ cube. The idea is to provide an API allowing access to a root entity (the container) from any contained entities. Assumptions: * an entity may belong to one and only one container * an entity may be both a container an contained .. _container: https://www.cubicweb.org/project/cubicweb-container """ from functools import wraps from warnings import warn from cubicweb import neg_role from cubicweb.rset import ResultSet from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES from cubicweb.view import EntityAdapter from cubicweb.predicates import is_instance, partial_relation_possible from cubicweb.entities.adapters import ITreeAdapter def skip_meta(func): """Decorator to set `skiprtypes` and `skipetypes` parameters as frozensets including "meta" objects that should be implicitely ignored. """ @wraps(func) def wrapped(*args, **kwargs): kwargs['skiprtypes'] = frozenset( set(kwargs.get('skiprtypes', ())) | META_RTYPES | WORKFLOW_RTYPES | SYSTEM_RTYPES) kwargs['skipetypes'] = frozenset(kwargs.get('skipetypes', ())) return func(*args, **kwargs) return wrapped class CompositeGraph(object): """Represent a graph of entity types related through composite relations. A `CompositeGraph` can be used to iterate on schema objects through `iter_schema` method as well as on entities through `iter_entities` method. """ @skip_meta def __init__(self, schema, skiprtypes=(), skipetypes=()): self.schema = schema self.skiprtypes = skiprtypes self.skipetypes = skipetypes def iter_schema(self, parent, topdown=True): """Yield `(rtype, role), children` values corresponding to arcs of the graph of compositely-related entity types reachable from an `parent` entity type. `children` is a list of possible entity types reachable through `(rtype, role)` relation. By default, the schema is walked from the container level to the content level. To walk from the bottom level, set `topdown=False`. """ try: eschema = self.schema[parent] except KeyError: return for rschema, teschemas, role in eschema.relation_definitions(): if rschema.meta or rschema in self.skiprtypes: continue composite_role = role if topdown else neg_role(role) relation, children = (rschema.type, role), [] for target in teschemas: if target in self.skipetypes: continue rdef = rschema.role_rdef(eschema, target, role) if rdef.composite != composite_role: continue children.append(target.type) if children: yield relation, children def structure(self, parent, **kwargs): """Return a nested-dict structure obtained from walking the graph of schema objects from any `parent` entity type in the graph. """ graph = {} for relation, children in self.iter_schema(parent, **kwargs): for child in sorted(children): graph.setdefault(relation, {})[child] = self.structure(child, **kwargs) return graph def topdown_definition(self, parent): """Return the container definition of the composite graph from `parent` entity. This "definition" is a dictionary mapping entity types of the composite graph reachable from `parent` entity type to a set of `(rtype, role)` where `role` refers to the upstream entity type in `rtype` relation walking towards the top-level `parent` entity type. """ return self._definition(parent, topdown=True) def bottomup_definition(self, parent): """Return the container definition of the composite graph from `parent` entity. This "definition" is a dictionary mapping entity types of the composite graph reachable from `parent` entity type to a set of `(rtype, role)` where `role` refers to the downstream entity type in `rtype` relation walking downwards from the top-level `parent` entity type. """ return self._definition(parent, topdown=False) def _definition(self, parent, topdown=True, _visited=None): """Return a dict {etype: [(rtype, role)]} with structural relations of the composite graph starting from `parent` entity type and walking the graph either downstream or upstream depending on `topdown` being True or False. """ if _visited is None: _visited = set() defn = {} for (rtype, role), children in self.iter_schema(parent): for child in sorted(children): defn.setdefault(child, set()).add((rtype, role if topdown else neg_role(role))) if child in _visited: continue _visited.add(child) for etype, relations in self._definition(child, topdown=topdown).iteritems(): defn.setdefault(etype, set()).update(relations) return defn def iter_entities(self, parent, _visited=None): """Yield arcs of the graph of compositely-related entities reachable from `parent` entity. An "arc" is a tuple `(child entity, (rtype, role), parent entity)`. """ if _visited is None: _visited = set() for (rtype, role), children in self.iter_schema(parent.cw_etype): rset = parent.related(rtype, role=role, targettypes=children) for child in rset.entities(): yield child, (rtype, neg_role(role)), parent if child.eid in _visited: continue _visited.add(child.eid) for x in self.iter_entities(child, _visited): yield x def copy_entity(original, **attributes): """Return a copy of an entity. Only attributes and non-composite relations are copied (relying on `entity.copy_relations()` for the latter). """ attrs = attributes.copy() original.complete() for rschema in original.e_schema.subject_relations(): if rschema.final and not rschema.meta: attr = rschema.type attrs.setdefault(attr, original.cw_attr_value(attr)) clone = original._cw.create_entity(original.cw_etype, **attrs) clone.copy_relations(original.eid) return clone class IContainer(EntityAdapter): """Abstract adapter for entities which are a container root.""" __abstract__ = True __regid__ = 'IContainer' @classmethod def build_class(cls, etype): selector = is_instance(etype) return type(etype + 'IContainer', (cls,), {'__select__': selector}) @property def container(self): """For the sake of consistency with `IContained`, return the container to which this entity belongs (i.e. the entity wrapped by this adapter). """ return self.entity @property def parent(self): """Returns the direct parent entity : always None in the case of a container. """ return None class IClonableAdapter(EntityAdapter): """Adapter for entity cloning. Concrete classes should specify `rtype` (and possible `role`) class attribute to something like `clone_of` depending on application schema. """ __regid__ = 'IClonable' __abstract__ = True __select__ = partial_relation_possible() rtype, role = None, 'object' # relation between the clone and the original. skiprtypes = () skipetypes = () clone_relations = {} # registered relation type and role of the original entity. @classmethod def __registered__(cls, *args): assert cls.rtype not in cls.clone_relations, '{} already registered for {}'.format( cls.__name__, cls.rtype) cls.clone_relations[cls.rtype] = cls.role return super(IClonableAdapter, cls).__registered__(*args) @property def graph(self): """The composite graph associated with this adapter.""" return CompositeGraph(self._cw.vreg.schema, skiprtypes=self.skiprtypes, skipetypes=self.skipetypes) def clone_into(self, clone): """Recursivily clone the container graph of this entity into `clone`.""" assert clone.cw_etype == self.entity.cw_etype, \ "clone entity type {} does not match with original's {}".format( clone.cw_etype, self.entity.cw_etype) clone.copy_relations(self.entity.eid) clones = {self.entity: clone} for child, (rtype, role), parent in self.graph.iter_entities(self.entity): rel = rtype if role == 'subject' else 'reverse_' + rtype kwargs = {rel: clones[parent]} clone = clones.get(child) if clone is not None: clone.cw_set(**kwargs) else: clones[child] = copy_entity(child, **kwargs) class IContained(EntityAdapter): """Abstract adapter for entities which are part of a container. This default implementation is purely computational and doesn't rely on any additional data in the model, beside a relation to go up to the direct parent. """ __abstract__ = True __regid__ = 'IContained' _classes = {} parent_relations = None # set this to a set of (rtype, role) in concret classes @classmethod def register_class(cls, vreg, etype, parent_relations): contained_cls = cls.build_class(etype, parent_relations) if contained_cls is not None: vreg.register(contained_cls) @classmethod def build_class(cls, etype, parent_relations): # check given parent relations for parent_relation in parent_relations: assert (isinstance(parent_relation, tuple) and len(parent_relation) == 2 and parent_relation[-1] in ('subject', 'object')), parent_relation # use already created class if any if etype in cls._classes: contained_cls = cls._classes[etype] # ensure registered class is the same at the one that would be generated assert contained_cls.__bases__ == (cls,) contained_cls.parent_relations |= parent_relations return None # else generate one else: selector = is_instance(etype) contained_cls = type(etype + 'IContained', (cls,), {'__select__': selector, 'parent_relations': parent_relations}) cls._classes[etype] = contained_cls return contained_cls @property def container(self): """Return the container to which this entity belongs, or None.""" parent_relation = self.parent_relation() if parent_relation is None: # not yet attached to a parent return None parent = self.entity.related(*parent_relation, entities=True)[0] if parent.cw_adapt_to('IContainer'): return parent return parent.cw_adapt_to('IContained').container @property def parent(self): """Returns the direct parent entity if any, else None (not yet linked to our parent). """ parent_relation = self.parent_relation() if parent_relation is None: return None # not yet linked to our parent parent_rset = self.entity.related(*parent_relation) if parent_rset: return parent_rset.one() return None def parent_relation(self): """Return the relation used to attach this entity to its parent, or None if no parent is set yet. """ for parent_relation in self.parent_relations: parents = self.entity.related(*parent_relation) assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents) if parents: return parent_relation return None @skip_meta def structure_def(schema, etype, skiprtypes=(), skipetypes=()): """Return the container structure with `etype` as root entity. This structure is a dictionary with entity types as keys and `(relation type, role)` as values. These entity types are reachable through composite relations from the root <etype>. Each key gives the name of an entity type, associated to a list of relation/role allowing to access to its parent (which may be the container or a contained). """ graph = CompositeGraph(schema, skiprtypes=skiprtypes, skipetypes=skipetypes) return graph.bottomup_definition(etype) @skip_meta def tree_def(schema, etype, skiprtypes=(), skipetypes=()): """Return a dictionary {etype: [(relation type, role)]} which are reachable through composite relations from the root <etype>. Each key gives the name of an entity type, associated to a list of relation/role allowing to access to its children (which is expected to be a contained). """ return _relatives_def(schema, etype, skiprtypes=skiprtypes, skipetypes=skipetypes, fetch_parents=False) def _relatives_def(schema, etype, skiprtypes, skipetypes, fetch_parents=True): """Return a dictionary {etype: [(relation type, role)]} which are reachable through composite relations from the root <etype>. Each key gives the name of an entity type, associated to a list of relation/role allowing to access to its relatives: its parent if fetch_parents is true, or its children if fetch_parents is false. """ contained = {} if etype not in schema: # Would occur, e.g., during migration. warn('%s not found in schema, cannot build a container' % etype, RuntimeWarning) return {} candidates = [schema[etype]] while candidates: eschema = candidates.pop() if not fetch_parents: assert eschema not in contained, (eschema.type, contained) contained[eschema.type] = set() for rschema, _, role in eschema.relation_definitions(): if rschema.meta or rschema in skiprtypes: continue target_role = neg_role(role) for rdef in rschema.rdefs.itervalues(): if rdef.composite != role: continue target = getattr(rdef, target_role) if target in skipetypes: continue if target not in contained: candidates.append(target) if fetch_parents: assert target != etype, 'cycle to the container' contained.setdefault(target.type, set()).add((rschema.type, target_role)) else: contained[eschema.type].add((rschema.type, role)) if not fetch_parents: candidates = list(set(candidates)) return contained class IContainedToITree(ITreeAdapter): """Map IContained adapters to ITree, additionaly configured with a list of relations leading to contained's children """ children_relations = None # list of (relation type, role) to get entity's children @classmethod def build_class(cls, etype, children_relations): selector = is_instance(etype) return type(etype + 'ITree', (cls,), {'__select__': selector, 'children_relations': children_relations}) @property def tree_relation(self): parent_relation = self.entity.cw_adapt_to('IContained').parent_relation() return parent_relation[0] @property def child_role(self): parent_relation = self.entity.cw_adapt_to('IContained').parent_relation() return parent_relation[1] def children_rql(self): # XXX may have different shapes return ' UNION '.join('(%s)' % self.entity.cw_related_rql(rel, role) for rel, role in self.children_relations) def _children(self, entities=True): if entities: res = [] else: res = ResultSet([], '') res.req = self._cw for rel, role in self.children_relations: res += self.entity.related(rel, role, entities=entities) return res def different_type_children(self, entities=True): """Return children entities of different type as this entity. According to the `entities` parameter, return entity objects or the equivalent result set. """ res = self._children(entities) etype = self.entity.cw_etype if entities: return [e for e in res if e.cw_etype != etype] return res.filtered_rset(lambda x: x.cw_etype != etype, self.entity.cw_col) def same_type_children(self, entities=True): """Return children entities of the same type as this entity. According to the `entities` parameter, return entity objects or the equivalent result set. """ res = self._children(entities) etype = self.entity.cw_etype if entities: return [e for e in res if e.cw_etype == etype] return res.filtered_rset(lambda x: x.cw_etype == etype, self.entity.cw_col) def children(self, entities=True, sametype=False): """Return children entities. According to the `entities` parameter, return entity objects or the equivalent result set. """ if sametype: return self.same_type_children(entities) else: return self._children(entities) class IContainerToITree(IContainedToITree): """Map IContainer adapters to ITree, similarly to :class:`IContainedToITree` but considering parent as None """ @classmethod def build_class(cls, etype, children_relations): selector = is_instance(etype) return type(etype + 'ITree', (cls,), {'__select__': selector, 'children_relations': children_relations}) def parent(self): return None def registration_callback(vreg): vreg.register_all(globals().values(), __name__) if vreg.config.mode == 'test': IClonableAdapter.clone_relations.clear()