Commit 8a6d7c68 authored by Julien Cristau's avatar Julien Cristau
Browse files

[test] test the cwclientlib-based importer instead of the pyro-based one

parent a07f19f1ce70
email,comment,blog
email,comment,blog,signedrequest,rqlcontroller
......@@ -5,36 +5,42 @@ import os
from StringIO import StringIO
from logilab.common.testlib import TestCase, unittest_main
from cwclientlib import cwproxy
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.httptest import CubicWebServerTC
from cubes.email.mboximport import MBOXImporter
from cubes.email.mboximportclient import Importer
class MBOXImporterTC(CubicWebTC):
class MBOXImporterTC(CubicWebServerTC):
token_id = u'mboximport'
def setUp(self):
super(MBOXImporterTC, self).setUp()
with self.admin_access.repo_cnx() as cnx:
cnx.create_entity('AuthToken', enabled=True, id=self.token_id, token_for_user=cnx.user)
cnx.commit()
self.secret = cnx.execute('String X WHERE T token X')[0][0]
def test_mbox_format(self):
mi = Importer(cwproxy.CWProxy(self.config['base-url'], auth=cwproxy.SignedRequestAuth(self.token_id, self.secret)))
mi.import_mbox(self.datapath('mbox'))
with self.admin_access.client_cnx() as cnx:
mi = MBOXImporter(cnx)
mi.import_mbox(self.datapath('mbox'))
self._test_base(mi)
self._test_base(cnx)
def test_maildir_format(self):
mi = Importer(cwproxy.CWProxy(self.config['base-url'], auth=cwproxy.SignedRequestAuth(self.token_id, self.secret)))
maildirpath = self.datapath('maildir')
newpath = os.path.join(maildirpath, 'new')
if not os.path.exists(newpath):
os.mkdir(newpath)
mi.import_maildir(maildirpath)
with self.admin_access.client_cnx() as cnx:
mi = MBOXImporter(cnx)
maildirpath = self.datapath('maildir')
newpath = os.path.join(maildirpath, 'new')
if not os.path.exists(newpath):
os.mkdir(newpath)
mi.import_maildir(maildirpath)
self._test_base(mi)
def _test_base(self, mi):
self.assertEqual(sorted([(x, len(y)) for x, y in mi.created.items()]),
[('email', 2), ('emailpart', 5), ('file', 2)])
self.assertEqual(mi.skipped, [])
rset = mi.cnx.execute('Any X ORDERBY S WHERE X is Email, X subject S')
self.assertEqual(len(rset), 2)
self._test_base(cnx)
def _test_base(self, cnx):
rset = cnx.execute('Any X, S ORDERBY S WHERE X is Email, X subject S')
self.assertEqual(len(rset), 2, rset.rows)
email = rset.get_entity(0, 0)
self.assertEqual(email.subject, 'Re: [Python-projects] Pylint: Disable-msg for a block or statement?')
self.assertEqual(email.sender[0].address, 'pink@odahoda.de')
......@@ -45,7 +51,7 @@ class MBOXImporterTC(CubicWebTC):
self.assertEqual(sorted([(f.data_name, f.data_format) for f in email.attachment]),
[(u'astng.patch', u'text/x-diff'), (u'pylint.patch', u'text/x-diff')])
assert not self.vreg.schema['parts'].inlined
self.assertEqual(len(mi.cnx.execute('Any P WHERE E parts P, E eid %s'%email.eid)), 2)
self.assertEqual(len(cnx.execute('Any P WHERE E parts P, E eid %s'%email.eid)), 2)
self.assertEqual(len(email.parts), 2)
part1, part2 = email.parts_in_order()
self.assertEqual(part1.content_format,'text/plain')
......@@ -189,9 +195,9 @@ http://lists.logilab.org/mailman/listinfo/python-projects
"""
def test_no_subject(self):
mi = Importer(cwproxy.CWProxy(self.config['base-url'], auth=cwproxy.SignedRequestAuth(self.token_id, self.secret)))
mi.import_mbox_stream(StringIO(self.NOSUBJECT))
with self.admin_access.client_cnx() as cnx:
mi = MBOXImporter(cnx)
mi.import_mbox_stream(StringIO(self.NOSUBJECT))
rset = cnx.execute('Any X ORDERBY S WHERE X is Email, X subject S')
self.assertEqual(len(rset), 1)
email = rset.get_entity(0, 0)
......@@ -199,12 +205,12 @@ http://lists.logilab.org/mailman/listinfo/python-projects
self.assertEqual(email.references(), set(('<xxx@blabla>',)))
def test_double_import(self):
mi = Importer(cwproxy.CWProxy(self.config['base-url'], auth=cwproxy.SignedRequestAuth(self.token_id, self.secret)))
mi.import_mbox_stream(StringIO(self.NOSUBJECT))
mi.import_mbox_stream(StringIO(self.NOSUBJECT))
with self.admin_access.client_cnx() as cnx:
mi = MBOXImporter(cnx)
mi.import_mbox_stream(StringIO(self.NOSUBJECT))
mi.import_mbox_stream(StringIO(self.NOSUBJECT))
rset = cnx.execute('Any X ORDERBY S WHERE X is Email, X subject S')
self.assertEqual(len(rset), 1)
rset = cnx.execute('Any X, S ORDERBY S WHERE X is Email, X subject S')
self.assertEqual(len(rset), 1, rset.rows)
def test_address_detection(self):
with self.admin_access.client_cnx() as cnx:
......@@ -221,8 +227,10 @@ http://lists.logilab.org/mailman/listinfo/python-projects
#self.assertEqual(testrset[0][0], eid2)
#testrset = cursor.execute('Any X WHERE X identical_to Y, X canonical TRUE, Y eid %(y)s', {'y': eid1})
#self.assertEqual(testrset[0][0], eid2)
mi = MBOXImporter(cnx)
mi.import_mbox_stream(StringIO(self.NOSUBJECT))
cnx.commit()
mi = Importer(cwproxy.CWProxy(self.config['base-url'], auth=cwproxy.SignedRequestAuth(self.token_id, self.secret)))
mi.import_mbox_stream(StringIO(self.NOSUBJECT))
with self.admin_access.client_cnx() as cnx:
rset = cnx.execute('Any X ORDERBY S WHERE X is Email, X subject S')
self.assertEqual(len(rset), 1)
email = rset.get_entity(0, 0)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment