Skip to content
Snippets Groups Projects
entities.py 14.8 KiB
Newer Older
Denis Laxalde's avatar
Denis Laxalde committed
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.

"""cubicweb-compound entity's classes

This module essentially provides adapters to handle a structure of composite
entities similarly to the container_ cube. The idea is to provide an API
allowing access to a root entity (the container) from any contained entities.

Assumptions:

* an entity may belong to one and only one container
* an entity may be both a container and contained

.. _container: https://www.cubicweb.org/project/cubicweb-container
"""

from warnings import warn

from cubicweb import neg_role
from cubicweb.rset import ResultSet
from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
from cubicweb.entities.adapters import ITreeAdapter


def _skipped(skiprtypes, skipetypes):  # TODO decorator
    """Build the effective sets of relation and entity types to ignore.

    The given `skiprtypes` are extended with the relation types listed in
    `META_RTYPES`, `WORKFLOW_RTYPES` and `SYSTEM_RTYPES`, which are always
    implicitly ignored; `skipetypes` is taken as given.

    Return a `(skiprtypes, skipetypes)` tuple of frozensets.
    """
    ignored_rtypes = set(skiprtypes)
    ignored_rtypes |= META_RTYPES
    ignored_rtypes |= WORKFLOW_RTYPES
    ignored_rtypes |= SYSTEM_RTYPES
    return frozenset(ignored_rtypes), frozenset(skipetypes)


def composite_schema_graph(schema, etype, skiprtypes=(), skipetypes=()):
    """Return a dict structure representing the graph of compositely-related
    entity types reachable from `etype` root entity type.

    The result maps `(relation type, role)` keys to a
    `{child entity type: sub-graph}` dictionary, each sub-graph having the
    same shape and being built recursively for the child entity type. `role`
    is the role yielded by `relation_definitions()` for `etype`; a relation
    definition is only retained when its `composite` attribute equals that
    role.

    Relation types flagged as `meta` or found in `skiprtypes` (extended by
    `_skipped()`), as well as targets found in `skipetypes`, are ignored.

    If `etype` is not in `schema`, a `RuntimeWarning` is emitted and an empty
    dict is returned.

    NOTE: the recursion has no cycle guard, so a schema in which an entity
    type is compositely reachable from itself would recurse without bound.
    """
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}
    skiprtypes, skipetypes = _skipped(skiprtypes, skipetypes)

    def recurse(etype):
        return composite_schema_graph(schema, etype, skiprtypes=skiprtypes,
                                      skipetypes=skipetypes)

    graph = {}
    eschema = schema[etype]
    for rschema, teschemas, role in eschema.relation_definitions():
        if rschema.meta or rschema in skiprtypes:
            continue
        target_role = neg_role(role)
        # NOTE(review): every rdef of the relation type is examined, not only
        # those having `etype` at the `role` end -- confirm this is intended.
        # `values()` rather than `itervalues()` so this runs on Python 2 and 3.
        for rdef in rschema.rdefs.values():
            if rdef.composite != role:
                continue
            target = getattr(rdef, target_role)
            if target in skipetypes:
                continue
            graph.setdefault((rschema.type, role), {})[target.type] = recurse(target.type)
    return graph


def composite_entities_graph(parent):
    """Yield elements from a graph of compositely-related entities reachable
    from `parent` entity.

    Each element is a `(child, (rtype, role), parent)` tuple in which `role`
    is the negated role of the one under which `parent` is related to
    `child` through `rtype`. The element for a child is yielded before the
    elements of that child's own sub-graph (depth-first).

    The schema graph is computed with `composite_schema_graph()` using its
    default skip sets.
    """
    schema_graph = composite_schema_graph(parent._cw.vreg.schema, parent.cw_etype)
    # `items()` rather than `iteritems()` so this runs on Python 2 and 3.
    for (rtype, role), edges in schema_graph.items():
        # Only the child entity types are needed here, the sub-graphs are
        # recomputed by the recursive call below.
        # XXX useless to build subgraph at this point?
        for child_etype in edges:
            rset = parent.related(rtype, role=role, targettypes=[child_etype])
            for child in rset.entities():
                yield child, (rtype, neg_role(role)), parent
                for x in composite_entities_graph(child):
                    yield x


def copy_entity(original, **attributes):
    """Create and return a copy of the `original` entity.

    The copy is created with the value of every final, non-meta subject
    relation (i.e. attribute) of `original`, except for those explicitly
    given in `attributes`, which take precedence. Other relations are then
    handled by calling `copy_relations()` on the new entity with the eid of
    `original`.
    """
    original.complete()
    values = dict(attributes)
    for rschema in original.e_schema.subject_relations():
        if not rschema.final or rschema.meta:
            continue
        name = rschema.type
        current = original.cw_attr_value(name)
        if name not in values:
            values[name] = current
    duplicate = original._cw.create_entity(original.cw_etype, **values)
    duplicate.copy_relations(original.eid)
    return duplicate


class IContainer(EntityAdapter):
    """Abstract adapter for entities acting as the root of a container."""
    __abstract__ = True
    __regid__ = 'IContainer'

    @classmethod
    def build_class(cls, etype):
        """Return a new subclass of `cls` named `etype` + 'IContainer' whose
        `__select__` is `is_instance(etype)`.
        """
        namespace = {'__select__': is_instance(etype)}
        name = etype + 'IContainer'
        return type(name, (cls,), namespace)

    @property
    def container(self):
        """The container this entity belongs to, which is the adapted entity
        itself (property provided for the sake of consistency with
        `IContained`).
        """
        return self.entity

    @property
    def parent(self):
        """The direct parent entity: always None for a container."""
        return None


class IClonable(EntityAdapter):
    """Adapter for entity cloning"""
    __regid__ = 'IClonable'

    def clone_into(self, clone):
        """Recursively clone the container graph of this entity into `clone`.

        `clone` must have the same entity type as the adapted entity. Its
        relations are first copied from the adapted entity using
        `copy_relations()`. Then, for each element yielded by
        `composite_entities_graph()`, the child is either copied with
        `copy_entity()` and linked to the clone of its parent, or, when a
        clone of this child is already known, that clone is linked to the
        clone of the parent with `cw_set()`.
        """
        original = self.entity
        assert clone.cw_etype == original.cw_etype, \
            "clone entity type {} does not match with original's {}".format(
                clone.cw_etype, original.cw_etype)
        clone.copy_relations(original.eid)
        # map each original entity to its clone
        clones = {original: clone}
        for child, (rtype, role), parent in composite_entities_graph(original):
            if role == 'subject':
                rel = rtype
            else:
                rel = 'reverse_' + rtype
            link = {rel: clones[parent]}
            known = clones.get(child)
            if known is None:
                clones[child] = copy_entity(child, **link)
            else:
                known.cw_set(**link)


class IContained(EntityAdapter):
    """Abstract adapter for entities which are part of a container.

    This default implementation is purely computational and doesn't rely on any additional data in
    the model, beside a relation to go up to the direct parent.
    """
    __abstract__ = True
    __regid__ = 'IContained'
    # concrete classes generated by `build_class`, indexed by entity type
    _classes = {}
    parent_relations = None  # set this to a set of (rtype, role) in concrete classes

    @classmethod
    def register_class(cls, vreg, etype, parent_relations):
        """Build the concrete adapter class for `etype` and register it into
        `vreg`, unless a class already exists for this entity type (in which
        case its parent relations are only extended, see `build_class`).
        """
        contained_cls = cls.build_class(etype, parent_relations)
        if contained_cls is not None:
            vreg.register(contained_cls)

    @classmethod
    def build_class(cls, etype, parent_relations):
        """Return a new concrete adapter class for `etype`, or None if one was
        already generated.

        `parent_relations` is an iterable of `(rtype, role)` tuples, `role`
        being either 'subject' or 'object'. When a class was already generated
        for `etype`, the given relations are added to its `parent_relations`
        set and None is returned.
        """
        # check given parent relations
        for parent_relation in parent_relations:
            assert (isinstance(parent_relation, tuple)
                    and len(parent_relation) == 2
                    and parent_relation[-1] in ('subject', 'object')), parent_relation
        # Keep a private copy: the set stored on the generated class is later
        # extended in place, which must not alter the caller's own object.
        parent_relations = set(parent_relations)
        # use already created class if any
        if etype in cls._classes:
            contained_cls = cls._classes[etype]
            # ensure registered class is the same at the one that would be generated
            assert contained_cls.__bases__ == (cls,)
            contained_cls.parent_relations |= parent_relations
            return None
        # else generate one
        selector = is_instance(etype)
        contained_cls = type(etype + 'IContained', (cls,),
                             {'__select__': selector,
                              'parent_relations': parent_relations})
        cls._classes[etype] = contained_cls
        return contained_cls

    @property
    def container(self):
        """Return the container to which this entity belongs, or None.

        The parent is returned if it can be adapted to `IContainer`, else the
        lookup goes on through the `IContained` adapter of the parent.
        """
        parent_relation = self.parent_relation()
        if parent_relation is None:
            # not yet attached to a parent
            return None
        parent = self.entity.related(*parent_relation, entities=True)[0]
        if parent.cw_adapt_to('IContainer'):
            return parent
        return parent.cw_adapt_to('IContained').container

    @property
    def parent(self):
        """Returns the direct parent entity if any, else None (not yet linked to our parent).
        """
        parent_relation = self.parent_relation()
        if parent_relation is None:
            return None  # not yet linked to our parent
        parent_rset = self.entity.related(*parent_relation)
        if parent_rset:
            return parent_rset.one()
        return None

    def parent_relation(self):
        """Return the relation used to attach this entity to its parent, or None if no parent is set
        yet.

        The returned value is the first `(rtype, role)` item of
        `parent_relations` through which the entity has a related entity.
        """
        for parent_relation in self.parent_relations:
            parents = self.entity.related(*parent_relation)
            assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents)
            if parents:
                return parent_relation
        return None


def structure_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the structure of the container having `etype` as root.

    The structure is a dictionary whose keys are the names of the entity
    types reachable from the root `etype` through composite relations. Each
    key is associated with a set of `(relation type, role)` tuples giving
    access to the parent (which may be the container or a contained) of
    entities of this type.

    This is `_relatives_def()` called with `fetch_parents` set to True.
    """
    options = {
        'skiprtypes': skiprtypes,
        'skipetypes': skipetypes,
        'fetch_parents': True,
    }
    return _relatives_def(schema, etype, **options)


def tree_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the children relations of the container having `etype` as root.

    The result is a dictionary whose keys are the names of the entity types
    reachable from the root `etype` through composite relations, the root
    included. Each key is associated with a set of `(relation type, role)`
    tuples giving access to the children (expected to be contained
    entities) of entities of this type.

    This is `_relatives_def()` called with `fetch_parents` set to False.
    """
    options = {
        'skiprtypes': skiprtypes,
        'skipetypes': skipetypes,
        'fetch_parents': False,
    }
    return _relatives_def(schema, etype, **options)


def _relatives_def(schema, etype, skiprtypes=(), skipetypes=(), fetch_parents=True):
    """Return a dictionary `{etype: set of (relation type, role)}` of the
    entity types reachable through composite relations from the root
    `etype`.

    Each key gives the name of an entity type, associated to a set of
    relation/role allowing to access to its relatives: its parent if
    `fetch_parents` is true, or its children if `fetch_parents` is false.
    In the latter case every visited entity type, the root included, is a
    key of the result, possibly associated with an empty set.

    Relation types flagged as `meta` or found in `skiprtypes` (extended by
    `_skipped()`), as well as targets found in `skipetypes`, are ignored.

    If `etype` is not in `schema`, a `RuntimeWarning` is emitted and an empty
    dict is returned.
    """
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}
    skiprtypes, skipetypes = _skipped(skiprtypes, skipetypes)
    contained = {}
    # entity schemas remaining to be visited
    candidates = [schema[etype]]
    while candidates:
        eschema = candidates.pop()
        if not fetch_parents:
            assert eschema not in contained, (eschema.type, contained)
            contained[eschema.type] = set()
        for rschema, teschemas, role in eschema.relation_definitions():
            if rschema.meta or rschema in skiprtypes:
                continue
            target_role = neg_role(role)
            # NOTE(review): every rdef of the relation type is examined, not
            # only those having `eschema` at the `role` end -- confirm this
            # is intended.
            # `values()` rather than `itervalues()` so this runs on Python 2
            # and 3.
            for rdef in rschema.rdefs.values():
                if rdef.composite != role:
                    continue
                target = getattr(rdef, target_role)
                if target in skipetypes:
                    continue
                if target not in contained:
                    candidates.append(target)
                if fetch_parents:
                    assert target != etype, 'cycle to the container'
                    contained.setdefault(target.type, set()).add((rschema.type, target_role))
                else:
                    contained[eschema.type].add((rschema.type, role))
        if not fetch_parents:
            # A target only enters `contained` once visited in this mode, so
            # it may have been appended several times: drop duplicates.
            candidates = list(set(candidates))
    return contained


class IContainedToITree(ITreeAdapter):
    """ITree adapter built on top of the IContained adapter, additionally
    configured with the list of relations leading to the children of the
    adapted entity.
    """

    children_relations = None  # list of (relation type, role) to get entity's children

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a new subclass of `cls` named `etype` + 'ITree', whose
        `__select__` is `is_instance(etype)` and configured with the given
        `children_relations`.
        """
        namespace = {
            '__select__': is_instance(etype),
            'children_relations': children_relations,
        }
        return type(etype + 'ITree', (cls,), namespace)

    @property
    def tree_relation(self):
        """Relation type of the parent relation given by the IContained
        adapter of the entity.
        """
        return self.entity.cw_adapt_to('IContained').parent_relation()[0]

    @property
    def child_role(self):
        """Role of the parent relation given by the IContained adapter of the
        entity.
        """
        return self.entity.cw_adapt_to('IContained').parent_relation()[1]

    def children_rql(self):
        """Return the UNION of the related RQL queries of each children
        relation.
        """
        # XXX may have different shapes
        queries = ['(%s)' % self.entity.cw_related_rql(rtype, role)
                   for rtype, role in self.children_relations]
        return ' UNION '.join(queries)

    def _children(self, entities=True):
        """Accumulate the entities related through each children relation,
        in a list if `entities` is true, else in a result set.
        """
        if not entities:
            found = ResultSet([], '')
            found.req = self._cw
        else:
            found = []
        for relation in self.children_relations:
            rtype, role = relation
            found += self.entity.related(rtype, role, entities=entities)
        return found

    def different_type_children(self, entities=True):
        """Return children whose entity type differs from the type of this
        entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        found = self._children(entities)
        own_etype = self.entity.cw_etype
        if not entities:
            return found.filtered_rset(lambda x: x.cw_etype != own_etype,
                                       self.entity.cw_col)
        return [child for child in found if child.cw_etype != own_etype]

    def same_type_children(self, entities=True):
        """Return children whose entity type is the type of this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        found = self._children(entities)
        own_etype = self.entity.cw_etype
        if not entities:
            return found.filtered_rset(lambda x: x.cw_etype == own_etype,
                                       self.entity.cw_col)
        return [child for child in found if child.cw_etype == own_etype]

    def children(self, entities=True, sametype=False):
        """Return children entities, restricted to those of the same type as
        this entity if `sametype` is true.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        fetch = self.same_type_children if sametype else self._children
        return fetch(entities)


class IContainerToITree(IContainedToITree):
    """ITree adapter for container entities: behaves like
    :class:`IContainedToITree`, except that the parent is always None.
    """

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a new subclass of `cls` named `etype` + 'ITree', whose
        `__select__` is `is_instance(etype)` and configured with the given
        `children_relations`.
        """
        namespace = {
            '__select__': is_instance(etype),
            'children_relations': children_relations,
        }
        return type(etype + 'ITree', (cls,), namespace)

    def parent(self):
        """Return None."""
        return None