# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr -- mailto:contact@logilab.fr # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Lesser General Public License as published by the Free # Software Foundation, either version 2.1 of the License, or (at your option) # any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License along # with this program. If not, see <http://www.gnu.org/licenses/>. """cubicweb-compound entity's classes This module essentially provides adapters to handle a structure of composite entities similarly to the container_ cube. The idea is to provide an API allowing access to a root entity (the container) from any contained entities. Assumptions: * an entity may belong to one and only one container * an entity may be both a container an contained .. _container: https://www.cubicweb.org/project/cubicweb-container """ from warnings import warn from cubicweb import neg_role from cubicweb.rset import ResultSet from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES from cubicweb.view import EntityAdapter from cubicweb.predicates import is_instance from cubicweb.entities.adapters import ITreeAdapter def _skipped(skiprtypes, skipetypes): # TODO decorator """Return `skiprtypes` and `skipetypes` frozensets that include "meta" objects that should be implicitely ignored. """ skiprtypes = frozenset(set(skiprtypes) | META_RTYPES | WORKFLOW_RTYPES | SYSTEM_RTYPES) skipetypes = frozenset(skipetypes) return skiprtypes, skipetypes def composite_schema_graph(schema, etype, skiprtypes=(), skipetypes=()): """Return a dict structure representing the graph of compositely-related entity types reachable from `etype` root entity type. """ def recurse(etype): return composite_schema_graph(schema, etype, skiprtypes=skiprtypes, skipetypes=skipetypes) skiprtypes, skipetypes = _skipped(skiprtypes, skipetypes) contained = {} if etype not in schema: # Would occur, e.g., during migration. warn('%s not found in schema, cannot build a container' % etype, RuntimeWarning) return {} graph = {} eschema = schema[etype] for rschema, teschemas, role in eschema.relation_definitions(): if rschema.meta or rschema in skiprtypes: continue target_role = neg_role(role) for rdef in rschema.rdefs.itervalues(): if rdef.composite != role: continue target = getattr(rdef, target_role) if target in skipetypes: continue graph.setdefault((rschema.type, role), {})[target.type] = recurse(target.type) return graph def composite_entities_graph(parent): """Yield elements from a graph of compositely-related entities reachable from `parent` entity. """ schema_graph = composite_schema_graph(parent._cw.vreg.schema, parent.cw_etype) for (rtype, role), edges in schema_graph.iteritems(): for child_etype, _ in edges.iteritems(): # XXX useless to build subgraph at this point? rset = parent.related(rtype, role=role, targettypes=[child_etype]) for child in rset.entities(): yield child, (rtype, neg_role(role)), parent for x in composite_entities_graph(child): yield x def copy_entity(original, **attributes): """Return a copy of an entity. Only attributes and non-composite relations are copied (relying on `entity.copy_relations()` for the latter). """ attrs = attributes.copy() original.complete() for rschema in original.e_schema.subject_relations(): if rschema.final and not rschema.meta: attr = rschema.type attrs.setdefault(attr, original.cw_attr_value(attr)) clone = original._cw.create_entity(original.cw_etype, **attrs) clone.copy_relations(original.eid) return clone class IContainer(EntityAdapter): """Abstract adapter for entities which are a container root.""" __abstract__ = True __regid__ = 'IContainer' @classmethod def build_class(cls, etype): selector = is_instance(etype) return type(etype + 'IContainer', (cls,), {'__select__': selector}) @property def container(self): """For the sake of consistency with `IContained`, return the container to which this entity belongs (i.e. the entity wrapped by this adapter). """ return self.entity @property def parent(self): """Returns the direct parent entity : always None in the case of a container. """ return None class IContained(EntityAdapter): """Abstract adapter for entities which are part of a container. This default implementation is purely computational and doesn't rely on any additional data in the model, beside a relation to go up to the direct parent. """ __abstract__ = True __regid__ = 'IContained' _classes = {} parent_relations = None # set this to a set of (rtype, role) in concret classes @classmethod def register_class(cls, vreg, etype, parent_relations): contained_cls = cls.build_class(etype, parent_relations) if contained_cls is not None: vreg.register(contained_cls) @classmethod def build_class(cls, etype, parent_relations): # check given parent relations for parent_relation in parent_relations: assert (isinstance(parent_relation, tuple) and len(parent_relation) == 2 and parent_relation[-1] in ('subject', 'object')), parent_relation # use already created class if any if etype in cls._classes: contained_cls = cls._classes[etype] # ensure registered class is the same at the one that would be generated assert contained_cls.__bases__ == (cls,) contained_cls.parent_relations |= parent_relations return None # else generate one else: selector = is_instance(etype) contained_cls = type(etype + 'IContained', (cls,), {'__select__': selector, 'parent_relations': parent_relations}) cls._classes[etype] = contained_cls return contained_cls @property def container(self): """Return the container to which this entity belongs, or None.""" parent_relation = self.parent_relation() if parent_relation is None: # not yet attached to a parent return None parent = self.entity.related(*parent_relation, entities=True)[0] if parent.cw_adapt_to('IContainer'): return parent return parent.cw_adapt_to('IContained').container @property def parent(self): """Returns the direct parent entity if any, else None (not yet linked to our parent). """ parent_relation = self.parent_relation() if parent_relation is None: return None # not yet linked to our parent parent_rset = self.entity.related(*parent_relation) if parent_rset: return parent_rset.one() return None def parent_relation(self): """Return the relation used to attach this entity to its parent, or None if no parent is set yet. """ for parent_relation in self.parent_relations: parents = self.entity.related(*parent_relation) assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents) if parents: return parent_relation return None def structure_def(schema, etype, skiprtypes=(), skipetypes=()): """Return the container structure with `etype` as root entity. This structure is a dictionary with entity types as keys and `(relation type, role)` as values. These entity types are reachable through composite relations from the root <etype>. Each key gives the name of an entity type, associated to a list of relation/role allowing to access to its parent (which may be the container or a contained). """ return _relatives_def(schema, etype, skiprtypes=skiprtypes, skipetypes=skipetypes, fetch_parents=True) def tree_def(schema, etype, skiprtypes=(), skipetypes=()): """Return a dictionary {etype: [(relation type, role)]} which are reachable through composite relations from the root <etype>. Each key gives the name of an entity type, associated to a list of relation/role allowing to access to its children (which is expected to be a contained). """ return _relatives_def(schema, etype, skiprtypes=skiprtypes, skipetypes=skipetypes, fetch_parents=False) def _relatives_def(schema, etype, skiprtypes=(), skipetypes=(), fetch_parents=True): """Return a dictionary {etype: [(relation type, role)]} which are reachable through composite relations from the root <etype>. Each key gives the name of an entity type, associated to a list of relation/role allowing to access to its relatives: its parent if fetch_parents is true, or its children if fetch_parents is false. """ skiprtypes, skipetypes = _skipped(skiprtypes, skipetypes) contained = {} if etype not in schema: # Would occur, e.g., during migration. warn('%s not found in schema, cannot build a container' % etype, RuntimeWarning) return {} candidates = [schema[etype]] while candidates: eschema = candidates.pop() if not fetch_parents: assert eschema not in contained, (eschema.type, contained) contained[eschema.type] = set() for rschema, teschemas, role in eschema.relation_definitions(): if rschema.meta or rschema in skiprtypes: continue target_role = neg_role(role) for rdef in rschema.rdefs.itervalues(): if rdef.composite != role: continue target = getattr(rdef, target_role) if target in skipetypes: continue if target not in contained: candidates.append(target) if fetch_parents: assert target != etype, 'cycle to the container' contained.setdefault(target.type, set()).add((rschema.type, target_role)) else: contained[eschema.type].add((rschema.type, role)) if not fetch_parents: candidates = list(set(candidates)) return contained class IContainedToITree(ITreeAdapter): """Map IContained adapters to ITree, additionaly configured with a list of relations leading to contained's children """ children_relations = None # list of (relation type, role) to get entity's children @classmethod def build_class(cls, etype, children_relations): selector = is_instance(etype) return type(etype + 'ITree', (cls,), {'__select__': selector, 'children_relations': children_relations}) @property def tree_relation(self): parent_relation = self.entity.cw_adapt_to('IContained').parent_relation() return parent_relation[0] @property def child_role(self): parent_relation = self.entity.cw_adapt_to('IContained').parent_relation() return parent_relation[1] def children_rql(self): # XXX may have different shapes return ' UNION '.join('(%s)' % self.entity.cw_related_rql(rel, role) for rel, role in self.children_relations) def _children(self, entities=True): if entities: res = [] else: res = ResultSet([], '') res.req = self._cw for rel, role in self.children_relations: res += self.entity.related(rel, role, entities=entities) return res def different_type_children(self, entities=True): """Return children entities of different type as this entity. According to the `entities` parameter, return entity objects or the equivalent result set. """ res = self._children(entities) etype = self.entity.cw_etype if entities: return [e for e in res if e.cw_etype != etype] return res.filtered_rset(lambda x: x.cw_etype != etype, self.entity.cw_col) def same_type_children(self, entities=True): """Return children entities of the same type as this entity. According to the `entities` parameter, return entity objects or the equivalent result set. """ res = self._children(entities) etype = self.entity.cw_etype if entities: return [e for e in res if e.cw_etype == etype] return res.filtered_rset(lambda x: x.cw_etype == etype, self.entity.cw_col) def children(self, entities=True, sametype=False): """Return children entities. According to the `entities` parameter, return entity objects or the equivalent result set. """ if sametype: return self.same_type_children(entities) else: return self._children(entities) class IContainerToITree(IContainedToITree): """Map IContainer adapters to ITree, similarly to :class:`IContainedToITree` but considering parent as None """ @classmethod def build_class(cls, etype, children_relations): selector = is_instance(etype) return type(etype + 'ITree', (cls,), {'__select__': selector, 'children_relations': children_relations}) def parent(self): return None