7b34ce1a98
- Added `status`, `total_sets`, and `total_volume` fields to the Workout model. - Introduced `source_kind`, `title_snapshot`, and `image_s3_url_snapshot` fields to the WorkoutItem model. - Created endpoints for managing active workouts, including finishing and discarding workouts. - Updated workout creation to ensure only one active workout exists per user. - Implemented batch addition of workout sets and updates to workout set details. - Enhanced database schema with Alembic migrations to support new fields and constraints. - Added validation to ensure at least one field is provided for workout set updates. - Updated calorie estimation logic to reflect new workout set structure.
713 lines
23 KiB
Python
713 lines
23 KiB
Python
import uuid
|
|
from collections import defaultdict
|
|
from datetime import UTC, datetime
|
|
from typing import Annotated
|
|
|
|
from fastapi import Body, Depends, FastAPI, Header, HTTPException, Query, status
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.orm import Session, selectinload
|
|
|
|
from app.core import settings
|
|
from app.db import SessionLocal, create_schema, get_db
|
|
from app.models import Base, Equipment, Exercise, Workout, WorkoutItem, WorkoutSet
|
|
from app.schemas import (
|
|
CaloriesRead,
|
|
EquipmentCreate,
|
|
EquipmentRead,
|
|
ExerciseCreate,
|
|
ExerciseRead,
|
|
ProgressionPoint,
|
|
ProgressionRead,
|
|
WorkoutCreate,
|
|
WorkoutFinishRequest,
|
|
WorkoutItemCreate,
|
|
WorkoutItemRead,
|
|
WorkoutRead,
|
|
WorkoutSetBatchCreate,
|
|
WorkoutSetCreate,
|
|
WorkoutSetRead,
|
|
WorkoutSetUpdate,
|
|
WorkoutUpdate,
|
|
)
|
|
|
|
# ASGI application object; every route in this module is registered on it.
app = FastAPI(title="Train Watcher Logic Service", version="0.1.0")
|
|
|
|
|
|
def require_service_token(x_service_token: Annotated[str | None, Header()] = None) -> None:
    """Reject requests that do not present the configured service token.

    Used as a FastAPI dependency on the internal routes.

    Args:
        x_service_token: Token taken from the request headers, or ``None``
            when the header is absent.

    Raises:
        HTTPException: 401 when the header is missing, when no token is
            configured in ``settings.service_token``, or when the supplied
            value does not match the configured one.
    """
    import secrets

    expected = settings.service_token
    # Fail closed: with a plain ``!=`` a missing header would "match" an unset
    # token (``None != None`` is False) and the request would be let through.
    token_ok = (
        x_service_token is not None
        and bool(expected)
        # Constant-time comparison. Bytes are compared because
        # compare_digest rejects non-ASCII ``str`` arguments.
        and secrets.compare_digest(
            x_service_token.encode("utf-8"),
            str(expected).encode("utf-8"),
        )
    )
    if not token_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid service token",
        )
|
|
|
|
|
|
def get_user_id(x_user_id: Annotated[str | None, Header()] = None) -> uuid.UUID:
    """Parse the caller's user id from the request headers.

    Args:
        x_user_id: Raw header value, or ``None`` when the header is absent.

    Returns:
        The header value parsed as a UUID.

    Raises:
        HTTPException: 400 when the header is missing/empty or is not a
            valid UUID string.
    """
    if x_user_id:
        try:
            parsed = uuid.UUID(x_user_id)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid X-User-Id",
            ) from exc
        return parsed

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Missing X-User-Id header",
    )
|
|
|
|
|
|
# Route-level dependency that enforces the service token check.
InternalAuth = Depends(require_service_token)
# Handler parameter type: a SQLAlchemy session supplied by get_db.
Db = Annotated[Session, Depends(get_db)]
# Handler parameter type: the caller's user id as parsed by get_user_id.
CurrentUserId = Annotated[uuid.UUID, Depends(get_user_id)]
|
|
|
|
|
|
@app.on_event("startup")
def on_startup() -> None:
    """Create the database schema, then seed the built-in catalog."""
    create_schema(Base.metadata)
    with SessionLocal() as session:
        seed_builtin_catalog(session)
|
|
|
|
|
|
@app.get("/health")
def health() -> dict[str, str]:
    """Report that the service is up; always returns ``{"status": "ok"}``."""
    body: dict[str, str] = {"status": "ok"}
    return body
|
|
|
|
|
|
def seed_builtin_catalog(db: Session) -> None:
    """Insert the built-in equipment and exercises if none are present yet.

    The presence check only counts built-in *equipment* rows; when at least
    one exists the function returns without touching the database.

    Args:
        db: Session used for the check, the inserts and the final commit.
    """
    builtin_equipment_count = db.scalar(
        select(func.count()).select_from(Equipment).where(Equipment.is_builtin.is_(True))
    )
    if builtin_equipment_count:
        return

    treadmill = Equipment(
        name="Беговая дорожка",
        description="Кардио-тренажер для ходьбы и бега.",
        is_builtin=True,
    )
    db.add_all(
        [
            treadmill,
            Equipment(
                name="Машина Смита",
                description="Силовая рама с фиксированной траекторией грифа.",
                is_builtin=True,
            ),
        ]
    )
    # Flush before reading treadmill.id below -- presumably so a
    # database-generated key is populated; confirm against the model.
    db.flush()

    builtin_exercises = [
        Exercise(
            name="Жим лежа",
            description="Базовое упражнение для груди, трицепса и передней дельты.",
            is_builtin=True,
            default_calories_per_minute=6,
        ),
        Exercise(
            name="Приседания",
            description="Базовое упражнение для ног и корпуса.",
            is_builtin=True,
            default_calories_per_minute=8,
        ),
        Exercise(
            name="Бег",
            description="Кардио-нагрузка на беговой дорожке.",
            equipment_id=treadmill.id,
            is_builtin=True,
            default_calories_per_minute=10,
        ),
    ]
    db.add_all(builtin_exercises)
    db.commit()
|
|
|
|
|
|
def accessible_equipment(db: Session, user_id: uuid.UUID):
    """Build a SELECT of equipment that is built-in or owned by ``user_id``.

    ``db`` is accepted but not used; the statement is returned unexecuted so
    callers can add ordering and filters.
    """
    is_builtin = Equipment.is_builtin.is_(True)
    is_owned_by_user = Equipment.owner_user_id == user_id
    return select(Equipment).where(is_builtin | is_owned_by_user)
|
|
|
|
|
|
def accessible_exercises(db: Session, user_id: uuid.UUID):
    """Build a SELECT of exercises that are built-in or owned by ``user_id``.

    ``db`` is accepted but not used; the statement is returned unexecuted so
    callers can add ordering and filters.
    """
    is_builtin = Exercise.is_builtin.is_(True)
    is_owned_by_user = Exercise.owner_user_id == user_id
    return select(Exercise).where(is_builtin | is_owned_by_user)
|
|
|
|
|
|
def load_workout(db: Session, workout_id: uuid.UUID, user_id: uuid.UUID) -> Workout:
    """Fetch one of the user's workouts together with its items and sets.

    Args:
        db: Session to query with.
        workout_id: Id of the workout to load.
        user_id: Owner the workout must belong to.

    Returns:
        The matching workout with ``items`` and each item's ``sets`` loaded.

    Raises:
        HTTPException: 404 when no workout with that id belongs to the user.
    """
    statement = (
        select(Workout)
        .where(Workout.id == workout_id, Workout.user_id == user_id)
        .options(selectinload(Workout.items).selectinload(WorkoutItem.sets))
    )
    found = db.scalar(statement)
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout not found")
|
|
|
|
|
|
def get_active_workout_for_user(db: Session, user_id: uuid.UUID) -> Workout | None:
    """Return the user's workout whose status is ``"active"``, if any.

    When several rows match, the one with the latest ``started_at`` is
    returned. Items and their sets are loaded along with the workout.
    """
    statement = (
        select(Workout)
        .where(Workout.user_id == user_id, Workout.status == "active")
        .options(selectinload(Workout.items).selectinload(WorkoutItem.sets))
        .order_by(Workout.started_at.desc())
    )
    return db.scalar(statement)
|
|
|
|
|
|
def ensure_active_workout(workout: Workout) -> None:
    """Raise 409 unless the workout's status is ``"active"``.

    Raises:
        HTTPException: 409 when ``workout.status`` is anything but
            ``"active"``.
    """
    if workout.status == "active":
        return
    raise HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail="Workout is not active",
    )
|
|
|
|
|
|
@app.get(
    "/internal/catalog/equipment",
    dependencies=[InternalAuth],
    response_model=list[EquipmentRead],
)
def list_equipment(db: Db, user_id: CurrentUserId, search: str | None = None) -> list[Equipment]:
    """List equipment visible to the user, built-in entries first, then by name.

    Args:
        db: Request-scoped session.
        user_id: Caller's user id; limits results to built-in and owned rows.
        search: Optional case-insensitive substring to match against the
            name. The text is matched literally.

    Returns:
        The matching equipment rows.
    """
    statement = accessible_equipment(db, user_id).order_by(
        Equipment.is_builtin.desc(), Equipment.name
    )
    if search:
        # Escape LIKE metacharacters so "%" and "_" typed by the user are
        # matched as literal characters rather than as wildcards.
        escaped = search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        statement = statement.where(Equipment.name.ilike(f"%{escaped}%", escape="/"))
    return list(db.scalars(statement))
|
|
|
|
|
|
@app.post(
    "/internal/catalog/equipment",
    dependencies=[InternalAuth],
    response_model=EquipmentRead,
    status_code=status.HTTP_201_CREATED,
)
def create_equipment(payload: EquipmentCreate, db: Db, user_id: CurrentUserId) -> Equipment:
    """Create a non-built-in equipment entry owned by the calling user.

    All fields of ``payload`` are passed through to the model; the row is
    committed and re-read before being returned.
    """
    fields = payload.model_dump()
    created = Equipment(owner_user_id=user_id, is_builtin=False, **fields)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@app.get(
    "/internal/catalog/exercises",
    dependencies=[InternalAuth],
    response_model=list[ExerciseRead],
)
def list_exercises(db: Db, user_id: CurrentUserId, search: str | None = None) -> list[Exercise]:
    """List exercises visible to the user, built-in entries first, then by name.

    Args:
        db: Request-scoped session.
        user_id: Caller's user id; limits results to built-in and owned rows.
        search: Optional case-insensitive substring to match against the
            name. The text is matched literally.

    Returns:
        The matching exercise rows.
    """
    statement = accessible_exercises(db, user_id).order_by(
        Exercise.is_builtin.desc(), Exercise.name
    )
    if search:
        # Escape LIKE metacharacters so "%" and "_" typed by the user are
        # matched as literal characters rather than as wildcards.
        escaped = search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        statement = statement.where(Exercise.name.ilike(f"%{escaped}%", escape="/"))
    return list(db.scalars(statement))
|
|
|
|
|
|
@app.post(
    "/internal/catalog/exercises",
    dependencies=[InternalAuth],
    response_model=ExerciseRead,
    status_code=status.HTTP_201_CREATED,
)
def create_exercise(payload: ExerciseCreate, db: Db, user_id: CurrentUserId) -> Exercise:
    """Create a non-built-in exercise owned by the calling user.

    When the payload references equipment, that equipment must exist and be
    either built-in or owned by the caller.

    Raises:
        HTTPException: 404 when the referenced equipment is missing or
            belongs to another user.
    """
    if payload.equipment_id:
        linked = db.get(Equipment, payload.equipment_id)
        accessible = bool(linked) and (linked.is_builtin or linked.owner_user_id == user_id)
        if not accessible:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Equipment not found")

    fields = payload.model_dump()
    created = Exercise(owner_user_id=user_id, is_builtin=False, **fields)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@app.get("/internal/workouts", dependencies=[InternalAuth], response_model=list[WorkoutRead])
def list_workouts(db: Db, user_id: CurrentUserId) -> list[Workout]:
    """Return all of the user's workouts, most recently started first.

    Each workout comes with its items and their sets loaded.
    """
    statement = (
        select(Workout)
        .where(Workout.user_id == user_id)
        .options(selectinload(Workout.items).selectinload(WorkoutItem.sets))
        .order_by(Workout.started_at.desc())
    )
    return list(db.scalars(statement))
|
|
|
|
|
|
@app.post(
    "/internal/workouts",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
    status_code=status.HTTP_201_CREATED,
)
def create_workout(payload: WorkoutCreate, db: Db, user_id: CurrentUserId) -> Workout:
    """Start a new active workout for the user.

    ``started_at`` falls back to the current UTC time when the payload does
    not supply one.

    Raises:
        HTTPException: 409 when the user already has an active workout.
    """
    already_active = get_active_workout_for_user(db, user_id)
    if already_active:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Active workout already exists",
        )

    started_at = payload.started_at or datetime.now(UTC)
    created = Workout(
        user_id=user_id,
        status="active",
        started_at=started_at,
        notes=payload.notes,
    )
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@app.get(
    "/internal/workouts/active",
    dependencies=[InternalAuth],
    response_model=WorkoutRead | None,
)
def get_active_workout(db: Db, user_id: CurrentUserId) -> Workout | None:
    """Return the caller's active workout, or ``None`` when there is none."""
    active = get_active_workout_for_user(db, user_id)
    return active
|
|
|
|
|
|
@app.get("/internal/workouts/{workout_id}", dependencies=[InternalAuth], response_model=WorkoutRead)
def get_workout(workout_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> Workout:
    """Return one of the caller's workouts; 404 is raised by ``load_workout``."""
    found = load_workout(db, workout_id, user_id)
    return found
|
|
|
|
|
|
@app.patch(
    "/internal/workouts/{workout_id}",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def update_workout(
    workout_id: uuid.UUID, payload: WorkoutUpdate, db: Db, user_id: CurrentUserId
) -> Workout:
    """Apply a partial update to a workout and refresh its totals.

    A non-null ``finished_at`` is stored and also switches the status to
    ``"finished"``. A non-null ``notes`` replaces the stored notes. Totals
    are recalculated on every call.
    """
    target = load_workout(db, workout_id, user_id)

    new_finished_at = payload.finished_at
    if new_finished_at is not None:
        target.finished_at = new_finished_at
        target.status = "finished"

    new_notes = payload.notes
    if new_notes is not None:
        target.notes = new_notes

    recalculate_workout_totals(db, target.id)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@app.post(
    "/internal/workouts/{workout_id}/finish",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def finish_workout(
    workout_id: uuid.UUID,
    db: Db,
    user_id: CurrentUserId,
    payload: Annotated[WorkoutFinishRequest | None, Body()] = None,
) -> Workout:
    """Mark an active workout as finished at the current UTC time.

    The request body is optional; when it carries non-null ``notes`` they
    replace the stored notes. Totals are recalculated before the status
    changes.

    Raises:
        HTTPException: 404 from ``load_workout``; 409 when the workout is
            not active.
    """
    target = load_workout(db, workout_id, user_id)
    ensure_active_workout(target)

    closing_notes = payload.notes if payload else None
    if closing_notes is not None:
        target.notes = closing_notes

    recalculate_workout_totals(db, target.id)
    target.finished_at = datetime.now(UTC)
    target.status = "finished"
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@app.post(
    "/internal/workouts/{workout_id}/discard",
    dependencies=[InternalAuth],
    response_model=WorkoutRead,
)
def discard_workout(workout_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> Workout:
    """Mark an active workout as discarded at the current UTC time.

    Totals are recalculated first; ``finished_at`` is stamped as well.

    Raises:
        HTTPException: 404 from ``load_workout``; 409 when the workout is
            not active.
    """
    target = load_workout(db, workout_id, user_id)
    ensure_active_workout(target)

    recalculate_workout_totals(db, target.id)
    target.finished_at = datetime.now(UTC)
    target.status = "discarded"
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@app.post(
    "/internal/workouts/{workout_id}/items",
    dependencies=[InternalAuth],
    response_model=WorkoutItemRead,
    status_code=status.HTTP_201_CREATED,
)
def add_workout_item(
    workout_id: uuid.UUID, payload: WorkoutItemCreate, db: Db, user_id: CurrentUserId
) -> WorkoutItem:
    """Append an exercise or equipment entry to an active workout.

    The item stores a snapshot of the source's name and image URL. When both
    an exercise and equipment are referenced, the snapshot is taken from the
    exercise. When ``order_index`` is omitted the item is placed after the
    current last item (or at 0 for the first item).

    Args:
        workout_id: Workout to add the item to.
        payload: Item fields; must reference an exercise and/or equipment.
        db: Request-scoped session.
        user_id: Caller's user id.

    Returns:
        The newly created item.

    Raises:
        HTTPException: 404 when the workout, exercise or equipment is missing
            or not accessible to the user; 409 when the workout is not
            active; 400 when neither an exercise nor equipment is referenced.
    """
    workout = load_workout(db, workout_id, user_id)
    ensure_active_workout(workout)

    # Every referenced entity is access-checked, even when both ids are
    # given, so a caller cannot link another user's private equipment.
    exercise = None
    if payload.exercise_id:
        exercise = db.get(Exercise, payload.exercise_id)
        if not exercise or (not exercise.is_builtin and exercise.owner_user_id != user_id):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Exercise not found")

    equipment = None
    if payload.equipment_id:
        equipment = db.get(Equipment, payload.equipment_id)
        if not equipment or (not equipment.is_builtin and equipment.owner_user_id != user_id):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Equipment not found")

    if exercise is not None:
        source_kind, source = "exercise", exercise
    elif equipment is not None:
        source_kind, source = "equipment", equipment
    else:
        # Without this branch the snapshot variables would be unbound and the
        # request would fail with an unhandled error instead of a client error.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Either exercise_id or equipment_id is required",
        )

    next_index = payload.order_index
    if next_index is None:
        max_index = db.scalar(
            select(func.max(WorkoutItem.order_index)).where(WorkoutItem.workout_id == workout.id)
        )
        next_index = 0 if max_index is None else int(max_index) + 1

    item = WorkoutItem(
        workout_id=workout.id,
        source_kind=source_kind,
        title_snapshot=source.name,
        image_s3_url_snapshot=source.image_s3_url,
        **payload.model_dump(exclude={"order_index"}),
        order_index=next_index,
    )
    db.add(item)
    db.commit()
    db.refresh(item)
    return item
|
|
|
|
|
|
@app.post(
    "/internal/workout-items/{item_id}/sets",
    dependencies=[InternalAuth],
    response_model=WorkoutSetRead,
    status_code=status.HTTP_201_CREATED,
)
def add_workout_set(
    item_id: uuid.UUID,
    payload: WorkoutSetCreate,
    db: Db,
    user_id: CurrentUserId,
) -> WorkoutSet:
    """Append one set to a workout item of an active workout.

    The set receives the next ``set_index`` after the item's current maximum
    (starting at 1). Calories come from ``estimate_set_calories`` and
    ``completed_at`` defaults to the current UTC time. Workout totals are
    recalculated before the commit.

    Raises:
        HTTPException: 404 when the item does not exist under one of the
            user's workouts; 409 when the workout is not active.
    """
    item_query = (
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
        .options(
            selectinload(WorkoutItem.sets),
            selectinload(WorkoutItem.exercise),
            selectinload(WorkoutItem.workout),
        )
    )
    parent_item = db.scalar(item_query)
    if not parent_item:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")
    ensure_active_workout(parent_item.workout)

    set_calories = estimate_set_calories(
        parent_item,
        payload.weight,
        payload.reps,
        payload.duration_seconds,
        payload.calories,
    )
    highest_index = db.scalar(
        select(func.max(WorkoutSet.set_index)).where(WorkoutSet.workout_item_id == parent_item.id)
    )

    created = WorkoutSet(
        workout_item_id=parent_item.id,
        set_index=int(highest_index or 0) + 1,
        weight=payload.weight,
        reps=payload.reps,
        duration_seconds=payload.duration_seconds,
        calories=set_calories,
        completed_at=payload.completed_at or datetime.now(UTC),
    )
    db.add(created)
    # Flush so the new row is included when totals are aggregated.
    db.flush()
    recalculate_workout_totals(db, parent_item.workout_id)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@app.post(
    "/internal/workout-items/{item_id}/sets/batch",
    dependencies=[InternalAuth],
    response_model=list[WorkoutSetRead],
    status_code=status.HTTP_201_CREATED,
)
def add_workout_sets_batch(
    item_id: uuid.UUID,
    payload: WorkoutSetBatchCreate,
    db: Db,
    user_id: CurrentUserId,
) -> list[WorkoutSet]:
    """Append several sets to a workout item of an active workout.

    Sets are numbered consecutively, in payload order, starting right after
    the item's current highest ``set_index``. Workout totals are recalculated
    once, after all sets are flushed.

    Raises:
        HTTPException: 404 when the item does not exist under one of the
            user's workouts; 409 when the workout is not active.
    """
    item_query = (
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
        .options(selectinload(WorkoutItem.exercise), selectinload(WorkoutItem.workout))
    )
    parent_item = db.scalar(item_query)
    if not parent_item:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")
    ensure_active_workout(parent_item.workout)

    highest_index = db.scalar(
        select(func.max(WorkoutSet.set_index)).where(WorkoutSet.workout_item_id == parent_item.id)
    )
    first_new_index = int(highest_index or 0) + 1

    created: list[WorkoutSet] = []
    for position, entry in enumerate(payload.sets, start=first_new_index):
        new_set = WorkoutSet(
            workout_item_id=parent_item.id,
            set_index=position,
            weight=entry.weight,
            reps=entry.reps,
            duration_seconds=entry.duration_seconds,
            calories=estimate_set_calories(
                parent_item,
                entry.weight,
                entry.reps,
                entry.duration_seconds,
                entry.calories,
            ),
            completed_at=entry.completed_at or datetime.now(UTC),
        )
        db.add(new_set)
        created.append(new_set)

    # Flush so the new rows are included when totals are aggregated.
    db.flush()
    recalculate_workout_totals(db, parent_item.workout_id)
    db.commit()
    for new_set in created:
        db.refresh(new_set)
    return created
|
|
|
|
|
|
@app.patch(
    "/internal/workout-sets/{set_id}",
    dependencies=[InternalAuth],
    response_model=WorkoutSetRead,
)
def update_workout_set(
    set_id: uuid.UUID,
    payload: WorkoutSetUpdate,
    db: Db,
    user_id: CurrentUserId,
) -> WorkoutSet:
    """Partially update a set that belongs to an active workout.

    Field handling:
      * ``weight`` / ``reps``: applied only when non-null.
      * ``duration_seconds``: applied whenever it is present in the request,
        including an explicit null.
      * ``completed_at``: applied only when present and non-null.
      * ``calories``: taken verbatim when present in the request; otherwise
        re-estimated from the set's (possibly updated) values.

    Workout totals are recalculated before the commit.

    Raises:
        HTTPException: 404 when the set does not exist under one of the
            user's workouts; 409 when the workout is not active.
    """
    workout_set = db.scalar(
        select(WorkoutSet)
        .join(WorkoutItem)
        .join(Workout)
        .where(WorkoutSet.id == set_id, Workout.user_id == user_id)
        .options(
            selectinload(WorkoutSet.workout_item).selectinload(WorkoutItem.exercise),
            selectinload(WorkoutSet.workout_item).selectinload(WorkoutItem.workout),
        )
    )
    if not workout_set:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Set not found")
    ensure_active_workout(workout_set.workout_item.workout)

    provided = payload.model_fields_set
    if payload.weight is not None:
        workout_set.weight = payload.weight
    if payload.reps is not None:
        workout_set.reps = payload.reps
    if "duration_seconds" in provided:
        workout_set.duration_seconds = payload.duration_seconds
    if "completed_at" in provided and payload.completed_at is not None:
        workout_set.completed_at = payload.completed_at
    if "calories" in provided:
        workout_set.calories = payload.calories
    else:
        workout_set.calories = estimate_set_calories(
            workout_set.workout_item,
            float(workout_set.weight or 0),
            int(workout_set.reps or 0),
            workout_set.duration_seconds,
            None,
        )

    # Write the pending changes before aggregating, as the other mutation
    # endpoints do; otherwise the totals query could read the stale row when
    # the session does not autoflush.
    db.flush()
    recalculate_workout_totals(db, workout_set.workout_item.workout_id)
    db.commit()
    db.refresh(workout_set)
    return workout_set
|
|
|
|
|
|
@app.delete(
    "/internal/workout-items/{item_id}",
    dependencies=[InternalAuth],
    status_code=status.HTTP_204_NO_CONTENT,
)
def delete_workout_item(item_id: uuid.UUID, db: Db, user_id: CurrentUserId) -> None:
    """Delete an item from an active workout and refresh the workout totals.

    Raises:
        HTTPException: 404 when the item does not exist under one of the
            user's workouts; 409 when the workout is not active.
    """
    item_query = (
        select(WorkoutItem)
        .join(Workout)
        .where(WorkoutItem.id == item_id, Workout.user_id == user_id)
    )
    doomed = db.scalar(item_query)
    if not doomed:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workout item not found")

    # Keep the id: it is still needed after the item has been deleted.
    parent_workout_id = doomed.workout_id
    parent_workout = db.get(Workout, parent_workout_id)
    if parent_workout:
        ensure_active_workout(parent_workout)

    db.delete(doomed)
    db.flush()
    recalculate_workout_totals(db, parent_workout_id)
    db.commit()
|
|
|
|
|
|
@app.delete(
    "/internal/workout-items/{item_id}/sets/{set_id}",
    dependencies=[InternalAuth],
    status_code=status.HTTP_204_NO_CONTENT,
)
def delete_workout_set(
    item_id: uuid.UUID, set_id: uuid.UUID, db: Db, user_id: CurrentUserId
) -> None:
    """Delete one set from an item of an active workout.

    After deletion the item's remaining sets are renumbered and the workout
    totals are recalculated.

    Raises:
        HTTPException: 404 when no such set exists under that item in one of
            the user's workouts; 409 when the workout is not active.
    """
    set_query = (
        select(WorkoutSet)
        .join(WorkoutItem)
        .join(Workout)
        .where(
            WorkoutSet.id == set_id,
            WorkoutItem.id == item_id,
            Workout.user_id == user_id,
        )
    )
    doomed = db.scalar(set_query)
    if not doomed:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Set not found")

    parent_workout = db.scalar(
        select(Workout).join(WorkoutItem).where(WorkoutItem.id == item_id)
    )
    if parent_workout:
        ensure_active_workout(parent_workout)

    db.delete(doomed)
    db.flush()
    reindex_workout_item_sets(db, item_id)
    if parent_workout:
        recalculate_workout_totals(db, parent_workout.id)
    db.commit()
|
|
|
|
|
|
def reindex_workout_item_sets(db: Session, item_id: uuid.UUID) -> None:
    """Renumber an item's sets as 1..N, keeping their current order.

    Sets are ordered by their existing ``set_index`` and then by
    ``completed_at``. The new indices are assigned on the loaded objects;
    nothing is flushed or committed here.
    """
    ordering = (WorkoutSet.set_index.asc(), WorkoutSet.completed_at.asc())
    statement = (
        select(WorkoutSet)
        .where(WorkoutSet.workout_item_id == item_id)
        .order_by(*ordering)
    )
    survivors = db.scalars(statement).all()
    for new_index, survivor in enumerate(survivors, start=1):
        survivor.set_index = new_index
|
|
|
|
|
|
def estimate_set_calories(
    item: WorkoutItem,
    weight: float,
    reps: int,
    duration_seconds: int | None,
    calories: float | None,
) -> float:
    """Return the calorie value to store for a set.

    Resolution order:
      1. An explicit ``calories`` value is used as-is.
      2. If the item has an exercise with a non-zero
         ``default_calories_per_minute`` and a non-zero duration is given,
         the result is rate * minutes.
      3. Otherwise a load-based estimate ``weight * max(reps, 1) / 120``.

    Args:
        item: Workout item the set belongs to; only ``item.exercise`` is read.
        weight: Load used for the set. ``None`` is treated as 0.
        reps: Repetition count. ``None`` is treated as 0, and values below 1
            count as 1 in the estimate.
        duration_seconds: Duration of the set, if tracked.
        calories: Caller-supplied calorie value, if any.

    Returns:
        The calorie value as a ``float``; estimates are rounded to two
        decimals.
    """
    if calories is not None:
        return float(calories)

    exercise = item.exercise
    if exercise and exercise.default_calories_per_minute and duration_seconds:
        return round(float(exercise.default_calories_per_minute) * duration_seconds / 60, 2)

    # Sets may carry no weight/reps (duration-only entries) and database
    # values may be Decimal; normalise so the arithmetic cannot raise and the
    # result is always a float.
    load = float(weight or 0)
    repetitions = max(int(reps or 0), 1)
    return round(load * repetitions / 120, 2)
|
|
|
|
|
|
def recalculate_workout_totals(db: Session, workout_id: uuid.UUID) -> None:
    """Recompute and store a workout's set count, volume and calories.

    Aggregates over every set of every item in the workout: the number of
    sets, the sum of ``weight * reps`` and the sum of ``calories``. The
    results are assigned to the workout object; committing is left to the
    caller. Nothing is assigned when the workout cannot be found.
    """
    aggregates = (
        select(
            func.count(WorkoutSet.id),
            func.coalesce(func.sum(WorkoutSet.weight * WorkoutSet.reps), 0),
            func.coalesce(func.sum(WorkoutSet.calories), 0),
        )
        .join(WorkoutItem, WorkoutSet.workout_item_id == WorkoutItem.id)
        .where(WorkoutItem.workout_id == workout_id)
    )
    set_count, volume_sum, calorie_sum = db.execute(aggregates).one()

    target = db.get(Workout, workout_id)
    if not target:
        return
    target.total_sets = int(set_count or 0)
    target.total_volume = float(volume_sum or 0)
    target.estimated_calories = float(calorie_sum or 0)
|
|
|
|
|
|
def recalculate_workout_calories(db: Session, workout_id: uuid.UUID) -> None:
    """Delegate to ``recalculate_workout_totals`` for the given workout."""
    recalculate_workout_totals(db=db, workout_id=workout_id)
|
|
|
|
|
|
@app.get(
    "/internal/analytics/progression",
    dependencies=[InternalAuth],
    response_model=ProgressionRead,
)
def get_progression(
    db: Db,
    user_id: CurrentUserId,
    kind: Annotated[str, Query(pattern="^(exercise|equipment)$")],
    entity_id: Annotated[uuid.UUID | None, Query()] = None,
) -> ProgressionRead:
    """Summarise the user's weight progression across non-discarded workouts.

    Sets are read in order of workout start and then set completion. When
    ``entity_id`` is given, only items referencing that exercise or equipment
    (chosen by ``kind``) are included; without it, ``kind`` has no effect.

    Returns:
        One point per workout start date (highest weight and total
        ``weight * reps`` that day), plus the last weight, the overall
        maximum weight and the difference between the last two recorded set
        weights.
    """
    statement = (
        select(Workout.started_at, WorkoutSet.weight, WorkoutSet.reps)
        .join(WorkoutItem, WorkoutSet.workout_item_id == WorkoutItem.id)
        .join(Workout, WorkoutItem.workout_id == Workout.id)
        .where(Workout.user_id == user_id, Workout.status != "discarded")
        .order_by(Workout.started_at.asc(), WorkoutSet.completed_at.asc())
    )
    if entity_id:
        filter_columns = {
            "exercise": WorkoutItem.exercise_id,
            "equipment": WorkoutItem.equipment_id,
        }
        filter_column = filter_columns.get(kind)
        if filter_column is not None:
            statement = statement.where(filter_column == entity_id)

    per_day: dict[str, dict[str, float]] = defaultdict(lambda: {"max_weight": 0, "volume": 0})
    weights: list[float] = []
    for started_at, raw_weight, raw_reps in db.execute(statement):
        set_weight = float(raw_weight or 0)
        bucket = per_day[started_at.date().isoformat()]
        bucket["max_weight"] = max(bucket["max_weight"], set_weight)
        bucket["volume"] += set_weight * int(raw_reps or 0)
        weights.append(set_weight)

    # ISO date strings sort chronologically, so sorting the keys is enough.
    points = []
    for day in sorted(per_day):
        stats = per_day[day]
        points.append(
            ProgressionPoint(date=day, max_weight=stats["max_weight"], volume=stats["volume"])
        )

    if len(weights) >= 2:
        previous_delta = round(weights[-1] - weights[-2], 2)
    else:
        previous_delta = None

    if weights:
        last_weight, top_weight = weights[-1], max(weights)
    else:
        last_weight, top_weight = None, None

    return ProgressionRead(
        last_weight=last_weight,
        max_weight=top_weight,
        previous_delta=previous_delta,
        points=points,
    )
|
|
|
|
|
|
@app.get("/internal/analytics/calories", dependencies=[InternalAuth], response_model=CaloriesRead)
def get_calories(db: Db, user_id: CurrentUserId) -> CaloriesRead:
    """Report estimated calories per non-discarded workout and in total.

    Workouts are listed most recently started first; a missing
    ``estimated_calories`` counts as 0. The total is rounded to two decimals.
    """
    statement = (
        select(Workout)
        .where(Workout.user_id == user_id, Workout.status != "discarded")
        .order_by(Workout.started_at.desc())
    )
    workouts = list(db.scalars(statement))

    per_workout = [float(workout.estimated_calories or 0) for workout in workouts]
    entries = [
        {
            "id": str(workout.id),
            "date": workout.started_at.date().isoformat(),
            "calories": burned,
        }
        for workout, burned in zip(workouts, per_workout)
    ]
    return CaloriesRead(
        total_calories=round(sum(per_workout), 2),
        workouts=entries,
    )
|