# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command

:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function

import os.path as osp
import sys

from elasticsearch import Elasticsearch

from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command

from cubes.elasticsearch.es import indexable_types, index_settings

__docformat__ = "restructuredtext en"

HERE = osp.dirname(osp.abspath(__file__))


def _parse_locations(locations):
    """Turn the comma-separated ``elasticsearch-locations`` value into a list.

    Surrounding whitespace and empty entries are dropped. ``None`` is
    returned when nothing usable remains, which leaves the choice of host to
    the ``Elasticsearch`` client itself.
    """
    if not locations:
        return None
    hosts = [host.strip() for host in locations.split(',')]
    return [host for host in hosts if host] or None


class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance

    """
    name = 'index-in-es'
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [('dry-run', {'type': 'yn', 'default': False,
                            'help': 'set to True if you want to skip the insertion in ES'}),
               ('debug', {'type': 'yn', 'default': False,
                          'help': 'set to True if you want to print out debug info and progress'}),
               ]

    def run(self, args):
        """Run the command: index every entity of each indexable type.

        The first element of ``args`` is the instance identifier; it is
        popped from the list. With the ``dry-run`` option set, entities are
        still fetched and serialized but nothing is sent to ElasticSearch.
        With the ``debug`` option set, the indexable types, per-type entity
        counts and one progress dot per entity are printed.
        """
        appid = args.pop(0)
        dry_run = self.config.dry_run
        debug = self.config.debug
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            schema = cnx.vreg.schema
            index_name = config.get('index-name') or 'cubicweb'
            es = Elasticsearch(
                _parse_locations(config['elasticsearch-locations']))
            if not dry_run:
                # ignore 400 caused by IndexAlreadyExistsException when
                # creating an index
                es.indices.create(
                    index=index_name, body=index_settings(), ignore=400)

            # Computed once and materialised: the result is used both for
            # the debug listing and for the indexing loop.
            etypes = list(indexable_types(schema))
            if debug:
                print(u'found indexable_types {}'.format(','.join(etypes)))
            for etype in etypes:
                rset = cnx.execute(
                    'Any X WHERE X is %(etype)s' % {'etype': etype})
                if debug:
                    print(u'indexing {} {}'.format(etype, len(rset)))
                for entity in rset.entities():
                    # TODO add specific IFTIES adapter
                    serializer = entity.cw_adapt_to('ISerializable')
                    document = serializer.serialize()
                    # TODO remove non indexable data or (better) serialize only
                    if not dry_run:
                        es.index(index=index_name, doc_type=etype,
                                 body=document)
                    # TODO optimize with elasticsearch.helpers.bulk
                    # or elasticsearch.helpers.parallel_bulk
                    # or elasticsearch.helpers.streaming_bulk
                    # TODO optimisation : json serialize on one side, send to ES on the other
                    # TODO progress bar
                    if debug:
                        print(u'.', end=u'')
                        # No newline is written here, so flush explicitly or
                        # the dots only show up once the type is finished.
                        sys.stdout.flush()
                if debug:
                    # terminate the line of progress dots for this type
                    print(u'')


CWCTL.register(IndexInES)