feat: implement Redis caching for analytics endpoints with fallback to database
Test / test (push) Successful in 15s

This commit is contained in:
k1nq
2025-11-29 09:45:27 +05:00
parent 31d6a05521
commit fbb3116a2d
15 changed files with 671 additions and 13 deletions
+22 -3
View File
@@ -6,6 +6,7 @@ from fastapi import Depends, Header, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.cache import get_cache_client
from app.core.config import settings
from app.core.database import get_session
from app.core.security import jwt_service, password_hasher
@@ -29,6 +30,7 @@ from app.services.organization_service import (
OrganizationService,
)
from app.services.task_service import TaskService
from redis.asyncio.client import Redis
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api_v1_prefix}/auth/token")
@@ -67,8 +69,19 @@ def get_analytics_repository(session: AsyncSession = Depends(get_db_session)) ->
return AnalyticsRepository(session=session)
def get_deal_service(repo: DealRepository = Depends(get_deal_repository)) -> DealService:
return DealService(repository=repo)
def get_cache_backend() -> Redis | None:
return get_cache_client()
def get_deal_service(
repo: DealRepository = Depends(get_deal_repository),
cache: Redis | None = Depends(get_cache_backend),
) -> DealService:
return DealService(
repository=repo,
cache=cache,
cache_backoff_ms=settings.analytics_cache_backoff_ms,
)
def get_auth_service(
@@ -95,8 +108,14 @@ def get_activity_service(
def get_analytics_service(
repo: AnalyticsRepository = Depends(get_analytics_repository),
cache: Redis | None = Depends(get_cache_backend),
) -> AnalyticsService:
return AnalyticsService(repository=repo)
return AnalyticsService(
repository=repo,
cache=cache,
ttl_seconds=settings.analytics_cache_ttl_seconds,
backoff_ms=settings.analytics_cache_backoff_ms,
)
def get_contact_service(
+160
View File
@@ -0,0 +1,160 @@
"""Redis cache utilities and availability tracking."""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, Optional
import redis.asyncio as redis
from redis.asyncio.client import Redis
from redis.exceptions import RedisError
from app.core.config import settings
logger = logging.getLogger(__name__)
class RedisCacheManager:
"""Manages lifecycle and availability of the Redis cache client."""
def __init__(self) -> None:
self._client: Redis | None = None
self._available: bool = False
self._lock = asyncio.Lock()
@property
def is_enabled(self) -> bool:
return settings.redis_enabled
@property
def is_available(self) -> bool:
return self._available and self._client is not None
def get_client(self) -> Redis | None:
if not self.is_enabled:
return None
if self.is_available:
return self._client
return None
async def startup(self) -> None:
if not self.is_enabled:
return
async with self._lock:
if self._client is not None:
return
self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
await self._refresh_availability()
async def shutdown(self) -> None:
async with self._lock:
if self._client is not None:
await self._client.close()
self._client = None
self._available = False
async def reconnect(self) -> None:
if not self.is_enabled:
return
async with self._lock:
if self._client is None:
self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
await self._refresh_availability()
async def _refresh_availability(self) -> None:
if self._client is None:
self._available = False
return
try:
await self._client.ping()
except RedisError as exc: # pragma: no cover - logging only
self._available = False
logger.warning("Redis ping failed: %s", exc)
else:
self._available = True
def mark_unavailable(self) -> None:
self._available = False
def mark_available(self) -> None:
if self._client is not None:
self._available = True
cache_manager = RedisCacheManager()
async def init_cache() -> None:
"""Initialize Redis cache connection if enabled."""
await cache_manager.startup()
async def shutdown_cache() -> None:
"""Close Redis cache connection."""
await cache_manager.shutdown()
def get_cache_client() -> Optional[Redis]:
"""Expose the active Redis client for dependency injection."""
return cache_manager.get_client()
async def read_json(client: Redis, key: str) -> Any | None:
"""Read and decode JSON payload from Redis."""
try:
raw = await client.get(key)
except RedisError as exc: # pragma: no cover - network errors
cache_manager.mark_unavailable()
logger.debug("Redis GET failed for %s: %s", key, exc)
return None
if raw is None:
return None
cache_manager.mark_available()
try:
return json.loads(raw.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as exc: # pragma: no cover - malformed payloads
logger.warning("Discarding malformed cache entry %s: %s", key, exc)
return None
async def write_json(client: Redis, key: str, value: Any, ttl_seconds: int, backoff_ms: int) -> None:
"""Serialize data to JSON and store it with TTL using retry/backoff."""
payload = json.dumps(value, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
async def _operation() -> Any:
return await client.set(name=key, value=payload, ex=ttl_seconds)
await _run_with_retry(_operation, backoff_ms)
async def delete_keys(client: Redis, keys: list[str], backoff_ms: int) -> None:
"""Delete cache keys with retry/backoff semantics."""
if not keys:
return
async def _operation() -> Any:
return await client.delete(*keys)
await _run_with_retry(_operation, backoff_ms)
async def _run_with_retry(operation: Callable[[], Awaitable[Any]], max_sleep_ms: int) -> None:
try:
await operation()
cache_manager.mark_available()
return
except RedisError as exc: # pragma: no cover - network errors
cache_manager.mark_unavailable()
logger.debug("Redis cache operation failed: %s", exc)
if max_sleep_ms <= 0:
return
sleep_seconds = min(max_sleep_ms / 1000, 0.1)
await asyncio.sleep(sleep_seconds)
await cache_manager.reconnect()
try:
await operation()
cache_manager.mark_available()
except RedisError as exc: # pragma: no cover - repeated network errors
cache_manager.mark_unavailable()
logger.warning("Redis cache operation failed after retry: %s", exc)
+8
View File
@@ -20,6 +20,14 @@ class Settings(BaseSettings):
jwt_algorithm: str = "HS256"
access_token_expire_minutes: int = 30
refresh_token_expire_days: int = 7
redis_enabled: bool = Field(default=False, description="Toggle Redis-backed cache usage")
redis_url: str = Field(default="redis://localhost:6379/0", description="Redis connection URL")
analytics_cache_ttl_seconds: int = Field(default=120, ge=1, description="TTL for cached analytics responses")
analytics_cache_backoff_ms: int = Field(
default=200,
ge=0,
description="Maximum backoff (ms) for retrying cache writes/invalidation",
)
settings = Settings()
+1
View File
@@ -0,0 +1 @@
"""Application middleware components."""
+38
View File
@@ -0,0 +1,38 @@
"""Middleware that logs cache availability transitions."""
from __future__ import annotations
import logging
from starlette.types import ASGIApp, Receive, Scope, Send
from app.core.cache import cache_manager
from app.core.config import settings
logger = logging.getLogger(__name__)
class CacheAvailabilityMiddleware:
"""Logs when Redis cache becomes unavailable or recovers."""
def __init__(self, app: ASGIApp) -> None:
self.app = app
self._last_state: bool | None = None
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] == "http" and settings.redis_enabled:
self._log_transition()
await self.app(scope, receive, send)
def _log_transition(self) -> None:
available = cache_manager.is_available
if self._last_state is None:
self._last_state = available
if not available:
logger.warning("Redis cache unavailable, serving responses without cache")
return
if available == self._last_state:
return
if available:
logger.info("Redis cache connectivity restored; caching re-enabled")
else:
logger.warning("Redis cache unavailable, serving responses without cache")
self._last_state = available
+12
View File
@@ -2,13 +2,25 @@
from fastapi import FastAPI
from app.api.routes import api_router
from app.core.cache import init_cache, shutdown_cache
from app.core.config import settings
from app.core.middleware.cache_monitor import CacheAvailabilityMiddleware
def create_app() -> FastAPI:
"""Build FastAPI application instance."""
application = FastAPI(title=settings.project_name, version=settings.version)
application.include_router(api_router)
application.add_middleware(CacheAvailabilityMiddleware)
@application.on_event("startup")
async def _startup() -> None:
await init_cache()
@application.on_event("shutdown")
async def _shutdown() -> None:
await shutdown_cache()
return application
+206 -4
View File
@@ -1,11 +1,16 @@
"""Analytics-related business logic."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Iterable
from decimal import Decimal, InvalidOperation
from typing import Any, Iterable
from redis.asyncio.client import Redis
from redis.exceptions import RedisError
from app.core.cache import cache_manager, delete_keys, read_json, write_json
from app.models.deal import DealStage, DealStatus
from app.repositories.analytics_repo import AnalyticsRepository, StageStatusRollup
@@ -53,13 +58,33 @@ class StageBreakdown:
conversion_to_next: float | None
logger = logging.getLogger(__name__)
_SUMMARY_CACHE_PREFIX = "analytics:summary"
_FUNNEL_CACHE_PREFIX = "analytics:funnel"
class AnalyticsService:
"""Provides aggregated analytics for deals."""
def __init__(self, repository: AnalyticsRepository) -> None:
def __init__(
self,
repository: AnalyticsRepository,
cache: Redis | None = None,
*,
ttl_seconds: int = 0,
backoff_ms: int = 0,
) -> None:
self._repository = repository
self._cache = cache
self._ttl_seconds = ttl_seconds
self._backoff_ms = backoff_ms
async def get_deal_summary(self, organization_id: int, *, days: int) -> DealSummary:
cached = await self._fetch_cached_summary(organization_id, days)
if cached is not None:
return cached
status_rollup = await self._repository.fetch_status_rollup(organization_id)
status_map = {item.status: item for item in status_rollup}
@@ -87,7 +112,7 @@ class AnalyticsService:
window_threshold = _threshold_from_days(days)
new_deals = await self._repository.count_new_deals_since(organization_id, window_threshold)
return DealSummary(
summary = DealSummary(
by_status=summaries,
won=WonStatistics(
count=won_count,
@@ -98,7 +123,14 @@ class AnalyticsService:
total_deals=total_deals,
)
await self._store_summary_cache(organization_id, days, summary)
return summary
async def get_deal_funnel(self, organization_id: int) -> list[StageBreakdown]:
cached = await self._fetch_cached_funnel(organization_id)
if cached is not None:
return cached
rollup = await self._repository.fetch_stage_status_rollup(organization_id)
stage_map = _build_stage_map(rollup)
@@ -121,8 +153,44 @@ class AnalyticsService:
conversion_to_next=conversion,
)
)
await self._store_funnel_cache(organization_id, breakdowns)
return breakdowns
def _is_cache_enabled(self) -> bool:
return self._cache is not None and self._ttl_seconds > 0
async def _fetch_cached_summary(self, organization_id: int, days: int) -> DealSummary | None:
if not self._is_cache_enabled() or self._cache is None:
return None
key = _summary_cache_key(organization_id, days)
payload = await read_json(self._cache, key)
if payload is None:
return None
return _deserialize_summary(payload)
async def _store_summary_cache(self, organization_id: int, days: int, summary: DealSummary) -> None:
if not self._is_cache_enabled() or self._cache is None:
return
key = _summary_cache_key(organization_id, days)
payload = _serialize_summary(summary)
await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms)
async def _fetch_cached_funnel(self, organization_id: int) -> list[StageBreakdown] | None:
if not self._is_cache_enabled() or self._cache is None:
return None
key = _funnel_cache_key(organization_id)
payload = await read_json(self._cache, key)
if payload is None:
return None
return _deserialize_funnel(payload)
async def _store_funnel_cache(self, organization_id: int, breakdowns: list[StageBreakdown]) -> None:
if not self._is_cache_enabled() or self._cache is None:
return
key = _funnel_cache_key(organization_id)
payload = _serialize_funnel(breakdowns)
await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms)
def _threshold_from_days(days: int) -> datetime:
return datetime.now(timezone.utc) - timedelta(days=days)
@@ -137,3 +205,137 @@ def _build_stage_map(rollup: Iterable[StageStatusRollup]) -> dict[DealStage, dic
stage_map.setdefault(item.stage, {status: 0 for status in DealStatus})
stage_map[item.stage][item.status] = item.deal_count
return stage_map
def _summary_cache_key(organization_id: int, days: int) -> str:
return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:{days}"
def summary_cache_pattern(organization_id: int) -> str:
return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:*"
def _funnel_cache_key(organization_id: int) -> str:
return f"{_FUNNEL_CACHE_PREFIX}:{organization_id}"
def funnel_cache_key(organization_id: int) -> str:
return _funnel_cache_key(organization_id)
def _serialize_summary(summary: DealSummary) -> dict[str, Any]:
return {
"by_status": [
{
"status": item.status.value,
"count": item.count,
"amount_sum": str(item.amount_sum),
}
for item in summary.by_status
],
"won": {
"count": summary.won.count,
"amount_sum": str(summary.won.amount_sum),
"average_amount": str(summary.won.average_amount),
},
"new_deals": {
"days": summary.new_deals.days,
"count": summary.new_deals.count,
},
"total_deals": summary.total_deals,
}
def _deserialize_summary(payload: Any) -> DealSummary | None:
try:
by_status_payload = payload["by_status"]
won_payload = payload["won"]
new_deals_payload = payload["new_deals"]
total_deals = int(payload["total_deals"])
except (KeyError, TypeError, ValueError):
return None
summaries: list[StatusSummary] = []
try:
for item in by_status_payload:
summaries.append(
StatusSummary(
status=DealStatus(item["status"]),
count=int(item["count"]),
amount_sum=Decimal(item["amount_sum"]),
)
)
won = WonStatistics(
count=int(won_payload["count"]),
amount_sum=Decimal(won_payload["amount_sum"]),
average_amount=Decimal(won_payload["average_amount"]),
)
new_deals = NewDealsWindow(
days=int(new_deals_payload["days"]),
count=int(new_deals_payload["count"]),
)
except (KeyError, TypeError, ValueError, InvalidOperation):
return None
return DealSummary(by_status=summaries, won=won, new_deals=new_deals, total_deals=total_deals)
def _serialize_funnel(breakdowns: list[StageBreakdown]) -> list[dict[str, Any]]:
serialized: list[dict[str, Any]] = []
for item in breakdowns:
serialized.append(
{
"stage": item.stage.value,
"total": item.total,
"by_status": {status.value: count for status, count in item.by_status.items()},
"conversion_to_next": item.conversion_to_next,
}
)
return serialized
def _deserialize_funnel(payload: Any) -> list[StageBreakdown] | None:
if not isinstance(payload, list):
return None
breakdowns: list[StageBreakdown] = []
try:
for item in payload:
by_status_payload = item["by_status"]
by_status = {DealStatus(key): int(value) for key, value in by_status_payload.items()}
breakdowns.append(
StageBreakdown(
stage=DealStage(item["stage"]),
total=int(item["total"]),
by_status=by_status,
conversion_to_next=float(item["conversion_to_next"]) if item["conversion_to_next"] is not None else None,
)
)
except (KeyError, TypeError, ValueError):
return None
return breakdowns
async def invalidate_analytics_cache(cache: Redis | None, organization_id: int, backoff_ms: int) -> None:
"""Remove cached analytics payloads for the organization."""
if cache is None:
return
summary_pattern = summary_cache_pattern(organization_id)
keys: list[str] = [funnel_cache_key(organization_id)]
try:
async for raw_key in cache.scan_iter(match=summary_pattern):
if isinstance(raw_key, bytes):
keys.append(raw_key.decode("utf-8"))
else:
keys.append(str(raw_key))
except RedisError as exc: # pragma: no cover - network errors
cache_manager.mark_unavailable()
logger.warning(
"Failed to enumerate summary cache keys for organization %s: %s",
organization_id,
exc,
)
return
await delete_keys(cache, keys, backoff_ms)
+15 -2
View File
@@ -5,6 +5,7 @@ from collections.abc import Iterable
from dataclasses import dataclass
from decimal import Decimal
from redis.asyncio.client import Redis
from sqlalchemy import func, select
from app.models.activity import Activity, ActivityType
@@ -12,6 +13,7 @@ from app.models.contact import Contact
from app.models.deal import Deal, DealCreate, DealStage, DealStatus
from app.models.organization_member import OrganizationRole
from app.repositories.deal_repo import DealRepository
from app.services.analytics_service import invalidate_analytics_cache
from app.services.organization_service import OrganizationContext
@@ -61,13 +63,23 @@ class DealUpdateData:
class DealService:
"""Encapsulates deal workflows and validations."""
def __init__(self, repository: DealRepository) -> None:
def __init__(
self,
repository: DealRepository,
cache: Redis | None = None,
*,
cache_backoff_ms: int = 0,
) -> None:
self._repository = repository
self._cache = cache
self._cache_backoff_ms = cache_backoff_ms
async def create_deal(self, data: DealCreate, *, context: OrganizationContext) -> Deal:
self._ensure_same_organization(data.organization_id, context)
await self._ensure_contact_in_organization(data.contact_id, context.organization_id)
return await self._repository.create(data=data, role=context.role, user_id=context.user_id)
deal = await self._repository.create(data=data, role=context.role, user_id=context.user_id)
await invalidate_analytics_cache(self._cache, context.organization_id, self._cache_backoff_ms)
return deal
async def update_deal(
self,
@@ -111,6 +123,7 @@ class DealService:
author_id=context.user_id,
activities=[activity for activity in [stage_activity, status_activity] if activity],
)
await invalidate_analytics_cache(self._cache, context.organization_id, self._cache_backoff_ms)
return updated
async def ensure_contact_can_be_deleted(self, contact_id: int) -> None: