Commit 2726e95a authored by Julien Cristau's avatar Julien Cristau
Browse files

Record a log of datafeed source imports (closes #2026097)

Formatting, css and js stolen from narval.
parent af3fb709c061
......@@ -20,12 +20,14 @@
__docformat__ = "restructuredtext en"
import logging
from random import randint, choice
from copy import deepcopy
from datetime import datetime, date, time, timedelta
from decimal import Decimal
from logilab.common import attrdict
from logilab.mtconverter import xml_escape
from yams.constraints import (SizeConstraint, StaticVocabularyConstraint,
IntervalBoundConstraint, BoundaryConstraint,
Attribute, actual_value)
......@@ -238,6 +240,14 @@ title
# raise exception
return u'text/plain'
def generate_CWDataImport_log(self, entity, index, **kwargs):
# content_format attribute of EmailPart has no vocabulary constraint, we
# need this method else stupid values will be set which make mtconverter
# raise exception
logs = [u'%s\t%s\t%s\t%s<br/>' % (logging.ERROR, 'http://url.com?arg1=hop&arg2=hip',
1, xml_escape('hjoio&oio"'))]
return u'<br/>'.join(logs)
class autoextend(type):
def __new__(mcs, name, bases, classdict):
......
......@@ -21,9 +21,11 @@ __docformat__ = "restructuredtext en"
import re
from socket import gethostname
import logging
from logilab.common.textutils import text_to_dict
from logilab.common.configuration import OptionError
from logilab.mtconverter import xml_escape
from cubicweb import ValidationError
from cubicweb.entities import AnyEntity, fetch_config
......@@ -131,3 +133,52 @@ class CWSourceSchemaConfig(AnyEntity):
@property
def cwsource(self):
return self.cw_for_source[0]
class CWDataImport(AnyEntity):
__regid__ = 'CWDataImport'
def __init__(self, *args, **kwargs):
super(CWDataImport, self).__init__(*args, **kwargs)
self._logs = []
def dc_title(self):
return '%s [%s]' % (self.printable_value('start_timestamp'),
self.printable_value('status'))
@property
def cwsource(self):
return self.cw_import_of[0]
def record_debug(self, msg, path=None, line=None):
self._log(logging.DEBUG, msg, path, line)
self.debug(msg)
def record_info(self, msg, path=None, line=None):
self._log(logging.INFO, msg, path, line)
self.info(msg)
def record_warning(self, msg, path=None, line=None):
self._log(logging.WARNING, msg, path, line)
self.warning(msg)
def record_error(self, msg, path=None, line=None):
self._status = u'failed'
self._log(logging.ERROR, msg, path, line)
self.error(msg)
def record_fatal(self, msg, path=None, line=None):
self._status = u'failed'
self._log(logging.FATAL, msg, path, line)
self.fatal(msg)
def _log(self, severity, msg, path=None, line=None):
encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, path or u'',
line or u'', xml_escape(msg))
self._logs.append(encodedmsg)
def write_log(self, session, **kwargs):
if 'status' not in kwargs:
kwargs['status'] = getattr(self, '_status', u'success')
self.set_attributes(log=u'<br/>'.join(self._logs), **kwargs)
self._logs = []
......@@ -835,7 +835,7 @@ class Entity(AppObject):
var = varmaker.next()
rql.append('%s %s %s' % (V, attr, var))
selected.append((attr, var))
# +1 since this doen't include the main variable
# +1 since this doesn't include the main variable
lastattr = len(selected) + 1
# don't fetch extra relation if attributes specified or of the entity is
# coming from an external source (may lead to error)
......
......@@ -59,13 +59,23 @@ class ServerStartupHook(hook.Hook):
continue
session = repo.internal_session(safe=True)
try:
stats = source.pull_data(session)
if stats.get('created'):
source.info('added %s entities', len(stats['created']))
if stats.get('updated'):
source.info('updated %s entities', len(stats['updated']))
source.pull_data(session)
except Exception, exc:
session.exception('while trying to update feed %s', source)
finally:
session.close()
self.repo.looping_task(60, update_feeds, self.repo)
def expire_dataimports(repo=self.repo):
for source in repo.sources_by_eid.itervalues():
if (not source.copy_based_source
or not repo.config.source_enabled(source)):
continue
session = repo.internal_session()
try:
mindate = datetime.now() - timedelta(seconds=source.config['logs-lifetime'])
session.execute('DELETE CWDataImport X WHERE X start_timestamp < %(time)s', {'time': mindate})
session.commit()
finally:
session.close()
self.repo.looping_task(60*60*24, expire_dataimports, self.repo)
config['rql-cache-size'] = config['rql-cache-size'] * 10
add_entity_type('CWDataImport')
......@@ -305,6 +305,24 @@ class cw_source(RelationDefinition):
cardinality = '1*'
composite = 'object'
class CWDataImport(EntityType):
__permissions__ = ENTITY_MANAGERS_PERMISSIONS
start_timestamp = TZDatetime()
end_timestamp = TZDatetime()
log = String()
status = String(required=True, internationalizable=True, indexed=True,
default='in progress',
vocabulary=[_('in progress'), _('success'), _('failed')])
class cw_import_of(RelationDefinition):
__permissions__ = RELATION_MANAGERS_PERMISSIONS
subject = 'CWDataImport'
object = 'CWSource'
cardinality = '1*'
composite = 'object'
class CWSourceSchemaConfig(EntityType):
__permissions__ = ENTITY_MANAGERS_PERMISSIONS
cw_for_source = SubjectRelation(
......
......@@ -27,6 +27,7 @@ from base64 import b64decode
from cookielib import CookieJar
from lxml import etree
from logilab.mtconverter import xml_escape
from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
from cubicweb.server.sources import AbstractSource
......@@ -71,7 +72,12 @@ class DataFeedSource(AbstractSource):
'external source be deleted?'),
'group': 'datafeed-source', 'level': 2,
}),
('logs-lifetime',
{'type': 'time',
'default': '10d',
'help': ('Time before logs from datafeed imports are deleted.'),
'group': 'datafeed-source', 'level': 2,
}),
)
def __init__(self, repo, source_config, eid=None):
AbstractSource.__init__(self, repo, source_config, eid)
......@@ -188,7 +194,8 @@ class DataFeedSource(AbstractSource):
myuris = self.source_cwuris(session)
else:
myuris = None
parser = self._get_parser(session, sourceuris=myuris)
importlog = self.init_import_log(session)
parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
if self.process_urls(parser, self.urls, raise_on_error):
self.warning("some error occured, don't attempt to delete entities")
elif self.config['delete-entities'] and myuris:
......@@ -200,7 +207,13 @@ class DataFeedSource(AbstractSource):
session.execute('DELETE %s X WHERE X eid IN (%s)'
% (etype, ','.join(eids)))
self.update_latest_retrieval(session)
return parser.stats
stats = parser.stats
if stats.get('created'):
importlog.record_info('added %s entities' % len(stats['created']))
if stats.get('updated'):
importlog.record_info('updated %s entities' % len(stats['updated']))
importlog.write_log(session, end_timestamp=self.latest_retrieval)
return stats
def process_urls(self, parser, urls, raise_on_error=False):
error = False
......@@ -255,14 +268,20 @@ class DataFeedSource(AbstractSource):
return dict((b64decode(uri), (eid, type))
for uri, eid, type in session.system_sql(sql))
def init_import_log(self, session, **kwargs):
dataimport = session.create_entity('CWDataImport', cw_import_of=self,
start_timestamp=datetime.utcnow(),
**kwargs)
return dataimport
class DataFeedParser(AppObject):
__registry__ = 'parsers'
def __init__(self, session, source, sourceuris=None, **kwargs):
def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
super(DataFeedParser, self).__init__(session, **kwargs)
self.source = source
self.sourceuris = sourceuris
self.import_log = import_log
self.stats = {'created': set(),
'updated': set()}
......
......@@ -64,7 +64,7 @@ class FakeDataFeedSource(FakeCardSource):
X_ALL_SOLS = sorted([{'X': 'Affaire'}, {'X': 'BaseTransition'}, {'X': 'Basket'},
{'X': 'Bookmark'}, {'X': 'CWAttribute'}, {'X': 'CWCache'},
{'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, {'X': 'CWEType'},
{'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, {'X': 'CWDataImport'}, {'X': 'CWEType'},
{'X': 'CWGroup'}, {'X': 'CWPermission'}, {'X': 'CWProperty'},
{'X': 'CWRType'}, {'X': 'CWRelation'},
{'X': 'CWSource'}, {'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'},
......@@ -907,6 +907,7 @@ class MSPlannerTC(BaseMSPlannerTC):
ALL_SOLS = X_ALL_SOLS[:]
ALL_SOLS.remove({'X': 'CWSourceHostConfig'}) # not authorized
ALL_SOLS.remove({'X': 'CWSourceSchemaConfig'}) # not authorized
ALL_SOLS.remove({'X': 'CWDataImport'}) # not authorized
self._test('Any MAX(X)',
[('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])],
[self.cards, self.system], None, {'E': 'table1.C0'}, []),
......@@ -957,7 +958,7 @@ class MSPlannerTC(BaseMSPlannerTC):
ueid = self.session.user.eid
X_ET_ALL_SOLS = []
for s in X_ALL_SOLS:
if s in ({'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'}):
if s in ({'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'}, {'X': 'CWDataImport'}):
continue # not authorized
ets = {'ET': 'CWEType'}
ets.update(s)
......@@ -990,7 +991,8 @@ class MSPlannerTC(BaseMSPlannerTC):
[{'X': 'BaseTransition', 'ET': 'CWEType'},
{'X': 'Bookmark', 'ET': 'CWEType'}, {'X': 'CWAttribute', 'ET': 'CWEType'},
{'X': 'CWCache', 'ET': 'CWEType'}, {'X': 'CWConstraint', 'ET': 'CWEType'},
{'X': 'CWConstraintType', 'ET': 'CWEType'}, {'X': 'CWEType', 'ET': 'CWEType'},
{'X': 'CWConstraintType', 'ET': 'CWEType'},
{'X': 'CWEType', 'ET': 'CWEType'},
{'X': 'CWGroup', 'ET': 'CWEType'}, {'X': 'CWPermission', 'ET': 'CWEType'},
{'X': 'CWProperty', 'ET': 'CWEType'}, {'X': 'CWRType', 'ET': 'CWEType'},
{'X': 'CWSource', 'ET': 'CWEType'},
......@@ -2661,7 +2663,7 @@ class MSPlannerTwoSameExternalSourcesTC(BasePlannerTC):
None, {'X': 'table0.C0'}, []),
('UnionStep', None, None,
[('OneFetchStep',
[(u'Any X WHERE X owned_by U, U login "anon", U is CWUser, X is IN(Affaire, BaseTransition, Basket, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWSourceHostConfig, CWSourceSchemaConfig, CWUniqueTogetherConstraint, CWUser, Division, Email, EmailAddress, EmailPart, EmailThread, ExternalUri, File, Folder, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)',
[(u'Any X WHERE X owned_by U, U login "anon", U is CWUser, X is IN(Affaire, BaseTransition, Basket, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWDataImport, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWSourceHostConfig, CWSourceSchemaConfig, CWUniqueTogetherConstraint, CWUser, Division, Email, EmailAddress, EmailPart, EmailThread, ExternalUri, File, Folder, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)',
[{'U': 'CWUser', 'X': 'Affaire'},
{'U': 'CWUser', 'X': 'BaseTransition'},
{'U': 'CWUser', 'X': 'Basket'},
......@@ -2670,6 +2672,7 @@ class MSPlannerTwoSameExternalSourcesTC(BasePlannerTC):
{'U': 'CWUser', 'X': 'CWCache'},
{'U': 'CWUser', 'X': 'CWConstraint'},
{'U': 'CWUser', 'X': 'CWConstraintType'},
{'U': 'CWUser', 'X': 'CWDataImport'},
{'U': 'CWUser', 'X': 'CWEType'},
{'U': 'CWUser', 'X': 'CWGroup'},
{'U': 'CWUser', 'X': 'CWPermission'},
......
......@@ -233,13 +233,13 @@ class CWEntityXMLParser(datafeed.DataFeedXMLParser):
try:
related_items = rels[role][rtype]
except KeyError:
self.source.error('relation %s-%s not found in xml export of %s',
rtype, role, etype)
self.import_log.record_error('relation %s-%s not found in xml export of %s'
% (rtype, role, etype))
continue
try:
linker = self.select_linker(action, rtype, role, entity)
except RegistryException:
self.source.error('no linker for action %s', action)
self.import_log.record_error('no linker for action %s' % action)
else:
linker.link_items(related_items, rules)
......@@ -430,15 +430,15 @@ class CWEntityXMLActionLink(CWEntityXMLActionCopy):
def issubset(x,y):
return all(z in y for z in x)
eids = [] # local eids
source = self.parser.source
log = self.parser.import_log
for item, rels in others:
if item['cwtype'] != ttype:
continue
if not issubset(searchattrs, item):
item, rels = self.parser.complete_item(item, rels)
if not issubset(searchattrs, item):
source.error('missing attribute, got %s expected keys %s',
item, searchattrs)
log.record_error('missing attribute, got %s expected keys %s'
% (item, searchattrs))
continue
# XXX str() needed with python < 2.6
kwargs = dict((str(attr), item[attr]) for attr in searchattrs)
......@@ -449,11 +449,11 @@ class CWEntityXMLActionLink(CWEntityXMLActionCopy):
entity = self._cw.create_entity(item['cwtype'], **kwargs)
else:
if len(targets) > 1:
source.error('ambiguous link: found %s entity %s with attributes %s',
len(targets), item['cwtype'], kwargs)
log.record_error('ambiguous link: found %s entity %s with attributes %s'
% (len(targets), item['cwtype'], kwargs))
else:
source.error('can not find %s entity with attributes %s',
item['cwtype'], kwargs)
log.record_error('can not find %s entity with attributes %s'
% (item['cwtype'], kwargs))
continue
eids.append(entity.eid)
self.parser.process_relations(entity, rels)
......
......@@ -161,8 +161,8 @@ class SchemaReaderClassTest(TestCase):
entities = sorted([str(e) for e in schema.entities()])
expected_entities = ['BaseTransition', 'BigInt', 'Bookmark', 'Boolean', 'Bytes', 'Card',
'Date', 'Datetime', 'Decimal',
'CWCache', 'CWConstraint', 'CWConstraintType', 'CWEType',
'CWAttribute', 'CWGroup', 'EmailAddress', 'CWRelation',
'CWCache', 'CWConstraint', 'CWConstraintType', 'CWDataImport',
'CWEType', 'CWAttribute', 'CWGroup', 'EmailAddress', 'CWRelation',
'CWPermission', 'CWProperty', 'CWRType',
'CWSource', 'CWSourceHostConfig', 'CWSourceSchemaConfig',
'CWUniqueTogetherConstraint', 'CWUser',
......@@ -183,12 +183,12 @@ class SchemaReaderClassTest(TestCase):
'constrained_by', 'constraint_of',
'content', 'content_format',
'created_by', 'creation_date', 'cstrtype', 'custom_workflow',
'cwuri', 'cw_for_source', 'cw_host_config_of', 'cw_schema', 'cw_source',
'cwuri', 'cw_for_source', 'cw_import_of', 'cw_host_config_of', 'cw_schema', 'cw_source',
'data', 'data_encoding', 'data_format', 'data_name', 'default_workflow', 'defaultval', 'delete_permission',
'description', 'description_format', 'destination_state',
'ecrit_par', 'eid', 'evaluee', 'expression', 'exprtype',
'ecrit_par', 'eid', 'end_timestamp', 'evaluee', 'expression', 'exprtype',
'fabrique_par', 'final', 'firstname', 'for_user', 'fournit',
'from_entity', 'from_state', 'fulltext_container', 'fulltextindexed',
......@@ -196,7 +196,7 @@ class SchemaReaderClassTest(TestCase):
'identity', 'in_group', 'in_state', 'in_synchronization', 'indexed',
'initial_state', 'inlined', 'internationalizable', 'is', 'is_instance_of',
'label', 'last_login_time', 'latest_retrieval', 'lieu', 'login',
'label', 'last_login_time', 'latest_retrieval', 'lieu', 'log', 'login',
'mainvars', 'match_host', 'modification_date',
......@@ -208,7 +208,7 @@ class SchemaReaderClassTest(TestCase):
'read_permission', 'relation_type', 'relations', 'require_group',
'specializes', 'state_of', 'subworkflow', 'subworkflow_exit', 'subworkflow_state', 'surname', 'symmetric', 'synopsis',
'specializes', 'start_timestamp', 'state_of', 'status', 'subworkflow', 'subworkflow_exit', 'subworkflow_state', 'surname', 'symmetric', 'synopsis',
'tags', 'timestamp', 'title', 'to_entity', 'to_state', 'transition_of', 'travaille', 'type',
......
/* sample css file for logs
*
* Copyright (c) 2003-2010 LOGILAB S.A. (Paris, FRANCE).
* http://www.logilab.fr/ -- mailto:contact@logilab.fr
*/
pre.rawtext {
overflow: auto;
max-width: 110em;
padding: 0 0 0 0;
}
table.listing td.logSeverity {
font-weight: bold;
padding-left: 0.5em;
padding-right: 1em;
}
table.listing pre{
color: black;
}
table.listing .logDebug a{
color : #444 ;
}
table.listing .logDebug td{
color : #444 ;
border-color: grey #AAA;
}
table.listing .logDebug pre{
background-color : transparent ;
border: none;
}
table.listing .logSeverity .internallink {
visibility: hidden;
color: #FF4500;
font-weight: bolder;
}
table.listing tr:hover .internallink {
visibility: visible;
}
table.listing .internallink:hover {
background-color: #FF4500;
color: White;
font-weight: bolder;
}
table.listing .logInfo a{
color : #240 ;
}
table.listing .logInfo td{
color : #240 ;
background-color : #DFD ;
border-color: grey #AFA;
}
table.listing .logInfo pre{
background-color : transparent ;
border: none;
}
table.listing .logWarning a{
color : #A42 ;
}
table.listing .logWarning td{
color : #A42 ;
background-color : #FFC ;
border-color: grey #FA6;
}
table.listing .logWarning pre{
background-color : transparent ;
border: none;
}
table.listing .logError a{
color : #A00 ;
}
table.listing .logError td{
color : #A00 ;
background-color : #FDD ;
border-color: grey #FAA;
}
table.listing .logError pre{
background-color : transparent ;
border: none;
}
table.listing .logFatal a{
color : #00A;
}
table.listing .logFatal td{
color : #00A;
background-color : #DDF ;
border-color: grey #AAF;
}
table.listing .logFatal pre{
background-color : transparent ;
border: none;
}
div.validPlan{
color: green;
text-align: center;
}
div.invalidPlan{
color: red;
text-align: center;
}
// This contains template-specific javascript
function filterLog(domid, thresholdLevel) {
var logLevels = ["Debug", "Info", "Warning", "Error", "Fatal"]
var action = "hide";
for (var idx = 0; idx < logLevels.length; idx++){
var level = logLevels[idx];
if (level === thresholdLevel){
action = "show";
}
$('#'+domid+' .log' + level)[action]();
}
}
......@@ -22,14 +22,21 @@ CWSourceHostConfig, CWSourceSchemaConfig).
__docformat__ = "restructuredtext en"
_ = unicode
import logging
from itertools import repeat, chain
from logilab.mtconverter import xml_escape
from logilab.common.decorators import cachedproperty
from cubicweb import Unauthorized
from cubicweb.selectors import is_instance, score_entity, match_user_groups
from cubicweb import Unauthorized, tags
from cubicweb.utils import make_uid
from cubicweb.selectors import (is_instance, score_entity, has_related_entities,
match_user_groups, match_kwargs, match_view)
from cubicweb.view import EntityView, StartupView
from cubicweb.schema import META_RTYPES, VIRTUAL_RTYPES, display_name
from cubicweb.web import uicfg, formwidgets as wdgs
from cubicweb.web.views import tabs, actions, ibreadcrumbs, tableview, add_etype_button
from cubicweb.web import uicfg, formwidgets as wdgs, facet
from cubicweb.web.views import add_etype_button
from cubicweb.web.views import (tabs, actions, ibreadcrumbs, navigation,
tableview, pyviews)
_abaa = uicfg.actionbox_appearsin_addmenu
......@@ -37,6 +44,7 @@ _abaa = uicfg.actionbox_appearsin_addmenu
_abaa.tag_object_of(('CWSourceSchemaConfig', 'cw_schema', '*'), False)
_abaa.tag_object_of(('CWSourceSchemaConfig', 'cw_for_source', '*'), False)
_abaa.tag_object_of(('CWSourceSchemaConfig', 'cw_host_config_of', '*'), False)
_abaa.tag_object_of(('CWDataImport', 'cw_import_of', '*'), False)
_afs = uicfg.autoform_section
_afs.tag_object_of(('*', 'cw_for_source', 'CWSource'), 'main', 'hidden')
......@@ -62,13 +70,13 @@ _rc.tag_attribute(('CWSourceSchemaConfig', 'options'), {'rvid': 'verbatimattr'})
class CWSourcePrimaryView(tabs.TabbedPrimaryView):
__select__ = is_instance('CWSource')
tabs = [_('cwsource-main'), _('cwsource-mapping')]
tabs = [_('cwsource-main'), _('cwsource-mapping'), _('cwsource-imports')]
default_tab = 'cwsource-main'
class CWSourceMainTab(tabs.PrimaryTab):
__regid__ = 'cwsource-main'
__select__ = tabs.PrimaryTab.__select__ & is_instance('CWSource')
__select__ = is_instance('CWSource')
def render_entity_attributes(self, entity):
super(CWSourceMainTab, self).render_entity_attributes(entity)
......@@ -93,7 +101,7 @@ MAPPED_SOURCE_TYPES = set( ('pyrorql', 'datafeed') )
class CWSourceMappingTab(EntityView):
__regid__ = 'cwsource-mapping'
__select__ = (tabs.PrimaryTab.__select__ & is_instance('CWSource')
__select__ = (is_instance('CWSource')
& match_user_groups('managers')
& score_entity(lambda x:x.type in MAPPED_SOURCE_TYPES))
......@@ -248,6 +256,30 @@ MAPPING_CHECKERS = {
'pyrorql': PyroRQLMappingChecker,
}
class CWSourceImportsTab(EntityView):
__regid__ = 'cwsource-imports'
__select__ = (is_instance('CWSource')
& has_related_entities('cw_import_of', 'object'))
def entity_call(self, entity):
rset = self._cw.execute('Any X, XST, XET, XS ORDERBY XST DESC WHERE '
'X cw_import_of S, S eid %(s)s, X status XS, '
'X start_timestamp XST, X end_timestamp XET',
{'s': entity.eid})
self._cw.view('cw.imports-table', rset, w=self.w)
class CWImportsTable(tableview.EntityTableView):
__regid__ = 'cw.imports-table'
__select__ = is_instance('CWDataImport')
columns = ['import', 'start_timestamp', 'end_timestamp']
column_renderers = {'import': tableview.MainEntityColRenderer()}
layout_args = {'display_filter': 'top'}
# sources management view ######################################################
class ManageSourcesAction(actions.ManagersAction):
......@@ -272,12 +304,221 @@ class CWSourcesManagementView(StartupView):
class CWSourcesTable(tableview.EntityTableView):
__regid__ = 'cw.sources-table'
__select__ = is_instance('CWSource')
columns = ['source', 'type', 'parser', 'latest_retrieval']