Commit de7868a9 authored by Sylvain Thénault's avatar Sylvain Thénault
Browse files

a bit of pep8

parent 0939ad2edf63
......@@ -18,14 +18,11 @@
"""this module contains base classes and utilities for cubicweb tests"""
from __future__ import print_function
__docformat__ = "restructuredtext en"
import sys
import re
from os.path import dirname, join, abspath
from math import log
from contextlib import contextmanager
from warnings import warn
from itertools import chain
from six import text_type, string_types
......@@ -43,7 +40,7 @@ from logilab.common.deprecation import deprecated, class_deprecated
from logilab.common.shellutils import getlogin
from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError,
ProgrammingError, BadConnectionId)
BadConnectionId)
from cubicweb import cwconfig, devtools, web, server, repoapi
from cubicweb.utils import json
from cubicweb.sobjects import notification
......@@ -52,7 +49,7 @@ from cubicweb.server.hook import SendMailOp
from cubicweb.server.session import Session
from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID
from cubicweb.utils import json
# low-level utilities ##########################################################
......@@ -67,6 +64,7 @@ class CubicWebDebugger(Debugger):
toto.write(data)
webbrowser.open('file:///tmp/toto.html')
def line_context_filter(line_no, center, before=3, after=None):
"""return true if line are in context
......@@ -76,6 +74,7 @@ def line_context_filter(line_no, center, before=3, after=None):
after = before
return center - before <= line_no <= center + after
def unprotected_entities(schema, strict=False):
"""returned a set of each non final entity type, excluding "system" entities
(eg CWGroup, CWUser...)
......@@ -86,10 +85,12 @@ def unprotected_entities(schema, strict=False):
protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
return set(schema.entities()) - protected_entities
class JsonValidator(object):
def parse_string(self, data):
return json.loads(data.decode('ascii'))
@contextmanager
def real_error_handling(app):
"""By default, CubicWebTC `app` attribute (ie the publisher) is monkey
......@@ -112,10 +113,12 @@ def real_error_handling(app):
# restore
app.error_handler = fake_error_handler
# email handling, to test emails sent by an application ########################
MAILBOX = []
class Email(object):
"""you'll get instances of Email into MAILBOX during tests that trigger
some notification.
......@@ -146,13 +149,17 @@ class Email(object):
return '<Email to %s with subject %s>' % (','.join(self.recipients),
self.message.get('Subject'))
# the trick to get email into MAILBOX instead of actually sent: monkey patch
# cwconfig.SMTP object
class MockSMTP:
def __init__(self, server, port):
pass
def close(self):
pass
def sendmail(self, fromaddr, recipients, msg):
MAILBOX.append(Email(fromaddr, recipients, msg))
......@@ -246,7 +253,6 @@ class RepoAccess(object):
cnx.commit()
# base class for cubicweb tests requiring a full cw environments ###############
class CubicWebTC(TestCase):
......@@ -296,14 +302,14 @@ class CubicWebTC(TestCase):
try:
self._open_access.pop().close()
except BadConnectionId:
continue # already closed
continue # already closed
@property
def session(self):
"""return admin session"""
return self._admin_session
#XXX this doesn't need to a be classmethod anymore
# XXX this doesn't need to a be classmethod anymore
def _init_repo(self):
"""init the repository and connection to it.
"""
......@@ -317,7 +323,6 @@ class CubicWebTC(TestCase):
self.admin_access = self.new_access(login)
self._admin_session = self.admin_access._session
# config management ########################################################
@classproperty
......@@ -338,7 +343,7 @@ class CubicWebTC(TestCase):
config.mode = 'test'
return config
@classmethod # XXX could be turned into a regular method
@classmethod # XXX could be turned into a regular method
def init_config(cls, config):
"""configuration initialization hooks.
......@@ -354,13 +359,13 @@ class CubicWebTC(TestCase):
cls.admlogin = text_type(admincfg['login'])
cls.admpassword = admincfg['password']
# uncomment the line below if you want rql queries to be logged
#config.global_set_option('query-log-file',
# '/tmp/test_rql_log.' + `os.getpid()`)
# config.global_set_option('query-log-file',
# '/tmp/test_rql_log.' + `os.getpid()`)
config.global_set_option('log-file', None)
# set default-dest-addrs to a dumb email address to avoid mailbox or
# mail queue pollution
config.global_set_option('default-dest-addrs', ['whatever'])
send_to = '%s@logilab.fr' % getlogin()
send_to = '%s@logilab.fr' % getlogin()
config.global_set_option('sender-addr', send_to)
config.global_set_option('default-dest-addrs', send_to)
config.global_set_option('sender-name', 'cubicweb-test')
......@@ -370,14 +375,13 @@ class CubicWebTC(TestCase):
# web resources
try:
config.global_set_option('embed-allowed', re.compile('.*'))
except Exception: # not in server only configuration
except Exception: # not in server only configuration
pass
@property
def vreg(self):
return self.repo.vreg
# global resources accessors ###############################################
@property
......@@ -411,7 +415,7 @@ class CubicWebTC(TestCase):
self.addCleanup(self._close_access)
self.config.set_anonymous_allowed(self.anonymous_allowed)
self.setup_database()
MAILBOX[:] = [] # reset mailbox
MAILBOX[:] = [] # reset mailbox
def tearDown(self):
# XXX hack until logilab.common.testlib is fixed
......@@ -427,8 +431,10 @@ class CubicWebTC(TestCase):
# monkey patch send mail operation so emails are sent synchronously
_old_mail_postcommit_event = SendMailOp.postcommit_event
SendMailOp.postcommit_event = SendMailOp.sendmails
def reverse_SendMailOp_monkey_patch():
SendMailOp.postcommit_event = _old_mail_postcommit_event
self.addCleanup(reverse_SendMailOp_monkey_patch)
def setup_database(self):
......@@ -452,7 +458,7 @@ class CubicWebTC(TestCase):
else:
return req.user
@iclassmethod # XXX turn into a class method
@iclassmethod # XXX turn into a class method
def create_user(self, req, login=None, groups=('users',), password=None,
email=None, commit=True, **kwargs):
"""create and return a new user entity"""
......@@ -471,12 +477,11 @@ class CubicWebTC(TestCase):
user.cw_clear_relation_cache('in_group', 'subject')
if commit:
try:
req.commit() # req is a session
req.commit() # req is a session
except AttributeError:
req.cnx.commit()
return user
# other utilities #########################################################
@contextmanager
......@@ -555,7 +560,6 @@ class CubicWebTC(TestCase):
self.assertListEqual(sorted(tr.name for tr in transitions),
sorted(expected))
# views and actions registries inspection ##################################
def pviews(self, req, rset):
......@@ -591,6 +595,7 @@ class CubicWebTC(TestCase):
@property
def items(self):
return self
class fake_box(object):
def action_link(self, action, **kwargs):
return (action.title, action.url())
......@@ -617,7 +622,7 @@ class CubicWebTC(TestCase):
try:
view = viewsvreg._select_best(views, req, rset=rset)
if view is None:
raise NoSelectableObject((req,), {'rset':rset}, views)
raise NoSelectableObject((req,), {'rset': rset}, views)
if view.linkable():
yield view
else:
......@@ -648,7 +653,6 @@ class CubicWebTC(TestCase):
else:
not_selected(self.vreg, view)
# web ui testing utilities #################################################
@property
......@@ -656,8 +660,10 @@ class CubicWebTC(TestCase):
def app(self):
"""return a cubicweb publisher"""
publisher = application.CubicWebPublisher(self.repo, self.config)
def raise_error_handler(*args, **kwargs):
raise
publisher.error_handler = raise_error_handler
return publisher
......@@ -709,7 +715,7 @@ class CubicWebTC(TestCase):
for fields that are not tied to the given entity
"""
assert field_dict or entity_field_dicts, \
'field_dict and entity_field_dicts arguments must not be both unspecified'
'field_dict and entity_field_dicts arguments must not be both unspecified'
if field_dict is None:
field_dict = {}
form = {'__form_id': formid}
......@@ -717,9 +723,11 @@ class CubicWebTC(TestCase):
for field, value in field_dict.items():
fields.append(field)
form[field] = value
def _add_entity_field(entity, field, value):
entity_fields.append(field)
form[eid_param(field, entity.eid)] = value
for entity, field_dict in entity_field_dicts:
if '__maineid' not in form:
form['__maineid'] = entity.eid
......@@ -742,7 +750,7 @@ class CubicWebTC(TestCase):
"""
req = self.request(url=url)
if isinstance(url, unicode):
url = url.encode(req.encoding) # req.setup_params() expects encoded strings
url = url.encode(req.encoding) # req.setup_params() expects encoded strings
querystring = urlparse(url)[-2]
params = parse_qs(querystring)
req.setup_params(params)
......@@ -756,7 +764,7 @@ class CubicWebTC(TestCase):
"""
with self.admin_access.web_request(url=url) as req:
if isinstance(url, unicode):
url = url.encode(req.encoding) # req.setup_params() expects encoded strings
url = url.encode(req.encoding) # req.setup_params() expects encoded strings
querystring = urlparse(url)[-2]
params = parse_qs(querystring)
req.setup_params(params)
......@@ -799,7 +807,7 @@ class CubicWebTC(TestCase):
else:
cleanup = lambda p: (p[0], urlunquote(p[1]))
params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
if path.startswith(req.base_url()): # may be relative
if path.startswith(req.base_url()): # may be relative
path = path[len(req.base_url()):]
return path, params
......@@ -818,8 +826,8 @@ class CubicWebTC(TestCase):
"""call the publish method of the application publisher, expecting to
get a Redirect exception
"""
result = self.app_handle_request(req, path)
self.assertTrue(300 <= req.status_out <400, req.status_out)
self.app_handle_request(req, path)
self.assertTrue(300 <= req.status_out < 400, req.status_out)
location = req.get_response_header('location')
return self._parse_location(req, location)
......@@ -828,7 +836,6 @@ class CubicWebTC(TestCase):
def expect_redirect_publish(self, *args, **kwargs):
return self.expect_redirect_handle_request(*args, **kwargs)
def set_auth_mode(self, authmode, anonuser=None):
self.set_option('auth-mode', authmode)
self.set_option('anonymous-user', anonuser)
......@@ -877,8 +884,8 @@ class CubicWebTC(TestCase):
#
# do not set html validators here, we need HTMLValidator for html
# snippets
#'text/html': DTDValidator,
#'application/xhtml+xml': DTDValidator,
# 'text/html': DTDValidator,
# 'application/xhtml+xml': DTDValidator,
'application/xml': htmlparser.XMLValidator,
'text/xml': htmlparser.XMLValidator,
'application/json': JsonValidator,
......@@ -892,7 +899,6 @@ class CubicWebTC(TestCase):
vid_validators = dict((vid, htmlparser.VALMAP[valkey])
for vid, valkey in VIEW_VALIDATORS.items())
def view(self, vid, rset=None, req=None, template='main-template',
**kwargs):
"""This method tests the view `vid` on `rset` using `template`
......@@ -921,7 +927,7 @@ class CubicWebTC(TestCase):
else:
self.set_description("testing vid=%s defined in %s without rset" % (
vid, view.__module__))
if template is None: # raw view testing, no template
if template is None: # raw view testing, no template
viewfunc = view.render
else:
kwargs['view'] = view
......@@ -929,7 +935,6 @@ class CubicWebTC(TestCase):
rset=rset, **kwargs)
return self._test_view(viewfunc, view, template, kwargs)
def _test_view(self, viewfunc, view, template='main-template', kwargs={}):
"""this method does the actual call to the view
......@@ -989,11 +994,11 @@ class CubicWebTC(TestCase):
output = output.encode('utf-8')
validator = self.get_validator(view, output=output)
if validator is None:
return output # return raw output if no validator is defined
return output # return raw output if no validator is defined
if isinstance(validator, htmlparser.DTDValidator):
# XXX remove <canvas> used in progress widget, unknown in html dtd
output = re.sub('<canvas.*?></canvas>', '', output)
return self.assertWellFormed(validator, output.strip(), context= view.__regid__)
return self.assertWellFormed(validator, output.strip(), context=view.__regid__)
def assertWellFormed(self, validator, content, context=None):
try:
......@@ -1069,6 +1074,7 @@ from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries
# XXX cleanup unprotected_entities & all mess
def how_many_dict(schema, cnx, how_many, skip):
"""given a schema, compute how many entities by type we need to be able to
satisfy relations cardinality.
......@@ -1097,7 +1103,7 @@ def how_many_dict(schema, cnx, how_many, skip):
# reverse subj and obj in the above explanation
relmap.setdefault((rschema, obj), []).append(str(subj))
unprotected = unprotected_entities(schema)
for etype in skip: # XXX (syt) duh? explain or kill
for etype in skip: # XXX (syt) duh? explain or kill
unprotected.add(etype)
howmanydict = {}
# step 1, compute a base number of each entity types: number of already
......@@ -1143,7 +1149,6 @@ class AutoPopulateTest(CubicWebTC):
def post_populate(self, cnx):
pass
@nocoverage
def auto_populate(self, how_many):
"""this method populates the database with `how_many` entities
......@@ -1183,7 +1188,7 @@ class AutoPopulateTest(CubicWebTC):
except ValidationError as ex:
# failed to satisfy some constraint
print('error in automatic db population', ex)
cnx.commit_state = None # reset uncommitable flag
cnx.commit_state = None # reset uncommitable flag
self.post_populate(cnx)
def iter_individual_rsets(self, etypes=None, limit=None):
......@@ -1218,7 +1223,8 @@ class AutoPopulateTest(CubicWebTC):
# test a mixed query (DISTINCT/GROUP to avoid getting duplicate
# X which make muledit view failing for instance (html validation fails
# because of some duplicate "id" attributes)
yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' % (etype1, etype2))
yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' %
(etype1, etype2))
# test some application-specific queries if defined
for rql in self.application_rql:
yield req.execute(rql)
......@@ -1241,7 +1247,8 @@ class AutoPopulateTest(CubicWebTC):
# resultset's syntax tree
rset = backup_rset
for action in self.list_actions_for(rset):
yield InnerTest(self._testname(rset, action.__regid__, 'action'), self._test_action, action)
yield InnerTest(self._testname(rset, action.__regid__, 'action'),
self._test_action, action)
for box in self.list_boxes_for(rset):
w = [].append
yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w)
......@@ -1269,21 +1276,18 @@ class AutomaticWebTest(AutoPopulateTest):
# machinery (else some views may fail)
self.app
## one each
def test_one_each_config(self):
self.auto_populate(1)
for rset in self.iter_automatic_rsets(limit=1):
for testargs in self._test_everything_for(rset):
yield testargs
## ten each
def test_ten_each_config(self):
self.auto_populate(10)
for rset in self.iter_automatic_rsets(limit=10):
for testargs in self._test_everything_for(rset):
yield testargs
## startup views
def test_startup_views(self):
for vid in self.list_startup_views():
with self.admin_access.web_request() as req:
......
# copyright 2010-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# copyright 2010-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
......@@ -18,19 +18,15 @@
"""some basic entity adapter implementations, for interfaces used in the
framework itself.
"""
__docformat__ = "restructuredtext en"
from cubicweb import _
from itertools import chain
from warnings import warn
from hashlib import md5
from logilab.mtconverter import TransformError
from logilab.common.decorators import cached
from cubicweb import ValidationError, view, ViolatedConstraint
from cubicweb.schema import CONSTRAINTS
from cubicweb import ValidationError, view, ViolatedConstraint, UniqueTogetherError
from cubicweb.predicates import is_instance, relation_possible, match_exception
......@@ -63,8 +59,8 @@ class IEmailableAdapter(view.EntityAdapter):
NOTE: the dictionary keys should match the list returned by the
`allowed_massmail_keys` method.
"""
return dict( (attr, getattr(self.entity, attr))
for attr in self.allowed_massmail_keys() )
return dict((attr, getattr(self.entity, attr))
for attr in self.allowed_massmail_keys())
class INotifiableAdapter(view.EntityAdapter):
......@@ -156,24 +152,27 @@ class IFTIndexableAdapter(view.EntityAdapter):
if role == 'subject':
for entity_ in getattr(entity, rschema.type):
merge_weight_dict(words, entity_.cw_adapt_to('IFTIndexable').get_words())
else: # if role == 'object':
else: # if role == 'object':
for entity_ in getattr(entity, 'reverse_%s' % rschema.type):
merge_weight_dict(words, entity_.cw_adapt_to('IFTIndexable').get_words())
return words
def merge_weight_dict(maindict, newdict):
for weight, words in newdict.items():
maindict.setdefault(weight, []).extend(words)
class IDownloadableAdapter(view.EntityAdapter):
"""interface for downloadable entities"""
__regid__ = 'IDownloadable'
__abstract__ = True
def download_url(self, **kwargs): # XXX not really part of this interface
def download_url(self, **kwargs): # XXX not really part of this interface
"""return a URL to download entity's content
It should be a unicode object containing url-encoded ASCII."""
It should be a unicode object containing url-encoded ASCII.
"""
raise NotImplementedError
def download_content_type(self):
......@@ -192,6 +191,7 @@ class IDownloadableAdapter(view.EntityAdapter):
"""return actual data (bytes) of the downloadable content"""
raise NotImplementedError
# XXX should propose to use two different relations for children/parent
class ITreeAdapter(view.EntityAdapter):
"""This adapter provides a tree interface.
......@@ -339,7 +339,7 @@ class ITreeAdapter(view.EntityAdapter):
try:
# check we are not jumping to another tree
if (adapter.tree_relation != self.tree_relation or
adapter.child_role != self.child_role):
adapter.child_role != self.child_role):
break
entity = adapter.parent()
adapter = entity.cw_adapt_to('ITree')
......@@ -377,7 +377,6 @@ class ISerializableAdapter(view.EntityAdapter):
# error handling adapters ######################################################
from cubicweb import UniqueTogetherError
class IUserFriendlyError(view.EntityAdapter):
__regid__ = 'IUserFriendlyError'
......@@ -408,13 +407,14 @@ class IUserFriendlyCheckConstraint(IUserFriendlyError):
__select__ = match_exception(ViolatedConstraint)
def raise_user_exception(self):
_ = self._cw._
cstrname = self.exc.cstrname
eschema = self.entity.e_schema
for rschema, attrschema in eschema.attribute_definitions():
rdef = rschema.rdef(eschema, attrschema)
for constraint in rdef.constraints:
if cstrname == 'cstr' + md5((eschema.type + rschema.type + constraint.type() + (constraint.serialize() or '')).encode('ascii')).hexdigest():
if cstrname == 'cstr' + md5(
(eschema.type + rschema.type + constraint.type() +
(constraint.serialize() or '')).encode('ascii')).hexdigest():
break
else:
continue
......
# -*- coding: utf-8 -*-
# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
......@@ -46,13 +46,13 @@ class MetadataTC(BaseEntityTC):
self.assertEqual(entity.dc_creator(), u'member')
def test_type(self):
#dc_type may be translated
# dc_type may be translated
with self.admin_access.client_cnx() as cnx:
member = cnx.entity_from_eid(self.membereid)
self.assertEqual(member.dc_type(), 'CWUser')
def test_cw_etype(self):
#cw_etype is never translated
# cw_etype is never translated
with self.admin_access.client_cnx() as cnx:
member = cnx.entity_from_eid(self.membereid)
self.assertEqual(member.cw_etype, 'CWUser')
......@@ -62,15 +62,17 @@ class MetadataTC(BaseEntityTC):
self.assertEqual(self.schema['CWUser'].meta_attributes(), {})
self.assertEqual(dict((str(k), v)
for k, v in self.schema['State'].meta_attributes().items()),
{'description_format': ('format', 'description')})
{'description_format': ('format', 'description')})
def test_fti_rql_method(self):
class EmailAddress(AnyEntity):
__regid__ = 'EmailAddress'
__select__ = AnyEntity.__select__ & yes(2)
@classmethod
def cw_fti_index_rql_queries(cls, req):
return ['EmailAddress Y']
with self.admin_access.web_request() as req:
req.create_entity('EmailAddress', address=u'foo@bar.com')
eclass = self.vreg['etypes'].etype_class('EmailAddress')
......@@ -107,14 +109,16 @@ class EmailAddressTC(BaseEntityTC):
self.assertEqual(email3.prefered.eid, email3.eid)
def test_mangling(self):
query = 'INSERT EmailAddress X: X address "maarten.ter.huurne@philips.com"'
with self.admin_access.repo_cnx() as cnx:
email = cnx.execute('INSERT EmailAddress X: X address "maarten.ter.huurne@philips.com"').get_entity(0, 0)
email = cnx.execute(query).get_entity(0, 0)
self.assertEqual(email.display_address(), 'maarten.ter.huurne@philips.com')
self.assertEqual(email.printable_value('address'), 'maarten.ter.huurne@philips.com')
self.vreg.config.global_set_option('mangle-emails', True)
try:
self.assertEqual(email.display_address(), 'maarten.ter.huurne at philips dot com')
self.assertEqual(email.printable_value('address'), 'maarten.ter.huurne at philips dot com')
self.assertEqual(email.printable_value('address'),
'maarten.ter.huurne at philips dot com')
email = cnx.execute('INSERT EmailAddress X: X address "syt"').get_entity(0, 0)
self.assertEqual(email.display_address(), 'syt')
self.assertEqual(email.printable_value('address'), 'syt')
......@@ -130,6 +134,7 @@ class EmailAddressTC(BaseEntityTC):
self.assertEqual(email.printable_value('address', format='text/plain'),
'maarten&ter@philips.com')
class CWUserTC(BaseEntityTC):
def test_complete(self):
......@@ -167,10 +172,11 @@ class CWUserTC(BaseEntityTC):
with self.admin_access.repo_cnx() as cnx:
e = cnx.execute('CWUser U WHERE U login "member"').get_entity(0, 0)
# Bytes/Password attributes should be omitted
self.assertEqual(e.cw_adapt_to('IEmailable').allowed_massmail_keys(),
set(('surname', 'firstname', 'login', 'last_login_time',
'creation_date', 'modification_date', 'cwuri', 'eid'))
)
self.assertEqual(
e.cw_adapt_to('IEmailable').allowed_massmail_keys(),
set(('surname', 'firstname', 'login', 'last_login_time',
'creation_date', 'modification_date', 'cwuri', 'eid'))
)
def test_cw_instantiate_object_relation(self):
""" a weird non regression test """
......@@ -213,7 +219,7 @@ class SpecializedEntityClassesTC(CubicWebTC):
# no specific class for Subdivisions, the default one should be selected
eclass = self.select_eclass('SubDivision')
self.assertTrue(eclass.__autogenerated__)
#self.assertEqual(eclass.__bases__, (AnyEntity,))
# self.assertEqual(eclass.__bases__, (AnyEntity,))
# build class from most generic to most specific and make
# sure the most specific is always selected
self.vreg._loadedmods[__name__] = {}
......
......@@ -31,7 +31,8 @@ class JsonViewsTC(CubicWebTC):
def test_json_rsetexport(self):
with self.admin_access.web_request() as req:
rset = req.execute('Any GN,COUNT(X) GROUPBY GN ORDERBY GN WHERE X in_group G, G name GN')
rset = req.execute(