Commit f3acc79c authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

[server] implement dynamic database pooler

Opening too much database connection has a cost at startup and also PostgreSQL
as a maximum number of connection (100 by default).
This get worse when starting multiple wsgi processes, since each process has
its own database pool.

Instead of opening `connections-pool-size` connections to the database at
startup, just open one and open more only when needed.
parent 29b55fb3ab0c
...@@ -30,6 +30,7 @@ from itertools import chain ...@@ -30,6 +30,7 @@ from itertools import chain
from contextlib import contextmanager from contextlib import contextmanager
from logging import getLogger from logging import getLogger
import queue import queue
import threading
from logilab.common.decorators import cached, clear_cache from logilab.common.decorators import cached, clear_cache
...@@ -178,38 +179,54 @@ class _CnxSetPool(_BaseCnxSet): ...@@ -178,38 +179,54 @@ class _CnxSetPool(_BaseCnxSet):
Database connections pool. Database connections pool.
""" """
def __init__(self, source, size): def __init__(self, source, min_size=1, max_size=4):
super().__init__(source) super().__init__(source)
self._cnxsets = [] self._cnxsets = []
self._queue = queue.LifoQueue() self._queue = queue.LifoQueue()
self.lock = threading.Lock()
self.min_size = min_size
self.max_size = max_size
for i in range(size): for i in range(min_size):
self._queue.put_nowait(self._new_cnxset()) self._queue.put_nowait(self._new_cnxset())
def _new_cnxset(self): def _new_cnxset(self):
cnxset = super()._new_cnxset() cnxset = super()._new_cnxset()
self._cnxsets.append(cnxset) with self.lock:
self._cnxsets.append(cnxset)
return cnxset return cnxset
def size(self):
with self.lock:
return len(self._cnxsets)
def qsize(self): def qsize(self):
return self._queue.qsize() return self._queue.qsize()
def get(self): def get(self):
try: try:
return self._queue.get(True, timeout=5) cnxset = self._queue.get_nowait()
return cnxset
except queue.Empty: except queue.Empty:
raise Exception('no connections set available after 5 secs, probably either a ' if self.max_size and self.size() >= self.max_size:
'bug in code (too many uncommited/rolled back ' try:
'connections) or too much load on the server (in ' return self._queue.get(True, timeout=5)
'which case you can try to set a bigger ' except queue.Empty:
'connections pool size)') raise Exception('no connections set available after 5 secs, probably either a '
'bug in code (too many uncommited/rolled back '
'connections) or too much load on the server (in '
'which case you can try to set a bigger '
'connections pool size)')
else:
return self._new_cnxset()
def release(self, cnxset): def release(self, cnxset):
self._queue.put_nowait(cnxset) self._queue.put_nowait(cnxset)
def __iter__(self): def __iter__(self):
for cnxset in self._cnxsets: with self.lock:
yield cnxset for cnxset in self._cnxsets:
yield cnxset
def close(self): def close(self):
while True: while True:
...@@ -226,7 +243,7 @@ class _CnxSetPool(_BaseCnxSet): ...@@ -226,7 +243,7 @@ class _CnxSetPool(_BaseCnxSet):
def get_cnxset(source, size): def get_cnxset(source, size):
if not size: if not size:
return _BaseCnxSet(source) return _BaseCnxSet(source)
return _CnxSetPool(source, size) return _CnxSetPool(source, min_size=1, max_size=size)
class Repository(object): class Repository(object):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment