# copyright 2018-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

"""custom storages for S3"""

import os
import uuid
from logging import getLogger

import boto3
from six import PY3

from cubicweb import Binary, set_log_methods
from cubicweb.server.edition import EditedEntity
from cubicweb.server.hook import DataOperationMixIn, LateOperation
from cubicweb.server.sources.storages import Storage


class S3Storage(Storage):
    is_source_callback = True

    def __init__(self, bucket, suffix='.tmp'):
        self.s3cnx = self._s3_client()
        self.bucket = bucket
        self.suffix = suffix

    @classmethod
    def _s3_client(cls):
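        # boto3 resolves credentials and region through its standard
        # lookup chain (environment variables, shared config files,
        # instance metadata); AWS_S3_ENDPOINT_URL additionally allows
        # targeting an S3-compatible server, e.g. a local MinIO:
        #
        #     export AWS_S3_ENDPOINT_URL=http://localhost:9000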
        endpoint_url = os.environ.get('AWS_S3_ENDPOINT_URL')
        if endpoint_url:
            cls.debug('Using custom S3 endpoint url {}'.format(endpoint_url))
        return boto3.client('s3',
                            endpoint_url=endpoint_url)

    def callback(self, source, cnx, value):
        """see docstring for prototype, which vary according to is_source_callback
        """
        key = source.binary_to_str(value).decode('utf-8')
        if cnx.commit_state == 'precommit':
            # download suffixed key if it exists
            # FIXME need a way to check that the attribute is actually edited
            try:
                suffixed_key = self.suffixed_key(key)
                return self.download(suffixed_key)
            except Exception:
                pass
        try:
            return self.download(key)
        except Exception as ex:
            source.critical("can't retrieve S3 object %s: %s", value, ex)
            return None

    def entity_added(self, entity, attr):
        """an entity using this storage for attr has been added"""
        if entity._cw.transaction_data.get('fs_importing'):
            # fs_importing allows changing the S3 key saved in the database
            entity._cw_dont_cache_attribute(attr, repo_side=True)
            key = entity.cw_edited[attr].getvalue()
            if PY3:
                key = key.decode('utf-8')
            try:
                return self.download(key)
            except Exception:
                return None

        binary = entity.cw_edited.pop(attr)
        if binary is None:
            # remove S3 key
            entity.cw_edited.edited_attribute(attr, None)
            self.entity_deleted(entity, attr)
        else:
            key = self.get_s3_key(entity, attr)
            if key is None:
                key = self.new_s3_key(entity, attr)
            # save S3 key
            entity.cw_edited.edited_attribute(
                attr, Binary(key.encode('utf-8')))

            # copy Binary value, workaround for boto3 bug
            # https://github.com/boto/s3transfer/issues/80
            # .read() is required since Binary can't wrap itself
            binary.seek(0)
            buffer = Binary(binary.read())
            binary.seek(0)
            suffixed_key = self.suffixed_key(key)
            extra_args = self.get_upload_extra_args(entity, attr, key)
            self.s3cnx.upload_fileobj(buffer, self.bucket, suffixed_key,
                                      ExtraArgs=extra_args)
            buffer.close()

            # move to final key in post commit event
            S3AddFileOp.get_instance(entity._cw).add_data(
                (self, key, entity.eid, attr))
            self.info('Temporarily uploaded %s.%s (%s/%s) to S3',
                      entity.eid, attr, self.bucket, suffixed_key)
        return binary

    def get_upload_extra_args(self, _entity, _attr, _key):
        """Additional options for boto3's upload_fileobj method.
        Documentation for supported options can be found at:
        https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html#the-extraargs-parameter
        """
        return {}

    def entity_updated(self, entity, attr):
        """an entity using this storage for attr has been updatded"""
        return self.entity_added(entity, attr)

    def entity_deleted(self, entity, attr):
        """an entity using this storage for attr has been deleted"""
        if entity._cw.repo.config['s3-auto-delete']:
            key = self.get_s3_key(entity, attr)
            if key is None:
                # no key to remove
                return
            # delete key in a post commit event
            S3DeleteFileOp.get_instance(entity._cw).add_data(
                (self, key, entity.eid, attr))
            self.info('Delaying deletion for %s.%s (%s/%s) in S3',
                      entity.eid, attr, self.bucket, key)

    def migrate_entity(self, entity, attribute):
        """migrate an entity attribute to the storage"""
        entity.cw_edited = EditedEntity(entity, **entity.cw_attr_cache)
        binary = self.entity_added(entity, attribute)
        if binary is not None:
            cnx = entity._cw
            source = cnx.repo.system_source
            attrs = source.preprocess_entity(entity)
            sql = source.sqlgen.update('cw_' + entity.cw_etype, attrs,
                                       ['cw_eid'])
            source.doexec(cnx, sql, attrs)
        entity.cw_edited = None

    def get_s3_key(self, entity, attr):
        """
        Return the S3 key of the S3 object storing the content of attribute
        attr of the entity.
        """
        try:
            rset = entity._cw.execute(
                'Any stkey(D) WHERE X eid %s, X %s D' %
                (entity.eid, attr))
        except NotImplementedError:
            # may occur when called from migrate_entity, i.e. when the storage
            # has not yet been installed
            rset = None
        if rset and rset.rows[0][0]:
            key = rset.rows[0][0].getvalue()
            if PY3:
                key = key.decode('utf-8')
            return key
        return None

    def new_s3_key(self, entity, attr):
        """Generate a new key for given entity attr.

        This implementation just returns a random UUID"""
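        # subclasses may override this to produce deterministic keys
        # instead, e.g. (illustrative) a key derived from entity.eid and
        # attr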
        return str(uuid.uuid1())

    def suffixed_key(self, key):
        return key + self.suffix

179
180
181
182
183
    def download(self, key):
        result = self.s3cnx.get_object(Bucket=self.bucket, Key=key)
        self.info('Downloaded %s/%s from S3', self.bucket, key)
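        # note that the whole object body is read into memory here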
        return Binary(result['Body'].read())


class S3AddFileOp(DataOperationMixIn, LateOperation):
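    """Move a temporary suffixed S3 object to its final key once the
    transaction is committed, or delete it on rollback."""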
    containercls = list

    def postcommit_event(self):
        for storage, key, eid, attr in self.get_data():
            suffixed_key = storage.suffixed_key(key)
            storage.s3cnx.copy_object(
                Bucket=storage.bucket,
                CopySource={'Bucket': storage.bucket, 'Key': suffixed_key},
                Key=key)
            storage.s3cnx.delete_object(
                Bucket=storage.bucket, Key=suffixed_key)
            self.info('Moved temporary object for %s.%s (%s/%s to %s/%s)'
                      ' in S3', eid, attr, storage.bucket, suffixed_key,
                      storage.bucket, key)

    def rollback_event(self):
        for storage, key, eid, attr in self.get_data():
            suffixed_key = storage.suffixed_key(key)
            storage.s3cnx.delete_object(
                Bucket=storage.bucket, Key=suffixed_key)
            self.info('Deleted temporary object for %s.%s (%s/%s) in S3',
                      eid, attr, storage.bucket, suffixed_key)


class S3DeleteFileOp(DataOperationMixIn, LateOperation):
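    """Delete S3 objects once the transaction that removed them is
    committed."""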
    containercls = list

    def postcommit_event(self):
        for storage, key, eid, attr in self.get_data():
            self.info('Deleting object %s.%s (%s/%s) from S3',
                      eid, attr, storage.bucket, key)
            resp = storage.s3cnx.delete_object(Bucket=storage.bucket, Key=key)
            # treat a missing status code as a failure rather than
            # comparing None with an int
            status = resp.get('ResponseMetadata', {}).get('HTTPStatusCode')
            if status is None or status >= 300:
                self.error('S3 object deletion FAILED: %s', resp)
            else:
                self.debug('S3 object deletion OK: %s', resp)


set_log_methods(S3Storage,
                getLogger('cube.s3storage.storages.s3storage'))