Commit e2a691da authored by sylvain.thenault@logilab.fr's avatar sylvain.thenault@logilab.fr
Browse files

fix test

--HG--
branch : tls-sprint
parent eccd1885d42e
comment,folder,tag,basket,email,file
card,comment,folder,tag,basket,email,file
......@@ -4,23 +4,23 @@
from logilab.common.testlib import unittest_main
from cubicweb.devtools.apptest import RepositoryBasedTC
from cubicweb.server.pool import LateOperation
from cubicweb.server.pool import LateOperation, Operation, SingleLastOperation
from cubicweb.server.hookhelper import *
class HookHelpersTC(RepositoryBasedTC):
def setUp(self):
RepositoryBasedTC.setUp(self)
self.hm = self.repo.hm
def test_late_operation(self):
session = self.session
l1 = LateOperation(session)
l2 = LateOperation(session)
l3 = Operation(session)
self.assertEquals(session.pending_operations, [l3, l1, l2])
def test_single_last_operation(self):
session = self.session
l0 = SingleLastOperation(session)
......@@ -30,7 +30,7 @@ class HookHelpersTC(RepositoryBasedTC):
self.assertEquals(session.pending_operations, [l3, l1, l2, l0])
l4 = SingleLastOperation(session)
self.assertEquals(session.pending_operations, [l3, l1, l2, l4])
def test_global_operation_order(self):
from cubicweb.server import hooks, schemahooks
session = self.session
......@@ -42,8 +42,8 @@ class HookHelpersTC(RepositoryBasedTC):
op4 = hooks.DelayedDeleteOp(session)
op5 = hooks.CheckORelationOp(session)
self.assertEquals(session.pending_operations, [op1, op2, op4, op5, op3])
def test_in_state_notification(self):
result = []
# test both email notification and transition_information
......@@ -78,6 +78,6 @@ class HookHelpersTC(RepositoryBasedTC):
if isinstance(op, SendMailOp)]
self.assertEquals(len(searchedops), 0,
self.session.pending_operations)
if __name__ == '__main__':
unittest_main()
......@@ -20,9 +20,9 @@ def teardown_module(*args):
Repository.get_versions = orig_get_versions
class CoreHooksTC(RepositoryBasedTC):
def test_delete_internal_entities(self):
self.assertRaises(RepositoryError, self.execute,
'DELETE CWEType X WHERE X name "CWEType"')
......@@ -40,17 +40,17 @@ class CoreHooksTC(RepositoryBasedTC):
self.execute('DELETE X in_group Y WHERE X login "toto"')
self.execute('SET X in_group Y WHERE X login "toto", Y name "guests"')
self.commit()
def test_delete_required_relations_object(self):
self.skip('no sample in the schema ! YAGNI ? Kermaat ?')
def test_static_vocabulary_check(self):
self.assertRaises(ValidationError,
self.execute,
'SET X composite "whatever" WHERE X from_entity FE, FE name "CWUser", X relation_type RT, RT name "in_group"')
def test_missing_required_relations_subject_inline(self):
# missing in_group relation
# missing in_group relation
self.execute('INSERT CWUser X: X login "toto", X upassword "hop"')
self.assertRaises(ValidationError,
self.commit)
......@@ -76,7 +76,7 @@ class CoreHooksTC(RepositoryBasedTC):
self.execute('SET X sender Y WHERE X is Email, Y is EmailAddress')
rset = self.execute('Any S WHERE X sender S, X eid %s' % eeid)
self.assertEquals(len(rset), 1)
def test_composite_1(self):
self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
......@@ -90,7 +90,7 @@ class CoreHooksTC(RepositoryBasedTC):
self.commit()
rset = self.execute('Any X WHERE X is EmailPart')
self.assertEquals(len(rset), 0)
def test_composite_2(self):
self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
......@@ -102,7 +102,7 @@ class CoreHooksTC(RepositoryBasedTC):
self.commit()
rset = self.execute('Any X WHERE X is EmailPart')
self.assertEquals(len(rset), 0)
def test_composite_redirection(self):
self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
......@@ -148,11 +148,11 @@ class CoreHooksTC(RepositoryBasedTC):
self.execute('SET A descr "R&D<p>yo" WHERE A eid %s' % entity.eid)
entity = self.execute('Any A WHERE A eid %s' % entity.eid).get_entity(0, 0)
self.assertEquals(entity.descr, u'R&amp;D<p>yo</p>')
class UserGroupHooksTC(RepositoryBasedTC):
def test_user_synchronization(self):
self.create_user('toto', password='hop', commit=False)
self.assertRaises(AuthenticationError,
......@@ -193,9 +193,9 @@ class UserGroupHooksTC(RepositoryBasedTC):
self.execute('DELETE EmailAddress X WHERE X eid %s' % eid)
self.commit()
self.failIf(self.execute('Any X WHERE X created_by Y, X eid >= %(x)s', {'x': eid}))
class CWPropertyHooksTC(RepositoryBasedTC):
def test_unexistant_eproperty(self):
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "bla.bla", X value "hop", X for_user U')
......@@ -203,12 +203,12 @@ class CWPropertyHooksTC(RepositoryBasedTC):
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "bla.bla", X value "hop"')
self.assertEquals(ex.errors, {'pkey': 'unknown property key'})
def test_site_wide_eproperty(self):
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "ui.site-title", X value "hop", X for_user U')
self.assertEquals(ex.errors, {'for_user': "site-wide property can't be set for user"})
def test_bad_type_eproperty(self):
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "ui.language", X value "hop", X for_user U')
......@@ -216,10 +216,10 @@ class CWPropertyHooksTC(RepositoryBasedTC):
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "ui.language", X value "hop"')
self.assertEquals(ex.errors, {'value': u'unauthorized value'})
class SchemaHooksTC(RepositoryBasedTC):
def test_duplicate_etype_error(self):
# check we can't add a CWEType or CWRType entity if it already exists one
# with the same name
......@@ -229,7 +229,7 @@ class SchemaHooksTC(RepositoryBasedTC):
self.execute, 'INSERT CWEType X: X name "Societe"')
self.assertRaises((ValidationError, RepositoryError),
self.execute, 'INSERT CWRType X: X name "in_group"')
def test_validation_unique_constraint(self):
self.assertRaises(ValidationError,
self.execute, 'INSERT CWUser X: X login "admin"')
......@@ -253,13 +253,13 @@ class SchemaModificationHooksTC(RepositoryBasedTC):
RepositoryBasedTC.setUp(self)
def index_exists(self, etype, attr, unique=False):
dbhelper = self.session.pool.source('system').dbhelper
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
def test_base(self):
schema = self.repo.schema
dbhelper = self.session.pool.source('system').dbhelper
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
self.failIf(schema.has_entity('Societe2'))
self.failIf(schema.has_entity('concerne2'))
......@@ -332,8 +332,8 @@ class SchemaModificationHooksTC(RepositoryBasedTC):
self.failUnless('subdiv' in snames)
snames = [name for name, in self.execute('Any N WHERE S is_instance_of Division, S nom N')]
self.failUnless('subdiv' in snames)
def test_perms_synchronization_1(self):
schema = self.repo.schema
self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
......@@ -374,9 +374,9 @@ class SchemaModificationHooksTC(RepositoryBasedTC):
self.execute('Any X WHERE X is CWEType, X name "CWEType"')
# schema modification hooks tests #########################################
def test_uninline_relation(self):
dbhelper = self.session.pool.source('system').dbhelper
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
# Personne inline2 Affaire inline
# insert a person without inline2 relation (not mandatory)
......@@ -410,7 +410,7 @@ class SchemaModificationHooksTC(RepositoryBasedTC):
self.assertEquals(rset.rows[0], [peid, aeid])
def test_indexed_change(self):
dbhelper = self.session.pool.source('system').dbhelper
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
try:
self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"')
......@@ -428,7 +428,7 @@ class SchemaModificationHooksTC(RepositoryBasedTC):
self.failIf(self.index_exists('Affaire', 'sujet'))
def test_unique_change(self):
dbhelper = self.session.pool.source('system').dbhelper
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
try:
try:
......@@ -453,7 +453,7 @@ class SchemaModificationHooksTC(RepositoryBasedTC):
self.commit()
self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
self.failIf(self.index_exists('Affaire', 'sujet', unique=True))
class WorkflowHooksTC(RepositoryBasedTC):
......@@ -468,7 +468,7 @@ class WorkflowHooksTC(RepositoryBasedTC):
# enforcement
self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
self.commit()
def tearDown(self):
self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
self.commit()
......@@ -483,7 +483,7 @@ class WorkflowHooksTC(RepositoryBasedTC):
initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
{'x' : ueid})[0][0]
self.assertEquals(initialstate, u'activated')
def test_initial_state(self):
cnx = self.login('stduser')
cu = cnx.cursor()
......@@ -495,7 +495,7 @@ class WorkflowHooksTC(RepositoryBasedTC):
self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
self.commit()
# test that the workflow is correctly enforced
def test_transition_checking1(self):
cnx = self.login('stduser')
......@@ -505,7 +505,7 @@ class WorkflowHooksTC(RepositoryBasedTC):
cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
{'x': ueid, 's': self.s_activated}, 'x')
cnx.close()
def test_transition_checking2(self):
cnx = self.login('stduser')
cu = cnx.cursor()
......@@ -514,7 +514,7 @@ class WorkflowHooksTC(RepositoryBasedTC):
cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
{'x': ueid, 's': self.s_dummy}, 'x')
cnx.close()
def test_transition_checking3(self):
cnx = self.login('stduser')
cu = cnx.cursor()
......@@ -530,7 +530,7 @@ class WorkflowHooksTC(RepositoryBasedTC):
{'x': ueid, 's': self.s_activated}, 'x')
cnx.commit()
cnx.close()
def test_transition_checking4(self):
cnx = self.login('stduser')
cu = cnx.cursor()
......@@ -559,7 +559,7 @@ class WorkflowHooksTC(RepositoryBasedTC):
self.assertEquals(tr.comment, None)
self.assertEquals(tr.from_state[0].eid, self.s_activated)
self.assertEquals(tr.to_state[0].eid, self.s_deactivated)
self.session.set_shared_data('trcomment', u'il est pas sage celui-la')
self.session.set_shared_data('trcommentformat', u'text/plain')
self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
......@@ -591,6 +591,6 @@ class WorkflowHooksTC(RepositoryBasedTC):
cu = cnx.cursor()
self.failUnless(cu.execute("INSERT Note X: X type 'a', X in_state S WHERE S name 'todo'"))
cnx.commit()
if __name__ == '__main__':
unittest_main()
......@@ -19,10 +19,10 @@ def setup_module(*args):
def teardown_module(*args):
Repository.get_versions = orig_get_versions
class MigrationCommandsTC(RepositoryBasedTC):
copy_schema = True
def setUp(self):
if not hasattr(self, '_repo'):
# first initialization
......@@ -43,7 +43,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
interactive=False)
assert self.cnx is self.mh._cnx
assert self.session is self.mh.session, (self.session.id, self.mh.session.id)
def test_add_attribute_int(self):
self.failIf('whatever' in self.schema)
paraordernum = self.mh.rqlexec('Any O WHERE X name "Note", RT name "para", RDEF from_entity X, RDEF relation_type RT, RDEF ordernum O')[0][0]
......@@ -73,7 +73,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(','))
self.assertEquals(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)')
self.mh.rollback()
def test_add_datetime_with_default_value_attribute(self):
self.failIf('mydate' in self.schema)
self.mh.cmd_add_attribute('Note', 'mydate')
......@@ -88,7 +88,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
self.assertEquals(d1, date.today())
self.assertEquals(d2, testdate)
self.mh.rollback()
def test_rename_attribute(self):
self.failIf('civility' in self.schema)
eid1 = self.mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0]
......@@ -121,7 +121,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
self.assertEquals(t1, "baz")
gn = self.mh.rqlexec('Any GN WHERE T require_group G, G name GN, T eid %s' % baz)[0][0]
self.assertEquals(gn, 'managers')
def test_add_entity_type(self):
self.failIf('Folder2' in self.schema)
self.failIf('filed_under2' in self.schema)
......@@ -137,7 +137,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
self.assertEquals([str(rs) for rs in self.schema['Folder2'].object_relations()],
['filed_under2', 'identity'])
self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File',
['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File',
'Folder2', 'Image', 'Note', 'Personne', 'Societe', 'SubDivision'])
self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',))
eschema = self.schema.eschema('Folder2')
......@@ -164,7 +164,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
self.mh.cmd_add_relation_type('filed_under2')
self.failUnless('filed_under2' in self.schema)
self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File',
['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File',
'Folder2', 'Image', 'Note', 'Personne', 'Societe', 'SubDivision'])
self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',))
......@@ -178,8 +178,8 @@ class MigrationCommandsTC(RepositoryBasedTC):
def test_add_relation_definition(self):
self.mh.cmd_add_relation_definition('Societe', 'in_state', 'State')
self.assertEquals(sorted(self.schema['in_state'].subjects()),
['Affaire', 'Division', 'CWUser', 'Note', 'Societe', 'SubDivision'])
self.assertEquals(sorted(str(x) for x in self.schema['in_state'].subjects()),
['Affaire', 'CWUser', 'Division', 'Note', 'Societe', 'SubDivision'])
self.assertEquals(self.schema['in_state'].objects(), ('State',))
def test_add_relation_definition_nortype(self):
......@@ -195,7 +195,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
self.mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire')
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire'])
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Division', 'Note', 'Societe', 'SubDivision'])
def test_drop_relation_definition_with_specialization(self):
self.failUnless('concerne' in self.schema)
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne'])
......@@ -204,13 +204,13 @@ class MigrationCommandsTC(RepositoryBasedTC):
self.mh.cmd_drop_relation_definition('None', 'concerne', 'Societe')
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne'])
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Note'])
def test_drop_relation_definition2(self):
self.failUnless('evaluee' in self.schema)
self.mh.cmd_drop_relation_definition('Personne', 'evaluee', 'Note')
self.failUnless('evaluee' in self.schema)
self.assertEquals(sorted(self.schema['evaluee'].subjects()),
['Division', 'CWUser', 'Societe', 'SubDivision'])
['CWUser', 'Division', 'Societe', 'SubDivision'])
self.assertEquals(sorted(self.schema['evaluee'].objects()),
['Note'])
......@@ -229,7 +229,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
finally:
self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe',
cardinality='**')
def test_change_relation_props_final(self):
rschema = self.schema['adel']
card = rschema.rproperty('Personne', 'String', 'fulltextindexed')
......@@ -255,7 +255,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
# self.assertEquals([rs.type for rs in migrschema['Personne'].ordered_relations() if rs.is_final()],
# expected)
migrschema['Personne'].description = 'blabla bla'
migrschema['titre'].description = 'usually a title'
migrschema['titre'].description = 'usually a title'
migrschema['titre']._rproperties[('Personne', 'String')]['description'] = 'title for this person'
# rinorderbefore = cursor.execute('Any O,N WHERE X is CWAttribute, X relation_type RT, RT name N,'
# 'X from_entity FE, FE name "Personne",'
......@@ -264,9 +264,9 @@ class MigrationCommandsTC(RepositoryBasedTC):
# u'sexe', u'promo', u'titre', u'adel', u'ass', u'web', u'tel',
# u'fax', u'datenaiss', u'test', u'description']
# self.assertListEquals(rinorderbefore, map(list, zip([0, 0]+range(1, len(expected)), expected)))
self.mh.cmd_synchronize_schema(commit=False)
self.mh.cmd_sync_schema_props_perms(commit=False)
self.assertEquals(cursor.execute('Any D WHERE X name "Personne", X description D')[0][0],
'blabla bla')
self.assertEquals(cursor.execute('Any D WHERE X name "titre", X description D')[0][0],
......@@ -300,7 +300,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
# no more rqlexpr to delete and add para attribute
self.failIf(self._rrqlexpr_rset('add', 'para'))
self.failIf(self._rrqlexpr_rset('delete', 'para'))
# new rql expr to add ecrit_par relation
# new rql expr to add ecrit_par relation
rexpr = self._rrqlexpr_entity('add', 'ecrit_par')
self.assertEquals(rexpr.expression,
'O require_permission P, P name "add_note", '
......@@ -333,8 +333,8 @@ class MigrationCommandsTC(RepositoryBasedTC):
self.assertEquals(len(cursor.execute('RQLExpression X WHERE NOT ET1 read_permission X, NOT ET2 add_permission X, '
'NOT ET3 delete_permission X, NOT ET4 update_permission X')), 8+1)
# finally
self.assertEquals(len(cursor.execute('RQLExpression X')), nbrqlexpr_start + 1 + 2)
self.assertEquals(len(cursor.execute('RQLExpression X')), nbrqlexpr_start + 1 + 2)
self.mh.rollback()
def _erqlexpr_rset(self, action, ertype):
......@@ -351,7 +351,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
rset = self._rrqlexpr_rset(action, ertype)
self.assertEquals(len(rset), 1)
return rset.get_entity(0, 0)
def test_set_size_constraint(self):
# existing previous value
try:
......@@ -378,7 +378,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
cubes.remove('email')
cubes.remove('file')
self.assertEquals(set(self.config.cubes()), cubes)
for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image',
for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image',
'sender', 'in_thread', 'reply_to', 'data_format'):
self.failIf(ertype in schema, ertype)
self.assertEquals(sorted(schema['see_also']._rproperties.keys()),
......@@ -402,7 +402,7 @@ class MigrationCommandsTC(RepositoryBasedTC):
cubes.add('email')
cubes.add('file')
self.assertEquals(set(self.config.cubes()), cubes)
for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image',
for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image',
'sender', 'in_thread', 'reply_to', 'data_format'):
self.failUnless(ertype in schema, ertype)
self.assertEquals(sorted(schema['see_also']._rproperties.keys()),
......@@ -426,17 +426,13 @@ class MigrationCommandsTC(RepositoryBasedTC):
self.maxeid = self.execute('Any MAX(X)')[0][0]
# why this commit is necessary is unclear to me (though without it
# next test may fail complaining of missing tables
self.commit()
self.commit()
def test_set_state(self):
user = self.session.user
self.set_debug(True)
self.mh.set_state(user.eid, 'deactivated')
user.clear_related_cache('in_state', 'subject')
try:
self.assertEquals(user.state, 'deactivated')
finally:
self.set_debug(False)
self.assertEquals(user.state, 'deactivated')
if __name__ == '__main__':
unittest_main()
This diff is collapsed.
......@@ -17,7 +17,7 @@ from cubicweb.schema import CubicWebSchema, RQLConstraint
from cubicweb.dbapi import connect, repo_connect
from cubicweb.devtools.apptest import RepositoryBasedTC
from cubicweb.devtools.repotest import tuplify
from cubicweb.server import repository
from cubicweb.server import repository
from cubicweb.server.sqlutils import SQL_PREFIX
......@@ -29,10 +29,10 @@ class RepositoryTC(RepositoryBasedTC):
""" singleton providing access to a persistent storage for entities
and relation
"""
# def setUp(self):
# pass
# def tearDown(self):
# self.repo.config.db_perms = True
# cnxid = self.repo.connect(*self.default_user_password())
......@@ -64,7 +64,7 @@ class RepositoryTC(RepositoryBasedTC):
(u'String',), (u'Time',)])
finally:
self.repo._free_pool(pool)
def test_schema_has_owner(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
......@@ -74,7 +74,7 @@ class RepositoryTC(RepositoryBasedTC):
self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U'))
self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U'))
self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
def test_connect(self):
login, passwd = self.default_user_password()
self.assert_(self.repo.connect(login, passwd))
......@@ -84,7 +84,7 @@ class RepositoryTC(RepositoryBasedTC):
self.repo.connect, login, None)
self.assertRaises(AuthenticationError,
self.repo.connect, None, None)
def test_execute(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
......@@ -93,7 +93,7 @@ class RepositoryTC(RepositoryBasedTC):
repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
repo.close(cnxid)
def test_login_upassword_accent(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
......@@ -102,27 +102,28 @@ class RepositoryTC(RepositoryBasedTC):
repo.commit(cnxid)
repo.close(cnxid)
self.assert_(repo.connect(u"barnab", u"hhh".encode('UTF8')))
def test_invalid_entity_rollback(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
# no group
repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
{'login': u"tutetute", 'passwd': 'tutetute'})
self.assertRaises(ValidationError, repo.commit, cnxid)
rset = repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')
self.assertEquals(rset.rowcount, 0)
def test_close(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
self.assert_(cnxid)
repo.close(cnxid)
self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
def test_invalid_cnxid(self):
self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
self.assertRaises(BadConnectionId, self.repo.close, None)
def test_shared_data(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
......@@ -179,7 +180,7 @@ class RepositoryTC(RepositoryBasedTC):
repo.rollback(cnxid)
result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
self.assertEquals(result.rowcount, 0, result.rows)
def test_transaction_base3(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
......@@ -194,7 +195,7 @@ class RepositoryTC(RepositoryBasedTC):
repo.rollback(cnxid)
rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
self.assertEquals(len(rset), 1)
def test_transaction_interleaved(self):
self.skip('implement me')