# ccplugin.py
# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command

:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function

import os.path as osp

from elasticsearch import Elasticsearch

from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command

from cubes.elasticsearch.es import indexable_types, index_settings

# Markup format used by the docstrings of this module.
__docformat__ = "restructuredtext en"

# Absolute path of the directory containing this file.
HERE = osp.dirname(osp.abspath(__file__))


class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance

    """
    # NOTE: the class docstring above is deliberately left untouched; it may be
    # surfaced verbatim as the command's help text -- confirm before rewording.
    name = 'index-in-es'
    # exactly one positional argument: the instance identifier
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [('dry-run', {'type': 'yn', 'default': False,
                            'help': 'set to True if you want to skip the insertion in ES'}),
               ('debug', {'type': 'yn', 'default': False,
                          'help': 'set to True if you want to print out debug info and progress'}),
               ]

    def run(self, args):
        """Run the command with its specific arguments.

        Open an admin connection on the instance named by ``args[0]``, read
        the ``elasticsearch-locations`` and ``index-name`` settings from the
        instance configuration and, for every entity type returned by
        ``indexable_types``, send the ``ISerializable`` serialization of each
        entity to the ElasticSearch index.

        With the ``dry-run`` option set, entities are still fetched and
        serialized but nothing is written to ElasticSearch (neither index
        creation nor document insertion).  With the ``debug`` option set,
        progress information is printed on standard output.

        :param args: command-line arguments; the first item (the instance
                     id) is popped from the list.
        """
        appid = args.pop(0)
        dry_run = self.config.dry_run
        debug = self.config.debug
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            schema = cnx.vreg.schema
            locations = config['elasticsearch-locations']
            index_name = config['index-name']
            # guard clause: nothing can be indexed without both settings
            if not (locations and index_name):
                print(u'no elasticsearch configuration found, skipping')
                return
            es = Elasticsearch(locations)
            if not dry_run:
                # ignore 400 caused by IndexAlreadyExistsException when creating an
                # index
                es.indices.create(
                    index=index_name, body=index_settings(), ignore=400)
            # computed once and materialized: the result is both joined for
            # the debug message and iterated over below
            etypes = list(indexable_types(schema))
            if debug:
                print(u'found indexable_types {}'.format(','.join(etypes)))
            for etype in etypes:
                rset = cnx.execute(
                    'Any X WHERE X is %(etype)s' % {'etype': etype})
                if debug:
                    print(u'indexing {} {}'.format(etype, len(rset)))
                for entity in rset.entities():
                    # TODO add specific IFTIES adapter
                    serializer = entity.cw_adapt_to('ISerializable')
                    document = serializer.serialize()
                    # TODO remove non indexable data or (better) serialize only
                    if not dry_run:
                        es.index(index=index_name,
                                 id=entity.eid,
                                 doc_type=etype,
                                 body=document)
                    # TODO optimize with elasticsearch.helpers.bulk
                    # or elasticsearch.helpers.parallel_bulk
                    # or elasticsearch.helpers.streaming_bulk
                    # TODO optimisation : json serialize on one side, send to ES on the other
                    # TODO progress bar
                    if debug:
                        # one dot per processed entity, all on the same line
                        print(u'.', end=u'')
                if debug:
                    # terminate the line of progress dots for this entity type
                    print(u'')