# HG changeset patch
# User Arthur Lutz <arthur.lutz@logilab.fr>
# Date 1465581137 -7200
#      Fri Jun 10 19:52:17 2016 +0200
# Node ID 6d185e758b6fad80649e2d390827910b91e148ed
# Parent  c169e8fd2ec45498de2644292fd86879da05ed3a
Single place for creating a cnx, make sure the index is configured at that point

Also:
ccplugin - the generator serves all actions, no longer split up by etype
es.py - add a CUSTOM_ATTRIBUTES mechanism

diff --git a/ccplugin.py b/ccplugin.py
--- a/ccplugin.py
+++ b/ccplugin.py
@@ -9,14 +9,13 @@
 
 import os.path as osp
 
-from elasticsearch import Elasticsearch
 from elasticsearch.helpers import parallel_bulk
 
 from cubicweb.cwctl import CWCTL
 from cubicweb.utils import admincnx
 from cubicweb.toolsutils import Command
 
-from cubes.elasticsearch.es import indexable_types, INDEX_SETTINGS, fulltext_indexable_rql
+from cubes.elasticsearch.es import indexable_types, fulltext_indexable_rql, get_connection
 
 __docformat__ = "restructuredtext en"
 
@@ -24,21 +23,6 @@
 HERE = osp.dirname(osp.abspath(__file__))
 
-# TODO optimisation : json serialize on one side, send to ES on the other
-# TODO progress bar
-def bulk_actions(rset, index_name, etype, dry_run=False):
-    for entity in rset.entities():
-        serializer = entity.cw_adapt_to('IFullTextIndexSerializable')
-        json = serializer.serialize()
-        if not dry_run:
-            yield {'_op_type': 'index',
-                   '_index': index_name,
-                   '_type': etype,
-                   '_id': entity.eid,
-                   '_source': json
-                   }
-
-
 class IndexInES(Command):
     """Index content in ElasticSearch.
 
@@ -52,60 +36,61 @@
     arguments = '<instance id>'
     options = [('dry-run', {'type': 'yn', 'default': False,
                             'help': 'set to True if you want to skip the insertion in ES'}),
-               ('bulk', {'type': 'yn', 'default': False,
-                         'help': 'set to True if you want to insert in bulk in ES'}),
                ('debug', {'type': 'yn', 'default': False,
                           'help': 'set to True if you want to print'
                                   'out debug info and progress'}),
+               ('etype', {'type': 'string', 'default': '',
+                          'help': 'only index a given etype '
+                                  '[default: all indexable types]'}),
               ]
 
     def run(self, args):
         """run the command with its specific arguments"""
         appid = args.pop(0)
         with admincnx(appid) as cnx:
-            config = cnx.vreg.config
             schema = cnx.vreg.schema
-            locations = config['elasticsearch-locations']
-            index_name = config['index-name']
-            if locations and index_name:
-                es = Elasticsearch(locations)
-                if not self.config.dry_run:
-                    # ignore 400 caused by IndexAlreadyExistsException when creating an
-                    # index
-                    es.indices.create(
-                        index=index_name, body=INDEX_SETTINGS, ignore=400)
-
-                if self.config.debug:
+            # indexer = cnx.vreg['es'].select('indexer')
+            es = get_connection(cnx.vreg.config)
+            if es:
+                if self.config.etype:
+                    etypes = (self.config.etype,)
+                else:
+                    etypes = indexable_types(schema)
+                if self.config.debug and not self.config.etype:
                     print(u'found indexable_types {}'.format(
                         ','.join(indexable_types(schema))))
-                for etype in indexable_types(schema):
-                    rset = cnx.execute(fulltext_indexable_rql(etype, schema))
-                    if len(rset) == 0:
-                        continue
-                    if self.config.debug:
-                        print(u'indexing {} {}'.format(etype, len(rset)))
-                    if self.config.bulk:
-                        for x in parallel_bulk(es, bulk_actions(rset,
-                                                                index_name,
-                                                                etype,
-                                                                dry_run=self.config.dry_run),
-                                               raise_on_error=False,
-                                               raise_on_exception=False):
+                for _ in parallel_bulk(es,
+                                       self.bulk_actions(etypes,
+                                                         cnx,
+                                                         dry_run=self.config.dry_run),
+                                       raise_on_error=False,
+                                       raise_on_exception=False):
                     pass
-                    else:
-                        for entity in rset.entities():
-                            serializer = entity.cw_adapt_to(
-                                'IFullTextIndexSerializable')
-                            json = serializer.serialize()
-                            if not self.config.bulk:
-                                if not self.config.dry_run:
-                                    es.index(index=index_name,
-                                             id=entity.eid,
-                                             doc_type=etype,
-                                             body=json)
             else:
                 if self.config.debug:
                     print(u'no elasticsearch configuration found, skipping')
 
+    def bulk_actions(self, etypes, cnx, dry_run=False):
+        for etype in etypes:
+            rql = fulltext_indexable_rql(etype, cnx.vreg.schema)
+            rset = cnx.execute(rql)
+            if self.config.debug:
+                print(u'indexing {} {}'.format(etype, len(rset)))
+                print(u'RQL : {}'.format(rql))
+            for entity in rset.entities():
+                serializer = entity.cw_adapt_to('IFullTextIndexSerializable')
+                json = serializer.serialize()
+                if not dry_run:
+                    data = {'_op_type': 'index',
+                            '_index': cnx.vreg.config['index-name'],
+                            '_type': etype,
+                            '_id': entity.eid,
+                            '_source': json
+                            }
+                    self.customize_data(data)
+                    yield data
+
+    def customize_data(self, data):
+        pass
 
 CWCTL.register(IndexInES)
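
The customize_data() extension point added above is a no-op by default: a downstream cube can subclass the command and override it to enrich each bulk action before it is yielded to parallel_bulk. A minimal sketch (the MyIndexInES subclass and the 'instance' field are hypothetical, not part of this patch):

    from cubes.elasticsearch.ccplugin import IndexInES

    class MyIndexInES(IndexInES):
        """hypothetical variant of the index command"""

        def customize_data(self, data):
            # 'data' is the action dict built in bulk_actions():
            # {'_op_type': 'index', '_index': ..., '_type': ...,
            #  '_id': ..., '_source': ...}
            data['_source']['instance'] = 'my-appid'  # hypothetical extra field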
diff --git a/entities.py b/entities.py
--- a/entities.py
+++ b/entities.py
@@ -54,6 +54,8 @@
             except KeyError:
                 continue
             data[attr] = value
+        data.update(entity.cw_attr_cache)
+        # TODO take a look at what's in entity.cw_relation_cache
         return data
 
diff --git a/es.py b/es.py
--- a/es.py
+++ b/es.py
@@ -15,6 +15,12 @@
 # You should have received a copy of the GNU Lesser General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+import logging
+
+from elasticsearch.exceptions import ConnectionError
+from urllib3.exceptions import ProtocolError
+from elasticsearch_dsl.connections import connections
+
 from rql.utils import rqlvar_maker
 
 INDEXABLE_TYPES = None
@@ -35,6 +41,13 @@
     },
 }
 
+# customization mechanism: in your cube, add your etype as a key with a tuple
+# of additional attributes,
+# e.g. CUSTOM_ATTRIBUTES['BlogEntry'] = ('description',)
+CUSTOM_ATTRIBUTES = {}
+
+log = logging.getLogger(__name__)
+
 
 def indexable_types(schema):
     '''
@@ -69,6 +82,7 @@
     rql = ['WHERE %s is %s' % (V, etype)]
     if eid:
         rql.append('%s eid %i' % (V, eid))
+    var = next(varmaker)
     selected = []
     for rschema in schema.eschema(etype).indexable_attributes():
         attr = rschema.type
@@ -81,10 +95,38 @@
         var = next(varmaker)
         rql.append('%s %s %s' % (V, attr, var))
-        selected.append((attr, var))
+        selected.append(var)
-    for attr in ('creation_date', 'modification_date'):
+    for attr in ('creation_date', 'modification_date',) + CUSTOM_ATTRIBUTES.get(etype, ()):
         var = next(varmaker)
         rql.append('%s %s %s' % (V, attr, var))
-        selected.append((attr, var))
+        selected.append(var)
     # TODO inlined relations ?
-    return 'Any %s,%s %s' % (V, ','.join(var for attr, var in selected),
+    return 'Any %s,%s %s' % (V, ','.join(selected),
                              ','.join(rql))
+
+
+def get_connection(config):
+    '''
+    Get a connection from the config object, creating a persistent
+    connection and the initial index with custom settings if needed.
+    '''
+    try:
+        return connections.get_connection()
+    except KeyError:
+        locations = config['elasticsearch-locations']
+        index_name = config['index-name']
+        if locations and index_name:
+            # TODO sanitize locations
+            es = connections.create_connection(hosts=locations.split(','),
+                                               index=index_name,
+                                               timeout=20)
+            try:
+                if not es.indices.exists(index=index_name):
+                    # TODO could drop ignore=400 since we check that the index
+                    # does not exist; it ignores the 400 caused by
+                    # IndexAlreadyExistsException when creating an index
+                    es.indices.create(index=index_name,
+                                      body=INDEX_SETTINGS,
+                                      ignore=400)
+            except (ConnectionError, ProtocolError):
+                log.debug('Failed to create index, could not connect to ES')
+            return es
+        # TODO else? raise KeyError - is returning None OK?
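
Taken together, CUSTOM_ATTRIBUTES and get_connection() let a cube widen what gets indexed and reuse one persistent connection. A short sketch under assumptions (the reindex_entity helper is hypothetical; the BlogEntry/description pair is the example from the comment above; get_connection() may return None when ES is not configured):

    from cubes.elasticsearch import es

    # index BlogEntry's description on top of its indexable attributes
    es.CUSTOM_ATTRIBUTES['BlogEntry'] = ('description',)

    def reindex_entity(cnx, etype, eid):
        # hypothetical helper mirroring the hook below: fetch, serialize, index
        conn = es.get_connection(cnx.vreg.config)
        if conn is None:
            return
        rql = es.fulltext_indexable_rql(etype, cnx.vreg.schema, eid=eid)
        entity = cnx.execute(rql).one()
        serializer = entity.cw_adapt_to('IFullTextIndexSerializable')
        conn.index(index=cnx.vreg.config['index-name'],
                   id=eid, doc_type=etype,
                   body=serializer.serialize())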
diff --git a/hooks.py b/hooks.py
--- a/hooks.py
+++ b/hooks.py
@@ -19,13 +19,12 @@
 
 import logging
 
-from elasticsearch import Elasticsearch
 from elasticsearch.exceptions import ConnectionError
 from urllib3.exceptions import ProtocolError
 
 from cubicweb.server import hook
 from cubicweb.predicates import score_entity
 
-from cubes.elasticsearch.es import indexable_types, INDEX_SETTINGS, fulltext_indexable_rql
+from cubes.elasticsearch.es import indexable_types, fulltext_indexable_rql, get_connection
 
 log = logging.getLogger(__name__)
 
@@ -46,26 +45,17 @@
     def __call__(self):
         if self.entity.cw_etype == 'File':
             return  # FIXME hack!
-        locations = self._cw.vreg.config['elasticsearch-locations']
-        index_name = self._cw.vreg.config['index-name']
-        if locations and index_name:
-            es = Elasticsearch(locations and locations.split(','))
-            # TODO : have a cache so that we don't check index on every transition
-            try:
-                if not es.indices.exists(index=index_name):
-                    es.indices.create(index=index_name,
-                                      body=INDEX_SETTINGS,
-                                      ignore=400)
-            except (ConnectionError, ProtocolError):
-                log.debug('Failed to index in hook, could not connect to ES')
-            indexable_entity = self._cw.execute(fulltext_indexable_rql(self.entity.cw_etype,
-                                                                       self.entity._cw.vreg.schema,
-                                                                       eid=self.entity.eid)).one()
+        es = get_connection(self._cw.vreg.config)
+        if es:
+            rql = fulltext_indexable_rql(self.entity.cw_etype,
+                                         self.entity._cw.vreg.schema,
+                                         eid=self.entity.eid)
+            indexable_entity = self._cw.execute(rql).one()
             serializer = indexable_entity.cw_adapt_to('IFullTextIndexSerializable')
             json = serializer.serialize()
             try:
-                # TODO option for the async side?
-                es.index(index=index_name,
+                # TODO option for the async side? thread
+                es.index(index=self._cw.vreg.config['index-name'],
                          id=self.entity.eid,
                          doc_type=self.entity.cw_etype,
                          body=json)
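
Because this hook indexes every matching entity as transactions go through, bulk imports typically want to suspend it and rebuild the index afterwards with the IndexInES command. A sketch, assuming the cube's hooks are registered under the 'es' category (as the test below relies on) and an import_many_entities loader that is purely hypothetical:

    def bulk_import(cnx):
        # skip per-entity ES indexing while mass-creating entities,
        # then rebuild the index in one pass with the IndexInES command
        with cnx.allow_all_hooks_but('es'):
            import_many_entities(cnx)  # hypothetical bulk-load helper
            cnx.commit()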
diff --git a/test/test_elastic_search.py b/test/test_elastic_search.py
--- a/test/test_elastic_search.py
+++ b/test/test_elastic_search.py
@@ -15,6 +15,8 @@
 from cubes.elasticsearch import ccplugin
 from cubes.elasticsearch.es import indexable_types, INDEX_SETTINGS
 
+# TODO - find a way to configure ElasticSearch as non-threaded while running tests
+# so that tracebacks show the full stack, not just from connection.http_* onwards
 
 class ExportElasticSearchTC(testlib.AutoPopulateTest):
     # ignore ComputedRelations
@@ -26,7 +28,7 @@
         self.orig_config_for = CubicWebConfiguration.config_for
         config_for = lambda appid: self.config  # noqa
         CubicWebConfiguration.config_for = staticmethod(config_for)
-        self.config['elasticsearch-locations'] = 'http://10.1.1.1:9200'
+        self.config['elasticsearch-locations'] = 'http://nonexistant.elastic.search:9200'
        self.config['index-name'] = 'unittest_index_name'
 
     def to_test_etypes(self):
@@ -45,11 +47,16 @@
                          0)
 
     @patch('elasticsearch.client.Elasticsearch.index', unsafe=True)
+    @patch('elasticsearch.client.Elasticsearch.bulk', unsafe=True)
     @patch('elasticsearch.client.indices.IndicesClient.exists', unsafe=True)
     @patch('elasticsearch.client.indices.IndicesClient.create', unsafe=True)
-    def test_ccplugin(self, create, exists, index):
-        self.auto_populate(10)
-        index.reset_mock()
+    def test_ccplugin(self, create, exists, bulk, index):
+        # TODO disable the hook, then remove the index mock
+        with self.admin_access.repo_cnx() as cnx:
+            cnx.disable_hook_categories('es')
+            with cnx.allow_all_hooks_but('es'):
+                self.auto_populate(10)
+        bulk.reset_mock()
         cmd = [self.appid, '--dry-run', 'yes']
         sys.stdout = out = StringIO()
         try:
@@ -58,7 +65,7 @@
             sys.stdout = sys.__stdout__
         self.assertEquals('', out.getvalue())
         create.assert_not_called()
-        index.assert_not_called()
+        bulk.assert_not_called()
 
         cmd = [self.appid, '--dry-run', 'yes', '--debug', 'yes']
         sys.stdout = out = StringIO()
@@ -68,7 +75,7 @@
             sys.stdout = sys.__stdout__
         self.assert_('found ' in out.getvalue())
         create.assert_not_called()
-        index.assert_not_called()
+        bulk.assert_not_called()
 
         # TODO try wrong option
         # cmd = [self.appid, '--wrong-option', 'yes']
@@ -82,9 +89,11 @@
         with self.admin_access.repo_cnx() as cnx:
             self.assert_(cnx.execute('Any X WHERE X is %(etype)s' %
                                      {'etype': indexable_types(cnx.repo)[0]}))
-        create.assert_called_with(
-            ignore=400, index='unittest_index_name', body=INDEX_SETTINGS)
-        index.assert_called()
+        # TODO - move this somewhere it can test the first get_connection call
+        # create.assert_called_with(ignore=400,
+        #                           index='unittest_index_name',
+        #                           body=INDEX_SETTINGS)
+        bulk.assert_called()
         # TODO ? check the data bulk was called with
 
     @patch('elasticsearch.client.indices.IndicesClient.create', unsafe=True)
diff --git a/views.py b/views.py
--- a/views.py
+++ b/views.py
@@ -17,7 +17,6 @@
 """elasticsearch search views"""
 
 from elasticsearch.exceptions import NotFoundError
-from elasticsearch_dsl.connections import connections
 from elasticsearch_dsl import FacetedSearch, TermsFacet, DateHistogramFacet
 
 from bs4 import BeautifulSoup
@@ -26,22 +25,10 @@
 import cwtags.tag as t
 
 from cubicweb.view import StartupView
-from cubes.elasticsearch.es import indexable_types
+from cubes.elasticsearch.es import indexable_types, get_connection
 from cubes.elasticsearch.search_helpers import compose_search
 
 
-def get_connection(config):
-    try:
-        connections.get_connection()
-    except KeyError:
-        locations = config['elasticsearch-locations']
-        index_name = config['index-name']
-        # TODO sanitize locations
-        connections.create_connection(hosts=locations.split(','),
-                                      index=index_name,
-                                      timeout=20)
-
-
 class CWFacetedSearch(FacetedSearch):
     # fields that should be searched
     fields = ["title^3", "description^2", '_all']
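
With get_connection() now living in es.py, a view only has to make sure the shared connection exists before searching. A rough sketch of the call sequence (the run_search helper and query string are illustrative, not part of the cube's API; FacetedSearch.execute() comes from elasticsearch_dsl):

    from cubes.elasticsearch.es import get_connection
    from cubes.elasticsearch.views import CWFacetedSearch

    def run_search(config, query):
        # illustrative helper: register the persistent connection on first
        # use, then run a faceted search over the boosted fields
        # (title^3, description^2, _all)
        if get_connection(config) is None:
            return None
        return CWFacetedSearch(query).execute()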