Files
webref/backend/app/boards/repository.py
Danilo Reyes 948fe591dc
All checks were successful
CI/CD Pipeline / VM Test - security (push) Successful in 7s
CI/CD Pipeline / Backend Linting (push) Successful in 4s
CI/CD Pipeline / VM Test - backend-integration (push) Successful in 11s
CI/CD Pipeline / VM Test - full-stack (push) Successful in 8s
CI/CD Pipeline / VM Test - performance (push) Successful in 8s
CI/CD Pipeline / Nix Flake Check (push) Successful in 38s
CI/CD Pipeline / CI Summary (push) Successful in 0s
CI/CD Pipeline / Frontend Linting (push) Successful in 17s
phase 13
2025-11-02 14:48:03 -06:00

409 lines
11 KiB
Python

"""Board repository for database operations."""
from collections.abc import Sequence
from uuid import UUID
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.database.models.board import Board
from app.database.models.board_image import BoardImage
from app.database.models.group import Group
class BoardRepository:
    """
    Repository for Board and Group database operations.

    All queries and writes go through the SQLAlchemy session passed to the
    constructor. Every mutating method commits that session itself.
    """

    def __init__(self, db: Session):
        """
        Initialize repository with database session.

        Args:
            db: SQLAlchemy database session
        """
        self.db = db

    # Internal helpers

    @staticmethod
    def _owned_active_filters(user_id: UUID) -> tuple:
        """
        Build the WHERE criteria shared by every board lookup.

        Args:
            user_id: User UUID the boards must belong to

        Returns:
            Tuple of SQL criteria: owned by user_id and not soft-deleted
        """
        return (
            Board.user_id == user_id,
            Board.is_deleted == False,  # noqa: E712 - builds SQL, not a Python comparison
        )

    def _load_board_images(self, image_ids: list[UUID], *criteria) -> Sequence[BoardImage]:
        """
        Load BoardImage rows for a set of image IDs with a single query.

        Args:
            image_ids: Image IDs to look up (matched against BoardImage.image_id)
            *criteria: Extra SQL criteria every returned row must satisfy

        Returns:
            Matching BoardImage rows; empty when image_ids is empty
        """
        if not image_ids:
            # Nothing to match, so skip the round trip entirely.
            return []

        stmt = select(BoardImage).where(BoardImage.image_id.in_(image_ids), *criteria)
        return self.db.execute(stmt).scalars().all()

    # Board operations

    def create_board(
        self,
        user_id: UUID,
        title: str,
        description: str | None = None,
        viewport_state: dict | None = None,
    ) -> Board:
        """
        Create a new board.

        Args:
            user_id: Owner's user ID
            title: Board title
            description: Optional board description
            viewport_state: Optional custom viewport state; when omitted the
                board starts at x=0, y=0, zoom=1.0, rotation=0

        Returns:
            Created Board instance, refreshed from the database after commit
        """
        if viewport_state is None:
            # Built per call so boards never share one mutable default dict.
            viewport_state = {"x": 0, "y": 0, "zoom": 1.0, "rotation": 0}

        board = Board(
            user_id=user_id,
            title=title,
            description=description,
            viewport_state=viewport_state,
        )
        self.db.add(board)
        self.db.commit()
        self.db.refresh(board)
        return board

    def get_board_by_id(self, board_id: UUID, user_id: UUID) -> Board | None:
        """
        Get board by ID for a specific user.

        Args:
            board_id: Board UUID
            user_id: User UUID (for ownership check)

        Returns:
            Board if found, owned by user and not soft-deleted, None otherwise
        """
        stmt = select(Board).where(Board.id == board_id, *self._owned_active_filters(user_id))
        return self.db.execute(stmt).scalar_one_or_none()

    def get_user_boards(
        self,
        user_id: UUID,
        limit: int = 50,
        offset: int = 0,
    ) -> tuple[Sequence[Board], int]:
        """
        Get all boards for a user with pagination.

        Boards are ordered by most recently updated first.

        Args:
            user_id: User UUID
            limit: Maximum number of boards to return
            offset: Number of boards to skip

        Returns:
            Tuple of (list of boards, total count); the total ignores
            limit/offset and counts every non-deleted board of the user
        """
        owned_active = self._owned_active_filters(user_id)

        # NOTE(review): image_count is selected but never handed to the caller;
        # either expose it or drop the join/aggregate.
        page_stmt = (
            select(Board, func.count(BoardImage.id).label("image_count"))
            .outerjoin(BoardImage, Board.id == BoardImage.board_id)
            .where(*owned_active)
            .group_by(Board.id)
            # Board.id breaks ties between equal timestamps so that LIMIT/OFFSET
            # pages never repeat or skip a board.
            .order_by(Board.updated_at.desc(), Board.id)
            .limit(limit)
            .offset(offset)
        )
        boards = [board for board, _image_count in self.db.execute(page_stmt).all()]

        count_stmt = select(func.count(Board.id)).where(*owned_active)
        total = self.db.execute(count_stmt).scalar_one()

        return boards, total

    def update_board(
        self,
        board_id: UUID,
        user_id: UUID,
        title: str | None = None,
        description: str | None = None,
        viewport_state: dict | None = None,
    ) -> Board | None:
        """
        Update board metadata.

        A None argument means "leave unchanged", so a field cannot be reset
        to NULL through this method.

        Args:
            board_id: Board UUID
            user_id: User UUID (for ownership check)
            title: New title (if provided)
            description: New description (if provided)
            viewport_state: New viewport state (if provided)

        Returns:
            Updated Board if found and owned by user, None otherwise
        """
        board = self.get_board_by_id(board_id, user_id)
        if not board:
            return None

        if title is not None:
            board.title = title
        if description is not None:
            board.description = description
        if viewport_state is not None:
            board.viewport_state = viewport_state

        self.db.commit()
        self.db.refresh(board)
        return board

    def delete_board(self, board_id: UUID, user_id: UUID) -> bool:
        """
        Soft delete a board.

        The row is kept; only its is_deleted flag is set.

        Args:
            board_id: Board UUID
            user_id: User UUID (for ownership check)

        Returns:
            True if deleted, False if not found or not owned
        """
        board = self.get_board_by_id(board_id, user_id)
        if not board:
            return False

        board.is_deleted = True
        self.db.commit()
        return True

    def board_exists(self, board_id: UUID, user_id: UUID) -> bool:
        """
        Check if board exists and is owned by user.

        Args:
            board_id: Board UUID
            user_id: User UUID

        Returns:
            True if a non-deleted board with this ID is owned by user
        """
        stmt = select(func.count(Board.id)).where(
            Board.id == board_id,
            *self._owned_active_filters(user_id),
        )
        return self.db.execute(stmt).scalar_one() > 0

    # Group operations

    def create_group(
        self,
        board_id: UUID,
        name: str,
        color: str,
        annotation: str | None,
        image_ids: list[UUID],
    ) -> Group:
        """
        Create a new group and assign images to it.

        Image IDs that have no BoardImage row on this board are skipped
        silently.

        Args:
            board_id: Board UUID
            name: Group name
            color: Hex color code
            annotation: Optional annotation text
            image_ids: Image IDs to include (matched against BoardImage.image_id)

        Returns:
            Created Group instance, refreshed from the database after commit
        """
        group = Group(
            board_id=board_id,
            name=name,
            color=color,
            annotation=annotation,
        )
        self.db.add(group)
        self.db.flush()  # Populates group.id before the members reference it

        # One IN query instead of one SELECT per image ID.
        for board_image in self._load_board_images(image_ids, BoardImage.board_id == board_id):
            board_image.group_id = group.id

        self.db.commit()
        self.db.refresh(group)
        return group

    def get_board_groups(self, board_id: UUID) -> Sequence[Group]:
        """
        Get all groups for a board, newest first.

        Args:
            board_id: Board UUID

        Returns:
            List of groups
        """
        stmt = (
            select(Group, func.count(BoardImage.id).label("member_count"))
            .outerjoin(BoardImage, Group.id == BoardImage.group_id)
            .where(Group.board_id == board_id)
            .group_by(Group.id)
            .order_by(Group.created_at.desc())
        )
        rows = self.db.execute(stmt).all()

        # NOTE(review): member_count is calculated by the query but is neither
        # stored nor attached to the returned groups; only the Group entities
        # are handed back.
        return [group for group, _member_count in rows]

    def get_group_by_id(self, group_id: UUID, board_id: UUID) -> Group | None:
        """
        Get group by ID.

        Args:
            group_id: Group UUID
            board_id: Board UUID (for verification)

        Returns:
            Group if found on that board, None otherwise
        """
        stmt = select(Group).where(Group.id == group_id, Group.board_id == board_id)
        return self.db.execute(stmt).scalar_one_or_none()

    def update_group(
        self,
        group_id: UUID,
        board_id: UUID,
        name: str | None = None,
        color: str | None = None,
        annotation: str | None = None,
    ) -> Group | None:
        """
        Update group metadata.

        A None argument means "leave unchanged", so a field cannot be reset
        to NULL through this method.

        Args:
            group_id: Group UUID
            board_id: Board UUID
            name: New name (if provided)
            color: New color (if provided)
            annotation: New annotation (if provided)

        Returns:
            Updated Group if found, None otherwise
        """
        group = self.get_group_by_id(group_id, board_id)
        if not group:
            return None

        if name is not None:
            group.name = name
        if color is not None:
            group.color = color
        if annotation is not None:
            group.annotation = annotation

        self.db.commit()
        self.db.refresh(group)
        return group

    def delete_group(self, group_id: UUID, board_id: UUID) -> bool:
        """
        Delete a group and ungroup its members.

        Args:
            group_id: Group UUID
            board_id: Board UUID

        Returns:
            True if deleted, False if not found
        """
        group = self.get_group_by_id(group_id, board_id)
        if not group:
            return False

        # Detach every member first (group_id -> None), then drop the group.
        members_stmt = select(BoardImage).where(BoardImage.group_id == group_id)
        for member in self.db.execute(members_stmt).scalars().all():
            member.group_id = None

        self.db.delete(group)
        self.db.commit()
        return True

    def add_images_to_group(self, group_id: UUID, board_id: UUID, image_ids: list[UUID]) -> int:
        """
        Add images to a group.

        Only images that have a BoardImage row on board_id are touched.
        An image already in another group is moved to this one.

        Args:
            group_id: Group UUID
            board_id: Board UUID
            image_ids: List of image IDs to add

        Returns:
            Number of board images assigned to the group
        """
        # One IN query instead of one SELECT per image ID.
        members = self._load_board_images(image_ids, BoardImage.board_id == board_id)
        for board_image in members:
            board_image.group_id = group_id

        self.db.commit()
        return len(members)

    def remove_images_from_group(self, group_id: UUID, image_ids: list[UUID]) -> int:
        """
        Remove images from a group.

        Only images currently assigned to group_id are affected.

        Args:
            group_id: Group UUID
            image_ids: List of image IDs to remove

        Returns:
            Number of board images removed from the group
        """
        # One IN query instead of one SELECT per image ID.
        members = self._load_board_images(image_ids, BoardImage.group_id == group_id)
        for board_image in members:
            board_image.group_id = None

        self.db.commit()
        return len(members)