Skip to content
Snippets Groups Projects

feat: add a OAuth authentication mechanism

2 files
+ 102
16
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 101
16
#!/usr/bin/env python3
# coding: utf-8
from fastapi import FastAPI
from typing import Annotated
from datetime import timedelta, datetime, timezone
import jwt
from fastapi import FastAPI, Depends, status, HTTPException
from pydantic import BaseModel
@@ -5,4 +10,6 @@
from pydantic import BaseModel
from jwt import InvalidTokenError
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from cubicweb.utils import admincnx
from cubicweb.server.session import Connection
@@ -6,5 +13,14 @@
from cubicweb.utils import admincnx
from cubicweb.server.session import Connection
app = FastAPI()
APP_ID = "blog"
# openssl rand -hex 32
SECRET_KEY = "38bab170890dcea3415a06c75e12b4feb765e0f6be25c55a90bc2aac53e725a8"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class CubicWeb:
def __init__(self, appid):
self.appid = appid
@@ -10,5 +26,18 @@
APP_ID = "blog-flask"
def create_admin_cnx(self):
self.cnx = admincnx(self.appid)
app = FastAPI()
# maintain a global “CubicWeb” instance
# see https://github.com/fastapi/fastapi/issues/1800
cw = CubicWeb(APP_ID)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class RQLBody(BaseModel):
@@ -12,8 +41,73 @@
class RQLBody(BaseModel):
q: str
login: str
query: str
args: dict[str, str | float | int | datetime]
def get_current_active_user_cnx(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_eid = payload.get("sub")
if user_eid is None:
raise credentials_exception
except InvalidTokenError:
raise credentials_exception
with cw.cnx as cnx:
user = cnx.entity_from_eid(int(user_eid))
with Connection(cnx.repo, user) as user_cnx:
yield user_cnx
UserCnxDep = Annotated[Connection, Depends(get_current_active_user_cnx)]
def authenticate_user(username: str, password: str):
with cw.cnx as cnx:
try:
return cnx.repo.authenticate_user(cnx, username, password=password)
except Exception:
return None
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.on_event("startup")
def startup():
cw.create_admin_cnx()
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": str(user.eid)}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@app.post("/rql/")
@@ -17,14 +111,5 @@
@app.post("/rql/")
def rql(body: RQLBody):
with admincnx(APP_ID) as cnx:
# For demo purpose, we authenticate the user with their login.
user = cnx.execute(
"Any X WHERE X is CWUser, X login %(l)s", {"l": body.login}
).one()
# Once the user is authenticated, open a user connection and execute the
# query
with Connection(cnx.repo, user) as user_cnx:
return list(user_cnx.execute(body.q))
def rql(body: RQLBody, cnx: UserCnxDep):
return list(cnx.execute(body.query, body.args))
Loading