Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import logging
import jwt
from pyramid_jwt import JWTCookieAuthenticationPolicy, create_jwt_authentication_policy
from pyramid.config import Configurator
log = logging.getLogger(__name__)
def create_cubicweb_jwt_cookie_policy(config, prefix="cubicweb.auth.jwt"):
cfg = config.registry.settings
if prefix + ".private_key" not in cfg:
log.info("JWT policy %s disabled (no private_key defined)", prefix)
return
keys = (
"private_key",
"public_key",
"algorithm",
"expiration",
"leeway",
"http_header",
"auth_type",
)
kwargs = {k: cfg.get("{}.{}".format(prefix, k), None) for k in keys}
auth_policy = create_jwt_authentication_policy(config, **kwargs)
cookie_policy = JWTCookieAuthenticationPolicy.make_from(
auth_policy, cookie_name="CW_JWT", https_only=True, reissue_time=7200
)
log.info("JWT policy configured")
return cookie_policy
def _request_create_token(request, principal, expiration=None, audience=None, **claims):
return request.authentication_policy.create_token(
principal, expiration, audience, **claims
)
def _request_claims(request):
try:
return jwt.decode(
request.cookies.get(request.authentication_policy.cookie_name),
request.authentication_policy.private_key,
algorithms=[request.authentication_policy.algorithm],
)
except Exception:
return {}
def setup_jwt(config: Configurator):
config.include("pyramid_jwt")
policy = create_cubicweb_jwt_cookie_policy(config)
config.registry["cubicweb.authpolicy"]._policies.append(policy)
config.add_request_method(_request_create_token, "create_jwt_token")
config.add_request_method(_request_claims, "jwt_claims", reify=True)
config.add_request_method(
lambda request: policy, "authentication_policy", reify=True
)