Skip to content
Snippets Groups Projects
ccplugin.py 4.18 KiB
Newer Older
# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command

:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function

import os.path as osp

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command

from cubes.elasticsearch.es import indexable_types, index_settings

__docformat__ = "restructuredtext en"

HERE = osp.dirname(osp.abspath(__file__))


# TODO optimisation : json serialize on one side, send to ES on the other
# TODO progress bar
def bulk_actions(rset, index_name, etype):
    for entity in rset.entities():
        serializer = entity.cw_adapt_to('ISerializable')
        json = serializer.serialize()
        yield {'_op_type': 'index',
               '_index': index_name,
               '_type': etype,
               '_id': entity.eid,
               '_source': json
               }


class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance

    """
    name = 'index-in-es'
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [('dry-run', {'type': 'yn', 'default': False,
                            'help': 'set to True if you want to skip the insertion in ES'}),
               ('bulk', {'type': 'yn', 'default': False,
                         'help': 'set to True if you want to insert in bulk in ES'}),
               ('debug', {'type': 'yn', 'default': False,
                          'help': 'set to True if you want to print out debug info and progress'}),
               ]

    def run(self, args):
        """run the command with its specific arguments"""
        appid = args.pop(0)
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            schema = cnx.vreg.schema
            locations = config['elasticsearch-locations']
            index_name = config['index-name']
            if locations and index_name:
                es = Elasticsearch(locations)
                if not self.config.dry_run:
                    # ignore 400 caused by IndexAlreadyExistsException when creating an
                    # index
                    es.indices.create(
                        index=index_name, body=index_settings(), ignore=400)
                    print(u'found indexable_types {}'.format(
                        ','.join(indexable_types(schema))))
                for etype in indexable_types(schema):
                    rset = cnx.execute(
                        'Any X WHERE X is %(etype)s' % {'etype': etype})
                    if len(rset) == 0:
                        continue
                        print(u'indexing {} {}'.format(etype, len(rset)))
                    if self.config.bulk:
                        # success, failed = bulk(es, bulk_actions(rset, index_name, etype))
                        # if self.config.debug:
                        #     print(u'ES bulk : {} success {} failed'.format(success, failed))
                        list(parallel_bulk(es, bulk_actions(rset, index_name, etype),
                                           raise_on_error=False,
                                           raise_on_exception=False))
                    else:
                        for entity in rset.entities():
                            # TODO add specific IFTIES adapter
                            serializer = entity.cw_adapt_to('ISerializable')
                            json = serializer.serialize()
                            if not self.config.bulk:
                                if not self.config.dry_run:
                                    es.index(index=index_name,
                                             id=entity.eid,
                                             doc_type=etype,
                                             body=json)
                    print(u'no elasticsearch configuration found, skipping')