Commit 6133213a authored by Simon Chabot

paint it black

parent dc97fb313229
Pipeline #21346 passed in 1 minute and 19 seconds
# pylint: disable=W0622
"""cubicweb-elasticsearch application packaging information"""
modname = "elasticsearch"
distname = "cubicweb-elasticsearch"

numversion = (0, 9, 1)
version = ".".join(str(num) for num in numversion)

license = "LGPL"
author = "LOGILAB S.A. (Paris, FRANCE)"
author_email = "contact@logilab.fr"
description = "Simple ElasticSearch indexing integration for CubicWeb"
web = "http://www.cubicweb.org/project/%s" % distname

__depends__ = {
    "cubicweb": ">= 3.24.0",
    "six": ">= 1.4.0",
    "cwtags": None,
    "elasticsearch": ">=7.0.0,<8.0.0",
    "elasticsearch-dsl": ">=7.0.0,<8.0.0",
    "beautifulsoup4": None,
}
__recommends__ = {}

classifiers = [
    "Environment :: Web Environment",
    "Framework :: CubicWeb",
    "Programming Language :: Python",
    "Programming Language :: JavaScript",
]
@@ -27,100 +27,148 @@ class IndexInES(Command):
    identifier of the instance
    """

    name = "index-in-es"
    min_args = max_args = 1
    arguments = "<instance id>"
    options = [
        (
            "dry-run",
            {
                "action": "store_true",
                "default": False,
                "short": "N",
                "help": "set to True if you want to skip the insertion in ES",
            },
        ),
        (
            "debug",
            {
                "action": "store_true",
                "default": False,
                "short": "D",
                "help": "shortcut for --loglevel=debug",
            },
        ),
        (
            "loglevel",
            {
                "short": "l",
                "type": "choice",
                "metavar": "<log level>",
                "default": None,
                "choices": ("debug", "info", "warning", "error"),
            },
        ),
        (
            "etypes",
            {
                "type": "csv",
                "default": "",
                "help": "only index given etypes [default: all indexable types]",
            },
        ),
        (
            "index-name",
            {
                "type": "string",
                "default": "",
                "help": (
                    "override index-name if you want to use a different ID "
                    "[default: uses index-name from all-in-one.conf]"
                ),
            },
        ),
        (
            "except-etypes",
            {
                "type": "string",
                "default": "",
                "help": "all indexable types except given etypes [default: []]",
            },
        ),
        (
            "chunksize",
            {
                "type": "int",
                "default": 100000,
                "help": "max number of entities to fetch at once (default: 100000)",
            },
        ),
    ]
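
    # A rough usage sketch (assuming the command is exposed through
    # ``cubicweb-ctl`` and that "myinstance" is an existing instance id;
    # adapt both to your setup):
    #
    #   cubicweb-ctl index-in-es myinstance --dry-run --etypes Person,File
    #   cubicweb-ctl index-in-es myinstance --index-name demo --chunksize 1000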
    def run(self, args):
        """run the command with its specific arguments"""
        appid = args.pop(0)
        if self["debug"]:
            self["loglevel"] = "debug"
        config = cwcfg.config_for(appid, debugmode=self["loglevel"])
        if self["loglevel"]:
            init_cmdline_log_threshold(config, self["loglevel"])
        with config.repository().internal_cnx() as cnx:
            schema = cnx.vreg.schema
            indexer = cnx.vreg["es"].select("indexer", cnx)
            es = indexer.get_connection()
            indexer.create_index()
            if self.config.index_name:
                cnx.info("create ES index {}".format(self.config.index_name))
                indexer.create_index(index_name=self.config.index_name)
            if es:
                if self.config.etypes:
                    etypes = self.config.etypes
                else:
                    etypes = indexable_types(
                        schema, custom_skip_list=self.config.except_etypes
                    )
                    assert self.config.except_etypes not in etypes
                if not self.config.etypes:
                    cnx.debug(u"found indexable types: {}".format(",".join(etypes)))
                for _ in parallel_bulk(
                    es,
                    self.bulk_actions(
                        etypes,
                        cnx,
                        index_name=self.config.index_name,
                        dry_run=self.config.dry_run,
                    ),
                    raise_on_error=False,
                    raise_on_exception=False,
                ):
                    pass
            else:
                cnx.info(u"no elasticsearch configuration found, skipping")

    def bulk_actions(self, etypes, cnx, index_name=None, dry_run=False):
        if index_name is None:
            index_name = cnx.vreg.config["index-name"]
        for etype in etypes:
            for idx, entity in enumerate(
                indexable_entities(cnx, etype, chunksize=self.config.chunksize), 1
            ):
                try:
                    serializer = entity.cw_adapt_to("IFullTextIndexSerializable")
                    json = serializer.serialize(complete=False)
                except Exception:
                    cnx.error(
                        "[{}] Failed to serialize entity {} ({})".format(
                            index_name, entity.eid, etype
                        )
                    )
                    continue
                if not dry_run and json:
                    # Entities with fulltext_containers relations return their
                    # container's IFullTextIndex serializer, therefore the "id"
                    # and "doc_type" in the kwargs below must be container data.
                    data = {
                        "_op_type": "index",
                        "_index": index_name or cnx.vreg.config["index-name"],
                        "_id": serializer.es_id,
                        "_source": json,
                    }
                    self.customize_data(data)
                    yield data
            cnx.info(u"[{}] indexed {} {} entities".format(index_name, idx, etype))

    def customize_data(self, data):
        pass
......
@@ -45,32 +45,35 @@ def deep_update(d1, d2):
class EsRegistry(AppObject):
    __registry__ = "es"


class Indexer(EsRegistry):
    __regid__ = "indexer"
    adapter = "IFullTextIndexSerializable"
    settings = {
        "settings": {
            "analysis": {
                "analyzer": {
                    "default": {
                        "filter": ["my_ascii_folding", "lowercase", "french_snowball"],
                        "tokenizer": "standard",
                    }
                },
                "filter": {
                    "my_ascii_folding": {
                        "preserve_original": True,
                        "type": "asciifolding",
                    },
                    "french_snowball": {"type": "snowball", "language": "French"},
                },
            },
        }
    }
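
    # The "default" analyzer defined above is applied to every analyzed text
    # field of the index unless a field mapping overrides it: standard
    # tokenization, ascii folding that keeps the original token, lowercasing,
    # then French snowball stemming.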

    @property
    def index_name(self):
        return self._cw.vreg.config["index-name"]

    def get_connection(self):
        self.create_index()
@@ -91,24 +94,31 @@ class Indexer(EsRegistry):
    def es_index(self, entity, params=None):
        es_cnx = self.get_connection()
        if es_cnx is None or not self.index_name:
            self.error("no connection to ES (not configured), skip ES indexing")
            return
        serializable = entity.cw_adapt_to(self.adapter)
        json = serializable.serialize()
        if not json:
            return
        es_cnx.index(
            index=self.index_name,
            id=serializable.es_id,
            doc_type=serializable.es_doc_type,
            body=json,
            params=params,
        )

    def es_delete(self, entity):
        es_cnx = self.get_connection()
        if es_cnx is None or not self.index_name:
            self.error("no connection to ES (not configured), skip ES deletion")
            return
        serializable = entity.cw_adapt_to(self.adapter)
        es_cnx.delete(
            index=self.index_name,
            id=serializable.es_id,
            doc_type=serializable.es_doc_type,
        )
class IFullTextIndexSerializable(view.EntityAdapter):
@@ -116,8 +126,8 @@ class IFullTextIndexSerializable(view.EntityAdapter):
    directly serialized to e.g. JSON.
    """

    __regid__ = "IFullTextIndexSerializable"
    __select__ = is_instance("Any")
    custom_indexable_attributes = ()
    skip_indexable_attributes = ()
@@ -127,19 +137,24 @@ class IFullTextIndexSerializable(view.EntityAdapter):
    @property
    def es_doc_type(self):
        return "_doc"

    @cachedproperty
    def fulltext_indexable_attributes(self):
        eschema = self._cw.vreg.schema[self.entity.cw_etype]
        attrs = ["creation_date", "modification_date", "cwuri"]
        attrs.extend(
            [
                r.type
                for r in eschema.indexable_attributes()
                if r.type not in self.skip_indexable_attributes
            ]
        )
        for rschema, tschema in eschema.attribute_definitions():
            if rschema.type == "eid":
                continue
            # XXX
            if tschema.type in ("Int", "Float"):
                attrs.append(rschema.type)
        attrs.extend(self.custom_indexable_attributes)
        return attrs
@@ -155,9 +170,9 @@ class IFullTextIndexSerializable(view.EntityAdapter):
        if complete:
            entity.complete()
        data = {
            "cw_etype": entity.cw_etype,
            "eid": entity.eid,
            "cwuri": entity.cwuri,
        }
        # TODO take a look at what's in entity.cw_relation_cache
        data.update(self.process_attributes())
@@ -165,7 +180,7 @@ class IFullTextIndexSerializable(view.EntityAdapter):
class File(IFullTextIndexSerializable):
    __select__ = IFullTextIndexSerializable.__select__ & is_instance("File")

    def serialize(self, complete=True):
        """this could be a generic implementation of fulltext_containers indexation, but for
@@ -173,20 +188,22 @@ class File(IFullTextIndexSerializable):
        now we cannot return more than one parent json, which is fine
        for Files
        """
        for rschema, role in self._cw.vreg.schema["File"].fulltext_containers():
            for parent in self.entity.related(
                rschema.type, role=neg_role(role)
            ).entities():
                return parent.cw_adapt_to("IFullTextIndexSerializable").serialize(
                    complete
                )
        return {}


class ESTransactionQueue(EsRegistry):
    __regid__ = "es.opqueue"

    @cachedproperty
    def default_indexer(self):
        return self._cw.vreg["es"].select("indexer", self._cw)

    def purge_useless_operations(self, es_operations):
        """remove operations from `es_operations` that will have no effect.
@@ -203,37 +220,47 @@ class ESTransactionQueue(EsRegistry):
"""
done = collections.OrderedDict()
for es_operation in reversed(es_operations):
entity = es_operation['entity']
op_type = es_operation['op_type']
entity = es_operation["entity"]
op_type = es_operation["op_type"]
if entity.eid not in done:
done[entity.eid] = es_operation
else:
prev_op_type = done[entity.eid]['op_type']
if op_type == 'delete' and prev_op_type == 'index':
self.warning('a delete operation on %s#%s inserted before'
'an index one', entity.cw_etype, entity.eid)
prev_op_type = done[entity.eid]["op_type"]
if op_type == "delete" and prev_op_type == "index":
self.warning(
"a delete operation on %s#%s inserted before" "an index one",
entity.cw_etype,
entity.eid,
)
done[entity.eid] = es_operation
return done.values()
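
    # purge_useless_operations in a nutshell (a sketch): for queued operations
    # such as [index #12, index #12, delete #34], only the most recent
    # operation per eid is kept, so the result holds the last "index" for #12
    # and the "delete" for #34.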

    def process_operation(self, es_operation):
        indexer = es_operation.get("indexer", self.default_indexer)
        entity = es_operation["entity"]
        if self._cw.deleted_in_transaction(entity.eid):
            es_method = indexer.es_delete
        elif es_operation["op_type"] == "index":
            es_method = partial(indexer.es_index, params={"refresh": "true"})
        elif es_operation["op_type"] == "delete":
            es_method = indexer.es_delete
        else:
            self.info(
                "skipping unknown operation type %s on %s",
                es_operation["op_type"],
                entity.eid,
            )
            return
        try:
            es_method(entity)
        except (ConnectionError, ProtocolError, NotFoundError) as exc:
            self.warning(
                "[ES] Failed to %s %s#%s (%s)",
                es_operation["op_type"],
                entity.cw_etype,
                entity.eid,
                exc,
            )

    def process_operations(self, es_operations):
        es_operations = self.purge_useless_operations(es_operations)
......
@@ -33,14 +33,14 @@ log = logging.getLogger(__name__)
def indexable_types(schema, custom_skip_list=None):
    """
    introspect indexable types
    """
    global INDEXABLE_TYPES
    if INDEXABLE_TYPES is not None:
        return INDEXABLE_TYPES
    indexable_types = []
    skip_list = ["TrInfo", "EmailAddress"]
    if custom_skip_list:
        skip_list = skip_list + custom_skip_list
    for eschema in schema.entities():
@@ -56,27 +56,27 @@ def indexable_types(schema, custom_skip_list=None):
def fulltext_indexable_rql(etype, cnx, eid=None):
    """
    Generate RQL with fulltext_indexable attributes for a given entity type

    :eid:
        defaults to None, set it to an eid to get RQL for a single element (used in hooks)
    """
    varmaker = rqlvar_maker()
    V = next(varmaker)
    rql = ["WHERE %s is %s" % (V, etype)]
    if eid:
        rql.append("%s eid %i" % (V, eid))
    var = next(varmaker)
    selected = []
    cw_entity = cnx.vreg["etypes"].etype_class(etype)(cnx)
    for attr in cw_entity.cw_adapt_to(
        "IFullTextIndexSerializable"
    ).fulltext_indexable_attributes:
        var = next(varmaker)
        rql.append("%s %s %s" % (V, attr, var))
        selected.append(var)
    return "Any %s,%s %s" % (V, ",".join(selected), ",".join(rql))
def indexable_entities(cnx, etype, chunksize=100000):
@@ -106,10 +106,10 @@ def indexable_entities(cnx, etype, chunksize=100000):
    last_eid = 0
    while True:
        rqlst.save_state()
        rqlst.add_constant_restriction(mainvar, "eid", last_eid, "Int", ">")
        rql = rqlst.as_string()
        rqlst.recover()
        cnx.debug(u"RQL: {}".format(rql))
        rset = cnx.execute(rql)
        if not rset:
            break
@@ -139,24 +139,25 @@ def create_index(es, index_name, settings=None):
"""
try:
if index_name and not es.indices.exists(index=index_name):
es.indices.create(index=index_name,
body=settings)
es.indices.create(index=index_name, body=settings)
except (ConnectionError, ProtocolError):
log.debug('Failed to index in hook, could not connect to ES')
log.debug("Failed to index in hook, could not connect to ES")


def get_connection(config):
    """
    Get connection with config object, creates a persistent connection
    """
    try:
        return connections.get_connection()
    except KeyError:
        locations = config["elasticsearch-locations"]
        if locations:
            # TODO sanitize locations
            es = connections.create_connection(
                hosts=locations.split(","),
                verify_certs=config["elasticsearch-verify-certs"],