Newer
Older
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-compound entity's classes
This module essentially provides adapters to handle a structure of composite
entities similarly to the container_ cube. The idea is to provide an API
allowing access to a root entity (the container) from any contained entities.
Assumptions:

* an entity may belong to one and only one container
* an entity may be both a container and contained
.. _container: https://www.cubicweb.org/project/cubicweb-container
"""

Denis Laxalde
committed
from functools import partial, wraps
from warnings import warn
from cubicweb import neg_role
from cubicweb.rset import ResultSet
from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
from cubicweb.entities.adapters import ITreeAdapter

Denis Laxalde
committed
def skip_meta(func):
    """Decorate `func` so that it always receives normalized skip sets.

    Before delegating to `func`, the wrapper rewrites two keyword arguments:

    * `skiprtypes` becomes a frozenset holding the relation types supplied by
      the caller (none by default) together with `META_RTYPES`,
      `WORKFLOW_RTYPES` and `SYSTEM_RTYPES`, so that these "meta" relations
      are implicitly ignored;
    * `skipetypes` becomes a frozenset of the entity types supplied by the
      caller (empty by default).

    Both values are looked up in, and written back to, keyword arguments only.
    """
    @wraps(func)
    def wrapped(*args, **kwargs):
        requested_rtypes = kwargs.get('skiprtypes', ())
        kwargs['skiprtypes'] = frozenset(requested_rtypes).union(
            META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES)
        requested_etypes = kwargs.get('skipetypes', ())
        kwargs['skipetypes'] = frozenset(requested_etypes)
        return func(*args, **kwargs)
    return wrapped

Denis Laxalde
committed
@skip_meta
def composite_schema_graph(schema, etype, skiprtypes=(), skipetypes=()):
    """Yield `(rtype, role), ends` values corresponding to arcs of the graph
    obtained from walking the graph of compositely-related entity types
    reachable from `etype` root entity type.

    `ends` is a dict with keys corresponding to possible entity types
    reachable through `(rtype, role)` relation and with respective sub-graph
    iterator as values. Sub-graph iterators are created lazily: nothing is
    walked below a target type until its iterator is consumed.

    Relation types flagged `meta` or listed in `skiprtypes` are ignored, as
    are target entity types listed in `skipetypes`.

    If `etype` is not in `schema`, a `RuntimeWarning` is emitted and nothing
    is yielded.
    """
    subgraph = partial(composite_schema_graph, schema, skiprtypes=skiprtypes,
                       skipetypes=skipetypes)
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return
    eschema = schema[etype]
    for rschema, _teschemas, role in eschema.relation_definitions():
        if rschema.meta or rschema in skiprtypes:
            continue
        target_role = neg_role(role)
        relation, ends = (rschema.type, role), {}
        # NOTE(review): every rdef of the relation type is inspected, not only
        # those involving `etype` -- confirm this is intended.
        # `.values()` rather than `.itervalues()`: works on Python 2 and 3.
        for rdef in rschema.rdefs.values():
            if rdef.composite != role:
                continue
            target = getattr(rdef, target_role)
            if target in skipetypes:
                continue
            ends[target.type] = subgraph(target.type)
        # NOTE(review): indentation was lost in the reviewed copy; yielding
        # once per relation, after all its ends are collected, is assumed.
        yield relation, ends

Denis Laxalde
committed
@skip_meta
def composite_entities_graph(parent, **kwargs):
    """Yield arcs of a graph of compositely-related entities reachable from
    `parent` entity.

    An "arc" is a tuple `(child entity, (rtype, role), parent entity)`, where
    `role` is the negation (`neg_role`) of the role reported by
    `composite_schema_graph` for the parent's entity type.

    Each child's arc is yielded before the arcs found below that child.
    Keyword arguments are forwarded to `composite_schema_graph` and to the
    recursive calls.

    NOTE(review): the reviewed copy of this function was truncated (docstring
    terminator, tail of the `composite_schema_graph` call and body of the
    recursion loop were missing); the entity type argument and the re-yield of
    nested arcs below are reconstructed -- confirm against the original file.
    """
    schema = parent._cw.vreg.schema
    for (rtype, role), ends in composite_schema_graph(schema, parent.cw_etype,
                                                      **kwargs):
        for child_etype in ends:
            rset = parent.related(rtype, role=role, targettypes=[child_etype])
            for child in rset.entities():
                yield child, (rtype, neg_role(role)), parent
                # Explicit loop rather than `yield from`, which is Python 3 only.
                for x in composite_entities_graph(child, **kwargs):
                    yield x
def copy_entity(original, **attributes):
    """Create and return a copy of the `original` entity.

    The copy is created with the value of every final, non-meta subject
    relation (i.e. attribute) of `original`; values given through
    `attributes` take precedence over the copied ones. Non-composite
    relations are then copied by `entity.copy_relations()`.
    """
    values = dict(attributes)
    original.complete()
    for rschema in original.e_schema.subject_relations():
        if not rschema.final or rschema.meta:
            continue
        name = rschema.type
        current = original.cw_attr_value(name)
        if name not in values:
            values[name] = current
    clone = original._cw.create_entity(original.cw_etype, **values)
    clone.copy_relations(original.eid)
    return clone
class IContainer(EntityAdapter):
    """Abstract adapter for entities acting as the root of a container."""
    __abstract__ = True
    __regid__ = 'IContainer'

    @classmethod
    def build_class(cls, etype):
        """Return a subclass of `cls` named `<etype>IContainer` which selects
        entities of type `etype`.
        """
        selector = is_instance(etype)
        namespace = {'__select__': selector}
        return type(etype + 'IContainer', (cls,), namespace)

    @property
    def container(self):
        """Return the adapted entity itself: a container root is its own
        container. This mirrors the `IContained.container` API.
        """
        return self.entity

    @property
    def parent(self):
        """Return None: a container root has no direct parent."""
        return None
class IClonable(EntityAdapter):
    """Adapter used to clone an entity along with its composite graph."""
    __regid__ = 'IClonable'

    def clone_into(self, clone):
        """Recursively clone the container graph of this entity into `clone`.

        `clone` must have the same entity type as the adapted entity. Its
        relations are first copied from the adapted entity, then every arc
        of `composite_entities_graph` is replayed: a child not cloned yet is
        copied with `copy_entity` and linked to its parent's clone, while an
        already cloned child is only linked with `cw_set`.
        """
        source = self.entity
        assert clone.cw_etype == source.cw_etype, \
            "clone entity type {} does not match with original's {}".format(
                clone.cw_etype, source.cw_etype)
        clone.copy_relations(source.eid)
        # Maps each original entity to its clone.
        clones = {source: clone}
        for child, (rtype, role), parent in composite_entities_graph(source):
            if role == 'subject':
                rel = rtype
            else:
                rel = 'reverse_' + rtype
            link = {rel: clones[parent]}
            existing = clones.get(child)
            if existing is None:
                clones[child] = copy_entity(child, **link)
            else:
                existing.cw_set(**link)
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
class IContained(EntityAdapter):
    """Abstract adapter for entities which are part of a container.

    This default implementation is purely computational and doesn't rely on
    any additional data in the model, beside a relation to go up to the direct
    parent.

    Concrete classes are generated per entity type by `build_class` and
    remembered in `_classes`.
    """
    __abstract__ = True
    __regid__ = 'IContained'
    # Concrete adapter classes generated so far, keyed by entity type.
    _classes = {}
    parent_relations = None  # set this to a set of (rtype, role) in concrete classes

    @classmethod
    def register_class(cls, vreg, etype, parent_relations):
        """Build the adapter class for `etype` and register it into `vreg`.

        Nothing is registered when a class already exists for `etype`; in that
        case `parent_relations` are merged into the existing class.
        """
        contained_cls = cls.build_class(etype, parent_relations)
        if contained_cls is not None:
            vreg.register(contained_cls)

    @classmethod
    def build_class(cls, etype, parent_relations):
        """Return a new adapter class for `etype`, or None if one was already
        generated.

        `parent_relations` is an iterable of `(rtype, role)` tuples, `role`
        being 'subject' or 'object'. When a class already exists for `etype`,
        the given relations are added to its `parent_relations` and None is
        returned.

        The iterable given by the caller is never stored nor modified: each
        generated class owns its own set.
        """
        # Materialize once so that any iterable is accepted and validation
        # happens before the relations are hashed into a set.
        parent_relations = tuple(parent_relations)
        # check given parent relations
        for parent_relation in parent_relations:
            assert (isinstance(parent_relation, tuple)
                    and len(parent_relation) == 2
                    and parent_relation[-1] in ('subject', 'object')), parent_relation
        # use already created class if any
        if etype in cls._classes:
            contained_cls = cls._classes[etype]
            # ensure registered class is the same as the one that would be generated
            assert contained_cls.__bases__ == (cls,)
            # Rebind to a fresh set instead of `|=`, which would mutate in
            # place an object possibly shared with an earlier caller.
            contained_cls.parent_relations = (
                set(contained_cls.parent_relations) | set(parent_relations))
            return None
        # else generate one
        selector = is_instance(etype)
        contained_cls = type(etype + 'IContained', (cls,),
                             {'__select__': selector,
                              'parent_relations': set(parent_relations)})
        cls._classes[etype] = contained_cls
        return contained_cls

    @property
    def container(self):
        """Return the container to which this entity belongs, or None."""
        parent_relation = self.parent_relation()
        if parent_relation is None:
            # not yet attached to a parent
            return None
        parent = self.entity.related(*parent_relation, entities=True)[0]
        if parent.cw_adapt_to('IContainer'):
            return parent
        # The parent is not a container root: go up through its own adapter.
        return parent.cw_adapt_to('IContained').container

    @property
    def parent(self):
        """Returns the direct parent entity if any, else None (not yet linked to our parent).
        """
        parent_relation = self.parent_relation()
        if parent_relation is None:
            return None  # not yet linked to our parent
        parent_rset = self.entity.related(*parent_relation)
        if parent_rset:
            return parent_rset.one()
        return None

    def parent_relation(self):
        """Return the relation used to attach this entity to its parent, or None if no parent is set
        yet.

        The result is the first `(rtype, role)` of `parent_relations` for
        which the entity has a related entity.
        """
        for parent_relation in self.parent_relations:
            parents = self.entity.related(*parent_relation)
            assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents)
            if parents:
                return parent_relation
        return None

Denis Laxalde
committed
@skip_meta
def structure_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the structure of the container rooted at entity type `etype`.

    The result is a dictionary whose keys are the names of entity types
    reachable from `etype` through composite relations. Each key is mapped to
    a set of `(relation type, role)` tuples allowing to go from that entity
    type to its parent (which may be the container or another contained).
    """
    parents_by_etype = _relatives_def(schema, etype, skiprtypes, skipetypes,
                                      fetch_parents=True)
    return parents_by_etype

Denis Laxalde
committed
@skip_meta
def tree_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the tree of entity types rooted at entity type `etype`.

    The result is a dictionary whose keys are the names of entity types
    reachable from `etype` through composite relations. Each key is mapped to
    a set of `(relation type, role)` tuples allowing to go from that entity
    type to its children (which are expected to be contained).
    """
    children_by_etype = _relatives_def(schema, etype, skiprtypes, skipetypes,
                                       fetch_parents=False)
    return children_by_etype

Denis Laxalde
committed
def _relatives_def(schema, etype, skiprtypes, skipetypes, fetch_parents=True):
    """Return a dictionary {etype: set((relation type, role))} of entity
    types which are reachable through composite relations from the root
    <etype>. Each key gives the name of an entity type, associated to a set of
    relation/role allowing to access to its relatives: its parent if
    fetch_parents is true, or its children if fetch_parents is false.

    Relation types flagged `meta` or found in `skiprtypes` are ignored, as
    are target entity types found in `skipetypes`.

    If `etype` is not in `schema`, a `RuntimeWarning` is emitted and an empty
    dictionary is returned.

    NOTE(review): the reviewed copy had lost its indentation; the nesting
    below is reconstructed from the control flow -- confirm against the
    original file.
    """
    contained = {}
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}
    candidates = [schema[etype]]
    while candidates:
        eschema = candidates.pop()
        if not fetch_parents:
            # In "children" mode every visited type gets an entry, filled in
            # below with the relations leading to its children.
            assert eschema not in contained, (eschema.type, contained)
            contained[eschema.type] = set()
        for rschema, _teschemas, role in eschema.relation_definitions():
            if rschema.meta or rschema in skiprtypes:
                continue
            target_role = neg_role(role)
            # `.values()` rather than `.itervalues()`: works on Python 2 and 3.
            for rdef in rschema.rdefs.values():
                if rdef.composite != role:
                    continue
                target = getattr(rdef, target_role)
                if target in skipetypes:
                    continue
                if target not in contained:
                    candidates.append(target)
                if fetch_parents:
                    assert target != etype, 'cycle to the container'
                    contained.setdefault(target.type, set()).add((rschema.type, target_role))
                else:
                    contained[eschema.type].add((rschema.type, role))
        if not fetch_parents:
            # Drop duplicated candidates before the next iteration.
            candidates = list(set(candidates))
    return contained
class IContainedToITree(ITreeAdapter):
    """Map IContained adapters to ITree, additionally configured with a list
    of relations leading to the contained's children.
    """
    children_relations = None  # list of (relation type, role) to get entity's children

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a subclass of `cls` named `<etype>ITree`, selecting `etype`
        entities and configured with `children_relations`.
        """
        selector = is_instance(etype)
        namespace = {'__select__': selector,
                     'children_relations': children_relations}
        return type(etype + 'ITree', (cls,), namespace)

    @property
    def tree_relation(self):
        """Relation type of the entity's current parent relation, as given by
        its IContained adapter.
        """
        return self.entity.cw_adapt_to('IContained').parent_relation()[0]

    @property
    def child_role(self):
        """Role of the entity's current parent relation, as given by its
        IContained adapter.
        """
        return self.entity.cw_adapt_to('IContained').parent_relation()[1]

    def children_rql(self):
        """Return the UNION of the "related" RQL queries of every children
        relation.
        """
        # XXX may have different shapes
        subqueries = ['(%s)' % self.entity.cw_related_rql(rel, role)
                      for rel, role in self.children_relations]
        return ' UNION '.join(subqueries)

    def _children(self, entities=True):
        """Accumulate entities related through every children relation, as a
        list of entities or as a result set according to `entities`.
        """
        if not entities:
            res = ResultSet([], '')
            res.req = self._cw
        else:
            res = []
        for rel, role in self.children_relations:
            res += self.entity.related(rel, role, entities=entities)
        return res

    def different_type_children(self, entities=True):
        """Return children whose entity type differs from this entity's.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        res = self._children(entities)
        etype = self.entity.cw_etype
        if not entities:
            return res.filtered_rset(lambda x: x.cw_etype != etype, self.entity.cw_col)
        return [child for child in res if child.cw_etype != etype]

    def same_type_children(self, entities=True):
        """Return children whose entity type is the same as this entity's.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        res = self._children(entities)
        etype = self.entity.cw_etype
        if not entities:
            return res.filtered_rset(lambda x: x.cw_etype == etype, self.entity.cw_col)
        return [child for child in res if child.cw_etype == etype]

    def children(self, entities=True, sametype=False):
        """Return children entities, restricted to those of the same type as
        this entity when `sametype` is true.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        if sametype:
            return self.same_type_children(entities)
        return self._children(entities)
class IContainerToITree(IContainedToITree):
    """Map IContainer adapters to ITree, similarly to
    :class:`IContainedToITree` but considering parent as None.
    """

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a subclass of `cls` named `<etype>ITree`, selecting `etype`
        entities and configured with `children_relations`.
        """
        selector = is_instance(etype)
        namespace = {'__select__': selector,
                     'children_relations': children_relations}
        return type(etype + 'ITree', (cls,), namespace)

    def parent(self):
        """Return None: a container has no parent."""
        return None