Commit 5bdf3b59 authored by David Douard's avatar David Douard
Browse files

[test] make tests pass again with the new-style cube

also make tests pass with py3
parent 720d94943a4b
......@@ -10,8 +10,8 @@
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
"""
......@@ -20,6 +20,7 @@ import sys
from logilab.common.pytest import PyTester
def getlogin():
"""avoid usinng os.getlogin() because of strange tty / stdin problems
(man 3 getlogin)
......
import urllib
import urlparse
from six.moves import urllib
import mock
import webtest
import six
from logilab.common.decorators import monkeypatch
from cubicweb.devtools.httptest import CubicWebServerTC
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.predicates import yes
from cubicweb.wsgi import handler
from cubes.oauth.entities import ServiceAdapter
from cubes.oauth import views as oauth_views
from cubicweb_oauth.entities import ServiceAdapter
from cubicweb_oauth import views as oauth_views
# XXX Taken from rememberme tests
......@@ -78,7 +78,7 @@ class FakeRequestRession(object):
class FakeRauthService(object):
tokens = (u'token{}'.format(i) for i in xrange(9999))
tokens = (u'token{}'.format(i) for i in range(9999))
def __init__(
self, name, client_id, client_secret, authorize_url,
......@@ -91,13 +91,13 @@ class FakeRauthService(object):
self.base_url = base_url
def get_authorize_url(self, redirect_uri, scope):
return self.authorize_url + '?' + urllib.urlencode(
return self.authorize_url + '?' + urllib.parse.urlencode(
{"redirect_uri": redirect_uri, 'scope': scope})
def get_auth_session(self, data):
if data['code'] == u'goodcode':
s = FakeRequestRession(self.base_url)
s.access_token = self.tokens.next()
s.access_token = six.next(self.tokens)
return s
def get_session(self, token):
......@@ -131,13 +131,14 @@ class OAuthTC(CubicWebTestTC):
def setUp(self):
super(OAuthTC, self).setUp()
#CubicWebTC.setUp(self)
# CubicWebTC.setUp(self)
self.vreg.register(FakeServiceAdapter)
with self.admin_access.repo_cnx() as cnx:
service = cnx.create_entity(
'ExternalAuthService',
provider=cnx.find(
'ExternalAuthProvider', spid='facebook').one(),
'ExternalAuthProvider',
spid=six.text_type('facebook')).one(),
application_name=self.application_name,
application_id=self.application_id,
application_secret=self.application_secret
......@@ -162,7 +163,7 @@ class OAuthTC(CubicWebTestTC):
response = self.webapp.get('?__externalauthprovider=facebook')
location = response.headers['location']
redirect_uri = urlparse.parse_qs(
redirect_uri = urllib.parse.parse_qs(
location.split('?')[1])['redirect_uri'][0]
assert redirect_uri.startswith(self.config['base-url'])
......@@ -177,16 +178,22 @@ class OAuthTC(CubicWebTestTC):
def test_1st_redirect(self):
response = self.webapp.get('/?__externalauthprovider=facebook')
return
location = response.headers['location']
expected_location = self.provider.authorize_url + '?' \
+ urllib.urlencode({
'scope': self.provider.scope,
'redirect_uri':
self.config.get('base-url') + 'externalauth-confirm'
+ '?__externalauth_negociationid='
+ oauth_views.NEGOSTATE.keys()[0]})
self.assertEqual(
expected_location, location)
purl = urllib.parse.urlparse(location)
purl_qs = urllib.parse.parse_qs(purl.query)
self.assertEqual(purl_qs.get('scope'), ['email'])
self.assertIn('redirect_uri', purl_qs)
ruri = urllib.parse.urlparse(purl_qs['redirect_uri'][0])
self.assertEqual(ruri.netloc, 'localhost:80')
self.assertEqual(ruri.path, '/externalauth-confirm')
ruri_qs = urllib.parse.parse_qs(ruri.query)
self.assertIn('__externalauth_negociationid', ruri_qs)
self.assertEqual(ruri_qs['__externalauth_negociationid'],
list(oauth_views.NEGOSTATE.keys()))
self.assertEqual(303, response.status_code)
def test_creation(self):
......@@ -318,12 +325,12 @@ class OAuthTC(CubicWebTestTC):
def test_login(self):
with self.admin_access.repo_cnx() as cnx:
user = cnx.create_entity(
'CWUser', login=u"zeuser", upassword='',
in_group=cnx.find('CWGroup', name='users').one())
'CWUser', login=u"zeuser", upassword=six.text_type(''),
in_group=cnx.find('CWGroup', name=u'users').one())
cnx.create_entity(
'ExternalIdentity',
identity_of=user, provider=cnx.find(
'ExternalAuthProvider', spid='facebook').one(),
'ExternalAuthProvider', spid=u'facebook').one(),
uid=u'1111')
rset = cnx.execute('Any X WHERE X login "zeuser"')
self.assertEqual(1, rset.rowcount)
......@@ -344,12 +351,12 @@ class OAuthTC(CubicWebTestTC):
def test_directlogin(self):
with self.admin_access.repo_cnx() as cnx:
user = cnx.create_entity(
'CWUser', login=u"zeuser", upassword='',
in_group=cnx.find('CWGroup', name='users').one())
'CWUser', login=u"zeuser", upassword=six.text_type(''),
in_group=cnx.find('CWGroup', name=u'users').one())
cnx.create_entity(
'ExternalIdentity',
identity_of=user, provider=cnx.find(
'ExternalAuthProvider', spid='facebook').one(),
'ExternalAuthProvider', spid=u'facebook').one(),
uid=u'1111')
rset = cnx.execute('Any X WHERE X login "zeuser"')
self.assertEqual(1, rset.rowcount)
......@@ -361,12 +368,12 @@ class OAuthTC(CubicWebTestTC):
def test_persistent_login(self):
with self.admin_access.repo_cnx() as cnx:
user = cnx.create_entity(
'CWUser', login=u"zeuser", upassword='',
in_group=cnx.find('CWGroup', name='users').one())
'CWUser', login=u"zeuser", upassword=six.text_type(''),
in_group=cnx.find('CWGroup', name=u'users').one())
cnx.create_entity(
'ExternalIdentity',
identity_of=user, provider=cnx.find(
'ExternalAuthProvider', spid='facebook').one(),
'ExternalAuthProvider', spid=u'facebook').one(),
uid=u'1111')
rset = cnx.execute('Any X WHERE X login "zeuser"')
self.assertEqual(1, rset.rowcount)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment