Commit 88bb8174 authored by Aurelien Campeas's avatar Aurelien Campeas
Browse files

[connection] remove ensure_cnx_set context manager uses

It has been deprecated in a previous changeset and is now a no-op.

Related to #2919309.
parent 8b35a898b334
......@@ -259,12 +259,11 @@ class BaseQuerierTC(TestCase):
def qexecute(self, rql, args=None, build_descr=True):
with self.session.new_cnx() as cnx:
with cnx.ensure_cnx_set:
try:
return self.o.execute(cnx, rql, args, build_descr)
finally:
if rql.startswith(('INSERT', 'DELETE', 'SET')):
cnx.commit()
try:
return self.o.execute(cnx, rql, args, build_descr)
finally:
if rql.startswith(('INSERT', 'DELETE', 'SET')):
cnx.commit()
class BasePlannerTC(BaseQuerierTC):
......
......@@ -38,12 +38,11 @@ class SchemaModificationHooksTC(CubicWebTC):
def index_exists(self, cnx, etype, attr, unique=False):
dbhelper = self.repo.system_source.dbhelper
with cnx.ensure_cnx_set:
sqlcursor = cnx.cnxset.cu
return dbhelper.index_exists(sqlcursor,
SQL_PREFIX + etype,
SQL_PREFIX + attr,
unique=unique)
sqlcursor = cnx.cnxset.cu
return dbhelper.index_exists(sqlcursor,
SQL_PREFIX + etype,
SQL_PREFIX + attr,
unique=unique)
def _set_perms(self, cnx, eid):
cnx.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup',
......
......@@ -88,11 +88,10 @@ def reindex_entities(schema, cnx, withpb=True, etypes=None):
# to be updated due to the reindexation
repo = cnx.repo
dbhelper = repo.system_source.dbhelper
with cnx.ensure_cnx_set:
cursor = cnx.cnxset.cu
if not dbhelper.has_fti_table(cursor):
print 'no text index table'
dbhelper.init_fti(cursor)
cursor = cnx.cnxset.cu
if not dbhelper.has_fti_table(cursor):
print 'no text index table'
dbhelper.init_fti(cursor)
repo.system_source.do_fti = True # ensure full-text indexation is activated
if etypes is None:
print 'Reindexing entities'
......@@ -400,8 +399,7 @@ def check(repo, cnx, checks, reindex, fix, withpb=True):
with cnx.security_enabled(read=False, write=False): # ensure no read security
for check in checks:
check_func = globals()['check_%s' % check]
with cnx.ensure_cnx_set:
check_func(repo.schema, cnx, eids_cache, fix=fix)
check_func(repo.schema, cnx, eids_cache, fix=fix)
if fix:
cnx.commit()
else:
......@@ -410,6 +408,5 @@ def check(repo, cnx, checks, reindex, fix, withpb=True):
print 'WARNING: Diagnostic run, nothing has been corrected'
if reindex:
cnx.rollback()
with cnx.ensure_cnx_set:
reindex_entities(repo.schema, cnx, withpb=withpb)
reindex_entities(repo.schema, cnx, withpb=withpb)
cnx.commit()
......@@ -178,15 +178,14 @@ class ServerMigrationHelper(MigrationHelper):
super(ServerMigrationHelper, self).migrate(vcconf, toupgrade, options)
def cmd_process_script(self, migrscript, funcname=None, *args, **kwargs):
with self.cnx.ensure_cnx_set:
try:
return super(ServerMigrationHelper, self).cmd_process_script(
migrscript, funcname, *args, **kwargs)
except ExecutionError as err:
sys.stderr.write("-> %s\n" % err)
except BaseException:
self.rollback()
raise
try:
return super(ServerMigrationHelper, self).cmd_process_script(
migrscript, funcname, *args, **kwargs)
except ExecutionError as err:
sys.stderr.write("-> %s\n" % err)
except BaseException:
self.rollback()
raise
# Adjust docstring
cmd_process_script.__doc__ = MigrationHelper.cmd_process_script.__doc__
......
......@@ -448,8 +448,7 @@ class Repository(object):
for source in self.sources_by_uri.itervalues():
if self.config.source_enabled(source) and source.support_entity('CWUser'):
try:
with cnx.ensure_cnx_set:
return source.authenticate(cnx, login, **authinfo)
return source.authenticate(cnx, login, **authinfo)
except AuthenticationError:
continue
else:
......@@ -468,19 +467,18 @@ class Repository(object):
def _build_user(self, cnx, eid):
"""return a CWUser entity for user with the given eid"""
with cnx.ensure_cnx_set:
cls = self.vreg['etypes'].etype_class('CWUser')
st = cls.fetch_rqlst(cnx.user, ordermethod=None)
st.add_eid_restriction(st.get_variable('X'), 'x', 'Substitute')
rset = cnx.execute(st.as_string(), {'x': eid})
assert len(rset) == 1, rset
cwuser = rset.get_entity(0, 0)
# pylint: disable=W0104
# prefetch / cache cwuser's groups and properties. This is especially
# useful for internal sessions to avoid security insertions
cwuser.groups
cwuser.properties
return cwuser
cls = self.vreg['etypes'].etype_class('CWUser')
st = cls.fetch_rqlst(cnx.user, ordermethod=None)
st.add_eid_restriction(st.get_variable('X'), 'x', 'Substitute')
rset = cnx.execute(st.as_string(), {'x': eid})
assert len(rset) == 1, rset
cwuser = rset.get_entity(0, 0)
# pylint: disable=W0104
# prefetch / cache cwuser's groups and properties. This is especially
# useful for internal sessions to avoid security insertions
cwuser.groups
cwuser.properties
return cwuser
# public (dbapi) interface ################################################
......@@ -719,8 +717,7 @@ class Repository(object):
with Session(InternalManager(), self) as session:
with session.new_cnx() as cnx:
with cnx.security_enabled(read=False, write=False):
with cnx.ensure_cnx_set:
yield cnx
yield cnx
def _get_session(self, sessionid, txid=None, checkshuttingdown=True):
"""return the session associated with the given session identifier"""
......@@ -812,8 +809,7 @@ class Repository(object):
return self._extid_cache[extid]
except KeyError:
pass
with cnx.ensure_cnx_set:
eid = self.system_source.extid2eid(cnx, extid)
eid = self.system_source.extid2eid(cnx, extid)
if eid is not None:
self._extid_cache[extid] = eid
self._type_source_cache[eid] = (etype, extid, source.uri)
......@@ -821,37 +817,35 @@ class Repository(object):
if not insert:
return
# no link between extid and eid, create one
with cnx.ensure_cnx_set:
# write query, ensure connection's mode is 'write' so connections
# won't be released until commit/rollback
cnx.mode = 'write'
try:
eid = self.system_source.create_eid(cnx)
self._extid_cache[extid] = eid
self._type_source_cache[eid] = (etype, extid, source.uri)
entity = source.before_entity_insertion(
cnx, extid, etype, eid, sourceparams)
if source.should_call_hooks:
# get back a copy of operation for later restore if
# necessary, see below
pending_operations = cnx.pending_operations[:]
self.hm.call_hooks('before_add_entity', cnx, entity=entity)
self.add_info(cnx, entity, source, extid)
source.after_entity_insertion(cnx, extid, entity, sourceparams)
# write query, ensure connection's mode is 'write' so connections
# won't be released until commit/rollback
try:
eid = self.system_source.create_eid(cnx)
self._extid_cache[extid] = eid
self._type_source_cache[eid] = (etype, extid, source.uri)
entity = source.before_entity_insertion(
cnx, extid, etype, eid, sourceparams)
if source.should_call_hooks:
# get back a copy of operation for later restore if
# necessary, see below
pending_operations = cnx.pending_operations[:]
self.hm.call_hooks('before_add_entity', cnx, entity=entity)
self.add_info(cnx, entity, source, extid)
source.after_entity_insertion(cnx, extid, entity, sourceparams)
if source.should_call_hooks:
self.hm.call_hooks('after_add_entity', cnx, entity=entity)
return eid
except Exception:
# XXX do some cleanup manually so that the transaction has a
# chance to be commited, with simply this entity discarded
self._extid_cache.pop(extid, None)
self._type_source_cache.pop(eid, None)
if 'entity' in locals():
hook.CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(entity.eid)
self.system_source.delete_info_multi(cnx, [entity])
if source.should_call_hooks:
self.hm.call_hooks('after_add_entity', cnx, entity=entity)
return eid
except Exception:
# XXX do some cleanup manually so that the transaction has a
# chance to be commited, with simply this entity discarded
self._extid_cache.pop(extid, None)
self._type_source_cache.pop(eid, None)
if 'entity' in locals():
hook.CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(entity.eid)
self.system_source.delete_info_multi(cnx, [entity])
if source.should_call_hooks:
cnx.pending_operations = pending_operations
raise
cnx.pending_operations = pending_operations
raise
def add_info(self, cnx, entity, source, extid=None):
"""add type and source info for an eid into the system table,
......
......@@ -108,14 +108,13 @@ def deserialize_schema(schema, cnx):
has_computed_attributes = False
# XXX bw compat (3.6 migration)
with cnx.ensure_cnx_set:
sqlcu = cnx.system_sql("SELECT * FROM cw_CWRType WHERE cw_name='symetric'")
if sqlcu.fetchall():
sql = dbhelper.sql_rename_col('cw_CWRType', 'cw_symetric', 'cw_symmetric',
dbhelper.TYPE_MAPPING['Boolean'], True)
sqlcu.execute(sql)
sqlcu.execute("UPDATE cw_CWRType SET cw_name='symmetric' WHERE cw_name='symetric'")
cnx.commit(False)
sqlcu = cnx.system_sql("SELECT * FROM cw_CWRType WHERE cw_name='symetric'")
if sqlcu.fetchall():
sql = dbhelper.sql_rename_col('cw_CWRType', 'cw_symetric', 'cw_symmetric',
dbhelper.TYPE_MAPPING['Boolean'], True)
sqlcu.execute(sql)
sqlcu.execute("UPDATE cw_CWRType SET cw_name='symmetric' WHERE cw_name='symetric'")
cnx.commit()
ertidx = {}
copiedeids = set()
permsidx = deserialize_ertype_permissions(cnx)
......
......@@ -1019,10 +1019,9 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
restr.update(tearestr)
# we want results ordered by transaction's time descendant
sql += ' ORDER BY tx_time DESC'
with cnx.ensure_cnx_set:
cu = self.doexec(cnx, sql, restr)
# turn results into transaction objects
return [tx.Transaction(cnx, *args) for args in cu.fetchall()]
cu = self.doexec(cnx, sql, restr)
# turn results into transaction objects
return [tx.Transaction(cnx, *args) for args in cu.fetchall()]
def tx_info(self, cnx, txuuid):
"""See :class:`cubicweb.repoapi.Connection.transaction_info`"""
......@@ -1119,19 +1118,18 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
raise `NoSuchTransaction` if there is no such transaction of if the
connection's user isn't allowed to see it.
"""
with cnx.ensure_cnx_set:
restr = {'tx_uuid': txuuid}
sql = self.sqlgen.select('transactions', restr,
('tx_time', 'tx_user'))
cu = self.doexec(cnx, sql, restr)
try:
time, ueid = cu.fetchone()
except TypeError:
raise tx.NoSuchTransaction(txuuid)
if not (cnx.user.is_in_group('managers')
or cnx.user.eid == ueid):
raise tx.NoSuchTransaction(txuuid)
return time, ueid
restr = {'tx_uuid': txuuid}
sql = self.sqlgen.select('transactions', restr,
('tx_time', 'tx_user'))
cu = self.doexec(cnx, sql, restr)
try:
time, ueid = cu.fetchone()
except TypeError:
raise tx.NoSuchTransaction(txuuid)
if not (cnx.user.is_in_group('managers')
or cnx.user.eid == ueid):
raise tx.NoSuchTransaction(txuuid)
return time, ueid
def _reedit_entity(self, entity, changes, err):
cnx = entity._cw
......
......@@ -1172,11 +1172,10 @@ Any P1,B,E WHERE P1 identity P2 WITH
def test_delete_3(self):
s = self.user_groups_session('users')
with s.new_cnx() as cnx:
with cnx.ensure_cnx_set:
peid, = self.o.execute(cnx, "INSERT Personne P: P nom 'toto'")[0]
seid, = self.o.execute(cnx, "INSERT Societe S: S nom 'logilab'")[0]
self.o.execute(cnx, "SET P travaille S")
cnx.commit()
peid, = self.o.execute(cnx, "INSERT Personne P: P nom 'toto'")[0]
seid, = self.o.execute(cnx, "INSERT Societe S: S nom 'logilab'")[0]
self.o.execute(cnx, "SET P travaille S")
cnx.commit()
rset = self.qexecute('Personne P WHERE P travaille S')
self.assertEqual(len(rset.rows), 1)
self.qexecute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid))
......@@ -1211,12 +1210,11 @@ Any P1,B,E WHERE P1 identity P2 WITH
'X sender Y, X recipients Y WHERE Y is EmailAddress')[0]
self.qexecute("DELETE Email X")
with self.session.new_cnx() as cnx:
with cnx.ensure_cnx_set:
sqlc = cnx.cnxset.cu
sqlc.execute('SELECT * FROM recipients_relation')
self.assertEqual(len(sqlc.fetchall()), 0)
sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid)
self.assertEqual(len(sqlc.fetchall()), 0)
sqlc = cnx.cnxset.cu
sqlc.execute('SELECT * FROM recipients_relation')
self.assertEqual(len(sqlc.fetchall()), 0)
sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid)
self.assertEqual(len(sqlc.fetchall()), 0)
def test_nonregr_delete_cache2(self):
eid = self.qexecute("INSERT Folder T: T name 'toto'")[0][0]
......@@ -1363,12 +1361,11 @@ Any P1,B,E WHERE P1 identity P2 WITH
self.assertRaises(Unauthorized,
self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P")
with self.session.new_cnx() as cnx:
with cnx.ensure_cnx_set:
cursor = cnx.cnxset.cu
cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
% (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
passwd = str(cursor.fetchone()[0])
self.assertEqual(passwd, crypt_password('toto', passwd))
cursor = cnx.cnxset.cu
cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
% (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
passwd = str(cursor.fetchone()[0])
self.assertEqual(passwd, crypt_password('toto', passwd))
rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
{'pwd': Binary(passwd)})
self.assertEqual(len(rset.rows), 1)
......@@ -1376,21 +1373,20 @@ Any P1,B,E WHERE P1 identity P2 WITH
def test_update_upassword(self):
with self.session.new_cnx() as cnx:
with cnx.ensure_cnx_set:
rset = cnx.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s",
{'pwd': 'toto'})
self.assertEqual(rset.description[0][0], 'CWUser')
rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'",
{'pwd': 'tutu'})
cursor = cnx.cnxset.cu
cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
% (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
passwd = str(cursor.fetchone()[0])
self.assertEqual(passwd, crypt_password('tutu', passwd))
rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
{'pwd': Binary(passwd)})
self.assertEqual(len(rset.rows), 1)
self.assertEqual(rset.description, [('CWUser',)])
rset = cnx.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s",
{'pwd': 'toto'})
self.assertEqual(rset.description[0][0], 'CWUser')
rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'",
{'pwd': 'tutu'})
cursor = cnx.cnxset.cu
cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
% (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
passwd = str(cursor.fetchone()[0])
self.assertEqual(passwd, crypt_password('tutu', passwd))
rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
{'pwd': Binary(passwd)})
self.assertEqual(len(rset.rows), 1)
self.assertEqual(rset.description, [('CWUser',)])
# ZT datetime tests ########################################################
......
......@@ -203,10 +203,9 @@ class RepositoryTC(CubicWebTC):
cnxid = repo.connect(self.admlogin, password=self.admpassword)
session = repo._get_session(cnxid)
with session.new_cnx() as cnx:
with cnx.ensure_cnx_set:
self.assertEqual(repo.type_and_source_from_eid(2, cnx),
('CWGroup', None, 'system'))
self.assertEqual(repo.type_from_eid(2, cnx), 'CWGroup')
self.assertEqual(repo.type_and_source_from_eid(2, cnx),
('CWGroup', None, 'system'))
self.assertEqual(repo.type_from_eid(2, cnx), 'CWGroup')
repo.close(cnxid)
def test_public_api(self):
......@@ -385,38 +384,37 @@ class SchemaDeserialTC(CubicWebTC):
namecol = SQL_PREFIX + 'name'
finalcol = SQL_PREFIX + 'final'
with self.admin_access.repo_cnx() as cnx:
with cnx.ensure_cnx_set:
cu = cnx.system_sql('SELECT %s FROM %s WHERE %s is NULL'
% (namecol, table, finalcol))
self.assertEqual(cu.fetchall(), [])
cu = cnx.system_sql('SELECT %s FROM %s '
'WHERE %s=%%(final)s ORDER BY %s'
% (namecol, table, finalcol, namecol),
{'final': True})
self.assertEqual(cu.fetchall(),
[(u'BabarTestType',),
(u'BigInt',), (u'Boolean',), (u'Bytes',),
(u'Date',), (u'Datetime',),
(u'Decimal',),(u'Float',),
(u'Int',),
(u'Interval',), (u'Password',),
(u'String',),
(u'TZDatetime',), (u'TZTime',), (u'Time',)])
sql = ("SELECT etype.cw_eid, etype.cw_name, cstr.cw_eid, rel.eid_to "
"FROM cw_CWUniqueTogetherConstraint as cstr, "
" relations_relation as rel, "
" cw_CWEType as etype "
"WHERE cstr.cw_eid = rel.eid_from "
" AND cstr.cw_constraint_of = etype.cw_eid "
" AND etype.cw_name = 'Personne' "
";")
cu = cnx.system_sql(sql)
rows = cu.fetchall()
self.assertEqual(len(rows), 3)
person = self.repo.schema.eschema('Personne')
self.assertEqual(len(person._unique_together), 1)
self.assertItemsEqual(person._unique_together[0],
('nom', 'prenom', 'inline2'))
cu = cnx.system_sql('SELECT %s FROM %s WHERE %s is NULL'
% (namecol, table, finalcol))
self.assertEqual(cu.fetchall(), [])
cu = cnx.system_sql('SELECT %s FROM %s '
'WHERE %s=%%(final)s ORDER BY %s'
% (namecol, table, finalcol, namecol),
{'final': True})
self.assertEqual(cu.fetchall(),
[(u'BabarTestType',),
(u'BigInt',), (u'Boolean',), (u'Bytes',),
(u'Date',), (u'Datetime',),
(u'Decimal',),(u'Float',),
(u'Int',),
(u'Interval',), (u'Password',),
(u'String',),
(u'TZDatetime',), (u'TZTime',), (u'Time',)])
sql = ("SELECT etype.cw_eid, etype.cw_name, cstr.cw_eid, rel.eid_to "
"FROM cw_CWUniqueTogetherConstraint as cstr, "
" relations_relation as rel, "
" cw_CWEType as etype "
"WHERE cstr.cw_eid = rel.eid_from "
" AND cstr.cw_constraint_of = etype.cw_eid "
" AND etype.cw_name = 'Personne' "
";")
cu = cnx.system_sql(sql)
rows = cu.fetchall()
self.assertEqual(len(rows), 3)
person = self.repo.schema.eschema('Personne')
self.assertEqual(len(person._unique_together), 1)
self.assertItemsEqual(person._unique_together[0],
('nom', 'prenom', 'inline2'))
finally:
self.repo.set_schema(origshema)
......@@ -439,30 +437,27 @@ class DataHelpersTC(CubicWebTC):
def test_type_from_eid(self):
with self.admin_access.repo_cnx() as cnx:
with cnx.ensure_cnx_set:
self.assertEqual(self.repo.type_from_eid(2, cnx), 'CWGroup')
self.assertEqual(self.repo.type_from_eid(2, cnx), 'CWGroup')
def test_type_from_eid_raise(self):
with self.admin_access.repo_cnx() as cnx:
with cnx.ensure_cnx_set:
self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, cnx)
self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, cnx)
def test_add_delete_info(self):
with self.admin_access.repo_cnx() as cnx:
with cnx.ensure_cnx_set:
cnx.mode = 'write'
entity = self.repo.vreg['etypes'].etype_class('Personne')(cnx)
entity.eid = -1
entity.complete = lambda x: None
self.repo.add_info(cnx, entity, self.repo.system_source)
cu = cnx.system_sql('SELECT * FROM entities WHERE eid = -1')
data = cu.fetchall()
self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None)])
self.repo._delete_cascade_multi(cnx, [entity])
self.repo.system_source.delete_info_multi(cnx, [entity])
cu = cnx.system_sql('SELECT * FROM entities WHERE eid = -1')
data = cu.fetchall()
self.assertEqual(data, [])
cnx.mode = 'write'
entity = self.repo.vreg['etypes'].etype_class('Personne')(cnx)
entity.eid = -1
entity.complete = lambda x: None
self.repo.add_info(cnx, entity, self.repo.system_source)
cu = cnx.system_sql('SELECT * FROM entities WHERE eid = -1')
data = cu.fetchall()
self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None)])
self.repo._delete_cascade_multi(cnx, [entity])
self.repo.system_source.delete_info_multi(cnx, [entity])
cu = cnx.system_sql('SELECT * FROM entities WHERE eid = -1')
data = cu.fetchall()
self.assertEqual(data, [])
class FTITC(CubicWebTC):
......
......@@ -214,8 +214,7 @@ class UndoableTransactionTC(CubicWebTC):
self.assertRaises(NoSuchTransaction,
cnx.transaction_info, txuuid)
with self.admin_access.repo_cnx() as cnx:
with cnx.ensure_cnx_set:
self.check_transaction_deleted(cnx, txuuid)
self.check_transaction_deleted(cnx, txuuid)
# the final test: check we can login with the previously deleted user
with self.new_access('toto').client_cnx():
pass
......@@ -273,18 +272,17 @@ class UndoableTransactionTC(CubicWebTC):
self.assertFalse(cnx.execute('Any X WHERE X eid %(x)s', {'x': p.eid}))
self.assertFalse(cnx.execute('Any X,Y WHERE X fiche Y'))
with self.admin_access.repo_cnx() as cnx:
with cnx.ensure_cnx_set:
for eid in (p.eid, c.eid):
self.assertFalse(cnx.system_sql(
'SELECT * FROM entities WHERE eid=%s' % eid).fetchall())
self.assertFalse(cnx.system_sql(
'SELECT 1 FROM owned_by_relation WHERE eid_from=%s' % eid).fetchall())
# added by sql in hooks (except when using dataimport)
self.assertFalse(cnx.system_sql(
'SELECT 1 FROM is_relation WHERE eid_from=%s' % eid).fetchall())
self.assertFalse(cnx.system_sql(
'SELECT 1 FROM is_instance_of_relation WHERE eid_from=%s' % eid).fetchall())
self.check_transaction_deleted(cnx, txuuid)
for eid in (p.eid, c.eid):
self.assertFalse(cnx.system_sql(
'SELECT * FROM entities WHERE eid=%s' % eid).fetchall())
self.assertFalse(cnx.system_sql(
'SELECT 1 FROM owned_by_relation WHERE eid_from=%s' % eid).fetchall())
# added by sql in hooks (except when using dataimport)
self.assertFalse(cnx.system_sql(
'SELECT 1 FROM is_relation WHERE eid_from=%s' % eid).fetchall())
self.assertFalse(cnx.system_sql(
'SELECT 1 FROM is_instance_of_relation WHERE eid_from=%s' % eid).fetchall())
self.check_transaction_deleted(cnx, txuuid)
def test_undo_creation_integrity_1(self):
with self.admin_access.client_cnx() as cnx:
......@@ -357,9 +355,8 @@ class UndoableTransactionTC(CubicWebTC):
p.cw_clear_all_caches()
self.assertFalse(p.fiche)
with self.admin_access.repo_cnx() as cnx:
with cnx.ensure_cnx_set:
self.assertIsNone(cnx.system_sql(
'SELECT cw_fiche FROM cw_Personne WHERE cw_eid=%s' % p.eid).fetchall()[0][0])
self.assertIsNone(cnx.system_sql(
'SELECT cw_fiche FROM cw_Personne WHERE cw_eid=%s' % p.eid).fetchall()[0][0])
def test_undo_inline_rel_add_ok(self):
"""Undo add relation Personne (?) fiche (?) Card
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment