800dee31b2
- Added boto3 as a dependency in pyproject.toml and uv.lock. - Introduced multiple new exercise images in various formats (jpg, webp, avif, png). - Added new machine images to enhance the workout assets library.
911 lines
30 KiB
Python
911 lines
30 KiB
Python
import json
|
|
import re
|
|
import uuid
|
|
from collections import defaultdict
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
import boto3
|
|
from fastapi import Body, Depends, FastAPI, Header, HTTPException, Query, status
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.orm import Session, selectinload
|
|
|
|
from app.core import settings
|
|
from app.db import SessionLocal, create_schema, get_db
|
|
from app.models import ActivitySource, Base, Equipment, Exercise, Workout, WorkoutItem, WorkoutSet
|
|
from app.schemas import (
|
|
ActivitySourceCreate,
|
|
ActivitySourceRead,
|
|
CaloriesRead,
|
|
EquipmentCreate,
|
|
ExerciseCreate,
|
|
ProgressionPoint,
|
|
ProgressionRead,
|
|
WorkoutCreate,
|
|
WorkoutFinishRequest,
|
|
WorkoutItemCreate,
|
|
WorkoutItemRead,
|
|
WorkoutRead,
|
|
WorkoutSetBatchCreate,
|
|
WorkoutSetCreate,
|
|
WorkoutSetRead,
|
|
WorkoutSetUpdate,
|
|
WorkoutUpdate,
|
|
)
|
|
|
|
# ASGI application object; every route in this module is registered on it.
app = FastAPI(title="Train Watcher Logic Service", version="0.1.0")

# JSON seed file read by seed_builtin_catalog(); lives in "seeds/" beside this module.
SEED_FILE = Path(__file__).parent.joinpath("seeds", "activity_sources.json")

# Lower-cased file suffix -> Content-Type used when uploading a builtin asset.
# Files with any other suffix are skipped by upload_builtin_asset().
ASSET_CONTENT_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".webp": "image/webp",
    ".avif": "image/avif",
}
|
|
|
|
|
|
def require_service_token(x_service_token: Annotated[str | None, Header()] = None) -> None:
    """Reject the request unless it carries the configured service token.

    The header value is compared with ``settings.service_token`` in constant
    time so the comparison does not leak how many leading characters matched.
    A missing or empty header, or an unset/empty configured token, is always
    rejected; this prevents an unconfigured token from "matching" an absent
    header.

    Raises:
        HTTPException: 401 when the token is missing or does not match.
    """
    import secrets  # local import keeps this block self-contained

    expected = settings.service_token
    token_ok = (
        bool(x_service_token)
        and bool(expected)
        # compare_digest on bytes also accepts non-ASCII header values.
        and secrets.compare_digest(
            x_service_token.encode("utf-8"), str(expected).encode("utf-8")
        )
    )
    if not token_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid service token",
        )
|
|
|
|
|
|
def get_user_id(x_user_id: Annotated[str | None, Header()] = None) -> uuid.UUID:
    """Parse the caller's user id from the ``X-User-Id`` header.

    Returns:
        The header value as a ``uuid.UUID``.

    Raises:
        HTTPException: 400 when the header is absent/empty or is not a valid UUID.
    """
    if x_user_id:
        try:
            parsed = uuid.UUID(x_user_id)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid X-User-Id",
            ) from exc
        return parsed

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Missing X-User-Id header",
    )
|
|
|
|
|
|
# Route-level dependency: runs require_service_token() for every route that lists it.
InternalAuth = Depends(require_service_token)

# Handler parameter aliases: a database session from get_db() and the caller's
# user id parsed by get_user_id().
Db = Annotated[Session, Depends(get_db)]
CurrentUserId = Annotated[uuid.UUID, Depends(get_user_id)]
|
|
|
|
|
|
@app.on_event("startup")
def on_startup() -> None:
    """Create the database schema, then seed the builtin catalog in its own session."""
    create_schema(Base.metadata)
    session = SessionLocal()
    with session:
        seed_builtin_catalog(session)
|
|
|
|
|
|
@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe: always answers with a static ``ok`` status."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
|
|
def seed_builtin_catalog(db: Session) -> None:
    """Upsert builtin activity sources from the seed file, keyed by slug.

    Does nothing when the seed file or the assets directory is missing. A row
    whose image cannot be uploaded is skipped. All changes are committed once
    after the last row.
    """
    if not SEED_FILE.exists():
        return
    catalog_rows = json.loads(SEED_FILE.read_text(encoding="utf-8"))
    assets_root = resolve_builtin_assets_dir()
    if not assets_root:
        return

    for entry in catalog_rows:
        image_file = assets_root / entry["asset_filename"]
        uploaded = upload_builtin_asset(entry["kind"], entry["slug"], image_file)
        if not uploaded:
            continue

        record = db.scalar(select(ActivitySource).where(ActivitySource.slug == entry["slug"]))
        if record is None:
            record = ActivitySource(slug=entry["slug"], owner_user_id=None, is_builtin=True)
            db.add(record)

        # Overwrite every catalog field so edits to the seed file reach existing rows.
        field_values = {
            "kind": entry["kind"],
            "title": entry["title"],
            "description": entry.get("description"),
            "category": entry["category"],
            "equipment": entry["equipment"],
            "measurement_type": entry["measurement_type"],
            "difficulty": entry["difficulty"],
            "default_calories_per_minute": entry.get("default_calories_per_minute"),
            "image_s3_key": uploaded["image_s3_key"],
            "image_s3_url": uploaded["image_s3_url"],
            "is_builtin": True,
        }
        for attribute, value in field_values.items():
            setattr(record, attribute, value)

    db.commit()
|
|
|
|
|
|
def resolve_builtin_assets_dir() -> Path | None:
    """Return the first existing assets directory, or ``None`` when there is none.

    The configured ``settings.builtin_assets_dir`` is tried first, then a
    ``workout_assets`` directory three levels above this file's directory.
    """
    search_paths = (
        Path(settings.builtin_assets_dir),
        Path(__file__).resolve().parents[3] / "workout_assets",
    )
    return next(
        (path for path in search_paths if path.exists() and path.is_dir()),
        None,
    )
|
|
|
|
|
|
def upload_builtin_asset(kind: str, slug: str, asset_path: Path) -> dict[str, str] | None:
    """Upload one builtin image to S3 and describe where it was stored.

    Returns:
        A dict with ``image_s3_key`` and ``image_s3_url``, or ``None`` when the
        file is missing, has an unsupported suffix, or the upload fails.
    """
    extension = asset_path.suffix.lower()
    if not (asset_path.exists() and extension in ASSET_CONTENT_TYPES):
        return None

    object_key = f"builtin/activity-sources/{kind}/{slug}{extension}"
    try:
        s3_client = boto3.client(
            "s3",
            endpoint_url=settings.s3_endpoint_url,
            aws_access_key_id=settings.s3_access_key_id,
            aws_secret_access_key=settings.s3_secret_access_key,
            region_name=settings.s3_region,
        )
        s3_client.put_object(
            Bucket=settings.s3_bucket,
            Key=object_key,
            Body=asset_path.read_bytes(),
            ContentType=ASSET_CONTENT_TYPES[extension],
        )
    except Exception as exc:  # noqa: BLE001 - failed asset upload should not block service startup
        print(f"Skipping builtin asset {asset_path}: {exc}", flush=True)
        return None

    public_base = settings.s3_public_base_url.rstrip("/")
    image_url = f"{public_base}/{settings.s3_bucket}/{object_key}"
    return {"image_s3_key": object_key, "image_s3_url": image_url}
|
|
|
|
|
|
def accessible_equipment(db: Session, user_id: uuid.UUID):
    """Build a SELECT for equipment that is builtin or owned by ``user_id``.

    ``db`` is accepted but not used; the statement is returned unexecuted.
    """
    is_builtin = Equipment.is_builtin.is_(True)
    is_owned = Equipment.owner_user_id == user_id
    return select(Equipment).where(is_builtin | is_owned)
|
|
|
|
|
|
def accessible_exercises(db: Session, user_id: uuid.UUID):
    """Build a SELECT for exercises that are builtin or owned by ``user_id``.

    ``db`` is accepted but not used; the statement is returned unexecuted.
    """
    is_builtin = Exercise.is_builtin.is_(True)
    is_owned = Exercise.owner_user_id == user_id
    return select(Exercise).where(is_builtin | is_owned)
|
|
|
|
|
|
def accessible_activity_sources(db: Session, user_id: uuid.UUID):
    """Build a SELECT for activity sources that are builtin or owned by ``user_id``.

    ``db`` is accepted but not used; the statement is returned unexecuted so
    callers can add their own filters and ordering.
    """
    is_builtin = ActivitySource.is_builtin.is_(True)
    is_owned = ActivitySource.owner_user_id == user_id
    return select(ActivitySource).where(is_builtin | is_owned)
|
|
|
|
|
|
def normalize_kind(kind: str | None) -> str | None:
|
|
if kind == "equipment":
|
|
return "machine"
|
|
return kind
|
|
|
|
|
|
def slugify(value: str) -> str:
    """Turn ``value`` into a lowercase slug of ``a-z0-9`` runs joined by hyphens.

    Every run of other characters becomes a single hyphen, and leading/trailing
    hyphens are dropped. Returns ``"activity"`` when nothing is left.
    """
    hyphenated = re.sub(r"[^a-z0-9]+", "-", value.lower())
    trimmed = hyphenated.strip("-")
    if trimmed:
        return trimmed
    return "activity"
|
|
|
|
|
|
def unique_user_slug(db: Session, title: str, user_id: uuid.UUID) -> str:
    """Return an unused slug of the form ``custom-<title-slug>-<user-prefix>``.

    ``<user-prefix>`` is the first 8 characters of the user id. When the base
    slug is taken, ``-2``, ``-3``, ... are appended until a free one is found.
    """
    prefix = f"custom-{slugify(title)}-{str(user_id)[:8]}"
    slug = prefix
    suffix = 2
    while True:
        taken = db.scalar(select(ActivitySource.id).where(ActivitySource.slug == slug))
        if not taken:
            return slug
        slug = f"{prefix}-{suffix}"
        suffix += 1
|
|
|
|
|
|
def load_workout(db: Session, workout_id: uuid.UUID, user_id: uuid.UUID) -> Workout:
    """Fetch one of the user's workouts with its items, sets and activity sources loaded.

    Raises:
        HTTPException: 404 when no workout with that id belongs to ``user_id``.
    """
    query = (
        select(Workout)
        .where(Workout.id == workout_id, Workout.user_id == user_id)
        .options(
            selectinload(Workout.items).selectinload(WorkoutItem.sets),
            selectinload(Workout.items).selectinload(WorkoutItem.activity_source),
        )
    )
    found = db.scalar(query)
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout not found")
|
|
|
|
|
|
def get_active_workout_for_user(db: Session, user_id: uuid.UUID) -> Workout | None:
    """Return the user's most recently started ``active`` workout, or ``None``.

    Items, sets and activity sources are eagerly loaded.
    """
    eager_options = (
        selectinload(Workout.items).selectinload(WorkoutItem.sets),
        selectinload(Workout.items).selectinload(WorkoutItem.activity_source),
    )
    query = (
        select(Workout)
        .where(Workout.user_id == user_id, Workout.status == "active")
        .options(*eager_options)
        .order_by(Workout.started_at.desc())
    )
    return db.scalar(query)
|
|
|
|
|
|
def ensure_active_workout(workout: Workout) -> None:
    """Guard for mutating endpoints: only ``active`` workouts may be changed.

    Raises:
        HTTPException: 409 when the workout's status is anything but ``"active"``.
    """
    if workout.status == "active":
        return
    raise HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail="Workout is not active",
    )
|
|
|
|
|
|
@app.get(
    "/internal/catalog/activity-sources",
    dependencies=[InternalAuth],
    response_model=list[ActivitySourceRead],
)
def list_activity_sources(
    db: Db,
    user_id: CurrentUserId,
    search: str | None = None,
    kind: Annotated[str | None, Query(pattern="^(exercise|machine|equipment)$")] = None,
    category: str | None = None,
    scope: Annotated[str, Query(pattern="^(all|builtin|mine)$")] = "all",
) -> list[ActivitySource]:
    """List activity sources visible to the caller, builtin entries first, then by title.

    Optional filters: ``kind`` (``equipment`` is treated as ``machine``),
    ``category``, ``scope`` (``builtin`` / ``mine`` / ``all``) and a
    case-insensitive ``search`` substring matched against title or description.
    """
    criteria = []

    wanted_kind = normalize_kind(kind)
    if wanted_kind:
        criteria.append(ActivitySource.kind == wanted_kind)
    if category:
        criteria.append(ActivitySource.category == category)

    # "all" has no entry here, so it adds no extra restriction.
    scope_criteria = {
        "builtin": ActivitySource.is_builtin.is_(True),
        "mine": ActivitySource.owner_user_id == user_id,
    }
    if scope in scope_criteria:
        criteria.append(scope_criteria[scope])

    if search:
        pattern = f"%{search}%"
        criteria.append(
            ActivitySource.title.ilike(pattern) | ActivitySource.description.ilike(pattern)
        )

    query = accessible_activity_sources(db, user_id).order_by(
        ActivitySource.is_builtin.desc(), ActivitySource.title
    )
    for criterion in criteria:
        query = query.where(criterion)
    return list(db.scalars(query))
|
|
|
|
|
|
@app.post(
    "/internal/catalog/activity-sources",
    dependencies=[InternalAuth],
    response_model=ActivitySourceRead,
    status_code=status.HTTP_201_CREATED,
)
def create_activity_source(
    payload: ActivitySourceCreate, db: Db, user_id: CurrentUserId
) -> ActivitySource:
    """Create a user-owned, non-builtin activity source.

    The slug comes from the payload when given; otherwise one is generated from
    the title and user id.

    Raises:
        HTTPException: 409 when the slug is already in use.
    """
    if payload.slug:
        slug = payload.slug
    else:
        slug = unique_user_slug(db, payload.title, user_id)

    slug_taken = db.scalar(select(ActivitySource.id).where(ActivitySource.slug == slug))
    if slug_taken:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Slug already exists")

    fields = payload.model_dump(exclude={"slug"})
    created = ActivitySource(slug=slug, owner_user_id=user_id, is_builtin=False, **fields)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@app.get(
    "/internal/catalog/equipment",
    dependencies=[InternalAuth],
    response_model=list[ActivitySourceRead],
)
def list_equipment(
    db: Db, user_id: CurrentUserId, search: str | None = None
) -> list[ActivitySource]:
    """List visible activity sources of kind ``machine``, builtin first, then by title.

    ``search`` optionally filters by a case-insensitive substring of the title.
    """
    query = accessible_activity_sources(db, user_id)
    query = query.where(ActivitySource.kind == "machine")
    query = query.order_by(ActivitySource.is_builtin.desc(), ActivitySource.title)
    if search:
        query = query.where(ActivitySource.title.ilike(f"%{search}%"))
    return list(db.scalars(query))
|
|
|
|
|
|
@app.post(
    "/internal/catalog/equipment",
    dependencies=[InternalAuth],
    response_model=ActivitySourceRead,
    status_code=status.HTTP_201_CREATED,
)
def create_equipment(payload: EquipmentCreate, db: Db, user_id: CurrentUserId) -> ActivitySource:
    """Create a user-owned ``machine`` activity source from the equipment payload.

    Category, equipment, measurement type and difficulty are fixed defaults;
    the slug is generated from the payload name and user id.
    """
    generated_slug = unique_user_slug(db, payload.name, user_id)
    attributes = {
        "owner_user_id": user_id,
        "slug": generated_slug,
        "kind": "machine",
        "title": payload.name,
        "description": payload.description,
        "category": "other",
        "equipment": "machine",
        "measurement_type": "weight_reps",
        "difficulty": "intermediate",
        "image_s3_url": payload.image_s3_url,
        "image_s3_key": payload.image_s3_key,
        "is_builtin": False,
    }
    created = ActivitySource(**attributes)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@app.get(
    "/internal/catalog/exercises",
    dependencies=[InternalAuth],
    response_model=list[ActivitySourceRead],
)
def list_exercises(
    db: Db, user_id: CurrentUserId, search: str | None = None
) -> list[ActivitySource]:
    """List visible activity sources of kind ``exercise``, builtin first, then by title.

    ``search`` optionally filters by a case-insensitive substring of the title.
    """
    query = accessible_activity_sources(db, user_id)
    query = query.where(ActivitySource.kind == "exercise")
    query = query.order_by(ActivitySource.is_builtin.desc(), ActivitySource.title)
    if search:
        query = query.where(ActivitySource.title.ilike(f"%{search}%"))
    return list(db.scalars(query))
|
|
|
|
|
|
@app.post(
    "/internal/catalog/exercises",
    dependencies=[InternalAuth],
    response_model=ActivitySourceRead,
    status_code=status.HTTP_201_CREATED,
)
def create_exercise(payload: ExerciseCreate, db: Db, user_id: CurrentUserId) -> ActivitySource:
    """Create a user-owned ``exercise`` activity source from the exercise payload.

    Category, equipment, measurement type and difficulty are fixed defaults;
    the slug is generated from the payload name and user id.
    """
    generated_slug = unique_user_slug(db, payload.name, user_id)
    attributes = {
        "owner_user_id": user_id,
        "slug": generated_slug,
        "kind": "exercise",
        "title": payload.name,
        "description": payload.description,
        "category": "other",
        "equipment": "other",
        "measurement_type": "weight_reps",
        "difficulty": "intermediate",
        "image_s3_url": payload.image_s3_url,
        "image_s3_key": payload.image_s3_key,
        "default_calories_per_minute": payload.default_calories_per_minute,
        "is_builtin": False,
    }
    created = ActivitySource(**attributes)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@app.get("/internal/workouts", dependencies=[InternalAuth], response_model=list[WorkoutRead])
def list_workouts(db: Db, user_id: CurrentUserId) -> list[Workout]:
    """Return all of the caller's workouts, most recently started first.

    Items, their sets and their activity sources are eagerly loaded.
    """
    eager_options = (
        selectinload(Workout.items).selectinload(WorkoutItem.sets),
        selectinload(Workout.items).selectinload(WorkoutItem.activity_source),
    )
    query = (
        select(Workout)
        .where(Workout.user_id == user_id)
        .options(*eager_options)
        .order_by(Workout.started_at.desc())
    )
    return list(db.scalars(query))
|
|
|
|
|
|
@app.post(
    "/internal/workouts",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
    status_code=status.HTTP_201_CREATED,
)
def create_workout(payload: WorkoutCreate, db: Db, user_id: CurrentUserId) -> Workout:
    """Start a new ``active`` workout for the caller.

    ``started_at`` defaults to the current UTC time when the payload omits it.

    Raises:
        HTTPException: 409 when the caller already has an active workout.
    """
    already_active = get_active_workout_for_user(db, user_id)
    if already_active:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Active workout already exists",
        )

    start_time = payload.started_at or datetime.now(UTC)
    created = Workout(
        user_id=user_id,
        status="active",
        started_at=start_time,
        notes=payload.notes,
    )
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@app.get(
    "/internal/workouts/active",
    dependencies=[InternalAuth],
    response_model=WorkoutRead | None,
)
def get_active_workout(db: Db, user_id: CurrentUserId) -> Workout | None:
    """Return the caller's active workout, or ``None`` when there is none."""
    active = get_active_workout_for_user(db, user_id)
    return active
|
|
|
|
|
|
@app.get("/internal/workouts/{workout_id}", dependencies=[InternalAuth], response_model=WorkoutRead)
def get_workout(workout_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> Workout:
    """Return one of the caller's workouts by id; 404 is raised by ``load_workout``."""
    found = load_workout(db, workout_id, user_id)
    return found
|
|
|
|
|
|
@app.patch(
    "/internal/workouts/{workout_id}",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def update_workout(
    workout_id: uuid.UUID, payload: WorkoutUpdate, db: Db, user_id: CurrentUserId
) -> Workout:
    """Apply a partial update to a workout and refresh its totals.

    Supplying ``finished_at`` also switches the status to ``finished``. Fields
    left as ``None`` in the payload are not touched.
    """
    target = load_workout(db, workout_id, user_id)

    finished_at = payload.finished_at
    if finished_at is not None:
        target.finished_at = finished_at
        target.status = "finished"

    notes = payload.notes
    if notes is not None:
        target.notes = notes

    recalculate_workout_totals(db, target.id)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@app.post(
    "/internal/workouts/{workout_id}/finish",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def finish_workout(
    workout_id: uuid.UUID,
    db: Db,
    user_id: CurrentUserId,
    payload: Annotated[WorkoutFinishRequest | None, Body()] = None,
) -> Workout:
    """Finish an active workout: refresh totals, stamp the end time, set status ``finished``.

    The request body is optional; when it carries ``notes`` they replace the
    workout's notes.

    Raises:
        HTTPException: 404 when the workout is not found, 409 when it is not active.
    """
    target = load_workout(db, workout_id, user_id)
    ensure_active_workout(target)

    new_notes = payload.notes if payload else None
    if new_notes is not None:
        target.notes = new_notes

    recalculate_workout_totals(db, target.id)
    target.finished_at = datetime.now(UTC)
    target.status = "finished"
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@app.post(
    "/internal/workouts/{workout_id}/discard",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def discard_workout(workout_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> Workout:
    """Discard an active workout: refresh totals, stamp the end time, set status ``discarded``.

    Raises:
        HTTPException: 404 when the workout is not found, 409 when it is not active.
    """
    target = load_workout(db, workout_id, user_id)
    ensure_active_workout(target)

    recalculate_workout_totals(db, target.id)
    target.finished_at = datetime.now(UTC)
    target.status = "discarded"

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@app.post(
    "/internal/workouts/{workout_id}/items",
    dependencies=[InternalAuth],
    response_model=WorkoutItemRead,
    status_code=status.HTTP_201_CREATED,
)
def add_workout_item(
    workout_id: uuid.UUID, payload: WorkoutItemCreate, db: Db, user_id: CurrentUserId
) -> WorkoutItem:
    """Add an item to an active workout, snapshotting display data from its source.

    The source is looked up from the first of ``activity_source_id``,
    ``exercise_id`` or ``equipment_id`` that is set on the payload. Only
    builtin sources or sources owned by the caller are accepted. For exercise
    and equipment sources the measurement type, category and equipment
    snapshots keep their defaults (``weight_reps`` / ``other`` / ``other``).

    When ``order_index`` is omitted the item is appended after the current
    highest index (0 for the first item).

    Raises:
        HTTPException: 404 when the workout or the referenced source is not
            found or not accessible, 409 when the workout is not active, 400
            when the payload references no source at all.
    """
    workout = load_workout(db, workout_id, user_id)
    ensure_active_workout(workout)

    source_kind: str
    title_snapshot: str
    image_s3_url_snapshot: str | None
    measurement_type_snapshot = "weight_reps"
    category_snapshot = "other"
    equipment_snapshot = "other"

    if payload.activity_source_id:
        activity = db.get(ActivitySource, payload.activity_source_id)
        if not activity or (not activity.is_builtin and activity.owner_user_id != user_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Activity source not found"
            )
        source_kind = activity.kind
        title_snapshot = activity.title
        image_s3_url_snapshot = activity.image_s3_url
        measurement_type_snapshot = activity.measurement_type
        category_snapshot = activity.category
        equipment_snapshot = activity.equipment
    elif payload.exercise_id:
        exercise = db.get(Exercise, payload.exercise_id)
        if not exercise or (not exercise.is_builtin and exercise.owner_user_id != user_id):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Exercise not found")
        source_kind = "exercise"
        title_snapshot = exercise.name
        image_s3_url_snapshot = exercise.image_s3_url
    elif payload.equipment_id:
        equipment = db.get(Equipment, payload.equipment_id)
        if not equipment or (not equipment.is_builtin and equipment.owner_user_id != user_id):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Equipment not found")
        source_kind = "machine"
        title_snapshot = equipment.name
        image_s3_url_snapshot = equipment.image_s3_url
    else:
        # Without this branch the snapshot variables stay unbound and the
        # request fails with an UnboundLocalError (HTTP 500).
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="One of activity_source_id, exercise_id or equipment_id is required",
        )

    next_index = payload.order_index
    if next_index is None:
        max_index = db.scalar(
            select(func.max(WorkoutItem.order_index)).where(WorkoutItem.workout_id == workout.id)
        )
        # MAX() is NULL when the workout has no items yet.
        next_index = 0 if max_index is None else int(max_index) + 1

    item = WorkoutItem(
        workout_id=workout.id,
        source_kind=source_kind,
        title_snapshot=title_snapshot,
        image_s3_url_snapshot=image_s3_url_snapshot,
        measurement_type_snapshot=measurement_type_snapshot,
        category_snapshot=category_snapshot,
        equipment_snapshot=equipment_snapshot,
        **payload.model_dump(exclude={"order_index"}),
        order_index=next_index,
    )
    db.add(item)
    db.commit()
    db.refresh(item)
    return item
|
|
|
|
|
|
@app.post(
    "/internal/workout-items/{item_id}/sets",
    dependencies=[InternalAuth],
    response_model=WorkoutSetRead,
    status_code=status.HTTP_201_CREATED,
)
def add_workout_set(
    item_id: uuid.UUID,
    payload: WorkoutSetCreate,
    db: Db,
    user_id: CurrentUserId,
) -> WorkoutSet:
    """Append one set to a workout item and refresh the workout totals.

    The set index is one above the item's current highest index (1 for the
    first set). Calories come from the payload when given, otherwise from
    ``estimate_set_calories``. ``completed_at`` defaults to the current UTC time.

    Raises:
        HTTPException: 404 when the item is not found for this user, 409 when
            its workout is not active.
    """
    item_query = (
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
        .options(
            selectinload(WorkoutItem.sets),
            selectinload(WorkoutItem.activity_source),
            selectinload(WorkoutItem.exercise),
            selectinload(WorkoutItem.workout),
        )
    )
    item = db.scalar(item_query)
    if not item:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")
    ensure_active_workout(item.workout)

    estimated = estimate_set_calories(
        item,
        payload.weight,
        payload.reps,
        payload.duration_seconds,
        payload.calories,
    )
    highest_index = db.scalar(
        select(func.max(WorkoutSet.set_index)).where(WorkoutSet.workout_item_id == item.id)
    )
    finished_at = payload.completed_at or datetime.now(UTC)

    new_set = WorkoutSet(
        workout_item_id=item.id,
        set_index=int(highest_index or 0) + 1,
        weight=payload.weight,
        reps=payload.reps,
        duration_seconds=payload.duration_seconds,
        distance_km=payload.distance_km,
        calories=estimated,
        completed_at=finished_at,
    )
    db.add(new_set)
    # Flush so the totals query below sees the new row.
    db.flush()
    recalculate_workout_totals(db, item.workout_id)
    db.commit()
    db.refresh(new_set)
    return new_set
|
|
|
|
|
|
@app.post(
    "/internal/workout-items/{item_id}/sets/batch",
    dependencies=[InternalAuth],
    response_model=list[WorkoutSetRead],
    status_code=status.HTTP_201_CREATED,
)
def add_workout_sets_batch(
    item_id: uuid.UUID,
    payload: WorkoutSetBatchCreate,
    db: Db,
    user_id: CurrentUserId,
) -> list[WorkoutSet]:
    """Append several sets to a workout item in one transaction.

    Sets receive consecutive indices starting one above the item's current
    highest index, in payload order. Totals are recalculated once after all
    sets are flushed.

    Raises:
        HTTPException: 404 when the item is not found for this user, 409 when
            its workout is not active.
    """
    item_query = (
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
        .options(
            selectinload(WorkoutItem.activity_source),
            selectinload(WorkoutItem.exercise),
            selectinload(WorkoutItem.workout),
        )
    )
    item = db.scalar(item_query)
    if not item:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")
    ensure_active_workout(item.workout)

    highest_index = db.scalar(
        select(func.max(WorkoutSet.set_index)).where(WorkoutSet.workout_item_id == item.id)
    )
    first_index = int(highest_index or 0) + 1

    created: list[WorkoutSet] = []
    for set_index, entry in enumerate(payload.sets, start=first_index):
        estimated = estimate_set_calories(
            item,
            entry.weight,
            entry.reps,
            entry.duration_seconds,
            entry.calories,
        )
        new_set = WorkoutSet(
            workout_item_id=item.id,
            set_index=set_index,
            weight=entry.weight,
            reps=entry.reps,
            duration_seconds=entry.duration_seconds,
            distance_km=entry.distance_km,
            calories=estimated,
            completed_at=entry.completed_at or datetime.now(UTC),
        )
        db.add(new_set)
        created.append(new_set)

    # Flush so the totals query below sees the new rows.
    db.flush()
    recalculate_workout_totals(db, item.workout_id)
    db.commit()
    for new_set in created:
        db.refresh(new_set)
    return created
|
|
|
|
|
|
@app.patch(
    "/internal/workout-sets/{set_id}",
    dependencies=[InternalAuth],
    response_model=WorkoutSetRead,
)
def update_workout_set(
    set_id: uuid.UUID,
    payload: WorkoutSetUpdate,
    db: Db,
    user_id: CurrentUserId,
) -> WorkoutSet:
    """Partially update a set of an active workout and refresh the workout totals.

    ``weight`` and ``reps`` change only when not ``None``. ``duration_seconds``
    and ``distance_km`` change whenever the client sent them, including an
    explicit ``null``. ``completed_at`` changes only when sent and not ``None``.
    ``calories`` is taken from the payload when sent; otherwise it is
    re-estimated from the updated set.

    Raises:
        HTTPException: 404 when the set is not found for this user, 409 when
            its workout is not active.
    """
    set_query = (
        select(WorkoutSet)
        .join(WorkoutItem)
        .join(Workout)
        .where(WorkoutSet.id == set_id, Workout.user_id == user_id)
        .options(
            selectinload(WorkoutSet.workout_item).selectinload(WorkoutItem.activity_source),
            selectinload(WorkoutSet.workout_item).selectinload(WorkoutItem.exercise),
            selectinload(WorkoutSet.workout_item).selectinload(WorkoutItem.workout),
        )
    )
    target = db.scalar(set_query)
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Set not found")
    ensure_active_workout(target.workout_item.workout)

    # Fields the client actually included in the request body.
    sent_fields = payload.model_fields_set

    if payload.weight is not None:
        target.weight = payload.weight
    if payload.reps is not None:
        target.reps = payload.reps
    if "duration_seconds" in sent_fields:
        target.duration_seconds = payload.duration_seconds
    if "distance_km" in sent_fields:
        target.distance_km = payload.distance_km
    if "completed_at" in sent_fields and payload.completed_at is not None:
        target.completed_at = payload.completed_at

    if "calories" in sent_fields:
        target.calories = payload.calories
    else:
        target.calories = estimate_set_calories(
            target.workout_item,
            float(target.weight or 0),
            int(target.reps or 0),
            target.duration_seconds,
            None,
        )

    recalculate_workout_totals(db, target.workout_item.workout_id)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@app.delete(
    "/internal/workout-items/{item_id}",
    dependencies=[InternalAuth],
    status_code=status.HTTP_204_NO_CONTENT,
)
def delete_workout_item(item_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> None:
    """Delete an item from an active workout and refresh the workout totals.

    Raises:
        HTTPException: 404 when the item is not found for this user, 409 when
            its workout is not active.
    """
    item_query = (
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
    )
    item = db.scalar(item_query)
    if not item:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")

    parent_id = item.workout_id
    parent = db.get(Workout, parent_id)
    if parent:
        ensure_active_workout(parent)

    db.delete(item)
    # Flush so the totals query no longer counts the deleted item's sets.
    db.flush()
    recalculate_workout_totals(db, parent_id)
    db.commit()
|
|
|
|
|
|
@app.delete(
    "/internal/workout-items/{item_id}/sets/{set_id}",
    dependencies=[InternalAuth],
    status_code=status.HTTP_204_NO_CONTENT,
)
def delete_workout_set(
    item_id: uuid.UUID, set_id: uuid.UUID, db: Db, user_id: CurrentUserId
) -> None:
    """Delete one set, renumber the item's remaining sets and refresh the workout totals.

    Raises:
        HTTPException: 404 when the set is not found under that item for this
            user, 409 when the workout is not active.
    """
    set_query = (
        select(WorkoutSet)
        .join(WorkoutItem)
        .join(Workout)
        .where(
            WorkoutSet.id == set_id,
            WorkoutItem.id == item_id,
            Workout.user_id == user_id,
        )
    )
    doomed = db.scalar(set_query)
    if not doomed:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Set not found")

    parent_query = select(Workout).join(WorkoutItem).where(WorkoutItem.id == item_id)
    parent = db.scalar(parent_query)
    if parent:
        ensure_active_workout(parent)

    db.delete(doomed)
    # Flush so the reindex and totals queries no longer see the deleted set.
    db.flush()
    reindex_workout_item_sets(db, item_id)
    if parent:
        recalculate_workout_totals(db, parent.id)
    db.commit()
|
|
|
|
|
|
def reindex_workout_item_sets(db: Session, item_id: uuid.UUID) -> None:
    """Renumber an item's sets as 1, 2, 3, ... keeping their current order.

    Sets are ordered by existing ``set_index``, then ``completed_at``. The
    changes are left pending on the session; the caller commits.
    """
    ordered_query = (
        select(WorkoutSet)
        .where(WorkoutSet.workout_item_id == item_id)
        .order_by(WorkoutSet.set_index.asc(), WorkoutSet.completed_at.asc())
    )
    survivors = list(db.scalars(ordered_query))
    position = 1
    for survivor in survivors:
        survivor.set_index = position
        position += 1
|
|
|
|
|
|
def estimate_set_calories(
|
|
item: WorkoutItem,
|
|
weight: float,
|
|
reps: int,
|
|
duration_seconds: int | None,
|
|
calories: float | None,
|
|
) -> float:
|
|
if calories is not None:
|
|
return calories
|
|
if (
|
|
item.activity_source
|
|
and item.activity_source.default_calories_per_minute
|
|
and duration_seconds
|
|
):
|
|
return round(
|
|
float(item.activity_source.default_calories_per_minute) * duration_seconds / 60,
|
|
2,
|
|
)
|
|
if item.exercise and item.exercise.default_calories_per_minute and duration_seconds:
|
|
return round(
|
|
float(item.exercise.default_calories_per_minute) * duration_seconds / 60,
|
|
2,
|
|
)
|
|
return round((weight * max(reps, 1)) / 120, 2)
|
|
|
|
|
|
def recalculate_workout_totals(db: Session, workout_id: uuid.UUID) -> None:
    """Recompute a workout's set count, volume and calories from its sets.

    Volume is the sum of ``weight * reps`` over all sets of the workout's
    items. The values are written onto the workout when it exists; the caller
    commits.
    """
    totals_query = (
        select(
            func.count(WorkoutSet.id),
            func.coalesce(func.sum(WorkoutSet.weight * WorkoutSet.reps), 0),
            func.coalesce(func.sum(WorkoutSet.calories), 0),
        )
        .join(WorkoutItem, WorkoutSet.workout_item_id == WorkoutItem.id)
        .where(WorkoutItem.workout_id == workout_id)
    )
    set_count, volume, calories = db.execute(totals_query).one()

    target = db.get(Workout, workout_id)
    if not target:
        return
    target.total_sets = int(set_count or 0)
    target.total_volume = float(volume or 0)
    target.estimated_calories = float(calories or 0)
|
|
|
|
|
|
def recalculate_workout_calories(db: Session, workout_id: uuid.UUID) -> None:
    """Alias that delegates to ``recalculate_workout_totals``."""
    return recalculate_workout_totals(db, workout_id)
|
|
|
|
|
|
@app.get(
    "/internal/analytics/progression",
    dependencies=[InternalAuth],
    response_model=ProgressionRead,
)
def get_progression(
    db: Db,
    user_id: CurrentUserId,
    kind: Annotated[str, Query(pattern="^(exercise|machine|equipment)$")],
    entity_id: Annotated[uuid.UUID | None, Query()] = None,
) -> ProgressionRead:
    """Summarise weight progression over the caller's non-discarded workouts.

    Sets are filtered by source kind (``equipment`` is treated as ``machine``)
    and, when ``entity_id`` is given, by activity source. Each point holds one
    workout start date's heaviest weight and total volume. ``last_weight`` and
    ``previous_delta`` refer to the last sets in chronological order.
    """
    query = (
        select(Workout.started_at, WorkoutSet.weight, WorkoutSet.reps)
        .join(WorkoutItem, WorkoutSet.workout_item_id == WorkoutItem.id)
        .join(Workout, WorkoutItem.workout_id == Workout.id)
        .where(Workout.user_id == user_id, Workout.status != "discarded")
        .order_by(Workout.started_at.asc(), WorkoutSet.completed_at.asc())
    )
    if entity_id:
        query = query.where(WorkoutItem.activity_source_id == entity_id)
    wanted_kind = normalize_kind(kind)
    if wanted_kind:
        query = query.where(WorkoutItem.source_kind == wanted_kind)

    per_day: dict[str, dict[str, float]] = defaultdict(lambda: {"max_weight": 0, "volume": 0})
    chronological_weights: list[float] = []
    for started_at, weight, reps in list(db.execute(query)):
        bucket = per_day[started_at.date().isoformat()]
        load = float(weight or 0)
        bucket["max_weight"] = max(bucket["max_weight"], load)
        bucket["volume"] += load * int(reps or 0)
        chronological_weights.append(load)

    # ISO date keys sort chronologically as plain strings.
    points = []
    for day, stats in sorted(per_day.items()):
        points.append(
            ProgressionPoint(date=day, max_weight=stats["max_weight"], volume=stats["volume"])
        )

    if len(chronological_weights) >= 2:
        previous_delta = round(chronological_weights[-1] - chronological_weights[-2], 2)
    else:
        previous_delta = None

    return ProgressionRead(
        last_weight=chronological_weights[-1] if chronological_weights else None,
        max_weight=max(chronological_weights) if chronological_weights else None,
        previous_delta=previous_delta,
        points=points,
    )
|
|
|
|
|
|
@app.get("/internal/analytics/calories", dependencies=[InternalAuth], response_model=CaloriesRead)
def get_calories(db: Db, user_id: CurrentUserId) -> CaloriesRead:
    """Report estimated calories per workout and in total for the caller.

    Discarded workouts are excluded; entries are ordered by start time, newest
    first. The total is rounded to 2 decimals.
    """
    query = (
        select(Workout)
        .where(Workout.user_id == user_id, Workout.status != "discarded")
        .order_by(Workout.started_at.desc())
    )
    history = list(db.scalars(query))

    entries = []
    running_total = 0
    for entry in history:
        burned = float(entry.estimated_calories or 0)
        running_total += burned
        entries.append(
            {
                "id": str(entry.id),
                "date": entry.started_at.date().isoformat(),
                "calories": burned,
            }
        )

    return CaloriesRead(total_calories=round(running_total, 2), workouts=entries)
|