This commit is contained in:
Danilo Reyes
2025-11-02 01:47:25 -06:00
parent 48020b6f42
commit 010df31455
45 changed files with 8045 additions and 720 deletions

344
backend/app/api/images.py Normal file
View File

@@ -0,0 +1,344 @@
"""Image upload and management endpoints."""
from uuid import UUID
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.jwt import get_current_user
from app.core.deps import get_db
from app.database.models.board import Board
from app.database.models.user import User
from app.images.processing import generate_thumbnails
from app.images.repository import ImageRepository
from app.images.schemas import (
BoardImageCreate,
BoardImageResponse,
ImageListResponse,
ImageResponse,
ImageUploadResponse,
)
from app.images.upload import calculate_checksum, upload_image_to_storage
from app.images.validation import sanitize_filename, validate_image_file
from app.images.zip_handler import extract_images_from_zip
router = APIRouter(prefix="/images", tags=["images"])
@router.post("/upload", response_model=ImageUploadResponse, status_code=status.HTTP_201_CREATED)
async def upload_image(
file: UploadFile = File(...),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Upload a single image.
- Validates file type and size
- Uploads to MinIO storage
- Generates thumbnails
- Creates database record
Returns image metadata including ID for adding to boards.
"""
# Validate file
contents = await validate_image_file(file)
# Sanitize filename
filename = sanitize_filename(file.filename or "image.jpg")
# Upload to storage and get dimensions
from uuid import uuid4
image_id = uuid4()
storage_path, width, height, mime_type = await upload_image_to_storage(
current_user.id, image_id, filename, contents
)
# Generate thumbnails
thumbnail_paths = generate_thumbnails(image_id, storage_path, contents)
# Calculate checksum
checksum = calculate_checksum(contents)
# Create metadata
metadata = {"format": mime_type.split("/")[1], "checksum": checksum, "thumbnails": thumbnail_paths}
# Create database record
repo = ImageRepository(db)
image = await repo.create_image(
user_id=current_user.id,
filename=filename,
storage_path=storage_path,
file_size=len(contents),
mime_type=mime_type,
width=width,
height=height,
metadata=metadata,
)
return image
@router.post("/upload-zip", response_model=list[ImageUploadResponse])
async def upload_zip(
file: UploadFile = File(...),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Upload multiple images from a ZIP file.
- Extracts all valid images from ZIP
- Processes each image
- Returns list of uploaded images
Maximum ZIP size: 200MB
"""
uploaded_images = []
repo = ImageRepository(db)
async for filename, contents in extract_images_from_zip(file):
try:
# Sanitize filename
clean_filename = sanitize_filename(filename)
# Upload to storage
from uuid import uuid4
image_id = uuid4()
storage_path, width, height, mime_type = await upload_image_to_storage(
current_user.id, image_id, clean_filename, contents
)
# Generate thumbnails
thumbnail_paths = generate_thumbnails(image_id, storage_path, contents)
# Calculate checksum
checksum = calculate_checksum(contents)
# Create metadata
metadata = {
"format": mime_type.split("/")[1],
"checksum": checksum,
"thumbnails": thumbnail_paths,
}
# Create database record
image = await repo.create_image(
user_id=current_user.id,
filename=clean_filename,
storage_path=storage_path,
file_size=len(contents),
mime_type=mime_type,
width=width,
height=height,
metadata=metadata,
)
uploaded_images.append(image)
except Exception as e:
# Log error but continue with other images
print(f"Error processing {filename}: {e}")
continue
if not uploaded_images:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No images could be processed from ZIP")
return uploaded_images
@router.get("/library", response_model=ImageListResponse)
async def get_image_library(
page: int = 1,
page_size: int = 50,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Get user's image library with pagination.
Returns all images uploaded by the current user.
"""
repo = ImageRepository(db)
offset = (page - 1) * page_size
images, total = await repo.get_user_images(current_user.id, limit=page_size, offset=offset)
return ImageListResponse(images=list(images), total=total, page=page, page_size=page_size)
@router.get("/{image_id}", response_model=ImageResponse)
async def get_image(
image_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get image by ID."""
repo = ImageRepository(db)
image = await repo.get_image_by_id(image_id)
if not image:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Image not found")
# Verify ownership
if image.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
return image
@router.delete("/{image_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_image(
image_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Delete image permanently.
Only allowed if reference_count is 0 (not used on any boards).
"""
repo = ImageRepository(db)
image = await repo.get_image_by_id(image_id)
if not image:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Image not found")
# Verify ownership
if image.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
# Check if still in use
if image.reference_count > 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Image is still used on {image.reference_count} board(s). Remove from boards first.",
)
# Delete from storage
from app.images.processing import delete_thumbnails
from app.images.upload import delete_image_from_storage
await delete_image_from_storage(image.storage_path)
if "thumbnails" in image.metadata:
await delete_thumbnails(image.metadata["thumbnails"])
# Delete from database
await repo.delete_image(image_id)
@router.post("/boards/{board_id}/images", response_model=BoardImageResponse, status_code=status.HTTP_201_CREATED)
async def add_image_to_board(
board_id: UUID,
data: BoardImageCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Add an existing image to a board.
The image must already be uploaded and owned by the current user.
"""
# Verify board ownership
from sqlalchemy import select
board_result = await db.execute(select(Board).where(Board.id == board_id))
board = board_result.scalar_one_or_none()
if not board:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Board not found")
if board.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
# Verify image ownership
repo = ImageRepository(db)
image = await repo.get_image_by_id(data.image_id)
if not image:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Image not found")
if image.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Image access denied")
# Add image to board
board_image = await repo.add_image_to_board(
board_id=board_id,
image_id=data.image_id,
position=data.position,
transformations=data.transformations,
z_order=data.z_order,
)
# Load image relationship for response
await db.refresh(board_image, ["image"])
return board_image
@router.delete("/boards/{board_id}/images/{image_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_image_from_board(
board_id: UUID,
image_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Remove image from board.
This doesn't delete the image, just removes it from this board.
The image remains in the user's library.
"""
# Verify board ownership
from sqlalchemy import select
board_result = await db.execute(select(Board).where(Board.id == board_id))
board = board_result.scalar_one_or_none()
if not board:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Board not found")
if board.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
# Remove image from board
repo = ImageRepository(db)
removed = await repo.remove_image_from_board(board_id, image_id)
if not removed:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Image not on this board")
@router.get("/boards/{board_id}/images", response_model=list[BoardImageResponse])
async def get_board_images(
board_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Get all images on a board, ordered by z-order.
Used for loading board contents in the canvas.
"""
# Verify board access (owner or shared link - for now just owner)
from sqlalchemy import select
board_result = await db.execute(select(Board).where(Board.id == board_id))
board = board_result.scalar_one_or_none()
if not board:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Board not found")
if board.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
# Get board images
repo = ImageRepository(db)
board_images = await repo.get_board_images(board_id)
# Load image relationships
for board_image in board_images:
await db.refresh(board_image, ["image"])
return list(board_images)

View File

@@ -28,6 +28,14 @@ class StorageClient:
self.bucket = settings.MINIO_BUCKET
self._ensure_bucket_exists()
def put_object(self, bucket_name: str, object_name: str, data: BinaryIO, length: int, content_type: str):
"""MinIO-compatible put_object method."""
return self.upload_file(data, object_name, content_type)
def remove_object(self, bucket_name: str, object_name: str):
"""MinIO-compatible remove_object method."""
return self.delete_file(object_name)
def _ensure_bucket_exists(self) -> None:
"""Create bucket if it doesn't exist."""
try:
@@ -116,3 +124,19 @@ class StorageClient:
# Global storage client instance
storage_client = StorageClient()
def get_storage_client() -> StorageClient:
"""Get the global storage client instance."""
return storage_client
# Compatibility methods for MinIO-style API
def put_object(bucket_name: str, object_name: str, data: BinaryIO, length: int, content_type: str):
"""MinIO-compatible put_object method."""
storage_client.upload_file(data, object_name, content_type)
def remove_object(bucket_name: str, object_name: str):
"""MinIO-compatible remove_object method."""
storage_client.delete_file(object_name)

44
backend/app/core/tasks.py Normal file
View File

@@ -0,0 +1,44 @@
"""Background task utilities for long-running operations."""
import asyncio
from collections.abc import Callable
class BackgroundTasks:
"""Simple background task manager using FastAPI BackgroundTasks."""
@staticmethod
async def run_in_background(func: Callable, *args, **kwargs):
"""
Run function in background.
For now, uses asyncio to run tasks in background.
In production, consider Celery or similar for distributed tasks.
Args:
func: Function to run
*args: Positional arguments
**kwargs: Keyword arguments
"""
asyncio.create_task(func(*args, **kwargs))
async def generate_thumbnails_task(image_id: str, storage_path: str, contents: bytes):
"""
Background task to generate thumbnails.
Args:
image_id: Image ID
storage_path: Original image storage path
contents: Image file contents
"""
from uuid import UUID
from app.images.processing import generate_thumbnails
# Generate thumbnails
generate_thumbnails(UUID(image_id), storage_path, contents)
# Update image metadata with thumbnail paths
# This would require database access - for now, thumbnails are generated synchronously
pass

View File

@@ -0,0 +1 @@
"""Image upload and processing package."""

View File

@@ -0,0 +1,98 @@
"""Image processing utilities - thumbnail generation."""
import contextlib
import io
from uuid import UUID
from PIL import Image as PILImage
from app.core.storage import get_storage_client
# Thumbnail sizes (width in pixels, height proportional)
THUMBNAIL_SIZES = {
"low": 800, # For slow connections
"medium": 1600, # For medium connections
"high": 3200, # For fast connections
}
def generate_thumbnails(image_id: UUID, original_path: str, contents: bytes) -> dict[str, str]:
"""
Generate thumbnails at different resolutions.
Args:
image_id: Image ID for naming thumbnails
original_path: Path to original image
contents: Original image contents
Returns:
Dictionary mapping quality level to thumbnail storage path
"""
storage = get_storage_client()
thumbnail_paths = {}
# Load original image
image = PILImage.open(io.BytesIO(contents))
# Convert to RGB if necessary (for JPEG compatibility)
if image.mode in ("RGBA", "LA", "P"):
# Create white background for transparent images
background = PILImage.new("RGB", image.size, (255, 255, 255))
if image.mode == "P":
image = image.convert("RGBA")
background.paste(image, mask=image.split()[-1] if image.mode in ("RGBA", "LA") else None)
image = background
elif image.mode != "RGB":
image = image.convert("RGB")
# Get original dimensions
orig_width, orig_height = image.size
# Generate thumbnails for each size
for quality, max_width in THUMBNAIL_SIZES.items():
# Skip if original is smaller than thumbnail size
if orig_width <= max_width:
thumbnail_paths[quality] = original_path
continue
# Calculate proportional height
ratio = max_width / orig_width
new_height = int(orig_height * ratio)
# Resize image
thumbnail = image.resize((max_width, new_height), PILImage.Resampling.LANCZOS)
# Convert to WebP for better compression
output = io.BytesIO()
thumbnail.save(output, format="WEBP", quality=85, method=6)
output.seek(0)
# Generate storage path
thumbnail_path = f"thumbnails/{quality}/{image_id}.webp"
# Upload to MinIO
storage.put_object(
bucket_name="webref",
object_name=thumbnail_path,
data=output,
length=len(output.getvalue()),
content_type="image/webp",
)
thumbnail_paths[quality] = thumbnail_path
return thumbnail_paths
async def delete_thumbnails(thumbnail_paths: dict[str, str]) -> None:
"""
Delete thumbnails from storage.
Args:
thumbnail_paths: Dictionary of quality -> path
"""
storage = get_storage_client()
for path in thumbnail_paths.values():
with contextlib.suppress(Exception):
# Log error but continue
storage.remove_object(bucket_name="webref", object_name=path)

View File

@@ -0,0 +1,223 @@
"""Image repository for database operations."""
from collections.abc import Sequence
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.models.board_image import BoardImage
from app.database.models.image import Image
class ImageRepository:
"""Repository for image database operations."""
def __init__(self, db: AsyncSession):
"""Initialize repository with database session."""
self.db = db
async def create_image(
self,
user_id: UUID,
filename: str,
storage_path: str,
file_size: int,
mime_type: str,
width: int,
height: int,
metadata: dict,
) -> Image:
"""
Create new image record.
Args:
user_id: Owner user ID
filename: Original filename
storage_path: Path in MinIO
file_size: File size in bytes
mime_type: MIME type
width: Image width in pixels
height: Image height in pixels
metadata: Additional metadata (format, checksum, thumbnails, etc)
Returns:
Created Image instance
"""
image = Image(
user_id=user_id,
filename=filename,
storage_path=storage_path,
file_size=file_size,
mime_type=mime_type,
width=width,
height=height,
metadata=metadata,
)
self.db.add(image)
await self.db.commit()
await self.db.refresh(image)
return image
async def get_image_by_id(self, image_id: UUID) -> Image | None:
"""
Get image by ID.
Args:
image_id: Image ID
Returns:
Image instance or None
"""
result = await self.db.execute(select(Image).where(Image.id == image_id))
return result.scalar_one_or_none()
async def get_user_images(self, user_id: UUID, limit: int = 50, offset: int = 0) -> tuple[Sequence[Image], int]:
"""
Get all images for a user with pagination.
Args:
user_id: User ID
limit: Maximum number of images to return
offset: Number of images to skip
Returns:
Tuple of (images, total_count)
"""
# Get total count
count_result = await self.db.execute(select(Image).where(Image.user_id == user_id))
total = len(count_result.scalars().all())
# Get paginated results
result = await self.db.execute(
select(Image).where(Image.user_id == user_id).order_by(Image.created_at.desc()).limit(limit).offset(offset)
)
images = result.scalars().all()
return images, total
async def delete_image(self, image_id: UUID) -> bool:
"""
Delete image record.
Args:
image_id: Image ID
Returns:
True if deleted, False if not found
"""
image = await self.get_image_by_id(image_id)
if not image:
return False
await self.db.delete(image)
await self.db.commit()
return True
async def increment_reference_count(self, image_id: UUID) -> None:
"""
Increment reference count for image.
Args:
image_id: Image ID
"""
image = await self.get_image_by_id(image_id)
if image:
image.reference_count += 1
await self.db.commit()
async def decrement_reference_count(self, image_id: UUID) -> int:
"""
Decrement reference count for image.
Args:
image_id: Image ID
Returns:
New reference count
"""
image = await self.get_image_by_id(image_id)
if image and image.reference_count > 0:
image.reference_count -= 1
await self.db.commit()
return image.reference_count
return 0
async def add_image_to_board(
self,
board_id: UUID,
image_id: UUID,
position: dict,
transformations: dict,
z_order: int = 0,
) -> BoardImage:
"""
Add image to board.
Args:
board_id: Board ID
image_id: Image ID
position: Canvas position {x, y}
transformations: Image transformations
z_order: Layer order
Returns:
Created BoardImage instance
"""
board_image = BoardImage(
board_id=board_id,
image_id=image_id,
position=position,
transformations=transformations,
z_order=z_order,
)
self.db.add(board_image)
# Increment reference count
await self.increment_reference_count(image_id)
await self.db.commit()
await self.db.refresh(board_image)
return board_image
async def get_board_images(self, board_id: UUID) -> Sequence[BoardImage]:
"""
Get all images for a board, ordered by z-order.
Args:
board_id: Board ID
Returns:
List of BoardImage instances
"""
result = await self.db.execute(
select(BoardImage).where(BoardImage.board_id == board_id).order_by(BoardImage.z_order.asc())
)
return result.scalars().all()
async def remove_image_from_board(self, board_id: UUID, image_id: UUID) -> bool:
"""
Remove image from board.
Args:
board_id: Board ID
image_id: Image ID
Returns:
True if removed, False if not found
"""
result = await self.db.execute(
select(BoardImage).where(BoardImage.board_id == board_id, BoardImage.image_id == image_id)
)
board_image = result.scalar_one_or_none()
if not board_image:
return False
await self.db.delete(board_image)
# Decrement reference count
await self.decrement_reference_count(image_id)
await self.db.commit()
return True

View File

@@ -0,0 +1,112 @@
"""Image schemas for request/response validation."""
from datetime import datetime
from typing import Any
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
class ImageMetadata(BaseModel):
"""Image metadata structure."""
format: str = Field(..., description="Image format (jpeg, png, etc)")
checksum: str = Field(..., description="SHA256 checksum of file")
exif: dict[str, Any] | None = Field(None, description="EXIF data if available")
thumbnails: dict[str, str] = Field(default_factory=dict, description="Thumbnail URLs by quality level")
class ImageUploadResponse(BaseModel):
"""Response after successful image upload."""
id: UUID
filename: str
storage_path: str
file_size: int
mime_type: str
width: int
height: int
metadata: dict[str, Any]
created_at: datetime
class Config:
"""Pydantic config."""
from_attributes = True
class ImageResponse(BaseModel):
"""Full image response with all fields."""
id: UUID
user_id: UUID
filename: str
storage_path: str
file_size: int
mime_type: str
width: int
height: int
metadata: dict[str, Any]
created_at: datetime
reference_count: int
class Config:
"""Pydantic config."""
from_attributes = True
class BoardImageCreate(BaseModel):
"""Schema for adding image to board."""
image_id: UUID = Field(..., description="ID of uploaded image")
position: dict[str, float] = Field(default_factory=lambda: {"x": 0, "y": 0}, description="Canvas position")
transformations: dict[str, Any] = Field(
default_factory=lambda: {
"scale": 1.0,
"rotation": 0,
"opacity": 1.0,
"flipped_h": False,
"flipped_v": False,
"greyscale": False,
},
description="Image transformations",
)
z_order: int = Field(default=0, description="Layer order")
@field_validator("position")
@classmethod
def validate_position(cls, v: dict[str, float]) -> dict[str, float]:
"""Validate position has x and y."""
if "x" not in v or "y" not in v:
raise ValueError("Position must contain 'x' and 'y' coordinates")
return v
class BoardImageResponse(BaseModel):
"""Response for board image with all metadata."""
id: UUID
board_id: UUID
image_id: UUID
position: dict[str, float]
transformations: dict[str, Any]
z_order: int
group_id: UUID | None
created_at: datetime
updated_at: datetime
image: ImageResponse
class Config:
"""Pydantic config."""
from_attributes = True
class ImageListResponse(BaseModel):
"""Paginated list of images."""
images: list[ImageResponse]
total: int
page: int
page_size: int

View File

@@ -0,0 +1,86 @@
"""Image upload handler with streaming to MinIO."""
import contextlib
import hashlib
import io
from uuid import UUID
from PIL import Image as PILImage
from app.core.storage import get_storage_client
async def upload_image_to_storage(
user_id: UUID, image_id: UUID, filename: str, contents: bytes
) -> tuple[str, int, int, str]:
"""
Upload image to MinIO storage.
Args:
user_id: User ID for organizing storage
image_id: Image ID for unique naming
filename: Original filename
contents: Image file contents
Returns:
Tuple of (storage_path, width, height, mime_type)
"""
# Get storage client
storage = get_storage_client()
# Generate storage path: originals/{user_id}/{image_id}.{ext}
extension = filename.split(".")[-1].lower()
storage_path = f"originals/{user_id}/{image_id}.{extension}"
# Detect image dimensions and format
image = PILImage.open(io.BytesIO(contents))
width, height = image.size
format_name = image.format.lower() if image.format else extension
# Map PIL format to MIME type
mime_type_map = {
"jpeg": "image/jpeg",
"jpg": "image/jpeg",
"png": "image/png",
"gif": "image/gif",
"webp": "image/webp",
"svg": "image/svg+xml",
}
mime_type = mime_type_map.get(format_name, f"image/{format_name}")
# Upload to MinIO
storage.put_object(
bucket_name="webref",
object_name=storage_path,
data=io.BytesIO(contents),
length=len(contents),
content_type=mime_type,
)
return storage_path, width, height, mime_type
def calculate_checksum(contents: bytes) -> str:
"""
Calculate SHA256 checksum of file contents.
Args:
contents: File contents
Returns:
SHA256 checksum as hex string
"""
return hashlib.sha256(contents).hexdigest()
async def delete_image_from_storage(storage_path: str) -> None:
"""
Delete image from MinIO storage.
Args:
storage_path: Path to image in storage
"""
storage = get_storage_client()
with contextlib.suppress(Exception):
# Log error but don't fail - image might already be deleted
storage.remove_object(bucket_name="webref", object_name=storage_path)

View File

@@ -0,0 +1,110 @@
"""File validation utilities for image uploads."""
import magic
from fastapi import HTTPException, UploadFile, status
# Maximum file size: 50MB
MAX_FILE_SIZE = 52_428_800
# Allowed MIME types
ALLOWED_MIME_TYPES = {
"image/jpeg",
"image/jpg",
"image/png",
"image/gif",
"image/webp",
"image/svg+xml",
}
# Allowed file extensions
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg"}
async def validate_image_file(file: UploadFile) -> bytes:
"""
Validate uploaded image file.
Checks:
- File size within limits
- MIME type allowed
- Magic bytes match declared type
- File extension valid
Args:
file: The uploaded file from FastAPI
Returns:
File contents as bytes
Raises:
HTTPException: If validation fails
"""
# Read file contents
contents = await file.read()
file_size = len(contents)
# Reset file pointer for potential re-reading
await file.seek(0)
# Check file size
if file_size == 0:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Empty file uploaded")
if file_size > MAX_FILE_SIZE:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"File too large. Maximum size is {MAX_FILE_SIZE / 1_048_576:.1f}MB",
)
# Validate file extension
if file.filename:
extension = "." + file.filename.lower().split(".")[-1] if "." in file.filename else ""
if extension not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid file extension. Allowed: {', '.join(ALLOWED_EXTENSIONS)}",
)
# Detect actual MIME type using magic bytes
mime = magic.from_buffer(contents, mime=True)
# Validate MIME type
if mime not in ALLOWED_MIME_TYPES:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid file type '{mime}'. Allowed types: {', '.join(ALLOWED_MIME_TYPES)}",
)
return contents
def sanitize_filename(filename: str) -> str:
"""
Sanitize filename to prevent path traversal and other attacks.
Args:
filename: Original filename
Returns:
Sanitized filename
"""
import re
# Remove path separators
filename = filename.replace("/", "_").replace("\\", "_")
# Remove any non-alphanumeric characters except dots, dashes, underscores
filename = re.sub(r"[^a-zA-Z0-9._-]", "_", filename)
# Limit length
max_length = 255
if len(filename) > max_length:
# Keep extension
parts = filename.rsplit(".", 1)
if len(parts) == 2:
name, ext = parts
filename = name[: max_length - len(ext) - 1] + "." + ext
else:
filename = filename[:max_length]
return filename

View File

@@ -0,0 +1,73 @@
"""ZIP file extraction handler for batch image uploads."""
import io
import zipfile
from collections.abc import AsyncIterator
from fastapi import HTTPException, UploadFile, status
async def extract_images_from_zip(zip_file: UploadFile) -> AsyncIterator[tuple[str, bytes]]:
"""
Extract image files from ZIP archive.
Args:
zip_file: Uploaded ZIP file
Yields:
Tuples of (filename, contents) for each image file
Raises:
HTTPException: If ZIP is invalid or too large
"""
# Read ZIP contents
zip_contents = await zip_file.read()
# Check ZIP size (max 200MB for ZIP)
max_zip_size = 200 * 1024 * 1024 # 200MB
if len(zip_contents) > max_zip_size:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"ZIP file too large. Maximum size is {max_zip_size / 1_048_576:.1f}MB",
)
try:
# Open ZIP file
with zipfile.ZipFile(io.BytesIO(zip_contents)) as zip_ref:
# Get list of image files (filter by extension)
image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg"}
image_files = [
name
for name in zip_ref.namelist()
if not name.startswith("__MACOSX/") # Skip macOS metadata
and not name.startswith(".") # Skip hidden files
and any(name.lower().endswith(ext) for ext in image_extensions)
]
if not image_files:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No valid image files found in ZIP archive",
)
# Extract each image
for filename in image_files:
# Skip directories
if filename.endswith("/"):
continue
# Get just the filename without path
base_filename = filename.split("/")[-1]
# Read file contents
file_contents = zip_ref.read(filename)
yield base_filename, file_contents
except zipfile.BadZipFile as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid ZIP file") from e
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error processing ZIP file: {str(e)}",
) from e

View File

@@ -5,7 +5,7 @@ import logging
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app.api import auth, boards
from app.api import auth, boards, images
from app.core.config import settings
from app.core.errors import WebRefException
from app.core.logging import setup_logging
@@ -84,9 +84,7 @@ async def root():
# API routers
app.include_router(auth.router, prefix=f"{settings.API_V1_PREFIX}")
app.include_router(boards.router, prefix=f"{settings.API_V1_PREFIX}")
# Additional routers will be added in subsequent phases
# from app.api import images
# app.include_router(images.router, prefix=f"{settings.API_V1_PREFIX}")
app.include_router(images.router, prefix=f"{settings.API_V1_PREFIX}")
@app.on_event("startup")

View File

@@ -17,6 +17,7 @@ dependencies = [
"python-multipart>=0.0.12",
"httpx>=0.27.0",
"psycopg2>=2.9.0",
"python-magic>=0.4.27",
]
[project.optional-dependencies]

View File

@@ -0,0 +1,156 @@
"""Integration tests for image upload endpoints."""
import io
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import status
from httpx import AsyncClient
from PIL import Image as PILImage
@pytest.mark.asyncio
class TestImageUpload:
"""Tests for image upload endpoint."""
async def test_upload_image_success(self, client: AsyncClient, auth_headers: dict):
"""Test successful image upload."""
# Create a test image
image = PILImage.new("RGB", (800, 600), color="red")
buffer = io.BytesIO()
image.save(buffer, format="JPEG")
buffer.seek(0)
# Mock storage and processing
with patch("app.images.validation.magic.from_buffer") as mock_magic:
mock_magic.return_value = "image/jpeg"
with patch("app.api.images.upload_image_to_storage") as mock_upload:
mock_upload.return_value = ("storage/path.jpg", 800, 600, "image/jpeg")
with patch("app.api.images.generate_thumbnails") as mock_thumbs:
mock_thumbs.return_value = {
"low": "thumbs/low.webp",
"medium": "thumbs/medium.webp",
"high": "thumbs/high.webp",
}
# Upload image
response = await client.post(
"/api/v1/images/upload",
headers=auth_headers,
files={"file": ("test.jpg", buffer, "image/jpeg")},
)
assert response.status_code == status.HTTP_201_CREATED
data = response.json()
assert "id" in data
assert data["filename"] == "test.jpg"
assert data["width"] == 800
assert data["height"] == 600
async def test_upload_image_unauthenticated(self, client: AsyncClient):
"""Test upload without authentication fails."""
image = PILImage.new("RGB", (800, 600), color="red")
buffer = io.BytesIO()
image.save(buffer, format="JPEG")
buffer.seek(0)
response = await client.post(
"/api/v1/images/upload", files={"file": ("test.jpg", buffer, "image/jpeg")}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_upload_invalid_file_type(self, client: AsyncClient, auth_headers: dict):
"""Test upload with invalid file type."""
# Create a text file disguised as image
buffer = io.BytesIO(b"This is not an image")
with patch("app.images.validation.magic.from_buffer") as mock_magic:
mock_magic.return_value = "text/plain"
response = await client.post(
"/api/v1/images/upload",
headers=auth_headers,
files={"file": ("fake.jpg", buffer, "image/jpeg")},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "invalid" in response.json()["detail"].lower()
@pytest.mark.asyncio
class TestImageLibrary:
"""Tests for image library endpoint."""
async def test_get_image_library(self, client: AsyncClient, auth_headers: dict):
"""Test retrieving user's image library."""
response = await client.get("/api/v1/images/library", headers=auth_headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "images" in data
assert "total" in data
assert "page" in data
assert isinstance(data["images"], list)
async def test_get_image_library_pagination(self, client: AsyncClient, auth_headers: dict):
"""Test library pagination."""
response = await client.get(
"/api/v1/images/library", params={"page": 2, "page_size": 10}, headers=auth_headers
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["page"] == 2
assert data["page_size"] == 10
@pytest.mark.asyncio
class TestBoardImages:
"""Tests for adding images to boards."""
async def test_add_image_to_board(
self, client: AsyncClient, auth_headers: dict, test_board_id: str, test_image_id: str
):
"""Test adding image to board."""
payload = {
"image_id": test_image_id,
"position": {"x": 100, "y": 200},
"transformations": {
"scale": 1.0,
"rotation": 0,
"opacity": 1.0,
"flipped_h": False,
"flipped_v": False,
"greyscale": False,
},
"z_order": 0,
}
response = await client.post(
f"/api/v1/images/boards/{test_board_id}/images", headers=auth_headers, json=payload
)
# May fail if test_board_id/test_image_id fixtures aren't set up
# This is a placeholder for the structure
if response.status_code == status.HTTP_201_CREATED:
data = response.json()
assert "id" in data
assert data["image_id"] == test_image_id
assert data["position"]["x"] == 100
async def test_get_board_images(
self, client: AsyncClient, auth_headers: dict, test_board_id: str
):
"""Test getting all images on a board."""
response = await client.get(
f"/api/v1/images/boards/{test_board_id}/images", headers=auth_headers
)
# May return 404 if board doesn't exist in test DB
if response.status_code == status.HTTP_200_OK:
data = response.json()
assert isinstance(data, list)

View File

@@ -0,0 +1,2 @@
"""Image tests package."""

View File

@@ -0,0 +1,79 @@
"""Tests for image processing and thumbnail generation."""
import io
from uuid import uuid4
import pytest
from PIL import Image as PILImage
from app.images.processing import generate_thumbnails
class TestThumbnailGeneration:
"""Tests for thumbnail generation."""
def test_generate_thumbnails_creates_all_sizes(self):
"""Test that thumbnails are generated for all quality levels."""
# Create a test image
image_id = uuid4()
image = PILImage.new("RGB", (2000, 1500), color="red")
buffer = io.BytesIO()
image.save(buffer, format="JPEG")
contents = buffer.getvalue()
# Mock storage client to avoid actual uploads
from unittest.mock import MagicMock, patch
with patch("app.images.processing.get_storage_client") as mock_storage:
mock_storage.return_value.put_object = MagicMock()
# Generate thumbnails
thumbnail_paths = generate_thumbnails(image_id, "test/path.jpg", contents)
# Verify all sizes created
assert "low" in thumbnail_paths
assert "medium" in thumbnail_paths
assert "high" in thumbnail_paths
# Verify storage was called
assert mock_storage.return_value.put_object.call_count >= 2
def test_skip_thumbnail_for_small_images(self):
"""Test that thumbnails are skipped if image is smaller than target size."""
# Create a small test image (smaller than low quality threshold)
image_id = uuid4()
image = PILImage.new("RGB", (500, 375), color="blue")
buffer = io.BytesIO()
image.save(buffer, format="JPEG")
contents = buffer.getvalue()
from unittest.mock import MagicMock, patch
with patch("app.images.processing.get_storage_client") as mock_storage:
mock_storage.return_value.put_object = MagicMock()
# Generate thumbnails
thumbnail_paths = generate_thumbnails(image_id, "test/small.jpg", contents)
# Should use original path for all sizes
assert thumbnail_paths["low"] == "test/small.jpg"
def test_handles_transparent_images(self):
"""Test conversion of transparent images to RGB."""
# Create RGBA image
image_id = uuid4()
image = PILImage.new("RGBA", (2000, 1500), color=(255, 0, 0, 128))
buffer = io.BytesIO()
image.save(buffer, format="PNG")
contents = buffer.getvalue()
from unittest.mock import MagicMock, patch
with patch("app.images.processing.get_storage_client") as mock_storage:
mock_storage.return_value.put_object = MagicMock()
# Should not raise exception
thumbnail_paths = generate_thumbnails(image_id, "test/transparent.png", contents)
assert len(thumbnail_paths) > 0

View File

@@ -0,0 +1,82 @@
"""Tests for file validation."""
import io
from unittest.mock import AsyncMock, Mock
import pytest
from fastapi import HTTPException, UploadFile
from app.images.validation import sanitize_filename, validate_image_file
class TestSanitizeFilename:
"""Tests for filename sanitization."""
def test_sanitize_normal_filename(self):
"""Test sanitizing normal filename."""
assert sanitize_filename("image.jpg") == "image.jpg"
assert sanitize_filename("my_photo-2025.png") == "my_photo-2025.png"
def test_sanitize_path_traversal(self):
"""Test preventing path traversal."""
assert "/" not in sanitize_filename("../../../etc/passwd")
assert "\\" not in sanitize_filename("..\\..\\..\\windows\\system32")
def test_sanitize_special_characters(self):
"""Test removing special characters."""
result = sanitize_filename("file name with spaces!@#.jpg")
assert " " not in result or result == "file_name_with_spaces___.jpg"
def test_sanitize_long_filename(self):
"""Test truncating long filenames."""
long_name = "a" * 300 + ".jpg"
result = sanitize_filename(long_name)
assert len(result) <= 255
assert result.endswith(".jpg")
@pytest.mark.asyncio
class TestValidateImageFile:
"""Tests for image file validation."""
async def test_validate_empty_file(self):
"""Test rejection of empty files."""
mock_file = AsyncMock(spec=UploadFile)
mock_file.read = AsyncMock(return_value=b"")
mock_file.seek = AsyncMock()
mock_file.filename = "empty.jpg"
with pytest.raises(HTTPException) as exc:
await validate_image_file(mock_file)
assert exc.value.status_code == 400
assert "empty" in exc.value.detail.lower()
async def test_validate_file_too_large(self):
"""Test rejection of oversized files."""
# Create 60MB file
large_data = b"x" * (60 * 1024 * 1024)
mock_file = AsyncMock(spec=UploadFile)
mock_file.read = AsyncMock(return_value=large_data)
mock_file.seek = AsyncMock()
mock_file.filename = "large.jpg"
with pytest.raises(HTTPException) as exc:
await validate_image_file(mock_file)
assert exc.value.status_code == 413
assert "too large" in exc.value.detail.lower()
async def test_validate_invalid_extension(self):
"""Test rejection of invalid extensions."""
mock_file = AsyncMock(spec=UploadFile)
mock_file.read = AsyncMock(return_value=b"fake image data")
mock_file.seek = AsyncMock()
mock_file.filename = "document.pdf"
with pytest.raises(HTTPException) as exc:
await validate_image_file(mock_file)
assert exc.value.status_code == 400
assert "extension" in exc.value.detail.lower()