Skip to content
Snippets Groups Projects
Commit 6d185e758b6f authored by Arthur Lutz's avatar Arthur Lutz
Browse files

Single place for creating a cnx, make sure that index is configured at that point

Also : ccplugin - generator serves all actions, not split up by type
       es.py - add CUSTOM_ATTRIBUTES mechanism
parent c169e8fd2ec4
No related branches found
No related tags found
No related merge requests found
......@@ -9,10 +9,9 @@
import os.path as osp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command
......@@ -13,10 +12,10 @@
from elasticsearch.helpers import parallel_bulk
from cubicweb.cwctl import CWCTL
from cubicweb.utils import admincnx
from cubicweb.toolsutils import Command
from cubes.elasticsearch.es import indexable_types, INDEX_SETTINGS, fulltext_indexable_rql
from cubes.elasticsearch.es import indexable_types, fulltext_indexable_rql, get_connection
__docformat__ = "restructuredtext en"
......@@ -24,21 +23,6 @@
HERE = osp.dirname(osp.abspath(__file__))
# TODO optimisation : json serialize on one side, send to ES on the other
# TODO progress bar
def bulk_actions(rset, index_name, etype, dry_run=False):
    """Yield elasticsearch bulk 'index' actions for every entity in *rset*.

    Each entity is serialized through its ``IFullTextIndexSerializable``
    adapter.  When *dry_run* is true the serialization still happens (the
    adapters are exercised) but no action is yielded.
    """
    for item in rset.entities():
        adapter = item.cw_adapt_to('IFullTextIndexSerializable')
        document = adapter.serialize()
        if dry_run:
            continue
        yield {
            '_op_type': 'index',
            '_index': index_name,
            '_type': etype,
            '_id': item.eid,
            '_source': document,
        }
class IndexInES(Command):
"""Index content in ElasticSearch.
......@@ -52,8 +36,6 @@
arguments = '<instance id>'
options = [('dry-run', {'type': 'yn', 'default': False,
'help': 'set to True if you want to skip the insertion in ES'}),
('bulk', {'type': 'yn', 'default': False,
'help': 'set to True if you want to insert in bulk in ES'}),
('debug', {'type': 'yn', 'default': False,
'help': 'set to True if you want to print'
'out debug info and progress'}),
......@@ -57,9 +39,12 @@
('debug', {'type': 'yn', 'default': False,
'help': 'set to True if you want to print'
'out debug info and progress'}),
('etype', {'type': 'string', 'default': '',
'help': 'only index a given etype'
'[default:all indexable types]'}),
]
def run(self, args):
"""run the command with its specific arguments"""
appid = args.pop(0)
with admincnx(appid) as cnx:
......@@ -60,8 +45,7 @@
]
def run(self, args):
"""run the command with its specific arguments"""
appid = args.pop(0)
with admincnx(appid) as cnx:
config = cnx.vreg.config
schema = cnx.vreg.schema
......@@ -67,14 +51,11 @@
schema = cnx.vreg.schema
locations = config['elasticsearch-locations']
index_name = config['index-name']
if locations and index_name:
es = Elasticsearch(locations)
if not self.config.dry_run:
# ignore 400 caused by IndexAlreadyExistsException when creating an
# index
es.indices.create(
index=index_name, body=INDEX_SETTINGS, ignore=400)
if self.config.debug:
# indexer = cnx.vreg['es'].select('indexer')
es = get_connection(cnx.vreg.config)
if es:
if self.config.etype:
etypes = (self.config.etype,)
else:
etypes = indexable_types(schema)
if self.config.debug and not self.config.etype:
print(u'found indexable_types {}'.format(
','.join(indexable_types(schema))))
......@@ -79,16 +60,9 @@
print(u'found indexable_types {}'.format(
','.join(indexable_types(schema))))
for etype in indexable_types(schema):
rset = cnx.execute(fulltext_indexable_rql(etype, schema))
if len(rset) == 0:
continue
if self.config.debug:
print(u'indexing {} {}'.format(etype, len(rset)))
if self.config.bulk:
for x in parallel_bulk(es, bulk_actions(rset,
index_name,
etype,
dry_run=self.config.dry_run),
raise_on_error=False,
raise_on_exception=False):
for _ in parallel_bulk(es,
self.bulk_actions(etypes,
cnx,
dry_run=self.config.dry_run),
raise_on_error=False,
raise_on_exception=False):
pass
......@@ -94,16 +68,5 @@
pass
else:
for entity in rset.entities():
serializer = entity.cw_adapt_to(
'IFullTextIndexSerializable')
json = serializer.serialize()
if not self.config.bulk:
if not self.config.dry_run:
es.index(index=index_name,
id=entity.eid,
doc_type=etype,
body=json)
else:
if self.config.debug:
print(u'no elasticsearch configuration found, skipping')
......@@ -106,6 +69,28 @@
else:
if self.config.debug:
print(u'no elasticsearch configuration found, skipping')
def bulk_actions(self, etypes, cnx, dry_run=False):
    """Yield elasticsearch bulk 'index' actions for all entities of *etypes*.

    :param etypes: iterable of entity type names to index
    :param cnx: cubicweb connection used to run the indexing RQL queries
    :param dry_run: when True, entities are still fetched and serialized
        (the adapters are exercised) but no action is yielded
    """
    # hoist the index lookup out of the loops: it is invariant and was
    # previously re-read from the config for every single entity
    index_name = cnx.vreg.config['index-name']
    for etype in etypes:
        rql = fulltext_indexable_rql(etype, cnx.vreg.schema)
        rset = cnx.execute(rql)
        if self.config.debug:
            print(u'indexing {} {}'.format(etype, len(rset)))
            print(u'RQL : {}'.format(rql))
        for entity in rset.entities():
            serializer = entity.cw_adapt_to('IFullTextIndexSerializable')
            json = serializer.serialize()
            if not dry_run:
                data = {'_op_type': 'index',
                        '_index': index_name,
                        '_type': etype,
                        '_id': entity.eid,
                        '_source': json
                        }
                # extension hook: subclasses may enrich the action payload
                self.customize_data(data)
                yield data
def customize_data(self, data):
    """Extension hook: mutate the bulk action *data* dict in place.

    The default implementation does nothing; subclasses may override it
    to add or tweak fields before the action is sent to elasticsearch.
    """
CWCTL.register(IndexInES)
......@@ -54,6 +54,8 @@
except KeyError:
continue
data[attr] = value
data.update(entity.cw_attr_cache)
# TODO take a look at what's in entity.cw_relation_cache
return data
......
......@@ -15,6 +15,12 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from elasticsearch.exceptions import ConnectionError
from urllib3.exceptions import ProtocolError
from elasticsearch_dsl.connections import connections
from rql.utils import rqlvar_maker
INDEXABLE_TYPES = None
......@@ -35,6 +41,13 @@
},
}
# customization mechanism, in your cube, add your type as a key, and a list of
# additionnal attributes
# eg. CUSTOM_ATTRIBUTES['BlogEntry'] = ('description',)
CUSTOM_ATTRIBUTES = {}
log = logging.getLogger(__name__)
def indexable_types(schema):
'''
......@@ -69,6 +82,7 @@
rql = ['WHERE %s is %s' % (V, etype)]
if eid:
rql.append('%s eid %i' % (V, eid))
var = next(varmaker)
selected = []
for rschema in schema.eschema(etype).indexable_attributes():
attr = rschema.type
......@@ -81,6 +95,6 @@
var = next(varmaker)
rql.append('%s %s %s' % (V, attr, var))
selected.append((attr, var))
for attr in ('creation_date', 'modification_date'):
for attr in ('creation_date', 'modification_date',) + CUSTOM_ATTRIBUTES.get(etype, ()):
var = next(varmaker)
rql.append('%s %s %s' % (V, attr, var))
......@@ -85,4 +99,4 @@
var = next(varmaker)
rql.append('%s %s %s' % (V, attr, var))
selected.append((attr, var))
selected.append(var)
# TODO inlined relations ?
......@@ -88,3 +102,3 @@
# TODO inlined relations ?
return 'Any %s,%s %s' % (V, ','.join(var for attr, var in selected),
return 'Any %s,%s %s' % (V, ','.join(selected),
','.join(rql))
......@@ -90,1 +104,29 @@
','.join(rql))
def get_connection(config):
    """Return a persistent elasticsearch connection for *config*.

    Reuses the process-wide default connection registered in
    elasticsearch_dsl when one exists; otherwise registers it from the
    instance configuration and makes sure the initial index exists with
    our custom settings.  Returns None when the configuration is
    incomplete.
    """
    try:
        # an already-registered default connection wins
        return connections.get_connection()
    except KeyError:
        pass
    locations = config['elasticsearch-locations']
    index_name = config['index-name']
    if not (locations and index_name):
        # TODO else ? raise KeyError - return None is OK?
        return None
    # TODO sanitize locations
    conn = connections.create_connection(hosts=locations.split(','),
                                         index=index_name,
                                         timeout=20)
    try:
        if not conn.indices.exists(index=index_name):
            # TODO could remove ignore=400 since checks does not exist
            # ignore 400 caused by IndexAlreadyExistsException when creating an
            conn.indices.create(index=index_name,
                                body=INDEX_SETTINGS,
                                ignore=400)
    except (ConnectionError, ProtocolError):
        log.debug('Failed to index in hook, could not connect to ES')
    return conn
......@@ -19,9 +19,8 @@
import logging
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError
from urllib3.exceptions import ProtocolError
from cubicweb.server import hook
from cubicweb.predicates import score_entity
......@@ -23,9 +22,9 @@
from elasticsearch.exceptions import ConnectionError
from urllib3.exceptions import ProtocolError
from cubicweb.server import hook
from cubicweb.predicates import score_entity
from cubes.elasticsearch.es import indexable_types, INDEX_SETTINGS, fulltext_indexable_rql
from cubes.elasticsearch.es import indexable_types, fulltext_indexable_rql, get_connection
log = logging.getLogger(__name__)
......@@ -46,21 +45,12 @@
def __call__(self):
if self.entity.cw_etype == 'File':
return # FIXME hack!
locations = self._cw.vreg.config['elasticsearch-locations']
index_name = self._cw.vreg.config['index-name']
if locations and index_name:
es = Elasticsearch(locations and locations.split(','))
# TODO : have a cache so that we don't check index on every transition
try:
if not es.indices.exists(index=index_name):
es.indices.create(index=index_name,
body=INDEX_SETTINGS,
ignore=400)
except (ConnectionError, ProtocolError):
log.debug('Failed to index in hook, could not connect to ES')
indexable_entity = self._cw.execute(fulltext_indexable_rql(self.entity.cw_etype,
self.entity._cw.vreg.schema,
eid=self.entity.eid)).one()
es = get_connection(self._cw.vreg.config)
if es:
rql = fulltext_indexable_rql(self.entity.cw_etype,
self.entity._cw.vreg.schema,
eid=self.entity.eid)
indexable_entity = self._cw.execute(rql).one()
serializer = indexable_entity.cw_adapt_to('IFullTextIndexSerializable')
json = serializer.serialize()
try:
......@@ -64,8 +54,8 @@
serializer = indexable_entity.cw_adapt_to('IFullTextIndexSerializable')
json = serializer.serialize()
try:
# TODO option pour coté async ?
es.index(index=index_name,
# TODO option pour coté async ? thread
es.index(index=self._cw.vreg.config['index-name'],
id=self.entity.eid,
doc_type=self.entity.cw_etype,
body=json)
......
......@@ -15,6 +15,8 @@
from cubes.elasticsearch import ccplugin
from cubes.elasticsearch.es import indexable_types, INDEX_SETTINGS
# TODO - find a way to configure ElasticSearch as non threaded while running tests
# so that the traces show the full stack, not just starting from connection.http_*
class ExportElasticSearchTC(testlib.AutoPopulateTest):
# ignore ComputedRelations
......@@ -26,7 +28,7 @@
self.orig_config_for = CubicWebConfiguration.config_for
config_for = lambda appid: self.config # noqa
CubicWebConfiguration.config_for = staticmethod(config_for)
self.config['elasticsearch-locations'] = 'http://10.1.1.1:9200'
self.config['elasticsearch-locations'] = 'http://nonexistant.elastic.search:9200'
self.config['index-name'] = 'unittest_index_name'
def to_test_etypes(self):
......@@ -45,5 +47,6 @@
0)
@patch('elasticsearch.client.Elasticsearch.index', unsafe=True)
@patch('elasticsearch.client.Elasticsearch.bulk', unsafe=True)
@patch('elasticsearch.client.indices.IndicesClient.exists', unsafe=True)
@patch('elasticsearch.client.indices.IndicesClient.create', unsafe=True)
......@@ -48,8 +51,12 @@
@patch('elasticsearch.client.indices.IndicesClient.exists', unsafe=True)
@patch('elasticsearch.client.indices.IndicesClient.create', unsafe=True)
def test_ccplugin(self, create, exists, index):
self.auto_populate(10)
index.reset_mock()
def test_ccplugin(self, create, exists, bulk, index):
# TODO disable hook!!! then remove index mock
with self.admin_access.repo_cnx() as cnx:
cnx.disable_hook_categories('es')
with cnx.allow_all_hooks_but('es'):
self.auto_populate(10)
bulk.reset_mock()
cmd = [self.appid, '--dry-run', 'yes']
sys.stdout = out = StringIO()
try:
......@@ -58,7 +65,7 @@
sys.stdout = sys.__stdout__
self.assertEquals('', out.getvalue())
create.assert_not_called()
index.assert_not_called()
bulk.assert_not_called()
cmd = [self.appid, '--dry-run', 'yes', '--debug', 'yes']
sys.stdout = out = StringIO()
......@@ -68,7 +75,7 @@
sys.stdout = sys.__stdout__
self.assert_('found ' in out.getvalue())
create.assert_not_called()
index.assert_not_called()
bulk.assert_not_called()
# TODO try wrong option
# cmd = [self.appid, '--wrong-option', 'yes']
......@@ -82,9 +89,11 @@
with self.admin_access.repo_cnx() as cnx:
self.assert_(cnx.execute('Any X WHERE X is %(etype)s' %
{'etype': indexable_types(cnx.repo)[0]}))
create.assert_called_with(
ignore=400, index='unittest_index_name', body=INDEX_SETTINGS)
index.assert_called()
# TODO - put this somewhere where it tests on the first get_connection
#create.assert_called_with(ignore=400,
# index='unittest_index_name',
# body=INDEX_SETTINGS)
bulk.assert_called()
# TODO ? check called data
@patch('elasticsearch.client.indices.IndicesClient.create', unsafe=True)
......
......@@ -17,7 +17,6 @@
"""elasticsearch search views"""
from elasticsearch.exceptions import NotFoundError
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl import FacetedSearch, TermsFacet, DateHistogramFacet
from bs4 import BeautifulSoup
......@@ -26,7 +25,7 @@
import cwtags.tag as t
from cubicweb.view import StartupView
from cubes.elasticsearch.es import indexable_types
from cubes.elasticsearch.es import indexable_types, get_connection
from cubes.elasticsearch.search_helpers import compose_search
......@@ -30,18 +29,6 @@
from cubes.elasticsearch.search_helpers import compose_search
def get_connection(config):
    """Return the shared elasticsearch connection, creating it on first use.

    :param config: cubicweb instance configuration providing the
        ``elasticsearch-locations`` and ``index-name`` options
    :returns: the elasticsearch connection, or None when the instance has
        no elasticsearch configuration

    Fixes two defects of the previous version: the connection obtained or
    created was never returned to the caller, and an unset
    ``elasticsearch-locations`` option crashed with AttributeError on
    ``locations.split(',')``.
    """
    try:
        # reuse the process-wide default connection when already registered
        return connections.get_connection()
    except KeyError:
        locations = config['elasticsearch-locations']
        index_name = config['index-name']
        if not (locations and index_name):
            return None
        # TODO sanitize locations
        return connections.create_connection(hosts=locations.split(','),
                                             index=index_name,
                                             timeout=20)
class CWFacetedSearch(FacetedSearch):
# fields that should be searched
fields = ["title^3", "description^2", '_all']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment