Commit 48b3e844 authored by Sylvain Thénault's avatar Sylvain Thénault
Browse files

[source storage] refactor source sql generation and results handling to allow...

[source storage] refactor source sql generation and results handling to allow repository side callbacks

for instance with the BytesFileSystemStorage, before this change:
* fspath, _fsopen function were stored procedures executed on the database
  -> files had to be available both on the repository *and* the database host
* we needed implementation for each handled database

Now, those functions are python callbacks executed when necessary on the
repository side, on data coming from the database.

The little cons are:
* you can't do anymore restriction on mapped attributes
* you can't write queries which will return in the same rset column
  some mapped attributes (or not mapped the same way) / some not

This seems quite acceptable since:
* it's much easier to handle when you start having the db on another host
  than the repo
* BFSS works seamlessly on any backend now
* you don't bother that much about the cons (at least in the bfss case):
  you usually don't do any restriction on Bytes...

Bonus points: BFSS is more efficient (no queries under the cover as it
was done in the registered procedure) and we have a much nicer/efficient
fspath implementation.

IMO, that rocks :D

--HG--
branch : stable
parent 9c4ea944ecf9
......@@ -90,7 +90,8 @@ from logilab.common.logging_ext import set_log_methods, init_log
from logilab.common.configuration import (Configuration, Method,
ConfigurationMixIn, merge_options)
from cubicweb import CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, ConfigurationError
from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP,
ConfigurationError, Binary)
from cubicweb.toolsutils import env_path, create_dir
CONFIGURATIONS = []
......@@ -1050,7 +1051,24 @@ def register_stored_procedures():
class FSPATH(FunctionDescr):
supported_backends = ('postgres', 'sqlite',)
rtype = 'Bytes'
"""return path of some bytes attribute stored using the Bytes
File-System Storage (bfss)
"""
rtype = 'Bytes' # XXX return a String? potential pb with fs encoding
def update_cb_stack(self, stack):
assert len(stack) == 1
stack[0] = self.source_execute
def as_sql(self, backend, args):
raise NotImplementedError('source only callback')
def source_execute(self, source, value):
fpath = source.binary_to_str(value)
try:
return Binary(fpath)
except OSError, ex:
self.critical("can't open %s: %s", fpath, ex)
return None
register_function(FSPATH)
sql('DROP FUNCTION IF EXISTS _fsopen')
sql('DROP FUNCTION IF EXISTS fspath')
/* -*- sql -*-
postgres specific registered procedures for the Bytes File System storage,
require the plpythonu language installed
*/
CREATE OR REPLACE FUNCTION _fsopen(bytea) RETURNS bytea AS $$
fpath = args[0]
if fpath:
try:
data = file(fpath, 'rb').read()
#/* XXX due to plpython bug we have to replace some characters... */
return data.replace("\\", r"\134").replace("\000", r"\000").replace("'", r"\047") #'
except Exception, ex:
plpy.warning('failed to get content for %s: %s', fpath, ex)
return None
$$ LANGUAGE plpythonu
/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
;;
/* fspath(eid, entity type, attribute) */
CREATE OR REPLACE FUNCTION fspath(bigint, text, text) RETURNS bytea AS $$
pkey = 'plan%s%s' % (args[1], args[2])
try:
plan = SD[pkey]
except KeyError:
#/* then prepare and cache plan to get versioned file information from a
# version content eid */
plan = plpy.prepare(
'SELECT X.cw_%s FROM cw_%s as X WHERE X.cw_eid=$1' % (args[2], args[1]),
['bigint'])
SD[pkey] = plan
return plpy.execute(plan, [args[0]])[0]['cw_' + args[2]]
$$ LANGUAGE plpythonu
/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
;;
......@@ -187,9 +187,10 @@ repository.',
if self._need_sql_create:
return []
assert dbg_st_search(self.uri, union, varmap, args, cachekey)
sql, query_args = self.rqlsqlgen.generate(union, args)
args = self.sqladapter.merge_args(args, query_args)
results = self.sqladapter.process_result(self.doexec(session, sql, args))
sql, qargs, cbs = self.rqlsqlgen.generate(union, args)
args = self.sqladapter.merge_args(args, qargs)
cursor = self.doexec(session, sql, args)
results = self.sqladapter.process_result(cursor, cbs)
assert dbg_results(results)
return results
......
......@@ -264,8 +264,9 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
def init(self):
self.init_creating()
def map_attribute(self, etype, attr, cb):
self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = cb
# XXX deprecates [un]map_attribute ?
def map_attribute(self, etype, attr, cb, sourcedb=True):
self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = (cb, sourcedb)
def unmap_attribute(self, etype, attr):
self._rql_sqlgen.attr_map.pop('%s.%s' % (etype, attr), None)
......@@ -273,7 +274,8 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
def set_storage(self, etype, attr, storage):
storage_dict = self._storages.setdefault(etype, {})
storage_dict[attr] = storage
self.map_attribute(etype, attr, storage.sqlgen_callback)
self.map_attribute(etype, attr,
storage.callback, storage.is_source_callback)
def unset_storage(self, etype, attr):
self._storages[etype].pop(attr)
......@@ -348,17 +350,17 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
if cachekey is None:
self.no_cache += 1
# generate sql query if we are able to do so (not supported types...)
sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
else:
# sql may be cached
try:
sql, query_args = self._cache[cachekey]
sql, qargs, cbs = self._cache[cachekey]
self.cache_hit += 1
except KeyError:
self.cache_miss += 1
sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
self._cache[cachekey] = sql, query_args
args = self.merge_args(args, query_args)
sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
self._cache[cachekey] = sql, qargs, cbs
args = self.merge_args(args, qargs)
assert isinstance(sql, basestring), repr(sql)
try:
cursor = self.doexec(session, sql, args)
......@@ -367,7 +369,7 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
self.info("request failed '%s' ... retry with a new cursor", sql)
session.pool.reconnect(self)
cursor = self.doexec(session, sql, args)
results = self.process_result(cursor)
results = self.process_result(cursor, cbs)
assert dbg_results(results)
return results
......@@ -381,9 +383,9 @@ class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
self.uri, union, varmap, args,
prefix='ON THE FLY temp data insertion into %s from' % table)
# generate sql queries if we are able to do so
sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
query = 'INSERT INTO %s %s' % (table, sql.encode(self._dbencoding))
self.doexec(session, query, self.merge_args(args, query_args))
self.doexec(session, query, self.merge_args(args, qargs))
def manual_insert(self, results, table, session):
"""insert given result into a temporary table on the system source"""
......
......@@ -33,16 +33,30 @@ __docformat__ = "restructuredtext en"
import threading
from logilab.database import FunctionDescr, SQL_FUNCTIONS_REGISTRY
from rql import BadRQLQuery, CoercionError
from rql.stmts import Union, Select
from rql.nodes import (SortTerm, VariableRef, Constant, Function, Not,
Variable, ColumnAlias, Relation, SubQuery, Exists)
from cubicweb import QueryError
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.server.utils import cleanup_solutions
ColumnAlias._q_invariant = False # avoid to check for ColumnAlias / Variable
FunctionDescr.source_execute = None
def default_update_cb_stack(self, stack):
stack.append(self.source_execute)
FunctionDescr.update_cb_stack = default_update_cb_stack
LENGTH = SQL_FUNCTIONS_REGISTRY.get_function('LENGTH')
def length_source_execute(source, value):
return len(value.getvalue())
LENGTH.source_execute = length_source_execute
def _new_var(select, varname):
newvar = select.get_variable(varname)
if not 'relations' in newvar.stinfo:
......@@ -252,14 +266,44 @@ def fix_selection_and_group(rqlst, selectedidx, needwrap, selectsortterms,
selectedidx.append(vref.name)
rqlst.selection.append(vref)
# IGenerator implementation for RQL->SQL ######################################
def iter_mapped_var_sels(stmt, variable):
# variable is a Variable or ColumnAlias node mapped to a source side
# callback
if not (len(variable.stinfo['rhsrelations']) <= 1 and # < 1 on column alias
variable.stinfo['selected']):
raise QueryError("can't use %s as a restriction variable"
% variable.name)
for selectidx in variable.stinfo['selected']:
vrefs = stmt.selection[selectidx].get_nodes(VariableRef)
if len(vrefs) != 1:
raise QueryError()
yield selectidx, vrefs[0]
def update_source_cb_stack(state, stmt, node, stack):
while True:
node = node.parent
if node is stmt:
break
if not isinstance(node, Function):
raise QueryError()
func = SQL_FUNCTIONS_REGISTRY.get_function(node.name)
if func.source_execute is None:
raise QueryError('%s can not be called on mapped attribute'
% node.name)
state.source_cb_funcs.add(node)
func.update_cb_stack(stack)
# IGenerator implementation for RQL->SQL #######################################
class StateInfo(object):
def __init__(self, existssols, unstablevars):
self.existssols = existssols
self.unstablevars = unstablevars
self.subtables = {}
self.needs_source_cb = None
self.subquery_source_cb = None
self.source_cb_funcs = set()
def reset(self, solution):
"""reset some visit variables"""
......@@ -276,6 +320,17 @@ class StateInfo(object):
self.restrictions = []
self._restr_stack = []
self.ignore_varmap = False
self._needs_source_cb = {}
def merge_source_cbs(self, needs_source_cb):
if self.needs_source_cb is None:
self.needs_source_cb = needs_source_cb
elif needs_source_cb != self.needs_source_cb:
raise QueryError('query fetch some source mapped attribute, some not')
def finalize_source_cbs(self):
if self.subquery_source_cb is not None:
self.needs_source_cb.update(self.subquery_source_cb)
def add_restriction(self, restr):
if restr:
......@@ -373,7 +428,7 @@ class SQLGenerator(object):
# union query for each rqlst / solution
sql = self.union_sql(union)
# we are done
return sql, self._query_attrs
return sql, self._query_attrs, self._state.needs_source_cb
finally:
self._lock.release()
......@@ -436,6 +491,9 @@ class SQLGenerator(object):
else:
existssols, unstable = {}, ()
state = StateInfo(existssols, unstable)
if self._state is not None:
# state from a previous unioned select
state.merge_source_cbs(self._state.needs_source_cb)
# treat subqueries
self._subqueries_sql(select, state)
# generate sql for this select node
......@@ -491,6 +549,7 @@ class SQLGenerator(object):
if fneedwrap:
selection = ['T1.C%s' % i for i in xrange(len(origselection))]
sql = 'SELECT %s FROM (%s) AS T1' % (','.join(selection), sql)
state.finalize_source_cbs()
finally:
select.selection = origselection
# limit / offset
......@@ -508,10 +567,21 @@ class SQLGenerator(object):
tablealias = '_T%s' % i # XXX nested subqueries
sql = '(%s) AS %s' % (sql, tablealias)
state.subtables[tablealias] = (0, sql)
latest_state = self._state
for vref in subquery.aliases:
alias = vref.variable
alias._q_sqltable = tablealias
alias._q_sql = '%s.C%s' % (tablealias, alias.colnum)
try:
stack = latest_state.needs_source_cb[alias.colnum]
if state.subquery_source_cb is None:
state.subquery_source_cb = {}
for selectidx, vref in iter_mapped_var_sels(select, alias):
stack = stack[:]
update_source_cb_stack(state, select, vref, stack)
state.subquery_source_cb[selectidx] = stack
except KeyError:
continue
def _solutions_sql(self, select, solutions, distinct, needalias):
sqls = []
......@@ -523,6 +593,7 @@ class SQLGenerator(object):
sql = [self._selection_sql(select.selection, distinct, needalias)]
if self._state.restrictions:
sql.append('WHERE %s' % ' AND '.join(self._state.restrictions))
self._state.merge_source_cbs(self._state._needs_source_cb)
# add required tables
assert len(self._state.actual_tables) == 1, self._state.actual_tables
tables = self._state.actual_tables[-1]
......@@ -895,7 +966,13 @@ class SQLGenerator(object):
except KeyError:
mapkey = '%s.%s' % (self._state.solution[lhs.name], rel.r_type)
if mapkey in self.attr_map:
lhssql = self.attr_map[mapkey](self, lhs.variable, rel)
cb, sourcecb = self.attr_map[mapkey]
if sourcecb:
# callback is a source callback, we can't use this
# attribute in restriction
raise QueryError("can't use %s (%s) in restriction"
% (mapkey, rel.as_string()))
lhssql = cb(self, lhs.variable, rel)
elif rel.r_type == 'eid':
lhssql = lhs.variable._q_sql
else:
......@@ -987,9 +1064,13 @@ class SQLGenerator(object):
def visit_function(self, func):
"""generate SQL name for a function"""
# func_sql_call will check function is supported by the backend
return self.dbms_helper.func_as_sql(func.name,
[c.accept(self) for c in func.children])
args = [c.accept(self) for c in func.children]
if func in self._state.source_cb_funcs:
# function executed as a callback on the source
assert len(args) == 1
return args[0]
# func_as_sql will check function is supported by the backend
return self.dbhelper.func_as_sql(func.name, args)
def visit_constant(self, constant):
"""generate SQL name for a constant"""
......@@ -1156,12 +1237,20 @@ class SQLGenerator(object):
if isinstance(linkedvar, ColumnAlias):
raise BadRQLQuery('variable %s should be selected by the subquery'
% variable.name)
mapkey = '%s.%s' % (self._state.solution[linkedvar.name], rel.r_type)
if mapkey in self.attr_map:
return self.attr_map[mapkey](self, linkedvar, rel)
try:
sql = self._varmap['%s.%s' % (linkedvar.name, rel.r_type)]
except KeyError:
mapkey = '%s.%s' % (self._state.solution[linkedvar.name], rel.r_type)
if mapkey in self.attr_map:
cb, sourcecb = self.attr_map[mapkey]
if not sourcecb:
return cb(self, linkedvar, rel)
# attribute mapped at the source level (bfss for instance)
stmt = rel.stmt
for selectidx, vref in iter_mapped_var_sels(stmt, variable):
stack = [cb]
update_source_cb_stack(self._state, stmt, vref, stack)
self._state._needs_source_cb[selectidx] = stack
linkedvar.accept(self)
sql = '%s.%s%s' % (linkedvar._q_sqltable, SQL_PREFIX, rel.r_type)
return sql
......
......@@ -11,10 +11,31 @@ def unset_attribute_storage(repo, etype, attr):
repo.system_source.unset_storage(etype, attr)
class Storage(object):
"""abstract storage"""
def sqlgen_callback(self, generator, relation, linkedvar):
"""sql generator callback when some attribute with a custom storage is
accessed
"""abstract storage
* If `source_callback` is true (by default), the callback will be run during
query result process of fetched attribute's valu and should have the
following prototype::
callback(self, source, value)
where `value` is the value actually stored in the backend. None values
will be skipped (eg callback won't be called).
* if `source_callback` is false, the callback will be run during sql
generation when some attribute with a custom storage is accessed and
should have the following prototype::
callback(self, generator, relation, linkedvar)
where `generator` is the sql generator, `relation` the current rql syntax
tree relation and linkedvar the principal syntax tree variable holding the
attribute.
"""
is_source_callback = True
def callback(self, *args):
"""see docstring for prototype, which vary according to is_source_callback
"""
raise NotImplementedError()
......@@ -38,14 +59,16 @@ class BytesFileSystemStorage(Storage):
def __init__(self, defaultdir):
self.default_directory = defaultdir
def sqlgen_callback(self, generator, linkedvar, relation):
def callback(self, source, value):
"""sql generator callback when some attribute with a custom storage is
accessed
"""
linkedvar.accept(generator)
return '_fsopen(%s.cw_%s)' % (
linkedvar._q_sql.split('.', 1)[0], # table name
relation.r_type) # attribute name
fpath = source.binary_to_str(value)
try:
return Binary(file(fpath).read())
except OSError, ex:
source.critical("can't open %s: %s", value, ex)
return None
def entity_added(self, entity, attr):
"""an entity using this storage for attr has been added"""
......
......@@ -188,9 +188,17 @@ class SQLAdapterMixIn(object):
return newargs
return query_args
def process_result(self, cursor):
def process_result(self, cursor, column_callbacks=None):
"""return a list of CubicWeb compliant values from data in the given cursor
"""
# use two different implementations to avoid paying the price of
# callback lookup for each *cell* in results when there is nothing to
# lookup
if not column_callbacks:
return self._process_result(cursor)
return self._cb_process_result(cursor, column_callbacks)
def _process_result(self, cursor, column_callbacks=None):
# begin bind to locals for optimization
descr = cursor.description
encoding = self._dbencoding
......@@ -208,6 +216,30 @@ class SQLAdapterMixIn(object):
results[i] = result
return results
def _cb_process_result(self, cursor, column_callbacks):
# begin bind to locals for optimization
descr = cursor.description
encoding = self._dbencoding
process_value = self._process_value
binary = Binary
# /end
results = cursor.fetchall()
for i, line in enumerate(results):
result = []
for col, value in enumerate(line):
if value is None:
result.append(value)
continue
cbstack = column_callbacks.get(col, None)
if cbstack is None:
value = process_value(value, descr[col], encoding, binary)
else:
for cb in cbstack:
value = cb(self, value)
result.append(value)
results[i] = result
return results
def preprocess_entity(self, entity):
"""return a dictionary to use as extra argument to cursor.execute
to insert/update an entity into a SQL database
......@@ -277,28 +309,5 @@ def init_sqlite_connexion(cnx):
import yams.constraints
yams.constraints.patch_sqlite_decimal()
def fspath(eid, etype, attr):
try:
cu = cnx.cursor()
cu.execute('SELECT X.cw_%s FROM cw_%s as X '
'WHERE X.cw_eid=%%(eid)s' % (attr, etype),
{'eid': eid})
return cu.fetchone()[0]
except:
import traceback
traceback.print_exc()
raise
cnx.create_function('fspath', 3, fspath)
def _fsopen(fspath):
if fspath:
try:
return buffer(file(fspath).read())
except:
import traceback
traceback.print_exc()
raise
cnx.create_function('_fsopen', 1, _fsopen)
sqlite_hooks = SQL_CONNECT_HOOKS.setdefault('sqlite', [])
sqlite_hooks.append(init_sqlite_connexion)
......@@ -1113,8 +1113,8 @@ class PostgresSQLGeneratorTC(RQLGeneratorTC):
args = {'text': 'hip hop momo'}
try:
union = self._prepare(rql)
r, nargs = self.o.generate(union, args,
varmap=varmap)
r, nargs, cbs = self.o.generate(union, args,
varmap=varmap)
args.update(nargs)
self.assertLinesEquals((r % args).strip(), self._norm_sql(sql), striplines=True)
except Exception, ex:
......@@ -1135,7 +1135,7 @@ class PostgresSQLGeneratorTC(RQLGeneratorTC):
def _checkall(self, rql, sql):
try:
rqlst = self._prepare(rql)
r, args = self.o.generate(rqlst)
r, args, cbs = self.o.generate(rqlst)
self.assertEqual((r.strip(), args), sql)
except Exception, ex:
print rql
......@@ -1197,7 +1197,7 @@ WHERE rel_in_group0.eid_from=T00.x AND rel_in_group0.eid_to=_G.cw_eid''',
def test_is_null_transform(self):
union = self._prepare('Any X WHERE X login %(login)s')
r, args = self.o.generate(union, {'login': None})
r, args, cbs = self.o.generate(union, {'login': None})
self.assertLinesEquals((r % args).strip(),
'''SELECT _X.cw_eid
FROM cw_CWUser AS _X
......@@ -1386,11 +1386,11 @@ WHERE NOT EXISTS(SELECT 1 FROM created_by_relation AS rel_created_by0 WHERE rel_
'''SELECT COUNT(1)
WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS _P WHERE rel_owned_by0.eid_from=_P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS _P WHERE rel_owned_by1.eid_from=_P.cw_eid AND rel_owned_by1.eid_to=1)''')
def test_attr_map(self):
def test_attr_map_sqlcb(self):
def generate_ref(gen, linkedvar, rel):
linkedvar.accept(gen)
return 'VERSION_DATA(%s)' % linkedvar._q_sql
self.o.attr_map['Affaire.ref'] = generate_ref
self.o.attr_map['Affaire.ref'] = (generate_ref, False)
try:
self._check('Any R WHERE X ref R',
'''SELECT VERSION_DATA(_X.cw_eid)
......@@ -1402,6 +1402,17 @@ WHERE VERSION_DATA(_X.cw_eid)=1''')
finally:
self.o.attr_map.clear()
def test_attr_map_sourcecb(self):
cb = lambda x,y: None
self.o.attr_map['Affaire.ref'] = (cb, True)
try:
union = self._prepare('Any R WHERE X ref R')
r, nargs, cbs = self.o.generate(union, args={})
self.assertLinesEquals(r.strip(), 'SELECT _X.cw_ref\nFROM cw_Affaire AS _X')
self.assertEquals(cbs, {0: [cb]})
finally:
self.o.attr_map.clear()
class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
......
......@@ -13,7 +13,7 @@ import os.path as osp
import shutil
import tempfile
from cubicweb import Binary
from cubicweb import Binary, QueryError
from cubicweb.selectors import implements
from cubicweb.server.sources import storages
from cubicweb.server.hook import Hook, Operation
......@@ -80,7 +80,7 @@ class StorageTC(CubicWebTC):
def test_bfss_sqlite_fspath(self):
f1 = self.create_file()
expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid)
fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
{'f': f1.eid})[0][0]
self.assertEquals(fspath.getvalue(), expected_filepath)
......@@ -88,7 +88,7 @@ class StorageTC(CubicWebTC):
self.session.transaction_data['fs_importing'] = True
f1 = self.session.create_entity('File', data=Binary('/the/path'),
data_format=u'text/plain', data_name=u'foo')
fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
{'f': f1.eid})[0][0]
self.assertEquals(fspath.getvalue(), '/the/path')
......@@ -102,5 +102,63 @@ class StorageTC(CubicWebTC):
self.vreg.unregister(DummyBeforeHook)
self.vreg.unregister(DummyAfterHook)
def test_source_mapped_attribute_error_cases(self):
ex = self.assertRaises(QueryError, self.execute,
'Any X WHERE X data ~= "hop", X is File')
self.assertEquals(str(ex), 'can\'t use File.data (X data ILIKE "hop") in restriction')
ex = self.assertRaises(QueryError, self.execute,
'Any X, Y WHERE X data D, Y data D, '
'NOT X identity Y, X is File, Y is File')
self.assertEquals(str(ex), "can't use D as a restriction variable")
# query returning mix of mapped / regular attributes (only file.data
# mapped, not image.data for instance)
ex = self.assertRaises(QueryError, self.execute,