"""Board repository for database operations.""" from collections.abc import Sequence from uuid import UUID from sqlalchemy import func, select from sqlalchemy.orm import Session from app.database.models.board import Board from app.database.models.board_image import BoardImage from app.database.models.group import Group class BoardRepository: """Repository for Board database operations.""" def __init__(self, db: Session): """ Initialize repository with database session. Args: db: SQLAlchemy database session """ self.db = db def create_board( self, user_id: UUID, title: str, description: str | None = None, viewport_state: dict | None = None, ) -> Board: """ Create a new board. Args: user_id: Owner's user ID title: Board title description: Optional board description viewport_state: Optional custom viewport state Returns: Created Board instance """ if viewport_state is None: viewport_state = {"x": 0, "y": 0, "zoom": 1.0, "rotation": 0} board = Board( user_id=user_id, title=title, description=description, viewport_state=viewport_state, ) self.db.add(board) self.db.commit() self.db.refresh(board) return board def get_board_by_id(self, board_id: UUID, user_id: UUID) -> Board | None: """ Get board by ID for a specific user. Args: board_id: Board UUID user_id: User UUID (for ownership check) Returns: Board if found and owned by user, None otherwise """ stmt = select(Board).where( Board.id == board_id, Board.user_id == user_id, Board.is_deleted == False, # noqa: E712 ) return self.db.execute(stmt).scalar_one_or_none() def get_user_boards( self, user_id: UUID, limit: int = 50, offset: int = 0, ) -> tuple[Sequence[Board], int]: """ Get all boards for a user with pagination. Args: user_id: User UUID limit: Maximum number of boards to return offset: Number of boards to skip Returns: Tuple of (list of boards, total count) """ # Query for boards with image count stmt = ( select(Board, func.count(BoardImage.id).label("image_count")) .outerjoin(BoardImage, Board.id == BoardImage.board_id) .where(Board.user_id == user_id, Board.is_deleted == False) # noqa: E712 .group_by(Board.id) .order_by(Board.updated_at.desc()) .limit(limit) .offset(offset) ) results = self.db.execute(stmt).all() boards = [row[0] for row in results] # Get total count count_stmt = select(func.count(Board.id)).where(Board.user_id == user_id, Board.is_deleted == False) # noqa: E712 total = self.db.execute(count_stmt).scalar_one() return boards, total def update_board( self, board_id: UUID, user_id: UUID, title: str | None = None, description: str | None = None, viewport_state: dict | None = None, ) -> Board | None: """ Update board metadata. Args: board_id: Board UUID user_id: User UUID (for ownership check) title: New title (if provided) description: New description (if provided) viewport_state: New viewport state (if provided) Returns: Updated Board if found and owned by user, None otherwise """ board = self.get_board_by_id(board_id, user_id) if not board: return None if title is not None: board.title = title if description is not None: board.description = description if viewport_state is not None: board.viewport_state = viewport_state self.db.commit() self.db.refresh(board) return board def delete_board(self, board_id: UUID, user_id: UUID) -> bool: """ Soft delete a board. Args: board_id: Board UUID user_id: User UUID (for ownership check) Returns: True if deleted, False if not found or not owned """ board = self.get_board_by_id(board_id, user_id) if not board: return False board.is_deleted = True self.db.commit() return True def board_exists(self, board_id: UUID, user_id: UUID) -> bool: """ Check if board exists and is owned by user. Args: board_id: Board UUID user_id: User UUID Returns: True if board exists and is owned by user """ stmt = select(func.count(Board.id)).where( Board.id == board_id, Board.user_id == user_id, Board.is_deleted == False, # noqa: E712 ) count = self.db.execute(stmt).scalar_one() return count > 0 # Group operations def create_group( self, board_id: UUID, name: str, color: str, annotation: str | None, image_ids: list[UUID], ) -> Group: """ Create a new group and assign images to it. Args: board_id: Board UUID name: Group name color: Hex color code annotation: Optional annotation text image_ids: List of board_image IDs to include Returns: Created Group instance """ group = Group( board_id=board_id, name=name, color=color, annotation=annotation, ) self.db.add(group) self.db.flush() # Get group ID # Assign images to group for image_id in image_ids: stmt = select(BoardImage).where(BoardImage.board_id == board_id, BoardImage.image_id == image_id) board_image = self.db.execute(stmt).scalar_one_or_none() if board_image: board_image.group_id = group.id self.db.commit() self.db.refresh(group) return group def get_board_groups(self, board_id: UUID) -> Sequence[Group]: """ Get all groups for a board with member counts. Args: board_id: Board UUID Returns: List of groups """ stmt = ( select(Group, func.count(BoardImage.id).label("member_count")) .outerjoin(BoardImage, Group.id == BoardImage.group_id) .where(Group.board_id == board_id) .group_by(Group.id) .order_by(Group.created_at.desc()) ) results = self.db.execute(stmt).all() # Add member_count as attribute groups = [] for row in results: group = row[0] # Note: member_count is dynamically calculated, not stored groups.append(group) return groups def get_group_by_id(self, group_id: UUID, board_id: UUID) -> Group | None: """ Get group by ID. Args: group_id: Group UUID board_id: Board UUID (for verification) Returns: Group if found, None otherwise """ stmt = select(Group).where(Group.id == group_id, Group.board_id == board_id) return self.db.execute(stmt).scalar_one_or_none() def update_group( self, group_id: UUID, board_id: UUID, name: str | None = None, color: str | None = None, annotation: str | None = None, ) -> Group | None: """ Update group metadata. Args: group_id: Group UUID board_id: Board UUID name: New name (if provided) color: New color (if provided) annotation: New annotation (if provided) Returns: Updated Group if found, None otherwise """ group = self.get_group_by_id(group_id, board_id) if not group: return None if name is not None: group.name = name if color is not None: group.color = color if annotation is not None: group.annotation = annotation self.db.commit() self.db.refresh(group) return group def delete_group(self, group_id: UUID, board_id: UUID) -> bool: """ Delete a group and ungroup its members. Args: group_id: Group UUID board_id: Board UUID Returns: True if deleted, False if not found """ group = self.get_group_by_id(group_id, board_id) if not group: return False # Ungroup all members (set group_id to None) stmt = select(BoardImage).where(BoardImage.group_id == group_id) members = self.db.execute(stmt).scalars().all() for member in members: member.group_id = None # Delete the group self.db.delete(group) self.db.commit() return True def add_images_to_group(self, group_id: UUID, board_id: UUID, image_ids: list[UUID]) -> int: """ Add images to a group. Args: group_id: Group UUID board_id: Board UUID image_ids: List of image IDs to add Returns: Number of images added """ count = 0 for image_id in image_ids: stmt = select(BoardImage).where(BoardImage.board_id == board_id, BoardImage.image_id == image_id) board_image = self.db.execute(stmt).scalar_one_or_none() if board_image: board_image.group_id = group_id count += 1 self.db.commit() return count def remove_images_from_group(self, group_id: UUID, image_ids: list[UUID]) -> int: """ Remove images from a group. Args: group_id: Group UUID image_ids: List of image IDs to remove Returns: Number of images removed """ count = 0 for image_id in image_ids: stmt = select(BoardImage).where(BoardImage.group_id == group_id, BoardImage.image_id == image_id) board_image = self.db.execute(stmt).scalar_one_or_none() if board_image: board_image.group_id = None count += 1 self.db.commit() return count