phase 3.1

This commit is contained in:
Danilo Reyes
2025-11-01 23:33:52 -06:00
parent da4892cc30
commit a95a4c091a
25 changed files with 1214 additions and 27 deletions

View File

@@ -0,0 +1,2 @@
"""Authentication module."""

55
backend/app/auth/jwt.py Normal file
View File

@@ -0,0 +1,55 @@
"""JWT token generation and validation."""
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID
from jose import JWTError, jwt
from app.core.config import settings
def create_access_token(user_id: UUID, email: str, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a new JWT access token.
Args:
user_id: User's UUID
email: User's email address
expires_delta: Optional custom expiration time
Returns:
Encoded JWT token string
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {
"sub": str(user_id),
"email": email,
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> Optional[dict]:
"""
Decode and validate a JWT access token.
Args:
token: JWT token string to decode
Returns:
Decoded token payload if valid, None otherwise
"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None

View File

@@ -0,0 +1,87 @@
"""User repository for database operations."""
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from app.auth.security import hash_password
from app.database.models.user import User
class UserRepository:
"""Repository for user database operations."""
def __init__(self, db: Session):
"""
Initialize repository.
Args:
db: Database session
"""
self.db = db
def create_user(self, email: str, password: str) -> User:
"""
Create a new user.
Args:
email: User email (will be lowercased)
password: Plain text password (will be hashed)
Returns:
Created user instance
"""
email = email.lower()
password_hash = hash_password(password)
user = User(
email=email,
password_hash=password_hash
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return user
def get_user_by_email(self, email: str) -> Optional[User]:
"""
Get user by email address.
Args:
email: User email to search for
Returns:
User if found, None otherwise
"""
email = email.lower()
return self.db.query(User).filter(User.email == email).first()
def get_user_by_id(self, user_id: UUID) -> Optional[User]:
"""
Get user by ID.
Args:
user_id: User UUID
Returns:
User if found, None otherwise
"""
return self.db.query(User).filter(User.id == user_id).first()
def email_exists(self, email: str) -> bool:
"""
Check if email already exists.
Args:
email: Email to check
Returns:
True if email exists, False otherwise
"""
email = email.lower()
return self.db.query(User).filter(User.email == email).first() is not None

View File

@@ -0,0 +1,45 @@
"""Authentication schemas for request/response validation."""
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, EmailStr, Field
class UserBase(BaseModel):
"""Base user schema."""
email: EmailStr
class UserCreate(UserBase):
"""Schema for user registration."""
password: str = Field(..., min_length=8, max_length=100)
class UserLogin(BaseModel):
"""Schema for user login."""
email: EmailStr
password: str
class UserResponse(UserBase):
"""Schema for user response."""
id: UUID
created_at: datetime
is_active: bool
class Config:
from_attributes = True
class TokenResponse(BaseModel):
"""Schema for JWT token response."""
access_token: str
token_type: str = "bearer"
user: UserResponse

View File

@@ -0,0 +1,65 @@
"""Password hashing utilities using passlib."""
import re
from passlib.context import CryptContext
# Create password context for hashing and verification
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
"""
Hash a password using bcrypt.
Args:
password: Plain text password
Returns:
Hashed password string
"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify a plain password against a hashed password.
Args:
plain_password: Plain text password to verify
hashed_password: Hashed password from database
Returns:
True if password matches, False otherwise
"""
return pwd_context.verify(plain_password, hashed_password)
def validate_password_strength(password: str) -> tuple[bool, str]:
"""
Validate password meets complexity requirements.
Requirements:
- At least 8 characters
- At least 1 uppercase letter
- At least 1 lowercase letter
- At least 1 number
Args:
password: Plain text password to validate
Returns:
Tuple of (is_valid, error_message)
"""
if len(password) < 8:
return False, "Password must be at least 8 characters long"
if not re.search(r"[A-Z]", password):
return False, "Password must contain at least one uppercase letter"
if not re.search(r"[a-z]", password):
return False, "Password must contain at least one lowercase letter"
if not re.search(r"\d", password):
return False, "Password must contain at least one number"
return True, ""