diff --git a/ccplugin.py b/ccplugin.py index 70db85e13042eded769831fe5cb686979c03b363_Y2NwbHVnaW4ucHk=..073889b33c0a3a2f832af9e2cdbefc1c0154dbf6_Y2NwbHVnaW4ucHk= 100644 --- a/ccplugin.py +++ b/ccplugin.py @@ -10,6 +10,7 @@ import os.path as osp from elasticsearch import Elasticsearch +from elasticsearch.helpers import parallel_bulk from cubicweb.cwctl import CWCTL from cubicweb.utils import admincnx @@ -22,6 +23,20 @@ HERE = osp.dirname(osp.abspath(__file__)) +# TODO optimisation : json serialize on one side, send to ES on the other +# TODO progress bar +def bulk_actions(rset, index_name, etype): + for entity in rset.entities(): + serializer = entity.cw_adapt_to('ISerializable') + json = serializer.serialize() + yield {'_op_type': 'index', + '_index': index_name, + '_type': etype, + '_id': entity.eid, + '_source': json + } + + class IndexInES(Command): """Index content in ElasticSearch. @@ -34,6 +49,8 @@ arguments = '<instance id>' options = [('dry-run', {'type': 'yn', 'default': False, 'help': 'set to True if you want to skip the insertion in ES'}), + ('bulk', {'type': 'yn', 'default': False, + 'help': 'set to True if you want to insert in bulk in ES'}), ('debug', {'type': 'yn', 'default': False, 'help': 'set to True if you want to print out debug info and progress'}), ] @@ -60,5 +77,7 @@ for etype in indexable_types(schema): rset = cnx.execute( 'Any X WHERE X is %(etype)s' % {'etype': etype}) + if len(rset) == 0: + continue if self.config.debug: print(u'indexing {} {}'.format(etype, len(rset))) @@ -63,24 +82,23 @@ if self.config.debug: print(u'indexing {} {}'.format(etype, len(rset))) - for entity in rset.entities(): - # TODO add specific IFTIES adapter - serializer = entity.cw_adapt_to('ISerializable') - json = serializer.serialize() - # TODO remove non indexable data or (better) serialize only - if not self.config.dry_run: - es.index(index=index_name, - id=entity.eid, - doc_type=etype, - body=json) - # TODO optimize with elasticsearch.helpers.bulk - # or elasticsearch.helpers.parallel_bulk - # or elasticsearch.helpers.streaming_bulk - # TODO optimisation : json serialize on one side, send to ES on the other - # TODO progress bar - if self.config.debug: - print(u'.', end=u'') - if self.config.debug: - print(u'') + if self.config.bulk: + # success, failed = bulk(es, bulk_actions(rset, index_name, etype)) + # if self.config.debug: + # print(u'ES bulk : {} success {} failed'.format(success, failed)) + list(parallel_bulk(es, bulk_actions(rset, index_name, etype), + raise_on_error=False, + raise_on_exception=False)) + else: + for entity in rset.entities(): + # TODO add specific IFTIES adapter + serializer = entity.cw_adapt_to('ISerializable') + json = serializer.serialize() + if not self.config.bulk: + if not self.config.dry_run: + es.index(index=index_name, + id=entity.eid, + doc_type=etype, + body=json) else: if self.config.debug: print(u'no elasticsearch configuration found, skipping')