Init project structure

This commit is contained in:
k1nq
2025-11-22 13:56:45 +05:00
parent ad0025be28
commit 74330b292f
25 changed files with 1343 additions and 1 deletions
+24
View File
@@ -0,0 +1,24 @@
"""Application settings using Pydantic Settings."""
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Runtime application configuration."""
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="allow")
project_name: str = "Test Task CRM"
version: str = "0.1.0"
api_v1_prefix: str = "/api/v1"
database_url: str = Field(
default="postgresql+asyncpg://postgres:postgres@localhost:5432/test_task_crm",
description="SQLAlchemy async connection string",
)
sqlalchemy_echo: bool = False
jwt_secret_key: SecretStr = Field(default=SecretStr("change-me"))
jwt_algorithm: str = "HS256"
access_token_expire_minutes: int = 30
settings = Settings()
+17
View File
@@ -0,0 +1,17 @@
"""Database utilities for async SQLAlchemy engine and sessions."""
from __future__ import annotations
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.core.config import settings
engine = create_async_engine(settings.database_url, echo=settings.sqlalchemy_echo)
AsyncSessionMaker = async_sessionmaker(bind=engine, expire_on_commit=False)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""Yield an async database session for request scope."""
async with AsyncSessionMaker() as session:
yield session
+57
View File
@@ -0,0 +1,57 @@
"""Security helpers for hashing passwords and issuing JWT tokens."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping
import jwt
from passlib.context import CryptContext
from app.core.config import settings
class PasswordHasher:
"""Wraps passlib context to hash and verify secrets."""
def __init__(self) -> None:
self._context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash(self, password: str) -> str:
return self._context.hash(password)
def verify(self, password: str, hashed_password: str) -> bool:
return self._context.verify(password, hashed_password)
class JWTService:
"""Handles encoding and decoding of JWT access tokens."""
def __init__(self, secret_key: str, algorithm: str) -> None:
self._secret_key = secret_key
self._algorithm = algorithm
def create_access_token(
self,
subject: str,
expires_delta: timedelta,
claims: Mapping[str, Any] | None = None,
) -> str:
now = datetime.now(timezone.utc)
payload: dict[str, Any] = {
"sub": subject,
"iat": now,
"exp": now + expires_delta,
}
if claims:
payload.update(claims)
return jwt.encode(payload, self._secret_key, algorithm=self._algorithm)
def decode(self, token: str) -> dict[str, Any]:
return jwt.decode(token, self._secret_key, algorithms=[self._algorithm])
password_hasher = PasswordHasher()
jwt_service = JWTService(
secret_key=settings.jwt_secret_key.get_secret_value(),
algorithm=settings.jwt_algorithm,
)