Files
webref/backend/app/images/repository.py
Danilo Reyes ff1c29c66a fix part 3
2025-11-02 18:32:20 -06:00

174 lines
5.5 KiB
Python

"""Image repository for database operations."""
from collections.abc import Sequence
from uuid import UUID

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database.models.board_image import BoardImage
from app.database.models.image import Image
class ImageRepository:
    """Repository for image database operations.

    Wraps an ``AsyncSession`` and exposes CRUD helpers for ``Image`` rows and
    for the ``BoardImage`` rows that link an image to a board. Every mutating
    method commits the session itself.
    """

    def __init__(self, db: AsyncSession):
        """Initialize repository with database session."""
        self.db = db

    async def create_image(
        self,
        user_id: UUID,
        filename: str,
        storage_path: str,
        file_size: int,
        mime_type: str,
        width: int,
        height: int,
        image_metadata: dict,
    ) -> Image:
        """Create, commit and return a new image record.

        The instance is refreshed after the commit so that database-generated
        values are loaded before it is handed back to the caller.
        """
        image = Image(
            user_id=user_id,
            filename=filename,
            storage_path=storage_path,
            file_size=file_size,
            mime_type=mime_type,
            width=width,
            height=height,
            image_metadata=image_metadata,
        )
        self.db.add(image)
        await self.db.commit()
        await self.db.refresh(image)
        return image

    async def get_image_by_id(self, image_id: UUID) -> Image | None:
        """Return the image with the given ID, or ``None`` if there is none."""
        result = await self.db.execute(select(Image).where(Image.id == image_id))
        return result.scalar_one_or_none()

    async def get_user_images(
        self, user_id: UUID, limit: int = 50, offset: int = 0
    ) -> tuple[Sequence[Image], int]:
        """Return one page of a user's images plus the user's total image count.

        Images are ordered newest first (``created_at`` descending). The total
        is computed with a separate ``COUNT`` query and is not affected by
        ``limit`` / ``offset``.
        """
        count_result = await self.db.execute(
            select(func.count(Image.id)).where(Image.user_id == user_id)
        )
        total = count_result.scalar_one()

        page_result = await self.db.execute(
            select(Image)
            .where(Image.user_id == user_id)
            .order_by(Image.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        images = page_result.scalars().all()
        return images, total

    async def delete_image(self, image_id: UUID) -> bool:
        """Delete an image record.

        Returns ``True`` when a row was deleted and ``False`` when no image
        with that ID exists.
        """
        image = await self.get_image_by_id(image_id)
        if not image:
            return False
        await self.db.delete(image)
        await self.db.commit()
        return True

    async def _apply_reference_delta(self, image_id: UUID, delta: int) -> int | None:
        """Add *delta* to an image's reference count without committing.

        Returns the new count, or ``None`` when nothing was changed: the image
        does not exist, or a decrement was requested while the count is not
        positive. The caller is responsible for committing.
        """
        image = await self.get_image_by_id(image_id)
        if image is None:
            return None
        if delta < 0 and image.reference_count <= 0:
            # Never push the counter below zero.
            return None
        new_count = image.reference_count + delta
        image.reference_count = new_count
        return new_count

    async def increment_reference_count(self, image_id: UUID) -> None:
        """Increment the reference count of an image and commit.

        Does nothing (and does not commit) when the image does not exist.
        """
        if await self._apply_reference_delta(image_id, 1) is not None:
            await self.db.commit()

    async def decrement_reference_count(self, image_id: UUID) -> int:
        """Decrement the reference count of an image and commit.

        Returns the new count. Returns ``0`` without committing when the image
        does not exist or its count is already zero or less.
        """
        new_count = await self._apply_reference_delta(image_id, -1)
        if new_count is None:
            return 0
        await self.db.commit()
        # Return the locally computed value rather than re-reading the
        # attribute: a commit may expire loaded attributes, and touching an
        # expired attribute would trigger implicit I/O on the async session.
        return new_count

    async def add_image_to_board(
        self,
        board_id: UUID,
        image_id: UUID,
        position: dict,
        transformations: dict,
        z_order: int = 0,
    ) -> BoardImage:
        """Link an image to a board and bump the image's reference count.

        The new ``BoardImage`` row and the reference-count change are written
        in a single commit. If the image does not exist, the count change is
        skipped and only the link row is committed. The returned instance is
        refreshed after the commit.
        """
        board_image = BoardImage(
            board_id=board_id,
            image_id=image_id,
            position=position,
            transformations=transformations,
            z_order=z_order,
        )
        self.db.add(board_image)
        await self._apply_reference_delta(image_id, 1)
        await self.db.commit()
        await self.db.refresh(board_image)
        return board_image

    async def get_board_images(self, board_id: UUID) -> Sequence[BoardImage]:
        """Return all board images of a board, ordered by ascending z-order."""
        result = await self.db.execute(
            select(BoardImage)
            .where(BoardImage.board_id == board_id)
            .order_by(BoardImage.z_order.asc())
        )
        return result.scalars().all()

    async def get_board_image(self, board_id: UUID, image_id: UUID) -> BoardImage | None:
        """Return the link row for the given board and image, or ``None``."""
        result = await self.db.execute(
            select(BoardImage).where(
                BoardImage.board_id == board_id,
                BoardImage.image_id == image_id,
            )
        )
        return result.scalar_one_or_none()

    async def update_board_image(
        self,
        board_id: UUID,
        image_id: UUID,
        position: dict | None = None,
        transformations: dict | None = None,
        z_order: int | None = None,
        group_id: UUID | None = None,
    ) -> BoardImage | None:
        """Update board image position, transformations, z-order, or group.

        Only arguments that are not ``None`` are applied; ``None`` means
        "leave unchanged", so this method cannot reset a field (for example
        ``group_id``) back to ``None``.

        Returns the refreshed ``BoardImage``, or ``None`` when the image is not
        on the board.
        """
        board_image = await self.get_board_image(board_id, image_id)
        if not board_image:
            return None

        if position is not None:
            board_image.position = position
        if transformations is not None:
            board_image.transformations = transformations
        if z_order is not None:
            board_image.z_order = z_order
        if group_id is not None:
            board_image.group_id = group_id

        await self.db.commit()
        await self.db.refresh(board_image)
        return board_image

    async def remove_image_from_board(self, board_id: UUID, image_id: UUID) -> bool:
        """Unlink an image from a board and lower the image's reference count.

        The row deletion and the reference-count change are written in a
        single commit. Returns ``False`` when the image is not on the board,
        ``True`` otherwise.
        """
        board_image = await self.get_board_image(board_id, image_id)
        if not board_image:
            return False
        await self.db.delete(board_image)
        await self._apply_reference_delta(image_id, -1)
        await self.db.commit()
        return True