All checks were successful
CI/CD Pipeline / VM Test - backend-integration (push) Successful in 11s
CI/CD Pipeline / VM Test - full-stack (push) Successful in 8s
CI/CD Pipeline / VM Test - performance (push) Successful in 8s
CI/CD Pipeline / VM Test - security (push) Successful in 8s
CI/CD Pipeline / Backend Linting (push) Successful in 3s
CI/CD Pipeline / Frontend Linting (push) Successful in 18s
CI/CD Pipeline / Nix Flake Check (push) Successful in 43s
CI/CD Pipeline / CI Summary (push) Successful in 0s
104 lines
3.0 KiB
Python
104 lines
3.0 KiB
Python
"""ZIP export functionality for multiple images."""
|
|
|
|
import io
import logging
import zipfile

from fastapi import HTTPException, status
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session

from app.core.storage import storage_client
from app.database.models.board_image import BoardImage
from app.database.models.image import Image
|
|
|
|
|
|
logger = logging.getLogger(__name__)

# Size of the chunks handed to the response when streaming the finished archive.
_ZIP_STREAM_CHUNK_SIZE = 64 * 1024


def _unique_archive_name(name: str, used_names: set) -> str:
    """Return *name*, suffixed with ``_<n>`` if it is already taken.

    Args:
        name: Desired archive member name.
        used_names: Lower-cased names already written to the archive; the
            returned name is added to this set.

    Returns:
        A member name that does not collide (case-insensitively) with any
        name previously recorded in ``used_names``.
    """
    stem, dot, extension = name.rpartition(".")
    if dot and stem:
        extension = dot + extension
    else:
        # No extension (or nothing before the dot): suffix the whole name.
        stem, extension = name, ""

    candidate = name
    counter = 0
    # Compare lower-cased so the archive also extracts cleanly onto
    # case-insensitive filesystems.
    while candidate.lower() in used_names:
        counter += 1
        candidate = f"{stem}_{counter}{extension}"

    used_names.add(candidate.lower())
    return candidate


def create_zip_export(board_id: str, db: Session) -> StreamingResponse:
    """
    Create a ZIP file containing all images from a board.

    The archive is built in memory. Images whose data cannot be fetched from
    storage, or that fail to be written, are logged and skipped so that one
    bad image does not abort the whole export. Images that share a filename
    get a numeric suffix so that every image keeps its own archive entry.

    Args:
        board_id: Board UUID
        db: Database session

    Returns:
        StreamingResponse with ZIP file

    Raises:
        HTTPException: 404 if the board has no images, 500 if the export
            fails for any other reason
    """
    try:
        # Get all images for the board
        board_images = (
            db.query(BoardImage, Image)
            .join(Image, BoardImage.image_id == Image.id)
            .filter(BoardImage.board_id == board_id)
            .all()
        )

        if not board_images:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="No images found for this board",
            )

        # Create ZIP file in memory
        zip_buffer = io.BytesIO()
        used_names = set()

        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for _board_image, image in board_images:
                try:
                    # Get image data from storage
                    image_data = storage_client.get_object(image.storage_path)

                    if not image_data:
                        logger.warning(
                            "Skipping %s in ZIP export: storage returned no data",
                            image.filename,
                        )
                        continue

                    # Sanitize, then de-duplicate: writing two members under
                    # the same name would make one overwrite the other when
                    # the archive is extracted.
                    archive_name = _unique_archive_name(
                        _sanitize_filename(image.filename), used_names
                    )
                    zip_file.writestr(archive_name, image_data)

                except Exception:
                    # Best effort: log the failure but continue with other images
                    logger.warning(
                        "Failed to add %s to ZIP", image.filename, exc_info=True
                    )
                    continue

        # Reset buffer position
        zip_buffer.seek(0)

        # Stream fixed-size chunks. Iterating a BytesIO directly yields
        # newline-delimited "lines", which is arbitrary for binary data.
        zip_chunks = iter(lambda: zip_buffer.read(_ZIP_STREAM_CHUNK_SIZE), b"")

        # Return ZIP file
        return StreamingResponse(
            zip_chunks,
            media_type="application/zip",
            headers={
                "Content-Disposition": 'attachment; filename="board_export.zip"',
                "Cache-Control": "no-cache",
            },
        )

    except HTTPException:
        # Already a well-formed HTTP error (e.g. the 404 above): pass through.
        raise
    except Exception as e:
        logger.exception("ZIP export failed for board %s", board_id)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create ZIP export: {str(e)}",
        ) from e
|
|
|
|
|
|
def _sanitize_filename(filename: str) -> str:
|
|
"""
|
|
Sanitize filename for safe inclusion in ZIP.
|
|
|
|
Args:
|
|
filename: Original filename
|
|
|
|
Returns:
|
|
Sanitized filename
|
|
"""
|
|
# Remove any path separators and dangerous characters
|
|
safe_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789._- ")
|
|
sanitized = "".join(c if c in safe_chars else "_" for c in filename)
|
|
|
|
# Ensure it's not empty and doesn't start with a dot
|
|
if not sanitized or sanitized[0] == ".":
|
|
sanitized = "file_" + sanitized
|
|
|
|
return sanitized
|