# copyright 2018-2022 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

"""custom storages for S3"""

import os
import uuid
from logging import getLogger

import boto3

from cubicweb import Binary, set_log_methods
from cubicweb.server.sources.storages import Storage
from cubicweb.server.edition import EditedEntity
from cubicweb.server.hook import DataOperationMixIn, LateOperation
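
# Usage sketch (illustrative, commented out): a storage like this one is
# typically attached to a Bytes attribute at server startup; the entity type
# "File" and attribute "data" below are assumptions, not requirements:
#
#   from cubicweb.server.sources.storages import set_attribute_storage
#   set_attribute_storage(repo, "File", "data", S3Storage("my-bucket"))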


class S3Storage(Storage):
    is_source_callback = True
    KEY_SEPARATOR = "#"

    def __init__(self, bucket, suffix=".tmp"):
        self.s3cnx = self._s3_client()
        self.bucket = bucket
        self.suffix = suffix

    @classmethod
    def _s3_client(cls):
        endpoint_url = os.environ.get("AWS_S3_ENDPOINT_URL")
        if endpoint_url:
            cls.debug(f"Using custom S3 endpoint url {endpoint_url}")
        return boto3.client("s3", endpoint_url=endpoint_url)
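
    # Configuration sketch (assuming standard boto3 credential resolution):
    # credentials and region come from the usual AWS_* environment variables
    # or ~/.aws/ files; to target a local S3-compatible server such as MinIO,
    # one could export, for instance:
    #
    #   AWS_S3_ENDPOINT_URL=http://localhost:9000
    #   AWS_ACCESS_KEY_ID=...
    #   AWS_SECRET_ACCESS_KEY=...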

    def callback(self, source, cnx, value):
        """see docstring for prototype, which vary according to is_source_callback"""
        key = source.binary_to_str(value).decode("utf-8")
        if (
            cnx.repo.config["s3-transaction-suffix-key"]
            and cnx.commit_state == "precommit"
        ):
            # download suffixed key if it exists
            # FIXME need a way to check that the attribute is actually edited
            try:
                suffixed_key = self.suffixed_key(key)
                return self.get_s3_object(cnx, suffixed_key)
            except Exception:
                pass
        try:
            return self.get_s3_object(cnx, key)
        except Exception as ex:
            source.critical("can't retrieve S3 object %s: %s", key, ex)
            return None

    def entity_added(self, entity, attr):
        """an entity using this storage for attr has been added"""
        if entity._cw.transaction_data.get("fs_importing"):
            # fs_importing allows changing the S3 key saved in the database
            entity._cw_dont_cache_attribute(attr, repo_side=True)
            key = entity.cw_edited[attr].getvalue()
            key = key.decode("utf-8")
            try:
                return self.get_s3_object(entity._cw, key)
            except Exception:
                return None

        binary = entity.cw_edited.pop(attr)
        if binary is None:
            # remove S3 key
            entity.cw_edited.edited_attribute(attr, None)
            self.entity_deleted(entity, attr)
        else:
            oldkey = self.get_s3_key(entity, attr)
            if oldkey is not None:
                oldkey, _ = self.parse_key(oldkey)
            key = self.new_s3_key(entity, attr)
            # copy Binary value, workaround for boto3 bug
            # https://github.com/boto/s3transfer/issues/80
            # .read() is required since Binary can't wrap itself
            binary.seek(0)
            buffer = Binary(binary.read())
            binary.seek(0)
            if entity._cw.repo.config["s3-transaction-suffix-key"]:
                upload_key = self.suffixed_key(key)
            else:
                upload_key = key
            extra_args = self.get_upload_extra_args(entity, attr, key)

            put_object_result = self.s3cnx.put_object(
                Body=buffer, Bucket=self.bucket, Key=upload_key, **extra_args
            )
            buffer.close()
            version_id = put_object_result.get("VersionId", None)
            # save S3 key
            entity = self.save_s3_key(entity, attr, upload_key, version_id)

            # when key is suffixed, move to final key in post commit event
            # remove temporary key on rollback
            S3AddFileOp.get_instance(entity._cw).add_data((self, key, entity.eid, attr))
            self.info(
                "Uploaded %s.%s (%s/%s) to S3",
                entity.eid,
                attr,
                self.bucket,
                upload_key,
            )
            if oldkey is not None and oldkey != key:
                # remove unneeded old key
                self.delay_deletion(entity, attr, oldkey)
        return binary

    def get_upload_extra_args(self, _entity, _attr, _key):
        """Additional options for boto3's upload_fileobj method.
        Documentation for supported options can be found at:
        https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html#the-extraargs-parameter
        """
        return {}
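
    # Override sketch (hypothetical subclass and names): per-attribute upload
    # metadata can be injected here, e.g.:
    #
    #   class PdfS3Storage(S3Storage):
    #       def get_upload_extra_args(self, entity, attr, key):
    #           return {"ContentType": "application/pdf"}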

    def entity_updated(self, entity, attr):
        """an entity using this storage for attr has been updated"""
        return self.entity_added(entity, attr)

    def entity_deleted(self, entity, attr):
        """an entity using this storage for attr has been deleted"""
        key = self.get_s3_key(entity, attr)
        if key is None:
            # no key to remove
            return
        key, _ = self.parse_key(key)
        self.delay_deletion(entity, attr, key)

    def delay_deletion(self, entity, attr, key):
        if entity._cw.repo.config["s3-auto-delete"]:
            # delete key in a post commit event
            S3DeleteFileOp.get_instance(entity._cw).add_data(
                (self, key, entity.eid, attr)
            )
            self.info(
                "Delaying deletion for %s.%s (%s/%s) in S3",
                entity.eid,
                attr,
                self.bucket,
                key,
            )

    def migrate_entity(self, entity, attribute):
        """migrate an entity attribute to the storage"""
        entity.cw_edited = EditedEntity(entity, **entity.cw_attr_cache)
        binary = self.entity_added(entity, attribute)
        if binary is not None:
            cnx = entity._cw
            source = cnx.repo.system_source
            attrs = source.preprocess_entity(entity)
            sql = source.sqlgen.update("cw_" + entity.cw_etype, attrs, ["cw_eid"])
            source.doexec(cnx, sql, attrs)
        entity.cw_edited = None
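
    # Migration sketch (illustrative, assuming a migration-script context with
    # an open connection `cnx`): moving existing "File" entities into S3 could
    # look like:
    #
    #   storage = S3Storage("my-bucket")
    #   for entity in cnx.execute("Any X WHERE X is File").entities():
    #       storage.migrate_entity(entity, "data")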

    def save_s3_key(self, entity, attr, key, version_id=None):
        """
        Save the s3 key into the entity bytes attribute
        """
        id_string = key
        if (
            entity._cw.repo.config["s3-activate-object-versioning"]
            and version_id is not None
        ):
            id_string = self.format_version_id_suffix(key, version_id)
        entity.cw_edited.edited_attribute(attr, Binary(id_string.encode("utf-8")))
        return entity

    def parse_key(self, key):
        """Split a stored key into (key, version_id), version_id being None
        when the key carries no separator."""
        try:
            key, version = key.rsplit(self.KEY_SEPARATOR, 1)
        except ValueError:
            return key, None
        return key, version

    def format_version_id_suffix(self, key, version_id):
        """
        Format the string that will store key and version id
        """
        return f"{key}{self.KEY_SEPARATOR}{version_id}"

    def get_s3_key(self, entity, attr):
        """
        Return the S3 key of the S3 object storing the content of attribute
        attr of the entity.
        """
        try:
            rset = entity._cw.execute(
                "Any stkey(D) WHERE X eid {}, X {} D".format(entity.eid, attr)
            )
        except NotImplementedError:
            # may occur when called from migrate_entity, i.e. when the storage
            # has not yet been installed
            rset = None
        if rset and rset.rows[0][0]:
            key = rset.rows[0][0].getvalue()
            key = key.decode("utf-8")
            return key
        return None

    def new_s3_key(self, entity, attr):
        """Generate a new key for given entity attr.

        This implementation returns a fresh UUID1 (time- and host-based);
        entity and attr are ignored but available to subclasses."""
        return str(uuid.uuid1())
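
    # Override sketch (hypothetical): keys may also encode the entity to make
    # objects easier to locate in the bucket, e.g.:
    #
    #   def new_s3_key(self, entity, attr):
    #       return f"{entity.cw_etype}/{entity.eid}/{attr}/{uuid.uuid4()}"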

    def suffixed_key(self, key):
        return key + self.suffix

    def get_s3_object(self, cnx, key):
        """
        :param cnx: (Object)
        :param key: (string)
        get s3 stored attribute for key
        handle the case of versioned object
        """
        versioning_activated = cnx.repo.config["s3-activate-object-versioning"]
        # first, check whether the key contains the separator
        key, version_id = self.parse_key(key)

        # if object-versioning is activated use the version_id
        if versioning_activated and version_id is not None:
            return self.download(key, VersionId=version_id)
        return self.download(key)

    def download(self, key, **kwargs):
        """
        :param key: (string)
        :param kwargs: (dict) Keys must be compatible with method S3.Client.put_object
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object
        """
        result = self.s3cnx.get_object(Bucket=self.bucket, Key=key, **kwargs)
        self.info("Downloaded %s/%s from S3", self.bucket, key)
        return Binary(result["Body"].read())


class S3AddFileOp(DataOperationMixIn, LateOperation):
    containercls = list

    def postcommit_event(self):
        if not self.cnx.repo.config["s3-transaction-suffix-key"]:
            return
        consumed_keys = set()
        for storage, key, eid, attr in self.get_data():
            if key in consumed_keys:
                continue
            consumed_keys.add(key)
            suffixed_key = storage.suffixed_key(key)
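            # S3 has no rename operation: emulate the move with a server-side
            # copy of the temporary object followed by its deletion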
            storage.s3cnx.copy_object(
                Bucket=storage.bucket,
                CopySource={"Bucket": storage.bucket, "Key": suffixed_key},
                Key=key,
            )
            storage.s3cnx.delete_object(Bucket=storage.bucket, Key=suffixed_key)
            self.info(
                "Moved temporary object for %s.%s (%s/%s to %s/%s)" " in S3",
                eid,
                attr,
                storage.bucket,
                suffixed_key,
                storage.bucket,
                key,
            )

    def rollback_event(self):
        for storage, key, eid, attr in self.get_data():
            if self.cnx.repo.config["s3-transaction-suffix-key"]:
                upload_key = storage.suffixed_key(key)
            else:
                upload_key = key
            storage.s3cnx.delete_object(Bucket=storage.bucket, Key=upload_key)
            self.info(
                "Deleted temporary object for %s.%s (%s/%s) in S3",
                eid,
                attr,
                storage.bucket,
                upload_key,
            )


class S3DeleteFileOp(DataOperationMixIn, LateOperation):
    containercls = list

    def postcommit_event(self):
        for storage, key, eid, attr in self.get_data():
            self.info(
                "Deleting object %s.%s (%s/%s) from S3", eid, attr, storage.bucket, key
            )
            resp = storage.s3cnx.delete_object(Bucket=storage.bucket, Key=key)
            status = resp.get("ResponseMetadata", {}).get("HTTPStatusCode")
            if status is None or status >= 300:
                self.error("S3 object deletion FAILED: %s", resp)
            else:
                self.debug("S3 object deletion OK: %s", resp)


# equip S3Storage with logger-backed debug/info/warning/error/... methods
set_log_methods(S3Storage, getLogger("cube.s3storage.storages.s3storage"))