Skip to content
Snippets Groups Projects
Commit ac89237201a8 authored by Arthur Lutz
Browse files

[ccplugin] initial cubicweb-ctl command to index existing content in ES

parent d1881dfe6959
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
"""cubicweb-ctl plugin providing the index-in-es command
:organization: Logilab
:copyright: 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import print_function
import os.path as osp
from elasticsearch import Elasticsearch
from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command
from cubes.elasticsearch.es import indexable_types, index_settings
# Markup format used by the docstrings of this module.
__docformat__ = "restructuredtext en"
# Absolute path of the directory that contains this module.
HERE = osp.dirname(osp.abspath(__file__))
# cubicweb-ctl command pushing every indexable entity of an instance to
# ElasticSearch.
# NOTE(review): the class docstring is presumably surfaced as the command's
# help text by the ``Command`` base class, so its wording is kept unchanged.
class IndexInES(Command):
    """Index content in ElasticSearch.

    <instance id>
      identifier of the instance
    """

    name = 'index-in-es'
    min_args = max_args = 1
    arguments = '<instance id>'
    options = [
        ('dry-run', {
            'type': 'yn', 'default': False,
            'help': 'set to True if you want to skip the insertion in ES'}),
        ('debug', {
            'type': 'yn', 'default': False,
            'help': 'set to True if you want to print out debug info and progress'}),
    ]

    def run(self, args):
        """Index all indexable entities of an instance in ElasticSearch.

        :param args: positional command-line arguments; the first one is
          popped and used as the instance identifier.

        The ElasticSearch hosts are read from the instance's
        ``elasticsearch-locations`` option (comma separated) and the index
        name from ``index-name`` (falling back to ``'cubicweb'``).

        With ``--dry-run`` entities are still fetched and serialized but
        nothing is sent to ElasticSearch.  With ``--debug`` the entity types,
        their counts and one dot per processed entity are printed.
        """
        appid = args.pop(0)
        dry_run = self.config.dry_run
        debug = self.config.debug
        with admincnx(appid) as cnx:
            config = cnx.vreg.config
            schema = cnx.vreg.schema
            index_name = config.get('index-name') or 'cubicweb'

            # Build the host list explicitly: surrounding whitespace and empty
            # entries (e.g. "host1, host2," in the configuration) are dropped.
            # ``None`` lets the client fall back to its default host.
            locations = config['elasticsearch-locations']
            hosts = None
            if locations:
                hosts = [host.strip() for host in locations.split(',')
                         if host.strip()] or None
            es = Elasticsearch(hosts)

            if not dry_run:
                # ignore 400 caused by IndexAlreadyExistsException when
                # creating an index
                es.indices.create(
                    index=index_name, body=index_settings(), ignore=400)

            # Computed once: the list is needed both for the debug output and
            # for the indexing loop.
            etypes = list(indexable_types(schema))
            if debug:
                print(u'found indexable_types {}'.format(','.join(etypes)))

            for etype in etypes:
                # The entity type is interpolated into the RQL text; it comes
                # from indexable_types(schema), not from user input.
                rset = cnx.execute(
                    'Any X WHERE X is %(etype)s' % {'etype': etype})
                if debug:
                    print(u'indexing {} {}'.format(etype, len(rset)))
                for entity in rset.entities():
                    # TODO add specific IFTIES adapter
                    serializer = entity.cw_adapt_to('ISerializable')
                    document = serializer.serialize()
                    # TODO remove non indexable data or (better) serialize only
                    if not dry_run:
                        es.index(index=index_name, doc_type=etype,
                                 body=document)
                    # TODO optimize with elasticsearch.helpers.bulk
                    # or elasticsearch.helpers.parallel_bulk
                    # or elasticsearch.helpers.streaming_bulk
                    # TODO optimisation : json serialize on one side, send to
                    # ES on the other
                    # TODO progress bar
                    if debug:
                        print(u'.', end=u'')
                if debug:
                    # terminate the line of progress dots for this entity type
                    print(u'')
# Register the command class on the CWCTL object imported from cubicweb.cwctl.
CWCTL.register(IndexInES)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment