Files
webref/backend/app/images/upload.py
Danilo Reyes 010df31455 phase 5
2025-11-02 11:07:42 -06:00

87 lines
2.2 KiB
Python

"""Image upload handler with streaming to MinIO."""
import contextlib
import hashlib
import io
import logging
from uuid import UUID

from PIL import Image as PILImage

from app.core.storage import get_storage_client
async def upload_image_to_storage(
    user_id: UUID, image_id: UUID, filename: str, contents: bytes
) -> tuple[str, int, int, str]:
    """
    Upload image to MinIO storage.

    The bytes are first opened with Pillow to read the dimensions and format,
    then written unchanged to the "webref" bucket under
    ``originals/{user_id}/{image_id}.{extension}``.

    Args:
        user_id: User ID for organizing storage
        image_id: Image ID for unique naming
        filename: Original filename; only its extension is used
        contents: Image file contents

    Returns:
        Tuple of (storage_path, width, height, mime_type)

    Raises:
        Nothing is caught here: errors from ``PILImage.open`` (contents that
        cannot be read as an image) and from the storage client propagate to
        the caller. A failed open happens before anything is uploaded.
    """
    # Get storage client
    storage = get_storage_client()

    # Detect image dimensions and format. Only metadata is needed, so the
    # image is scoped to a ``with`` block instead of being left open.
    with PILImage.open(io.BytesIO(contents)) as image:
        width, height = image.size
        detected_format = image.format.lower() if image.format else ""

    # Use the text after the last dot as the extension. The filename comes
    # from the client, so a missing dot or any character other than ASCII
    # letters/digits (e.g. "/" or "..") must not be spliced into the object
    # key; fall back to the detected format in that case.
    _, dot, suffix = filename.rpartition(".")
    extension = suffix.lower() if dot else ""
    if not (extension.isascii() and extension.isalnum()):
        extension = detected_format or "bin"

    # Generate storage path: originals/{user_id}/{image_id}.{ext}
    storage_path = f"originals/{user_id}/{image_id}.{extension}"

    # Prefer what Pillow detected over what the filename claims.
    format_name = detected_format or extension

    # Map PIL format to MIME type
    mime_type_map = {
        "jpeg": "image/jpeg",
        "jpg": "image/jpeg",
        "png": "image/png",
        "gif": "image/gif",
        "webp": "image/webp",
        "svg": "image/svg+xml",
    }
    mime_type = mime_type_map.get(format_name, f"image/{format_name}")

    # Upload to MinIO
    storage.put_object(
        bucket_name="webref",
        object_name=storage_path,
        data=io.BytesIO(contents),
        length=len(contents),
        content_type=mime_type,
    )

    return storage_path, width, height, mime_type
def calculate_checksum(contents: bytes) -> str:
    """
    Compute the SHA256 digest of the given file bytes.

    Args:
        contents: File contents

    Returns:
        SHA256 checksum as hex string
    """
    digest = hashlib.sha256()
    digest.update(contents)
    return digest.hexdigest()
async def delete_image_from_storage(storage_path: str) -> None:
    """
    Delete image from MinIO storage.

    Deletion is best-effort: any exception raised by the ``remove_object``
    call is logged and then swallowed, so a failed delete never propagates
    to the caller.

    Args:
        storage_path: Path to image in storage
    """
    storage = get_storage_client()
    try:
        storage.remove_object(bucket_name="webref", object_name=storage_path)
    except Exception:
        # Don't fail - image might already be deleted - but record the error
        # so real storage problems are not silently lost.
        logging.getLogger(__name__).warning(
            "Failed to delete %s from storage", storage_path, exc_info=True
        )