Commit 073889b33c0a authored by Arthur Lutz

[ccplugin] use parallel_bulk when asked for

parent 70db85e13042
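The change replaces per-entity `es.index()` calls with the `parallel_bulk` helper from elasticsearch-py, fed by a generator of bulk action dicts (the `bulk_actions` function added below). As a rough illustration of how that helper is driven, here is a minimal sketch outside CubicWeb; the documents, index name and `_type` value are placeholders, and `_type` is only relevant for the pre-7 Elasticsearch releases this code targets:

from __future__ import print_function

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

# Placeholder data and index name, for illustration only.
DOCS = [{'eid': 1, 'title': u'hello'}, {'eid': 2, 'title': u'world'}]


def actions(index_name):
    # One action dict per document, mirroring what bulk_actions() yields
    # from a CubicWeb result set.
    for doc in DOCS:
        yield {'_op_type': 'index',
               '_index': index_name,
               '_type': 'demo_etype',
               '_id': doc['eid'],
               '_source': doc}


es = Elasticsearch(['localhost:9200'])
# parallel_bulk() returns a lazy generator of (ok, result) tuples;
# iterating it is what actually sends the indexing requests.
for ok, result in parallel_bulk(es, actions('demo_index'),
                                raise_on_error=False,
                                raise_on_exception=False):
    if not ok:
        print(result)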
@@ -10,6 +10,7 @@
 import os.path as osp
 from elasticsearch import Elasticsearch
+from elasticsearch.helpers import parallel_bulk
 from cubicweb.cwctl import CWCTL
 from cubicweb.utils import admincnx
@@ -22,6 +23,20 @@
 HERE = osp.dirname(osp.abspath(__file__))
+# TODO optimisation : json serialize on one side, send to ES on the other
+# TODO progress bar
+def bulk_actions(rset, index_name, etype):
+    for entity in rset.entities():
+        serializer = entity.cw_adapt_to('ISerializable')
+        json = serializer.serialize()
+        yield {'_op_type': 'index',
+               '_index': index_name,
+               '_type': etype,
+               '_id': entity.eid,
+               '_source': json
+               }
 class IndexInES(Command):
     """Index content in ElasticSearch.
@@ -34,6 +49,8 @@
     arguments = '<instance id>'
     options = [('dry-run', {'type': 'yn', 'default': False,
                             'help': 'set to True if you want to skip the insertion in ES'}),
+               ('bulk', {'type': 'yn', 'default': False,
+                         'help': 'set to True if you want to insert in bulk in ES'}),
                ('debug', {'type': 'yn', 'default': False,
                           'help': 'set to True if you want to print out debug info and progress'}),
                ]
@@ -60,27 +77,28 @@
                 for etype in indexable_types(schema):
                     rset = cnx.execute(
                         'Any X WHERE X is %(etype)s' % {'etype': etype})
+                    if len(rset) == 0:
+                        continue
                     if self.config.debug:
                         print(u'indexing {} {}'.format(etype, len(rset)))
-                    for entity in rset.entities():
-                        # TODO add specific IFTIES adapter
-                        serializer = entity.cw_adapt_to('ISerializable')
-                        json = serializer.serialize()
-                        # TODO remove non indexable data or (better) serialize only
-                        if not self.config.dry_run:
-                            es.index(index=index_name,
-                                     id=entity.eid,
-                                     doc_type=etype,
-                                     body=json)
-                        # TODO optimize with elasticsearch.helpers.bulk
-                        # or elasticsearch.helpers.parallel_bulk
-                        # or elasticsearch.helpers.streaming_bulk
-                        # TODO optimisation : json serialize on one side, send to ES on the other
-                        # TODO progress bar
-                        if self.config.debug:
-                            print(u'.', end=u'')
-                    if self.config.debug:
-                        print(u'')
+                    if self.config.bulk:
+                        # success, failed = bulk(es, bulk_actions(rset, index_name, etype))
+                        # if self.config.debug:
+                        #     print(u'ES bulk : {} success {} failed'.format(success, failed))
+                        list(parallel_bulk(es, bulk_actions(rset, index_name, etype),
+                                           raise_on_error=False,
+                                           raise_on_exception=False))
+                    else:
+                        for entity in rset.entities():
+                            # TODO add specific IFTIES adapter
+                            serializer = entity.cw_adapt_to('ISerializable')
+                            json = serializer.serialize()
+                            if not self.config.bulk:
+                                if not self.config.dry_run:
+                                    es.index(index=index_name,
+                                             id=entity.eid,
+                                             doc_type=etype,
+                                             body=json)
             else:
                 if self.config.debug:
                     print(u'no elasticsearch configuration found, skipping')
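Wrapping the call in `list(...)` forces the lazy `parallel_bulk` generator to run but discards the per-action results; the commented-out `bulk()` lines hint at reporting counts instead. A sketch of a small helper (hypothetical, not part of this commit) that consumes the same generator and tallies successes and failures:

from __future__ import print_function

from elasticsearch.helpers import parallel_bulk


def parallel_index(es, actions, debug=False):
    # Drive parallel_bulk and count outcomes instead of discarding them
    # with list(); errors are reported rather than raised.
    success = failed = 0
    for ok, result in parallel_bulk(es, actions,
                                    raise_on_error=False,
                                    raise_on_exception=False):
        if ok:
            success += 1
        else:
            failed += 1
    if debug:
        print(u'ES bulk : {} success {} failed'.format(success, failed))
    return success, failed

Inside the command it could be called as parallel_index(es, bulk_actions(rset, index_name, etype), self.config.debug).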