"""Password hashing and JWT bearer-token authentication helpers."""

import uuid
from datetime import UTC, datetime, timedelta
from typing import Annotated

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pwdlib import PasswordHash
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.core import settings
from app.db import get_db
from app.models import User

# Shared hasher built from pwdlib's recommended configuration.
password_hash = PasswordHash.recommended()

# auto_error=False: a request without credentials reaches get_current_user
# as ``None`` so that this module produces its own 401 response.
bearer = HTTPBearer(auto_error=False)


def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the shared hasher."""
    return password_hash.hash(password)


def verify_password(password: str, hashed: str) -> bool:
    """Return whether *password* matches the stored hash *hashed*."""
    return password_hash.verify(password, hashed)


def create_access_token(user: User) -> str:
    """Create a signed JWT access token for *user*.

    The token carries the user's id (``sub``), e-mail (``email``) and an
    expiry (``exp``) set ``settings.access_token_ttl_minutes`` minutes from
    now (UTC). It is signed with ``settings.jwt_secret`` using
    ``settings.jwt_algorithm``.
    """
    expires_at = datetime.now(UTC) + timedelta(
        minutes=settings.access_token_ttl_minutes
    )
    payload = {"sub": str(user.id), "email": user.email, "exp": expires_at}
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer authentication challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer)],
    db: Annotated[Session, Depends(get_db)],
) -> User:
    """Resolve the authenticated user from the request's bearer token.

    Args:
        credentials: Bearer credentials extracted by ``bearer``, or ``None``
            when the request supplied none.
        db: Database session used to look the user up.

    Returns:
        The ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with detail "Missing bearer token" when no
            credentials were sent, "Invalid token" when the token fails
            validation or its ``sub`` is not a UUID, and "User not found"
            when no user has that id.
    """
    if credentials is None:
        raise _unauthorized("Missing bearer token")

    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            # Reject tokens lacking an expiry or subject: without "exp" a
            # validly signed token would otherwise be accepted indefinitely.
            options={"require": ["exp", "sub"]},
        )
    except jwt.PyJWTError as exc:
        raise _unauthorized("Invalid token") from exc

    try:
        user_id = uuid.UUID(payload["sub"])
    except (KeyError, ValueError, TypeError, AttributeError) as exc:
        # "sub" is present but is not a string holding a well-formed UUID.
        raise _unauthorized("Invalid token") from exc

    user = db.scalar(select(User).where(User.id == user_id))
    if user is None:
        raise _unauthorized("User not found")
    return user