Files
workout_watcher/services/bff/app/main.py
T

194 lines
6.4 KiB
Python

from typing import Annotated, Any
from urllib.parse import quote

import httpx
from fastapi import Depends, FastAPI, File, HTTPException, Query, UploadFile, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.core import settings
from app.db import create_schema, get_db
from app.models import Base, User
from app.s3 import upload_catalog_image
from app.schemas import MediaUploadRead, TokenRead, UserCreate, UserLogin, UserRead
from app.security import create_access_token, get_current_user, hash_password, verify_password
app = FastAPI(title="Train Watcher BFF", version="0.1.0")

# Origins permitted by CORS: port 5173 on localhost, by hostname and by loopback IP.
_CORS_ORIGINS = ["http://localhost:5173", "http://127.0.0.1:5173"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Dependency aliases shared by the route handlers in this module:
# a database session from ``get_db`` and the user resolved by ``get_current_user``.
Db = Annotated[Session, Depends(get_db)]
CurrentUser = Annotated[User, Depends(get_current_user)]
@app.on_event("startup")
def on_startup() -> None:
    """Call ``create_schema`` with the ORM metadata when the application starts."""
    metadata = Base.metadata
    create_schema(metadata)
@app.get("/health")
def health() -> dict[str, str]:
    """Return a constant ``{"status": "ok"}`` payload."""
    body: dict[str, str] = {"status": "ok"}
    return body
@app.post("/auth/register", response_model=TokenRead, status_code=status.HTTP_201_CREATED)
def register(payload: UserCreate, db: Db) -> TokenRead:
    """Create a user account and return an access token for it.

    The e-mail is stored lower-cased and the password is stored as the
    result of ``hash_password``.

    Raises:
        HTTPException: 409 when the commit fails with an ``IntegrityError``;
            the session is rolled back first.
    """
    normalized_email = payload.email.lower()
    hashed = hash_password(payload.password)
    new_user = User(
        email=normalized_email,
        password_hash=hashed,
        display_name=payload.display_name,
    )

    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        conflict = HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered",
        )
        raise conflict from exc

    # Reload the row so attributes set by the database are populated.
    db.refresh(new_user)
    token = create_access_token(new_user)
    return TokenRead(access_token=token, user=UserRead.model_validate(new_user))
@app.post("/auth/login", response_model=TokenRead)
def login(payload: UserLogin, db: Db) -> TokenRead:
    """Check an e-mail/password pair and return an access token.

    The lookup uses the lower-cased e-mail. An unknown e-mail and a wrong
    password produce the same error.

    Raises:
        HTTPException: 401 when no user matches or the password check fails.
    """
    lookup = select(User).where(User.email == payload.email.lower())
    account = db.scalar(lookup)

    if account and verify_password(payload.password, account.password_hash):
        token = create_access_token(account)
        return TokenRead(access_token=token, user=UserRead.model_validate(account))

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid email or password",
    )
@app.post("/auth/logout")
def logout() -> dict[str, str]:
    """Return a constant ok status; this handler performs no other work."""
    acknowledgement: dict[str, str] = {"status": "ok"}
    return acknowledgement
@app.get("/me", response_model=UserRead)
def me(user: CurrentUser) -> User:
    """Return the user resolved by the ``CurrentUser`` dependency.

    The response is serialised through ``UserRead``.
    """
    current = user
    return current
def logic_headers(user: User) -> dict[str, str]:
    """Build the headers sent with requests to the logic service.

    They carry the configured service token and the stringified id of *user*.
    """
    headers: dict[str, str] = {}
    headers["X-Service-Token"] = settings.service_token
    headers["X-User-Id"] = str(user.id)
    return headers
async def logic_request(
    method: str,
    path: str,
    user: User,
    *,
    json: dict[str, Any] | None = None,
    params: dict[str, Any] | None = None,
) -> Any:
    """Send a request to the logic service on behalf of *user*.

    Args:
        method: HTTP method to use.
        path: Path relative to ``settings.logic_base_url``.
        user: Authenticated user; its id is sent via ``logic_headers``.
        json: Optional JSON body to send.
        params: Optional query parameters. Entries whose value is ``None``
            are omitted from the outgoing request.

    Returns:
        The decoded JSON body, or ``None`` when the response body is empty.

    Raises:
        HTTPException: With the upstream status code when the logic service
            answers with a status >= 400, or 502 when the request cannot be
            completed at the transport level (connection error, timeout).
    """
    # httpx serialises a ``None`` query value as an empty string (``?search=``)
    # instead of leaving it out, so unset optional filters are dropped here.
    # An empty mapping is collapsed to ``None`` so no query string is applied.
    query: dict[str, Any] | None = None
    if params is not None:
        query = {key: value for key, value in params.items() if value is not None} or None

    try:
        async with httpx.AsyncClient(base_url=settings.logic_base_url, timeout=10) as client:
            response = await client.request(
                method,
                path,
                headers=logic_headers(user),
                json=json,
                params=query,
            )
    except httpx.RequestError as exc:
        # The logic service could not be reached; report a gateway error
        # rather than letting the exception surface as a generic 500.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Logic service unavailable",
        ) from exc

    if response.status_code >= 400:
        detail: Any = response.text
        try:
            body = response.json()
        except ValueError:
            body = None
        # Only a JSON object can carry a "detail" key; a JSON list or scalar
        # falls back to the raw response text.
        if isinstance(body, dict):
            detail = body.get("detail", response.text)
        raise HTTPException(status_code=response.status_code, detail=detail)

    if not response.content:
        return None
    return response.json()
@app.post("/media/images", response_model=MediaUploadRead, status_code=status.HTTP_201_CREATED)
async def upload_image(
    user: CurrentUser,
    entity_type: Annotated[str, Query(pattern="^(equipment|exercise)$")],
    file: Annotated[UploadFile, File()],
) -> dict[str, str]:
    """Hand an uploaded image to ``upload_catalog_image`` and return its result.

    ``entity_type`` is a query parameter restricted to ``equipment`` or
    ``exercise``. The response is serialised through ``MediaUploadRead``.
    """
    owner_id = user.id
    uploaded = await upload_catalog_image(file, owner_id, entity_type)
    return uploaded
@app.get("/catalog/equipment")
async def list_equipment(user: CurrentUser, search: str | None = None) -> Any:
    """Proxy an equipment listing to the logic service.

    The optional ``search`` value is forwarded as a query parameter.
    """
    query = {"search": search}
    listing = await logic_request("GET", "/internal/catalog/equipment", user, params=query)
    return listing
@app.post("/catalog/equipment", status_code=status.HTTP_201_CREATED)
async def create_equipment(payload: dict[str, Any], user: CurrentUser) -> Any:
    """Forward the JSON body unchanged to the logic service's equipment endpoint."""
    created = await logic_request(
        "POST",
        "/internal/catalog/equipment",
        user,
        json=payload,
    )
    return created
@app.get("/catalog/exercises")
async def list_exercises(user: CurrentUser, search: str | None = None) -> Any:
    """Proxy an exercise listing to the logic service.

    The optional ``search`` value is forwarded as a query parameter.
    """
    query = {"search": search}
    listing = await logic_request("GET", "/internal/catalog/exercises", user, params=query)
    return listing
@app.post("/catalog/exercises", status_code=status.HTTP_201_CREATED)
async def create_exercise(payload: dict[str, Any], user: CurrentUser) -> Any:
    """Forward the JSON body unchanged to the logic service's exercise endpoint."""
    created = await logic_request(
        "POST",
        "/internal/catalog/exercises",
        user,
        json=payload,
    )
    return created
@app.get("/workouts")
async def list_workouts(user: CurrentUser) -> Any:
    """Proxy a workout listing for the authenticated user to the logic service."""
    workouts = await logic_request("GET", "/internal/workouts", user)
    return workouts
@app.post("/workouts", status_code=status.HTTP_201_CREATED)
async def create_workout(payload: dict[str, Any], user: CurrentUser) -> Any:
    """Forward the JSON body unchanged to the logic service's workout endpoint."""
    created = await logic_request(
        "POST",
        "/internal/workouts",
        user,
        json=payload,
    )
    return created
@app.get("/workouts/{workout_id}")
async def get_workout(workout_id: str, user: CurrentUser) -> Any:
    """Fetch a single workout from the logic service.

    Args:
        workout_id: Identifier taken from the request path.
        user: Authenticated user the request is made for.

    Returns:
        Whatever ``logic_request`` returns for the upstream response.
    """
    # The identifier is client-controlled: percent-encode it so characters
    # such as "?" or "#" stay inside the path segment instead of being parsed
    # as a query string or fragment of the internal URL.
    safe_id = quote(workout_id, safe="")
    return await logic_request("GET", f"/internal/workouts/{safe_id}", user)
@app.patch("/workouts/{workout_id}")
async def update_workout(workout_id: str, payload: dict[str, Any], user: CurrentUser) -> Any:
    """Forward a partial workout update to the logic service.

    Args:
        workout_id: Identifier taken from the request path.
        payload: JSON body forwarded unchanged.
        user: Authenticated user the request is made for.

    Returns:
        Whatever ``logic_request`` returns for the upstream response.
    """
    # The identifier is client-controlled: percent-encode it so characters
    # such as "?" or "#" stay inside the path segment instead of being parsed
    # as a query string or fragment of the internal URL.
    safe_id = quote(workout_id, safe="")
    return await logic_request("PATCH", f"/internal/workouts/{safe_id}", user, json=payload)
@app.post("/workouts/{workout_id}/items", status_code=status.HTTP_201_CREATED)
async def add_workout_item(workout_id: str, payload: dict[str, Any], user: CurrentUser) -> Any:
    """Forward a new workout item to the logic service.

    Args:
        workout_id: Identifier of the parent workout, taken from the request path.
        payload: JSON body forwarded unchanged.
        user: Authenticated user the request is made for.

    Returns:
        Whatever ``logic_request`` returns for the upstream response.
    """
    # The identifier is client-controlled: percent-encode it so characters
    # such as "?" or "#" stay inside the path segment instead of being parsed
    # as a query string or fragment of the internal URL.
    safe_id = quote(workout_id, safe="")
    return await logic_request(
        "POST", f"/internal/workouts/{safe_id}/items", user, json=payload
    )
@app.post("/workout-items/{item_id}/sets", status_code=status.HTTP_201_CREATED)
async def add_workout_set(item_id: str, payload: dict[str, Any], user: CurrentUser) -> Any:
    """Forward a new set for a workout item to the logic service.

    Args:
        item_id: Identifier of the workout item, taken from the request path.
        payload: JSON body forwarded unchanged.
        user: Authenticated user the request is made for.

    Returns:
        Whatever ``logic_request`` returns for the upstream response.
    """
    # The identifier is client-controlled: percent-encode it so characters
    # such as "?" or "#" stay inside the path segment instead of being parsed
    # as a query string or fragment of the internal URL.
    safe_id = quote(item_id, safe="")
    return await logic_request(
        "POST", f"/internal/workout-items/{safe_id}/sets", user, json=payload
    )
@app.get("/analytics/progression")
async def progression(
    user: CurrentUser,
    kind: Annotated[str, Query(pattern="^(exercise|equipment)$")] = "exercise",
    entity_id: str | None = None,
) -> Any:
    """Proxy a progression analytics query to the logic service.

    ``kind`` must be ``exercise`` or ``equipment`` (default ``exercise``);
    it and ``entity_id`` are forwarded as query parameters.
    """
    query = {"kind": kind, "entity_id": entity_id}
    report = await logic_request(
        "GET",
        "/internal/analytics/progression",
        user,
        params=query,
    )
    return report
@app.get("/analytics/calories")
async def calories(user: CurrentUser) -> Any:
    """Proxy the calories analytics request for the authenticated user to the logic service."""
    report = await logic_request("GET", "/internal/analytics/calories", user)
    return report