refactor: enhance type hinting and casting for improved type safety across multiple files

This commit is contained in:
Artem Kashaev
2025-12-01 16:44:14 +05:00
parent f234e60e65
commit 688ade0452
14 changed files with 62 additions and 42 deletions
+20 -14
View File
@@ -6,7 +6,7 @@ import asyncio
import json
import logging
from collections.abc import Awaitable, Callable
from typing import Any
from typing import Any, cast
import redis.asyncio as redis
from app.core.config import settings
@@ -45,10 +45,13 @@ class RedisCacheManager:
async with self._lock:
if self._client is not None:
return
self._client = redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=False,
self._client = cast(
Redis,
redis.from_url( # type: ignore[no-untyped-call]
settings.redis_url,
encoding="utf-8",
decode_responses=False,
),
)
await self._refresh_availability()
@@ -64,10 +67,13 @@ class RedisCacheManager:
return
async with self._lock:
if self._client is None:
self._client = redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=False,
self._client = cast(
Redis,
redis.from_url( # type: ignore[no-untyped-call]
settings.redis_url,
encoding="utf-8",
decode_responses=False,
),
)
await self._refresh_availability()
@@ -76,7 +82,7 @@ class RedisCacheManager:
self._available = False
return
try:
await self._client.ping()
await cast(Awaitable[Any], self._client.ping())
except RedisError as exc: # pragma: no cover - logging only
self._available = False
logger.warning("Redis ping failed: %s", exc)
@@ -140,8 +146,8 @@ async def write_json(
"""Serialize data to JSON and store it with TTL using retry/backoff."""
payload = json.dumps(value, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
async def _operation() -> Any:
return await client.set(name=key, value=payload, ex=ttl_seconds)
async def _operation() -> None:
await client.set(name=key, value=payload, ex=ttl_seconds)
await _run_with_retry(_operation, backoff_ms)
@@ -151,8 +157,8 @@ async def delete_keys(client: Redis, keys: list[str], backoff_ms: int) -> None:
if not keys:
return
async def _operation() -> Any:
return await client.delete(*keys)
async def _operation() -> None:
await client.delete(*keys)
await _run_with_retry(_operation, backoff_ms)
+5 -5
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from typing import Any
from typing import Any, cast
import jwt
from app.core.config import settings
@@ -18,10 +18,10 @@ class PasswordHasher:
self._context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
def hash(self, password: str) -> str:
return self._context.hash(password)
return cast(str, self._context.hash(password))
def verify(self, password: str, hashed_password: str) -> bool:
return self._context.verify(password, hashed_password)
return bool(self._context.verify(password, hashed_password))
class JWTService:
@@ -45,10 +45,10 @@ class JWTService:
}
if claims:
payload.update(claims)
return jwt.encode(payload, self._secret_key, algorithm=self._algorithm)
return cast(str, jwt.encode(payload, self._secret_key, algorithm=self._algorithm))
def decode(self, token: str) -> dict[str, Any]:
return jwt.decode(token, self._secret_key, algorithms=[self._algorithm])
return cast(dict[str, Any], jwt.decode(token, self._secret_key, algorithms=[self._algorithm]))
password_hasher = PasswordHasher()