Commit 4e67259d authored by Sylvain Thénault's avatar Sylvain Thénault
Browse files

[cleanup] Fix some flake8 errors

parent 6ed86e0b0222
......@@ -22,7 +22,6 @@ import os.path as osp
import sys
from datetime import date
from contextlib import contextmanager
import tempfile
from logilab.common import tempattr
......@@ -111,15 +110,18 @@ class MigrationTC(CubicWebTC):
interactive=False)
def table_sql(self, mh, tablename):
result = mh.sqlexec("SELECT table_name FROM information_schema.tables WHERE LOWER(table_name)=%(table)s",
{'table': tablename.lower()})
result = mh.sqlexec(
"SELECT table_name FROM information_schema.tables WHERE LOWER(table_name)=%(table)s",
{'table': tablename.lower()})
if result:
return result[0][0]
return None # no such table
return None # no such table
def table_schema(self, mh, tablename):
result = mh.sqlexec("SELECT column_name, data_type, character_maximum_length FROM information_schema.columns "
"WHERE LOWER(table_name) = %(table)s", {'table': tablename.lower()})
result = mh.sqlexec(
"SELECT column_name, data_type, character_maximum_length "
"FROM information_schema.columns "
"WHERE LOWER(table_name) = %(table)s", {'table': tablename.lower()})
assert result, 'no table %s' % tablename
return dict((x[0], (x[1], x[2])) for x in result)
......@@ -178,16 +180,9 @@ class MigrationCommandsTC(MigrationTC):
whateverorder = migrschema['whatever'].rdef('Note', 'Int').order
for k, v in orderdict.items():
if v >= whateverorder:
orderdict[k] = v+1
orderdict[k] = v + 1
orderdict['whatever'] = whateverorder
self.assertDictEqual(orderdict, orderdict2)
#self.assertEqual([r.type for r in self.schema['Note'].ordered_relations()],
# ['modification_date', 'creation_date', 'owned_by',
# 'eid', 'ecrit_par', 'inline1', 'date', 'type',
# 'whatever', 'date', 'in_basket'])
# NB: commit instead of rollback make following test fail with py2.5
# this sounds like a pysqlite/2.5 bug (the same eid is affected to
# two different entities)
def test_add_attribute_varchar(self):
with self.mh() as (cnx, mh):
......@@ -233,7 +228,7 @@ class MigrationCommandsTC(MigrationTC):
self.assertEqual(self.schema['mydate'].objects(), ('Date', ))
testdate = date(2005, 12, 13)
eid1 = mh.rqlexec('INSERT Note N')[0][0]
eid2 = mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate' : testdate})[0][0]
eid2 = mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate': testdate})[0][0]
d1 = mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid1})[0][0]
d2 = mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid2})[0][0]
d3 = mh.rqlexec('Any D WHERE X eid %(x)s, X oldstyledefaultdate D', {'x': eid1})[0][0]
......@@ -285,8 +280,8 @@ class MigrationCommandsTC(MigrationTC):
def test_workflow_actions(self):
with self.mh() as (cnx, mh):
wf = mh.cmd_add_workflow(u'foo', ('Personne', 'Email'),
ensure_workflowable=False)
mh.cmd_add_workflow(u'foo', ('Personne', 'Email'),
ensure_workflowable=False)
for etype in ('Personne', 'Email'):
s1 = mh.rqlexec('Any N WHERE WF workflow_of ET, ET name "%s", WF name N' %
etype)[0][0]
......@@ -306,18 +301,19 @@ class MigrationCommandsTC(MigrationTC):
self.assertIn('filed_under2', self.schema)
self.assertTrue(cnx.execute('CWRType X WHERE X name "filed_under2"'))
self.assertEqual(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()),
['created_by', 'creation_date', 'cw_source', 'cwuri',
'description', 'description_format',
'eid',
'filed_under2', 'has_text',
'identity', 'in_basket', 'inlined_rel', 'is', 'is_instance_of',
'modification_date', 'name', 'owned_by'])
['created_by', 'creation_date', 'cw_source', 'cwuri',
'description', 'description_format',
'eid',
'filed_under2', 'has_text',
'identity', 'in_basket', 'inlined_rel', 'is', 'is_instance_of',
'modification_date', 'name', 'owned_by'])
self.assertCountEqual([str(rs) for rs in self.schema['Folder2'].object_relations()],
['filed_under2', 'identity', 'inlined_rel'])
# Old will be missing as it has been renamed into 'New' in the migrated
# schema while New hasn't been added here.
self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
sorted(str(e) for e in self.schema.entities() if not e.final and e != 'Old'))
sorted(str(e) for e in self.schema.entities()
if not e.final and e != 'Old'))
self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',))
eschema = self.schema.eschema('Folder2')
for cstr in eschema.rdef('name').constraints:
......@@ -339,7 +335,7 @@ class MigrationCommandsTC(MigrationTC):
self.assertEqual(rdef.scale, 10)
self.assertEqual(rdef.precision, 18)
fields = self.table_schema(mh, '%sLocation' % SQL_PREFIX)
self.assertEqual(fields['%snum' % SQL_PREFIX], ('numeric', None)) # XXX
self.assertEqual(fields['%snum' % SQL_PREFIX], ('numeric', None)) # XXX
finally:
mh.cmd_drop_cube('fakecustomtype')
mh.drop_entity_type('Numeric')
......@@ -354,7 +350,6 @@ class MigrationCommandsTC(MigrationTC):
wf.add_transition(u'redoit', done, todo)
wf.add_transition(u'markasdone', todo, done)
cnx.commit()
eschema = self.schema.eschema('Folder2')
mh.cmd_drop_entity_type('Folder2')
self.assertNotIn('Folder2', self.schema)
self.assertFalse(cnx.execute('CWEType X WHERE X name "Folder2"'))
......@@ -404,8 +399,9 @@ class MigrationCommandsTC(MigrationTC):
('Personne',))
self.assertEqual(self.schema['concerne2'].objects(),
('Affaire', ))
self.assertEqual(self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality,
'1*')
self.assertEqual(
self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality,
'1*')
mh.cmd_add_relation_definition('Personne', 'concerne2', 'Note')
self.assertEqual(sorted(self.schema['concerne2'].objects()), ['Affaire', 'Note'])
mh.create_entity('Personne', nom=u'tot')
......@@ -420,38 +416,38 @@ class MigrationCommandsTC(MigrationTC):
def test_drop_relation_definition_existant_rtype(self):
with self.mh() as (cnx, mh):
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
['Affaire', 'Personne'])
['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire')
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
['Affaire'])
['Affaire'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
['Division', 'Note', 'Societe', 'SubDivision'])
['Division', 'Note', 'Societe', 'SubDivision'])
mh.cmd_add_relation_definition('Personne', 'concerne', 'Affaire')
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
['Affaire', 'Personne'])
['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
# trick: overwrite self.maxeid to avoid deletion of just reintroduced types
self.maxeid = cnx.execute('Any MAX(X)')[0][0]
def test_drop_relation_definition_with_specialization(self):
with self.mh() as (cnx, mh):
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
['Affaire', 'Personne'])
['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe')
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
['Affaire', 'Personne'])
['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
['Affaire', 'Note'])
['Affaire', 'Note'])
mh.cmd_add_relation_definition('Affaire', 'concerne', 'Societe')
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
['Affaire', 'Personne'])
['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
# trick: overwrite self.maxeid to avoid deletion of just reintroduced types
self.maxeid = cnx.execute('Any MAX(X)')[0][0]
......@@ -546,8 +542,8 @@ class MigrationCommandsTC(MigrationTC):
# new rql expr to add note entity
eexpr = self._erqlexpr_entity(cnx, 'add', 'Note')
self.assertEqual(eexpr.expression,
'X ecrit_part PE, U in_group G, '
'PE require_permission P, P name "add_note", P require_group G')
'X ecrit_part PE, U in_group G, '
'PE require_permission P, P name "add_note", P require_group G')
self.assertEqual([et.name for et in eexpr.reverse_add_permission], ['Note'])
self.assertEqual(eexpr.reverse_read_permission, ())
self.assertEqual(eexpr.reverse_delete_permission, ())
......@@ -558,9 +554,10 @@ class MigrationCommandsTC(MigrationTC):
# new rql expr to add ecrit_par relation
rexpr = self._rrqlexpr_entity(cnx, 'add', 'ecrit_par')
self.assertEqual(rexpr.expression,
'O require_permission P, P name "add_note", '
'U in_group G, P require_group G')
self.assertEqual([rdef.rtype.name for rdef in rexpr.reverse_add_permission], ['ecrit_par'])
'O require_permission P, P name "add_note", '
'U in_group G, P require_group G')
self.assertEqual([rdef.rtype.name for rdef in rexpr.reverse_add_permission],
['ecrit_par'])
self.assertEqual(rexpr.reverse_read_permission, ())
self.assertEqual(rexpr.reverse_delete_permission, ())
# no more rqlexpr to delete and add travaille relation
......@@ -590,14 +587,16 @@ class MigrationCommandsTC(MigrationTC):
# * 2 implicit new for attributes (Note.para, Person.test)
# remaining orphan rql expr which should be deleted at commit (composite relation)
# unattached expressions -> pending deletion on commit
self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",'
'NOT ET1 read_permission X, NOT ET2 add_permission X, '
'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
7)
self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",'
'NOT ET1 read_permission X, NOT ET2 add_permission X, '
'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
2)
self.assertEqual(
cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",'
'NOT ET1 read_permission X, NOT ET2 add_permission X, '
'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
7)
self.assertEqual(
cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",'
'NOT ET1 read_permission X, NOT ET2 add_permission X, '
'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
2)
# finally
self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0],
nbrqlexpr_start + 1 + 2 + 2 + 2)
......@@ -605,8 +604,9 @@ class MigrationCommandsTC(MigrationTC):
# unique_together test
self.assertEqual(len(self.schema.eschema('Personne')._unique_together), 1)
self.assertCountEqual(self.schema.eschema('Personne')._unique_together[0],
('nom', 'prenom', 'datenaiss'))
rset = cnx.execute('Any C WHERE C is CWUniqueTogetherConstraint, C constraint_of ET, ET name "Personne"')
('nom', 'prenom', 'datenaiss'))
rset = cnx.execute('Any C WHERE C is CWUniqueTogetherConstraint, '
'C constraint_of ET, ET name "Personne"')
self.assertEqual(len(rset), 1)
relations = [r.name for r in rset.get_entity(0, 0).relations]
self.assertCountEqual(relations, ('nom', 'prenom', 'datenaiss'))
......@@ -628,8 +628,9 @@ class MigrationCommandsTC(MigrationTC):
return rset.get_entity(0, 0)
def _rrqlexpr_rset(self, cnx, action, ertype):
rql = 'RQLExpression X WHERE RT is CWRType, RDEF %s_permission X, RT name %%(name)s, RDEF relation_type RT' % action
return cnx.execute(rql, {'name': ertype})
return cnx.execute('RQLExpression X WHERE RT is CWRType, RDEF %s_permission X, '
'RT name %%(name)s, RDEF relation_type RT' % action,
{'name': ertype})
def _rrqlexpr_entity(self, cnx, action, ertype):
rset = self._rrqlexpr_rset(cnx, action, ertype)
......@@ -667,15 +668,21 @@ class MigrationCommandsTC(MigrationTC):
'sender', 'in_thread', 'reply_to', 'data_format'):
self.assertNotIn(ertype, schema)
self.assertEqual(sorted(schema['see_also'].rdefs),
sorted([('Folder', 'Folder'),
('Bookmark', 'Bookmark'),
('Bookmark', 'Note'),
('Note', 'Note'),
('Note', 'Bookmark')]))
self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'Folder', 'Note'])
self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'Folder', 'Note'])
self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.fakeemail"').rowcount, 0)
self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.file"').rowcount, 0)
sorted([('Folder', 'Folder'),
('Bookmark', 'Bookmark'),
('Bookmark', 'Note'),
('Note', 'Note'),
('Note', 'Bookmark')]))
self.assertEqual(sorted(schema['see_also'].subjects()),
['Bookmark', 'Folder', 'Note'])
self.assertEqual(sorted(schema['see_also'].objects()),
['Bookmark', 'Folder', 'Note'])
self.assertEqual(
cnx.execute('Any X WHERE X pkey "system.version.fakeemail"').rowcount,
0)
self.assertEqual(
cnx.execute('Any X WHERE X pkey "system.version.file"').rowcount,
0)
finally:
mh.cmd_add_cube('fakeemail')
self.assertIn('fakeemail', self.config.cubes())
......@@ -691,17 +698,18 @@ class MigrationCommandsTC(MigrationTC):
('Bookmark', 'Note'),
('Note', 'Note'),
('Note', 'Bookmark')]))
self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'EmailThread', 'Folder', 'Note'])
self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'EmailThread', 'Folder', 'Note'])
self.assertEqual(sorted(schema['see_also'].subjects()),
['Bookmark', 'EmailThread', 'Folder', 'Note'])
self.assertEqual(sorted(schema['see_also'].objects()),
['Bookmark', 'EmailThread', 'Folder', 'Note'])
from cubes.fakeemail.__pkginfo__ import version as email_version
from cubes.file.__pkginfo__ import version as file_version
self.assertEqual(cnx.execute('Any V WHERE X value V, X pkey "system.version.fakeemail"')[0][0],
email_version)
self.assertEqual(cnx.execute('Any V WHERE X value V, X pkey "system.version.file"')[0][0],
file_version)
# trick: overwrite self.maxeid to avoid deletion of just reintroduced
# types (and their associated tables!)
self.maxeid = cnx.execute('Any MAX(X)')[0][0]
self.assertEqual(
cnx.execute('Any V WHERE X value V, X pkey "system.version.fakeemail"')[0][0],
email_version)
self.assertEqual(
cnx.execute('Any V WHERE X value V, X pkey "system.version.file"')[0][0],
file_version)
# why this commit is necessary is unclear to me (though without it
# next test may fail complaining of missing tables
cnx.commit()
......@@ -739,20 +747,21 @@ class MigrationCommandsTC(MigrationTC):
self.assertEqual(self.schema['Note'].specializes().type, 'Para')
mh.cmd_add_entity_type('Text')
self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
['Note', 'Text'])
['Note', 'Text'])
self.assertEqual(self.schema['Text'].specializes().type, 'Para')
# test columns have been actually added
text = cnx.execute('INSERT Text X: X para "hip", X summary "hop", X newattr "momo"').get_entity(0, 0)
note = cnx.execute('INSERT Note X: X para "hip", X shortpara "hop", X newattr "momo", X unique_id "x"').get_entity(0, 0)
aff = cnx.execute('INSERT Affaire X').get_entity(0, 0)
text = cnx.create_entity('Text', para=u"hip", summary=u"hop", newattr=u"momo")
note = cnx.create_entity('Note', para=u"hip", shortpara=u"hop",
newattr=u"momo", unique_id=u"x")
aff = cnx.create_entity('Affaire')
self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
{'x': text.eid, 'y': aff.eid}))
{'x': text.eid, 'y': aff.eid}))
self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
{'x': note.eid, 'y': aff.eid}))
{'x': note.eid, 'y': aff.eid}))
self.assertTrue(cnx.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
{'x': text.eid, 'y': aff.eid}))
{'x': text.eid, 'y': aff.eid}))
self.assertTrue(cnx.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
{'x': note.eid, 'y': aff.eid}))
{'x': note.eid, 'y': aff.eid}))
# XXX remove specializes by ourselves, else tearDown fails when removing
# Para because of Note inheritance. This could be fixed by putting the
# MemSchemaCWETypeDel(session, name) operation in the
......@@ -789,7 +798,7 @@ class MigrationCommandsTC(MigrationTC):
def test_drop_required_inlined_relation(self):
with self.mh() as (cnx, mh):
bob = mh.cmd_create_entity('Personne', nom=u'bob')
note = mh.cmd_create_entity('Note', ecrit_par=bob)
mh.cmd_create_entity('Note', ecrit_par=bob)
mh.commit()
rdef = mh.fs_schema.rschema('ecrit_par').rdefs[('Note', 'Personne')]
with tempattr(rdef, 'cardinality', '1*'):
......@@ -800,7 +809,7 @@ class MigrationCommandsTC(MigrationTC):
def test_drop_inlined_rdef_delete_data(self):
with self.mh() as (cnx, mh):
note = mh.cmd_create_entity('Note', ecrit_par=cnx.user.eid)
mh.cmd_create_entity('Note', ecrit_par=cnx.user.eid)
mh.commit()
mh.drop_relation_definition('Note', 'ecrit_par', 'CWUser')
self.assertFalse(mh.sqlexec('SELECT * FROM cw_Note WHERE cw_ecrit_par IS NOT NULL'))
......
......@@ -45,20 +45,25 @@ class SSPlannerTC(BasePlannerTC):
self.system = self.o._repo.system_source
def test_ordered_ambigous_sol(self):
self._test('Any XN ORDERBY XN WHERE X name XN, X is IN (Basket, State, Folder)',
[('OneFetchStep', [('Any XN ORDERBY XN WHERE X name XN, X is IN(Basket, State, Folder)',
[{'X': 'Basket', 'XN': 'String'},
{'X': 'State', 'XN': 'String'},
{'X': 'Folder', 'XN': 'String'}])],
[])])
self._test(
'Any XN ORDERBY XN WHERE X name XN, X is IN (Basket, State, Folder)',
[('OneFetchStep', [('Any XN ORDERBY XN WHERE X name XN, '
'X is IN(Basket, State, Folder)',
[{'X': 'Basket', 'XN': 'String'},
{'X': 'State', 'XN': 'String'},
{'X': 'Folder', 'XN': 'String'}])],
[])])
def test_groupeded_ambigous_sol(self):
self._test('Any XN,COUNT(X) GROUPBY XN WHERE X name XN, X is IN (Basket, State, Folder)',
[('OneFetchStep', [('Any XN,COUNT(X) GROUPBY XN WHERE X name XN, X is IN(Basket, State, Folder)',
[{'X': 'Basket', 'XN': 'String'},
{'X': 'State', 'XN': 'String'},
{'X': 'Folder', 'XN': 'String'}])],
[])])
self._test(
'Any XN,COUNT(X) GROUPBY XN WHERE X name XN, X is IN (Basket, State, Folder)',
[('OneFetchStep', [('Any XN,COUNT(X) GROUPBY XN WHERE X name XN, '
'X is IN(Basket, State, Folder)',
[{'X': 'Basket', 'XN': 'String'},
{'X': 'State', 'XN': 'String'},
{'X': 'Folder', 'XN': 'String'}])],
[])])
if __name__ == '__main__':
import unittest
......
......@@ -61,8 +61,10 @@ cubicweb/server/test/data-schema2sql/__init__.py
cubicweb/server/test/unittest_checkintegrity.py
cubicweb/server/test/unittest_datafeed.py
cubicweb/server/test/unittest_ldapsource.py
cubicweb/server/test/unittest_migractions.py
cubicweb/server/test/unittest_serverctl.py
cubicweb/server/test/unittest_session.py
cubicweb/server/test/unittest_ssplanner.py
cubicweb/server/test/unittest_rqlannotation.py
cubicweb/server/test/unittest_utils.py
cubicweb/sobjects/notification.py
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment