# -*- coding: utf-8 -*-
# copyright 2016-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Register the ``EncryptedString`` base type and the ``DECRYPTED`` function."""

from logilab.database import get_db_helper, FunctionDescr
from logilab.database.sqlgen import SQLExpression

from rql.utils import register_function
from yams import register_base_type

#
# EncryptedString type
#

# This key should be stored in the all-in-one.conf (or elsewhere), obviously…
# NOTE(review): hard-coded symmetric key; anyone who can read this module can
# decrypt the stored values. Move it to configuration before any real use.
PWD = "toto"


def convert_encryptedstring(x):
    """Return the SQL expression used to store an ``EncryptedString`` value.

    :param x: the attribute value, or an already built ``SQLExpression``.
    :return: ``x`` itself when it is an ``SQLExpression``; otherwise an
        ``SQLExpression`` calling ``pgp_sym_encrypt`` on ``x`` with ``PWD``
        as the key, both passed as named parameters.
    """
    if isinstance(x, SQLExpression):
        # Already an SQL expression: do not wrap it a second time.
        return x
    # Every other value (str or not) is encrypted the same way.
    return SQLExpression("pgp_sym_encrypt(%(x)s, %(key)s)", x=x, key=PWD)


# Register the new type
register_base_type("EncryptedString")

# Map the new type with PostgreSQL
pghelper = get_db_helper("postgres")
pghelper.TYPE_MAPPING["EncryptedString"] = "text"
pghelper.TYPE_CONVERTERS["EncryptedString"] = convert_encryptedstring

# Map the new type with SQLite3 (identity converter: the value is not encrypted)
sqlitehelper = get_db_helper("sqlite")
sqlitehelper.TYPE_MAPPING["EncryptedString"] = "text"
sqlitehelper.TYPE_CONVERTERS["EncryptedString"] = lambda x: x


class DECRYPTED(FunctionDescr):
    """Single-argument function giving the clear text of an encrypted field.

    On PostgreSQL it is rendered as a ``pgp_sym_decrypt`` call; on SQLite the
    field is returned unchanged, matching the identity converter registered
    for that backend.
    """

    minargs = 1
    maxargs = 1
    supported_backends = ("postgres", "sqlite")
    rtype = "text"

    def as_sql_postgres(self, args):
        """Return the SQL decrypting ``args[0]`` with ``PWD`` as the key."""
        field = args[0]
        # The key is inlined as an SQL string literal: double any single
        # quote so the key cannot terminate the literal early.
        key = PWD.replace("'", "''")
        return f"pgp_sym_decrypt({field}::bytea, '{key}')"

    def as_sql_sqlite(self, args):
        """Return ``args[0]`` unchanged."""
        field = args[0]
        return field


register_function(DECRYPTED)