Commit a22ebe36 authored by Elouan Martinet's avatar Elouan Martinet
Browse files

[storages] Process uploads and deletions in post commit events

parent 54b232af6e2a
......@@ -28,14 +28,16 @@ import boto3
from cubicweb import Binary, set_log_methods
from cubicweb.server.sources.storages import Storage
from cubicweb.server.edition import EditedEntity
from cubicweb.server.hook import DataOperationMixIn, Operation
class S3Storage(Storage):
is_source_callback = True
def __init__(self, bucket):
def __init__(self, bucket, suffix='.tmp'):
self.s3cnx = self._s3_client()
self.bucket = bucket
self.suffix = suffix
@classmethod
def _s3_client(cls):
......@@ -86,12 +88,15 @@ class S3Storage(Storage):
binary.seek(0)
buffer = Binary(binary.read())
binary.seek(0)
self.debug('Upload object to S3')
# upload_fileobj should make automagically a multipart upload if
# needed
self.s3cnx.upload_fileobj(buffer, self.bucket, key)
suffixed_key = self.suffixed_key(key)
self.s3cnx.upload_fileobj(buffer, self.bucket, suffixed_key)
buffer.close()
self.info('Uploaded object %s.%s to S3', entity.eid, attr)
# move to final key in post commit event
S3AddFileOp.get_instance(entity._cw).add_data(
(self, key, entity.eid, attr))
self.info('Temporary uploaded %s.%s (%s/%s) to S3',
entity.eid, attr, self.bucket, suffixed_key)
return binary
def entity_updated(self, entity, attr):
......@@ -105,13 +110,11 @@ class S3Storage(Storage):
if key is None:
# no key to remove
return
self.info('Deleting object %s.%s (%s/%s) from S3',
# delete key in a post commit event
S3DeleteFileOp.get_instance(entity._cw).add_data(
(self, key, entity.eid, attr))
self.info('Delaying deletion for %s.%s (%s/%s) in S3',
entity.eid, attr, self.bucket, key)
resp = self.s3cnx.delete_object(Bucket=self.bucket, Key=key)
if resp.get('ResponseMetadata', {}).get('HTTPStatusCode') >= 300:
self.error('S3 object deletion FAILED: %s', resp)
else:
self.debug('S3 object deletion OK: %s', resp)
def migrate_entity(self, entity, attribute):
"""migrate an entity attribute to the storage"""
......@@ -156,6 +159,48 @@ class S3Storage(Storage):
This implemenation just return a random UUID"""
return str(uuid.uuid1())
def suffixed_key(self, key):
return key + self.suffix
class S3AddFileOp(DataOperationMixIn, Operation):
containercls = list
def postcommit_event(self):
for storage, key, eid, attr in self.get_data():
suffixed_key = storage.suffixed_key(key)
storage.s3cnx.copy_object(
Bucket=storage.bucket,
CopySource={'Bucket': storage.bucket, 'Key': suffixed_key},
Key=key)
storage.s3cnx.delete_object(
Bucket=storage.bucket, Key=suffixed_key)
self.info('Moved temporary object for %s.%s (%s/%s to %s/%s)'
' in S3', eid, attr, storage.bucket, suffixed_key,
storage.bucket, key)
def rollback_event(self):
for storage, key, eid, attr in self.get_data():
suffixed_key = storage.suffixed_key(key)
storage.s3cnx.delete_object(
Bucket=storage.bucket, Key=suffixed_key)
self.info('Deleted temporary object for %s.%s (%s/%s) in S3',
eid, attr, storage.bucket, suffixed_key)
class S3DeleteFileOp(DataOperationMixIn, Operation):
containercls = list
def postcommit_event(self):
for storage, key, eid, attr in self.get_data():
self.info('Deleting object %s.%s (%s/%s) from S3',
eid, attr, storage.bucket, key)
resp = storage.s3cnx.delete_object(Bucket=storage.bucket, Key=key)
if resp.get('ResponseMetadata', {}).get('HTTPStatusCode') >= 300:
self.error('S3 object deletion FAILED: %s', resp)
else:
self.debug('S3 object deletion OK: %s', resp)
set_log_methods(S3Storage,
getLogger('cube.s3storage.storages.s3storage'))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment