import json
import re
import secrets
import uuid
from collections import defaultdict
from datetime import UTC, datetime
from pathlib import Path
from typing import Annotated

import boto3
from fastapi import Body, Depends, FastAPI, Header, HTTPException, Query, status
from sqlalchemy import func, select
from sqlalchemy.orm import Session, selectinload

from app.core import settings
from app.db import SessionLocal, create_schema, get_db
from app.models import ActivitySource, Base, Equipment, Exercise, Workout, WorkoutItem, WorkoutSet
from app.schemas import (
    ActivitySourceCreate,
    ActivitySourceRead,
    CaloriesRead,
    EquipmentCreate,
    ExerciseCreate,
    ProgressionPoint,
    ProgressionRead,
    WorkoutCreate,
    WorkoutFinishRequest,
    WorkoutItemCreate,
    WorkoutItemRead,
    WorkoutRead,
    WorkoutSetBatchCreate,
    WorkoutSetCreate,
    WorkoutSetRead,
    WorkoutSetUpdate,
    WorkoutUpdate,
)

app = FastAPI(title="Train Watcher Logic Service", version="0.1.0")

# JSON file with the builtin activity-source rows loaded at startup.
SEED_FILE = Path(__file__).parent / "seeds" / "activity_sources.json"

# File extensions accepted for builtin assets, mapped to the Content-Type sent to S3.
ASSET_CONTENT_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".webp": "image/webp",
    ".avif": "image/avif",
}


def require_service_token(x_service_token: Annotated[str | None, Header()] = None) -> None:
    """Reject the request unless ``X-Service-Token`` matches the configured token.

    The check fails closed: a missing header, or a configured token that is not
    a string (for example ``None``), never authenticates.

    Raises:
        HTTPException: 401 when the token is missing or does not match.
    """
    expected = settings.service_token
    authorized = (
        x_service_token is not None
        and isinstance(expected, str)
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII ``str`` arguments.
        and secrets.compare_digest(x_service_token.encode("utf-8"), expected.encode("utf-8"))
    )
    if not authorized:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid service token",
        )


def get_user_id(x_user_id: Annotated[str | None, Header()] = None) -> uuid.UUID:
    """Parse the ``X-User-Id`` header into a UUID.

    Raises:
        HTTPException: 400 when the header is missing, empty, or not a valid UUID.
    """
    if not x_user_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing X-User-Id header",
        )
    try:
        return uuid.UUID(x_user_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid X-User-Id",
        ) from exc


# Shared dependency shortcuts used by the route declarations.
InternalAuth = Depends(require_service_token)
Db = Annotated[Session, Depends(get_db)]
CurrentUserId = Annotated[uuid.UUID, Depends(get_user_id)]


@app.on_event("startup")
def on_startup() -> None:
    """Create the database schema and seed the builtin catalog."""
    create_schema(Base.metadata)
    with SessionLocal() as db:
        seed_builtin_catalog(db)
@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe."""
    return {"status": "ok"}


def seed_builtin_catalog(db: Session) -> None:
    """Upsert builtin activity sources from ``SEED_FILE``.

    Does nothing when the seed file or the assets directory is missing. Rows
    whose image cannot be uploaded are skipped. Existing rows are matched by
    slug and overwritten with the seed values.
    """
    if not SEED_FILE.exists():
        return
    seed_rows = json.loads(SEED_FILE.read_text(encoding="utf-8"))
    assets_dir = resolve_builtin_assets_dir()
    if not assets_dir:
        return

    for row in seed_rows:
        asset_path = assets_dir / row["asset_filename"]
        image = upload_builtin_asset(row["kind"], row["slug"], asset_path)
        if not image:
            continue

        activity = db.scalar(select(ActivitySource).where(ActivitySource.slug == row["slug"]))
        if activity is None:
            activity = ActivitySource(slug=row["slug"], owner_user_id=None, is_builtin=True)
            db.add(activity)

        activity.kind = row["kind"]
        activity.title = row["title"]
        activity.description = row.get("description")
        activity.category = row["category"]
        activity.equipment = row["equipment"]
        activity.measurement_type = row["measurement_type"]
        activity.difficulty = row["difficulty"]
        activity.default_calories_per_minute = row.get("default_calories_per_minute")
        activity.image_s3_key = image["image_s3_key"]
        activity.image_s3_url = image["image_s3_url"]
        activity.is_builtin = True

    db.commit()


def resolve_builtin_assets_dir() -> Path | None:
    """Return the first existing assets directory, or ``None`` if there is none.

    The configured ``settings.builtin_assets_dir`` is tried first, then a
    ``workout_assets`` directory three levels above this file.
    """
    configured = Path(settings.builtin_assets_dir)
    candidates = [configured, Path(__file__).resolve().parents[3] / "workout_assets"]
    for candidate in candidates:
        if candidate.exists() and candidate.is_dir():
            return candidate
    return None


def upload_builtin_asset(kind: str, slug: str, asset_path: Path) -> dict[str, str] | None:
    """Upload one builtin image to S3 and return its key and public URL.

    Returns ``None`` when the file is missing, has an unsupported extension,
    or the upload fails (the failure is printed, not raised).
    """
    if not asset_path.exists() or asset_path.suffix.lower() not in ASSET_CONTENT_TYPES:
        return None

    extension = asset_path.suffix.lower()
    object_key = f"builtin/activity-sources/{kind}/{slug}{extension}"
    try:
        boto3.client(
            "s3",
            endpoint_url=settings.s3_endpoint_url,
            aws_access_key_id=settings.s3_access_key_id,
            aws_secret_access_key=settings.s3_secret_access_key,
            region_name=settings.s3_region,
        ).put_object(
            Bucket=settings.s3_bucket,
            Key=object_key,
            Body=asset_path.read_bytes(),
            ContentType=ASSET_CONTENT_TYPES[extension],
        )
    except Exception as exc:  # noqa: BLE001 - failed asset upload should not block service startup
        print(f"Skipping builtin asset {asset_path}: {exc}", flush=True)
        return None

    public_base = settings.s3_public_base_url.rstrip("/")
    return {
        "image_s3_key": object_key,
        "image_s3_url": f"{public_base}/{settings.s3_bucket}/{object_key}",
    }


def accessible_equipment(db: Session, user_id: uuid.UUID):
    """Build a select for equipment that is builtin or owned by ``user_id``."""
    return select(Equipment).where(
        (Equipment.is_builtin.is_(True)) | (Equipment.owner_user_id == user_id)
    )


def accessible_exercises(db: Session, user_id: uuid.UUID):
    """Build a select for exercises that are builtin or owned by ``user_id``."""
    return select(Exercise).where(
        (Exercise.is_builtin.is_(True)) | (Exercise.owner_user_id == user_id)
    )


def accessible_activity_sources(db: Session, user_id: uuid.UUID):
    """Build a select for activity sources that are builtin or owned by ``user_id``."""
    return select(ActivitySource).where(
        (ActivitySource.is_builtin.is_(True)) | (ActivitySource.owner_user_id == user_id)
    )


def normalize_kind(kind: str | None) -> str | None:
    """Map the ``"equipment"`` alias to ``"machine"``; pass other values through."""
    if kind == "equipment":
        return "machine"
    return kind


def slugify(value: str) -> str:
    """Lowercase ``value`` and collapse non ``[a-z0-9]`` runs into single dashes.

    Returns ``"activity"`` when nothing usable remains.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
    return slug or "activity"


def unique_user_slug(db: Session, title: str, user_id: uuid.UUID) -> str:
    """Return an unused slug of the form ``custom-<title>-<user prefix>[-N]``."""
    base = f"custom-{slugify(title)}-{str(user_id)[:8]}"
    candidate = base
    index = 2
    # Append an increasing numeric suffix until no existing row uses the slug.
    while db.scalar(select(ActivitySource.id).where(ActivitySource.slug == candidate)):
        candidate = f"{base}-{index}"
        index += 1
    return candidate


def load_workout(db: Session, workout_id: uuid.UUID, user_id: uuid.UUID) -> Workout:
    """Load a user's workout with its items, sets and activity sources.

    Raises:
        HTTPException: 404 when the workout does not exist for this user.
    """
    workout = db.scalar(
        select(Workout)
        .where(Workout.id == workout_id, Workout.user_id == user_id)
        .options(
            selectinload(Workout.items).selectinload(WorkoutItem.sets),
            selectinload(Workout.items).selectinload(WorkoutItem.activity_source),
        )
    )
    if not workout:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout not found")
    return workout


def get_active_workout_for_user(db: Session, user_id: uuid.UUID) -> Workout | None:
    """Return the user's most recently started active workout, if any."""
    return db.scalar(
        select(Workout)
        .where(Workout.user_id == user_id, Workout.status == "active")
        .options(
            selectinload(Workout.items).selectinload(WorkoutItem.sets),
            selectinload(Workout.items).selectinload(WorkoutItem.activity_source),
        )
        .order_by(Workout.started_at.desc())
        # Only the first row is used, so do not fetch the rest.
        .limit(1)
    )


def ensure_active_workout(workout: Workout) -> None:
    """Raise 409 unless the workout status is ``"active"``."""
    if workout.status != "active":
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Workout is not active",
        )


@app.get(
    "/internal/catalog/activity-sources",
    dependencies=[InternalAuth],
    response_model=list[ActivitySourceRead],
)
def list_activity_sources(
    db: Db,
    user_id: CurrentUserId,
    search: str | None = None,
    kind: Annotated[str | None, Query(pattern="^(exercise|machine|equipment)$")] = None,
    category: str | None = None,
    scope: Annotated[str, Query(pattern="^(all|builtin|mine)$")] = "all",
) -> list[ActivitySource]:
    """List accessible activity sources, builtin first, then by title.

    Optional filters: kind, category, scope (builtin / mine) and a
    case-insensitive substring search over title and description.
    """
    statement = accessible_activity_sources(db, user_id).order_by(
        ActivitySource.is_builtin.desc(), ActivitySource.title
    )
    normalized_kind = normalize_kind(kind)
    if normalized_kind:
        statement = statement.where(ActivitySource.kind == normalized_kind)
    if category:
        statement = statement.where(ActivitySource.category == category)
    if scope == "builtin":
        statement = statement.where(ActivitySource.is_builtin.is_(True))
    elif scope == "mine":
        statement = statement.where(ActivitySource.owner_user_id == user_id)
    if search:
        needle = f"%{search}%"
        statement = statement.where(
            ActivitySource.title.ilike(needle) | ActivitySource.description.ilike(needle)
        )
    return list(db.scalars(statement))


@app.post(
    "/internal/catalog/activity-sources",
    dependencies=[InternalAuth],
    response_model=ActivitySourceRead,
    status_code=status.HTTP_201_CREATED,
)
def create_activity_source(
    payload: ActivitySourceCreate, db: Db, user_id: CurrentUserId
) -> ActivitySource:
    """Create a user-owned activity source.

    Uses the payload slug when given, otherwise generates a unique one.

    Raises:
        HTTPException: 409 when the slug is already taken.
    """
    slug = payload.slug or unique_user_slug(db, payload.title, user_id)
    if db.scalar(select(ActivitySource.id).where(ActivitySource.slug == slug)):
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Slug already exists")
    data = payload.model_dump(exclude={"slug"})
    activity = ActivitySource(
        slug=slug,
        owner_user_id=user_id,
        is_builtin=False,
        **data,
    )
    db.add(activity)
    db.commit()
    db.refresh(activity)
    return activity


@app.get(
    "/internal/catalog/equipment",
    dependencies=[InternalAuth],
    response_model=list[ActivitySourceRead],
)
def list_equipment(
    db: Db, user_id: CurrentUserId, search: str | None = None
) -> list[ActivitySource]:
    """List accessible activity sources of kind ``"machine"``."""
    statement = (
        accessible_activity_sources(db, user_id)
        .where(ActivitySource.kind == "machine")
        .order_by(ActivitySource.is_builtin.desc(), ActivitySource.title)
    )
    if search:
        statement = statement.where(ActivitySource.title.ilike(f"%{search}%"))
    return list(db.scalars(statement))


@app.post(
    "/internal/catalog/equipment",
    dependencies=[InternalAuth],
    response_model=ActivitySourceRead,
    status_code=status.HTTP_201_CREATED,
)
def create_equipment(payload: EquipmentCreate, db: Db, user_id: CurrentUserId) -> ActivitySource:
    """Create a user-owned ``"machine"`` activity source with default attributes."""
    activity = ActivitySource(
        owner_user_id=user_id,
        slug=unique_user_slug(db, payload.name, user_id),
        kind="machine",
        title=payload.name,
        description=payload.description,
        category="other",
        equipment="machine",
        measurement_type="weight_reps",
        difficulty="intermediate",
        image_s3_url=payload.image_s3_url,
        image_s3_key=payload.image_s3_key,
        is_builtin=False,
    )
    db.add(activity)
    db.commit()
    db.refresh(activity)
    return activity


@app.get(
    "/internal/catalog/exercises",
    dependencies=[InternalAuth],
    response_model=list[ActivitySourceRead],
)
def list_exercises(
    db: Db, user_id: CurrentUserId, search: str | None = None
) -> list[ActivitySource]:
    """List accessible activity sources of kind ``"exercise"``."""
    statement = (
        accessible_activity_sources(db, user_id)
        .where(ActivitySource.kind == "exercise")
        .order_by(ActivitySource.is_builtin.desc(), ActivitySource.title)
    )
    if search:
        statement = statement.where(ActivitySource.title.ilike(f"%{search}%"))
    return list(db.scalars(statement))


@app.post(
    "/internal/catalog/exercises",
    dependencies=[InternalAuth],
    response_model=ActivitySourceRead,
    status_code=status.HTTP_201_CREATED,
)
def create_exercise(payload: ExerciseCreate, db: Db, user_id: CurrentUserId) -> ActivitySource:
    """Create a user-owned ``"exercise"`` activity source with default attributes."""
    activity = ActivitySource(
        owner_user_id=user_id,
        slug=unique_user_slug(db, payload.name, user_id),
        kind="exercise",
        title=payload.name,
        description=payload.description,
        category="other",
        equipment="other",
        measurement_type="weight_reps",
        difficulty="intermediate",
        image_s3_url=payload.image_s3_url,
        image_s3_key=payload.image_s3_key,
        default_calories_per_minute=payload.default_calories_per_minute,
        is_builtin=False,
    )
    db.add(activity)
    db.commit()
    db.refresh(activity)
    return activity


@app.get("/internal/workouts", dependencies=[InternalAuth], response_model=list[WorkoutRead])
def list_workouts(db: Db, user_id: CurrentUserId) -> list[Workout]:
    """List the user's workouts, newest first, with items and sets loaded."""
    return list(
        db.scalars(
            select(Workout)
            .where(Workout.user_id == user_id)
            .options(
                selectinload(Workout.items).selectinload(WorkoutItem.sets),
                selectinload(Workout.items).selectinload(WorkoutItem.activity_source),
            )
            .order_by(Workout.started_at.desc())
        )
    )


@app.post(
    "/internal/workouts",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
    status_code=status.HTTP_201_CREATED,
)
def create_workout(payload: WorkoutCreate, db: Db, user_id: CurrentUserId) -> Workout:
    """Start a new active workout.

    Raises:
        HTTPException: 409 when the user already has an active workout.
    """
    if get_active_workout_for_user(db, user_id):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Active workout already exists",
        )
    workout = Workout(
        user_id=user_id,
        status="active",
        started_at=payload.started_at or datetime.now(UTC),
        notes=payload.notes,
    )
    db.add(workout)
    db.commit()
    db.refresh(workout)
    return workout


@app.get(
    "/internal/workouts/active",
    dependencies=[InternalAuth],
    response_model=WorkoutRead | None,
)
def get_active_workout(db: Db, user_id: CurrentUserId) -> Workout | None:
    """Return the user's active workout, or ``None``."""
    return get_active_workout_for_user(db, user_id)


@app.get("/internal/workouts/{workout_id}", dependencies=[InternalAuth], response_model=WorkoutRead)
def get_workout(workout_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> Workout:
    """Return one of the user's workouts, or 404."""
    return load_workout(db, workout_id, user_id)


@app.patch(
    "/internal/workouts/{workout_id}",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def update_workout(
    workout_id: uuid.UUID, payload: WorkoutUpdate, db: Db, user_id: CurrentUserId
) -> Workout:
    """Update notes and/or ``finished_at``; setting ``finished_at`` marks it finished."""
    workout = load_workout(db, workout_id, user_id)
    if payload.finished_at is not None:
        workout.finished_at = payload.finished_at
        workout.status = "finished"
    if payload.notes is not None:
        workout.notes = payload.notes
    recalculate_workout_totals(db, workout.id)
    db.commit()
    db.refresh(workout)
    return workout


@app.post(
    "/internal/workouts/{workout_id}/finish",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def finish_workout(
    workout_id: uuid.UUID,
    db: Db,
    user_id: CurrentUserId,
    payload: Annotated[WorkoutFinishRequest | None, Body()] = None,
) -> Workout:
    """Finish an active workout now, optionally replacing its notes."""
    workout = load_workout(db, workout_id, user_id)
    ensure_active_workout(workout)
    if payload and payload.notes is not None:
        workout.notes = payload.notes
    recalculate_workout_totals(db, workout.id)
    workout.finished_at = datetime.now(UTC)
    workout.status = "finished"
    db.commit()
    db.refresh(workout)
    return workout


@app.post(
    "/internal/workouts/{workout_id}/discard",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def discard_workout(workout_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> Workout:
    """Mark an active workout as discarded, stamping ``finished_at`` with now."""
    workout = load_workout(db, workout_id, user_id)
    ensure_active_workout(workout)
    recalculate_workout_totals(db, workout.id)
    workout.finished_at = datetime.now(UTC)
    workout.status = "discarded"
    db.commit()
    db.refresh(workout)
    return workout


@app.post(
    "/internal/workouts/{workout_id}/items",
    dependencies=[InternalAuth],
    response_model=WorkoutItemRead,
    status_code=status.HTTP_201_CREATED,
)
def add_workout_item(
    workout_id: uuid.UUID, payload: WorkoutItemCreate, db: Db, user_id: CurrentUserId
) -> WorkoutItem:
    """Append an item to an active workout, snapshotting its source's display data.

    The source is resolved from ``activity_source_id``, then ``exercise_id``,
    then ``equipment_id``. When ``order_index`` is omitted the item is placed
    after the current last item (0 for the first item).

    Raises:
        HTTPException: 404 when the workout or the referenced source is not
            accessible, 409 when the workout is not active, 400 when the
            payload references no source at all.
    """
    workout = load_workout(db, workout_id, user_id)
    ensure_active_workout(workout)

    source_kind: str
    title_snapshot: str
    image_s3_url_snapshot: str | None
    # Defaults used by the legacy exercise/equipment branches below.
    measurement_type_snapshot = "weight_reps"
    category_snapshot = "other"
    equipment_snapshot = "other"

    if payload.activity_source_id:
        activity = db.get(ActivitySource, payload.activity_source_id)
        if not activity or (not activity.is_builtin and activity.owner_user_id != user_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Activity source not found"
            )
        source_kind = activity.kind
        title_snapshot = activity.title
        image_s3_url_snapshot = activity.image_s3_url
        measurement_type_snapshot = activity.measurement_type
        category_snapshot = activity.category
        equipment_snapshot = activity.equipment
    elif payload.exercise_id:
        exercise = db.get(Exercise, payload.exercise_id)
        if not exercise or (not exercise.is_builtin and exercise.owner_user_id != user_id):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Exercise not found")
        source_kind = "exercise"
        title_snapshot = exercise.name
        image_s3_url_snapshot = exercise.image_s3_url
    elif payload.equipment_id:
        equipment = db.get(Equipment, payload.equipment_id)
        if not equipment or (not equipment.is_builtin and equipment.owner_user_id != user_id):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Equipment not found")
        source_kind = "machine"
        title_snapshot = equipment.name
        image_s3_url_snapshot = equipment.image_s3_url
    else:
        # Without this branch the snapshot variables stay unbound and the
        # request fails with an UnboundLocalError (HTTP 500).
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="One of activity_source_id, exercise_id or equipment_id is required",
        )

    next_index = payload.order_index
    if next_index is None:
        max_index = db.scalar(
            select(func.max(WorkoutItem.order_index)).where(WorkoutItem.workout_id == workout.id)
        )
        next_index = 0 if max_index is None else int(max_index) + 1

    item = WorkoutItem(
        workout_id=workout.id,
        source_kind=source_kind,
        title_snapshot=title_snapshot,
        image_s3_url_snapshot=image_s3_url_snapshot,
        measurement_type_snapshot=measurement_type_snapshot,
        category_snapshot=category_snapshot,
        equipment_snapshot=equipment_snapshot,
        **payload.model_dump(exclude={"order_index"}),
        order_index=next_index,
    )
    db.add(item)
    db.commit()
    db.refresh(item)
    return item


@app.post(
    "/internal/workout-items/{item_id}/sets",
    dependencies=[InternalAuth],
    response_model=WorkoutSetRead,
    status_code=status.HTTP_201_CREATED,
)
def add_workout_set(
    item_id: uuid.UUID,
    payload: WorkoutSetCreate,
    db: Db,
    user_id: CurrentUserId,
) -> WorkoutSet:
    """Append one set to an item of an active workout and refresh workout totals.

    Raises:
        HTTPException: 404 when the item is not found for this user, 409 when
            the workout is not active.
    """
    item = db.scalar(
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
        .options(
            selectinload(WorkoutItem.sets),
            selectinload(WorkoutItem.activity_source),
            selectinload(WorkoutItem.exercise),
            selectinload(WorkoutItem.workout),
        )
    )
    if not item:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")
    ensure_active_workout(item.workout)

    calories = estimate_set_calories(
        item,
        payload.weight,
        payload.reps,
        payload.duration_seconds,
        payload.calories,
    )
    max_index = db.scalar(
        select(func.max(WorkoutSet.set_index)).where(WorkoutSet.workout_item_id == item.id)
    )
    workout_set = WorkoutSet(
        workout_item_id=item.id,
        set_index=int(max_index or 0) + 1,
        weight=payload.weight,
        reps=payload.reps,
        duration_seconds=payload.duration_seconds,
        distance_km=payload.distance_km,
        calories=calories,
        completed_at=payload.completed_at or datetime.now(UTC),
    )
    db.add(workout_set)
    # Flush so the totals query below sees the new row.
    db.flush()
    recalculate_workout_totals(db, item.workout_id)
    db.commit()
    db.refresh(workout_set)
    return workout_set


@app.post(
    "/internal/workout-items/{item_id}/sets/batch",
    dependencies=[InternalAuth],
    response_model=list[WorkoutSetRead],
    status_code=status.HTTP_201_CREATED,
)
def add_workout_sets_batch(
    item_id: uuid.UUID,
    payload: WorkoutSetBatchCreate,
    db: Db,
    user_id: CurrentUserId,
) -> list[WorkoutSet]:
    """Append several sets to an item in one transaction, indexed consecutively.

    Raises:
        HTTPException: 404 when the item is not found for this user, 409 when
            the workout is not active.
    """
    item = db.scalar(
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
        .options(
            selectinload(WorkoutItem.activity_source),
            selectinload(WorkoutItem.exercise),
            selectinload(WorkoutItem.workout),
        )
    )
    if not item:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")
    ensure_active_workout(item.workout)

    max_index = db.scalar(
        select(func.max(WorkoutSet.set_index)).where(WorkoutSet.workout_item_id == item.id)
    )
    next_index = int(max_index or 0) + 1
    workout_sets: list[WorkoutSet] = []
    for set_payload in payload.sets:
        workout_set = WorkoutSet(
            workout_item_id=item.id,
            set_index=next_index,
            weight=set_payload.weight,
            reps=set_payload.reps,
            duration_seconds=set_payload.duration_seconds,
            distance_km=set_payload.distance_km,
            calories=estimate_set_calories(
                item,
                set_payload.weight,
                set_payload.reps,
                set_payload.duration_seconds,
                set_payload.calories,
            ),
            completed_at=set_payload.completed_at or datetime.now(UTC),
        )
        db.add(workout_set)
        workout_sets.append(workout_set)
        next_index += 1

    # Flush so the totals query below sees the new rows.
    db.flush()
    recalculate_workout_totals(db, item.workout_id)
    db.commit()
    for workout_set in workout_sets:
        db.refresh(workout_set)
    return workout_sets


@app.patch(
    "/internal/workout-sets/{set_id}",
    dependencies=[InternalAuth],
    response_model=WorkoutSetRead,
)
def update_workout_set(
    set_id: uuid.UUID,
    payload: WorkoutSetUpdate,
    db: Db,
    user_id: CurrentUserId,
) -> WorkoutSet:
    """Patch a set of an active workout and refresh workout totals.

    ``weight``, ``reps`` and ``completed_at`` are only changed when given a
    non-null value; ``duration_seconds``, ``distance_km`` and ``calories`` are
    written whenever the field is present in the request, even as null. When
    ``calories`` is absent it is re-estimated from the updated set.

    Raises:
        HTTPException: 404 when the set is not found for this user, 409 when
            the workout is not active.
    """
    workout_set = db.scalar(
        select(WorkoutSet)
        .join(WorkoutItem)
        .join(Workout)
        .where(WorkoutSet.id == set_id, Workout.user_id == user_id)
        .options(
            selectinload(WorkoutSet.workout_item).selectinload(WorkoutItem.activity_source),
            selectinload(WorkoutSet.workout_item).selectinload(WorkoutItem.exercise),
            selectinload(WorkoutSet.workout_item).selectinload(WorkoutItem.workout),
        )
    )
    if not workout_set:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Set not found")
    ensure_active_workout(workout_set.workout_item.workout)

    if payload.weight is not None:
        workout_set.weight = payload.weight
    if payload.reps is not None:
        workout_set.reps = payload.reps
    if "duration_seconds" in payload.model_fields_set:
        workout_set.duration_seconds = payload.duration_seconds
    if "distance_km" in payload.model_fields_set:
        workout_set.distance_km = payload.distance_km
    if "completed_at" in payload.model_fields_set and payload.completed_at is not None:
        workout_set.completed_at = payload.completed_at
    if "calories" in payload.model_fields_set:
        workout_set.calories = payload.calories
    else:
        workout_set.calories = estimate_set_calories(
            workout_set.workout_item,
            float(workout_set.weight or 0),
            int(workout_set.reps or 0),
            workout_set.duration_seconds,
            None,
        )

    # Flush like the other set mutators do, so the totals query sees the
    # updated values even if the session has autoflush disabled.
    db.flush()
    recalculate_workout_totals(db, workout_set.workout_item.workout_id)
    db.commit()
    db.refresh(workout_set)
    return workout_set


@app.delete(
    "/internal/workout-items/{item_id}",
    dependencies=[InternalAuth],
    status_code=status.HTTP_204_NO_CONTENT,
)
def delete_workout_item(item_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> None:
    """Delete an item from an active workout and refresh workout totals.

    Raises:
        HTTPException: 404 when the item is not found for this user, 409 when
            the workout is not active.
    """
    item = db.scalar(
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
    )
    if not item:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")
    workout = db.get(Workout, item.workout_id)
    if workout:
        ensure_active_workout(workout)

    # Keep the id: ``item`` is deleted before the totals are recalculated.
    workout_id = item.workout_id
    db.delete(item)
    db.flush()
    recalculate_workout_totals(db, workout_id)
    db.commit()


@app.delete(
    "/internal/workout-items/{item_id}/sets/{set_id}",
    dependencies=[InternalAuth],
    status_code=status.HTTP_204_NO_CONTENT,
)
def delete_workout_set(
    item_id: uuid.UUID, set_id: uuid.UUID, db: Db, user_id: CurrentUserId
) -> None:
    """Delete a set, renumber the item's remaining sets and refresh workout totals.

    Raises:
        HTTPException: 404 when the set is not found under this item for this
            user, 409 when the workout is not active.
    """
    ws = db.scalar(
        select(WorkoutSet)
        .join(WorkoutItem)
        .join(Workout)
        .where(
            WorkoutSet.id == set_id,
            WorkoutItem.id == item_id,
            Workout.user_id == user_id,
        )
    )
    if not ws:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Set not found")
    workout = db.scalar(
        select(Workout).join(WorkoutItem).where(WorkoutItem.id == item_id)
    )
    if workout:
        ensure_active_workout(workout)

    db.delete(ws)
    db.flush()
    reindex_workout_item_sets(db, item_id)
    if workout:
        recalculate_workout_totals(db, workout.id)
    db.commit()


def reindex_workout_item_sets(db: Session, item_id: uuid.UUID) -> None:
    """Renumber an item's sets as 1..N, ordered by current index then completion time."""
    remaining_sets = list(
        db.scalars(
            select(WorkoutSet)
            .where(WorkoutSet.workout_item_id == item_id)
            .order_by(WorkoutSet.set_index.asc(), WorkoutSet.completed_at.asc())
        )
    )
    for index, workout_set in enumerate(remaining_sets, start=1):
        workout_set.set_index = index


def estimate_set_calories(
    item: WorkoutItem,
    weight: float,
    reps: int,
    duration_seconds: int | None,
    calories: float | None,
) -> float:
    """Return the calories for one set.

    Order of precedence:
      1. the explicit ``calories`` value, when not ``None``;
      2. the activity source's ``default_calories_per_minute`` times the duration;
      3. the exercise's ``default_calories_per_minute`` times the duration;
      4. ``weight * max(reps, 1) / 120``.

    A ``None`` weight or reps is treated as 0 in the fallback formula instead
    of raising ``TypeError``.
    """
    if calories is not None:
        return calories
    if (
        item.activity_source
        and item.activity_source.default_calories_per_minute
        and duration_seconds
    ):
        return round(
            float(item.activity_source.default_calories_per_minute) * duration_seconds / 60,
            2,
        )
    if item.exercise and item.exercise.default_calories_per_minute and duration_seconds:
        return round(
            float(item.exercise.default_calories_per_minute) * duration_seconds / 60,
            2,
        )
    numeric_weight = float(weight or 0)
    numeric_reps = int(reps or 0)
    return round((numeric_weight * max(numeric_reps, 1)) / 120, 2)


def recalculate_workout_totals(db: Session, workout_id: uuid.UUID) -> None:
    """Recompute set count, volume (sum of weight * reps) and calories for a workout.

    The values are assigned on the ``Workout`` object; committing is left to
    the caller. Does nothing when the workout does not exist.
    """
    total_sets, total_volume, estimated_calories = db.execute(
        select(
            func.count(WorkoutSet.id),
            func.coalesce(func.sum(WorkoutSet.weight * WorkoutSet.reps), 0),
            func.coalesce(func.sum(WorkoutSet.calories), 0),
        )
        .join(WorkoutItem, WorkoutSet.workout_item_id == WorkoutItem.id)
        .where(WorkoutItem.workout_id == workout_id)
    ).one()
    workout = db.get(Workout, workout_id)
    if workout:
        workout.total_sets = int(total_sets or 0)
        workout.total_volume = float(total_volume or 0)
        workout.estimated_calories = float(estimated_calories or 0)


def recalculate_workout_calories(db: Session, workout_id: uuid.UUID) -> None:
    """Alias for :func:`recalculate_workout_totals`."""
    recalculate_workout_totals(db, workout_id)


@app.get(
    "/internal/analytics/progression",
    dependencies=[InternalAuth],
    response_model=ProgressionRead,
)
def get_progression(
    db: Db,
    user_id: CurrentUserId,
    kind: Annotated[str, Query(pattern="^(exercise|machine|equipment)$")],
    entity_id: Annotated[uuid.UUID | None, Query()] = None,
) -> ProgressionRead:
    """Return weight progression for the user's non-discarded workouts.

    Sets are filtered by source kind and, optionally, by activity source id.
    Points are grouped per workout start date with that day's max weight and
    volume. ``previous_delta`` is the difference between the last two sets'
    weights in chronological order.
    """
    statement = (
        select(Workout.started_at, WorkoutSet.weight, WorkoutSet.reps)
        .join(WorkoutItem, WorkoutSet.workout_item_id == WorkoutItem.id)
        .join(Workout, WorkoutItem.workout_id == Workout.id)
        .where(Workout.user_id == user_id, Workout.status != "discarded")
        .order_by(Workout.started_at.asc(), WorkoutSet.completed_at.asc())
    )
    normalized_kind = normalize_kind(kind)
    if entity_id:
        statement = statement.where(WorkoutItem.activity_source_id == entity_id)
    if normalized_kind:
        statement = statement.where(WorkoutItem.source_kind == normalized_kind)

    rows = list(db.execute(statement))
    grouped: dict[str, dict[str, float]] = defaultdict(lambda: {"max_weight": 0, "volume": 0})
    weights: list[float] = []
    for started_at, weight, reps in rows:
        date_key = started_at.date().isoformat()
        numeric_weight = float(weight or 0)
        grouped[date_key]["max_weight"] = max(grouped[date_key]["max_weight"], numeric_weight)
        grouped[date_key]["volume"] += numeric_weight * int(reps or 0)
        weights.append(numeric_weight)

    points = [
        ProgressionPoint(date=date_key, max_weight=values["max_weight"], volume=values["volume"])
        for date_key, values in sorted(grouped.items())
    ]
    previous_delta = None
    if len(weights) >= 2:
        previous_delta = round(weights[-1] - weights[-2], 2)

    return ProgressionRead(
        last_weight=weights[-1] if weights else None,
        max_weight=max(weights) if weights else None,
        previous_delta=previous_delta,
        points=points,
    )


@app.get("/internal/analytics/calories", dependencies=[InternalAuth], response_model=CaloriesRead)
def get_calories(db: Db, user_id: CurrentUserId) -> CaloriesRead:
    """Return total and per-workout estimated calories for non-discarded workouts."""
    workouts = list(
        db.scalars(
            select(Workout)
            .where(Workout.user_id == user_id, Workout.status != "discarded")
            .order_by(Workout.started_at.desc())
        )
    )
    total = sum(float(workout.estimated_calories or 0) for workout in workouts)
    return CaloriesRead(
        total_calories=round(total, 2),
        workouts=[
            {
                "id": str(workout.id),
                "date": workout.started_at.date().isoformat(),
                "calories": float(workout.estimated_calories or 0),
            }
            for workout in workouts
        ],
    )