"""Integration tests for authentication endpoints.""" from fastapi import status from fastapi.testclient import TestClient class TestRegisterEndpoint: """Test POST /auth/register endpoint.""" def test_register_user_success(self, client: TestClient, test_user_data: dict): """Test successful user registration.""" response = client.post("/api/v1/auth/register", json=test_user_data) assert response.status_code == status.HTTP_201_CREATED data = response.json() assert "id" in data assert data["email"] == test_user_data["email"] assert "password" not in data # Password should not be returned assert "password_hash" not in data assert "created_at" in data def test_register_user_duplicate_email(self, client: TestClient, test_user_data: dict): """Test that duplicate email registration fails.""" # Register first user response1 = client.post("/api/v1/auth/register", json=test_user_data) assert response1.status_code == status.HTTP_201_CREATED # Try to register with same email response2 = client.post("/api/v1/auth/register", json=test_user_data) assert response2.status_code == status.HTTP_409_CONFLICT assert "already registered" in response2.json()["detail"].lower() def test_register_user_weak_password(self, client: TestClient, test_user_data_weak_password: dict): """Test that weak password is rejected.""" response = client.post("/api/v1/auth/register", json=test_user_data_weak_password) assert response.status_code == status.HTTP_400_BAD_REQUEST assert "password" in response.json()["detail"].lower() def test_register_user_no_uppercase(self, client: TestClient, test_user_data_no_uppercase: dict): """Test that password without uppercase is rejected.""" response = client.post("/api/v1/auth/register", json=test_user_data_no_uppercase) assert response.status_code == status.HTTP_400_BAD_REQUEST assert "uppercase" in response.json()["detail"].lower() def test_register_user_no_lowercase(self, client: TestClient): """Test that password without lowercase is rejected.""" user_data = {"email": "test@example.com", "password": "TESTPASSWORD123"} response = client.post("/api/v1/auth/register", json=user_data) assert response.status_code == status.HTTP_400_BAD_REQUEST assert "lowercase" in response.json()["detail"].lower() def test_register_user_no_number(self, client: TestClient): """Test that password without number is rejected.""" user_data = {"email": "test@example.com", "password": "TestPassword"} response = client.post("/api/v1/auth/register", json=user_data) assert response.status_code == status.HTTP_400_BAD_REQUEST assert "number" in response.json()["detail"].lower() def test_register_user_too_short(self, client: TestClient): """Test that password shorter than 8 characters is rejected.""" user_data = {"email": "test@example.com", "password": "Test123"} response = client.post("/api/v1/auth/register", json=user_data) assert response.status_code == status.HTTP_400_BAD_REQUEST assert "8 characters" in response.json()["detail"].lower() def test_register_user_invalid_email(self, client: TestClient): """Test that invalid email format is rejected.""" invalid_emails = [ {"email": "not-an-email", "password": "TestPassword123"}, {"email": "missing@domain", "password": "TestPassword123"}, {"email": "@example.com", "password": "TestPassword123"}, {"email": "user@", "password": "TestPassword123"}, ] for user_data in invalid_emails: response = client.post("/api/v1/auth/register", json=user_data) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_register_user_missing_fields(self, client: TestClient): """Test that missing required fields are rejected.""" # Missing email response1 = client.post("/api/v1/auth/register", json={"password": "TestPassword123"}) assert response1.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY # Missing password response2 = client.post("/api/v1/auth/register", json={"email": "test@example.com"}) assert response2.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY # Empty body response3 = client.post("/api/v1/auth/register", json={}) assert response3.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_register_user_email_case_handling(self, client: TestClient): """Test email case handling in registration.""" user_data_upper = {"email": "TEST@EXAMPLE.COM", "password": "TestPassword123"} response = client.post("/api/v1/auth/register", json=user_data_upper) assert response.status_code == status.HTTP_201_CREATED # Email should be stored as lowercase data = response.json() assert data["email"] == "test@example.com" class TestLoginEndpoint: """Test POST /auth/login endpoint.""" def test_login_user_success(self, client: TestClient, test_user_data: dict): """Test successful user login.""" # Register user first client.post("/api/v1/auth/register", json=test_user_data) # Login response = client.post("/api/v1/auth/login", json=test_user_data) assert response.status_code == status.HTTP_200_OK data = response.json() assert "access_token" in data assert data["token_type"] == "bearer" assert "user" in data assert data["user"]["email"] == test_user_data["email"] def test_login_user_wrong_password(self, client: TestClient, test_user_data: dict): """Test that wrong password fails login.""" # Register user client.post("/api/v1/auth/register", json=test_user_data) # Try to login with wrong password wrong_data = {"email": test_user_data["email"], "password": "WrongPassword123"} response = client.post("/api/v1/auth/login", json=wrong_data) assert response.status_code == status.HTTP_401_UNAUTHORIZED assert "WWW-Authenticate" in response.headers assert response.headers["WWW-Authenticate"] == "Bearer" def test_login_user_nonexistent_email(self, client: TestClient): """Test that login with nonexistent email fails.""" login_data = {"email": "nonexistent@example.com", "password": "TestPassword123"} response = client.post("/api/v1/auth/login", json=login_data) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_login_user_case_sensitive_password(self, client: TestClient, test_user_data: dict): """Test that password is case-sensitive.""" # Register user client.post("/api/v1/auth/register", json=test_user_data) # Try to login with different case wrong_case = {"email": test_user_data["email"], "password": test_user_data["password"].lower()} response = client.post("/api/v1/auth/login", json=wrong_case) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_login_user_email_case_insensitive(self, client: TestClient, test_user_data: dict): """Test that email login is case-insensitive.""" # Register user client.post("/api/v1/auth/register", json=test_user_data) # Login with different email case upper_email = {"email": test_user_data["email"].upper(), "password": test_user_data["password"]} response = client.post("/api/v1/auth/login", json=upper_email) assert response.status_code == status.HTTP_200_OK def test_login_user_missing_fields(self, client: TestClient): """Test that missing fields are rejected.""" # Missing password response1 = client.post("/api/v1/auth/login", json={"email": "test@example.com"}) assert response1.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY # Missing email response2 = client.post("/api/v1/auth/login", json={"password": "TestPassword123"}) assert response2.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_login_user_token_format(self, client: TestClient, test_user_data: dict): """Test that returned token is valid JWT format.""" # Register and login client.post("/api/v1/auth/register", json=test_user_data) response = client.post("/api/v1/auth/login", json=test_user_data) assert response.status_code == status.HTTP_200_OK data = response.json() token = data["access_token"] # JWT should have 3 parts separated by dots parts = token.split(".") assert len(parts) == 3 # Each part should be base64-encoded (URL-safe) import string url_safe = string.ascii_letters + string.digits + "-_" for part in parts: assert all(c in url_safe for c in part) class TestGetCurrentUserEndpoint: """Test GET /auth/me endpoint.""" def test_get_current_user_success(self, client: TestClient, test_user_data: dict): """Test getting current user info with valid token.""" # Register and login client.post("/api/v1/auth/register", json=test_user_data) login_response = client.post("/api/v1/auth/login", json=test_user_data) token = login_response.json()["access_token"] # Get current user response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"}) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["email"] == test_user_data["email"] assert "id" in data assert "created_at" in data assert "password" not in data def test_get_current_user_no_token(self, client: TestClient): """Test that missing token returns 401.""" response = client.get("/api/v1/auth/me") assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_get_current_user_invalid_token(self, client: TestClient): """Test that invalid token returns 401.""" response = client.get("/api/v1/auth/me", headers={"Authorization": "Bearer invalid_token"}) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_get_current_user_malformed_header(self, client: TestClient): """Test that malformed auth header returns 401.""" # Missing "Bearer" prefix response1 = client.get("/api/v1/auth/me", headers={"Authorization": "just_a_token"}) assert response1.status_code == status.HTTP_401_UNAUTHORIZED # Wrong prefix response2 = client.get("/api/v1/auth/me", headers={"Authorization": "Basic dGVzdA=="}) assert response2.status_code == status.HTTP_401_UNAUTHORIZED def test_get_current_user_expired_token(self, client: TestClient, test_user_data: dict): """Test that expired token returns 401.""" from datetime import timedelta from app.auth.jwt import create_access_token # Register user register_response = client.post("/api/v1/auth/register", json=test_user_data) user_id = register_response.json()["id"] # Create expired token from uuid import UUID expired_token = create_access_token(UUID(user_id), test_user_data["email"], timedelta(seconds=-10)) # Try to use expired token response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {expired_token}"}) assert response.status_code == status.HTTP_401_UNAUTHORIZED class TestAuthenticationFlow: """Test complete authentication flows.""" def test_complete_register_login_access_flow(self, client: TestClient, test_user_data: dict): """Test complete flow: register → login → access protected resource.""" # Step 1: Register register_response = client.post("/api/v1/auth/register", json=test_user_data) assert register_response.status_code == status.HTTP_201_CREATED registered_user = register_response.json() assert registered_user["email"] == test_user_data["email"] # Step 2: Login login_response = client.post("/api/v1/auth/login", json=test_user_data) assert login_response.status_code == status.HTTP_200_OK token = login_response.json()["access_token"] login_user = login_response.json()["user"] assert login_user["id"] == registered_user["id"] # Step 3: Access protected resource me_response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"}) assert me_response.status_code == status.HTTP_200_OK current_user = me_response.json() assert current_user["id"] == registered_user["id"] assert current_user["email"] == test_user_data["email"] def test_multiple_users_independent_authentication(self, client: TestClient): """Test that multiple users can register and authenticate independently.""" users = [ {"email": "user1@example.com", "password": "Password123"}, {"email": "user2@example.com", "password": "Password456"}, {"email": "user3@example.com", "password": "Password789"}, ] tokens = [] # Register all users for user_data in users: register_response = client.post("/api/v1/auth/register", json=user_data) assert register_response.status_code == status.HTTP_201_CREATED # Login each user login_response = client.post("/api/v1/auth/login", json=user_data) assert login_response.status_code == status.HTTP_200_OK tokens.append(login_response.json()["access_token"]) # Verify each token works independently for i, (user_data, token) in enumerate(zip(users, tokens)): response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"}) assert response.status_code == status.HTTP_200_OK assert response.json()["email"] == user_data["email"] def test_token_reuse_across_multiple_requests(self, client: TestClient, test_user_data: dict): """Test that same token can be reused for multiple requests.""" # Register and login client.post("/api/v1/auth/register", json=test_user_data) login_response = client.post("/api/v1/auth/login", json=test_user_data) token = login_response.json()["access_token"] headers = {"Authorization": f"Bearer {token}"} # Make multiple requests with same token for _ in range(5): response = client.get("/api/v1/auth/me", headers=headers) assert response.status_code == status.HTTP_200_OK assert response.json()["email"] == test_user_data["email"] def test_password_not_exposed_in_any_response(self, client: TestClient, test_user_data: dict): """Test that password is never exposed in any API response.""" # Register register_response = client.post("/api/v1/auth/register", json=test_user_data) register_data = register_response.json() assert "password" not in register_data assert "password_hash" not in register_data # Login login_response = client.post("/api/v1/auth/login", json=test_user_data) login_data = login_response.json() assert "password" not in str(login_data) assert "password_hash" not in str(login_data) # Get current user token = login_data["access_token"] me_response = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"}) me_data = me_response.json() assert "password" not in me_data assert "password_hash" not in me_data