Skip to content
Snippets Groups Projects
entities.py 18.8 KiB
Newer Older
Denis Laxalde's avatar
Denis Laxalde committed
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.

"""cubicweb-compound entity's classes

This module essentially provides adapters to handle a structure of composite
entities similarly to the container_ cube. The idea is to provide an API
allowing access to a root entity (the container) from any contained entities.

Assumptions:

* an entity may belong to one and only one container
* an entity may be both a container an contained

.. _container: https://www.cubicweb.org/project/cubicweb-container
"""

from warnings import warn

from cubicweb import neg_role
from cubicweb.rset import ResultSet
from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance, partial_relation_possible
from cubicweb.entities.adapters import ITreeAdapter


def skip_meta(func):
    """Decorator to set `skiprtypes` and `skipetypes` parameters as frozensets
    including "meta" objects that should be implicitely ignored.
    @wraps(func)
    def wrapped(*args, **kwargs):
        kwargs['skiprtypes'] = frozenset(
            set(kwargs.get('skiprtypes', ())) | META_RTYPES
            | WORKFLOW_RTYPES | SYSTEM_RTYPES)
        kwargs['skipetypes'] = frozenset(kwargs.get('skipetypes', ()))
        return func(*args, **kwargs)
    return wrapped
class CompositeGraph(object):
    """Represent a graph of entity types related through composite relations.
    A `CompositeGraph` can be used to iterate on schema objects through
    `parent_relations`/`child_relations` methods as well as on entities through
    `parent_related`/`child_related` methods.

    @skip_meta
    def __init__(self, schema, skiprtypes=(), skipetypes=()):
        self.schema = schema
        self.skiprtypes = skiprtypes
        self.skipetypes = skipetypes

    def parent_relations(self, etype):
        """Yield composite relation information items walking the graph
        upstream from `etype`.

        These items are `(rtype, role), parents` where `parents` is a list of
        possible parent entity types reachable through `(rtype, role)`
        relation.
        """
        return self._composite_relations(etype, topdown=False)
    def child_relations(self, etype):
        """Yield composite relation information items walking the graph
        downstream from `etype`.

        These items are `(rtype, role), children` where `children` is a list of
        possible child entity types reachable through `(rtype, role)` relation.
        """
        return self._composite_relations(etype, topdown=True)

    def _composite_relations(self, etype, topdown):
        """Yield `(rtype, role), etypes` values corresponding to arcs of the
        graph of compositely-related entity types reachable from a `etype`
        entity type. `etypes` is a list of possible entity types reachable
        through `(rtype, role)` relation "upstream" (resp. "downstream") if
        `topdown` is True (resp. False).
            eschema = self.schema[etype]
        except KeyError:
        for rschema, teschemas, role in eschema.relation_definitions():
            if rschema.meta or rschema in self.skiprtypes:
            composite_role = role if topdown else neg_role(role)
            relation, children = (rschema.type, role), []
            for target in teschemas:
                if target in self.skipetypes:
                    continue
                rdef = rschema.role_rdef(eschema, target, role)
                if rdef.composite != composite_role:
                children.append(target.type)
            if children:
                yield relation, children
    def parent_structure(self, etype, _visited=None):
        """Return the parent structure of the graph from `etype`.
        The structure is a dictionary mapping entity type in the graph with
        root `etype` to relation information allowing to walk the graph
        upstream from this entity type.
        """
        if _visited is None:
            _visited = set()
        structure = {}
        def update_structure(left, relation, right):
            structure.setdefault(left, {}).setdefault(relation, []).append(right)

        for (rtype, role), children in self.child_relations(etype):
            for child in sorted(children):
                update_structure(child, (rtype, neg_role(role)), etype)
                if child in _visited:
                    continue
                _visited.add(child)
                for left, rels in self.parent_structure(child, _visited).iteritems():
                    for relation, rights in rels.iteritems():
                        for right in rights:
                            update_structure(left, relation, right)
        return structure
    def parent_related(self, entity):
        """Yield information items on entities related to `entity` through
        composite relations walking the graph upstream from `entity`.

        These items are tuples `(child, (rtype, role), parent)` where `role` is
        the role of `child` entity in `rtype` relation with `parent`.
        """
        return self._composite_related(entity, False)

    def child_related(self, entity):
        """Yield information items on entities related to `entity` through
        composite relations walking the graph downstream from `entity`.

        These items are tuples `(parent, (rtype, role), child)` where `role` is
        the role of `parent` entity in `rtype` relation with `child`.
        """
        return self._composite_related(entity, True)

    def _composite_related(self, entity, topdown, _visited=None):
        """Yield arcs of the graph of compositely-related entities reachable
        An "arc" is a tuple `(l_entity, (rtype, role), r_entity)` where
        `l_entity` is a "parent" (resp. "child") entity when `topdown` is True
        (resp. False) and, conversely, `r_entity` is the "child" (resp.
        "parent") entity. `role` is always the role of `l_entity` in `rtype`
        relation.
        for (rtype, role), targettypes in self._composite_relations(
                entity.cw_etype, topdown):
            rset = entity.related(rtype, role=role, targettypes=targettypes)
            for target in rset.entities():
                yield entity, (rtype, role), target
                if target.eid in _visited:
                _visited.add(target.eid)
                for x in self._composite_related(target, topdown, _visited):
def copy_entity(original, **attributes):
    """Return a copy of an entity.

    Only attributes and non-composite relations are copied (relying on
    `entity.copy_relations()` for the latter).
    """
    attrs = attributes.copy()
    original.complete()
    for rschema in original.e_schema.subject_relations():
        if rschema.final and not rschema.meta:
            attr = rschema.type
            attrs.setdefault(attr, original.cw_attr_value(attr))
    clone = original._cw.create_entity(original.cw_etype, **attrs)
    clone.copy_relations(original.eid)
    return clone


class IContainer(EntityAdapter):
    """Abstract adapter for entities which are a container root."""
    __abstract__ = True
    __regid__ = 'IContainer'

    @classmethod
    def build_class(cls, etype):
        selector = is_instance(etype)
        return type(etype + 'IContainer', (cls,),
                    {'__select__': selector})

    @property
    def container(self):
        """For the sake of consistency with `IContained`, return the container to which this entity
        belongs (i.e. the entity wrapped by this adapter).
        """
        return self.entity

    @property
    def parent(self):
        """Returns the direct parent entity : always None in the case of a container.
        """
        return None


class IClonableAdapter(EntityAdapter):
    """Adapter for entity cloning.

    Concrete classes should specify `rtype` (and possible `role`) class
    attribute to something like `clone_of` depending on application schema.
    """
    __abstract__ = True
    __select__ = partial_relation_possible()
    rtype, role = None, 'object'  # relation between the clone and the original.
    skiprtypes = ()
    skipetypes = ()
    clone_relations = {}  # registered relation type and role of the original entity.

    @classmethod
    def __registered__(cls, *args):
        assert cls.rtype not in cls.clone_relations, '{} already registered for {}'.format(
            cls.__name__, cls.rtype)
        cls.clone_relations[cls.rtype] = cls.role
        return super(IClonableAdapter, cls).__registered__(*args)
    @property
    def graph(self):
        """The composite graph associated with this adapter."""
        return CompositeGraph(self._cw.vreg.schema,
                              skiprtypes=self.skiprtypes,
                              skipetypes=self.skipetypes)

    def clone_into(self, clone):
        """Recursivily clone the container graph of this entity into `clone`."""
        assert clone.cw_etype == self.entity.cw_etype, \
            "clone entity type {} does not match with original's {}".format(
                clone.cw_etype, self.entity.cw_etype)
        clone.copy_relations(self.entity.eid)
        clones = {self.entity: clone}
        for parent, (rtype, parent_role), child in self.graph.child_related(self.entity):
            rel = rtype if parent_role == 'object' else 'reverse_' + rtype
            kwargs = {rel: clones[parent]}
            clone = clones.get(child)
            if clone is not None:
                clone.cw_set(**kwargs)
            else:
                clones[child] = copy_entity(child, **kwargs)


class IContained(EntityAdapter):
    """Abstract adapter for entities which are part of a container.

    This default implementation is purely computational and doesn't rely on any additional data in
    the model, beside a relation to go up to the direct parent.
    """
    __abstract__ = True
    __regid__ = 'IContained'
    _classes = {}
    parent_relations = None  # set this to a set of (rtype, role) in concret classes

    @classmethod
    def register_class(cls, vreg, etype, parent_relations):
        contained_cls = cls.build_class(etype, parent_relations)
        if contained_cls is not None:
            vreg.register(contained_cls)

    @classmethod
    def build_class(cls, etype, parent_relations):
        # check given parent relations
        for parent_relation in parent_relations:
            assert (isinstance(parent_relation, tuple)
                    and len(parent_relation) == 2
                    and parent_relation[-1] in ('subject', 'object')), parent_relation
        # use already created class if any
        if etype in cls._classes:
            contained_cls = cls._classes[etype]
            # ensure registered class is the same at the one that would be generated
            assert contained_cls.__bases__ == (cls,)
            contained_cls.parent_relations |= parent_relations
            return None
        # else generate one
        else:
            selector = is_instance(etype)
            contained_cls = type(etype + 'IContained', (cls,),
                                 {'__select__': selector,
                                  'parent_relations': parent_relations})
            cls._classes[etype] = contained_cls
            return contained_cls

    @property
    def container(self):
        """Return the container to which this entity belongs, or None."""
        parent_relation = self.parent_relation()
        if parent_relation is None:
            # not yet attached to a parent
            return None
        parent = self.entity.related(*parent_relation, entities=True)[0]
        if parent.cw_adapt_to('IContainer'):
            return parent
        return parent.cw_adapt_to('IContained').container

    @property
    def parent(self):
        """Returns the direct parent entity if any, else None (not yet linked to our parent).
        """
        parent_relation = self.parent_relation()
        if parent_relation is None:
            return None  # not yet linked to our parent
        parent_rset = self.entity.related(*parent_relation)
        if parent_rset:
            return parent_rset.one()
        return None

    def parent_relation(self):
        """Return the relation used to attach this entity to its parent, or None if no parent is set
        yet.
        """
        for parent_relation in self.parent_relations:
            parents = self.entity.related(*parent_relation)
            assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents)
            if parents:
                return parent_relation
        return None


def structure_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the container structure with `etype` as root entity.

    This structure is a dictionary with entity types as keys and `(relation
    type, role)` as values. These entity types are reachable through composite
    relations from the root <etype>. Each key gives the name of an entity
    type, associated to a list of relation/role allowing to access to its
    parent (which may be the container or a contained).
    """
    graph = CompositeGraph(schema, skiprtypes=skiprtypes, skipetypes=skipetypes)
    return dict((child, set(relinfo))
                for child, relinfo in graph.parent_structure(etype).iteritems())
def tree_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return a dictionary {etype: [(relation type, role)]} which are
    reachable through composite relations from the root <etype>. Each key
    gives the name of an entity type, associated to a list of relation/role
    allowing to access to its children (which is expected to be a contained).
    """
    contained = {}
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}
    candidates = [schema[etype]]
    while candidates:
        eschema = candidates.pop()
        assert eschema not in contained, (eschema.type, contained)
        contained[eschema.type] = set()
        for rschema, _, role in eschema.relation_definitions():
            if rschema.meta or rschema in skiprtypes:
                continue
            target_role = neg_role(role)
            for rdef in rschema.rdefs.itervalues():
                if rdef.composite != role:
                    continue
                target = getattr(rdef, target_role)
                if target in skipetypes:
                    continue
                if target not in contained:
                    candidates.append(target)
                contained[eschema.type].add((rschema.type, role))
        candidates = list(set(candidates))
    return contained


class IContainedToITree(ITreeAdapter):
    """Map IContained adapters to ITree, additionaly configured with a list of relations leading to
    contained's children
    """

    children_relations = None  # list of (relation type, role) to get entity's children

    @classmethod
    def build_class(cls, etype, children_relations):
        selector = is_instance(etype)
        return type(etype + 'ITree', (cls,),
                    {'__select__': selector,
                     'children_relations': children_relations})

    @property
    def tree_relation(self):
        parent_relation = self.entity.cw_adapt_to('IContained').parent_relation()
        return parent_relation[0]

    @property
    def child_role(self):
        parent_relation = self.entity.cw_adapt_to('IContained').parent_relation()
        return parent_relation[1]

    def children_rql(self):
        # XXX may have different shapes
        return ' UNION '.join('(%s)' % self.entity.cw_related_rql(rel, role)
                              for rel, role in self.children_relations)

    def _children(self, entities=True):
        if entities:
            res = []
        else:
            res = ResultSet([], '')
            res.req = self._cw
        for rel, role in self.children_relations:
            res += self.entity.related(rel, role, entities=entities)
        return res

    def different_type_children(self, entities=True):
        """Return children entities of different type as this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        res = self._children(entities)
        etype = self.entity.cw_etype
        if entities:
            return [e for e in res if e.cw_etype != etype]
        return res.filtered_rset(lambda x: x.cw_etype != etype, self.entity.cw_col)

    def same_type_children(self, entities=True):
        """Return children entities of the same type as this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        res = self._children(entities)
        etype = self.entity.cw_etype
        if entities:
            return [e for e in res if e.cw_etype == etype]
        return res.filtered_rset(lambda x: x.cw_etype == etype, self.entity.cw_col)

    def children(self, entities=True, sametype=False):
        """Return children entities.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        if sametype:
            return self.same_type_children(entities)
        else:
            return self._children(entities)


class IContainerToITree(IContainedToITree):
    """Map IContainer adapters to ITree, similarly to :class:`IContainedToITree` but considering
    parent as None
    """

    @classmethod
    def build_class(cls, etype, children_relations):
        selector = is_instance(etype)
        return type(etype + 'ITree', (cls,),
                    {'__select__': selector,
                     'children_relations': children_relations})

    def parent(self):
        return None


def registration_callback(vreg):
    vreg.register_all(globals().values(), __name__)
    if vreg.config.mode == 'test':
        IClonableAdapter.clone_relations.clear()