Commit c808eb64 authored by Sylvain Thénault's avatar Sylvain Thénault
Browse files

[hook/operation] nicer api to achieve same result as set_operation, as described in #1253630

parent 8d7c2fd2ac66
......@@ -31,7 +31,6 @@ from cubicweb.schema import (META_RTYPES, WORKFLOW_RTYPES,
from cubicweb.selectors import is_instance
from cubicweb.uilib import soup2xhtml
from cubicweb.server import hook
from cubicweb.server.hook import set_operation
# special relations that don't have to be checked for integrity, usually
# because they are handled internally by hooks (so we trust ourselves)
......@@ -68,19 +67,21 @@ class _ReleaseUniqueConstraintsOperation(hook.Operation):
class _CheckRequiredRelationOperation(hook.LateOperation):
"""checking relation cardinality has to be done after commit in
case the relation is being replaced
class _CheckRequiredRelationOperation(hook.DataOperationMixIn,
"""checking relation cardinality has to be done after commit in case the
relation is being replaced
containercls = list
role = key = base_rql = None
def precommit_event(self):
session =self.session
session = self.session
pendingeids = session.transaction_data.get('pendingeids', ())
pendingrtypes = session.transaction_data.get('pendingrtypes', ())
# popping the key is not optional: if further operations trigger new deletion
# of relation, we'll need a new operation
for eid, rtype in session.transaction_data.pop(self.key):
for eid, rtype in self.get_data():
# recheck pending eids / relation types
if eid in pendingeids:
......@@ -98,13 +99,11 @@ class _CheckRequiredRelationOperation(hook.LateOperation):
class _CheckSRelationOp(_CheckRequiredRelationOperation):
"""check required subject relation"""
role = 'subject'
key = '_cwisrel'
base_rql = 'Any O WHERE S eid %%(x)s, S %s O'
class _CheckORelationOp(_CheckRequiredRelationOperation):
"""check required object relation"""
role = 'object'
key = '_cwiorel'
base_rql = 'Any S WHERE O eid %%(x)s, S %s O'
......@@ -131,11 +130,10 @@ class CheckCardinalityHook(IntegrityHook):
rdef = rschema.role_rdef(eschema, targetschemas[0], role)
if rdef.role_cardinality(role) in '1+':
if role == 'subject':
set_operation(self._cw, '_cwisrel', (eid, rschema.type),
_CheckSRelationOp, list)
op = _CheckSRelationOp.get_instance(self._cw)
set_operation(self._cw, '_cwiorel', (eid, rschema.type),
_CheckORelationOp, list)
op = _CheckORelationOp.get_instance(self._cw)
op.add_data((eid, rschema.type))
def before_delete_relation(self):
rtype = self.rtype
......@@ -148,19 +146,17 @@ class CheckCardinalityHook(IntegrityHook):
card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality')
if card[0] in '1+' and not session.deleted_in_transaction(eidfrom):
set_operation(self._cw, '_cwisrel', (eidfrom, rtype),
_CheckSRelationOp, list)
_CheckSRelationOp.get_instance(self._cw).add_data((eidfrom, rtype))
if card[1] in '1+' and not session.deleted_in_transaction(eidto):
set_operation(self._cw, '_cwiorel', (eidto, rtype),
_CheckORelationOp, list)
_CheckORelationOp.get_instance(self._cw).add_data((eidto, rtype))
class _CheckConstraintsOp(hook.LateOperation):
class _CheckConstraintsOp(hook.DataOperationMixIn, hook.LateOperation):
""" check that a new relation satisfies its constraints """
containercls = list
def precommit_event(self):
session = self.session
for values in session.transaction_data.pop('check_constraints_op'):
for values in self.get_data():
eidfrom, rtype, eidto, constraints = values
# first check related entities have not been deleted in the same
# transaction
......@@ -196,9 +192,8 @@ class CheckConstraintHook(IntegrityHook):
constraints = self._cw.schema_rproperty(self.rtype, self.eidfrom, self.eidto,
if constraints:
hook.set_operation(self._cw, 'check_constraints_op',
(self.eidfrom, self.rtype, self.eidto, tuple(constraints)),
_CheckConstraintsOp, list)
(self.eidfrom, self.rtype, self.eidto, constraints))
class CheckAttributeConstraintHook(IntegrityHook):
......@@ -217,9 +212,8 @@ class CheckAttributeConstraintHook(IntegrityHook):
constraints = [c for c in eschema.rdef(attr).constraints
if isinstance(c, (RQLUniqueConstraint, RQLConstraint))]
if constraints:
hook.set_operation(self._cw, 'check_constraints_op',
(self.entity.eid, attr, None, tuple(constraints)),
_CheckConstraintsOp, list)
(self.entity.eid, attr, None, constraints))
class CheckUniqueHook(IntegrityHook):
......@@ -297,11 +291,11 @@ class StripCWUserLoginHook(IntegrityHook):
# 'active' integrity hooks: you usually don't want to deactivate them, they are
# not really integrity check, they maintain consistency on changes
class _DelayedDeleteOp(hook.Operation):
class _DelayedDeleteOp(hook.DataOperationMixIn, hook.Operation):
"""delete the object of composite relation except if the relation has
actually been redirected to another composite
key = base_rql = None
base_rql = None
def precommit_event(self):
session = self.session
......@@ -309,7 +303,7 @@ class _DelayedDeleteOp(hook.Operation):
neweids = session.transaction_data.get('neweids', ())
# popping the key is not optional: if further operations trigger new deletion
# of composite relation, we'll need a new operation
for eid, rtype in session.transaction_data.pop(self.key):
for eid, rtype in self.get_data():
# don't do anything if the entity is being created or deleted
if not (eid in pendingeids or eid in neweids):
etype = session.describe(eid)[0]
......@@ -317,12 +311,10 @@ class _DelayedDeleteOp(hook.Operation):
class _DelayedDeleteSEntityOp(_DelayedDeleteOp):
"""delete orphan subject entity of a composite relation"""
key = '_cwiscomp'
base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT X %s Y'
class _DelayedDeleteOEntityOp(_DelayedDeleteOp):
"""delete orphan object entity of a composite relation"""
key = '_cwiocomp'
base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT Y %s X'
......@@ -343,8 +335,8 @@ class DeleteCompositeOrphanHook(hook.Hook):
composite = self._cw.schema_rproperty(self.rtype, self.eidfrom, self.eidto,
if composite == 'subject':
set_operation(self._cw, '_cwiocomp', (self.eidto, self.rtype),
(self.eidto, self.rtype))
elif composite == 'object':
set_operation(self._cw, '_cwiscomp', (self.eidfrom, self.rtype),
(self.eidfrom, self.rtype))
......@@ -64,11 +64,11 @@ class UpdateMetaAttrsHook(MetaDataHook):
class _SetCreatorOp(hook.Operation):
class SetCreatorOp(hook.DataOperationMixIn, hook.Operation):
def precommit_event(self):
session = self.session
for eid in session.transaction_data.pop('set_creator_op'):
for eid in self.get_data():
if session.deleted_in_transaction(eid):
# entity has been created and deleted in the same transaction
......@@ -109,11 +109,12 @@ class SetOwnershipHook(MetaDataHook):
def __call__(self):
if not self._cw.is_internal_session:
self._cw.add_relation(self.entity.eid, 'owned_by', self._cw.user.eid)
hook.set_operation(self._cw, 'set_creator_op', self.entity.eid, _SetCreatorOp)
class _SyncOwnersOp(hook.Operation):
class SyncOwnersOp(hook.DataOperationMixIn, hook.Operation):
def precommit_event(self):
for compositeeid, composedeid in self.session.transaction_data.pop('sync_owners_op'):
for compositeeid, composedeid in self.get_data():
self.session.execute('SET X owned_by U WHERE C owned_by U, C eid %(c)s,'
'NOT EXISTS(X owned_by U, X eid %(x)s)',
{'c': compositeeid, 'x': composedeid})
......@@ -133,9 +134,9 @@ class SyncCompositeOwner(MetaDataHook):
eidfrom, eidto = self.eidfrom, self.eidto
composite = self._cw.schema_rproperty(self.rtype, eidfrom, eidto, 'composite')
if composite == 'subject':
hook.set_operation(self._cw, 'sync_owners_op', (eidfrom, eidto), _SyncOwnersOp)
SyncOwnersOp.get_instance(self._cw).add_data( (eidfrom, eidto) )
elif composite == 'object':
hook.set_operation(self._cw, 'sync_owners_op', (eidto, eidfrom), _SyncOwnersOp)
SyncOwnersOp.get_instance(self._cw).add_data( (eidto, eidfrom) )
class FixUserOwnershipHook(MetaDataHook):
......@@ -45,23 +45,20 @@ def check_entity_attributes(session, entity, editedattrs=None, creation=False):
rdef.check_perm(session, 'update', eid=eid)
class _CheckEntityPermissionOp(hook.LateOperation):
class CheckEntityPermissionOp(hook.DataOperationMixIn, hook.LateOperation):
def precommit_event(self):
#print 'CheckEntityPermissionOp', self.session.user, self.entity, self.action
session = self.session
for values in session.transaction_data.pop('check_entity_perm_op'):
eid, action, edited = values
for eid, action, edited in self.get_data():
entity = session.entity_from_eid(eid)
check_entity_attributes(session, entity, edited,
creation=(action == 'add'))
class _CheckRelationPermissionOp(hook.LateOperation):
class CheckRelationPermissionOp(hook.DataOperationMixIn, hook.LateOperation):
def precommit_event(self):
session = self.session
for args in session.transaction_data.pop('check_relation_perm_op'):
action, rschema, eidfrom, eidto = args
for action, rschema, eidfrom, eidto in self.get_data():
rdef = rschema.rdef(session.describe(eidfrom)[0],
rdef.check_perm(session, action, fromeid=eidfrom, toeid=eidto)
......@@ -85,9 +82,8 @@ class AfterAddEntitySecurityHook(SecurityHook):
events = ('after_add_entity',)
def __call__(self):
hook.set_operation(self._cw, 'check_entity_perm_op',
(self.entity.eid, 'add', self.entity.cw_edited),
_CheckEntityPermissionOp, creation=True)
(self.entity.eid, 'add', self.entity.cw_edited) )
class AfterUpdateEntitySecurityHook(SecurityHook):
......@@ -104,9 +100,8 @@ class AfterUpdateEntitySecurityHook(SecurityHook):
# save back editedattrs in case the entity is reedited later in the
# same transaction, which will lead to cw_edited being
# overwritten
hook.set_operation(self._cw, 'check_entity_perm_op',
(self.entity.eid, 'update', self.entity.cw_edited),
_CheckEntityPermissionOp, creation=False)
(self.entity.eid, 'update', self.entity.cw_edited) )
class BeforeDelEntitySecurityHook(SecurityHook):
......@@ -143,9 +138,8 @@ class AfterAddRelationSecurityHook(SecurityHook):
rschema = self._cw.repo.schema[self.rtype]
if self.rtype in ON_COMMIT_ADD_RELATIONS:
hook.set_operation(self._cw, 'check_relation_perm_op',
('add', rschema, self.eidfrom, self.eidto),
('add', rschema, self.eidfrom, self.eidto) )
rdef = rschema.rdef(self._cw.describe(self.eidfrom)[0],
......@@ -307,11 +307,10 @@ class CWRTypeUpdateOp(MemSchemaOperation):
return # watched changes to final relation type are unexpected
session = self.session
if 'fulltext_container' in self.values:
op = UpdateFTIndexOp.get_instance(session)
for subjtype, objtype in rschema.rdefs:
hook.set_operation(session, 'fti_update_etypes', subjtype,
hook.set_operation(session, 'fti_update_etypes', objtype,
# update the in-memory schema first
self.oldvalues = dict( (attr, getattr(rschema, attr)) for attr in self.values)
......@@ -603,8 +602,7 @@ class RDefUpdateOp(MemSchemaOperation):
syssource.update_rdef_null_allowed(self.session, rdef)
self.null_allowed_changed = True
if 'fulltextindexed' in self.values:
hook.set_operation(session, 'fti_update_etypes', rdef.subject,
def revertprecommit_event(self):
if self.rdef is None:
......@@ -1181,19 +1179,20 @@ class BeforeDelPermissionHook(AfterAddPermissionHook):
class UpdateFTIndexOp(hook.SingleLastOperation):
class UpdateFTIndexOp(hook.DataOperationMixIn, hook.SingleLastOperation):
"""operation to update full text indexation of entity whose schema change
We wait until after the commit, as the schema in memory is only updated after the commit.
We wait until after the commit, as the schema in memory is only updated after
the commit.
def postcommit_event(self):
session = self.session
source = session.repo.system_source
to_reindex = session.transaction_data.pop('fti_update_etypes', ())
schema = session.repo.vreg.schema
to_reindex = self.get_data()'%i etypes need full text indexed reindexation',
schema = self.session.repo.vreg.schema
for etype in to_reindex:
rset = session.execute('Any X WHERE X is %s' % etype)'Reindexing full text index for %i entity of type %s',
......@@ -1205,8 +1204,8 @@ class UpdateFTIndexOp(hook.SingleLastOperation):
if still_fti or container is not entity:
source.fti_unindex_entity(session, container.eid)
source.fti_index_entity(session, container)
if len(to_reindex):
# Transaction have already been committed
if to_reindex:
# Transaction has already been committed
......@@ -239,9 +239,8 @@ Hooks and operations classes
.. autoclass:: cubicweb.server.hook.Hook
.. autoclass:: cubicweb.server.hook.Operation
.. autoclass:: cubicweb.server.hook.DataOperationMixIn
.. autoclass:: cubicweb.server.hook.LateOperation
.. autofunction:: cubicweb.server.hook.set_operation
from __future__ import with_statement
......@@ -726,6 +725,99 @@ set_log_methods(Operation, getLogger('cubicweb.session'))
def _container_add(container, value):
{set: set.add, list: list.append}[container.__class__](container, value)
class DataOperationMixIn(object):
"""Mix-in class to ease applying a single operation on a set of data,
avoiding creating as many operations as there are individual modifications.
The body of the operation must then iterate over the values that have been
stored in a single operation instance.
You should try to use this instead of creating one operation for each
`value`, since handling operations becomes costly on massive data import.
Usage looks like:
.. sourcecode:: python
class MyEntityHook(Hook):
__regid__ = 'my.entity.hook'
__select__ = Hook.__select__ & is_instance('MyEntity')
events = ('after_add_entity',)
def __call__(self):
class MyOperation(DataOperationMixIn, Operation):
def precommit_event(self):
for bucket in self.get_data():
You can modify the `containercls` class attribute, which defines the
container class that should be instantiated to hold payloads. An instance is
created on instantiation, and then the :meth:`add_data` method will add the
given data to the existing container. Default to a `set`. Give `list` if you
want to keep arrival ordering. You can also use another kind of container
by redefining :meth:`_build_container` and :meth:`add_data`
More optional parameters can be given to the `get_instance` operation, that
will be given to the operation constructor (though those parameters should
not vary across different calls to this method for a same operation for
obvious reasons).
.. Note::
For sanity reasons `get_data` will reset the operation, so that once
the operation has started its treatment, if some hook wants to push
additional data to this same operation, a new instance will be created
(else that data would very likely never be treated). This implies:
* you should **always** call `get_data` when starting treatment
* you should **never** call `get_data` for another reason.
containercls = set
def data_key(cls):
return ('cw.dataops', cls.__name__)
def get_instance(cls, session, **kwargs):
# no need to lock: transaction_data already comes from thread's local storage
return session.transaction_data[cls.data_key]
except KeyError:
op = session.transaction_data[cls.data_key] = cls(session, **kwargs)
return op
def __init__(self, *args, **kwargs):
super(DataOperationMixIn, self).__init__(*args, **kwargs)
self._container = self._build_container()
self._processed = False
def __contains__(self, value):
return value in self._container
def _build_container(self):
return self.containercls()
def add_data(self, data):
assert not self._processed, """Trying to add data to a closed operation.
Iterating over operation data closed it and should be reserved to precommit /
postcommit method of the operation."""
_container_add(self._container, data)
def get_data(self):
assert not self._processed, """Trying to get data from a closed operation.
Iterating over operation data closed it and should be reserved to precommit /
postcommit method of the operation."""
self._processed = True
op = self.session.transaction_data.pop(self.data_key)
assert op is self, "Bad handling of operation data, found %s instead of %s for key %s" % (
op, self, self.data_key)
return self._container
@deprecated('[3.10] use opcls.get_instance(session, **opkwargs).add_data(value)')
def set_operation(session, datakey, value, opcls, containercls=set, **opkwargs):
"""Function to ease applying a single operation on a set of data, avoiding
creating as many operations as there are individual modifications. You
......@@ -766,9 +858,6 @@ def set_operation(session, datakey, value, opcls, containercls=set, **opkwargs):
**popping** the key from `transaction_data` is not optional, else you may
get unexpected data loss in some cases of nested hooks.
# Search for session.transaction_data[`datakey`] (expected to be a set):
# if found, simply append `value`
......@@ -861,7 +950,7 @@ class RQLPrecommitOperation(Operation):
class CleanupNewEidsCacheOp(SingleLastOperation):
class CleanupNewEidsCacheOp(DataOperationMixIn, SingleLastOperation):
"""on rollback of an insert query we have to remove from repository's
type/source cache eids of entities added in that transaction.
......@@ -871,28 +960,27 @@ class CleanupNewEidsCacheOp(SingleLastOperation):
too expensive. Notice that there is no problem when using args to specify eids
instead of giving them into the rql string.
data_key = 'neweids'
def rollback_event(self):
"""the observed connections pool has been rolled back,
remove inserted eid from repository type/source cache
except KeyError:
class CleanupDeletedEidsCacheOp(SingleLastOperation):
class CleanupDeletedEidsCacheOp(DataOperationMixIn, SingleLastOperation):
"""on commit of delete query, we have to remove from repository's
type/source cache eids of entities deleted in that transaction.
data_key = 'pendingeids'
def postcommit_event(self):
"""the observed connections pool has been committed,
remove deleted eid from repository type/source cache
except KeyError:
......@@ -960,8 +960,7 @@ class ServerMigrationHelper(MigrationHelper):
# get some validation error on commit since integrity hooks
# may think some required relation is missing... This also ensures
# repository caches are properly cleaned up
hook.set_operation(session, 'pendingeids', eid,
# and don't forget to remove record from system tables
session, session.entity_from_eid(eid, rdeftype),
......@@ -1031,8 +1031,7 @@ class Repository(object):
and index the entity with the full text index
# begin by inserting eid/type/source/extid into the entities table
hook.set_operation(session, 'neweids', entity.eid,
self.system_source.add_info(session, entity, source, extid, complete)
def delete_info(self, session, entity, sourceuri, extid):
......@@ -1041,8 +1040,7 @@ class Repository(object):
# mark eid as being deleted in session info and setup cache update
# operation
hook.set_operation(session, 'pendingeids', entity.eid,
self._delete_info(session, entity, sourceuri, extid)
def _delete_info(self, session, entity, sourceuri, extid):
......@@ -1220,8 +1220,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
"no more supported" % {'eid': eid, 'etype': etype})]
entity.eid = eid
# for proper eid/type cache update
hook.set_operation(session, 'pendingeids', eid,
CleanupDeletedEidsCacheOp.get_instance(session).add_data(eid)'before_delete_entity', session, entity=entity)
# remove is / is_instance_of which are added using sql by hooks, hence
# invisible as transaction action
......@@ -1288,7 +1287,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
"""create an operation to [re]index textual content of the given entity
on commit
hook.set_operation(session, 'ftindex', entity.eid, FTIndexEntityOp)
def fti_unindex_entity(self, session, eid):
"""remove text content for entity with the given eid from the full text
......@@ -1313,7 +1312,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
self.exception('error while reindexing %s', entity)
class FTIndexEntityOp(hook.LateOperation):
class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation):
"""operation to delay entity full text indexation to commit
since fti indexing may trigger discovery of other entities, it should be
......@@ -1326,7 +1325,7 @@ class FTIndexEntityOp(hook.LateOperation):
source = session.repo.system_source
pendingeids = session.transaction_data.get('pendingeids', ())
done = session.transaction_data.setdefault('indexedeids', set())
for eid in session.transaction_data.pop('ftindex', ()):
for eid in self.get_data():
if eid in pendingeids or eid in done:
# entity added and deleted in the same transaction or already
# processed
......@@ -136,7 +136,7 @@ class BytesFileSystemStorage(Storage):
# bytes storage used to store file's path
entity.cw_edited.edited_attribute(attr, Binary(fpath))
file(fpath, 'wb').write(binary.getvalue())
hook.set_operation(entity._cw, 'bfss_added', fpath, AddFileOp)
return binary
def entity_updated(self, entity, attr):
......@@ -167,20 +167,19 @@ class BytesFileSystemStorage(Storage):
file(fpath, 'wb').write(binary.getvalue())
# Mark the new file as added during the transaction.
# The file will be removed on rollback
hook.set_operation(entity._cw, 'bfss_added', fpath, AddFileOp)
if oldpath != fpath:
# register the new location for the file.
entity.cw_edited.edited_attribute(attr, Binary(fpath))
# Mark the old file as useless so the file will be removed at
# commit.
hook.set_operation(entity._cw, 'bfss_deleted', oldpath,
return binary
def entity_deleted(self, entity, attr):
"""an entity using this storage for attr has been deleted"""
fpath = self.current_fs_path(entity, attr)
hook.set_operation(entity._cw, 'bfss_deleted', fpath, DeleteFileOp)
def new_fs_path(self, entity, attr):
# We try to get some hint about how to name the file using attribute's
......@@ -223,17 +222,17 @@ class BytesFileSystemStorage(Storage):
entity.cw_edited = None
class AddFileOp(hook.Operation):
class AddFileOp(hook.DataOperationMixIn, hook.Operation):
def rollback_event(self):
for filepath in self.session.transaction_data.pop('bfss_added'):
for filepath in self.get_data():
except Exception, ex:
self.error('cant remove %s: %s' % (filepath, ex))
class DeleteFileOp(hook.Operation):
class DeleteFileOp(hook.DataOperationMixIn, hook.Operation):
def postcommit_event(self):
for filepath in self.session