Commit 4af72603 authored by Denis Laxalde's avatar Denis Laxalde
Browse files

[server] Make "sources_by_uri" and "sources_by_eid" properties of repository

I.e. do not populate these dict as repo initialization (bootstrap step) but
always use information from database. This is needed because when multiple
instances of the same application run, if one instance adds a CWSource the
other ones will not see it. In particular, when using a scheduler instance,
new CWSource will be added by the web instance and not seen by the scheduler
which is supposed to update them.

We thus define properties for sources_by_eid and sources_by_uri instead
attributes on repository instance. CWSource entities are thus retrieved from
database every time these properties are accessed. We factor out
initialization of the "source" instance (subclass of
cubicweb.server.source.AbstractSource) in a _sources() method. Note that this
method takes care of calling "init" method on the source as well as
"set_schema" (previously done in repo.set_schema(), which now only touches
system_source). Accordingly the "init_sources_from_database" method is dropped
along with "add_source"/"remove_source" methods.

In syncsources hook, we thus drop:

* SourceAddedOp operation which called repo.add_source() so that the
  SourceAddedHook only cares about checking source configuration now;
* SourceRemovedOp and SourceRenamedOp operations for the same reason;
* SourceConfigUpdatedOp as updating the live config of source is
  meaningless once we rely on them being retrieved from the database;
* SourceHostConfigUpdatedHook hook which is now useless without call to
  SourceConfigUpdatedOp;

In 3.10 migration script, remove usage of sources_by_uri repo attribute which,
unless I'm missing something, appears useless (at least now).

In tests:

* unittest_datafeed: remove test_update_url method since we dropped respective
  hook;
* unittest_ldapsource: LDAPFeedUserDeletionTC.test_a_filter_inactivate()
  currently fails because it still relies on live config being updated, this
  will be fixed in the next changeset once all "live source" logic will be
  removed.

--HG--
branch : 3.25
parent 29d032bb70d8
......@@ -35,11 +35,6 @@ class SourceHook(hook.Hook):
# repo sources synchronization #################################################
class SourceAddedOp(hook.Operation):
entity = None # make pylint happy
def postcommit_event(self):
self.cnx.repo.add_source(self.entity)
class SourceAddedHook(SourceHook):
__regid__ = 'cw.sources.added'
__select__ = SourceHook.__select__ & is_instance('CWSource')
......@@ -56,13 +51,7 @@ class SourceAddedHook(SourceHook):
if self.entity.name != 'system':
sourcecls.check_conf_dict(self.entity.eid, self.entity.host_config,
fail_if_unknown=not self._cw.vreg.config.repairing)
SourceAddedOp(self._cw, entity=self.entity)
class SourceRemovedOp(hook.Operation):
uri = None # make pylint happy
def postcommit_event(self):
self.cnx.repo.remove_source(self.uri)
class SourceRemovedHook(SourceHook):
__regid__ = 'cw.sources.removed'
......@@ -72,34 +61,6 @@ class SourceRemovedHook(SourceHook):
if self.entity.name == 'system':
msg = _("You cannot remove the system source")
raise validation_error(self.entity, {None: msg})
SourceRemovedOp(self._cw, uri=self.entity.name)
class SourceConfigUpdatedOp(hook.DataOperationMixIn, hook.Operation):
def precommit_event(self):
self.__processed = []
for source in self.get_data():
if not self.cnx.deleted_in_transaction(source.eid):
conf = source.repo_source.check_config(source)
self.__processed.append( (source, conf) )
def postcommit_event(self):
for source, conf in self.__processed:
source.repo_source.update_config(source, conf)
class SourceRenamedOp(hook.LateOperation):
oldname = newname = None # make pylint happy
def postcommit_event(self):
repo = self.cnx.repo
# XXX race condition
source = repo.sources_by_uri.pop(self.oldname)
source.uri = self.newname
source.public_config['uri'] = self.newname
repo.sources_by_uri[self.newname] = source
clear_cache(repo, 'source_defs')
class SourceUpdatedHook(SourceHook):
......@@ -112,26 +73,8 @@ class SourceUpdatedHook(SourceHook):
if oldname == 'system':
msg = _("You cannot rename the system source")
raise validation_error(self.entity, {('name', 'subject'): msg})
SourceRenamedOp(self._cw, oldname=oldname, newname=newname)
if 'config' in self.entity.cw_edited or 'url' in self.entity.cw_edited:
if self.entity.name == 'system' and self.entity.config:
msg = _("Configuration of the system source goes to "
"the 'sources' file, not in the database")
raise validation_error(self.entity, {('config', 'subject'): msg})
SourceConfigUpdatedOp.get_instance(self._cw).add_data(self.entity)
class SourceHostConfigUpdatedHook(SourceHook):
__regid__ = 'cw.sources.hostconfigupdate'
__select__ = SourceHook.__select__ & is_instance('CWSourceHostConfig')
events = ('after_add_entity', 'after_update_entity', 'before_delete_entity',)
def __call__(self):
if self.entity.match(gethostname()):
if self.event == 'after_update_entity' and \
not 'config' in self.entity.cw_edited:
return
try:
SourceConfigUpdatedOp.get_instance(self._cw).add_data(self.entity.cwsource)
except IndexError:
# XXX no source linked to the host config yet
pass
from six import text_type
for uri, cfg in config.read_sources_file().items():
if uri in ('system', 'admin'):
continue
repo.sources_by_uri[uri] = repo.get_source(cfg['adapter'], uri, cfg.copy())
add_entity_type('CWSource')
add_relation_definition('CWSource', 'cw_source', 'CWSource')
add_entity_type('CWSourceHostConfig')
......@@ -21,7 +16,6 @@ commit()
for uri, cfg in config.read_sources_file().items():
if uri in ('system', 'admin'):
continue
repo.sources_by_uri.pop(uri)
config = u'\n'.join('%s=%s' % (key, value) for key, value in cfg.items()
if key != 'adapter' and value is not None)
create_entity('CWSource', name=text_type(uri), type=text_type(cfg['adapter']),
......
......@@ -211,7 +211,6 @@ class Repository(object):
def __init__(self, config, scheduler=None, vreg=None):
self.config = config
self.sources_by_eid = {}
if vreg is None:
vreg = cwvreg.CWRegistryStore(config)
self.vreg = vreg
......@@ -230,7 +229,6 @@ class Repository(object):
# sources (additional sources info in the system database)
self.system_source = self.get_source('native', 'system',
config.system_source_config.copy())
self.sources_by_uri = {'system': self.system_source}
# querier helper, need to be created after sources initialization
self.querier = querier.QuerierHelper(self, self.schema)
# cache eid -> type
......@@ -295,7 +293,6 @@ class Repository(object):
self.system_source.init_creating()
else:
self._init_system_source()
self.init_sources_from_database()
if 'CWProperty' in self.schema:
self.vreg.init_properties(self.properties())
# 4. close initialization connection set and reopen fresh ones for
......@@ -305,6 +302,41 @@ class Repository(object):
# 5. call instance level initialisation hooks
self.hm.call_hooks('server_startup', repo=self)
@property
def sources_by_uri(self):
mapping = {'system': self.system_source}
mapping.update((sourceent.name, source)
for sourceent, source in self._sources())
return mapping
@property
def sources_by_eid(self):
mapping = {self.system_source.eid: self.system_source}
mapping.update((sourceent.eid, source)
for sourceent, source in self._sources())
return mapping
def _sources(self):
if self.config.quick_start:
return
with self.internal_cnx() as cnx:
for sourceent in cnx.execute(
'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
'S name SN, S type SA, S config SC, S name != "system"').entities():
source = self.get_source(sourceent.type, sourceent.name,
sourceent.host_config, sourceent.eid)
if self.config.source_enabled(source):
# call source's init method to complete their initialisation if
# needed (for instance looking for persistent configuration using an
# internal session, which is not possible until connections sets have been
# initialized)
source.init(True, sourceent)
else:
source.init(False, sourceent)
source.set_schema(self.schema)
yield sourceent, source
self._clear_source_defs_caches()
# internals ###############################################################
def _init_system_source(self):
......@@ -317,45 +349,8 @@ class Repository(object):
' S name "system", S type SA, S config SC'
).one()
self.system_source.eid = sourceent.eid
self.sources_by_eid[sourceent.eid] = self.system_source
self.system_source.init(True, sourceent)
def init_sources_from_database(self):
if self.config.quick_start:
return
with self.internal_cnx() as cnx:
# FIXME: sources should be ordered (add_entity priority)
for sourceent in cnx.execute(
'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
'S name SN, S type SA, S config SC, S name != "system"').entities():
self.add_source(sourceent)
def add_source(self, sourceent):
try:
source = self.get_source(sourceent.type, sourceent.name,
sourceent.host_config, sourceent.eid)
except RuntimeError:
if self.config.repairing:
self.exception('cant setup source %s, skipped', sourceent.name)
return
raise
self.sources_by_eid[sourceent.eid] = source
self.sources_by_uri[sourceent.name] = source
if self.config.source_enabled(source):
# call source's init method to complete their initialisation if
# needed (for instance looking for persistent configuration using an
# internal session, which is not possible until connections sets have been
# initialized)
source.init(True, sourceent)
else:
source.init(False, sourceent)
self._clear_source_defs_caches()
def remove_source(self, uri):
source = self.sources_by_uri.pop(uri)
del self.sources_by_eid[source.eid]
self._clear_source_defs_caches()
def get_source(self, type, uri, source_config, eid=None):
# set uri and type in source config so it's available through
# source_defs()
......@@ -371,8 +366,7 @@ class Repository(object):
else:
self.vreg._set_schema(schema)
self.querier.set_schema(schema)
for source in self.sources_by_uri.values():
source.set_schema(schema)
self.system_source.set_schema(schema)
self.schema = schema
def deserialize_schema(self):
......
......@@ -116,8 +116,9 @@ class DataFeedTC(CubicWebTC):
dfsource = self.repo.sources_by_uri[u'ô myfeed']
with self.admin_access.repo_cnx() as cnx:
cnx.entity_from_eid(dfsource.eid).cw_set(url=u"http://pouet.com\nhttp://pouet.org")
self.assertEqual(dfsource.urls, [u'ignored'])
cnx.commit()
self.assertEqual(dfsource.urls, [u'ignored'])
dfsource = self.repo.sources_by_uri[u'ô myfeed']
self.assertEqual(dfsource.urls, [u"http://pouet.com", u"http://pouet.org"])
def test_parser_not_found(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment