# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command
:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function
import os.path as osp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command
from cubes.elasticsearch.es import indexable_types, INDEX_SETTINGS, fulltext_indexable_rql
__docformat__ = "restructuredtext en"
# Absolute path of the directory containing this module (not referenced by
# the code visible in this file; kept for external/legacy use).
HERE = osp.dirname(osp.abspath(__file__))
# TODO optimization: JSON-serialize on one side, send to ES on the other
# TODO progress bar
def bulk_actions(rset, index_name, etype, dry_run=False):
    """Generate bulk "index" actions for the entities of `rset`.

    Each entity is adapted to ``IFullTextIndexSerializable`` and serialized.
    Serialization happens even in dry-run mode; only the yielding of actions
    is skipped, so nothing is sent to Elasticsearch.

    :param rset: result set whose ``entities()`` are to be indexed
    :param index_name: name of the target Elasticsearch index
    :param etype: entity type, used as the Elasticsearch document type
    :param dry_run: when true, serialize the entities but yield no action
    :returns: generator of action dicts suitable for
              ``elasticsearch.helpers.parallel_bulk``
    """
    for entity in rset.entities():
        serializer = entity.cw_adapt_to('IFullTextIndexSerializable')
        # named ``doc`` rather than ``json`` to avoid shadowing the stdlib
        # module name
        doc = serializer.serialize()
        if dry_run:
            continue
        yield {'_op_type': 'index',
               '_index': index_name,
               '_type': etype,
               '_id': entity.eid,
               '_source': doc,
               }
class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance

    """
    name = 'index-in-es'
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [('dry-run', {'type': 'yn', 'default': False,
                            'help': 'set to True if you want to skip the insertion in ES'}),
               ('bulk', {'type': 'yn', 'default': False,
                         'help': 'set to True if you want to insert in bulk in ES'}),
               ('debug', {'type': 'yn', 'default': False,
                          # trailing space needed: the two literals are
                          # concatenated and would otherwise read "printout"
                          'help': 'set to True if you want to print '
                                  'out debug info and progress'}),
               ]

    def run(self, args):
        """Run the command with its specific arguments.

        Reads ``elasticsearch-locations`` and ``index-name`` from the
        instance configuration. When both are set, creates the index (unless
        in dry-run mode) and indexes the entities of every indexable type,
        either one request per entity or through ``parallel_bulk`` when the
        ``bulk`` option is set.

        :param args: remaining command-line arguments; the first one is the
                     instance id and is consumed (popped) here.
        """
        appid = args.pop(0)
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            schema = cnx.vreg.schema
            locations = config['elasticsearch-locations']
            index_name = config['index-name']
            if not (locations and index_name):
                if self.config.debug:
                    print(u'no elasticsearch configuration found, skipping')
                return
            es = Elasticsearch(locations)
            if not self.config.dry_run:
                # ignore 400 caused by IndexAlreadyExistsException when
                # creating an index
                es.indices.create(
                    index=index_name, body=INDEX_SETTINGS, ignore=400)
            etypes = list(indexable_types(schema))
            if self.config.debug:
                print(u'found indexable_types {}'.format(','.join(etypes)))
            for etype in etypes:
                rset = cnx.execute(fulltext_indexable_rql(etype, schema))
                if self.config.debug:
                    print(u'indexing {} {}'.format(etype, len(rset)))
                if self.config.bulk:
                    self._index_bulk(es, rset, index_name, etype)
                else:
                    self._index_one_by_one(es, rset, index_name, etype)

    def _index_bulk(self, es, rset, index_name, etype):
        """Send the entities of `rset` to Elasticsearch via ``parallel_bulk``."""
        actions = bulk_actions(rset, index_name, etype,
                               dry_run=self.config.dry_run)
        # parallel_bulk is a lazy generator: requests are only sent while it
        # is being iterated, so it must be consumed explicitly.
        for ok, info in parallel_bulk(es, actions,
                                      raise_on_error=False,
                                      raise_on_exception=False):
            # errors are not raised (raise_on_error=False); surface them in
            # debug mode instead of dropping them silently
            if not ok and self.config.debug:
                print(u'failed to index: {}'.format(info))

    def _index_one_by_one(self, es, rset, index_name, etype):
        """Serialize each entity of `rset` and index it with its own request."""
        for entity in rset.entities():
            serializer = entity.cw_adapt_to('IFullTextIndexSerializable')
            doc = serializer.serialize()
            if not self.config.dry_run:
                es.index(index=index_name,
                         id=entity.eid,
                         doc_type=etype,
                         body=doc)
# Register the command with cubicweb-ctl so it is available as `index-in-es`.
CWCTL.register(IndexInES)