feat: add API and unit tests for analytics endpoints and services
This commit is contained in:
@@ -0,0 +1,152 @@
|
||||
"""Unit tests for AnalyticsService."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import AsyncGenerator
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from decimal import Decimal
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.pool import StaticPool
|
||||
|
||||
from app.models import Base
|
||||
from app.models.contact import Contact
|
||||
from app.models.deal import Deal, DealStage, DealStatus
|
||||
from app.models.organization import Organization
|
||||
from app.models.organization_member import OrganizationMember, OrganizationRole
|
||||
from app.models.user import User
|
||||
from app.repositories.analytics_repo import AnalyticsRepository
|
||||
from app.services.analytics_service import AnalyticsService
|
||||
|
||||
|
||||
@pytest_asyncio.fixture()
|
||||
async def session() -> AsyncGenerator[AsyncSession, None]:
|
||||
engine = create_async_engine(
|
||||
"sqlite+aiosqlite:///:memory:", future=True, poolclass=StaticPool
|
||||
)
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
Session = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with Session() as session:
|
||||
yield session
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def _seed_data(session: AsyncSession) -> tuple[int, int, int]:
|
||||
org = Organization(name="Analytics Org")
|
||||
user = User(email="analytics@example.com", hashed_password="hashed", name="Analyst", is_active=True)
|
||||
session.add_all([org, user])
|
||||
await session.flush()
|
||||
|
||||
member = OrganizationMember(organization_id=org.id, user_id=user.id, role=OrganizationRole.OWNER)
|
||||
contact = Contact(organization_id=org.id, owner_id=user.id, name="Client", email="client@example.com")
|
||||
session.add_all([member, contact])
|
||||
await session.flush()
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
deals = [
|
||||
Deal(
|
||||
organization_id=org.id,
|
||||
contact_id=contact.id,
|
||||
owner_id=user.id,
|
||||
title="Qual 1",
|
||||
amount=Decimal("100"),
|
||||
status=DealStatus.NEW,
|
||||
stage=DealStage.QUALIFICATION,
|
||||
created_at=now - timedelta(days=5),
|
||||
),
|
||||
Deal(
|
||||
organization_id=org.id,
|
||||
contact_id=contact.id,
|
||||
owner_id=user.id,
|
||||
title="Qual 2",
|
||||
amount=Decimal("150"),
|
||||
status=DealStatus.NEW,
|
||||
stage=DealStage.QUALIFICATION,
|
||||
created_at=now - timedelta(days=3),
|
||||
),
|
||||
Deal(
|
||||
organization_id=org.id,
|
||||
contact_id=contact.id,
|
||||
owner_id=user.id,
|
||||
title="Proposal",
|
||||
amount=Decimal("200"),
|
||||
status=DealStatus.IN_PROGRESS,
|
||||
stage=DealStage.PROPOSAL,
|
||||
created_at=now - timedelta(days=15),
|
||||
),
|
||||
Deal(
|
||||
organization_id=org.id,
|
||||
contact_id=contact.id,
|
||||
owner_id=user.id,
|
||||
title="Negotiation Won",
|
||||
amount=Decimal("500"),
|
||||
status=DealStatus.WON,
|
||||
stage=DealStage.NEGOTIATION,
|
||||
created_at=now - timedelta(days=2),
|
||||
),
|
||||
Deal(
|
||||
organization_id=org.id,
|
||||
contact_id=contact.id,
|
||||
owner_id=user.id,
|
||||
title="Negotiation Won No Amount",
|
||||
amount=None,
|
||||
status=DealStatus.WON,
|
||||
stage=DealStage.NEGOTIATION,
|
||||
created_at=now - timedelta(days=1),
|
||||
),
|
||||
Deal(
|
||||
organization_id=org.id,
|
||||
contact_id=contact.id,
|
||||
owner_id=user.id,
|
||||
title="Closed Lost",
|
||||
amount=Decimal("300"),
|
||||
status=DealStatus.LOST,
|
||||
stage=DealStage.CLOSED,
|
||||
created_at=now - timedelta(days=40),
|
||||
),
|
||||
]
|
||||
session.add_all(deals)
|
||||
await session.commit()
|
||||
return org.id, user.id, contact.id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deal_summary_returns_expected_metrics(session: AsyncSession) -> None:
|
||||
org_id, _, _ = await _seed_data(session)
|
||||
service = AnalyticsService(repository=AnalyticsRepository(session))
|
||||
|
||||
summary = await service.get_deal_summary(org_id, days=30)
|
||||
|
||||
assert summary.total_deals == 6
|
||||
status_map = {item.status: item for item in summary.by_status}
|
||||
assert status_map[DealStatus.NEW].count == 2
|
||||
assert Decimal(status_map[DealStatus.NEW].amount_sum) == Decimal("250")
|
||||
assert status_map[DealStatus.WON].count == 2
|
||||
assert Decimal(summary.won.amount_sum) == Decimal("500")
|
||||
assert Decimal(summary.won.average_amount) == Decimal("500")
|
||||
assert summary.new_deals.count == 5 # все кроме старой закрытой сделки
|
||||
assert summary.new_deals.days == 30
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_funnel_breakdown_contains_stage_conversions(session: AsyncSession) -> None:
|
||||
org_id, _, _ = await _seed_data(session)
|
||||
service = AnalyticsService(repository=AnalyticsRepository(session))
|
||||
|
||||
funnel = await service.get_deal_funnel(org_id)
|
||||
|
||||
assert len(funnel) == 4
|
||||
qual = next(item for item in funnel if item.stage == DealStage.QUALIFICATION)
|
||||
assert qual.total == 2
|
||||
assert qual.by_status[DealStatus.NEW] == 2
|
||||
assert qual.conversion_to_next == 50.0
|
||||
|
||||
proposal = next(item for item in funnel if item.stage == DealStage.PROPOSAL)
|
||||
assert proposal.total == 1
|
||||
assert proposal.by_status[DealStatus.IN_PROGRESS] == 1
|
||||
assert proposal.conversion_to_next == 200.0
|
||||
|
||||
last_stage = next(item for item in funnel if item.stage == DealStage.CLOSED)
|
||||
assert last_stage.conversion_to_next is None
|
||||
Reference in New Issue
Block a user