repository.py 45.1 KB
Newer Older
1
# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
Adrien Di Mascio's avatar
Adrien Di Mascio committed
18
19
20
21
22
23
24
"""Defines the central class for the CubicWeb RQL server: the repository.

The repository is an abstraction allowing execution of rql queries against
data sources. Most of the work is actually done in helper classes. The
repository mainly:

* brings these classes all together to provide a single access
25
  point to a cubicweb instance.
Adrien Di Mascio's avatar
Adrien Di Mascio committed
26
27
* handles session management
"""
Samuel Trégouët's avatar
Samuel Trégouët committed
28

29
from itertools import chain
30
from contextlib import contextmanager
31
from logging import getLogger
Denis Laxalde's avatar
Denis Laxalde committed
32
import queue
33
import threading
34
import time
35

36
from logilab.common.decorators import cached, clear_cache
Adrien Di Mascio's avatar
Adrien Di Mascio committed
37
38

from yams import BadSchemaDefinition
39
from rql.utils import rqlvar_maker
Adrien Di Mascio's avatar
Adrien Di Mascio committed
40

Sylvain Thénault's avatar
Sylvain Thénault committed
41
from cubicweb import (CW_MIGRATION_MAP,
42
                      UnknownEid, AuthenticationError, ExecutionError,
43
                      UniqueTogetherError, ViolatedConstraint)
44
from cubicweb import set_log_methods
Sylvain Thénault's avatar
Sylvain Thénault committed
45
from cubicweb import cwvreg, schema, server
46
from cubicweb.server import utils, hook, querier, sources
47
from cubicweb.server.session import InternalManager, Connection
Adrien Di Mascio's avatar
Adrien Di Mascio committed
48

49
50
51
52
53
54
55

# (relation type, role) pairs whose relation cache prefill_entity_caches must
# not initialise with an empty result set
NO_CACHE_RELATIONS = {
    ('owned_by', 'object'),
    ('created_by', 'object'),
    ('cw_source', 'object'),
}

56

57
def prefill_entity_caches(entity):
    """Prefill the attribute and relation caches of `entity`.

    For every subject relation of the entity type, final ones (attributes)
    get a ``None`` attribute cache entry unless one is already set, and
    non-final ones get an empty result set as relation cache. Object
    relations also get an empty result set. Virtual relation types and the
    (rtype, role) pairs of NO_CACHE_RELATIONS are left alone.

    NOTE(review): empty caches only make sense for an entity without any
    relation yet, presumably a freshly created one -- confirm with callers.
    """
    cnx = entity._cw
    for rschema in entity.e_schema.subject_relations():
        rtype = str(rschema)
        skipped = (rtype in schema.VIRTUAL_RTYPES
                   or (rtype, 'subject') in NO_CACHE_RELATIONS)
        if skipped:
            continue
        if not rschema.final:
            entity.cw_set_relation_cache(rtype, 'subject', cnx.empty_rset())
        else:
            # keep any value already cached for this attribute
            entity.cw_attr_cache.setdefault(rtype, None)
    for rschema in entity.e_schema.object_relations():
        rtype = str(rschema)
        if not (rtype in schema.VIRTUAL_RTYPES
                or (rtype, 'object') in NO_CACHE_RELATIONS):
            entity.cw_set_relation_cache(rtype, 'object', cnx.empty_rset())
74

75

76
def del_existing_rel_if_needed(cnx, eidfrom, rtype, eidto):
    """Delete existing `rtype` relations conflicting with the new
    ``eidfrom rtype eidto`` one when the cardinality is ``1`` or ``?``.

    This has to be done once the new relation has been inserted, to avoid
    having an entity without a relation for some time. It is done in the
    repository itself so that it doesn't depend on hooks ordering.

    Nothing is done when the 'activeintegrity' hook category is deactivated.
    """
    # integrity explicitly disabled: leave existing relations alone
    if not cnx.is_hook_category_activated('activeintegrity'):
        return
    cardinality = cnx.rtype_eids_rdef(rtype, eidfrom, eidto).cardinality
    # One may be tempted to check for new eids, but this may leave more than
    # one relation even with a '1?' cardinality if those relations are added
    # in the same transaction where the entity is being created. This never
    # occurs from the web interface but may occur during tests or from a
    # dbapi connection (though not expected for this). So don't do it: we
    # intend to ensure repository consistency.
    #
    # Notes:
    # * inlined relations will be implicitly deleted for the subject entity;
    # * read permissions are disabled below, delete permission is still
    #   checked.
    #
    # (index in cardinality, query deleting the relations now in excess)
    side_queries = (
        # eidfrom may have a single `rtype` object: drop the other ones
        (0, 'DELETE X %s Y WHERE X eid %%(x)s, NOT Y eid %%(y)s'),
        # eidto may have a single `rtype` subject: drop the other ones
        (1, 'DELETE X %s Y WHERE Y eid %%(y)s, NOT X eid %%(x)s'),
    )
    for index, rql in side_queries:
        if cardinality[index] in '1?':
            with cnx.security_enabled(read=False):
                cnx.execute(rql % rtype, {'x': eidfrom, 'y': eidto})
111

Sylvain Thénault's avatar
cleanup    
Sylvain Thénault committed
112

113
def preprocess_inlined_relations(cnx, entity):
    """Extract the inlined relations edited on an entity being added, so
    that their hooks can be properly called.

    For each non-final (inlined) relation found in ``entity.cw_edited``, the
    relation cache is updated. When the object side cardinality is ``1`` or
    ``?`` and the 'activeintegrity' hook category is active, existing
    relations of that type pointing to the same object are deleted, with
    read security disabled.

    Return the list of ``(rtype, object eid)`` tuples, in ``cw_edited``
    iteration order.
    """
    integrity = cnx.is_hook_category_activated('activeintegrity')
    eschema = entity.e_schema
    inlined = []
    for rtype in entity.cw_edited:
        if eschema.subjrels[rtype].final:
            # plain attribute, not an inlined relation
            continue
        eidto = entity.cw_edited[rtype]
        inlined.append((rtype, eidto))
        cnx.update_rel_cache_add(entity.eid, rtype, eidto)
        rdef = cnx.rtype_eids_rdef(rtype, entity.eid, eidto)
        if rdef.cardinality[1] in '1?' and integrity:
            with cnx.security_enabled(read=False):
                cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s' % rtype,
                            {'x': entity.eid, 'y': eidto})
    return inlined

133

134
class NullEventBus(object):
    """Event bus doing nothing at all.

    It is the repository's default ``app_instances_bus``. It offers the bus
    interface (publish / add_subscription / start / stop), but every method
    is a no-op returning None.
    """

    def publish(self, msg):
        """Discard `msg`."""

    def add_subscription(self, topic, callback):
        """Ignore the subscription: `callback` is never called."""

    def start(self):
        """Nothing to start."""

    def stop(self):
        """Nothing to stop."""

147

148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Python's standard library doesn't provide any concurrent deque
# implementation with a blocking get, hence this LifoQueue extension.
class LifoDeque(queue.LifoQueue):
    """LIFO queue that can also hand out its *oldest* item.

    ``get()`` / ``put()`` behave exactly like :class:`queue.LifoQueue`
    (newest item first), while :meth:`get_oldest` takes from the other end.
    The connection pool uses it to close the least recently released
    connection.
    """

    def get_oldest(self, block=True, timeout=None):
        """Remove and return the oldest item of the queue.

        Same contract as :meth:`queue.Queue.get`:

        * ``block=False``: return an item if one is immediately available,
          else raise :class:`queue.Empty` (`timeout` is ignored);
        * ``block=True, timeout=None``: wait until an item is available;
        * ``block=True`` with a non-negative `timeout`: wait at most
          `timeout` seconds, then raise :class:`queue.Empty`.

        Raise ValueError if `timeout` is negative.
        """
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise queue.Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # `time` is the module in this file, not a function: call the
                # clock explicitly. Use a monotonic clock, as queue.Queue.get
                # does, so that wall-clock adjustments don't alter the wait.
                endtime = time.monotonic() + timeout
                while not self._qsize():
                    remaining = endtime - time.monotonic()
                    if remaining <= 0.0:
                        raise queue.Empty
                    self.not_empty.wait(remaining)
            # LifoQueue keeps items in a list and pops from its end, so the
            # oldest one is at index 0. This is the only difference from
            # queue.Queue.get.
            item = self.queue.pop(0)
            self.not_full.notify()
        return item


173
174
175
class _BaseCnxSet:
    """
    Connection "set" without any pooling capability.

    Every call to ``get`` opens a brand new connection from the source, and
    every ``release`` closes it. You should only use this if pooling is
    handled outside of CubicWeb, e.g. by an external pooler like pgbouncer.
    """

    def __init__(self, source):
        # source whose wrapped_connection() opens the connections
        self._source = source

    def qsize(self):
        """No queue is kept, hence no meaningful size: always None."""
        return None

    def get(self):
        """Return a freshly opened connection."""
        return self._new_cnxset()

    def _new_cnxset(self):
        """Open a new connection through the underlying source."""
        return self._source.wrapped_connection()

    def release(self, cnxset):
        """Close `cnxset` (passing True to its close method) instead of
        keeping it around."""
        cnxset.close(True)

    def __iter__(self):
        """No connection is tracked, so iteration yields nothing."""
        yield from ()

    def close(self):
        """Nothing to close: connections are closed when released."""
201

202
203
204

class _CnxSetPool(_BaseCnxSet):
    """
    Dynamic database connections pool.

    Available connections are kept in a LifoDeque: ``get`` reuses the most
    recently released connection, while idle connections are closed starting
    from the oldest one.
    """

    def __init__(self, source, min_size=1, max_size=4, idle_timeout=300):
        """
        Connection pool for database connections. This pool is a dynamic pooler
        that opens new connections when the load is too high and closes
        connections on lower load after some idle timeout.

        Arguments::

        * source: database source
        * min_size: minimum number of simultaneous connections
        * max_size: maximum number of simultaneous connections, if set to 0
          there won't be any limit
        * idle_timeout: time (in seconds) before the pool starts closing
          connections
        """
        super().__init__(source)
        # every connection opened by this pool, whether queued or checked out
        self._cnxsets = []
        # connections available for reuse
        self._queue = LifoDeque()
        # protects self._cnxsets; threading.Lock is NOT reentrant, so never
        # call size() or _new_cnxset() while holding it
        self.lock = threading.Lock()
        self.min_size = min_size
        self.max_size = max_size
        # last time the queue was found empty, see get()
        self.idle = time.time()
        self.idle_timeout = idle_timeout

        for _ in range(min_size):
            self._queue.put_nowait(self._new_cnxset())

    def _new_cnxset(self):
        """
        Create a new connection and return it. This operation grabs the lock.
        """
        cnxset = super()._new_cnxset()
        with self.lock:
            self._cnxsets.append(cnxset)
        return cnxset

    def _close_idle_cnxset(self):
        """
        Close the oldest available connection. This happens only when the
        queue hasn't been found empty for more than idle_timeout seconds and
        more than min_size connections are open.
        """
        if abs(time.time() - self.idle) > self.idle_timeout and self.size() > self.min_size:
            try:
                cnxset = self._queue.get_oldest(block=False)
            except queue.Empty:
                # the queue has been used since we checked its size
                pass
            else:
                self.debug("[pool] load is low, close a connection")
                cnxset.close(True)
                with self.lock:
                    self._cnxsets.remove(cnxset)

    def size(self):
        """
        Return the total number of connections.
        """
        with self.lock:
            return len(self._cnxsets)

    def qsize(self):
        """
        Return the size of the queue.
        """
        return self._queue.qsize()

    def get(self):
        """
        Try to get a connection from the queue if available or open a new one
        if the maximum number isn't reached.

        If no connection is available and the maximum number of connections is
        reached, try to get a new one with a timeout. If the timeout is reached
        an exception is raised. The maximum number of connections can be
        modified to avoid that or the timeout can be increased.

        If max_size is set to 0 then there won't be any limit to open new connections.
        """
        self.debug(f"[pool] try to get a connection, free queue size is "
                   f"{self.qsize()}/{self.size()} (max {self.max_size})")
        try:
            cnxset = self._queue.get_nowait()
            self._close_idle_cnxset()
            self.debug("[pool] we got a directly available connection")
            return cnxset
        except queue.Empty:
            # reset idle time
            self.idle = time.time()

            if self.max_size and self.size() >= self.max_size:
                try:
                    connection = self._queue.get(True, timeout=5)
                    self.debug("[pool] we got a connection before minimum timeout")
                    return connection
                except queue.Empty:
                    raise Exception('no connections set available after 5 secs, probably either a '
                                    'bug in code (too many uncommited/rolled back '
                                    'connections) or too much load on the server (in '
                                    'which case you can try to set a bigger connections pool size '
                                    'by changing the connections-pool-max-size option in your '
                                    'configuration file)')
            else:
                self.debug("[pool] we had to open a new connection, connection number: %s/%s" %
                           (self.size(), self.max_size))
                return self._new_cnxset()

    def release(self, cnxset):
        """
        Release a connection by returning it into the queue.
        """
        self._queue.put_nowait(cnxset)
        self.debug(f"[pool] release {cnxset} [{id(cnxset)}], free queue size is "
                   f"{self.qsize()}/{self.size()} (max {self.max_size})")
        self._close_idle_cnxset()

    def __iter__(self):
        """
        Iterate on all connections opened by this pool.

        The connection list is copied under the lock, and the lock is released
        before yielding. Holding the non-reentrant lock across ``yield`` would
        deadlock a caller calling ``size()`` from inside the loop, and block
        every other thread opening or closing connections meanwhile.
        """
        with self.lock:
            cnxsets = list(self._cnxsets)
        yield from cnxsets

    def close(self):
        """
        Close the pool by closing all the connections still in the queue.

        Closed connections are also forgotten, as in ``_close_idle_cnxset``,
        so that ``size()`` and iteration no longer report them. Connections
        currently checked out are left untouched.
        """
        while True:
            try:
                cnxset = self._queue.get_nowait()
            except queue.Empty:
                break
            try:
                cnxset.close(True)
            except Exception as e:
                self.exception('error while closing %s, error: %s' % (cnxset, e))
            # forget it even if closing failed: it won't be handed out again
            with self.lock:
                self._cnxsets.remove(cnxset)

    # these are overridden by set_log_methods below
    # only defining here to prevent pylint from complaining
    info = warning = error = critical = exception = debug = lambda msg, *a, **kw: None

346

347
348
def get_cnxset(config, source, bootstrap=False):
    """Build the connections provider for `source` according to `config`.

    When 'connections-pooler-enabled' is false, a non-pooling _BaseCnxSet is
    returned. Otherwise a _CnxSetPool is returned, using the
    'connections-pool-idle-timeout' option. Its size is limited to 0..1
    connection during bootstrap or quick start, and taken from the
    'connections-pool-min-size' / 'connections-pool-max-size' options the
    rest of the time.
    """
    if not config['connections-pooler-enabled']:
        return _BaseCnxSet(source)
    idle_timeout = config['connections-pool-idle-timeout']
    min_size, max_size = (
        (0, 1) if bootstrap or config.quick_start
        else (config['connections-pool-min-size'],
              config['connections-pool-max-size'])
    )
    return _CnxSetPool(source, min_size=min_size, max_size=max_size,
                       idle_timeout=idle_timeout)
360

361

Adrien Di Mascio's avatar
Adrien Di Mascio committed
362
363
364
365
class Repository(object):
    """a repository provides access to a set of persistent storages for
    entities and relations
    """
366

367
    def __init__(self, config, scheduler=None, vreg=None):
        self.config = config
        # build a registry store from the configuration unless one is given
        self.vreg = cwvreg.CWRegistryStore(config) if vreg is None else vreg
        # optional scheduler running looping tasks, see run_scheduler()
        self._scheduler = scheduler
        # no-op inter-instances bus by default
        self.app_instances_bus = NullEventBus()
        # list handed to utils.RepoThread by threaded_task(); shutdown()
        # joins the threads it contains
        self._running_threads = []
        # initial schema, should be built or replaced later
        self.schema = schema.CubicWebSchema(config.appid)
        self.vreg.schema = self.schema  # until actual schema is loaded...
        # shutdown flag: None until bootstrap() sets it to False
        self.shutting_down = None
        # system source (additional sources info lives in the system database)
        self.system_source = self.get_source('native', 'system',
                                             config.system_source_config.copy())
        # querier helper, needs to be created after sources initialization
        self.querier = querier.QuerierHelper(self, self.schema)
        # eid -> entity type cache
        self._type_cache = {}
        # the hooks manager
        self.hm = hook.HooksManager(self.vreg)
393

394
    def bootstrap(self):
        """Start the repository. Steps, in order:

        0. open a connection set used to fetch bootstrap information from the
           database;
        1. set the used cubes;
        2. load the schema, from the file system (creation, quick start, or
           when the instance schema must not be read) or from the database;
        3. initialize data sources (and CWProperty based vreg properties);
        4. replace the bootstrap connection set by fresh ones;
        5. call the 'server_startup' hooks.
        """
        cfg = self.config
        self.info('starting repository from %s', cfg.apphome)
        self.shutting_down = False
        # 0. connection set used to fetch bootstrap information from the db
        self.cnxsets = get_cnxset(cfg, self.system_source, bootstrap=True)
        # 1. set used cubes
        if not cfg.creating and cfg.read_instance_schema:
            self.set_schema(cfg.load_bootstrap_schema(), resetvreg=False)
            cfg.init_cubes(self.get_cubes())
        else:
            cfg.bootstrap_cubes()
        # 2. load schema
        if cfg.quick_start:
            # quick start only provides a minimal repository to get cubes
            # information (eg dump/restore/...): restrict appobject_path so
            # that only hooks and entity classes are loaded in the registry
            cfg.cube_appobject_path = {'hooks', 'entities'}
            cfg.cubicweb_appobject_path = {'hooks', 'entities'}
        if cfg.quick_start or cfg.creating or not cfg.read_instance_schema:
            # load schema from the file system
            if not cfg.creating:
                self.info("set fs instance's schema")
            self.set_schema(cfg.load_schema(expand_cubes=True))
            if not cfg.creating:
                # set eids on entities schema
                with self.internal_cnx() as cnx:
                    rset = cnx.execute('Any XN,X WHERE X is CWEType, X name XN')
                    for etype, eid in rset:
                        try:
                            self.schema.eschema(etype).eid = eid
                        except KeyError:
                            # etype in the database doesn't exist in the fs
                            # schema: this may occur during dev and we
                            # shouldn't crash
                            self.warning('No %s entity type in the file system schema', etype)
        else:
            # normal start: load the instance schema from the database
            self.info('loading schema from the repository')
            self.set_schema(self.deserialize_schema())
        # 3. initialize data sources
        if not cfg.creating:
            self._init_system_source()
            if 'CWProperty' in self.schema:
                self.vreg.init_properties(self.properties())
        else:
            # call init_creating so that e.g. the native source can configure
            # tsearch according to the postgres version
            self.system_source.init_creating()
        # 4. close the initialization connection set and reopen fresh ones
        #    for proper initialization
        self.cnxsets.close()
        self.cnxsets = get_cnxset(cfg, self.system_source)
        # 5. call instance level initialisation hooks
        self.hm.call_hooks('server_startup', repo=self)
450

451
452
453
454
455
456
457
458
459
460
461
462
463
464
    def source_by_uri(self, uri):
        """Return the source built from the CWSource entity named `uri`.

        Raise ValueError when there is no such CWSource.
        """
        with self.internal_cnx() as cnx:
            found = cnx.find('CWSource', name=uri)
            if found:
                return self._source_from_cwsource(found.one())
            raise ValueError('no source with uri %s found' % uri)

    def source_by_eid(self, eid):
        """Return the source built from the CWSource entity with eid `eid`.

        Raise ValueError when there is no such CWSource.
        """
        with self.internal_cnx() as cnx:
            found = cnx.find('CWSource', eid=eid)
            if found:
                return self._source_from_cwsource(found.one())
            raise ValueError('no source with eid %d found' % eid)

465
466
467
468
469
470
471
472
473
474
475
476
477
478
    @property
    def sources_by_uri(self):
        """Mapping of source name -> source.

        It always contains the 'system' source. Other sources (see _sources)
        are only listed outside of quick start mode.
        """
        mapping = dict(system=self.system_source)
        for sourceent, source in self._sources():
            mapping[sourceent.name] = source
        return mapping

    def _sources(self):
        """Yield ``(CWSource entity, source)`` for every non-system source.

        Nothing is yielded in quick start mode. Once every source has been
        yielded, the source definitions caches are cleared.
        """
        if self.config.quick_start:
            return
        rql = ('Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
               'S name SN, S type SA, S config SC, S name != "system"')
        with self.internal_cnx() as cnx:
            for sourceent in cnx.execute(rql).entities():
                yield sourceent, self._source_from_cwsource(sourceent)
        self._clear_source_defs_caches()

483
484
485
486
487
488
489
490
    def _source_from_cwsource(self, sourceent):
        """Instantiate the source described by the CWSource entity
        `sourceent`, completing its initialisation when it is enabled."""
        source = self.get_source(sourceent.type, sourceent.name,
                                 sourceent.host_config, sourceent.eid)
        if not self.config.source_enabled(source):
            return source
        # call the source's init method to complete its initialisation if
        # needed (for instance looking for persistent configuration using an
        # internal session, which is not possible until connections sets have
        # been initialized)
        source.init(sourceent)
        return source

Adrien Di Mascio's avatar
Adrien Di Mascio committed
494
495
    # internals ###############################################################

496
    def _init_system_source(self):
        """Initialise the system source.

        In quick start mode its init_creating() method is called. Otherwise
        the 'system' CWSource entity is fetched, and the source gets its eid
        and is initialised from it.
        """
        if not self.config.quick_start:
            with self.internal_cnx() as cnx:
                rset = cnx.execute('Any S, SA, SC WHERE S is_instance_of CWSource,'
                                   ' S name "system", S type SA, S config SC')
                sourceent = rset.one()
                self.system_source.eid = sourceent.eid
                self.system_source.init(sourceent)
        else:
            self.system_source.init_creating()
507

508
    def get_source(self, type, uri, source_config, eid=None):
        """Instantiate a source of the given `type` through
        ``sources.get_source``.

        `source_config` is updated in place with 'uri' and 'type' keys, so
        that they're available through source_defs().
        """
        source_config['uri'], source_config['type'] = uri, type
        return sources.get_source(type, source_config, self, eid)
514

515
    def set_schema(self, schema, resetvreg=True):
        """Install `schema` on the registry, the querier and the system
        source, then keep it as the repository schema.

        With `resetvreg` (the default), the registry's set_schema() is used,
        triggering a full reload of all appobjects. Otherwise its
        _set_schema() is used.
        """
        self.info('set schema %s %#x', schema.name, id(schema))
        install_on_vreg = self.vreg.set_schema if resetvreg else self.vreg._set_schema
        install_on_vreg(schema)
        self.querier.set_schema(schema)
        self.system_source.set_schema(schema)
        self.schema = schema
Adrien Di Mascio's avatar
Adrien Di Mascio committed
525

526
527
    def deserialize_schema(self):
        """Load the schema from the database and return it.

        BadSchemaDefinition is propagated as is. Any other error has its
        traceback printed, then is re-raised as a generic Exception hinting
        that the database may not be initialised, with the original error
        set as its explicit cause.
        """
        from cubicweb.server.schemaserial import deserialize_schema
        appschema = schema.CubicWebSchema(self.config.appid)
        self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema))
        with self.internal_cnx() as cnx:
            try:
                deserialize_schema(appschema, cnx)
            except BadSchemaDefinition:
                raise
            except Exception as ex:
                import traceback
                traceback.print_exc()
                # chain explicitly so the original error is reported as the
                # direct cause rather than as an unrelated failure
                raise Exception('Is the database initialised ? (cause: %s)' % ex) from ex
        return appschema
541

542
543
544
545
546
547
    def has_scheduler(self):
        """Tell whether a scheduler is attached to this repository, i.e.
        whether looping tasks can be registered on it."""
        scheduler = self._scheduler
        return scheduler is not None

548
549
    def run_scheduler(self):
        """Start the repository scheduler *and block* while it runs.

        The names of the scheduled tasks are logged first. The repository
        must have a scheduler (asserted).

        XXX Other startup related stuffs are done elsewhere. In Repository
        XXX __init__ or in external codes (various server managers).
        """
        assert self.has_scheduler(), \
            "This Repository is not intended to be used as a server"
        task_names = ', '.join(event.action.__name__
                               for event in self._scheduler.queue)
        self.info('starting repository scheduler with tasks: %s', task_names)
        self._scheduler.run()
Adrien Di Mascio's avatar
Adrien Di Mascio committed
563

564
    def looping_task(self, interval, func, *args):
        """Register `func(*args)` to be called every `interval` seconds on
        the repository scheduler.

        Nothing is registered while the instance is being repaired, or when
        the repository has no scheduler (a warning is logged in that case).
        """
        if self.config.repairing:
            return
        if self.has_scheduler():
            event = utils.schedule_periodic_task(
                self._scheduler, interval, func, *args)
            self.info('scheduled periodic task %s (interval: %.2fs)',
                      event.action.__name__, interval)
        else:
            self.warning(
                'looping task %s will not run in this process where repository '
                'has no scheduler; use "cubicweb-ctl scheduler <appid>" to '
                'have it running', func)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
582
583
584

    def threaded_task(self, func):
        """Run `func` in a separate thread (a utils.RepoThread given the
        repository's running threads list, which shutdown() joins)."""
        thread = utils.RepoThread(func, self._running_threads)
        thread.start()
586

Adrien Di Mascio's avatar
Adrien Di Mascio committed
587
588
589
590
    def shutdown(self):
        """Called on server stop event to properly close opened sessions and
        connections.

        Unless the instance is being created, repaired or quick-started, the
        'before_server_shutdown' hooks are called before shutting down the
        system source, and the 'server_shutdown' hooks after. Then running
        threads are joined, connection sets closed and cache statistics
        logged. Must not be called while already shutting down (asserted).
        """
        assert not self.shutting_down, 'already shutting down'
        if not (self.config.creating or self.config.repairing
                or self.config.quick_start):
            # then, the system source is still available
            self.hm.call_hooks('before_server_shutdown', repo=self)
        self.shutting_down = True
        self.info('shutting down repository')
        self.system_source.shutdown()
        if not (self.config.creating or self.config.repairing
                or self.config.quick_start):
            self.hm.call_hooks('server_shutdown', repo=self)
        # Iterate over a snapshot: this list is handed to utils.RepoThread
        # (see threaded_task), so threads may presumably remove themselves
        # from it when they finish -- iterating the live list would then skip
        # threads. NOTE(review): confirm against RepoThread.
        for thread in list(self._running_threads):
            self.info('waiting thread %s...', thread.getName())
            thread.join()
            self.info('thread %s finished', thread.getName())
        self.cnxsets.close()
        # Log cache statistics. Each ratio is skipped on its own when its
        # denominator is zero, instead of aborting all the following ones.
        hits, misses = self.querier.rql_cache.cache_hit, self.querier.rql_cache.cache_miss
        if hits + misses:
            self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                      (hits * 100) / (hits + misses))
        hits, misses = self.system_source.cache_hit, self.system_source.cache_miss
        if hits + misses:
            self.info('sql cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                      (hits * 100) / (hits + misses))
        nocache = self.system_source.no_cache
        if hits + misses + nocache:
            self.info('sql cache usage: %s/%s (%s%%)', hits + misses, nocache,
                      ((hits + misses) * 100) / (hits + misses + nocache))
619

620
    def check_auth_info(self, cnx, login, authinfo):
        """Validate authentication against each enabled source in turn.

        Return the result of the first source whose authenticate() succeeds
        (the associated CWUser's eid). Sources raising NotImplementedError
        or AuthenticationError are skipped. Raise AuthenticationError when no
        source accepts the credentials.
        """
        # iterate on sources_by_uri then check enabled sources, since sources
        # doesn't contain copy based sources
        enabled_sources = (source for source in self.sources_by_uri.values()
                           if self.config.source_enabled(source))
        for source in enabled_sources:
            try:
                return source.authenticate(cnx, login, **authinfo)
            except (NotImplementedError, AuthenticationError):
                continue
        raise AuthenticationError('authentication failed with all sources')
634

635
    def authenticate_user(self, cnx, login, **authinfo):
        """Validate login / password and return the associated CWUser.

        Raise AuthenticationError on failure. Failure includes the case where
        user states are considered (``consider_user_state`` option) and the
        user's workflow state isn't one of its AUTHENTICABLE_STATES.
        """
        user_eid = self.check_auth_info(cnx, login, authinfo)
        cwuser = self._build_user(cnx, user_eid)
        if self.config.consider_user_state:
            state = cwuser.cw_adapt_to('IWorkflowable').state
            if state not in cwuser.AUTHENTICABLE_STATES:
                raise AuthenticationError('user is not in authenticable state')
        return cwuser
Adrien Di Mascio's avatar
Adrien Di Mascio committed
645

646
    def _build_user(self, cnx, eid):
        """Return the CWUser entity whose eid is `eid`, fetched with the
        entity class's fetch query (exactly one row is asserted)."""
        user_class = self.vreg['etypes'].etype_class('CWUser')
        rqlst = user_class.fetch_rqlst(cnx.user, ordermethod=None)
        rqlst.add_eid_restriction(rqlst.get_variable('X'), 'x', 'Substitute')
        rset = cnx.execute(rqlst.as_string(), {'x': eid})
        assert len(rset) == 1, rset
        return rset.get_entity(0, 0)
654

Adrien Di Mascio's avatar
Adrien Di Mascio committed
655
    # public (dbapi) interface ################################################
656

Adrien Di Mascio's avatar
Adrien Di Mascio committed
657
    def get_schema(self):
        """Return the instance schema currently set on the repository.

        This is a public method, not requiring a session id.
        """
        current_schema = self.schema
        return current_schema
Adrien Di Mascio's avatar
Adrien Di Mascio committed
663
664

    def get_cubes(self):
        """Return the list of cubes used by this instance, cubicweb itself
        excluded.

        Versions are checked against the file system, except while creating,
        repairing, quick-starting or in test mode.

        This is a public method, not requiring a session id.
        """
        cfg = self.config
        skip_version_check = (cfg.creating or cfg.repairing
                              or cfg.quick_start or cfg.mode == 'test')
        cubes = list(self.get_versions(not skip_version_check))
        cubes.remove('cubicweb')
        return cubes

Denis Laxalde's avatar
Denis Laxalde committed
677
    def get_option_value(self, option):
        """Return the configured value of `option`.

        Lookup is delegated to the configuration's item access, so a missing
        option raises whatever that raises.

        This is a public method, not requiring a session id.
        """
        # XXX we may want to check we don't give sensitive information
        config = self.config
        return config[option]
684

Adrien Di Mascio's avatar
Adrien Di Mascio committed
685
686
    @cached
    def get_versions(self, checkversions=False):
        """Return a dictionary mapping each cube used by this instance
        (including 'cubicweb' itself) to its version, as recorded in the
        'system.version.*' CWProperty entities.

        When `checkversions` is true, each recorded version is compared with
        the installed one (`config.cubicweb_version()` / `config.cube_version()`)
        and ExecutionError is raised if the recorded version is lower, asking
        to run "cubicweb-ctl upgrade".

        This is a public method, not requiring a session id.
        """
        from logilab.common.changelog import Version
        versions_by_cube = {}
        with self.internal_cnx() as cnx:
            rset = cnx.execute(
                'Any K,V WHERE P is CWProperty, P value V, P pkey K, '
                'P pkey ~="system.version.%"', build_descr=False)
            for pkey, raw_version in rset:
                cube = pkey.split('.')[-1]
                # XXX cubicweb migration: translate legacy cube names
                if cube in CW_MIGRATION_MAP:
                    cube = CW_MIGRATION_MAP[cube]
                instance_version = Version(raw_version)
                versions_by_cube[cube] = instance_version
                if not checkversions:
                    continue
                if cube == 'cubicweb':
                    installed_version = self.config.cubicweb_version()
                else:
                    installed_version = self.config.cube_version(cube)
                if instance_version < installed_version:
                    msg = ('instance has %s version %s but %s '
                           'is installed. Run "cubicweb-ctl upgrade %s".')
                    raise ExecutionError(msg % (cube, instance_version,
                                                installed_version,
                                                self.config.appid))
        return versions_by_cube
714

Adrien Di Mascio's avatar
Adrien Di Mascio committed
715
716
    @cached
    def source_defs(self):
        """Return a dictionary whose keys are source uris and whose values are
        dictionaries describing each source.

        Only each source's `public_config` is exposed, so that sensitive
        information is left out.

        This is a public method, not requiring a session id.
        """
        return {uri: source.public_config
                for uri, source in self.sources_by_uri.items()}

728
729
730
    def _clear_source_defs_caches(self):
        """Drop the memoized result of `source_defs` so it is recomputed."""
        cached_method = 'source_defs'
        clear_cache(self, cached_method)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
731
    def properties(self):
        """Return a result set of (key, value) rows for the system wide
        properties, i.e. CWProperty entities not bound to a user through
        `for_user`.

        This is a public method, not requiring a session id.
        """
        query = ('Any K,V WHERE P is CWProperty,'
                 'P pkey K, P value V, NOT P for_user U')
        with self.internal_cnx() as cnx:
            # go through the querier rather than cnx.execute on purpose: we
            # don't want rset.req set
            rset = self.querier.execute(cnx, query, build_descr=False)
        return rset
Adrien Di Mascio's avatar
Adrien Di Mascio committed
741

742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
    def find_users(self, fetch_attrs, **query_attrs):
        """Return user attributes for the CWUser entities matching
        `query_attrs`.

        The rows of the result set are returned as a plain list (the result
        set itself cannot survive this method call).

        This can be used by low-privileges account (anonymous comes to
        mind).

        `fetch_attrs`: tuple of attributes to be fetched, one column per
        attribute in each returned row
        `query_attrs`: dict of attr/values to restrict the query

        Raise Exception('bad input for find_user') if any fetched or queried
        attribute is not a non-meta attribute of CWUser. This whitelist is what
        makes it safe to interpolate attribute names into the RQL below.
        """
        assert query_attrs
        if not hasattr(self, '_cwuser_attrs'):
            cwuser = self.schema['CWUser']
            self._cwuser_attrs = set(str(rschema)
                                     for rschema, _eschema in cwuser.attribute_definitions()
                                     if not rschema.meta)
        cwuserattrs = self._cwuser_attrs
        for k in chain(fetch_attrs, query_attrs):
            if k not in cwuserattrs:
                raise Exception('bad input for find_user')
        with self.internal_cnx() as cnx:
            varmaker = rqlvar_maker()
            selected = [(attr, next(varmaker)) for attr in fetch_attrs]
            restrictions = ['X is CWUser']
            restrictions += ['X %s %s' % (attr, var) for attr, var in selected]
            # values are passed as query arguments, never interpolated
            restrictions += ['X %s %%(%s)s' % (attr, attr) for attr in query_attrs]
            rql = 'Any %s WHERE %s' % (','.join(var for _attr, var in selected),
                                       ', '.join(restrictions))
            rset = cnx.execute(rql, query_attrs)
            return rset.rows

Adrien Di Mascio's avatar
Adrien Di Mascio committed
772
    # session handling ########################################################
773

774
    @contextmanager
    def internal_cnx(self):
        """Context manager yielding a Connection for the internal manager
        user, which has every access right on the repository.

        Read and write security are disabled while the connection is in use;
        all other hooks stay enabled.
        """
        manager = InternalManager()
        with Connection(self, manager) as cnx:
            # XXX remove when "vreg = user._cw.vreg" hack in entity.py is gone
            cnx.user._cw = cnx
            unsecured = cnx.security_enabled(read=False, write=False)
            with unsecured:
                yield cnx
785

Adrien Di Mascio's avatar
Adrien Di Mascio committed
786
    # data sources handling ###################################################
787
    # * correspondence between eid and type
Adrien Di Mascio's avatar
Adrien Di Mascio committed
788
    # * correspondence between eid and local id (i.e. specific to a given source)
789

790
791
792
793
794
795
796
797
798
799
800
801
802
    def clear_caches(self, eids=None):
        """Clear the eid -> entity type cache, then the querier and system
        source caches.

        When `eids` is None, the whole type cache is reset and None is passed
        down as entity types. Otherwise only the given eids are evicted, and
        the list of their previously cached types is passed down, with None
        for each eid that was not cached.
        """
        if eids is not None:
            type_cache = self._type_cache
            # eids may be given as strings in some cases, hence int()
            etypes = [type_cache.pop(int(eid), None) for eid in eids]
        else:
            self._type_cache = {}
            etypes = None
        self.querier.clear_caches(eids, etypes)
        self.system_source.clear_caches(eids, etypes)
805

806
807
    def type_from_eid(self, eid, cnx):
        """Return the type of the entity with id `eid`.

        `eid` may be an int or anything `int()` accepts (e.g. a numeric
        string). Results are memoized in `self._type_cache`; on a cache miss
        the type is asked to the system source.

        Raise UnknownEid if `eid` cannot be converted to an integer.
        """
        try:
            eid = int(eid)
        except (TypeError, ValueError):
            # int() raises TypeError (not ValueError) for e.g. None; both mean
            # "not a valid eid" for callers
            raise UnknownEid(eid)
        try:
            return self._type_cache[eid]
        except KeyError:
            etype = self.system_source.eid_type(cnx, eid)
            self._type_cache[eid] = etype
            return etype
818

819
    def add_info(self, cnx, entity, source):
        """Add type and source information for `entity`'s eid into the system
        table, and index the entity with the full text index.

        The eid is first registered on the connection's
        CleanupNewEidsCacheOp instance, then the actual work is delegated to
        `self.system_source.add_info`.
        """
        cleanup_op = hook.CleanupNewEidsCacheOp.get_instance(cnx)
        cleanup_op.add_data(entity.eid)
        # insert eid/type/source into the entities table
        self.system_source.add_info(cnx, entity, source)
826

827
    def _delete_cascade_multi(self, cnx, entities):
828
829
        """same as _delete_cascade but accepts a list of entities with
        the same etype and belonging to the same source.
830
        """
831
        pendingrtypes = cnx.transaction_data.get('pendingrtypes', ())
832
833
        # delete remaining relations: if user can delete the entity, he can
        # delete all its relations without security checking
834
        with cnx.security_enabled(read=False, write=False):
Sylvain Thénault's avatar
cleanup    
Sylvain Thénault committed
835
            in_eids = ','.join([str(_e.eid) for _e in entities])
836
837
838
            with cnx.running_hooks_ops():
                for rschema, _, role in entities[0].e_schema.relation_definitions():
                    if rschema.rule:
839
                        continue  # computed relation
840
841
842
843
844
845
846
847
848
                    rtype = rschema.type
                    if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes:
                        continue
                    if role == 'subject':
                        # don't skip inlined relation so they are regularly
                        # deleted and so hooks are correctly called
                        rql = 'DELETE X %s Y WHERE X eid IN (%s)' % (rtype, in_eids)
                    else:
                        rql = 'DELETE Y %s X WHERE X eid IN (%s)' % (rtype, in_eids)
849
                    cnx.execute(rql, build_descr=False)
850

851
    def init_entity_caches(self, cnx, entity, source):
        """Register `entity` in the connection's entity cache and record its
        type in the repository's eid -> type cache.

        `source` is accepted for interface compatibility but not used here.
        """
        cnx.set_entity_cache(entity)
        type_cache = self._type_cache
        type_cache[entity.eid] = entity.cw_etype
855

856
    def glob_add_entity(self, cnx, edited):
        """add an entity to the repository

        the entity eid should originally be None and a unique eid is assigned to
        the entity instance

        `edited` wraps the entity being added (`edited.entity`) and is attached
        to it as `entity.cw_edited`. Hooks are called in this order:
        'before_add_entity', then 'after_add_entity', then, for each inlined
        relation returned by `preprocess_inlined_relations`,
        'before_add_relation' followed by 'after_add_relation'.

        If the system source raises UniqueTogetherError or ViolatedConstraint,
        the error is handed to the 'IUserFriendlyError' adapter's
        `raise_user_exception()`. NOTE(review): this presumably always raises;
        if it ever returned, the entity would still be marked as saved below.

        Return the eid allocated to the entity.
        """
        entity = edited.entity
        # entity is not yet saved (NOTE: its eid is only allocated a few lines
        # below)
        entity._cw_is_saved = False
        # init edited_attributes before calling before_add_entity hooks
        entity.cw_edited = edited
        source = self.system_source
        # allocate an eid to the entity before calling hooks
        entity.eid = self.system_source.create_eid(cnx)
        # set caches asap
        self.init_entity_caches(cnx, entity, source)
        if server.DEBUG & server.DBG_REPO:
            print('ADD entity', self, entity.cw_etype, entity.eid, edited)
        prefill_entity_caches(entity)
        self.hm.call_hooks('before_add_entity', cnx, entity=entity)
        # inlined relations are collected here; their relation hooks are only
        # triggered at the end, once the entity has been stored
        relations = preprocess_inlined_relations(cnx, entity)
        edited.set_defaults()
        if cnx.is_hook_category_activated('integrity'):
            edited.check(creation=True)
        self.add_info(cnx, entity, source)
        try:
            source.add_entity(cnx, entity)
        except (UniqueTogetherError, ViolatedConstraint) as exc:
            # turn low-level constraint errors into a user-friendly exception
            userhdlr = cnx.vreg['adapters'].select(
                'IUserFriendlyError', cnx, entity=entity, exc=exc)
            userhdlr.raise_user_exception()
        edited.saved = entity._cw_is_saved = True
        # trigger after_add_entity; note it runs BEFORE the before/after
        # add_relation hooks of inlined relations below (an older comment here
        # claimed "after after_add_relation", which does not match this order)
        self.hm.call_hooks('after_add_entity', cnx, entity=entity)
        # call hooks for inlined relations
        for attr, value in relations:
            self.hm.call_hooks('before_add_relation', cnx,
                               eidfrom=entity.eid, rtype=attr, eidto=value)
            self.hm.call_hooks('after_add_relation', cnx,
                               eidfrom=entity.eid, rtype=attr, eidto=value)
        return entity.eid
896

897
    def glob_update_entity(self, cnx, edited):
Adrien Di Mascio's avatar
Adrien Di Mascio committed
898
899
900
        """replace an entity in the repository
        the type and the eid of an entity must not be changed
        """
901
        entity = edited.entity
902
        if server.DEBUG & server.DBG_REPO:
Samuel Trégouët's avatar
Samuel Trégouët committed
903
904
            print('UPDATE entity', entity.cw_etype, entity.eid,
                  entity.cw_attr_cache, edited)
905
        hm = self.hm
Adrien Di Mascio's avatar
Adrien Di Mascio committed
906
        eschema = entity.e_schema
907
        cnx.set_entity_cache(entity)
908
909