Skip to content
Snippets Groups Projects
ccplugin.py 4.3 KiB
Newer Older
# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command

:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function

import os.path as osp

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command

from cubes.elasticsearch.es import indexable_types, INDEX_SETTINGS, fulltext_indexable_rql


__docformat__ = "restructuredtext en"

HERE = osp.dirname(osp.abspath(__file__))


# TODO optimisation : json serialize on one side, send to ES on the other
# TODO progress bar
def bulk_actions(rset, index_name, etype, dry_run=False):
    """Generate ``parallel_bulk`` actions indexing every entity of *rset*.

    :param rset: result set whose entities are to be indexed
    :param index_name: name of the target ElasticSearch index
    :param etype: entity type, used as the document ``_type``
    :param dry_run: when true, entities are still adapted and serialized but
        no action is yielded
    :return: generator of ``index`` action dicts, one per entity, whose
        ``_id`` is the entity eid and ``_source`` the serialized entity
    """
    for entity in rset.entities():
        serializer = entity.cw_adapt_to('IFullTextIndexSerializable')
        # named ``document`` rather than ``json`` so the stdlib module name
        # is not shadowed
        document = serializer.serialize()
        if dry_run:
            continue
        yield {'_op_type': 'index',
               '_index': index_name,
               '_type': etype,
               '_id': entity.eid,
               '_source': document,
               }


# NOTE(review): cubicweb-ctl only exposes this command once it is registered,
# e.g. ``CWCTL.register(IndexInES)``; no registration is visible in this part
# of the file -- confirm it exists further down.
class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance

    """
    name = 'index-in-es'
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [('dry-run', {'type': 'yn', 'default': False,
                            'help': 'set to True if you want to skip the insertion in ES'}),
               ('bulk', {'type': 'yn', 'default': False,
                         'help': 'set to True if you want to insert in bulk in ES'}),
               ('debug', {'type': 'yn', 'default': False,
                          'help': 'set to True if you want to print '
                                  'out debug info and progress'}),
               ]

    def run(self, args):
        """run the command with its specific arguments

        Opens an admin connection on the instance given as first argument,
        reads the ``elasticsearch-locations`` and ``index-name`` options from
        its configuration and indexes every indexable entity type, either in
        bulk (``--bulk``) or one entity at a time. Nothing is written to ES
        with ``--dry-run``; progress is printed with ``--debug``.
        """
        appid = args.pop(0)
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            schema = cnx.vreg.schema
            locations = config['elasticsearch-locations']
            index_name = config['index-name']
            if not (locations and index_name):
                # reported once, and only when the configuration is missing
                print(u'no elasticsearch configuration found, skipping')
                return
            es = Elasticsearch(locations)
            if not self.config.dry_run:
                # ignore 400 caused by IndexAlreadyExistsException when
                # creating an index
                es.indices.create(
                    index=index_name, body=INDEX_SETTINGS, ignore=400)
            # computed once: used both for the debug listing and the loop
            etypes = list(indexable_types(schema))
            if self.config.debug:
                print(u'found indexable_types {}'.format(','.join(etypes)))
            for etype in etypes:
                rset = cnx.execute(fulltext_indexable_rql(etype, schema))
                if len(rset) == 0:
                    continue
                if self.config.debug:
                    print(u'indexing {} {}'.format(etype, len(rset)))
                if self.config.bulk:
                    self._index_bulk(es, rset, index_name, etype)
                else:
                    self._index_one_by_one(es, rset, index_name, etype)

    def _index_bulk(self, es, rset, index_name, etype):
        """Send the entities of *rset* to ES through ``parallel_bulk``."""
        actions = bulk_actions(rset, index_name, etype,
                               dry_run=self.config.dry_run)
        for ok, info in parallel_bulk(es, actions,
                                      raise_on_error=False,
                                      raise_on_exception=False):
            # errors are not raised (raise_on_error / raise_on_exception are
            # False), so report them instead of silently discarding them
            if not ok:
                print(u'failed to index {}: {}'.format(etype, info))

    def _index_one_by_one(self, es, rset, index_name, etype):
        """Index each entity of *rset* with its own ``es.index`` call."""
        for entity in rset.entities():
            serializer = entity.cw_adapt_to('IFullTextIndexSerializable')
            document = serializer.serialize()
            # serialization still happens in dry-run mode; only the ES
            # write is skipped
            if not self.config.dry_run:
                es.index(index=index_name,
                         id=entity.eid,
                         doc_type=etype,
                         body=document)