# authentication.py
from fastapi import APIRouter, Cookie, HTTPException, Depends
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from typing import Optional, Union
from requests import get, post
from urllib.parse import urlencode
from dotenv import load_dotenv
import os
from db.database import get_db
from db import crud
# Read settings (CLIENT_ID, AUTH_ROOT, ...) from the .env file one directory up.
# The path is relative, so it is resolved against the process's working
# directory rather than this file's location.
load_dotenv(dotenv_path="../.env")

# Every route declared in this module is served under /api/auth.
router = APIRouter(
    prefix="/api/auth",
    tags=["auth"],
)
@router.get("/")
async def whoami(connect_id: str = Cookie(...), db: Session = Depends(get_db)):
    """Return the username of the user identified by the ``connect_id`` cookie.

    Args:
        connect_id: Value of the required ``connect_id`` cookie.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``username`` attribute of the matching user record.

    Raises:
        HTTPException: 403 when no user matches the cookie.
    """
    user = crud.get_user(connect_id, db)
    # The lookup can come back empty (the login route guards against the same
    # case); answer with a clean 403 instead of crashing on ``None.username``.
    if not user:
        raise HTTPException(status_code=403, detail="Cookie Invalid")
    return user.username
# Upper bound, in seconds, for each outbound call to the OAuth provider so a
# stalled provider cannot hang the request indefinitely.
_OAUTH_TIMEOUT_SECONDS = 10


def _provider_json(send, url, **kwargs):
    """Call the OAuth provider with ``send`` and return its decoded JSON body.

    Raises:
        HTTPException: 502 when the provider is unreachable, times out,
            answers with an error status, or returns a non-JSON body.
    """
    # Function-scope import: the module only imports ``get`` and ``post``.
    from requests import RequestException

    try:
        reply = send(url, timeout=_OAUTH_TIMEOUT_SECONDS, **kwargs)
        reply.raise_for_status()
        return reply.json()
    except (RequestException, ValueError) as err:
        raise HTTPException(status_code=502, detail="OAuth provider error") from err


def _exchange_code_for_token(code):
    """Trade an authorization code for an access token at the provider.

    Raises:
        HTTPException: 502 when the call fails or the reply carries no
            ``access_token``.
    """
    payload = _provider_json(
        post,
        f"{os.getenv('AUTH_ROOT')}/oauth/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": f"{os.getenv('API_ROOT')}/auth/login",
            "client_id": os.getenv("CLIENT_ID"),
            "client_secret": os.getenv("CLIENT_SECRET"),
        },
        headers={"content-type": "application/x-www-form-urlencoded"},
    )
    access_token = payload.get("access_token") if isinstance(payload, dict) else None
    if not access_token:
        raise HTTPException(status_code=502, detail="OAuth provider error")
    return access_token


def _fetch_user_info(access_token):
    """Fetch the authenticated user's profile (decoded JSON) from the provider."""
    return _provider_json(
        get,
        f"{os.getenv('AUTH_ROOT')}/api/user/show/me",
        headers={"Authorization": f"Bearer {access_token}"},
    )


@router.get("/login")
async def login(
    code: Optional[str] = None,
    state: Optional[str] = None,
    connect_id: Union[str, None] = Cookie(default=None),
    db: Session = Depends(get_db),
):
    """Drive both legs of the OAuth authorization-code login.

    Without ``code`` the caller is starting a login: a new user record is
    created, its cookie value is stored in the ``connect_id`` cookie, and the
    caller is redirected to the provider's ``/oauth/authorize`` page carrying
    the record's ``state``.

    With ``code`` the provider is calling back: the cookie and ``state`` are
    checked against the stored user, the code is exchanged for an access
    token, the provider's user info is saved via ``crud.update_user``, and the
    caller is redirected to ``WEB_ROOT`` with ``?connected=true``.

    Args:
        code: Authorization code sent back by the provider, if any.
        state: ``state`` value echoed back by the provider.
        connect_id: Value of the ``connect_id`` cookie, if present.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A ``RedirectResponse`` to the provider (first leg) or to the web
        front end (second leg).

    Raises:
        HTTPException: 403 when the cookie or ``state`` is missing or the
            ``state`` does not match; 599 when no user matches the cookie;
            502 when the provider call fails or returns an unusable reply.
    """
    import asyncio  # function-scope: not imported at module level

    if not code:
        init_user = crud.init_user(db)
        params = urlencode({
            "client_id": os.getenv("CLIENT_ID"),
            "redirect_uri": f"{os.getenv('API_ROOT')}/auth/login",
            "response_type": "code",
            "state": init_user.state,
            "scope": "default",
        })
        response = RedirectResponse(f"{os.getenv('AUTH_ROOT')}/oauth/authorize?{params}")
        # NOTE(review): this cookie identifies the session; consider
        # httponly=True / secure=True once it is confirmed that no client-side
        # script reads it and that the site is served over HTTPS.
        response.set_cookie(key="connect_id", value=init_user.cookie)
        return response

    if not connect_id or not state:
        raise HTTPException(status_code=403, detail="Cookie Invalid")

    user = crud.get_user(connect_id, db)
    if not user:
        # NOTE(review): 599 is not a standard HTTP status; kept unchanged
        # because clients may already depend on it.
        raise HTTPException(status_code=599, detail="Timeout error")
    if user.state != state:
        raise HTTPException(status_code=403, detail="State Invalid")
    crud.delete_state(user, db)

    # ``requests`` is blocking; run the provider calls in the default executor
    # so the event loop keeps serving other requests while they are in flight.
    loop = asyncio.get_running_loop()
    access_token = await loop.run_in_executor(None, _exchange_code_for_token, code)
    user_info = await loop.run_in_executor(None, _fetch_user_info, access_token)

    crud.update_user(user, user_info, db)
    return RedirectResponse(f"{os.getenv('WEB_ROOT')}?connected=true")
@router.get("/logout")
async def delete_session(connect_id: str = Cookie(...), db: Session = Depends(get_db)):
    """End the caller's session and redirect to the provider's logout URL.

    The redirect targets ``{AUTH_ROOT}/logout`` with a ``redirect_logout``
    query parameter set to ``WEB_ROOT``. The ``connect_id`` cookie is cleared
    on the response and ``crud.end_session`` is called for the cookie value.

    Args:
        connect_id: Value of the required ``connect_id`` cookie.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``RedirectResponse`` with the cookie deletion attached.
    """
    logout_query = urlencode({'redirect_logout': os.getenv('WEB_ROOT')})
    logout_url = f"{os.getenv('AUTH_ROOT')}/logout?{logout_query}"

    redirect = RedirectResponse(logout_url)
    redirect.delete_cookie(key="connect_id")

    crud.end_session(connect_id, db)
    return redirect