Skip to content
Snippets Groups Projects
jwt_auth.py 1.87 KiB
Newer Older
import logging
import jwt
from pyramid_jwt import JWTCookieAuthenticationPolicy, create_jwt_authentication_policy
from pyramid.config import Configurator

log = logging.getLogger(__name__)


def create_cubicweb_jwt_cookie_policy(config, prefix="cubicweb.auth.jwt"):
    """Build a JWT cookie authentication policy from the registry settings.

    Options are read from ``config.registry.settings`` under the keys
    ``<prefix>.<option>``; an option that is not set is forwarded as
    ``None`` to ``create_jwt_authentication_policy``.

    :param config: configurator whose registry settings hold the options.
    :param prefix: settings key prefix for the JWT options.
    :return: the cookie policy, or ``None`` when ``<prefix>.private_key``
        is not present in the settings (JWT is then considered disabled).
    """
    settings = config.registry.settings

    # Without a private key there is nothing to sign tokens with.
    if prefix + ".private_key" not in settings:
        log.info("JWT policy %s disabled (no private_key defined)", prefix)
        return None

    option_names = (
        "private_key",
        "public_key",
        "algorithm",
        "expiration",
        "leeway",
        "http_header",
        "auth_type",
    )
    policy_options = {}
    for option in option_names:
        policy_options[option] = settings.get("{}.{}".format(prefix, option))

    base_policy = create_jwt_authentication_policy(config, **policy_options)

    # Derive the cookie-based policy from the base one.
    # NOTE(review): reissue_time=7200 is presumably in seconds (two hours)
    # -- confirm against pyramid_jwt.
    jwt_cookie_policy = JWTCookieAuthenticationPolicy.make_from(
        base_policy,
        cookie_name="CW_JWT",
        https_only=True,
        reissue_time=7200,
    )
    log.info("JWT policy configured")
    return jwt_cookie_policy


def _request_create_token(request, principal, expiration=None, audience=None, **claims):
    """Create a JWT for ``principal`` through the request's JWT policy.

    Delegates to ``request.authentication_policy.create_token``, forwarding
    ``principal``, ``expiration`` and ``audience`` positionally and any
    extra ``claims`` as keyword arguments.

    :return: whatever the policy's ``create_token`` returns.
    """
    jwt_policy = request.authentication_policy
    token = jwt_policy.create_token(principal, expiration, audience, **claims)
    return token


def _request_claims(request):
    """Return the claims of the JWT carried by the request's cookie.

    The token is read from the cookie named by
    ``request.authentication_policy.cookie_name`` and decoded with the
    policy's algorithm.

    :param request: request exposing ``cookies`` and ``authentication_policy``.
    :return: the decoded claims, or an empty dict when there is no token
        cookie or the token cannot be decoded/verified (best effort: this
        never raises).
    """
    try:
        policy = request.authentication_policy
        token = request.cookies.get(policy.cookie_name)
        if not token:
            # No cookie (or an empty one): nothing to decode.
            return {}
        # Signatures must be verified with the public key when an asymmetric
        # algorithm is used; for symmetric ones both keys are the same secret.
        # NOTE(review): assumes the policy exposes ``public_key`` the way
        # pyramid_jwt's JWTAuthenticationPolicy does; otherwise fall back to
        # ``private_key`` as before.
        verification_key = getattr(policy, "public_key", None) or policy.private_key
        return jwt.decode(token, verification_key, algorithms=[policy.algorithm])
    except Exception:
        # Deliberately best-effort, but leave a trace for debugging instead
        # of swallowing the failure silently.
        log.debug("Unable to read JWT claims from cookie", exc_info=True)
        return {}


def setup_jwt(config: Configurator):
    """Wire JWT cookie authentication into the Pyramid configuration.

    Includes ``pyramid_jwt``, builds the cookie policy from the settings
    and, when one is configured, appends it to the policies of
    ``config.registry["cubicweb.authpolicy"]``. Three request attributes
    are registered in every case:

    * ``request.create_jwt_token(principal, ...)``
    * ``request.jwt_claims`` (reified)
    * ``request.authentication_policy`` (reified; ``None`` when JWT is
      disabled)

    :param config: the Pyramid configurator being set up.
    """
    config.include("pyramid_jwt")
    policy = create_cubicweb_jwt_cookie_policy(config)
    if policy is not None:
        # The factory returns None when no private key is configured; never
        # register that as an authentication policy.
        # NOTE(review): presumably the policy list is iterated and each entry
        # is called during authentication, so a None entry would break it --
        # confirm against the "cubicweb.authpolicy" implementation.
        config.registry["cubicweb.authpolicy"]._policies.append(policy)
    config.add_request_method(_request_create_token, "create_jwt_token")
    config.add_request_method(_request_claims, "jwt_claims", reify=True)
    config.add_request_method(
        lambda request: policy, "authentication_policy", reify=True
    )