Commit 6bec2cc2 authored by Sylvain Thénault's avatar Sylvain Thénault
Browse files

[repository api] definitely kill usage of word 'pool' to refer to connections set used by a session

Also document session's data storage and some other internals.
Hopefully things will get clearer.

Closes #1684860: vocabulary confusion in repository code: notion of 'pool'
parent 6a9e66d788b3
......@@ -445,14 +445,14 @@ class RQLObjectStore(ObjectStore):
ObjectStore.__init__(self)
if session is None:
sys.exit('please provide a session of run this script with cubicweb-ctl shell and pass cnx as session')
if not hasattr(session, 'set_pool'):
if not hasattr(session, 'set_cnxset'):
# connection
cnx = session
session = session.request()
session.set_pool = lambda : None
session.set_cnxset = lambda : None
commit = commit or cnx.commit
else:
session.set_pool()
session.set_cnxset()
self.session = session
self._commit = commit or session.commit
......@@ -462,7 +462,7 @@ class RQLObjectStore(ObjectStore):
def commit(self):
txuuid = self._commit()
self.session.set_pool()
self.session.set_cnxset()
return txuuid
def rql(self, *args):
......
......@@ -93,7 +93,7 @@ def turn_repo_off(repo):
""" Idea: this is less costly than a full re-creation of the repo object.
off:
* session are closed,
* pools are closed
* cnxsets are closed
* system source is shutdown
"""
if not repo._needs_refresh:
......@@ -104,8 +104,8 @@ def turn_repo_off(repo):
repo.close(sessionid)
except BadConnectionId: #this is strange ? thread issue ?
print 'XXX unknown session', sessionid
for pool in repo.pools:
pool.close(True)
for cnxset in repo.cnxsets:
cnxset.close(True)
repo.system_source.shutdown()
repo._needs_refresh = True
repo._has_started = False
......@@ -113,12 +113,12 @@ def turn_repo_off(repo):
def turn_repo_on(repo):
"""Idea: this is less costly than a full re-creation of the repo object.
on:
* pools are connected
* cnxsets are connected
* cache are cleared
"""
if repo._needs_refresh:
for pool in repo.pools:
pool.reconnect()
for cnxset in repo.cnxsets:
cnxset.reconnect()
repo._type_source_cache = {}
repo._extid_cache = {}
repo.querier._rql_cache = {}
......@@ -477,12 +477,11 @@ class TestDataBaseHandler(object):
repo = self.get_repo(startup=True)
cnx = self.get_cnx()
session = repo._sessions[cnx.sessionid]
session.set_pool()
session.set_cnxset()
_commit = session.commit
def always_pooled_commit():
_commit()
session.set_pool()
session.commit = always_pooled_commit
def keep_cnxset_commit():
_commit(free_cnxset=False)
session.commit = keep_cnxset_commit
pre_setup_func(session, self.config)
session.commit()
cnx.close()
......
......@@ -146,7 +146,7 @@ class FakeSession(RequestSessionBase):
if vreg is None:
vreg = CubicWebVRegistry(FakeConfig(), initlog=False)
self.vreg = vreg
self.pool = FakePool()
self.cnxset = FakeConnectionsSet()
self.user = user or FakeUser()
self.is_internal_session = False
self.transaction_data = {}
......@@ -210,6 +210,6 @@ class FakeSource(object):
self.uri = uri
class FakePool(object):
class FakeConnectionsSet(object):
def source(self, uri):
return FakeSource(uri)
......@@ -205,7 +205,7 @@ class BaseQuerierTC(TestCase):
self.ueid = self.session.user.eid
assert self.ueid != -1
self.repo._type_source_cache = {} # clear cache
self.pool = self.session.set_pool()
self.cnxset = self.session.set_cnxset()
self.maxeid = self.get_max_eid()
do_monkey_patch()
self._dumb_sessions = []
......@@ -213,7 +213,7 @@ class BaseQuerierTC(TestCase):
def get_max_eid(self):
return self.session.execute('Any MAX(X)')[0][0]
def cleanup(self):
self.session.set_pool()
self.session.set_cnxset()
self.session.execute('DELETE Any X WHERE X eid > %s' % self.maxeid)
def tearDown(self):
......@@ -225,7 +225,7 @@ class BaseQuerierTC(TestCase):
for session in self._dumb_sessions:
session.rollback()
session.close()
self.repo._free_pool(self.pool)
self.repo._free_cnxset(self.cnxset)
assert self.session.user.eid != -1
def set_debug(self, debug):
......@@ -263,7 +263,7 @@ class BaseQuerierTC(TestCase):
u = self.repo._build_user(self.session, self.session.user.eid)
u._groups = set(groups)
s = Session(u, self.repo)
s._threaddata.pool = self.pool
s._threaddata.cnxset = self.cnxset
s._threaddata.ctx_count = 1
# register session to ensure it gets closed
self._dumb_sessions.append(s)
......@@ -274,7 +274,7 @@ class BaseQuerierTC(TestCase):
def commit(self):
self.session.commit()
self.session.set_pool()
self.session.set_cnxset()
class BasePlannerTC(BaseQuerierTC):
......@@ -288,7 +288,7 @@ class BasePlannerTC(BaseQuerierTC):
# XXX source_defs
self.o = self.repo.querier
self.session = self.repo._sessions.values()[0]
self.pool = self.session.set_pool()
self.cnxset = self.session.set_cnxset()
self.schema = self.o.schema
self.sources = self.o._repo.sources
self.system = self.sources[-1]
......@@ -312,7 +312,7 @@ class BasePlannerTC(BaseQuerierTC):
del self.repo.sources_by_uri[source.uri]
undo_monkey_patch()
for session in self._dumb_sessions:
session._threaddata.pool = None
session._threaddata.cnxset = None
session.close()
def _prepare_plan(self, rql, kwargs=None):
......
......@@ -274,7 +274,7 @@ class CubicWebTC(TestCase):
def session(self):
"""return current server side session (using default manager account)"""
session = self.repo._sessions[self.cnx.sessionid]
session.set_pool()
session.set_cnxset()
return session
@property
......@@ -458,7 +458,7 @@ class CubicWebTC(TestCase):
try:
return self.cnx.commit()
finally:
self.session.set_pool() # ensure pool still set after commit
self.session.set_cnxset() # ensure cnxset still set after commit
@nocoverage
def rollback(self):
......@@ -467,7 +467,7 @@ class CubicWebTC(TestCase):
except dbapi.ProgrammingError:
pass # connection closed
finally:
self.session.set_pool() # ensure pool still set after commit
self.session.set_cnxset() # ensure cnxset still set after commit
# # server side db api #######################################################
......@@ -475,7 +475,7 @@ class CubicWebTC(TestCase):
if eid_key is not None:
warn('[3.8] eid_key is deprecated, you can safely remove this argument',
DeprecationWarning, stacklevel=2)
self.session.set_pool()
self.session.set_cnxset()
return self.session.execute(rql, args)
# other utilities #########################################################
......
......@@ -557,7 +557,7 @@ class WorkflowHooksTC(CubicWebTC):
def setUp(self):
CubicWebTC.setUp(self)
self.wf = self.session.user.cw_adapt_to('IWorkflowable').current_workflow
self.session.set_pool()
self.session.set_cnxset()
self.s_activated = self.wf.state_by_name('activated').eid
self.s_deactivated = self.wf.state_by_name('deactivated').eid
self.s_dummy = self.wf.add_state(u'dummy').eid
......@@ -629,13 +629,13 @@ class WorkflowHooksTC(CubicWebTC):
iworkflowable = user.cw_adapt_to('IWorkflowable')
iworkflowable.fire_transition('deactivate')
cnx.commit()
session.set_pool()
session.set_cnxset()
with self.assertRaises(ValidationError) as cm:
iworkflowable.fire_transition('deactivate')
self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
u"transition isn't allowed from")
cnx.rollback()
session.set_pool()
session.set_cnxset()
# get back now
iworkflowable.fire_transition('activate')
cnx.commit()
......
......@@ -67,7 +67,7 @@ class ServerStartupHook(hook.Hook):
except Exception, exc:
session.exception('while trying to update feed %s', source)
session.rollback()
session.set_pool()
session.set_cnxset()
finally:
session.close()
self.repo.looping_task(60, update_feeds, self.repo)
......@@ -92,7 +92,7 @@ def add_inline_relation_column(session, etype, rtype):
# create index before alter table which may expectingly fail during test
# (sqlite) while index creation should never fail (test for index existence
# is done by the dbhelper)
session.pool.source('system').create_index(session, table, column)
session.cnxset.source('system').create_index(session, table, column)
session.info('added index on %s(%s)', table, column)
......@@ -252,7 +252,7 @@ class CWETypeAddOp(MemSchemaOperation):
description=entity.description)
eschema = schema.add_entity_type(etype)
# create the necessary table
tablesql = y2sql.eschema2sql(session.pool.source('system').dbhelper,
tablesql = y2sql.eschema2sql(session.cnxset.source('system').dbhelper,
eschema, prefix=SQL_PREFIX)
for sql in tablesql.split(';'):
if sql.strip():
......@@ -289,7 +289,7 @@ class CWETypeRenameOp(MemSchemaOperation):
self.session.vreg.schema.rename_entity_type(oldname, newname)
# we need sql to operate physical changes on the system database
sqlexec = self.session.system_sql
dbhelper= self.session.pool.source('system').dbhelper
dbhelper= self.session.cnxset.source('system').dbhelper
sql = dbhelper.sql_rename_table(SQL_PREFIX+oldname,
SQL_PREFIX+newname)
sqlexec(sql)
......@@ -433,7 +433,7 @@ class CWAttributeAddOp(MemSchemaOperation):
# update the in-memory schema first
rdefdef = self.init_rdef(**props)
# then make necessary changes to the system source database
syssource = session.pool.source('system')
syssource = session.cnxset.source('system')
attrtype = y2sql.type_from_constraints(
syssource.dbhelper, rdefdef.object, rdefdef.constraints)
# XXX should be moved somehow into lgdb: sqlite doesn't support to
......@@ -603,7 +603,7 @@ class RDefUpdateOp(MemSchemaOperation):
self.oldvalues = dict( (attr, getattr(rdef, attr)) for attr in self.values)
rdef.update(self.values)
# then make necessary changes to the system source database
syssource = session.pool.source('system')
syssource = session.cnxset.source('system')
if 'indexed' in self.values:
syssource.update_rdef_indexed(session, rdef)
self.indexed_changed = True
......@@ -621,7 +621,7 @@ class RDefUpdateOp(MemSchemaOperation):
# revert changes on in memory schema
self.rdef.update(self.oldvalues)
# revert changes on database
syssource = self.session.pool.source('system')
syssource = self.session.cnxset.source('system')
if self.indexed_changed:
syssource.update_rdef_indexed(self.session, self.rdef)
if self.null_allowed_changed:
......@@ -649,7 +649,7 @@ class CWConstraintDelOp(MemSchemaOperation):
rdef.constraints.remove(self.oldcstr)
# then update database: alter the physical schema on size/unique
# constraint changes
syssource = session.pool.source('system')
syssource = session.cnxset.source('system')
cstrtype = self.oldcstr.type()
if cstrtype == 'SizeConstraint':
syssource.update_rdef_column(session, rdef)
......@@ -665,7 +665,7 @@ class CWConstraintDelOp(MemSchemaOperation):
if self.oldcstr is not None:
self.rdef.constraints.append(self.oldcstr)
# revert changes on database
syssource = self.session.pool.source('system')
syssource = self.session.cnxset.source('system')
if self.size_cstr_changed:
syssource.update_rdef_column(self.session, self.rdef)
if self.unique_changed:
......@@ -696,7 +696,7 @@ class CWConstraintAddOp(CWConstraintDelOp):
rdef.constraints.append(newcstr)
# then update database: alter the physical schema on size/unique
# constraint changes
syssource = session.pool.source('system')
syssource = session.cnxset.source('system')
if cstrtype == 'SizeConstraint' and (oldcstr is None or
oldcstr.max != newcstr.max):
syssource.update_rdef_column(session, rdef)
......@@ -713,7 +713,7 @@ class CWUniqueTogetherConstraintAddOp(MemSchemaOperation):
prefix = SQL_PREFIX
table = '%s%s' % (prefix, self.entity.constraint_of[0].name)
cols = ['%s%s' % (prefix, r.name) for r in self.entity.relations]
dbhelper= session.pool.source('system').dbhelper
dbhelper= session.cnxset.source('system').dbhelper
sqls = dbhelper.sqls_create_multicol_unique_index(table, cols)
for sql in sqls:
session.system_sql(sql)
......@@ -733,7 +733,7 @@ class CWUniqueTogetherConstraintDelOp(MemSchemaOperation):
session = self.session
prefix = SQL_PREFIX
table = '%s%s' % (prefix, self.entity.type)
dbhelper= session.pool.source('system').dbhelper
dbhelper= session.cnxset.source('system').dbhelper
cols = ['%s%s' % (prefix, c) for c in self.cols]
sqls = dbhelper.sqls_drop_multicol_unique_index(table, cols)
for sql in sqls:
......@@ -782,7 +782,7 @@ class MemSchemaPermissionAdd(MemSchemaOperation):
"""
def precommit_event(self):
"""the observed connections pool has been commited"""
"""the observed connections.cnxset has been commited"""
try:
erschema = self.session.vreg.schema.schema_by_eid(self.eid)
except KeyError:
......@@ -811,7 +811,7 @@ class MemSchemaPermissionDel(MemSchemaPermissionAdd):
"""
def precommit_event(self):
"""the observed connections pool has been commited"""
"""the observed connections set has been commited"""
try:
erschema = self.session.vreg.schema.schema_by_eid(self.eid)
except KeyError:
......@@ -1223,7 +1223,7 @@ class UpdateFTIndexOp(hook.DataOperationMixIn, hook.SingleLastOperation):
source.fti_index_entities(session, [container])
if to_reindex:
# Transaction has already been committed
session.pool.commit()
session.cnxset.commit()
......
......@@ -56,7 +56,7 @@ class _GroupOperation(hook.Operation):
class _DeleteGroupOp(_GroupOperation):
"""synchronize user when a in_group relation has been deleted"""
def postcommit_event(self):
"""the observed connections pool has been commited"""
"""the observed connections set has been commited"""
groups = self.cnxuser.groups
try:
groups.remove(self.group)
......@@ -67,7 +67,7 @@ class _DeleteGroupOp(_GroupOperation):
class _AddGroupOp(_GroupOperation):
"""synchronize user when a in_group relation has been added"""
def postcommit_event(self):
"""the observed connections pool has been commited"""
"""the observed connections set has been commited"""
groups = self.cnxuser.groups
if self.group in groups:
self.warning('user %s already in group %s', self.cnxuser,
......@@ -97,7 +97,7 @@ class _DelUserOp(hook.Operation):
hook.Operation.__init__(self, session)
def postcommit_event(self):
"""the observed connections pool has been commited"""
"""the observed connections set has been commited"""
try:
self.session.repo.close(self.cnxid)
except BadConnectionId:
......@@ -122,7 +122,7 @@ class _DelCWPropertyOp(hook.Operation):
"""a user's custom properties has been deleted"""
def postcommit_event(self):
"""the observed connections pool has been commited"""
"""the observed connections set has been commited"""
try:
del self.cwpropdict[self.key]
except KeyError:
......@@ -133,7 +133,7 @@ class _ChangeCWPropertyOp(hook.Operation):
"""a user's custom properties has been added/changed"""
def postcommit_event(self):
"""the observed connections pool has been commited"""
"""the observed connections set has been commited"""
self.cwpropdict[self.key] = self.value
......@@ -141,7 +141,7 @@ class _AddCWPropertyOp(hook.Operation):
"""a user's custom properties has been added/changed"""
def postcommit_event(self):
"""the observed connections pool has been commited"""
"""the observed connections set has been commited"""
cwprop = self.cwprop
if not cwprop.for_user:
self.session.vreg['propertyvalues'][cwprop.pkey] = cwprop.value
......
......@@ -36,9 +36,9 @@ class SchemaModificationHooksTC(CubicWebTC):
self.__class__.schema_eids = schema_eids_idx(self.repo.schema)
def index_exists(self, etype, attr, unique=False):
self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
def _set_perms(self, eid):
......@@ -57,9 +57,9 @@ class SchemaModificationHooksTC(CubicWebTC):
def test_base(self):
schema = self.repo.schema
self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
self.failIf(schema.has_entity('Societe2'))
self.failIf(schema.has_entity('concerne2'))
# schema should be update on insertion (after commit)
......@@ -170,9 +170,9 @@ class SchemaModificationHooksTC(CubicWebTC):
# schema modification hooks tests #########################################
def test_uninline_relation(self):
self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
self.failUnless(self.schema['state_of'].inlined)
try:
self.execute('SET X inlined FALSE WHERE X name "state_of"')
......@@ -195,9 +195,9 @@ class SchemaModificationHooksTC(CubicWebTC):
self.assertEqual(len(rset), 2)
def test_indexed_change(self):
self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
try:
self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"')
self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
......@@ -214,9 +214,9 @@ class SchemaModificationHooksTC(CubicWebTC):
self.failUnless(self.index_exists('Workflow', 'name'))
def test_unique_change(self):
self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
try:
self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X '
'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
......
......@@ -49,7 +49,7 @@ if applcubicwebversion == (3, 6, 0) and cubicwebversion >= (3, 6, 0):
elif applcubicwebversion < (3, 6, 0) and cubicwebversion >= (3, 6, 0):
CSTRMAP = dict(rql('Any T, X WHERE X is CWConstraintType, X name T',
ask_confirm=False))
session.set_pool()
session.set_cnxset()
permsdict = ss.deserialize_ertype_permissions(session)
with hooks_control(session, session.HOOKS_ALLOW_ALL, 'integrity'):
......
......@@ -3,7 +3,7 @@ source, = __args__
sql("DELETE FROM entities WHERE type='Int'")
ecnx = session.pool.connection(source)
ecnx = session.cnxset.connection(source)
for e in rql('Any X WHERE X cw_source S, S name %(name)s', {'name': source}).entities():
meta = e.cw_metainformation()
assert meta['source']['uri'] == source
......
......@@ -230,7 +230,7 @@ def initialize_schema(config, schema, mhandler, event='create'):
for path in reversed(paths):
mhandler.exec_event_script('pre%s' % event, path)
# enter instance'schema into the database
session.set_pool()
session.set_cnxset()
serialize_schema(session, schema)
# execute cubicweb's post<event> script
mhandler.exec_event_script('post%s' % event)
......
......@@ -101,7 +101,7 @@ def reindex_entities(schema, session, withpb=True, etypes=None):
# deactivate modification_date hook since we don't want them
# to be updated due to the reindexation
repo = session.repo
cursor = session.pool['system']
cursor = session.cnxset['system']
dbhelper = session.repo.system_source.dbhelper
if not dbhelper.has_fti_table(cursor):
print 'no text index table'
......@@ -356,7 +356,7 @@ def check(repo, cnx, checks, reindex, fix, withpb=True):
using given user and password to locally connect to the repository
(no running cubicweb server needed)
"""
session = repo._get_session(cnx.sessionid, setpool=True)
session = repo._get_session(cnx.sessionid, setcnxset=True)
# yo, launch checks
if checks:
eids_cache = {}
......@@ -372,6 +372,6 @@ def check(repo, cnx, checks, reindex, fix, withpb=True):
print 'WARNING: Diagnostic run, nothing has been corrected'
if reindex:
cnx.rollback()
session.set_pool()
session.set_cnxset()
reindex_entities(repo.schema, session, withpb=withpb)
cnx.commit()
......@@ -730,8 +730,8 @@ class Operation(object):
operation. These keyword arguments will be accessible as attributes from the
operation instance.
An operation is triggered on connections pool events related to
commit / rollback transations. Possible events are:
An operation is triggered on connections set events related to commit /
rollback transations. Possible events are:
* `precommit`:
......@@ -805,7 +805,7 @@ class Operation(object):
getattr(self, event)()
def precommit_event(self):
"""the observed connections pool is preparing a commit"""
"""the observed connections set is preparing a commit"""
def revertprecommit_event(self):
"""an error went when pre-commiting this operation or a later one
......@@ -815,14 +815,13 @@ class Operation(object):
"""
def rollback_event(self):
"""the observed connections pool has been rollbacked
"""the observed connections set has been rollbacked
do nothing by default, the operation will just be removed from the pool
operation list
do nothing by default
"""
def postcommit_event(self):
"""the observed connections pool has committed"""
"""the observed connections set has committed"""
@property
@deprecated('[3.6] use self.session.user')
......@@ -1098,7 +1097,7 @@ class CleanupNewEidsCacheOp(DataOperationMixIn, SingleLastOperation):
data_key = 'neweids'
def rollback_event(self):
"""the observed connections pool has been rollbacked,
"""the observed connections set has been rollbacked,
remove inserted eid from repository type/source cache
"""
try:
......@@ -1112,7 +1111,7 @@ class CleanupDeletedEidsCacheOp(DataOperationMixIn, SingleLastOperation):
"""
data_key = 'pendingeids'
def postcommit_event(self):
"""the observed connections pool has been rollbacked,
"""the observed connections set has been rollbacked,
remove inserted eid from repository type/source cache
"""
try:
......
......@@ -201,7 +201,6 @@ class ServerMigrationHelper(MigrationHelper):
versions = repo.get_versions()
for cube, version in versions.iteritems():
version_file.write('%s %s\n' % (cube, version))
if not failed:
bkup = tarfile.open(backupfile, 'w|gz')
for filename in os.listdir(tmpdir):
......@@ -242,7 +241,7 @@ class ServerMigrationHelper(MigrationHelper):
written_format = format_file.readline().strip()
if written_format in ('portable', 'native'):
format = written_format
self.config.open_connections_pools = False
self.config.init_cnxset_pool = False
repo = self.repo_connect()
for source in repo.sources:
if systemonly and source.uri != 'system':
......@@ -255,7 +254,7 @@ class ServerMigrationHelper(MigrationHelper):
raise SystemExit(1)
shutil.rmtree(tmpdir)
# call hooks
repo.open_connections_pools()
repo.init_cnxset_pool()
repo.hm.call_hooks('server_restore', repo=repo, timestamp=backupfile)
print '-> database restored.'
......@@ -288,7 +287,7 @@ class ServerMigrationHelper(MigrationHelper):
except (KeyboardInterrupt, EOFError):
print 'aborting...'
sys.exit(0)
self.session.keep_pool_mode('transaction')
self.session.keep_cnxset_mode('transaction')
self.session.data['rebuild-infered'] = False
return self._cnx
......@@ -296,10 +295,10 @@ class ServerMigrationHelper(MigrationHelper):
def session(self):
if self.config is not None: