"""Image library API endpoints.""" from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status from pydantic import BaseModel from sqlalchemy.orm import Session from app.core.deps import get_current_user, get_db_sync from app.database.models.board_image import BoardImage from app.database.models.image import Image from app.database.models.user import User from app.images.search import count_images, search_images router = APIRouter(tags=["library"]) class ImageLibraryResponse(BaseModel): """Response schema for library image.""" id: str filename: str file_size: int mime_type: str width: int height: int reference_count: int created_at: str thumbnail_url: str | None = None class ImageLibraryListResponse(BaseModel): """Response schema for library listing.""" images: list[ImageLibraryResponse] total: int limit: int offset: int class AddToBoardRequest(BaseModel): """Request schema for adding library image to board.""" board_id: str position: dict = {"x": 0, "y": 0} @router.get("/library/images", response_model=ImageLibraryListResponse) def list_library_images( query: str | None = Query(None, description="Search query"), limit: int = Query(50, ge=1, le=100, description="Results per page"), offset: int = Query(0, ge=0, description="Pagination offset"), current_user: User = Depends(get_current_user), db: Session = Depends(get_db_sync), ) -> ImageLibraryListResponse: """ Get user's image library with optional search. Returns all images owned by the user, regardless of board usage. """ # Search images images = search_images(str(current_user.id), db, query=query, limit=limit, offset=offset) # Count total total = count_images(str(current_user.id), db, query=query) # Convert to response format image_responses = [] for img in images: thumbnails = img.image_metadata.get("thumbnails", {}) image_responses.append( ImageLibraryResponse( id=str(img.id), filename=img.filename, file_size=img.file_size, mime_type=img.mime_type, width=img.width, height=img.height, reference_count=img.reference_count, created_at=img.created_at.isoformat(), thumbnail_url=thumbnails.get("medium"), ) ) return ImageLibraryListResponse(images=image_responses, total=total, limit=limit, offset=offset) @router.post("/library/images/{image_id}/add-to-board", status_code=status.HTTP_201_CREATED) def add_library_image_to_board( image_id: UUID, request: AddToBoardRequest, current_user: User = Depends(get_current_user), db: Session = Depends(get_db_sync), ) -> dict: """ Add an existing library image to a board. Creates a new BoardImage reference without duplicating the file. Increments reference count on the image. """ # Verify image exists and user owns it image = db.query(Image).filter(Image.id == image_id, Image.user_id == current_user.id).first() if image is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Image not found in library", ) # Verify board exists and user owns it from app.database.models.board import Board board = db.query(Board).filter(Board.id == request.board_id, Board.user_id == current_user.id).first() if board is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Board not found or access denied", ) # Check if image already on this board existing = ( db.query(BoardImage).filter(BoardImage.board_id == request.board_id, BoardImage.image_id == image_id).first() ) if existing: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Image already exists on this board", ) # Get max z_order for board max_z = ( db.query(BoardImage.z_order) .filter(BoardImage.board_id == request.board_id) .order_by(BoardImage.z_order.desc()) .first() ) next_z = (max_z[0] + 1) if max_z else 0 # Create BoardImage reference board_image = BoardImage( board_id=UUID(request.board_id), image_id=image_id, position=request.position, transformations={ "scale": 1.0, "rotation": 0, "opacity": 1.0, "flipped_h": False, "flipped_v": False, "greyscale": False, }, z_order=next_z, ) db.add(board_image) # Increment reference count image.reference_count += 1 db.commit() db.refresh(board_image) return {"id": str(board_image.id), "message": "Image added to board successfully"} @router.delete("/library/images/{image_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_library_image( image_id: UUID, current_user: User = Depends(get_current_user), db: Session = Depends(get_db_sync), ) -> None: """ Permanently delete an image from library. Removes image from all boards and deletes from storage. Only allowed if user owns the image. """ from app.core.storage import storage_client # Get image image = db.query(Image).filter(Image.id == image_id, Image.user_id == current_user.id).first() if image is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Image not found in library", ) # Delete all BoardImage references db.query(BoardImage).filter(BoardImage.image_id == image_id).delete() # Delete from storage import contextlib try: storage_client.delete_file(image.storage_path) # Also delete thumbnails if they exist thumbnails = image.image_metadata.get("thumbnails", {}) for thumb_path in thumbnails.values(): if thumb_path: with contextlib.suppress(Exception): storage_client.delete_file(thumb_path) except Exception as e: # Log error but continue with database deletion print(f"Warning: Failed to delete image from storage: {str(e)}") # Delete database record db.delete(image) db.commit() @router.get("/library/stats") def get_library_stats( current_user: User = Depends(get_current_user), db: Session = Depends(get_db_sync), ) -> dict: """ Get statistics about user's image library. Returns total images, total size, and usage across boards. """ images = db.query(Image).filter(Image.user_id == current_user.id).all() total_images = len(images) total_size = sum(img.file_size for img in images) total_references = sum(img.reference_count for img in images) return { "total_images": total_images, "total_size_bytes": total_size, "total_board_references": total_references, "average_references_per_image": total_references / total_images if total_images > 0 else 0, }