Files
webref/backend/app/boards/repository.py
2025-11-02 00:36:32 -06:00

198 lines
5.1 KiB
Python

"""Board repository for database operations."""
from collections.abc import Sequence
from uuid import UUID
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.database.models.board import Board
from app.database.models.board_image import BoardImage
class BoardRepository:
    """Repository for Board database operations.

    Every lookup is scoped to a single owner (``user_id``) and ignores
    boards whose ``is_deleted`` flag is set, so soft-deleted boards are
    treated as if they do not exist.
    """

    def __init__(self, db: Session):
        """
        Initialize repository with database session.

        Args:
            db: SQLAlchemy database session
        """
        self.db = db

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        Raises:
            Exception: Whatever ``Session.commit()`` raised; it is re-raised
                unchanged after the rollback.
        """
        try:
            self.db.commit()
        except Exception:
            # A failed flush/commit can leave the session's transaction in a
            # state that needs an explicit rollback before the session is
            # usable again, so reset it here and let the caller see the
            # original error.
            self.db.rollback()
            raise

    def create_board(
        self,
        user_id: UUID,
        title: str,
        description: str | None = None,
        viewport_state: dict | None = None,
    ) -> Board:
        """
        Create a new board.

        Args:
            user_id: Owner's user ID
            title: Board title
            description: Optional board description
            viewport_state: Optional custom viewport state; when omitted a
                default viewport (origin, zoom 1.0, no rotation) is used

        Returns:
            Created Board instance, refreshed from the database
        """
        if viewport_state is None:
            # Build a fresh dict per call so boards never share one default.
            viewport_state = {"x": 0, "y": 0, "zoom": 1.0, "rotation": 0}

        board = Board(
            user_id=user_id,
            title=title,
            description=description,
            viewport_state=viewport_state,
        )
        self.db.add(board)
        self._commit()
        self.db.refresh(board)
        return board

    def get_board_by_id(self, board_id: UUID, user_id: UUID) -> Board | None:
        """
        Get board by ID for a specific user.

        Args:
            board_id: Board UUID
            user_id: User UUID (for ownership check)

        Returns:
            Board if found, owned by user and not soft-deleted, None otherwise
        """
        stmt = select(Board).where(
            Board.id == board_id,
            Board.user_id == user_id,
            Board.is_deleted == False,  # noqa: E712
        )
        return self.db.execute(stmt).scalar_one_or_none()

    def get_user_boards(
        self,
        user_id: UUID,
        limit: int = 50,
        offset: int = 0,
    ) -> tuple[Sequence[Board], int]:
        """
        Get all boards for a user with pagination.

        Boards are ordered by most recently updated first; ``Board.id`` is
        used as a tie-breaker so that pages stay stable when several boards
        share the same ``updated_at`` value.

        Args:
            user_id: User UUID
            limit: Maximum number of boards to return
            offset: Number of boards to skip

        Returns:
            Tuple of (list of boards for the requested page, total number of
            non-deleted boards owned by the user)
        """
        # Query for boards with image count
        stmt = (
            select(Board, func.count(BoardImage.id).label("image_count"))
            .outerjoin(BoardImage, Board.id == BoardImage.board_id)
            .where(Board.user_id == user_id, Board.is_deleted == False)  # noqa: E712
            .group_by(Board.id)
            # LIMIT/OFFSET is only deterministic over a total order; without
            # the id tie-breaker, rows with equal timestamps could repeat or
            # be skipped between pages.
            .order_by(Board.updated_at.desc(), Board.id)
            .limit(limit)
            .offset(offset)
        )
        rows = self.db.execute(stmt).all()

        # Each row is (Board, image_count); only the Board entity is returned.
        # NOTE(review): image_count is computed by the query but dropped
        # here - confirm whether callers expect it alongside each board.
        boards = [row[0] for row in rows]

        # Get total count (independent of limit/offset)
        count_stmt = select(func.count(Board.id)).where(
            Board.user_id == user_id,
            Board.is_deleted == False,  # noqa: E712
        )
        total = self.db.execute(count_stmt).scalar_one()
        return boards, total

    def update_board(
        self,
        board_id: UUID,
        user_id: UUID,
        title: str | None = None,
        description: str | None = None,
        viewport_state: dict | None = None,
    ) -> Board | None:
        """
        Update board metadata.

        Only arguments that are not None are applied; passing None leaves
        the corresponding field unchanged (so a field cannot be reset to
        None through this method).

        Args:
            board_id: Board UUID
            user_id: User UUID (for ownership check)
            title: New title (if provided)
            description: New description (if provided)
            viewport_state: New viewport state (if provided)

        Returns:
            Updated Board if found and owned by user, None otherwise
        """
        board = self.get_board_by_id(board_id, user_id)
        if not board:
            return None

        if title is not None:
            board.title = title
        if description is not None:
            board.description = description
        if viewport_state is not None:
            board.viewport_state = viewport_state

        self._commit()
        self.db.refresh(board)
        return board

    def delete_board(self, board_id: UUID, user_id: UUID) -> bool:
        """
        Soft delete a board.

        The row is kept; only its ``is_deleted`` flag is set, which hides it
        from every other lookup in this repository.

        Args:
            board_id: Board UUID
            user_id: User UUID (for ownership check)

        Returns:
            True if deleted, False if not found or not owned
        """
        board = self.get_board_by_id(board_id, user_id)
        if not board:
            return False

        board.is_deleted = True
        self._commit()
        return True

    def board_exists(self, board_id: UUID, user_id: UUID) -> bool:
        """
        Check if board exists and is owned by user.

        Args:
            board_id: Board UUID
            user_id: User UUID

        Returns:
            True if a non-deleted board with this ID is owned by the user
        """
        stmt = select(func.count(Board.id)).where(
            Board.id == board_id,
            Board.user_id == user_id,
            Board.is_deleted == False,  # noqa: E712
        )
        count = self.db.execute(stmt).scalar_one()
        return count > 0