Newer
Older
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-compound entity's classes
This module essentially provides adapters to handle a structure of composite
entities similarly to the container_ cube. The idea is to provide an API
allowing access to a root entity (the container) from any contained entities.
Assumptions:
* an entity may belong to one and only one container
* an entity may be both a container and contained
.. _container: https://www.cubicweb.org/project/cubicweb-container
"""
from warnings import warn
from cubicweb import neg_role
from cubicweb.rset import ResultSet
from cubicweb.schema import META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
from cubicweb.entities.adapters import ITreeAdapter
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def _skipped(skiprtypes, skipetypes):  # TODO decorator
    """Return `skiprtypes` and `skipetypes` as a pair of frozensets.

    The returned relation types are the given ones extended with the "meta"
    relation types (`META_RTYPES`, `WORKFLOW_RTYPES` and `SYSTEM_RTYPES`)
    that should be implicitly ignored.  The entity types are returned
    unchanged, only frozen.
    """
    ignored_rtypes = set(skiprtypes)
    for implicit_rtypes in (META_RTYPES, WORKFLOW_RTYPES, SYSTEM_RTYPES):
        ignored_rtypes = ignored_rtypes | implicit_rtypes
    return frozenset(ignored_rtypes), frozenset(skipetypes)
def composite_schema_graph(schema, etype, skiprtypes=(), skipetypes=()):
    """Return a dict structure representing the graph of compositely-related
    entity types reachable from `etype` root entity type.

    The result maps ``(relation type, role)`` keys -- `role` being the role
    played by the parent entity type in the relation -- to a
    ``{child entity type: subgraph}`` dictionary, each subgraph having the
    same shape and being rooted at the child entity type.

    Meta relations, relation types found in `skiprtypes` (as extended by
    `_skipped()`) and child entity types found in `skipetypes` are left out.
    When `etype` is not in `schema`, a `RuntimeWarning` is emitted and an
    empty dict is returned.

    An entity type which is already on the path from the root (recursive
    composition) is recorded with an empty subgraph instead of being expanded
    again, so that cyclic composite schemas do not recurse forever.
    """
    skiprtypes, skipetypes = _skipped(skiprtypes, skipetypes)
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}

    def expand(parent_etype, path):
        """Build the graph rooted at `parent_etype`; `path` is the tuple of
        entity types leading to it from the root.
        """
        graph = {}
        eschema = schema[parent_etype]
        path = path + (eschema.type,)
        for rschema, _, role in eschema.relation_definitions():
            if rschema.meta or rschema in skiprtypes:
                continue
            target_role = neg_role(role)
            for rdef in rschema.rdefs.values():
                if rdef.composite != role:
                    continue
                # `rdefs` holds every definition of the relation type; only
                # those whose `role` end is the current entity type describe
                # children of that type.
                if getattr(rdef, role).type != eschema.type:
                    continue
                target = getattr(rdef, target_role)
                if target in skipetypes:
                    continue
                if target.type in path:
                    # recursive composition: stop here
                    subgraph = {}
                else:
                    subgraph = expand(target.type, path)
                graph.setdefault((rschema.type, role), {})[target.type] = subgraph
        return graph

    return expand(etype, ())
def composite_entities_graph(parent):
    """Yield elements from a graph of compositely-related entities reachable
    from `parent` entity.

    Each element is a ``(child, (rtype, role), parent)`` tuple, where `role`
    is the opposite of the role used to go from the parent to the child.
    Elements of a child's own graph are yielded right after the child itself.
    """
    schema = parent._cw.vreg.schema
    schema_graph = composite_schema_graph(schema, parent.cw_etype)
    for (rtype, role), edges in schema_graph.iteritems():
        # XXX useless to build subgraph at this point? (only the child entity
        # types are used, the graph is computed again for each child entity)
        for child_etype in edges:
            children = parent.related(rtype, role=role,
                                      targettypes=[child_etype])
            for child in children.entities():
                yield child, (rtype, neg_role(role)), parent
                for descendant in composite_entities_graph(child):
                    yield descendant
def copy_entity(original, **attributes):
    """Return a copy of the `original` entity.

    Only attributes and non-composite relations are copied (relying on
    `entity.copy_relations()` for the latter).  Values given as keyword
    arguments take precedence over the attribute values of `original`.
    """
    values = dict(attributes)
    original.complete()
    for rschema in original.e_schema.subject_relations():
        # only consider non-meta attributes
        if not rschema.final or rschema.meta:
            continue
        name = rschema.type
        values.setdefault(name, original.cw_attr_value(name))
    duplicate = original._cw.create_entity(original.cw_etype, **values)
    duplicate.copy_relations(original.eid)
    return duplicate
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class IContainer(EntityAdapter):
    """Abstract adapter for entities which are a container root."""
    __abstract__ = True
    __regid__ = 'IContainer'

    @classmethod
    def build_class(cls, etype):
        """Return a new subclass of this adapter, named after `etype` and
        selectable for instances of `etype`.
        """
        namespace = {'__select__': is_instance(etype)}
        return type(etype + 'IContainer', (cls,), namespace)

    @property
    def container(self):
        """The container this entity belongs to, i.e. the adapted entity
        itself (provided for the sake of consistency with `IContained`).
        """
        return self.entity

    @property
    def parent(self):
        """The direct parent entity: always None in the case of a container."""
        return None
class IContained(EntityAdapter):
    """Abstract adapter for entities which are part of a container.

    This default implementation is purely computational and doesn't rely on
    any additional data in the model, beside a relation to go up to the
    direct parent.

    Concrete classes are generated per entity type by `build_class()` and
    remembered in the class-level `_classes` mapping.
    """
    __abstract__ = True
    __regid__ = 'IContained'
    # entity type -> adapter class generated by `build_class()`
    _classes = {}
    # set this to a set of (rtype, role) in concrete classes
    parent_relations = None

    @classmethod
    def register_class(cls, vreg, etype, parent_relations):
        """Build the adapter class for `etype` and register it into `vreg`.

        Nothing is registered when a class already exists for `etype` (its
        parent relations are extended instead, see `build_class()`).
        """
        contained_cls = cls.build_class(etype, parent_relations)
        if contained_cls is not None:
            vreg.register(contained_cls)

    @classmethod
    def build_class(cls, etype, parent_relations):
        """Return a new adapter class for `etype`, or None if one was already
        generated.

        `parent_relations` is an iterable of `(rtype, role)` tuples, `role`
        being 'subject' or 'object'.  When a class already exists for
        `etype`, the given relations are added to its `parent_relations` and
        None is returned.
        """
        # check given parent relations
        for parent_relation in parent_relations:
            assert (isinstance(parent_relation, tuple)
                    and len(parent_relation) == 2
                    and parent_relation[-1] in ('subject', 'object')), parent_relation
        # Work on a private copy: the relations are stored on the generated
        # class and later extended, which must not alter the caller's object.
        parent_relations = set(parent_relations)
        # use already created class if any
        if etype in cls._classes:
            contained_cls = cls._classes[etype]
            # ensure registered class is the same as the one that would be
            # generated
            assert contained_cls.__bases__ == (cls,)
            contained_cls.parent_relations = (
                contained_cls.parent_relations | parent_relations)
            return None
        # else generate one
        selector = is_instance(etype)
        contained_cls = type(etype + 'IContained', (cls,),
                             {'__select__': selector,
                              'parent_relations': parent_relations})
        cls._classes[etype] = contained_cls
        return contained_cls

    @property
    def container(self):
        """Return the container to which this entity belongs, or None.

        The direct parent is returned if it is adaptable to `IContainer`,
        else the search goes on through the parent's `IContained` adapter.
        """
        parent_relation = self.parent_relation()
        if parent_relation is None:
            # not yet attached to a parent
            return None
        parent = self.entity.related(*parent_relation, entities=True)[0]
        if parent.cw_adapt_to('IContainer'):
            return parent
        return parent.cw_adapt_to('IContained').container

    @property
    def parent(self):
        """Returns the direct parent entity if any, else None (not yet linked
        to our parent).
        """
        parent_relation = self.parent_relation()
        if parent_relation is None:
            return None  # not yet linked to our parent
        parent_rset = self.entity.related(*parent_relation)
        if parent_rset:
            return parent_rset.one()
        return None

    def parent_relation(self):
        """Return the relation used to attach this entity to its parent, or
        None if no parent is set yet.

        The relation is the first `(rtype, role)` of `parent_relations`
        through which the entity is related to something.
        """
        for parent_relation in self.parent_relations:
            parents = self.entity.related(*parent_relation)
            assert len(parents) <= 1, 'more than one parent on %s: %s' % (self.entity, parents)
            if parents:
                return parent_relation
        return None
def structure_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the container structure with `etype` as root entity.

    This structure is a dictionary whose keys are the names of the entity
    types reachable through composite relations from the root <etype>.  Each
    key is associated to a set of `(relation type, role)` allowing to access
    to the parent of such an entity (which may be the container or a
    contained).
    """
    return _relatives_def(schema, etype, fetch_parents=True,
                          skiprtypes=skiprtypes, skipetypes=skipetypes)
def tree_def(schema, etype, skiprtypes=(), skipetypes=()):
    """Return the tree structure with `etype` as root entity.

    This structure is a dictionary whose keys are the names of the entity
    types reachable through composite relations from the root <etype>, the
    root included.  Each key is associated to a set of `(relation type,
    role)` allowing to access to the children of such an entity (which are
    expected to be contained entities).
    """
    return _relatives_def(schema, etype, fetch_parents=False,
                          skiprtypes=skiprtypes, skipetypes=skipetypes)
def _relatives_def(schema, etype, skiprtypes=(), skipetypes=(), fetch_parents=True):
    """Return a dictionary {etype: set((relation type, role))} of the entity
    types which are reachable through composite relations from the root
    <etype>.  Each key gives the name of an entity type, associated to the
    relation/role pairs allowing to access to its relatives: its parent if
    `fetch_parents` is true, or its children if `fetch_parents` is false.

    When fetching children, every visited entity type (the root included)
    gets a key, possibly associated to an empty set.  When fetching parents,
    only contained entity types get a key.

    Meta relations, relation types found in `skiprtypes` (as extended by
    `_skipped()`) and target entity types found in `skipetypes` are ignored.
    When `etype` is not in `schema`, a `RuntimeWarning` is emitted and an
    empty dict is returned.
    """
    skiprtypes, skipetypes = _skipped(skiprtypes, skipetypes)
    if etype not in schema:
        # Would occur, e.g., during migration.
        warn('%s not found in schema, cannot build a container' % etype,
             RuntimeWarning)
        return {}
    contained = {}
    candidates = [schema[etype]]
    while candidates:
        eschema = candidates.pop()
        if not fetch_parents:
            assert eschema not in contained, (eschema.type, contained)
            contained[eschema.type] = set()
        for rschema, _, role in eschema.relation_definitions():
            if rschema.meta or rschema in skiprtypes:
                continue
            target_role = neg_role(role)
            for rdef in rschema.rdefs.values():
                if rdef.composite != role:
                    continue
                # `rdefs` holds every definition of the relation type; only
                # those whose `role` end is the current entity type relate it
                # to one of its children.
                if getattr(rdef, role).type != eschema.type:
                    continue
                target = getattr(rdef, target_role)
                if target in skipetypes:
                    continue
                if target not in contained:
                    candidates.append(target)
                if fetch_parents:
                    assert target != etype, 'cycle to the container'
                    contained.setdefault(target.type, set()).add((rschema.type, target_role))
                else:
                    contained[eschema.type].add((rschema.type, role))
        if not fetch_parents:
            # a same target may have been queued several times before being
            # visited: drop duplicates so that it is only visited once
            candidates = list(set(candidates))
    return contained
class IContainedToITree(ITreeAdapter):
    """Map IContained adapters to ITree, additionally configured with a list
    of relations leading to contained's children.
    """
    # list of (relation type, role) to get entity's children
    children_relations = None

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a new subclass of this adapter, named after `etype`,
        selectable for instances of `etype` and using `children_relations`.
        """
        namespace = {'__select__': is_instance(etype),
                     'children_relations': children_relations}
        return type(etype + 'ITree', (cls,), namespace)

    @property
    def tree_relation(self):
        """Relation type of the relation returned by the `parent_relation()`
        method of the entity's IContained adapter.
        """
        return self.entity.cw_adapt_to('IContained').parent_relation()[0]

    @property
    def child_role(self):
        """Role of the relation returned by the `parent_relation()` method of
        the entity's IContained adapter.
        """
        return self.entity.cw_adapt_to('IContained').parent_relation()[1]

    def children_rql(self):
        """Return the UNION of the related RQL queries of each children
        relation.
        """
        # XXX may have different shapes
        subqueries = ['(%s)' % self.entity.cw_related_rql(rtype, role)
                      for rtype, role in self.children_relations]
        return ' UNION '.join(subqueries)

    def _children(self, entities=True):
        """Return what is related to the entity through every children
        relation, as a list if `entities` is true else as a result set.
        """
        if entities:
            collected = []
        else:
            collected = ResultSet([], '')
            collected.req = self._cw
        for rtype, role in self.children_relations:
            collected += self.entity.related(rtype, role, entities=entities)
        return collected

    def _children_by_type(self, entities, same):
        """Return children whose entity type is the entity's one if `same` is
        true, or another one if `same` is false.
        """
        found = self._children(entities)
        own_etype = self.entity.cw_etype

        def wanted(child):
            return (child.cw_etype == own_etype) == same

        if entities:
            return [child for child in found if wanted(child)]
        return found.filtered_rset(wanted, self.entity.cw_col)

    def different_type_children(self, entities=True):
        """Return children entities of different type as this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        return self._children_by_type(entities, same=False)

    def same_type_children(self, entities=True):
        """Return children entities of the same type as this entity.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        return self._children_by_type(entities, same=True)

    def children(self, entities=True, sametype=False):
        """Return children entities, restricted to those of the same type as
        this entity if `sametype` is true.

        According to the `entities` parameter, return entity objects or the
        equivalent result set.
        """
        if not sametype:
            return self._children(entities)
        return self.same_type_children(entities)
class IContainerToITree(IContainedToITree):
    """Map IContainer adapters to ITree, similarly to
    :class:`IContainedToITree` but considering parent as None.
    """

    @classmethod
    def build_class(cls, etype, children_relations):
        """Return a new subclass of this adapter, named after `etype`,
        selectable for instances of `etype` and using `children_relations`.
        """
        namespace = {'__select__': is_instance(etype),
                     'children_relations': children_relations}
        return type(etype + 'ITree', (cls,), namespace)

    def parent(self):
        """Return None: a container has no parent."""
        return None