"""Internal HTTP API of the Train Watcher logic service.

Exposes the equipment/exercise catalog, workout logging (workouts, items,
sets) and simple analytics. Every ``/internal`` route requires the shared
service token and identifies the acting user through the ``X-User-Id`` header.
"""

import secrets
import uuid
from collections import defaultdict
from datetime import UTC, datetime
from typing import Annotated

from fastapi import Depends, FastAPI, Header, HTTPException, Query, status
from sqlalchemy import func, select
from sqlalchemy.orm import Session, selectinload

from app.core import settings
from app.db import SessionLocal, create_schema, get_db
from app.models import Base, Equipment, Exercise, Workout, WorkoutItem, WorkoutSet
from app.schemas import (
    CaloriesRead,
    EquipmentCreate,
    EquipmentRead,
    ExerciseCreate,
    ExerciseRead,
    ProgressionPoint,
    ProgressionRead,
    WorkoutCreate,
    WorkoutItemCreate,
    WorkoutItemRead,
    WorkoutRead,
    WorkoutSetCreate,
    WorkoutSetRead,
    WorkoutUpdate,
)

app = FastAPI(title="Train Watcher Logic Service", version="0.1.0")

# Escape character used for user-supplied LIKE patterns. A forward slash is
# used instead of a backslash so the ESCAPE clause needs no string-literal
# escaping of its own.
_LIKE_ESCAPE = "/"


def require_service_token(x_service_token: Annotated[str | None, Header()] = None) -> None:
    """Reject the request unless ``X-Service-Token`` matches the configured token.

    The check fails closed: a missing/empty header or an unset
    ``settings.service_token`` never authenticates. The comparison is done
    with ``secrets.compare_digest`` so it does not leak how many leading
    characters matched.

    Raises:
        HTTPException: 401 when the token is absent or does not match.
    """
    expected = settings.service_token
    if x_service_token and expected:
        # Compare as bytes: compare_digest only accepts ASCII-only ``str``.
        if secrets.compare_digest(x_service_token.encode("utf-8"), expected.encode("utf-8")):
            return
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid service token",
    )


def get_user_id(x_user_id: Annotated[str | None, Header()] = None) -> uuid.UUID:
    """Parse the acting user's id from the ``X-User-Id`` header.

    Raises:
        HTTPException: 400 when the header is missing or is not a valid UUID.
    """
    if not x_user_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing X-User-Id header",
        )
    try:
        return uuid.UUID(x_user_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid X-User-Id",
        ) from exc


InternalAuth = Depends(require_service_token)
Db = Annotated[Session, Depends(get_db)]
CurrentUserId = Annotated[uuid.UUID, Depends(get_user_id)]


# NOTE(review): FastAPI documents ``on_event`` as deprecated in favour of
# lifespan handlers — consider migrating.
@app.on_event("startup")
def on_startup() -> None:
    """Create the database schema and seed the built-in catalog on startup."""
    create_schema(Base.metadata)
    with SessionLocal() as db:
        seed_builtin_catalog(db)


@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe; always reports ``ok``."""
    return {"status": "ok"}


def seed_builtin_catalog(db: Session) -> None:
    """Insert the built-in equipment and exercises if none exist yet.

    Does nothing when at least one built-in ``Equipment`` row is present.
    Commits the session when it inserts rows.
    """
    exists = db.scalar(
        select(func.count()).select_from(Equipment).where(Equipment.is_builtin.is_(True))
    )
    if exists:
        return

    treadmill = Equipment(
        name="Беговая дорожка",
        description="Кардио-тренажер для ходьбы и бега.",
        is_builtin=True,
    )
    smith = Equipment(
        name="Машина Смита",
        description="Силовая рама с фиксированной траекторией грифа.",
        is_builtin=True,
    )
    db.add_all([treadmill, smith])
    # Flush so ``treadmill.id`` is populated before it is referenced below.
    db.flush()

    db.add_all(
        [
            Exercise(
                name="Жим лежа",
                description="Базовое упражнение для груди, трицепса и передней дельты.",
                is_builtin=True,
                default_calories_per_minute=6,
            ),
            Exercise(
                name="Приседания",
                description="Базовое упражнение для ног и корпуса.",
                is_builtin=True,
                default_calories_per_minute=8,
            ),
            Exercise(
                name="Бег",
                description="Кардио-нагрузка на беговой дорожке.",
                equipment_id=treadmill.id,
                is_builtin=True,
                default_calories_per_minute=10,
            ),
        ]
    )
    db.commit()


def _contains_pattern(term: str) -> str:
    """Build a LIKE pattern matching ``term`` literally anywhere in a string.

    ``%``, ``_`` and the escape character itself are escaped so user input
    cannot inject wildcards. Use together with ``escape=_LIKE_ESCAPE``.
    """
    escaped = (
        term.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", _LIKE_ESCAPE + "%")
        .replace("_", _LIKE_ESCAPE + "_")
    )
    return f"%{escaped}%"


def accessible_equipment(db: Session, user_id: uuid.UUID):
    """Return a SELECT of equipment that is built-in or owned by ``user_id``.

    ``db`` is not used; it is kept so existing callers keep working.
    """
    return select(Equipment).where(
        (Equipment.is_builtin.is_(True)) | (Equipment.owner_user_id == user_id)
    )


def accessible_exercises(db: Session, user_id: uuid.UUID):
    """Return a SELECT of exercises that are built-in or owned by ``user_id``.

    ``db`` is not used; it is kept so existing callers keep working.
    """
    return select(Exercise).where(
        (Exercise.is_builtin.is_(True)) | (Exercise.owner_user_id == user_id)
    )


@app.get(
    "/internal/catalog/equipment",
    dependencies=[InternalAuth],
    response_model=list[EquipmentRead],
)
def list_equipment(db: Db, user_id: CurrentUserId, search: str | None = None) -> list[Equipment]:
    """List equipment visible to the user, built-ins first, then by name.

    ``search`` is an optional case-insensitive substring filter on the name;
    it is matched literally (wildcard characters are escaped).
    """
    statement = accessible_equipment(db, user_id).order_by(
        Equipment.is_builtin.desc(), Equipment.name
    )
    if search:
        statement = statement.where(
            Equipment.name.ilike(_contains_pattern(search), escape=_LIKE_ESCAPE)
        )
    return list(db.scalars(statement))


@app.post(
    "/internal/catalog/equipment",
    dependencies=[InternalAuth],
    response_model=EquipmentRead,
    status_code=status.HTTP_201_CREATED,
)
def create_equipment(payload: EquipmentCreate, db: Db, user_id: CurrentUserId) -> Equipment:
    """Create a user-owned (non built-in) equipment entry."""
    equipment = Equipment(owner_user_id=user_id, is_builtin=False, **payload.model_dump())
    db.add(equipment)
    db.commit()
    db.refresh(equipment)
    return equipment


@app.get(
    "/internal/catalog/exercises",
    dependencies=[InternalAuth],
    response_model=list[ExerciseRead],
)
def list_exercises(db: Db, user_id: CurrentUserId, search: str | None = None) -> list[Exercise]:
    """List exercises visible to the user, built-ins first, then by name.

    ``search`` is an optional case-insensitive substring filter on the name;
    it is matched literally (wildcard characters are escaped).
    """
    statement = accessible_exercises(db, user_id).order_by(
        Exercise.is_builtin.desc(), Exercise.name
    )
    if search:
        statement = statement.where(
            Exercise.name.ilike(_contains_pattern(search), escape=_LIKE_ESCAPE)
        )
    return list(db.scalars(statement))


@app.post(
    "/internal/catalog/exercises",
    dependencies=[InternalAuth],
    response_model=ExerciseRead,
    status_code=status.HTTP_201_CREATED,
)
def create_exercise(payload: ExerciseCreate, db: Db, user_id: CurrentUserId) -> Exercise:
    """Create a user-owned exercise, optionally linked to accessible equipment.

    Raises:
        HTTPException: 404 when ``payload.equipment_id`` refers to equipment
            that does not exist or is neither built-in nor owned by the user.
    """
    if payload.equipment_id:
        equipment = db.get(Equipment, payload.equipment_id)
        if not equipment or (not equipment.is_builtin and equipment.owner_user_id != user_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Equipment not found"
            )
    exercise = Exercise(owner_user_id=user_id, is_builtin=False, **payload.model_dump())
    db.add(exercise)
    db.commit()
    db.refresh(exercise)
    return exercise


@app.get("/internal/workouts", dependencies=[InternalAuth], response_model=list[WorkoutRead])
def list_workouts(db: Db, user_id: CurrentUserId) -> list[Workout]:
    """List the user's workouts, newest first, with items and sets preloaded."""
    return list(
        db.scalars(
            select(Workout)
            .where(Workout.user_id == user_id)
            .options(selectinload(Workout.items).selectinload(WorkoutItem.sets))
            .order_by(Workout.started_at.desc())
        )
    )


@app.post(
    "/internal/workouts",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
    status_code=status.HTTP_201_CREATED,
)
def create_workout(payload: WorkoutCreate, db: Db, user_id: CurrentUserId) -> Workout:
    """Create a workout; ``started_at`` defaults to the current UTC time."""
    workout = Workout(
        user_id=user_id,
        started_at=payload.started_at or datetime.now(UTC),
        notes=payload.notes,
    )
    db.add(workout)
    db.commit()
    db.refresh(workout)
    return workout


@app.get(
    "/internal/workouts/{workout_id}",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def get_workout(workout_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> Workout:
    """Fetch one of the user's workouts with items and sets preloaded.

    Also called directly by other handlers in this module to load and
    authorize a workout.

    Raises:
        HTTPException: 404 when the workout does not exist or belongs to
            another user.
    """
    workout = db.scalar(
        select(Workout)
        .where(Workout.id == workout_id, Workout.user_id == user_id)
        .options(selectinload(Workout.items).selectinload(WorkoutItem.sets))
    )
    if not workout:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout not found")
    return workout


@app.patch(
    "/internal/workouts/{workout_id}",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def update_workout(
    workout_id: uuid.UUID, payload: WorkoutUpdate, db: Db, user_id: CurrentUserId
) -> Workout:
    """Update ``finished_at`` and/or ``notes`` and refresh the calorie total.

    Fields left as ``None`` in the payload are not modified.

    Raises:
        HTTPException: 404 when the workout is not found for this user.
    """
    workout = get_workout(workout_id, db, user_id)
    if payload.finished_at is not None:
        workout.finished_at = payload.finished_at
    if payload.notes is not None:
        workout.notes = payload.notes
    recalculate_workout_calories(db, workout.id)
    db.commit()
    db.refresh(workout)
    return workout


@app.post(
    "/internal/workouts/{workout_id}/items",
    dependencies=[InternalAuth],
    response_model=WorkoutItemRead,
    status_code=status.HTTP_201_CREATED,
)
def add_workout_item(
    workout_id: uuid.UUID, payload: WorkoutItemCreate, db: Db, user_id: CurrentUserId
) -> WorkoutItem:
    """Append an item (exercise and/or equipment) to one of the user's workouts.

    When ``payload.order_index`` is omitted the item is placed after the
    highest existing index, so positions stay unique even after deletions.

    Raises:
        HTTPException: 404 when the workout, exercise or equipment is missing
            or not accessible to the user.
    """
    workout = get_workout(workout_id, db, user_id)

    if payload.exercise_id:
        exercise = db.get(Exercise, payload.exercise_id)
        if not exercise or (not exercise.is_builtin and exercise.owner_user_id != user_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Exercise not found"
            )

    if payload.equipment_id:
        equipment = db.get(Equipment, payload.equipment_id)
        if not equipment or (not equipment.is_builtin and equipment.owner_user_id != user_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Equipment not found"
            )

    next_index = payload.order_index
    if next_index is None:
        # len(items) would repeat an index once an earlier item was deleted.
        used = [
            existing.order_index
            for existing in workout.items
            if existing.order_index is not None
        ]
        next_index = max(used, default=-1) + 1

    item = WorkoutItem(
        workout_id=workout.id,
        **payload.model_dump(exclude={"order_index"}),
        order_index=next_index,
    )
    db.add(item)
    db.commit()
    db.refresh(item)
    return item


@app.post(
    "/internal/workout-items/{item_id}/sets",
    dependencies=[InternalAuth],
    response_model=WorkoutSetRead,
    status_code=status.HTTP_201_CREATED,
)
def add_workout_set(
    item_id: uuid.UUID,
    payload: WorkoutSetCreate,
    db: Db,
    user_id: CurrentUserId,
) -> WorkoutSet:
    """Record a set on a workout item and refresh the workout's calorie total.

    Calories are estimated with ``estimate_set_calories`` when the payload
    does not provide them; ``completed_at`` defaults to the current UTC time.
    The new set gets the highest existing ``set_index`` plus one.

    Raises:
        HTTPException: 404 when the item does not exist in one of the user's
            workouts.
    """
    item = db.scalar(
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
        .options(selectinload(WorkoutItem.sets), selectinload(WorkoutItem.exercise))
    )
    if not item:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found"
        )

    calories = payload.calories
    if calories is None:
        calories = estimate_set_calories(item, payload)

    # len(sets) + 1 would repeat an index once an earlier set was deleted.
    used = [existing.set_index for existing in item.sets if existing.set_index is not None]
    next_set_index = max(used, default=0) + 1

    workout_set = WorkoutSet(
        workout_item_id=item.id,
        set_index=next_set_index,
        weight=payload.weight,
        reps=payload.reps,
        duration_seconds=payload.duration_seconds,
        calories=calories,
        completed_at=payload.completed_at or datetime.now(UTC),
    )
    db.add(workout_set)
    # Flush so the new set is visible to the SUM in the recalculation below.
    db.flush()
    recalculate_workout_calories(db, item.workout_id)
    db.commit()
    db.refresh(workout_set)
    return workout_set


@app.delete(
    "/internal/workout-items/{item_id}",
    dependencies=[InternalAuth],
    status_code=status.HTTP_204_NO_CONTENT,
)
def delete_workout_item(item_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> None:
    """Delete an item from an unfinished workout and refresh its calorie total.

    Raises:
        HTTPException: 404 when the item is not found for this user; 409 when
            the owning workout already has ``finished_at`` set.
    """
    item = db.scalar(
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
    )
    if not item:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found"
        )

    workout = db.get(Workout, item.workout_id)
    if workout and workout.finished_at:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT, detail="Workout already finished"
        )

    # Remember the id before the item is deleted.
    workout_id = item.workout_id
    db.delete(item)
    db.flush()
    recalculate_workout_calories(db, workout_id)
    db.commit()


@app.delete(
    "/internal/workout-items/{item_id}/sets/{set_id}",
    dependencies=[InternalAuth],
    status_code=status.HTTP_204_NO_CONTENT,
)
def delete_workout_set(
    item_id: uuid.UUID, set_id: uuid.UUID, db: Db, user_id: CurrentUserId
) -> None:
    """Delete a set from an unfinished workout and refresh its calorie total.

    Raises:
        HTTPException: 404 when the set is not found under ``item_id`` for
            this user; 409 when the owning workout already has
            ``finished_at`` set.
    """
    ws = db.scalar(
        select(WorkoutSet)
        .join(WorkoutItem)
        .join(Workout)
        .where(
            WorkoutSet.id == set_id,
            WorkoutItem.id == item_id,
            Workout.user_id == user_id,
        )
    )
    if not ws:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Set not found")

    workout = db.scalar(select(Workout).join(WorkoutItem).where(WorkoutItem.id == item_id))
    if workout and workout.finished_at:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT, detail="Workout already finished"
        )

    db.delete(ws)
    db.flush()
    if workout:
        recalculate_workout_calories(db, workout.id)
    db.commit()


def estimate_set_calories(item: WorkoutItem, payload: WorkoutSetCreate) -> float:
    """Estimate the calories burned by a set, rounded to two decimals.

    When the item's exercise has a ``default_calories_per_minute`` and the
    payload has a duration, the estimate is rate * minutes. Otherwise it
    falls back to ``weight * max(reps, 1) / 120``; a missing weight or rep
    count is treated as 0 instead of raising.
    """
    exercise = item.exercise
    if exercise and exercise.default_calories_per_minute and payload.duration_seconds:
        return round(
            float(exercise.default_calories_per_minute) * payload.duration_seconds / 60,
            2,
        )
    # Coerce so None (e.g. a bodyweight or timed set) and Decimal inputs
    # still yield a float.
    weight = float(payload.weight or 0)
    reps = max(int(payload.reps or 0), 1)
    return round((weight * reps) / 120, 2)


def recalculate_workout_calories(db: Session, workout_id: uuid.UUID) -> None:
    """Set ``Workout.estimated_calories`` to the sum of its sets' calories.

    Does not commit; callers commit afterwards. Does nothing when the
    workout does not exist.
    """
    total = db.scalar(
        select(func.coalesce(func.sum(WorkoutSet.calories), 0))
        .join(WorkoutItem, WorkoutSet.workout_item_id == WorkoutItem.id)
        .where(WorkoutItem.workout_id == workout_id)
    )
    workout = db.get(Workout, workout_id)
    if workout:
        workout.estimated_calories = float(total or 0)


@app.get(
    "/internal/analytics/progression",
    dependencies=[InternalAuth],
    response_model=ProgressionRead,
)
def get_progression(
    db: Db,
    user_id: CurrentUserId,
    kind: Annotated[str, Query(pattern="^(exercise|equipment)$")],
    entity_id: Annotated[uuid.UUID | None, Query()] = None,
) -> ProgressionRead:
    """Summarise weight progression over the user's logged sets.

    Sets are optionally filtered to one exercise or one piece of equipment
    (chosen by ``kind``) when ``entity_id`` is given. Returns per-day maximum
    weight and volume (weight * reps) plus the last weight, the overall
    maximum, and the difference between the last two sets' weights.
    """
    statement = (
        select(Workout.started_at, WorkoutSet.weight, WorkoutSet.reps)
        .join(WorkoutItem, WorkoutSet.workout_item_id == WorkoutItem.id)
        .join(Workout, WorkoutItem.workout_id == Workout.id)
        .where(Workout.user_id == user_id)
        .order_by(Workout.started_at.asc(), WorkoutSet.completed_at.asc())
    )
    if entity_id and kind == "exercise":
        statement = statement.where(WorkoutItem.exercise_id == entity_id)
    elif entity_id and kind == "equipment":
        statement = statement.where(WorkoutItem.equipment_id == entity_id)

    rows = list(db.execute(statement))

    grouped: dict[str, dict[str, float]] = defaultdict(
        lambda: {"max_weight": 0.0, "volume": 0.0}
    )
    weights: list[float] = []
    for started_at, weight, reps in rows:
        date_key = started_at.date().isoformat()
        numeric_weight = float(weight or 0)
        day = grouped[date_key]
        day["max_weight"] = max(day["max_weight"], numeric_weight)
        day["volume"] += numeric_weight * int(reps or 0)
        weights.append(numeric_weight)

    # ISO date strings sort chronologically.
    points = [
        ProgressionPoint(date=date, max_weight=values["max_weight"], volume=values["volume"])
        for date, values in sorted(grouped.items())
    ]

    previous_delta = None
    if len(weights) >= 2:
        previous_delta = round(weights[-1] - weights[-2], 2)

    return ProgressionRead(
        last_weight=weights[-1] if weights else None,
        max_weight=max(weights) if weights else None,
        previous_delta=previous_delta,
        points=points,
    )


@app.get("/internal/analytics/calories", dependencies=[InternalAuth], response_model=CaloriesRead)
def get_calories(db: Db, user_id: CurrentUserId) -> CaloriesRead:
    """Return the user's total estimated calories and a per-workout breakdown.

    Workouts are listed newest first; a missing estimate counts as 0.
    """
    workouts = list(
        db.scalars(
            select(Workout).where(Workout.user_id == user_id).order_by(Workout.started_at.desc())
        )
    )
    total = sum(float(workout.estimated_calories or 0) for workout in workouts)
    return CaloriesRead(
        total_calories=round(total, 2),
        workouts=[
            {
                "id": str(workout.id),
                "date": workout.started_at.date().isoformat(),
                "calories": float(workout.estimated_calories or 0),
            }
            for workout in workouts
        ],
    )