feat: add unit and API tests for activities and tasks, including shared fixtures and scenarios
Test / test (push) Successful in 13s

This commit is contained in:
Artem Kashaev
2025-11-27 16:57:02 +05:00
parent b8958dedbd
commit 274ae7ee30
6 changed files with 643 additions and 0 deletions
+38
View File
@@ -0,0 +1,38 @@
"""Pytest fixtures shared across API v1 tests."""
from __future__ import annotations
from collections.abc import AsyncGenerator
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.api.deps import get_db_session
from app.main import create_app
from app.models import Base
@pytest_asyncio.fixture()
async def session_factory() -> AsyncGenerator[async_sessionmaker[AsyncSession], None]:
engine = create_async_engine("sqlite+aiosqlite:///:memory:", future=True)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
factory = async_sessionmaker(engine, expire_on_commit=False)
yield factory
await engine.dispose()
@pytest_asyncio.fixture()
async def client(
session_factory: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncClient, None]:
app = create_app()
async def _get_session_override() -> AsyncGenerator[AsyncSession, None]:
async with session_factory() as session:
yield session
app.dependency_overrides[get_db_session] = _get_session_override
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://testserver") as test_client:
yield test_client
+101
View File
@@ -0,0 +1,101 @@
"""Shared helpers for task and activity API tests."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.core.security import jwt_service
from app.models.contact import Contact
from app.models.deal import Deal
from app.models.organization import Organization
from app.models.organization_member import OrganizationMember, OrganizationRole
from app.models.user import User
@dataclass(slots=True)
class Scenario:
"""Captures seeded entities for API tests."""
user_id: int
user_email: str
organization_id: int
contact_id: int
deal_id: int
async def prepare_scenario(session_factory: async_sessionmaker[AsyncSession]) -> Scenario:
async with session_factory() as session:
user = User(email="owner@example.com", hashed_password="hashed", name="Owner", is_active=True)
org = Organization(name="Acme LLC")
session.add_all([user, org])
await session.flush()
membership = OrganizationMember(
organization_id=org.id,
user_id=user.id,
role=OrganizationRole.OWNER,
)
session.add(membership)
contact = Contact(
organization_id=org.id,
owner_id=user.id,
name="John Doe",
email="john@example.com",
)
session.add(contact)
await session.flush()
deal = Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Website redesign",
amount=None,
)
session.add(deal)
await session.commit()
return Scenario(
user_id=user.id,
user_email=user.email,
organization_id=org.id,
contact_id=contact.id,
deal_id=deal.id,
)
async def create_deal(
session_factory: async_sessionmaker[AsyncSession],
*,
scenario: Scenario,
title: str,
) -> int:
async with session_factory() as session:
deal = Deal(
organization_id=scenario.organization_id,
contact_id=scenario.contact_id,
owner_id=scenario.user_id,
title=title,
amount=None,
)
session.add(deal)
await session.commit()
return deal.id
def auth_headers(token: str, scenario: Scenario) -> dict[str, str]:
return {
"Authorization": f"Bearer {token}",
"X-Organization-Id": str(scenario.organization_id),
}
def make_token(user_id: int, email: str) -> str:
return jwt_service.create_access_token(
subject=str(user_id),
expires_delta=timedelta(minutes=30),
claims={"email": email},
)
+63
View File
@@ -0,0 +1,63 @@
"""API tests for activity endpoints."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.models.activity import Activity, ActivityType
from tests.api.v1.task_activity_shared import auth_headers, make_token, prepare_scenario
@pytest.mark.asyncio
async def test_create_activity_comment_endpoint(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
response = await client.post(
f"/api/v1/deals/{scenario.deal_id}/activities/",
json={"type": "comment", "payload": {"text": " hello world "}},
headers=auth_headers(token, scenario),
)
assert response.status_code == 201
payload = response.json()
assert payload["payload"]["text"] == "hello world"
assert payload["type"] == ActivityType.COMMENT.value
@pytest.mark.asyncio
async def test_list_activities_endpoint_supports_pagination(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
base_time = datetime.now(timezone.utc)
async with session_factory() as session:
for index in range(3):
activity = Activity(
deal_id=scenario.deal_id,
author_id=scenario.user_id,
type=ActivityType.COMMENT,
payload={"text": f"Entry {index}"},
created_at=base_time + timedelta(seconds=index),
)
session.add(activity)
await session.commit()
response = await client.get(
f"/api/v1/deals/{scenario.deal_id}/activities/?limit=2&offset=1",
headers=auth_headers(token, scenario),
)
assert response.status_code == 200
data = response.json()
assert len(data) == 2
assert data[0]["payload"]["text"] == "Entry 1"
assert data[1]["payload"]["text"] == "Entry 2"
+78
View File
@@ -0,0 +1,78 @@
"""API tests for task endpoints."""
from __future__ import annotations
from datetime import date, datetime, timedelta, timezone
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.models.task import Task
from tests.api.v1.task_activity_shared import auth_headers, create_deal, make_token, prepare_scenario
@pytest.mark.asyncio
async def test_create_task_endpoint_creates_task_and_activity(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
due_date = (date.today() + timedelta(days=5)).isoformat()
response = await client.post(
"/api/v1/tasks/",
json={
"deal_id": scenario.deal_id,
"title": "Prepare proposal",
"description": "Send draft",
"due_date": due_date,
},
headers=auth_headers(token, scenario),
)
assert response.status_code == 201
payload = response.json()
assert payload["deal_id"] == scenario.deal_id
assert payload["title"] == "Prepare proposal"
assert payload["is_done"] is False
@pytest.mark.asyncio
async def test_list_tasks_endpoint_filters_by_deal(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
other_deal_id = await create_deal(session_factory, scenario=scenario, title="Renewal")
async with session_factory() as session:
session.add_all(
[
Task(
deal_id=scenario.deal_id,
title="Task A",
description=None,
due_date=datetime.now(timezone.utc) + timedelta(days=2),
is_done=False,
),
Task(
deal_id=other_deal_id,
title="Task B",
description=None,
due_date=datetime.now(timezone.utc) + timedelta(days=3),
is_done=False,
),
]
)
await session.commit()
response = await client.get(
f"/api/v1/tasks/?deal_id={scenario.deal_id}",
headers=auth_headers(token, scenario),
)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["title"] == "Task A"