Skip to content
Snippets Groups Projects
Commit aea1ea565d4e authored by Arthur Lutz's avatar Arthur Lutz
Browse files

skip indexing when locations and index-name are not configured

parent 21b992410538
No related branches found
No related tags found
No related merge requests found
...@@ -45,11 +45,12 @@ ...@@ -45,11 +45,12 @@
config = cnx.vreg.config config = cnx.vreg.config
schema = cnx.vreg.schema schema = cnx.vreg.schema
locations = config['elasticsearch-locations'] locations = config['elasticsearch-locations']
index_name = config.get('index-name') or 'cubicweb' index_name = config['index-name']
es = Elasticsearch(locations and locations.split(',') or None) if locations and index_name:
if not self.config.dry_run: es = Elasticsearch(locations)
# ignore 400 caused by IndexAlreadyExistsException when creating an if not self.config.dry_run:
# index # ignore 400 caused by IndexAlreadyExistsException when creating an
es.indices.create( # index
index=index_name, body=index_settings(), ignore=400) es.indices.create(
index=index_name, body=index_settings(), ignore=400)
...@@ -55,8 +56,2 @@ ...@@ -55,8 +56,2 @@
if self.config.debug:
print(u'found indexable_types {}'.format(
','.join(indexable_types(schema))))
for etype in indexable_types(schema):
rset = cnx.execute(
'Any X WHERE X is %(etype)s' % {'etype': etype})
if self.config.debug: if self.config.debug:
...@@ -62,15 +57,7 @@ ...@@ -62,15 +57,7 @@
if self.config.debug: if self.config.debug:
print(u'indexing {} {}'.format(etype, len(rset))) print(u'found indexable_types {}'.format(
for entity in rset.entities(): ','.join(indexable_types(schema))))
# TODO add specific IFTIES adapter for etype in indexable_types(schema):
serializer = entity.cw_adapt_to('ISerializable') rset = cnx.execute(
json = serializer.serialize() 'Any X WHERE X is %(etype)s' % {'etype': etype})
# TODO remove non indexable data or (better) serialize only
if not self.config.dry_run:
es.index(index=index_name, doc_type=etype, body=json)
# TODO optimize with elasticsearch.helpers.bulk
# or elasticsearch.helpers.parallel_bulk
# or elasticsearch.helpers.streaming_bulk
# TODO optimisation : json serialize on one side, send to ES on the other
# TODO progress bar
if self.config.debug: if self.config.debug:
...@@ -76,3 +63,20 @@ ...@@ -76,3 +63,20 @@
if self.config.debug: if self.config.debug:
print(u'.', end=u'') print(u'indexing {} {}'.format(etype, len(rset)))
for entity in rset.entities():
# TODO add specific IFTIES adapter
serializer = entity.cw_adapt_to('ISerializable')
json = serializer.serialize()
# TODO remove non indexable data or (better) serialize only
if not self.config.dry_run:
es.index(index=index_name, doc_type=etype, body=json)
# TODO optimize with elasticsearch.helpers.bulk
# or elasticsearch.helpers.parallel_bulk
# or elasticsearch.helpers.streaming_bulk
# TODO optimisation : json serialize on one side, send to ES on the other
# TODO progress bar
if self.config.debug:
print(u'.', end=u'')
if self.config.debug:
print(u'')
else:
if self.config.debug: if self.config.debug:
...@@ -78,5 +82,4 @@ ...@@ -78,5 +82,4 @@
if self.config.debug: if self.config.debug:
print(u'') print(u'no elasticsearch configuration found, skipping')
CWCTL.register(IndexInES) CWCTL.register(IndexInES)
...@@ -46,13 +46,14 @@ ...@@ -46,13 +46,14 @@
def __call__(self): def __call__(self):
locations = self._cw.vreg.config['elasticsearch-locations'] locations = self._cw.vreg.config['elasticsearch-locations']
index_name = self._cw.vreg.config['index-name'] index_name = self._cw.vreg.config['index-name']
serializer = self.entity.cw_adapt_to('ISerializable') if locations and index_name:
json = serializer.serialize() serializer = self.entity.cw_adapt_to('ISerializable')
es = Elasticsearch(locations and locations.split(',') or None) json = serializer.serialize()
try: es = Elasticsearch(locations and locations.split(','))
# TODO option pour coté async ? try:
es.index(index=index_name, # TODO option pour coté async ?
doc_type=self.entity.cw_etype, es.index(index=index_name,
body=json) doc_type=self.entity.cw_etype,
except (ConnectionError, ProtocolError): body=json)
log.debug('Failed to index in hook, could not connect to ES') except (ConnectionError, ProtocolError):
log.debug('Failed to index in hook, could not connect to ES')
...@@ -3,10 +3,11 @@ ...@@ -3,10 +3,11 @@
{'type': 'string', {'type': 'string',
'default': '', 'default': '',
'help': 'Elastic Search location (eg. 192.168.0.23:9200), ' 'help': 'Elastic Search location (eg. 192.168.0.23:9200), '
'this can be a list of locations (192.168.0.23:9200,192.168.0.24:9200,' 'this can be a list of locations (192.168.0.23:9200,192.168.0.24:9200, '
' you can also include the scheme (eg. http://192.168.0.23:9200) ', 'you can also include the scheme (eg. http://192.168.0.23:9200) '
'warning: if this is not defined indexing will be disabled (no localhost default)',
'group': 'elasticsearch', 'group': 'elasticsearch',
'level': 5, 'level': 5,
}), }),
('index-name', ('index-name',
{'type': 'string', {'type': 'string',
...@@ -8,10 +9,11 @@ ...@@ -8,10 +9,11 @@
'group': 'elasticsearch', 'group': 'elasticsearch',
'level': 5, 'level': 5,
}), }),
('index-name', ('index-name',
{'type': 'string', {'type': 'string',
'default': 'cubicweb', 'default': '',
'help': 'Elastic Search index name (eg. cubicweb)', 'help': 'Elastic Search index name (eg. cubicweb)'
'warning: if this is not defined indexing will be disabled (no index name default)',
'group': 'elasticsearch', 'group': 'elasticsearch',
'level': 5, 'level': 5,
}), }),
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
self.orig_config_for = CubicWebConfiguration.config_for self.orig_config_for = CubicWebConfiguration.config_for
config_for = lambda appid: self.config # noqa config_for = lambda appid: self.config # noqa
CubicWebConfiguration.config_for = staticmethod(config_for) CubicWebConfiguration.config_for = staticmethod(config_for)
self.config['elasticsearch-locations'] = 'http://10.1.1.1:9200'
self.config['index-name'] = 'unittest_index_name'
def to_test_etypes(self): def to_test_etypes(self):
with self.admin_access.repo_cnx() as cnx: with self.admin_access.repo_cnx() as cnx:
...@@ -79,7 +81,7 @@ ...@@ -79,7 +81,7 @@
self.assert_(cnx.execute('Any X WHERE X is %(etype)s' % self.assert_(cnx.execute('Any X WHERE X is %(etype)s' %
{'etype': indexable_types(cnx.repo)[0]})) {'etype': indexable_types(cnx.repo)[0]}))
create.assert_called_with( create.assert_called_with(
ignore=400, index='cubicweb', body=index_settings()) ignore=400, index='unittest_index_name', body=index_settings())
index.assert_called() index.assert_called()
# TODO ? check called data # TODO ? check called data
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment