Newer
Older
# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command
:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function
import os.path as osp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command
from cubes.elasticsearch.es import indexable_types, index_settings
__docformat__ = "restructuredtext en"
HERE = osp.dirname(osp.abspath(__file__))
# TODO optimisation : json serialize on one side, send to ES on the other
# TODO progress bar
def bulk_actions(rset, index_name, etype):
    """Generate one ES bulk "index" action per entity of ``rset``.

    :param rset: object whose ``entities()`` method iterates over the
        entities to index.
    :param index_name: name of the target index, stored under ``_index``.
    :param etype: entity type name, stored under ``_type``.
    :return: generator of action dictionaries; ``_id`` is the entity eid and
        ``_source`` is whatever the entity's ``ISerializable`` adapter
        returns from ``serialize()``.
    """
    for item in rset.entities():
        # Serialize lazily, one entity at a time, as the consumer pulls
        # actions out of the generator.
        document = item.cw_adapt_to('ISerializable').serialize()
        action = {
            '_op_type': 'index',
            '_index': index_name,
            '_type': etype,
            '_id': item.eid,
            '_source': document,
        }
        yield action
class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance
    """
    name = 'index-in-es'
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [('dry-run', {'type': 'yn', 'default': False,
                            'help': 'set to True if you want to skip the insertion in ES'}),
               ('bulk', {'type': 'yn', 'default': False,
                         'help': 'set to True if you want to insert in bulk in ES'}),
               ('debug', {'type': 'yn', 'default': False,
                          'help': 'set to True if you want to print out debug info and progress'}),
               ]

    def run(self, args):
        """Run the command with its specific arguments.

        Opens an admin connection on the instance, reads the
        ``elasticsearch-locations`` and ``index-name`` configuration values
        and, when both are set, indexes every entity of every indexable
        type.  When either is missing nothing is indexed (a message is
        printed in debug mode).

        :param args: command arguments; the first item is the instance id
            and is popped from the list.
        """
        appid = args.pop(0)
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            locations = config['elasticsearch-locations']
            index_name = config['index-name']
            if not (locations and index_name):
                # Report the missing configuration only in this case, not
                # after a successful indexing run.
                if self.config.debug:
                    print(u'no elasticsearch configuration found, skipping')
                return
            es = Elasticsearch(locations)
            if not self.config.dry_run:
                # ignore 400 caused by IndexAlreadyExistsException when
                # creating an index
                es.indices.create(
                    index=index_name, body=index_settings(), ignore=400)
            # Computed once and reused for both the debug output and the loop.
            etypes = list(indexable_types(cnx.vreg.schema))
            if self.config.debug:
                print(u'found indexable_types {}'.format(','.join(etypes)))
            for etype in etypes:
                rset = cnx.execute(
                    'Any X WHERE X is %(etype)s' % {'etype': etype})
                if self.config.debug:
                    print(u'indexing {} {}'.format(etype, len(rset)))
                self._index_rset(es, rset, index_name, etype)

    def _index_rset(self, es, rset, index_name, etype):
        """Serialize the entities of ``rset`` and, unless dry-run, send them to ES.

        :param es: the ``Elasticsearch`` client to send documents with.
        :param rset: result set holding the entities of type ``etype``.
        :param index_name: name of the target index.
        :param etype: entity type name, used as the ES document type.
        """
        # TODO add specific IFTIES adapter
        actions = bulk_actions(rset, index_name, etype)
        if self.config.dry_run:
            # Exhaust the generator so that serialization is still exercised,
            # but never contact ES -- in bulk mode as well as one-by-one mode.
            for _ in actions:
                pass
            return
        if self.config.bulk:
            success = failed = 0
            # parallel_bulk is lazy: it must be iterated for anything to be
            # sent.  It yields one (ok, info) tuple per action.
            for ok, _info in parallel_bulk(es, actions,
                                           raise_on_error=False,
                                           raise_on_exception=False):
                if ok:
                    success += 1
                else:
                    failed += 1
            if self.config.debug:
                print(u'ES bulk : {} success {} failed'.format(success, failed))
            return
        for action in actions:
            es.index(index=index_name,
                     id=action['_id'],
                     doc_type=etype,
                     body=action['_source'])
# Register the command on CWCTL so that cubicweb-ctl provides ``index-in-es``.
CWCTL.register(IndexInES)