diff --git a/cubicweb_elasticsearch/entities.py b/cubicweb_elasticsearch/entities.py index 4060fb567a19d20f5a3cd259cc8c726fd4893c72_Y3ViaWN3ZWJfZWxhc3RpY3NlYXJjaC9lbnRpdGllcy5weQ==..12d6d9c297d3c8e83165a3a8f789edaa9a969ef3_Y3ViaWN3ZWJfZWxhc3RpY3NlYXJjaC9lbnRpdGllcy5weQ== 100644 --- a/cubicweb_elasticsearch/entities.py +++ b/cubicweb_elasticsearch/entities.py @@ -82,6 +82,27 @@ if es_cnx is not None: es.create_index(es_cnx, index_name, settings) + def es_index(self, entity): + es_cnx = self.get_connection() + if es_cnx is None or not self.index_name: + self.error('no connection to ES (not configured) skip ES indexing') + return + serializable = entity.cw_adapt_to('IFullTextIndexSerializable') + json = serializable.serialize() + if not json: + return + es_cnx.index(index=self.index_name, id=serializable.es_id, + doc_type=serializable.es_doc_type, body=json) + + def es_delete(self, entity): + es_cnx = self.get_connection() + if es_cnx is None or not self.index_name: + self.error('no connection to ES (not configured) skip ES deletion') + return + serializable = entity.cw_adapt_to('IFullTextIndexSerializable') + es_cnx.delete(index=self.index_name, id=serializable.es_id, + doc_type=serializable.es_doc_type) + class IFullTextIndexSerializable(view.EntityAdapter): """Adapter to serialize an entity to a bare python structure that may be diff --git a/cubicweb_elasticsearch/hooks.py b/cubicweb_elasticsearch/hooks.py index 4060fb567a19d20f5a3cd259cc8c726fd4893c72_Y3ViaWN3ZWJfZWxhc3RpY3NlYXJjaC9ob29rcy5weQ==..12d6d9c297d3c8e83165a3a8f789edaa9a969ef3_Y3ViaWN3ZWJfZWxhc3RpY3NlYXJjaC9ob29rcy5weQ== 100644 --- a/cubicweb_elasticsearch/hooks.py +++ b/cubicweb_elasticsearch/hooks.py @@ -76,8 +76,4 @@ def postcommit_event(self): indexer = self.cnx.vreg['es'].select('indexer', self.cnx) - es = indexer.get_connection() - if es is None or not indexer.index_name: - log.error('no connection to ES (not configured) skip ES indexing') - return for entity in self.get_data(): @@ -83,5 +79,2 @@ for entity in self.get_data(): - kwargs = dict(index=indexer.index_name, - id=entity.eid, - doc_type=entity.cw_etype) if self.cnx.deleted_in_transaction(entity.eid): @@ -87,3 +80,3 @@ if self.cnx.deleted_in_transaction(entity.eid): - self.delete_doc(es, **kwargs) + indexer.es_delete(entity) continue @@ -89,15 +82,2 @@ continue - serializer = entity.cw_adapt_to('IFullTextIndexSerializable') - json = serializer.serialize() - if not json: - # if en entity has been already indexed, we still - # keep the first indexation - # which is wrong. We should remove the existing es entry. - continue - kwargs['body'] = json - # Entities with fulltext_containers relations return their container - # IFullTextIndex serializer, therefore the "id" and "doc_type" in - # kwargs below must be container data. - kwargs['id'] = serializer.es_id - kwargs['doc_type'] = serializer.es_doc_type try: @@ -103,6 +83,5 @@ try: - # TODO option for async ? - es.index(**kwargs) + indexer.es_index(entity) except (ConnectionError, ProtocolError) as exc: log.warning('Failed to index in hook, could not connect to ES') except Exception as exc: