Commit 753e1df2 authored by Sylvain Thénault

first draft of a simple hooks-based custom attribute storage,

with a BytesFileSystemStorage POC implementation.

Basically:

* a dictionary maps each (entity type, attribute) pair to its custom storage
  (see the usage sketch after this list)

* hooks watch for entities of one of these types being added/modified/deleted

* read is based on the sql generator callback mechanism (used in vcsfile for
  instance)

* all storages share the same basic interface (read, add, update, delete) and
  should be pluggable in a transparent way (except at migration time, when one
  wants to change from one storage to another)

* the sample BytesFileSystemStorage:
  * may store the content of Bytes attributes of any entity type as files on
    the file system
  * is based on an FSPATH function available in both rql and sql, and an
    _fsopen function available only in sql
  * has a dumb file name allocation algorithm
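
A minimal usage sketch (registration is done in code for now, per the TODO in
the storages module). The startup hook, the File entity type, its data
attribute and the storage directory are hypothetical names used only for
illustration, and it assumes startup hooks expose the repository as self.repo:

    from cubicweb.server.hook import Hook
    from cubicweb.server.sources.storages import (
        set_attribute_storage, BytesFileSystemStorage)

    class BFSSStartupHook(Hook):
        """map File.data to a file-system backed storage at repository startup"""
        __regid__ = 'myapp_bfss_startup'   # hypothetical identifier
        events = ('server_startup',)

        def __call__(self):
            storage = BytesFileSystemStorage('/var/lib/myapp/bfssdata')
            set_attribute_storage(self.repo, 'File', 'data', storage)

Once mapped, reading File.data goes through the _fsopen sql function generated
by the storage's sqlgen_callback, while the hooks below delegate
add/update/delete handling to the storage.
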
parent 80b455066c9a
@@ -1037,3 +1037,11 @@ def register_stored_procedures():
        supported_backends = ('mysql', 'postgres', 'sqlite',)

    register_function(TEXT_LIMIT_SIZE)

    class FSPATH(FunctionDescr):
        supported_backends = ('postgres', 'sqlite',)
        rtype = 'Bytes'

    register_function(FSPATH)
"""hooks to handle attributes mapped to a custom storage
"""
from os import unlink
from cubicweb.server.hook import Hook
from cubicweb.server.sources.storages import ETYPE_ATTR_STORAGE
class BFSSHook(Hook):
"""abstract class for bytes file-system storage hooks"""
__abstract__ = True
category = 'bfss'
class PreAddEntityHook(BFSSHook):
""""""
__regid__ = 'bfss_add_entity'
events = ('before_add_entity', )
#__select__ = Hook.__select__ & implements('Repository')
def __call__(self):
for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
fpath = ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_added(self.entity, attr)
if fpath is not None:
AddFileOp(filepath=fpath)
class PreUpdateEntityHook(BFSSHook):
""""""
__regid__ = 'bfss_update_entity'
events = ('before_update_entity', )
#__select__ = Hook.__select__ & implements('Repository')
def __call__(self):
for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_updated(self.entity, attr)
class PreDeleteEntityHook(BFSSHook):
""""""
__regid__ = 'bfss_delete_entity'
events = ('before_delete_entity', )
#__select__ = Hook.__select__ & implements('Repository')
def __call__(self):
for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_deleted(self.entity, attr)
/* -*- sql -*-

   postgres specific registered procedures for the Bytes File System storage,
   require the plpythonu language installed
*/

CREATE OR REPLACE FUNCTION _fsopen(bytea) RETURNS bytea AS $$
    fpath = args[0]
    if fpath:
        try:
            data = file(fpath, 'rb').read()
            #/* XXX due to plpython bug we have to replace some characters... */
            return data.replace("\\", r"\134").replace("\000", r"\000").replace("'", r"\047") #'
        except Exception, ex:
            plpy.warning('failed to get content for %s: %s' % (fpath, ex))
    return None
$$ LANGUAGE plpythonu
/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
;;

/* fspath(eid, entity type, attribute) */
CREATE OR REPLACE FUNCTION fspath(bigint, text, text) RETURNS bytea AS $$
    pkey = 'plan%s%s' % (args[1], args[2])
    try:
        plan = SD[pkey]
    except KeyError:
        #/* then prepare and cache the plan to fetch the attribute value
        #   from the entity's eid */
        plan = plpy.prepare(
            'SELECT X.cw_%s FROM cw_%s as X WHERE X.cw_eid=$1' % (args[2], args[1]),
            ['bigint'])
        SD[pkey] = plan
    return plpy.execute(plan, [args[0]])[0]['cw_%s' % args[2]]
$$ LANGUAGE plpythonu
/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
;;
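
For illustration only, a minimal sketch of how a value stored through these
procedures could be read back from Python, mirroring the expression generated
by the BytesFileSystemStorage sqlgen_callback (the File entity type, its data
attribute and the eid value are hypothetical, not part of this commit):

    # assuming a server-side session; cw_File / cw_data follow the
    # cw_<etype> / cw_<attr> naming of the system source tables
    eid = 1234
    cu = session.system_sql(
        'SELECT _fsopen(_X.cw_data) FROM cw_File AS _X WHERE _X.cw_eid=%s' % eid)
    content = cu.fetchone()[0]  # raw bytes read back from the file system
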
"""custom storages for the system source"""
from os import unlink, path as osp
from cubicweb.server.hook import Operation
ETYPE_ATTR_STORAGE = {}
def set_attribute_storage(repo, etype, attr, storage):
ETYPE_ATTR_STORAGE.setdefault(etype, {})[attr] = storage
repo.system_source.map_attribute(etype, attr, storage.sqlgen_callback)
class Storage(object):
"""abstract storage"""
def sqlgen_callback(self, generator, relation, linkedvar):
"""sql generator callback when some attribute with a custom storage is
accessed
"""
raise NotImplementedError()
def entity_added(self, entity, attr):
"""an entity using this storage for attr has been added"""
raise NotImplementedError()
def entity_updated(self, entity, attr):
"""an entity using this storage for attr has been updatded"""
raise NotImplementedError()
def entity_deleted(self, entity, attr):
"""an entity using this storage for attr has been deleted"""
raise NotImplementedError()
# TODO
# * make it configurable without code
# * better file path attribution
class BytesFileSystemStorage(Storage):
"""store Bytes attribute value on the file system"""
def __init__(self, defaultdir):
self.default_directory = defaultdir
def sqlgen_callback(self, generator, linkedvar, relation):
"""sql generator callback when some attribute with a custom storage is
accessed
"""
linkedvar.accept(generator)
return '_fsopen(%s.cw_%s)' % (
linkedvar._q_sql.split('.', 1)[0], # table name
relation.r_type) # attribute name
def entity_added(self, entity, attr):
"""an entity using this storage for attr has been added"""
if not entity._cw.transaction_data.get('fs_importing'):
try:
value = entity.pop(attr)
except KeyError:
pass
else:
fpath = self.new_fs_path(entity, attr)
# bytes storage used to store file's path
entity[attr]= Binary(fpath)
file(fpath, 'w').write(value.getvalue())
AddFileOp(entity._cw, filepath=fpath)
# else entity[attr] is expected to be an already existant file path
def entity_updated(self, entity, attr):
"""an entity using this storage for attr has been updatded"""
try:
value = entity.pop(attr)
except KeyError:
pass
else:
fpath = self.current_fs_path(entity, attr)
UpdateFileOp(entity._cw, filepath=fpath, filedata=value.getvalue())
def entity_deleted(self, entity, attr):
"""an entity using this storage for attr has been deleted"""
DeleteFileOp(entity._cw, filepath=self.current_fs_path(entity, attr))
def new_fs_path(self, entity, attr):
fpath = osp.join(self.default_directory, '%s_%s_%s' % (
self.default_directory, entity.eid, attr))
while osp.exists(fspath):
fspath = '_' + fspath
return fspath
def current_fs_path(self, entity, attr):
cu = entity._cw.system_sql('SELECT cw_%s.%s WHERE cw_eid=%s' %
(entity.__regid__, attr, entity.eid))
return cu.fetchone()[0]
class AddFileOp(Operation):
def rollback_event(self):
try:
unlink(self.filepath)
except:
pass
class DeleteFileOp(Operation):
def commit_event(self):
try:
unlink(self.filepath)
except:
pass
class UpdateFileOp(Operation):
def precommit_event(self):
try:
file(self.filepath, 'w').write(self.filedata)
except:
pass
@@ -314,6 +314,29 @@ def init_sqlite_connexion(cnx):
    if hasattr(yams.constraints, 'patch_sqlite_decimal'):
        yams.constraints.patch_sqlite_decimal()

    def fspath(eid, etype, attr):
        try:
            cu = cnx.cursor()
            cu.execute('SELECT X.cw_%s FROM cw_%s as X '
                       'WHERE X.cw_eid=%%(eid)s' % (attr, etype),
                       {'eid': eid})
            return cu.fetchone()[0]
        except:
            import traceback
            traceback.print_exc()
            raise
    cnx.create_function('fspath', 3, fspath)

    def _fsopen(fspath):
        if fspath:
            try:
                return buffer(file(fspath, 'rb').read())
            except:
                import traceback
                traceback.print_exc()
                raise
    cnx.create_function('_fsopen', 1, _fsopen)

sqlite_hooks = SQL_CONNECT_HOOKS.setdefault('sqlite', [])
sqlite_hooks.append(init_sqlite_connexion)