All checks were successful
CI/CD Pipeline / VM Test - security (push) Successful in 7s
CI/CD Pipeline / Backend Linting (push) Successful in 4s
CI/CD Pipeline / VM Test - backend-integration (push) Successful in 11s
CI/CD Pipeline / VM Test - full-stack (push) Successful in 8s
CI/CD Pipeline / VM Test - performance (push) Successful in 8s
CI/CD Pipeline / Nix Flake Check (push) Successful in 38s
CI/CD Pipeline / CI Summary (push) Successful in 0s
CI/CD Pipeline / Frontend Linting (push) Successful in 17s
365 lines
16 KiB
Python
365 lines
16 KiB
Python
"""Integration tests for authentication endpoints."""
|
|
|
|
from fastapi import status
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
class TestRegisterEndpoint:
|
|
"""Test POST /auth/register endpoint."""
|
|
|
|
def test_register_user_success(self, client: TestClient, test_user_data: dict):
|
|
"""Test successful user registration."""
|
|
response = client.post("/api/v1/auth/register", json=test_user_data)
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
data = response.json()
|
|
assert "id" in data
|
|
assert data["email"] == test_user_data["email"]
|
|
assert "password" not in data # Password should not be returned
|
|
assert "password_hash" not in data
|
|
assert "created_at" in data
|
|
|
|
def test_register_user_duplicate_email(self, client: TestClient, test_user_data: dict):
|
|
"""Test that duplicate email registration fails."""
|
|
# Register first user
|
|
response1 = client.post("/api/v1/auth/register", json=test_user_data)
|
|
assert response1.status_code == status.HTTP_201_CREATED
|
|
|
|
# Try to register with same email
|
|
response2 = client.post("/api/v1/auth/register", json=test_user_data)
|
|
|
|
assert response2.status_code == status.HTTP_409_CONFLICT
|
|
assert "already registered" in response2.json()["detail"].lower()
|
|
|
|
def test_register_user_weak_password(self, client: TestClient, test_user_data_weak_password: dict):
|
|
"""Test that weak password is rejected."""
|
|
response = client.post("/api/v1/auth/register", json=test_user_data_weak_password)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
assert "password" in response.json()["detail"].lower()
|
|
|
|
def test_register_user_no_uppercase(self, client: TestClient, test_user_data_no_uppercase: dict):
|
|
"""Test that password without uppercase is rejected."""
|
|
response = client.post("/api/v1/auth/register", json=test_user_data_no_uppercase)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
assert "uppercase" in response.json()["detail"].lower()
|
|
|
|
def test_register_user_no_lowercase(self, client: TestClient):
|
|
"""Test that password without lowercase is rejected."""
|
|
user_data = {"email": "test@example.com", "password": "TESTPASSWORD123"}
|
|
response = client.post("/api/v1/auth/register", json=user_data)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
assert "lowercase" in response.json()["detail"].lower()
|
|
|
|
def test_register_user_no_number(self, client: TestClient):
|
|
"""Test that password without number is rejected."""
|
|
user_data = {"email": "test@example.com", "password": "TestPassword"}
|
|
response = client.post("/api/v1/auth/register", json=user_data)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
assert "number" in response.json()["detail"].lower()
|
|
|
|
def test_register_user_too_short(self, client: TestClient):
|
|
"""Test that password shorter than 8 characters is rejected."""
|
|
user_data = {"email": "test@example.com", "password": "Test123"}
|
|
response = client.post("/api/v1/auth/register", json=user_data)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
assert "8 characters" in response.json()["detail"].lower()
|
|
|
|
def test_register_user_invalid_email(self, client: TestClient):
|
|
"""Test that invalid email format is rejected."""
|
|
invalid_emails = [
|
|
{"email": "not-an-email", "password": "TestPassword123"},
|
|
{"email": "missing@domain", "password": "TestPassword123"},
|
|
{"email": "@example.com", "password": "TestPassword123"},
|
|
{"email": "user@", "password": "TestPassword123"},
|
|
]
|
|
|
|
for user_data in invalid_emails:
|
|
response = client.post("/api/v1/auth/register", json=user_data)
|
|
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
|
|
|
def test_register_user_missing_fields(self, client: TestClient):
|
|
"""Test that missing required fields are rejected."""
|
|
# Missing email
|
|
response1 = client.post("/api/v1/auth/register", json={"password": "TestPassword123"})
|
|
assert response1.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
|
|
|
# Missing password
|
|
response2 = client.post("/api/v1/auth/register", json={"email": "test@example.com"})
|
|
assert response2.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
|
|
|
# Empty body
|
|
response3 = client.post("/api/v1/auth/register", json={})
|
|
assert response3.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
|
|
|
def test_register_user_email_case_handling(self, client: TestClient):
|
|
"""Test email case handling in registration."""
|
|
user_data_upper = {"email": "TEST@EXAMPLE.COM", "password": "TestPassword123"}
|
|
|
|
response = client.post("/api/v1/auth/register", json=user_data_upper)
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
# Email should be stored as lowercase
|
|
data = response.json()
|
|
assert data["email"] == "test@example.com"
|
|
|
|
|
|
class TestLoginEndpoint:
|
|
"""Test POST /auth/login endpoint."""
|
|
|
|
def test_login_user_success(self, client: TestClient, test_user_data: dict):
|
|
"""Test successful user login."""
|
|
# Register user first
|
|
client.post("/api/v1/auth/register", json=test_user_data)
|
|
|
|
# Login
|
|
response = client.post("/api/v1/auth/login", json=test_user_data)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
data = response.json()
|
|
assert "access_token" in data
|
|
assert data["token_type"] == "bearer"
|
|
assert "user" in data
|
|
assert data["user"]["email"] == test_user_data["email"]
|
|
|
|
def test_login_user_wrong_password(self, client: TestClient, test_user_data: dict):
|
|
"""Test that wrong password fails login."""
|
|
# Register user
|
|
client.post("/api/v1/auth/register", json=test_user_data)
|
|
|
|
# Try to login with wrong password
|
|
wrong_data = {"email": test_user_data["email"], "password": "WrongPassword123"}
|
|
response = client.post("/api/v1/auth/login", json=wrong_data)
|
|
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
assert "WWW-Authenticate" in response.headers
|
|
assert response.headers["WWW-Authenticate"] == "Bearer"
|
|
|
|
def test_login_user_nonexistent_email(self, client: TestClient):
|
|
"""Test that login with nonexistent email fails."""
|
|
login_data = {"email": "nonexistent@example.com", "password": "TestPassword123"}
|
|
response = client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
def test_login_user_case_sensitive_password(self, client: TestClient, test_user_data: dict):
|
|
"""Test that password is case-sensitive."""
|
|
# Register user
|
|
client.post("/api/v1/auth/register", json=test_user_data)
|
|
|
|
# Try to login with different case
|
|
wrong_case = {"email": test_user_data["email"], "password": test_user_data["password"].lower()}
|
|
response = client.post("/api/v1/auth/login", json=wrong_case)
|
|
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
def test_login_user_email_case_insensitive(self, client: TestClient, test_user_data: dict):
|
|
"""Test that email login is case-insensitive."""
|
|
# Register user
|
|
client.post("/api/v1/auth/register", json=test_user_data)
|
|
|
|
# Login with different email case
|
|
upper_email = {"email": test_user_data["email"].upper(), "password": test_user_data["password"]}
|
|
response = client.post("/api/v1/auth/login", json=upper_email)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
def test_login_user_missing_fields(self, client: TestClient):
|
|
"""Test that missing fields are rejected."""
|
|
# Missing password
|
|
response1 = client.post("/api/v1/auth/login", json={"email": "test@example.com"})
|
|
assert response1.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
|
|
|
# Missing email
|
|
response2 = client.post("/api/v1/auth/login", json={"password": "TestPassword123"})
|
|
assert response2.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
|
|
|
def test_login_user_token_format(self, client: TestClient, test_user_data: dict):
|
|
"""Test that returned token is valid JWT format."""
|
|
# Register and login
|
|
client.post("/api/v1/auth/register", json=test_user_data)
|
|
response = client.post("/api/v1/auth/login", json=test_user_data)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
data = response.json()
|
|
token = data["access_token"]
|
|
|
|
# JWT should have 3 parts separated by dots
|
|
parts = token.split(".")
|
|
assert len(parts) == 3
|
|
|
|
# Each part should be base64-encoded (URL-safe)
|
|
import string
|
|
|
|
url_safe = string.ascii_letters + string.digits + "-_"
|
|
for part in parts:
|
|
assert all(c in url_safe for c in part)
|
|
|
|
|
|
class TestGetCurrentUserEndpoint:
|
|
"""Test GET /auth/me endpoint."""
|
|
|
|
def test_get_current_user_success(self, client: TestClient, test_user_data: dict):
|
|
"""Test getting current user info with valid token."""
|
|
# Register and login
|
|
client.post("/api/v1/auth/register", json=test_user_data)
|
|
login_response = client.post("/api/v1/auth/login", json=test_user_data)
|
|
|
|
token = login_response.json()["access_token"]
|
|
|
|
# Get current user
|
|
response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
data = response.json()
|
|
assert data["email"] == test_user_data["email"]
|
|
assert "id" in data
|
|
assert "created_at" in data
|
|
assert "password" not in data
|
|
|
|
def test_get_current_user_no_token(self, client: TestClient):
|
|
"""Test that missing token returns 401."""
|
|
response = client.get("/api/v1/auth/me")
|
|
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
def test_get_current_user_invalid_token(self, client: TestClient):
|
|
"""Test that invalid token returns 401."""
|
|
response = client.get("/api/v1/auth/me", headers={"Authorization": "Bearer invalid_token"})
|
|
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
def test_get_current_user_malformed_header(self, client: TestClient):
|
|
"""Test that malformed auth header returns 401."""
|
|
# Missing "Bearer" prefix
|
|
response1 = client.get("/api/v1/auth/me", headers={"Authorization": "just_a_token"})
|
|
assert response1.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
# Wrong prefix
|
|
response2 = client.get("/api/v1/auth/me", headers={"Authorization": "Basic dGVzdA=="})
|
|
assert response2.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
def test_get_current_user_expired_token(self, client: TestClient, test_user_data: dict):
|
|
"""Test that expired token returns 401."""
|
|
from datetime import timedelta
|
|
|
|
from app.auth.jwt import create_access_token
|
|
|
|
# Register user
|
|
register_response = client.post("/api/v1/auth/register", json=test_user_data)
|
|
user_id = register_response.json()["id"]
|
|
|
|
# Create expired token
|
|
from uuid import UUID
|
|
|
|
expired_token = create_access_token(UUID(user_id), test_user_data["email"], timedelta(seconds=-10))
|
|
|
|
# Try to use expired token
|
|
response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {expired_token}"})
|
|
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
|
|
class TestAuthenticationFlow:
|
|
"""Test complete authentication flows."""
|
|
|
|
def test_complete_register_login_access_flow(self, client: TestClient, test_user_data: dict):
|
|
"""Test complete flow: register → login → access protected resource."""
|
|
# Step 1: Register
|
|
register_response = client.post("/api/v1/auth/register", json=test_user_data)
|
|
assert register_response.status_code == status.HTTP_201_CREATED
|
|
|
|
registered_user = register_response.json()
|
|
assert registered_user["email"] == test_user_data["email"]
|
|
|
|
# Step 2: Login
|
|
login_response = client.post("/api/v1/auth/login", json=test_user_data)
|
|
assert login_response.status_code == status.HTTP_200_OK
|
|
|
|
token = login_response.json()["access_token"]
|
|
login_user = login_response.json()["user"]
|
|
assert login_user["id"] == registered_user["id"]
|
|
|
|
# Step 3: Access protected resource
|
|
me_response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
|
|
assert me_response.status_code == status.HTTP_200_OK
|
|
|
|
current_user = me_response.json()
|
|
assert current_user["id"] == registered_user["id"]
|
|
assert current_user["email"] == test_user_data["email"]
|
|
|
|
def test_multiple_users_independent_authentication(self, client: TestClient):
|
|
"""Test that multiple users can register and authenticate independently."""
|
|
users = [
|
|
{"email": "user1@example.com", "password": "Password123"},
|
|
{"email": "user2@example.com", "password": "Password456"},
|
|
{"email": "user3@example.com", "password": "Password789"},
|
|
]
|
|
|
|
tokens = []
|
|
|
|
# Register all users
|
|
for user_data in users:
|
|
register_response = client.post("/api/v1/auth/register", json=user_data)
|
|
assert register_response.status_code == status.HTTP_201_CREATED
|
|
|
|
# Login each user
|
|
login_response = client.post("/api/v1/auth/login", json=user_data)
|
|
assert login_response.status_code == status.HTTP_200_OK
|
|
|
|
tokens.append(login_response.json()["access_token"])
|
|
|
|
# Verify each token works independently
|
|
for i, (user_data, token) in enumerate(zip(users, tokens)):
|
|
response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
|
|
assert response.status_code == status.HTTP_200_OK
|
|
assert response.json()["email"] == user_data["email"]
|
|
|
|
def test_token_reuse_across_multiple_requests(self, client: TestClient, test_user_data: dict):
|
|
"""Test that same token can be reused for multiple requests."""
|
|
# Register and login
|
|
client.post("/api/v1/auth/register", json=test_user_data)
|
|
login_response = client.post("/api/v1/auth/login", json=test_user_data)
|
|
|
|
token = login_response.json()["access_token"]
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
|
|
# Make multiple requests with same token
|
|
for _ in range(5):
|
|
response = client.get("/api/v1/auth/me", headers=headers)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
assert response.json()["email"] == test_user_data["email"]
|
|
|
|
def test_password_not_exposed_in_any_response(self, client: TestClient, test_user_data: dict):
|
|
"""Test that password is never exposed in any API response."""
|
|
# Register
|
|
register_response = client.post("/api/v1/auth/register", json=test_user_data)
|
|
register_data = register_response.json()
|
|
|
|
assert "password" not in register_data
|
|
assert "password_hash" not in register_data
|
|
|
|
# Login
|
|
login_response = client.post("/api/v1/auth/login", json=test_user_data)
|
|
login_data = login_response.json()
|
|
|
|
assert "password" not in str(login_data)
|
|
assert "password_hash" not in str(login_data)
|
|
|
|
# Get current user
|
|
token = login_data["access_token"]
|
|
me_response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
|
|
me_data = me_response.json()
|
|
|
|
assert "password" not in me_data
|
|
assert "password_hash" not in me_data
|
|
|