Commit cad6b21e authored by Sylvain Thénault's avatar Sylvain Thénault
Browse files

[multi-sources-removal] Simplify ConnectionsSet internal structures and public methods

since it now handles a connection to the system source only

Related to #2919300

[jcr: adjust 3.17.11 migration, fix a number of bugs in new
 ConnectionsSet implementation, fix
 source.{open,close}_source_connections]
parent 375fc1868b11
# -*- coding: utf-8 -*-
# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
......@@ -938,7 +938,7 @@ class SQLGenObjectStore(NoHookRQLObjectStore):
def drop_indexes(self, etype):
"""Drop indexes for a given entity type"""
if etype not in self.indexes_etypes:
cu = self.session.cnxset['system']
cu = self.session.cnxset.cu
def index_to_attr(index):
"""turn an index name to (database) attribute name"""
return index.replace(etype.lower(), '').replace('idx', '').strip('_')
......
......@@ -81,7 +81,7 @@ def add_inline_relation_column(session, etype, rtype):
# create index before alter table which may expectingly fail during test
# (sqlite) while index creation should never fail (test for index existence
# is done by the dbhelper)
session.cnxset.source('system').create_index(session, table, column)
session.repo.system_source.create_index(session, table, column)
session.info('added index on %s(%s)', table, column)
......@@ -244,7 +244,7 @@ class CWETypeAddOp(MemSchemaOperation):
description=entity.description)
eschema = schema.add_entity_type(etype)
# create the necessary table
tablesql = y2sql.eschema2sql(session.cnxset.source('system').dbhelper,
tablesql = y2sql.eschema2sql(session.repo.system_source.dbhelper,
eschema, prefix=SQL_PREFIX)
for sql in tablesql.split(';'):
if sql.strip():
......@@ -287,7 +287,7 @@ class CWETypeRenameOp(MemSchemaOperation):
self.session.vreg.schema.rename_entity_type(oldname, newname)
# we need sql to operate physical changes on the system database
sqlexec = self.session.system_sql
dbhelper= self.session.cnxset.source('system').dbhelper
dbhelper = self.session.repo.system_source.dbhelper
sql = dbhelper.sql_rename_table(SQL_PREFIX+oldname,
SQL_PREFIX+newname)
sqlexec(sql)
......@@ -432,7 +432,7 @@ class CWAttributeAddOp(MemSchemaOperation):
# update the in-memory schema first
rdefdef = self.init_rdef(**props)
# then make necessary changes to the system source database
syssource = session.cnxset.source('system')
syssource = session.repo.system_source
attrtype = y2sql.type_from_constraints(
syssource.dbhelper, rdefdef.object, rdefdef.constraints)
# XXX should be moved somehow into lgdb: sqlite doesn't support to
......@@ -607,7 +607,7 @@ class RDefUpdateOp(MemSchemaOperation):
self.oldvalues = dict( (attr, getattr(rdef, attr)) for attr in self.values)
rdef.update(self.values)
# then make necessary changes to the system source database
syssource = session.cnxset.source('system')
syssource = session.repo.system_source
if 'indexed' in self.values:
syssource.update_rdef_indexed(session, rdef)
self.indexed_changed = True
......@@ -625,7 +625,7 @@ class RDefUpdateOp(MemSchemaOperation):
# revert changes on in memory schema
self.rdef.update(self.oldvalues)
# revert changes on database
syssource = self.session.cnxset.source('system')
syssource = self.session.repo.system_source
if self.indexed_changed:
syssource.update_rdef_indexed(self.session, self.rdef)
if self.null_allowed_changed:
......@@ -653,7 +653,7 @@ class CWConstraintDelOp(MemSchemaOperation):
rdef.constraints.remove(self.oldcstr)
# then update database: alter the physical schema on size/unique
# constraint changes
syssource = session.cnxset.source('system')
syssource = session.repo.system_source
cstrtype = self.oldcstr.type()
if cstrtype == 'SizeConstraint':
syssource.update_rdef_column(session, rdef)
......@@ -669,7 +669,7 @@ class CWConstraintDelOp(MemSchemaOperation):
if self.oldcstr is not None:
self.rdef.constraints.append(self.oldcstr)
# revert changes on database
syssource = self.session.cnxset.source('system')
syssource = self.session.repo.system_source
if self.size_cstr_changed:
syssource.update_rdef_column(self.session, self.rdef)
if self.unique_changed:
......@@ -700,7 +700,7 @@ class CWConstraintAddOp(CWConstraintDelOp):
rdef.constraints.append(newcstr)
# then update database: alter the physical schema on size/unique
# constraint changes
syssource = session.cnxset.source('system')
syssource = session.repo.system_source
if cstrtype == 'SizeConstraint' and (oldcstr is None or
oldcstr.max != newcstr.max):
syssource.update_rdef_column(session, rdef)
......@@ -719,7 +719,7 @@ class CWUniqueTogetherConstraintAddOp(MemSchemaOperation):
entity = self.entity
table = '%s%s' % (prefix, entity.constraint_of[0].name)
cols = ['%s%s' % (prefix, r.name) for r in entity.relations]
dbhelper = session.cnxset.source('system').dbhelper
dbhelper = session.repo.system_source.dbhelper
sqls = dbhelper.sqls_create_multicol_unique_index(table, cols, entity.name)
for sql in sqls:
session.system_sql(sql)
......@@ -739,7 +739,7 @@ class CWUniqueTogetherConstraintDelOp(MemSchemaOperation):
session = self.session
prefix = SQL_PREFIX
table = '%s%s' % (prefix, self.entity.type)
dbhelper = session.cnxset.source('system').dbhelper
dbhelper = session.repo.system_source.dbhelper
cols = ['%s%s' % (prefix, c) for c in self.cols]
sqls = dbhelper.sqls_drop_multicol_unique_index(table, cols, self.cstrname)
for sql in sqls:
......
......@@ -38,8 +38,8 @@ class SchemaModificationHooksTC(CubicWebTC):
def index_exists(self, etype, attr, unique=False):
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
dbhelper = self.repo.system_source.dbhelper
sqlcursor = self.session.cnxset.cu
return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
def _set_perms(self, eid):
......@@ -59,8 +59,8 @@ class SchemaModificationHooksTC(CubicWebTC):
def test_base(self):
schema = self.repo.schema
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
dbhelper = self.repo.system_source.dbhelper
sqlcursor = self.session.cnxset.cu
self.assertFalse(schema.has_entity('Societe2'))
self.assertFalse(schema.has_entity('concerne2'))
# schema should be update on insertion (after commit)
......@@ -200,8 +200,8 @@ class SchemaModificationHooksTC(CubicWebTC):
def test_uninline_relation(self):
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
dbhelper = self.repo.system_source.dbhelper
sqlcursor = self.session.cnxset.cu
self.assertTrue(self.schema['state_of'].inlined)
try:
self.execute('SET X inlined FALSE WHERE X name "state_of"')
......@@ -225,8 +225,8 @@ class SchemaModificationHooksTC(CubicWebTC):
def test_indexed_change(self):
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
dbhelper = self.repo.system_source.dbhelper
sqlcursor = self.session.cnxset.cu
try:
self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"')
self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed)
......@@ -244,8 +244,8 @@ class SchemaModificationHooksTC(CubicWebTC):
def test_unique_change(self):
self.session.set_cnxset()
dbhelper = self.session.cnxset.source('system').dbhelper
sqlcursor = self.session.cnxset['system']
dbhelper = self.repo.system_source.dbhelper
sqlcursor = self.session.cnxset.cu
try:
self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X '
'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
......
......@@ -4,7 +4,7 @@ dbhelper = repo.system_source.dbhelper
rdefdef = schema['CWSource'].rdef('name')
attrtype = y2sql.type_from_constraints(dbhelper, rdefdef.object, rdefdef.constraints).split()[0]
cursor = session.cnxset['system']
cursor = session.cnxset.cu
sql('UPDATE entities SET asource = source WHERE asource is NULL')
dbhelper.change_col_type(cursor, 'entities', 'asource', attrtype, False)
dbhelper.change_col_type(cursor, 'entities', 'source', attrtype, False)
......@@ -2,6 +2,6 @@ for table, column in [
('transactions', 'tx_time'),
('tx_entity_actions', 'tx_uuid'),
('tx_relation_actions', 'tx_uuid')]:
session.cnxset.source('system').create_index(session, table, column)
repo.system_source.create_index(session, table, column)
commit()
# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
......@@ -99,7 +99,7 @@ def reindex_entities(schema, session, withpb=True, etypes=None):
# deactivate modification_date hook since we don't want them
# to be updated due to the reindexation
repo = session.repo
cursor = session.cnxset['system']
cursor = session.cnxset.cu
dbhelper = session.repo.system_source.dbhelper
if not dbhelper.has_fti_table(cursor):
print 'no text index table'
......
......@@ -1501,7 +1501,7 @@ class ServerMigrationHelper(MigrationHelper):
self.sqlexec(sql, ask_confirm=False)
dbhelper = self.repo.system_source.dbhelper
sqltype = dbhelper.TYPE_MAPPING[newtype]
cursor = self.session.cnxset[self.repo.system_source.uri]
cursor = self.session.cnxset.cu
dbhelper.change_col_type(cursor, 'cw_%s' % etype, 'cw_%s' % attr, sqltype, allownull)
if commit:
self.commit()
......
......@@ -17,73 +17,57 @@
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""CubicWeb server connections set : the repository has a limited number of
:class:`ConnectionsSet` (defined in configuration, default to 4). Each of them
hold a connection for each source used by the repository.
hold a connection to the system source.
"""
__docformat__ = "restructuredtext en"
import sys
from logilab.common.deprecation import deprecated
class ConnectionsSet(object):
"""handle connections on a set of sources, at some point associated to a
"""handle connection to the system source, at some point associated to a
:class:`Session`
"""
# since 3.19, we only have to manage the system source connection
def __init__(self, system_source):
# dictionary of (source, connection), indexed by sources'uri
self.source_cnxs = {}
self.source_cnxs['system'] = (system_source,
system_source.get_connection())
self._cursors = {}
def __getitem__(self, uri):
"""subscription notation provide access to sources'cursors"""
assert uri == 'system'
try:
cursor = self._cursors[uri]
except KeyError:
cursor = self.source_cnxs[uri][1].cursor()
if cursor is not None:
# None possible on sources without cursor support such as ldap
self._cursors[uri] = cursor
return cursor
self._source = system_source
self.cnx = system_source.get_connection()
self.cu = self.cnx.cursor()
def commit(self):
"""commit the current transaction for this user"""
# FIXME: what happends if a commit fail
# would need a two phases commit or like, but I don't know how to do
# this using the db-api...
for source, cnx in self.source_cnxs.itervalues():
# let exception propagates
cnx.commit()
# let exception propagates
self.cnx.commit()
def rollback(self):
"""rollback the current transaction for this user"""
for source, cnx in self.source_cnxs.itervalues():
# catch exceptions, rollback other sources anyway
try:
cnx.rollback()
except Exception:
source.critical('rollback error', exc_info=sys.exc_info())
# error on rollback, the connection is much probably in a really
# bad state. Replace it by a new one.
self.reconnect(source)
# catch exceptions, rollback other sources anyway
try:
self.cnx.rollback()
except Exception:
self._source.critical('rollback error', exc_info=sys.exc_info())
# error on rollback, the connection is much probably in a really
# bad state. Replace it by a new one.
self.reconnect()
def close(self, i_know_what_i_do=False):
"""close all connections in the set"""
if i_know_what_i_do is not True: # unexpected closing safety belt
raise RuntimeError('connections set shouldn\'t be closed')
for cu in self._cursors.itervalues():
try:
cu.close()
except Exception:
continue
for _, cnx in self.source_cnxs.itervalues():
try:
cnx.close()
except Exception:
continue
try:
self.cu.close()
self.cu = None
except Exception:
pass
try:
self.cnx.close()
except Exception:
pass
# internals ###############################################################
......@@ -93,49 +77,40 @@ class ConnectionsSet(object):
def cnxset_freed(self):
"""connections set is being freed from a session"""
for source, cnx in self.source_cnxs.itervalues():
source.cnxset_freed(cnx)
def sources(self):
"""return the source objects handled by this connections set"""
# implementation details of flying insert requires the system source
# first
yield self.source_cnxs['system'][0]
def source(self, uid):
"""return the source object with the given uri"""
return self.source_cnxs[uid][0]
def connection(self, uid):
"""return the connection on the source object with the given uri"""
return self.source_cnxs[uid][1]
self._source.cnxset_freed(self.cnx)
def reconnect(self, source=None):
def reconnect(self):
"""reopen a connection for this source or all sources if none specified
"""
if source is None:
sources = self.sources()
else:
sources = (source,)
for source in sources:
try:
# properly close existing connection if any
self.source_cnxs[source.uri][1].close()
except Exception:
pass
source.info('trying to reconnect')
self.source_cnxs[source.uri] = (source, source.get_connection())
self._cursors.pop(source.uri, None)
try:
# properly close existing connection if any
self.cnx.close()
except Exception:
pass
self._source.info('trying to reconnect')
self.cnx = self._source.get_connection()
self.cu = self.cnx.cursor()
def check_connections(self):
for source, cnx in self.source_cnxs.itervalues():
newcnx = source.check_connection(cnx)
if newcnx is not None:
self.reset_connection(source, newcnx)
def reset_connection(self, source, cnx):
self.source_cnxs[source.uri] = (source, cnx)
self._cursors.pop(source.uri, None)
newcnx = self._source.check_connection(self.cnx)
if newcnx is not None:
self.cnx = newcnx
self.cu = self.cnx.cursor()
@deprecated('[3.19] use .cu instead')
def __getitem__(self, uri):
assert uri == 'system'
return self.cu
@deprecated('[3.19] use repo.system_source instead')
def source(self, uid):
assert uid == 'system'
return self._source
@deprecated('[3.19] use .cnx instead')
def connection(self, uid):
assert uid == 'system'
return self.cnx
from cubicweb.server.hook import Operation, LateOperation, SingleLastOperation
......
......@@ -151,7 +151,7 @@ class ExecutionPlan(object):
# session executing the query
self.session = session
# quick reference to the system source
self.syssource = session.cnxset.source('system')
self.syssource = session.repo.system_source
# execution steps
self.steps = []
# index of temporary tables created during execution
......
......@@ -89,7 +89,7 @@ def deserialize_schema(schema, session):
repo = session.repo
dbhelper = repo.system_source.dbhelper
# XXX bw compat (3.6 migration)
sqlcu = session.cnxset['system']
sqlcu = session.cnxset.cu
sqlcu.execute("SELECT * FROM cw_CWRType WHERE cw_name='symetric'")
if sqlcu.fetchall():
sql = dbhelper.sql_rename_col('cw_CWRType', 'cw_symetric', 'cw_symmetric',
......
......@@ -1164,7 +1164,7 @@ class Connection(RequestSessionBase):
"""return a sql cursor on the system database"""
if sql.split(None, 1)[0].upper() != 'SELECT':
self.mode = 'write'
source = self.cnxset.source('system')
source = self.repo.system_source
try:
return source.doexec(self, sql, args, rollback=rollback_on_failure)
except (source.OperationalError, source.InterfaceError):
......
......@@ -241,12 +241,13 @@ class AbstractSource(object):
def close_source_connections(self):
for cnxset in self.repo.cnxsets:
cnxset._cursors.pop(self.uri, None)
cnxset.source_cnxs[self.uri][1].close()
cnxset.cu = None
cnxset.cnx.close()
def open_source_connections(self):
for cnxset in self.repo.cnxsets:
cnxset.source_cnxs[self.uri] = (self, self.get_connection())
cnxset.cnx = self.get_connection()
cnxset.cu = cnxset.cnx.cursor()
def cnxset_freed(self, cnx):
"""the connections set holding the given connection is being reseted
......@@ -386,7 +387,7 @@ class AbstractSource(object):
.executemany().
"""
res = self.syntax_tree_search(session, union, args, varmap=varmap)
session.cnxset.source('system').manual_insert(res, table, session)
session.repo.system_source.manual_insert(res, table, session)
# write modification api ###################################################
# read-only sources don't have to implement methods below
......
......@@ -336,7 +336,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
_cnxset.cnxset_set()
else:
_cnxset = cnxset
if not self.dbhelper.has_fti_table(_cnxset['system']):
if not self.dbhelper.has_fti_table(_cnxset.cu):
if not self.repo.config.creating:
self.critical('no text index table')
self.do_fti = False
......@@ -706,9 +706,9 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
"""Execute a query.
it's a function just so that it shows up in profiling
"""
cursor = session.cnxset[self.uri]
cursor = session.cnxset.cu
if server.DEBUG & server.DBG_SQL:
cnx = session.cnxset.connection(self.uri)
cnx = session.cnxset.cnx
# getattr to get the actual connection if cnx is a CnxLoggingWrapper
# instance
print 'exec', query, args, getattr(cnx, '_cnx', cnx)
......@@ -723,7 +723,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
query, args, ex.args[0])
if rollback:
try:
session.cnxset.connection(self.uri).rollback()
session.cnxset.rollback()
if self.repo.config.mode != 'test':
self.critical('transaction has been rolled back')
except Exception as ex:
......@@ -751,7 +751,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
"""
if server.DEBUG & server.DBG_SQL:
print 'execmany', query, 'with', len(args), 'arguments'
cursor = session.cnxset[self.uri]
cursor = session.cnxset.cu
try:
# str(query) to avoid error if it's an unicode string
cursor.executemany(str(query), args)
......@@ -762,7 +762,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
self.critical("sql many: %r\n args: %s\ndbms message: %r",
query, args, ex.args[0])
try:
session.cnxset.connection(self.uri).rollback()
session.cnxset.rollback()
if self.repo.config.mode != 'test':
self.critical('transaction has been rolled back')
except Exception:
......@@ -780,7 +780,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
not allownull and 'NOT NULL' or '')
return
self.dbhelper.change_col_type(LogCursor(session.cnxset[self.uri]),
self.dbhelper.change_col_type(LogCursor(session.cnxset.cu),
table, column, coltype, allownull)
self.info('altered %s.%s: now %s%s', table, column, coltype,
not allownull and 'NOT NULL' or '')
......@@ -795,7 +795,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
return
table, column = rdef_table_column(rdef)
coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
self.dbhelper.set_null_allowed(LogCursor(session.cnxset[self.uri]),
self.dbhelper.set_null_allowed(LogCursor(session.cnxset.cu),
table, column, coltype, allownull)
def update_rdef_indexed(self, session, rdef):
......@@ -813,11 +813,11 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
self.drop_index(session, table, column, unique=True)
def create_index(self, session, table, column, unique=False):
cursor = LogCursor(session.cnxset[self.uri])
cursor = LogCursor(session.cnxset.cu)
self.dbhelper.create_index(cursor, table, column, unique)
def drop_index(self, session, table, column, unique=False):
cursor = LogCursor(session.cnxset[self.uri])
cursor = LogCursor(session.cnxset.cu)
self.dbhelper.drop_index(cursor, table, column, unique)
# system source interface #################################################
......@@ -1377,7 +1377,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
def fti_unindex_entities(self, session, entities):
"""remove text content for entities from the full text index
"""
cursor = session.cnxset['system']
cursor = session.cnxset.cu
cursor_unindex_object = self.dbhelper.cursor_unindex_object
try:
for entity in entities:
......@@ -1390,7 +1390,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
"""add text content of created/modified entities to the full text index
"""
cursor_index_object = self.dbhelper.cursor_index_object
cursor = session.cnxset['system']
cursor = session.cnxset.cu
try:
# use cursor_index_object, not cursor_reindex_object since
# unindexing done in the FTIndexEntityOp
......
......@@ -236,7 +236,7 @@ class BytesFileSystemStorage(Storage):
"""return the current fs_path of the attribute, or None is the attr is
not stored yet.
"""
sysource = entity._cw.cnxset.source('system')
sysource = entity._cw.repo.system_source
cu = sysource.doexec(entity._cw,
'SELECT cw_%s FROM cw_%s WHERE cw_eid=%s' % (
attr, entity.cw_etype, entity.eid))
......
......@@ -1169,7 +1169,7 @@ Any P1,B,E WHERE P1 identity P2 WITH
#'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y'
eeid, = self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y WHERE Y is EmailAddress')[0]
self.execute("DELETE Email X")
sqlc = self.session.cnxset['system']
sqlc = self.session.cnxset.cu
sqlc.execute('SELECT * FROM recipients_relation')
self.assertEqual(len(sqlc.fetchall()), 0)
sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid)
......@@ -1310,7 +1310,7 @@ Any P1,B,E WHERE P1 identity P2 WITH
self.assertEqual(rset.description, [('CWUser',)])
self.assertRaises(Unauthorized,
self.execute, "Any P WHERE X is CWUser, X login 'bob', X upassword P")
cursor = self.cnxset['system']
cursor = self.cnxset.cu
cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
% (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
passwd = str(cursor.fetchone()[0])
......@@ -1325,7 +1325,7 @@ Any P1,B,E WHERE P1 identity P2 WITH
self.assertEqual(rset.description[0][0], 'CWUser')
rset = self.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'",
{'pwd': 'tutu'})
cursor = self.cnxset['system']
cursor = self.cnxset.cu
cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
% (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
passwd = str(cursor.fetchone()[0])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment