Newer
Older
import logging
import jwt
from cubicweb_api.jwt_policy import (
JWTCookieAuthenticationPolicy,
JWTAuthenticationPolicy,
)
from pyramid.config import Configurator
log = logging.getLogger(__name__)
def create_jwt_policy(config: Configurator, prefix="cubicweb.auth.jwt"):
cfg = config.registry.settings
private_key_string = prefix + ".private_key"
if private_key_string not in cfg:
raise KeyError(private_key_string)
keys = (
"private_key",
"public_key",
"algorithm",
"expiration",
"leeway",
"http_header",
"auth_type",
)
kwargs = {}
for k in keys:
key_path = "{}.{}".format(prefix, k)
if key_path in cfg:
kwargs[k] = cfg.get(key_path)
auth_policy = JWTAuthenticationPolicy(**kwargs)
cookie_policy = JWTCookieAuthenticationPolicy.make_from(
auth_policy, cookie_name="CW_JWT", https_only=True, reissue_time=7200
)
return cookie_policy
def _request_create_token(request, principal, expiration=None, audience=None, **claims):
return request.authentication_policy.create_token(
principal, expiration, audience, **claims
)
def _request_claims(request):
try:
return jwt.decode(
request.cookies.get(request.authentication_policy.cookie_name),
request.authentication_policy.private_key,
algorithms=[request.authentication_policy.algorithm],
)
except Exception:
return {}
def setup_jwt(config: Configurator):
config.include("pyramid_jwt")
policy = create_jwt_policy(config)
except KeyError as e:
log.warning(
"Could not configure JWT policy: missing configuration key %s", str(e)
)
else:
config.registry["cubicweb.authpolicy"]._policies.append(policy)
config.add_request_method(_request_create_token, "create_jwt_token")
config.add_request_method(_request_claims, "jwt_claims", reify=True)
config.add_request_method(
lambda request: policy, "authentication_policy", reify=True
)
log.info("JWT policy configured")