# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command

:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function

import os.path as osp

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command

from cubes.elasticsearch.es import indexable_types, index_settings

__docformat__ = "restructuredtext en"

HERE = osp.dirname(osp.abspath(__file__))


# TODO optimisation : json serialize on one side, send to ES on the other
# TODO progress bar
def bulk_actions(rset, index_name, etype):
    """Yield one ElasticSearch bulk ``index`` action per entity of `rset`.

    :param rset: result set whose ``entities()`` are to be indexed
    :param index_name: name of the target ElasticSearch index
    :param etype: entity type name, used as the document ``_type``

    Each entity is serialized through its ``ISerializable`` adapter and the
    entity eid is used as the document ``_id``.
    """
    for entity in rset.entities():
        serializer = entity.cw_adapt_to('ISerializable')
        yield {'_op_type': 'index',
               '_index': index_name,
               '_type': etype,
               '_id': entity.eid,
               '_source': serializer.serialize(),
               }


class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance

    """
    name = 'index-in-es'
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [('dry-run', {'type': 'yn', 'default': False,
                            'help': 'set to True if you want to skip the insertion in ES'}),
               ('bulk', {'type': 'yn', 'default': False,
                         'help': 'set to True if you want to insert in bulk in ES'}),
               ('debug', {'type': 'yn', 'default': False,
                          'help': 'set to True if you want to print out debug info and progress'}),
               ]

    def run(self, args):
        """run the command with its specific arguments

        `args` holds the instance identifier as its only item. Nothing is
        indexed unless both the ``elasticsearch-locations`` and
        ``index-name`` settings of the instance are set.
        """
        appid = args.pop(0)
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            schema = cnx.vreg.schema
            locations = config['elasticsearch-locations']
            index_name = config['index-name']
            if not (locations and index_name):
                if self.config.debug:
                    print(u'no elasticsearch configuration found, skipping')
                return
            es = Elasticsearch(locations)
            if not self.config.dry_run:
                # ignore 400 caused by IndexAlreadyExistsException when
                # creating an index
                es.indices.create(
                    index=index_name, body=index_settings(), ignore=400)
            # computed once: used both for the debug listing and the loop
            etypes = list(indexable_types(schema))
            if self.config.debug:
                print(u'found indexable_types {}'.format(','.join(etypes)))
            for etype in etypes:
                # the entity type is interpolated into the query text; it
                # comes from the instance schema, not from user input
                rset = cnx.execute(
                    'Any X WHERE X is %(etype)s' % {'etype': etype})
                if len(rset) == 0:
                    continue
                if self.config.debug:
                    print(u'indexing {} {}'.format(etype, len(rset)))
                if self.config.bulk:
                    self._index_bulk(es, rset, index_name, etype)
                else:
                    self._index_one_by_one(es, rset, index_name, etype)

    def _index_bulk(self, es, rset, index_name, etype):
        """Send every entity of `rset` to ES through the bulk helper.

        In dry-run mode the documents are still serialized (as in the
        non-bulk mode) but nothing is sent to ES.
        """
        actions = bulk_actions(rset, index_name, etype)
        if self.config.dry_run:
            for _action in actions:
                pass
            return
        success = failed = 0
        # parallel_bulk is lazy: it must be consumed for the requests to be
        # sent. Errors are not raised, so count them instead of dropping them.
        for ok, _info in parallel_bulk(es, actions,
                                       raise_on_error=False,
                                       raise_on_exception=False):
            if ok:
                success += 1
            else:
                failed += 1
        if failed or self.config.debug:
            print(u'ES bulk : {} success {} failed'.format(success, failed))

    def _index_one_by_one(self, es, rset, index_name, etype):
        """Index every entity of `rset` with one ES request per entity.

        In dry-run mode the documents are serialized but not sent.
        """
        for entity in rset.entities():
            # TODO add specific IFTIES adapter
            serializer = entity.cw_adapt_to('ISerializable')
            doc = serializer.serialize()
            if not self.config.dry_run:
                es.index(index=index_name, id=entity.eid,
                         doc_type=etype, body=doc)


CWCTL.register(IndexInES)