# entities.py (cubicweb-compound)
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.

"""cubicweb-compound entity's classes

This module essentially provides adapters to handle a structure of composite
entities similarly to the container_ cube. The idea is to provide an API
allowing access to a root entity (the container) from any contained entities.

Assumptions:

* an entity may belong to one and only one container
* an entity may be both a container and contained

.. _container: https://www.cubicweb.org/project/cubicweb-container
"""

from functools import partial, wraps
from warnings import warn

from cubicweb import neg_role
from cubicweb.rset import ResultSet
from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance, partial_relation_possible
from cubicweb.entities.adapters import ITreeAdapter


def skip_meta(func):
    """Decorator to set `skiprtypes` and `skipetypes` parameters as frozensets
    including "meta" objects that should be implicitly ignored.

    The decorated callable always receives both parameters as keyword
    arguments:

    * `skiprtypes` is the caller-supplied value (if any) merged with
      `META_RTYPES`, `WORKFLOW_RTYPES` and `SYSTEM_RTYPES`;
    * `skipetypes` is the caller-supplied value (if any), frozen as is.

    Only keyword arguments are inspected, so callers must pass `skiprtypes`
    and `skipetypes` by keyword for them to be taken into account.
    """
    @wraps(func)
    def wrapped(*args, **kwargs):
        # Always extend the user selection with the relation types that make
        # no sense in a container structure.
        kwargs['skiprtypes'] = frozenset(
            set(kwargs.get('skiprtypes', ())) | META_RTYPES
            | WORKFLOW_RTYPES | SYSTEM_RTYPES)
        kwargs['skipetypes'] = frozenset(kwargs.get('skipetypes', ()))
        return func(*args, **kwargs)
    return wrapped
def composite_schema_graph(schema, etype, skiprtypes=(), skipetypes=()):
    """Yield `(rtype, role), ends` values corresponding to arcs of the graph
    obtained from walking the graph of compositely-related entity types
    reachable from `etype` root entity type.

    `ends` is a dict with keys corresponding to possible entity types
    reachable through `(rtype, role)` relation and with respective sub-graph
    iterator as values.

    Relation types flagged as `meta` or found in `skiprtypes`, as well as
    target entity types found in `skipetypes`, are ignored. Relations with no
    remaining target are not yielded.

    If `etype` is not in `schema`, a `RuntimeWarning` is issued and nothing is
    yielded.
    """
    # Sub-graphs are built lazily with the very same filtering options.
    subgraph = partial(composite_schema_graph, schema, skiprtypes=skiprtypes,
                       skipetypes=skipetypes)
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        # Nothing to walk: stop here instead of failing on `schema[etype]`.
        return
    eschema = schema[etype]
    for rschema, teschemas, role in eschema.relation_definitions():
        if rschema.meta or rschema in skiprtypes:
            continue
        target_role = neg_role(role)
        relation, ends = (rschema.type, role), {}
        # `.values()` rather than `.itervalues()`: works on Python 2 and 3.
        for rdef in rschema.rdefs.values():
            # Only keep definitions where `etype` side is the composite.
            if rdef.composite != role:
                continue
            target = getattr(rdef, target_role)
            if target in skipetypes:
                continue
            ends[target.type] = subgraph(target.type)
        if ends:
            yield relation, ends
def composite_entities_graph(parent, **kwargs):
    """Yield arcs of a graph of compositely-related entities reachable from
    `parent` entity.

    An "arc" is a tuple `(child entity, (rtype, role), parent entity)`, where
    `role` is `neg_role()` of the role yielded by `composite_schema_graph`
    for the relation (presumably the role played by the child).

    The walk is depth-first: arcs below a child are yielded right after the
    arc leading to this child. Extra keyword arguments are passed unchanged
    to `composite_schema_graph`.
    """
    for (rtype, role), ends in composite_schema_graph(parent._cw.vreg.schema,
                                                      parent.cw_etype,
                                                      **kwargs):
        for child_etype in ends:
            rset = parent.related(rtype, role=role, targettypes=[child_etype])
            for child in rset.entities():
                yield child, (rtype, neg_role(role)), parent
                # Explicit loop rather than `yield from`, to stay compatible
                # with Python 2.
                for x in composite_entities_graph(child, **kwargs):
                    yield x
def copy_entity(original, **attributes):
    """Create and return a new entity duplicating `original`.

    The copy gets the value of every final, non-meta subject relation (i.e.
    attribute) of `original`, unless a value for this attribute is given in
    `attributes`, which takes precedence. Other relations are handled by
    calling `copy_relations()` on the new entity with the eid of `original`.
    """
    values = dict(attributes)
    original.complete()
    for rschema in original.e_schema.subject_relations():
        if not rschema.final or rschema.meta:
            continue
        name = rschema.type
        # Explicitly given values win over the ones of the original.
        values.setdefault(name, original.cw_attr_value(name))
    duplicate = original._cw.create_entity(original.cw_etype, **values)
    duplicate.copy_relations(original.eid)
    return duplicate


class IContainer(EntityAdapter):
    """Abstract adapter for entities acting as the root of a container."""
    __abstract__ = True
    __regid__ = 'IContainer'

    @classmethod
    def build_class(cls, etype):
        """Return a new adapter class named `<etype>IContainer`, deriving
        from `cls` and selected with `is_instance(etype)`.
        """
        namespace = {'__select__': is_instance(etype)}
        return type(etype + 'IContainer', (cls,), namespace)

    @property
    def container(self):
        """The container this entity belongs to, that is the adapted entity
        itself (property provided for consistency with `IContained`).
        """
        return self.entity

    @property
    def parent(self):
        """The direct parent entity, which is always None for a container."""
        return None


class IClonableAdapter(EntityAdapter):
    """Adapter handling the cloning of an entity.

    Concrete classes are expected to set the `rtype` class attribute (and
    `role` if needed) to the relation linking a clone to its original, e.g.
    `clone_of`, depending on the application schema.
    """
    __abstract__ = True
    __select__ = partial_relation_possible()
    # relation between the clone and the original.
    rtype = None
    role = 'object'

    def clone_into(self, clone):
        """Recursively clone the container graph of this entity into `clone`.

        `clone` must have the same entity type as the adapted entity. Its
        relations are copied from the adapted entity first, then every
        entity found by `composite_entities_graph()` is copied and attached
        to the copy of its parent.
        """
        assert clone.cw_etype == self.entity.cw_etype, \
            "clone entity type {} does not match with original's {}".format(
                clone.cw_etype, self.entity.cw_etype)
        clone.copy_relations(self.entity.eid)
        # Mapping from original entities to their copy.
        clones = {self.entity: clone}
        for child, (rtype, role), parent in composite_entities_graph(self.entity):
            relname = 'reverse_' + rtype if role != 'subject' else rtype
            link = {relname: clones[parent]}
            existing = clones.get(child)
            if existing is None:
                clones[child] = copy_entity(child, **link)
            else:
                # Already copied through another arc: only add the relation.
                existing.cw_set(**link)

    @property
    def clone(self):
        """The first entity related to this entity through `rtype` (this
        entity playing `role` in the relation), or None if there is none.
        """
        rset = self.entity.related(self.rtype, role=self.role, limit=1)
        return rset.get_entity(0, 0) if rset else None

class IContained(EntityAdapter):
    """Abstract adapter for entities living inside a container.

    This default implementation is purely computational: it needs nothing
    more in the model than the relations leading to the direct parent.
    """
    __abstract__ = True
    __regid__ = 'IContained'
    # generated classes, indexed by entity type
    _classes = {}
    # set this to a set of (rtype, role) in concrete classes
    parent_relations = None

    @classmethod
    def register_class(cls, vreg, etype, parent_relations):
        """Register into `vreg` the class built by `build_class()` for
        `etype`, unless no new class had to be generated.
        """
        generated = cls.build_class(etype, parent_relations)
        if generated is None:
            return
        vreg.register(generated)

    @classmethod
    def build_class(cls, etype, parent_relations):
        """Return a new adapter class for `etype`, or None if one already
        exists.

        In the latter case, `parent_relations` are merged into those of the
        existing class. Each parent relation must be a `(rtype, role)` tuple
        with role being 'subject' or 'object'.
        """
        # check given parent relations
        for parent_relation in parent_relations:
            assert (isinstance(parent_relation, tuple)
                    and len(parent_relation) == 2
                    and parent_relation[-1] in ('subject', 'object')), parent_relation
        existing = cls._classes.get(etype)
        if existing is not None:
            # ensure registered class is the same as the one that would be
            # generated, then only extend its parent relations
            assert existing.__bases__ == (cls,)
            existing.parent_relations |= parent_relations
            return None
        namespace = {'__select__': is_instance(etype),
                     'parent_relations': parent_relations}
        generated = type(etype + 'IContained', (cls,), namespace)
        cls._classes[etype] = generated
        return generated

    @property
    def container(self):
        """The container this entity belongs to, or None if the entity is not
        attached to a parent yet.

        The parent is returned if it is adaptable to `IContainer`, else the
        lookup goes on from the parent's `IContained` adapter.
        """
        relation = self.parent_relation()
        if relation is None:
            # not yet attached to a parent
            return None
        parent = self.entity.related(*relation, entities=True)[0]
        if not parent.cw_adapt_to('IContainer'):
            return parent.cw_adapt_to('IContained').container
        return parent

    @property
    def parent(self):
        """The direct parent entity, or None when this entity is not linked
        to a parent yet.
        """
        relation = self.parent_relation()
        if relation is None:
            return None  # not yet linked to our parent
        rset = self.entity.related(*relation)
        return rset.one() if rset else None

    def parent_relation(self):
        """Return the `(rtype, role)` relation attaching this entity to its
        parent, or None if no parent is set yet.

        Relations from `parent_relations` are tried in turn and the first
        one with a related entity is returned.
        """
        for candidate in self.parent_relations:
            parents = self.entity.related(*candidate)
            assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents)
            if not parents:
                continue
            return candidate
        return None


def structure_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the structure of the container whose root is `etype`.

    The result is a dictionary whose keys are names of entity types
    reachable from the root <etype> through composite relations. Each key is
    associated to the set of `(relation type, role)` leading to the parent
    of entities of this type (the parent being either the container or a
    contained entity).
    """
    options = {'skiprtypes': skiprtypes,
               'skipetypes': skipetypes,
               'fetch_parents': True}
    return _relatives_def(schema, etype, **options)


def tree_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return a dictionary {etype: set of (relation type, role)} describing
    the tree of entity types reachable from the root <etype> through
    composite relations.

    Each key is the name of an entity type, associated to the relation/role
    pairs leading to its children (which are expected to be contained
    entities).
    """
    options = {'skiprtypes': skiprtypes,
               'skipetypes': skipetypes,
               'fetch_parents': False}
    return _relatives_def(schema, etype, **options)


def _relatives_def(schema, etype, skiprtypes, skipetypes, fetch_parents=True):
    """Return a dictionary {etype: set of (relation type, role)} which are
    reachable through composite relations from the root <etype>. Each key
    gives the name of an entity type, associated to the relation/role pairs
    allowing to access to its relatives: its parent if fetch_parents is true,
    or its children if fetch_parents is false.

    Relation types flagged as `meta` or found in `skiprtypes`, as well as
    target entity types found in `skipetypes`, are ignored.

    If `etype` is not in `schema`, a `RuntimeWarning` is issued and an empty
    dictionary is returned.
    """
    contained = {}
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}
    # Entity types remaining to be walked through, starting from the root.
    candidates = [schema[etype]]
    while candidates:
        eschema = candidates.pop()
        if not fetch_parents:
            # In "children" mode every walked entity type gets an entry, even
            # if it has no child; a type must not be walked twice.
            assert eschema not in contained, (eschema.type, contained)
            contained[eschema.type] = set()
        for rschema, teschemas, role in eschema.relation_definitions():
            if rschema.meta or rschema in skiprtypes:
                continue
            target_role = neg_role(role)
            # `.values()` rather than `.itervalues()`: works on Python 2 and 3.
            for rdef in rschema.rdefs.values():
                # Only keep definitions where `eschema` side is the composite.
                if rdef.composite != role:
                    continue
                target = getattr(rdef, target_role)
                if target in skipetypes:
                    continue
                if target not in contained:
                    candidates.append(target)
                if fetch_parents:
                    assert target != etype, 'cycle to the container'
                    contained.setdefault(target.type, set()).add((rschema.type, target_role))
                else:
                    contained[eschema.type].add((rschema.type, role))
        if not fetch_parents:
            # Drop duplicates so that a type is not walked (hence asserted)
            # twice.
            candidates = list(set(candidates))
    return contained


class IContainedToITree(ITreeAdapter):
    """ITree adapter built upon IContained, additionally configured with the
    list of relations leading to the children of the adapted entity.
    """

    # list of (relation type, role) to get entity's children
    children_relations = None

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a new adapter class named `<etype>ITree`, deriving from
        `cls`, selected with `is_instance(etype)` and configured with
        `children_relations`.
        """
        namespace = {'__select__': is_instance(etype),
                     'children_relations': children_relations}
        return type(etype + 'ITree', (cls,), namespace)

    @property
    def tree_relation(self):
        """Relation type of the parent relation given by `IContained`."""
        return self.entity.cw_adapt_to('IContained').parent_relation()[0]

    @property
    def child_role(self):
        """Role of the parent relation given by `IContained`."""
        return self.entity.cw_adapt_to('IContained').parent_relation()[1]

    def children_rql(self):
        """Return a RQL query being the union of related-entities queries for
        each children relation.
        """
        # XXX may have different shapes
        subqueries = ['(%s)' % self.entity.cw_related_rql(rel, role)
                      for rel, role in self.children_relations]
        return ' UNION '.join(subqueries)

    def _children(self, entities=True):
        """Return entities related through any of the children relations, as
        a list of entities if `entities` is true, else as a result set.
        """
        if not entities:
            found = ResultSet([], '')
            found.req = self._cw
        else:
            found = []
        for rel, role in self.children_relations:
            found += self.entity.related(rel, role, entities=entities)
        return found

    def different_type_children(self, entities=True):
        """Return children whose entity type differs from the one of this
        entity.

        Depending on the `entities` parameter, either entity objects or the
        equivalent result set are returned.
        """
        found = self._children(entities)
        own_etype = self.entity.cw_etype
        if not entities:
            return found.filtered_rset(lambda x: x.cw_etype != own_etype,
                                       self.entity.cw_col)
        return [child for child in found if child.cw_etype != own_etype]

    def same_type_children(self, entities=True):
        """Return children whose entity type is the same as the one of this
        entity.

        Depending on the `entities` parameter, either entity objects or the
        equivalent result set are returned.
        """
        found = self._children(entities)
        own_etype = self.entity.cw_etype
        if not entities:
            return found.filtered_rset(lambda x: x.cw_etype == own_etype,
                                       self.entity.cw_col)
        return [child for child in found if child.cw_etype == own_etype]

    def children(self, entities=True, sametype=False):
        """Return children of this entity, restricted to those of the same
        type if `sametype` is true.

        Depending on the `entities` parameter, either entity objects or the
        equivalent result set are returned.
        """
        if not sametype:
            return self._children(entities)
        return self.same_type_children(entities)


class IContainerToITree(IContainedToITree):
    """ITree adapter built upon IContainer: behaves like
    :class:`IContainedToITree` except that the parent is always None.
    """

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a new adapter class named `<etype>ITree`, deriving from
        `cls`, selected with `is_instance(etype)` and configured with
        `children_relations`.
        """
        # Same construction as the parent class, which builds from `cls`.
        return super(IContainerToITree, cls).build_class(
            etype, children_relations)

    def parent(self):
        """Return None, since a container has no parent."""
        return None