Commit fd6d446e authored by Laurent Peuch's avatar Laurent Peuch
Browse files

chore: black the whole project

parent eb2ad319f730
......@@ -30,25 +30,32 @@ def includeme(config):
from cubicweb_signedrequest.pconfig import SignedRequestAuthPolicy
policy = SignedRequestAuthPolicy()
config.add_tween('cubicweb_signedrequest.pconfig.body_hash_tween_factory',
under=pyramid.tweens.INGRESS)
config.add_tween(
"cubicweb_signedrequest.pconfig.body_hash_tween_factory",
under=pyramid.tweens.INGRESS,
)
# add some bw compat methods
# these ease code factorization between pyramid related code and legacy one
config.add_request_method(
lambda req, header, default=None: req.headers.get(header, default),
name='get_header', property=False, reify=False)
name="get_header",
property=False,
reify=False,
)
config.add_request_method(
lambda req: req.method,
name='http_method', property=False, reify=False)
if config.registry.get('cubicweb.authpolicy') is None:
err = "signedrequest: the default cubicweb auth policy should be "\
"available via the 'cubicweb.authpolicy' registry config "\
"entry"
lambda req: req.method, name="http_method", property=False, reify=False
)
if config.registry.get("cubicweb.authpolicy") is None:
err = (
"signedrequest: the default cubicweb auth policy should be "
"available via the 'cubicweb.authpolicy' registry config "
"entry"
)
raise ValueError(err)
# if we use (the default) a multiauth policy in CW, append
# signedrequest to it
mainpolicy = config.registry['cubicweb.authpolicy']
mainpolicy = config.registry["cubicweb.authpolicy"]
mainpolicy._policies.append(policy)
......@@ -16,27 +16,27 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-signedrequest application packaging information"""
modname = 'signedrequest'
distname = 'cubicweb-signedrequest'
modname = "signedrequest"
distname = "cubicweb-signedrequest"
numversion = (0, 5, 1)
version = '.'.join(str(num) for num in numversion)
version = ".".join(str(num) for num in numversion)
license = 'LGPL'
author = 'LOGILAB S.A. (Paris, FRANCE)'
author_email = 'contact@logilab.fr'
description = 'rest api for cubicweb'
web = 'http://www.cubicweb.org/project/%s' % distname
license = "LGPL"
author = "LOGILAB S.A. (Paris, FRANCE)"
author_email = "contact@logilab.fr"
description = "rest api for cubicweb"
web = "http://www.cubicweb.org/project/%s" % distname
__depends__ = {
'cubicweb[pyramid]': '>= 3.27.0',
"cubicweb[pyramid]": ">= 3.24.0",
}
__recommends__ = {}
classifiers = [
'Environment :: Web Environment',
'Framework :: CubicWeb',
'Programming Language :: Python',
'Programming Language :: JavaScript',
"Environment :: Web Environment",
"Framework :: CubicWeb",
"Programming Language :: Python",
"Programming Language :: JavaScript",
]
......@@ -35,12 +35,12 @@ class UserSecretKeyAuthentifier(native.BaseAuthentifier):
:request: canonicalized version of the request, used to
compute the signature
"""
session.debug('authentication by %s', self.__class__.__name__)
signature = kwargs.get('signature')
signed_content = kwargs.get('request')
session.debug("authentication by %s", self.__class__.__name__)
signature = kwargs.get("signature")
signed_content = kwargs.get("request")
if signature is None or signed_content is None:
raise AuthenticationError('authentication failure')
raise AuthenticationError("authentication failure")
user_eid = authenticate_user(session, login, signed_content, signature)
if user_eid is None:
raise AuthenticationError('invalid credentials')
raise AuthenticationError("invalid credentials")
return user_eid
......@@ -57,10 +57,12 @@ from email.utils import formatdate
def sign(req, token):
headers_to_sign = ('Content-SHA512', 'Content-Type', 'Date')
to_sign = (req.method + req.url
+ ''.join(req.headers.get(field, '')
for field in headers_to_sign))
headers_to_sign = ("Content-SHA512", "Content-Type", "Date")
to_sign = (
req.method
+ req.url
+ "".join(req.headers.get(field, "") for field in headers_to_sign)
)
return hmac.new(token, to_sign).hexdigest()
......@@ -75,19 +77,22 @@ class SignedRequestAuth(requests.auth.AuthBase):
self.secret = secret
def __call__(self, r):
if r.method in ('PUT', 'POST'):
r.headers['Content-SHA512'] = sha512(r.body or '')
r.headers['Authorization'] = 'Cubicweb %s:%s' % (
self.token_id, sign(r, self.secret))
if r.method in ("PUT", "POST"):
r.headers["Content-SHA512"] = sha512(r.body or "")
r.headers["Authorization"] = "Cubicweb %s:%s" % (
self.token_id,
sign(r, self.secret),
)
return r
def get(url, token_id, token):
auth = SignedRequestAuth(token_id, token)
resp = requests.get(url,
headers={'Accept': 'text/plain',
'Date': formatdate(usegmt=True)},
auth=auth)
resp = requests.get(
url,
headers={"Accept": "text/plain", "Date": formatdate(usegmt=True)},
auth=auth,
)
return resp.json()
......@@ -96,10 +101,14 @@ def post(url, token_id, token, data=None, files=None, **params):
_cw_fields = params.keys()
if files:
_cw_fields += files.keys()
resp = requests.post(url, files=files, data=data, params=params,
headers={'Accept': 'text/plain',
'Date': formatdate(usegmt=True)},
auth=auth)
resp = requests.post(
url,
files=files,
data=data,
params=params,
headers={"Accept": "text/plain", "Date": formatdate(usegmt=True)},
auth=auth,
)
return resp.json()
......
......@@ -27,24 +27,26 @@ from cubicweb_signedrequest.authplugin import UserSecretKeyAuthentifier
class ServerStartupHook(hook.Hook):
"""register authentifier at startup"""
__regid__ = 'signedrequest.secretkeyinit'
events = ('server_startup',)
__regid__ = "signedrequest.secretkeyinit"
events = ("server_startup",)
def __call__(self):
self.debug('registering secret key authentifier')
self.debug("registering secret key authentifier")
self.repo.system_source.add_authentifier(UserSecretKeyAuthentifier())
class CreateAuthTokenHook(hook.Hook):
"""Generate random secret token"""
__regid__ = 'signedrequest.createauthtoken'
__select__ = hook.Hook.__select__ & is_instance('AuthToken')
events = ('before_add_entity',)
__regid__ = "signedrequest.createauthtoken"
__select__ = hook.Hook.__select__ & is_instance("AuthToken")
events = ("before_add_entity",)
def __call__(self):
edited = self.entity.cw_edited
# generate token as a 128 chars len string
token = u''.join([uuid().hex for __ in range(4)])
edited['token'] = token
edited['id'] = edited.get('id') or text_type(uuid().hex)
edited.setdefault('token_for_user', self._cw.user.eid)
token = "".join([uuid().hex for __ in range(4)])
edited["token"] = token
edited["id"] = edited.get("id") or text_type(uuid().hex)
edited.setdefault("token_for_user", self._cw.user.eid)
sync_schema_props_perms('AuthToken')
sync_schema_props_perms('token_for_user')
sync_schema_props_perms("AuthToken")
sync_schema_props_perms("token_for_user")
sync_schema_props_perms('token_for_user')
sync_schema_props_perms("token_for_user")
......@@ -21,9 +21,12 @@ from zope.interface import implementer
from pyramid.authentication import IAuthenticationPolicy
from cubicweb import AuthenticationError
from cubicweb_signedrequest.tools import (hash_content, build_string_to_sign,
authenticate_user,
get_credentials_from_headers)
from cubicweb_signedrequest.tools import (
hash_content,
build_string_to_sign,
authenticate_user,
get_credentials_from_headers,
)
logger = logging.getLogger(__name__)
......@@ -50,30 +53,29 @@ class SignedRequestAuthPolicy(object):
Authentication header.
"""
headers_to_sign = ('Content-SHA512', 'Content-Type', 'Date')
headers_to_sign = ("Content-SHA512", "Content-Type", "Date")
def unauthenticated_userid(self, request):
return None
def authenticated_userid(self, request):
logger.debug('authentication by %s', self.__class__.__name__)
logger.debug("authentication by %s", self.__class__.__name__)
try:
credentials = get_credentials_from_headers(
request, request.body_hash)
credentials = get_credentials_from_headers(request, request.body_hash)
except AuthenticationError:
credentials = None
if credentials is None:
return
try:
userid, signature = credentials.split(':', 1)
userid, signature = credentials.split(":", 1)
except ValueError:
logger.warning('authentication failure: invalid credentials')
logger.warning("authentication failure: invalid credentials")
return
repo = request.registry['cubicweb.repository']
repo = request.registry["cubicweb.repository"]
signed_content = build_string_to_sign(request)
with repo.internal_cnx() as cnx:
user_eid = authenticate_user(
cnx, userid, signed_content, signature)
user_eid = authenticate_user(cnx, userid, signed_content, signature)
return user_eid
def effective_principals(self, request):
......
......@@ -27,32 +27,43 @@ from cubicweb.schema import ERQLExpression, RRQLExpression
class AuthToken(EntityType):
"""Authentication token"""
__permissions__ = {
'read': ('managers', ERQLExpression('X token_for_user U')),
'update': (ERQLExpression('X token_for_user U'),),
'add': ('managers', ERQLExpression('X token_for_user U')),
'delete': ('managers', ERQLExpression('X token_for_user U'))}
"read": ("managers", ERQLExpression("X token_for_user U")),
"update": (ERQLExpression("X token_for_user U"),),
"add": ("managers", ERQLExpression("X token_for_user U")),
"delete": ("managers", ERQLExpression("X token_for_user U")),
}
enabled = Boolean(required=True, default=False)
token = String(maxsize=128, required=True, description=_(u'secret token'),
# use default 'read' permission as RQL expressions are not
# allowed in 'read' permission but rely on entity type
# permissions anyways.
__permissions__={'read': ('managers', 'users', 'guests',),
'add': (),
'update': ()},
)
id = String(maxsize=128, required=True, unique=True,
description=_(u'identifier for the token (must be unique)'))
token = String(
maxsize=128,
required=True,
description=_("secret token"),
# use default 'read' permission as RQL expressions are not
# allowed in 'read' permission but rely on entity type
# permissions anyways.
__permissions__={
"read": ("managers", "users", "guests",),
"add": (),
"update": (),
},
)
id = String(
maxsize=128,
required=True,
unique=True,
description=_("identifier for the token (must be unique)"),
)
class token_for_user(RelationDefinition):
__permissions__ = {
'read': ('managers', 'users', 'guests',),
'delete': ('managers', RRQLExpression('S token_for_user U')),
'add': ('managers', 'users'),
"read": ("managers", "users", "guests",),
"delete": ("managers", RRQLExpression("S token_for_user U")),
"add": ("managers", "users"),
}
subject = 'AuthToken'
object = 'CWUser'
cardinality = '1*'
subject = "AuthToken"
object = "CWUser"
cardinality = "1*"
inlined = True
composite = 'object'
composite = "object"
......@@ -45,10 +45,11 @@ except ImportError:
result |= ord(a) ^ ord(b)
return result == 0
log = logging.getLogger(__name__)
HEADERS_TO_SIGN = ('Content-SHA512', 'Content-Type', 'Date')
ALTERNATE_HEADERS = {'Date': ['X-Cubicweb-Date', 'Date']}
HEADERS_TO_SIGN = ("Content-SHA512", "Content-Type", "Date")
ALTERNATE_HEADERS = {"Date": ["X-Cubicweb-Date", "Date"]}
def get_replaceable_header_value(request, header_name, default=None):
......@@ -96,43 +97,48 @@ def get_credentials_from_headers(request, content_sha512):
forged using the token's secret key to authenticate
the user linked with the AuthToken
"""
header = get_replaceable_header_value(request, 'Authorization', None)
header = get_replaceable_header_value(request, "Authorization", None)
if header is None:
log.debug('SIGNED REQUEST: error header is none')
log.debug("SIGNED REQUEST: error header is none")
return
try:
method, credentials = header.split(None, 1)
except ValueError:
log.debug("SIGNED REQUEST: couldn't determine method from "
"Authorization header")
log.debug(
"SIGNED REQUEST: couldn't determine method from " "Authorization header"
)
return
if method != 'Cubicweb':
log.debug('SIGNED REQUEST: method is not Cubicweb')
if method != "Cubicweb":
log.debug("SIGNED REQUEST: method is not Cubicweb")
return
if request.http_method() != 'GET':
if content_sha512 != get_replaceable_header_value(request, 'Content-SHA512'):
log.error('SIGNED REQUEST: wrong sha512, %s != %s' % (
content_sha512,
get_replaceable_header_value(request, 'Content-SHA512')))
if request.http_method() != "GET":
if content_sha512 != get_replaceable_header_value(request, "Content-SHA512"):
log.error(
"SIGNED REQUEST: wrong sha512, %s != %s"
% (
content_sha512,
get_replaceable_header_value(request, "Content-SHA512"),
)
)
raise AuthenticationError()
date_header = get_replaceable_header_value(request, 'Date')
date_header = get_replaceable_header_value(request, "Date")
if date_header is None:
raise AuthenticationError()
try:
date = datetime(*parsedate(date_header)[:6])
except (ValueError, TypeError):
log.error('SIGNED REQUEST: wrong date format')
log.error("SIGNED REQUEST: wrong date format")
raise AuthenticationError()
delta = abs(datetime.utcnow() - date)
if delta > timedelta(0, 300):
log.error('SIGNED REQUEST: date delta error')
log.error("SIGNED REQUEST: date delta error")
raise AuthenticationError()
try:
id, signature = credentials.split(':', 1)
log.debug('SIGNED REQUEST: encoding info for %s' % id)
id, signature = credentials.split(":", 1)
log.debug("SIGNED REQUEST: encoding info for %s" % id)
return credentials
except ValueError:
log.exception('HTTP REST authenticator failed')
log.exception("HTTP REST authenticator failed")
raise AuthenticationError()
......@@ -148,9 +154,10 @@ def build_string_to_sign(request, url=None, headers=None):
headers = HEADERS_TO_SIGN
if url is None:
url = request.url
get_header = lambda field: get_replaceable_header_value(request, field, '') # noqa
return (request.http_method() + url +
''.join(map(get_header, headers))).encode('utf-8')
get_header = lambda field: get_replaceable_header_value(request, field, "") # noqa
return (request.http_method() + url + "".join(map(get_header, headers))).encode(
"utf-8"
)
def authenticate_user(session, tokenid, signed_content, signature):
......@@ -173,22 +180,28 @@ def authenticate_user(session, tokenid, signed_content, signature):
"""
try:
rset = session.execute('Any U, K WHERE T token_for_user U, '
' T token K, '
' T enabled True, '
' T id %(id)s',
{'id': tokenid})
rset = session.execute(
"Any U, K WHERE T token_for_user U, "
" T token K, "
" T enabled True, "
" T id %(id)s",
{"id": tokenid},
)
if not rset:
return
assert len(rset) == 1
user_eid, secret_key = rset[0]
expected_signature = hmac.new(secret_key.encode('utf-8'),
signed_content, digestmod="sha512").hexdigest()
expected_signature = hmac.new(
secret_key.encode("utf-8"), signed_content, digestmod="sha512"
).hexdigest()
if compare_digest(expected_signature, signature):
return user_eid
else:
session.info('request content signature check failed for %s '
'(signed content is %r)',
tokenid, signed_content)
session.info(
"request content signature check failed for %s "
"(signed content is %r)",
tokenid,
signed_content,
)
except Exception as exc:
session.debug('authentication failure (%s)', exc)
session.debug("authentication failure (%s)", exc)
......@@ -23,15 +23,20 @@ from cubicweb.web.views.authentication import NoAuthInfo
from cubicweb.web.views.authentication import WebAuthInfoRetriever
from cubicweb.web.views import uicfg
from cubicweb_signedrequest.tools import (hash_content, build_string_to_sign,
get_credentials_from_headers)
from cubicweb_signedrequest.tools import (
hash_content,
build_string_to_sign,
get_credentials_from_headers,
)
# web authentication info retriever ###########################################
class HttpRESTAuthRetriever(WebAuthInfoRetriever):
"""Authenticate by the Authorization http header """
__regid__ = 'www-authorization'
__regid__ = "www-authorization"
order = 0
def authentication_information(self, req):
......@@ -39,10 +44,10 @@ class HttpRESTAuthRetriever(WebAuthInfoRetriever):
NoAuthInfo if expected information is not found
return token id, signed string and signature
"""
self.debug('web authenticator building auth info')
self.debug("web authenticator building auth info")
login, signature = self.parse_authorization_header(req)
string_to_sign = build_string_to_sign(req, req.url())
return login, {'signature': signature, 'request': string_to_sign}
return login, {"signature": signature, "request": string_to_sign}
def parse_authorization_header(self, req):
"""Return the token id and the request signature.
......@@ -60,10 +65,10 @@ class HttpRESTAuthRetriever(WebAuthInfoRetriever):
credentials = get_credentials_from_headers(req, sha512)
if credentials is None:
raise NoAuthInfo()
return credentials.split(':', 1)
return credentials.split(":", 1)
def request_has_auth_info(self, req):
signature = req.get_header('Authorization', None)
signature = req.get_header("Authorization", None)
return signature is not None
def revalidate_login(self, req):
......@@ -82,13 +87,13 @@ _rctrl = uicfg.reledit_ctrl
_affk = uicfg.autoform_field_kwargs
_pvdc = uicfg.primaryview_display_ctrl
_afs.tag_attribute(('AuthToken', 'token'), 'main', 'hidden')
_pvdc.tag_attribute(('AuthToken', 'token'), {'vid': 'verbatimattr'})
_afs.tag_attribute(("AuthToken", "token"), "main", "hidden")
_pvdc.tag_attribute(("AuthToken", "token"), {"vid": "verbatimattr"})
_rctrl.tag_attribute(('AuthToken', 'id'), {'reload': True})
_affk.tag_attribute(('AuthToken', 'id'), {'required': False})
_rctrl.tag_attribute(("AuthToken", "id"), {"reload": True})
_affk.tag_attribute(("AuthToken", "id"), {"required": False})
_afs.tag_object_of(('*', 'token_for_user', 'CWUser'), 'main', 'hidden')
_afs.tag_object_of(("*", "token_for_user", "CWUser"), "main", "hidden")
# Authentication test #########################################################
......@@ -97,10 +102,12 @@ _afs.tag_object_of(('*', 'token_for_user', 'CWUser'), 'main', 'hidden')
# on request with bad Authorization header, making hard to know if the
# authorization succeeded or not, so one may use /authtest if he want to know.
class AuthTestFail(Controller):
"""Dumb controller to test signed request authentication on an
anonymous allowed site"""
__regid__ = 'authtest'
__regid__ = "authtest"
__select__ = anonymous_user()
def publish(self, rset=None):
......@@ -108,8 +115,8 @@ class AuthTestFail(Controller):
class AuthTestSuccess(Controller):
__regid__ = 'authtest'
__regid__ = "authtest"
__select__ = ~anonymous_user()
def publish(self, rset=None):
return 'you are properly authenticated as %s' % self._cw.user.login
return "you are properly authenticated as %s" % self._cw.user.login
......@@ -20,7 +20,7 @@ from cubicweb.entities import AnyEntity
class AuthToken(AnyEntity):
__regid__ = 'AuthToken'
__regid__ = "AuthToken"
def dc_title(self):
return self.id
......@@ -30,34 +30,35 @@ here = dirname(__file__)
# load metadata from the __pkginfo__.py file so there is no risk of conflict
# see https://packaging.python.org/en/latest/single_source_version.html
pkginfo = join(here, 'cubicweb_signedrequest', '__pkginfo__.py')
pkginfo = join(here, "cubicweb_signedrequest", "__pkginfo__.py")
__pkginfo__ = {}
with open(pkginfo) as f:
exec(f.read(), __pkginfo__)
# get required metadatas
distname = __pkginfo__['distname']
version = __pkginfo__['version']
license = __pkginfo__['license']
description = __pkginfo__['description']
web = __pkginfo__['web']
author = __pkginfo__['author']
author_email = __pkginfo__['author_email']
classifiers = __pkginfo__['classifiers