Commit 6e022dec authored by Elouan Martinet

[server] Backport Lifo database connection pooler

This includes code made by Philippe Pepiot and Laurent Peuch
(revisions b10688370322 to 7f2e4c3518de), edited to add Python 2 support.

--HG--
branch : 3.27
parent 1d513eadfd4d
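
For readers unfamiliar with the strategy, here is a minimal, self-contained sketch of the idea the backported pooler builds on (illustration only, not part of the patch below; all names are made up): a LIFO queue hands back the most recently released, still-warm connection, while being able to pop the oldest entry lets the pool reap the connection that has been idle the longest.

    import queue

    class LifoDequeSketch(queue.LifoQueue):
        """LIFO queue that can also drop its oldest (least recently used) item."""

        def get_oldest_nowait(self):
            with self.not_empty:
                if not self._qsize():
                    raise queue.Empty
                item = self.queue.pop(0)  # oldest entry sits at the front of the list
                self.not_full.notify()
                return item

    pool = LifoDequeSketch()
    for name in ('cnx1', 'cnx2', 'cnx3'):
        pool.put_nowait(name)
    assert pool.get_nowait() == 'cnx3'         # LIFO: most recently released first
    assert pool.get_oldest_nowait() == 'cnx1'  # reap the entry idle the longest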
-# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+# copyright 2003-2020 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact https://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
 #
@@ -14,7 +14,7 @@
 # details.
 #
 # You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+# with CubicWeb. If not, see <https://www.gnu.org/licenses/>.
 """Defines the central class for the CubicWeb RQL server: the repository.

 The repository is an abstraction allowing execution of rql queries against
@@ -30,6 +30,8 @@ from itertools import chain
 from contextlib import contextmanager
 from logging import getLogger
 import queue
+import threading
+import time

 from logilab.common.decorators import cached, clear_cache
@@ -45,11 +47,11 @@ from cubicweb.server import utils, hook, querier, sources
 from cubicweb.server.session import InternalManager, Connection

-NO_CACHE_RELATIONS = set([
+NO_CACHE_RELATIONS = {
     ('owned_by', 'object'),
     ('created_by', 'object'),
     ('cw_source', 'object'),
-])
+}


 def prefill_entity_caches(entity):
@@ -143,62 +145,217 @@ class NullEventBus(object):
         pass


-class _CnxSetPool:
+# python stdlib doesn't provide any concurrent deque implementation with blocking .get
+class LifoDeque(queue.LifoQueue):
+    def get_oldest(self, block=True, timeout=None):
+        'Like queue.get except it gets the oldest one'
+        with self.not_empty:
+            if not block:
+                if not self._qsize():
+                    raise queue.Empty
+            elif timeout is None:
+                while not self._qsize():
+                    self.not_empty.wait()
+            elif timeout < 0:
+                raise ValueError("'timeout' must be a non-negative number")
+            else:
+                endtime = time.time() + timeout
+                while not self._qsize():
+                    remaining = endtime - time.time()
+                    if remaining <= 0.0:
+                        raise queue.Empty
+                    self.not_empty.wait(remaining)
+            item = self.queue.pop(0)  # <- that's the only code difference from super().get
+            self.not_full.notify()
+            return item
+
+
+class _BaseCnxSet(object):
+    """
+    Simple connection set without any pooling capability.
+
+    You should only use this if you are using an external pool like pgbouncer.
+    """
+    def __init__(self, source):
+        self._source = source
+
+    def qsize(self):
+        return None
+
+    def get(self):
+        return self._new_cnxset()
+
+    def _new_cnxset(self):
+        return self._source.wrapped_connection()
+
+    def release(self, cnxset):
+        cnxset.close(True)
+
+    def __iter__(self):
+        # empty generator: there is nothing to iterate over
+        return
+        yield

-    def __init__(self, source, size):
+    def close(self):
+        pass
+
+
+class _CnxSetPool(_BaseCnxSet):
+    """
+    Dynamic database connections pool.
+    """
+    def __init__(self, source, min_size=1, max_size=4, idle_timeout=300):
+        """
+        Connection pool for database connections. This is a dynamic pool that
+        opens new connections when the load is too high and closes connections
+        on lower load after some idle timeout.
+
+        Arguments:
+
+        * source: database source
+        * min_size: minimum number of simultaneous connections
+        * max_size: maximum number of simultaneous connections; if set to 0
+          there won't be any limit
+        * idle_timeout: time (in seconds) before the pool starts closing
+          connections
+        """
+        super(_CnxSetPool, self).__init__(source)
         self._cnxsets = []
+        self._queue = LifoDeque()
+        self.lock = threading.Lock()
+        self.min_size = min_size
+        self.max_size = max_size
+        self.idle = time.time()
+        self.idle_timeout = idle_timeout
-        if size is not None:
-            self._queue = queue.Queue()
+
+        for i in range(min_size):
+            self._queue.put_nowait(self._new_cnxset())
-            for i in range(size):
-                cnxset = source.wrapped_connection()
-                self._cnxsets.append(cnxset)
-                self._queue.put_nowait(cnxset)
+
+    def _new_cnxset(self):
+        """
+        Create a new connection and return it. This operation grabs the lock.
+        """
+        cnxset = super(_CnxSetPool, self)._new_cnxset()
+        with self.lock:
+            self._cnxsets.append(cnxset)
+        return cnxset
-        else:
-            self._queue = None
-            self._source = source
+
+    def _close_idle_cnxset(self):
+        """
+        Close the oldest free connection when the pool has not been empty for
+        idle_timeout seconds and more than min_size connections are open.
+        """
+        if abs(time.time() - self.idle) > self.idle_timeout and self.size() > self.min_size:
+            try:
+                cnxset = self._queue.get_oldest(block=False)
+            except queue.Empty:
+                # the queue has been used since we checked its size
+                pass
+            else:
+                self.debug("[pool] load is low, close a connection")
+                cnxset.close(True)
+                with self.lock:
+                    self._cnxsets.remove(cnxset)
+
+    def size(self):
+        """
+        Return the total number of connections.
+        """
+        with self.lock:
+            return len(self._cnxsets)

     def qsize(self):
-        if self._queue is None:
-            return None
+        """
+        Return the number of free connections in the queue.
+        """
         return self._queue.qsize()

     def get(self):
-        if self._queue is None:
-            return self._source.wrapped_connection()
+        """
+        Try to get a connection from the queue if one is available, or open a
+        new one if the maximum number isn't reached.
+
+        If no connection is available and the maximum number of connections is
+        reached, try to get one with a timeout. If the timeout is reached an
+        exception is raised; increase the maximum number of connections or the
+        timeout to avoid that.
+
+        If max_size is set to 0 there won't be any limit on opening new
+        connections.
+        """
+        self.debug("[pool] try to get a connection, free queue size is %d/%d (max %d)",
+                   self.qsize(), self.size(), self.max_size)
         try:
-            return self._queue.get(True, timeout=5)
+            cnxset = self._queue.get_nowait()
+            self._close_idle_cnxset()
+            self.debug("[pool] we got a directly available connection")
+            return cnxset
         except queue.Empty:
-            raise Exception('no connections set available after 5 secs, probably either a '
-                            'bug in code (too many uncommited/rolled back '
-                            'connections) or too much load on the server (in '
-                            'which case you can try to set a bigger '
-                            'connections pool size)')
+            # reset idle time
+            self.idle = time.time()
+
+            if self.max_size and self.size() >= self.max_size:
+                try:
+                    connection = self._queue.get(True, timeout=5)
+                    self.debug("[pool] we got a connection before minimum timeout")
+                    return connection
+                except queue.Empty:
+                    raise Exception('no connections set available after 5 secs, probably either a '
+                                    'bug in code (too many uncommitted/rolled back '
+                                    'connections) or too much load on the server (in '
+                                    'which case you can try to set a bigger connections pool size '
+                                    'by changing the connections-pool-max-size option in your '
+                                    'configuration file)')
+            else:
+                self.debug("[pool] we had to open a new connection, "
+                           "connection number: %d/%d" %
+                           (self.size(), self.max_size))
+                return self._new_cnxset()

     def release(self, cnxset):
-        if self._queue is None:
-            cnxset.close(True)
-        else:
-            self._queue.put_nowait(cnxset)
+        """
+        Release a connection by returning it to the queue.
+        """
+        self._queue.put_nowait(cnxset)
+        self.debug("[pool] release %s [%s], free queue size is %d/%d (max %d)",
+                   cnxset, id(cnxset), self.qsize(), self.size(), self.max_size)
+        self._close_idle_cnxset()

     def __iter__(self):
-        for cnxset in self._cnxsets:
-            yield cnxset
+        """
+        Iterate over all connections. This operation grabs the lock.
+        """
+        with self.lock:
+            for cnxset in self._cnxsets:
+                yield cnxset

     def close(self):
-        # XXX we don't close the connection when there is no queue?
-        if self._queue is not None:
-            while not self._queue.empty():
-                cnxset = self._queue.get_nowait()
-                try:
-                    cnxset.close(True)
-                except Exception as e:
-                    self.exception('error while closing %s, error: %s' % (cnxset, e))
+        """
+        Close the pool by closing all the connections still in the queue.
+        """
+        while True:
+            try:
+                cnxset = self._queue.get_nowait()
+            except queue.Empty:
+                break
+            try:
+                cnxset.close(True)
+            except Exception as e:
+                self.exception('error while closing %s, error: %s' % (cnxset, e))

     # these are overridden by set_log_methods below
     # only defining here to prevent pylint from complaining
     info = warning = error = critical = exception = debug = lambda msg, *a, **kw: None


+def get_cnxset(config, source, bootstrap=False):
+    if not config['connections-pooler-enabled']:
+        return _BaseCnxSet(source)
+
+    idle_timeout = config['connections-pool-idle-timeout']
+    if bootstrap or config.quick_start:
+        min_size, max_size = 0, 1
+    else:
+        min_size = config['connections-pool-min-size']
+        max_size = config['connections-pool-max-size']
+
+    return _CnxSetPool(source, min_size=min_size, max_size=max_size,
+                       idle_timeout=idle_timeout)
+
+
 class Repository(object):
@@ -237,16 +394,9 @@ class Repository(object):
         self.info('starting repository from %s', self.config.apphome)
         self.shutting_down = False
         config = self.config
-        # copy pool size here since config.init_cube() and config.load_schema()
-        # reload configuration from file and could reset a manually set pool
-        # size.
-        if config['connections-pooler-enabled']:
-            pool_size, min_pool_size = config['connections-pool-size'], 1
-        else:
-            pool_size = min_pool_size = None
         # 0. init a cnxset that will be used to fetch bootstrap information from
         # the database
-        self.cnxsets = _CnxSetPool(self.system_source, min_pool_size)
+        self.cnxsets = get_cnxset(config, self.system_source, bootstrap=True)
         # 1. set used cubes
         if config.creating or not config.read_instance_schema:
             config.bootstrap_cubes()
@@ -262,8 +412,6 @@
             # the registry
             config.cube_appobject_path = set(('hooks', 'entities'))
             config.cubicweb_appobject_path = set(('hooks', 'entities'))
-            # limit connections pool size
-            pool_size = min_pool_size
         if config.quick_start or config.creating or not config.read_instance_schema:
             # load schema from the file system
             if not config.creating:
@@ -295,7 +443,7 @@
         # 4. close initialization connection set and reopen fresh ones for
         # proper initialization
         self.cnxsets.close()
-        self.cnxsets = _CnxSetPool(self.system_source, pool_size)
+        self.cnxsets = get_cnxset(config, self.system_source)
         # 5. call instance level initialisation hooks
         self.hm.call_hooks('server_startup', repo=self)
@@ -350,7 +498,7 @@
            return
        with self.internal_cnx() as cnx:
            sourceent = cnx.execute(
-                'Any S, SA, SC WHERE S is_instance_of CWSource,'
+                'Any S, SA, SC LIMIT 2 WHERE S is_instance_of CWSource,'
                ' S name "system", S type SA, S config SC'
            ).one()
        self.system_source.eid = sourceent.eid
@@ -928,3 +1076,4 @@
 set_log_methods(Repository, getLogger('cubicweb.repository'))
+set_log_methods(_CnxSetPool, getLogger('cubicweb.repository.pool'))
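
To make the behaviour of the classes added above concrete, here is a small usage sketch (an illustration, not shipped code): FakeSource is a made-up stand-in whose wrapped_connection() returns a throwaway object, where a real CubicWeb source would return a wrapped database connection.

    class FakeSource(object):
        def wrapped_connection(self):
            return object()

    pool = _CnxSetPool(FakeSource(), min_size=1, max_size=2, idle_timeout=300)
    cnxset = pool.get()       # reuses the connection opened eagerly by min_size
    assert pool.qsize() == 0  # the queue is now empty...
    assert pool.size() == 1   # ...but the connection is still accounted for
    pool.release(cnxset)      # back into the LIFO queue, ready for the next get()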
@@ -132,11 +132,22 @@ the repository rather than the user running the command',
           'help': 'enable the connection pooler',
           'group': 'main', 'level': 3,
           }),
-        ('connections-pool-size',
+        ('connections-pool-max-size',
          {'type' : 'int',
-          'default': 4,
-          'help': 'size of the connections pool. Each source supporting multiple \
-connections will have this number of opened connections.',
+          'default': 0,
+          'help': 'Maximum number, per process, of database connections (0 means no limit).',
           'group': 'main', 'level': 3,
           }),
+        ('connections-pool-min-size',
+         {'type' : 'int',
+          'default': 0,
+          'help': 'Minimum number, per process, of database connections.',
+          'group': 'main', 'level': 3,
+          }),
+        ('connections-pool-idle-timeout',
+         {'type' : 'int',
+          'default': 600,
+          'help': "Start closing connections if the pool hasn't been empty for this many seconds",
+          'group': 'main', 'level': 3,
+          }),
         ('rql-cache-size',
......
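
Like any other option, the new pool settings can be overridden through environment variables; as the updated test below relies on, the variable name is the option name upper-cased with dashes replaced by underscores and a CW_ prefix. A tiny sketch of that mapping (the helper name is invented for illustration):

    def env_var_for(option_name):
        return 'CW_' + option_name.upper().replace('-', '_')

    assert env_var_for('connections-pool-max-size') == 'CW_CONNECTIONS_POOL_MAX_SIZE'
    assert env_var_for('connections-pool-idle-timeout') == 'CW_CONNECTIONS_POOL_IDLE_TIMEOUT'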
@@ -196,12 +196,12 @@ class CubicWebConfigurationTC(BaseTestCase):
         del os.environ['CW_BASE_URL']

     def test_config_value_from_environment_int(self):
-        self.assertEqual(self.config['connections-pool-size'], 4)
-        os.environ['CW_CONNECTIONS_POOL_SIZE'] = '6'
+        self.assertEqual(self.config['connections-pool-max-size'], 0)
+        os.environ['CW_CONNECTIONS_POOL_MAX_SIZE'] = '6'
         try:
-            self.assertEqual(self.config['connections-pool-size'], 6)
+            self.assertEqual(self.config['connections-pool-max-size'], 6)
         finally:
-            del os.environ['CW_CONNECTIONS_POOL_SIZE']
+            del os.environ['CW_CONNECTIONS_POOL_MAX_SIZE']

     def test_config_value_from_environment_yn(self):
         self.assertEqual(self.config['allow-email-login'], False)
......