Files
workout_watcher/services/bff/app/security.py
T

56 lines
1.8 KiB
Python

import uuid
from datetime import UTC, datetime, timedelta
from typing import Annotated
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pwdlib import PasswordHash
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core import settings
from app.db import get_db
from app.models import User
# Shared password hasher, built from pwdlib's recommended configuration.
password_hash = PasswordHash.recommended()
# Bearer-credentials dependency. With auto_error=False a missing/invalid
# Authorization header yields None instead of an automatic error response,
# which lets get_current_user raise its own 401 with a custom detail message.
bearer = HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
    """Hash a plaintext password with the module-level ``password_hash``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``password_hash.hash``.
    """
    digest = password_hash.hash(password)
    return digest
def verify_password(password: str, hashed: str) -> bool:
    """Check a plaintext password against a previously stored hash.

    Args:
        password: The plaintext password supplied by the caller.
        hashed: The stored hash to compare against.

    Returns:
        The result of ``password_hash.verify(password, hashed)``.
    """
    is_match = password_hash.verify(password, hashed)
    return is_match
def create_access_token(user: User) -> str:
    """Issue a signed JWT access token for ``user``.

    The token carries three claims, in this order: ``sub`` (the user's id as
    a string), ``email`` and ``exp`` (current UTC time plus
    ``settings.access_token_ttl_minutes`` minutes).

    Args:
        user: The user the token is issued for; ``id`` and ``email`` are read.

    Returns:
        The token encoded with ``settings.jwt_secret`` and
        ``settings.jwt_algorithm``.
    """
    issued_at = datetime.now(UTC)
    lifetime = timedelta(minutes=settings.access_token_ttl_minutes)
    # Keyword order fixes the claim order in the encoded payload.
    claims = dict(
        sub=str(user.id),
        email=user.email,
        exp=issued_at + lifetime,
    )
    return jwt.encode(
        claims,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the ``WWW-Authenticate: Bearer`` challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        # HTTP requires a 401 response to tell the client which auth scheme to use.
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer)],
    db: Annotated[Session, Depends(get_db)],
) -> User:
    """Resolve the authenticated ``User`` from the request's bearer token.

    Args:
        credentials: Bearer credentials extracted by the ``bearer`` dependency,
            or ``None`` when the dependency produced no credentials.
        db: Database session used to load the user.

    Returns:
        The ``User`` whose id equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header and
            detail ``"Missing bearer token"`` (no credentials),
            ``"Invalid token"`` (decode/validation failure or a ``sub`` that
            is missing or not a UUID), or ``"User not found"`` (no matching
            row).
    """
    if credentials is None:
        raise _unauthorized("Missing bearer token")

    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
        )
        user_id = uuid.UUID(payload["sub"])
    except Exception as exc:
        # Intentionally broad: the token is untrusted input, and any failure
        # while decoding it or parsing its subject must be answered with 401.
        raise _unauthorized("Invalid token") from exc

    user = db.scalar(select(User).where(User.id == user_id))
    if user is None:
        raise _unauthorized("User not found")
    return user