Commit fb4551ff authored by Julien Cristau's avatar Julien Cristau
Browse files

Convert forgotpwd_send_email and forgotpwd_change_passwd to call_service

We no longer need to monkeypatch the Repository class.
Closes #3730701
parent 318b6c1bb1fa
......@@ -78,55 +78,3 @@ See you soon on %(base_url)s !
'base_url': self._cw.base_url(),
}
@monkeypatch(Repository)
def forgotpwd_send_email(self, data):
session = self.internal_session()
revocation_limit = self.config['revocation-limit']
revocation_id = u''.join([random.choice(string.letters+string.digits)
for x in xrange(10)])
revocation_date = datetime.now() + timedelta(minutes=revocation_limit)
try:
existing_requests = session.execute('Any F WHERE U primary_email E, E address %(e)s, U has_fpasswd F',
{'e': data['use_email']})
if existing_requests:
raise ValidationError(None, {None: session._('You have already asked for a new password.')})
rset = session.execute('INSERT Fpasswd X: X revocation_id %(a)s, X revocation_date %(b)s, '
'U has_fpasswd X WHERE U primary_email E, E address %(e)s',
{'a':revocation_id, 'b':revocation_date, 'e': data['use_email']})
if not rset:
raise ValidationError(None, {None: session._(u'An error occured, this email address is unknown.')})
# mail is sent on commit
session.commit()
finally:
session.close()
@monkeypatch(Repository)
def forgotpwd_change_passwd(self, data):
session = self.internal_session()
try:
rset = session.execute('Any F, U WHERE U is CWUser, U primary_email E, '
'E address %(email)s, EXISTS(U has_fpasswd F, '
'F revocation_id %(revid)s)',
{'email': data['use_email'],
'revid': data['revocation_id']})
if rset:
forgotpwd = rset.get_entity(0,0)
revocation_date = forgotpwd.revocation_date
user = rset.get_entity(0,1)
if revocation_date > datetime.now():
session.execute('SET U upassword %(newpasswd)s WHERE U is CWUser, U eid %(usereid)s',
{'newpasswd':data['upassword'].encode('UTF-8'), 'usereid':user.eid})
session.execute('DELETE Fpasswd F WHERE F eid %(feid)s',
{'feid':forgotpwd.eid})
session.commit()
msg = session._(u'Your password has been changed.')
else:
msg = session._(u'That link has either expired or is not valid.')
else:
msg = session._(u'You already changed your password. This link has expired.')
return msg
finally:
session.close()
import random
import string
from datetime import datetime, timedelta
from cubicweb.predicates import match_kwargs
from cubicweb.crypto import encrypt
from cubicweb.server import Service
class ForgotPwdEmailService(Service):
"""Generate a password reset request, store it in the database, and send
reset email"""
__regid__ = 'forgotpwd_send_email'
__select__ = Service.__select__ & match_kwargs('use_email')
def call(self, use_email=None):
cnx = self._cw
repo = cnx.repo
revocation_limit = repo.config['revocation-limit']
revocation_id = u''.join(random.choice(string.letters+string.digits)
for x in xrange(10))
revocation_date = datetime.now() + timedelta(minutes=revocation_limit)
existing_requests = cnx.execute('Any F WHERE U primary_email E, E address %(e)s, U has_fpasswd F',
{'e': use_email})
if existing_requests:
raise ValidationError(None, {None: cnx._('You have already asked for a new password.')})
rset = cnx.execute('INSERT Fpasswd X: X revocation_id %(a)s, X revocation_date %(b)s, '
'U has_fpasswd X WHERE U primary_email E, E address %(e)s',
{'a':revocation_id, 'b':revocation_date, 'e': use_email})
if not rset:
raise ValidationError(None, {None: session._(u'An error occured, this email address is unknown.')})
class ForgotPwdChangePwdService(Service):
"""Actually change the user's password"""
__regid__ = 'forgotpwd_change_passwd'
__select__ = Service.__select__ & match_kwargs('use_email', 'revocation_id', 'upassword')
def call(self, use_email=None, revocation_id=None, upassword=None):
cnx = self._cw
rset = cnx.execute('Any F, U WHERE U is CWUser, U primary_email E, '
'E address %(email)s, EXISTS(U has_fpasswd F, '
'F revocation_id %(revid)s)',
{'email': use_email,
'revid': revocation_id})
if rset:
forgotpwd = rset.get_entity(0,0)
revocation_date = forgotpwd.revocation_date
user = rset.get_entity(0,1)
if revocation_date > datetime.now():
cnx.execute('SET U upassword %(newpasswd)s WHERE U is CWUser, U eid %(usereid)s',
{'newpasswd':upassword.encode('UTF-8'), 'usereid':user.eid})
cnx.execute('DELETE Fpasswd F WHERE F eid %(feid)s',
{'feid':forgotpwd.eid})
msg = cnx._(u'Your password has been changed.')
else:
msg = cnx._(u'That link has either expired or is not valid.')
else:
msg = cnx._(u'You already changed your password. This link has expired.')
return msg
......@@ -19,18 +19,24 @@ class ForgotTC(CubicWebTC):
def test_reset_password(self):
MAILBOX[:] = []
self.assertEqual(len(MAILBOX), 0)
self.session.repo.forgotpwd_send_email({'use_email':'test_user1@logilab.fr'})
with self.repo.internal_cnx() as cnx:
cnx.call_service('forgotpwd_send_email', use_email='test_user1@logilab.fr')
cnx.commit()
self.assertEqual(len(MAILBOX), 1)
self.assertNotIn('None', MAILBOX[0].content)
def test_reset_https(self):
MAILBOX[:] = []
self.config.global_set_option('base-url', 'http://babar.com/')
self.session.repo.forgotpwd_send_email({'use_email':'test_user2@logilab.fr'})
with self.repo.internal_cnx() as cnx:
cnx.call_service('forgotpwd_send_email', use_email='test_user2@logilab.fr')
cnx.commit()
self.assertIn('http://babar.com/', MAILBOX[0].content)
self.config.global_set_option('https-url', 'https://babar.com/')
self.session.repo.forgotpwd_send_email({'use_email':'test_user3@logilab.fr'})
with self.repo.internal_cnx() as cnx:
cnx.call_service('forgotpwd_send_email', use_email='test_user3@logilab.fr')
cnx.commit()
self.assertIn('https://babar.com/', MAILBOX[1].content)
......
......@@ -62,7 +62,9 @@ class ForgottenPasswordSendMailController(controller.Controller):
def publish(self, rset=None):
data = self.checked_data()
try:
self.appli.repo.forgotpwd_send_email(data)
with self.appli.repo.internal_cnx() as cnx:
cnx.call_service('forgotpwd_send_email', data)
cnx.commit()
except ValidationError:
raise
except Exception, exc:
......@@ -140,7 +142,9 @@ class ForgottenPasswordRequestConfirm(controller.Controller):
def publish(self, rset=None):
data = self.checked_data()
msg = self.appli.repo.forgotpwd_change_passwd(data)
with self.appli.repo.internal_cnx() as cnx:
msg = cnx.call_service('forgotpwd_change_passwd', data)
cnx.commit()
raise Redirect(self._cw.build_url('pwdreset', __message=msg))
def checked_data(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment