feat: implement refresh token functionality; update authentication and token models; add tests for refresh endpoint
Test / test (push) Successful in 13s

This commit is contained in:
k1nq
2025-11-28 13:56:04 +05:00
parent a8bdf18e38
commit 6db1e865f6
7 changed files with 165 additions and 16 deletions
+44 -5
View File
@@ -10,7 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import JWTService, PasswordHasher
from app.models.user import User
from app.repositories.user_repo import UserRepository
from app.services.auth_service import AuthService, InvalidCredentialsError
from app.services.auth_service import AuthService, InvalidCredentialsError, InvalidRefreshTokenError
class StubUserRepository(UserRepository):
@@ -25,6 +25,11 @@ class StubUserRepository(UserRepository):
return self._user
return None
async def get_by_id(self, user_id: int) -> User | None: # pragma: no cover - helper
if self._user and self._user.id == user_id:
return self._user
return None
@pytest.fixture()
def password_hasher() -> PasswordHasher:
@@ -71,7 +76,7 @@ async def test_authenticate_invalid_credentials(
await service.authenticate("user@example.com", "wrong-pass")
def test_create_access_token_contains_user_claims(
def test_issue_tokens_contains_user_claims(
password_hasher: PasswordHasher,
jwt_service: JWTService,
) -> None:
@@ -79,9 +84,43 @@ def test_create_access_token_contains_user_claims(
user.id = 42
service = AuthService(StubUserRepository(user), password_hasher, jwt_service)
token = service.create_access_token(user)
payload = jwt_service.decode(token.access_token)
token_pair = service.issue_tokens(user)
payload = jwt_service.decode(token_pair.access_token)
assert payload["sub"] == str(user.id)
assert payload["email"] == user.email
assert token.expires_in > 0
assert payload["scope"] == "access"
assert token_pair.refresh_token
assert token_pair.expires_in > 0
assert token_pair.refresh_expires_in > token_pair.expires_in
@pytest.mark.asyncio
async def test_refresh_tokens_returns_new_pair(
password_hasher: PasswordHasher,
jwt_service: JWTService,
) -> None:
user = User(email="refresh@example.com", hashed_password="hashed", name="Refresh", is_active=True)
user.id = 7
service = AuthService(StubUserRepository(user), password_hasher, jwt_service)
initial = service.issue_tokens(user)
refreshed = await service.refresh_tokens(initial.refresh_token)
assert refreshed.access_token
assert refreshed.refresh_token
@pytest.mark.asyncio
async def test_refresh_tokens_rejects_access_token(
password_hasher: PasswordHasher,
jwt_service: JWTService,
) -> None:
user = User(email="refresh@example.com", hashed_password="hashed", name="Refresh", is_active=True)
user.id = 9
service = AuthService(StubUserRepository(user), password_hasher, jwt_service)
pair = service.issue_tokens(user)
with pytest.raises(InvalidRefreshTokenError):
await service.refresh_tokens(pair.access_token)