Skip to content
Snippets Groups Projects
ccplugin.py 3.09 KiB
Newer Older
# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command

:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function

import os.path as osp
import sys

from elasticsearch import Elasticsearch

from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command

from cubes.elasticsearch.es import indexable_types, index_settings

# Docstrings in this module are written in reStructuredText.
__docformat__ = "restructuredtext en"

# Absolute path of the directory containing this module (not referenced by
# the code visible in this module).
HERE = osp.dirname(osp.abspath(__file__))


class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance

    """
    # NOTE: the class docstring above, ``name``, ``arguments`` and the option
    # help strings are user-facing command metadata; keep them stable.
    name = 'index-in-es'
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [('dry-run', {'type': 'yn', 'default': False,
                            'help': 'set to True if you want to skip the insertion in ES'}),
               ('debug', {'type': 'yn', 'default': False,
                          'help': 'set to True if you want to print out debug info and progress'}),
               ]

    @staticmethod
    def _parse_locations(locations):
        """Split the comma-separated ``elasticsearch-locations`` value into hosts.

        Surrounding whitespace and empty items (e.g. from a trailing comma)
        are dropped. ``None`` is returned when no host remains, so the
        Elasticsearch client is built with ``None`` exactly as when the
        option is unset.
        """
        if not locations:
            return None
        hosts = [host.strip() for host in locations.split(',') if host.strip()]
        return hosts or None

    def _index_etype(self, cnx, es, index_name, etype):
        """Serialize every entity of type `etype` and send it to `index_name`.

        Nothing is sent to ElasticSearch in dry-run mode; serialization still
        happens so that serializer errors surface anyway.
        """
        rset = cnx.execute(
            'Any X WHERE X is %(etype)s' % {'etype': etype})
        if self.config.debug:
            print(u'indexing {} {}'.format(etype, len(rset)))
        for entity in rset.entities():
            # TODO add specific IFTIES adapter
            serializer = entity.cw_adapt_to('ISerializable')
            document = serializer.serialize()
            # TODO remove non indexable data or (better) serialize only
            if not self.config.dry_run:
                # Use the eid as document id so that re-running the command
                # overwrites existing documents instead of duplicating them.
                es.index(index=index_name, doc_type=etype, id=entity.eid,
                         body=document)
            # TODO optimize with elasticsearch.helpers.bulk
            # or elasticsearch.helpers.parallel_bulk
            # or elasticsearch.helpers.streaming_bulk
            # TODO optimisation : json serialize on one side, send to ES on the other
            # TODO progress bar
            if self.config.debug:
                print(u'.', end=u'')
                # Output without a newline stays buffered; flush so the dots
                # really show progress (print's ``flush=`` is Python 3 only).
                sys.stdout.flush()
        if self.config.debug:
            print(u'')

    def run(self, args):
        """run the command with its specific arguments

        :param args: positional command-line arguments; the first one, the
                     instance id, is consumed.
        """
        appid = args.pop(0)
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            schema = cnx.vreg.schema
            index_name = config.get('index-name') or 'cubicweb'
            es = Elasticsearch(
                self._parse_locations(config['elasticsearch-locations']))
            if not self.config.dry_run:
                # ignore 400 caused by IndexAlreadyExistsException when creating an
                # index
                es.indices.create(
                    index=index_name, body=index_settings(), ignore=400)

            # Computed once: the list is both reported and iterated over.
            etypes = list(indexable_types(schema))
            if self.config.debug:
                print(u'found indexable_types {}'.format(','.join(etypes)))
            for etype in etypes:
                self._index_etype(cnx, es, index_name, etype)


# Register the command so that it is available as ``cubicweb-ctl index-in-es``.
CWCTL.register(IndexInES)