Files
media-map/backend/app/api/collection.py
Danilo Reyes 1329958b4e
Some checks failed
Test Suite / test (push) Failing after 45s
Add NixOS VM configuration and new media search endpoint
- Introduced a new NixOS VM configuration in `flake.nix` for testing purposes.
- Updated CI workflow to build the VM using the flake input for a more streamlined process.
- Added a new API endpoint `/search` in the collection API to search owned media by title, with optional filtering by media type.
- Refactored frontend components to utilize the new media search functionality, updating types and state management accordingly.
- Enhanced CSS for the map component to ensure proper rendering in various layouts.
2025-12-28 23:02:48 -06:00

195 lines
6.3 KiB
Python

"""Collection API endpoints"""
from fastapi import APIRouter, Query, HTTPException
from typing import List, Optional
import json
from app.core.database import init_db
router = APIRouter()
@router.get("/summary")
async def get_collection_summary(
types: Optional[str] = Query(None, description="Comma-separated list: movie,show,music")
):
"""
Get collection summary by country and media type.
Returns counts per country per media type.
"""
# Ensure pool is initialized
from app.core.database import init_db, pool as db_pool
await init_db()
if db_pool is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="Database not available")
# Parse types filter
type_filter = []
if types:
type_filter = [t.strip() for t in types.split(",") if t.strip() in ["movie", "show", "music"]]
async with db_pool.connection() as conn:
async with conn.cursor() as cur:
# Build query
query = """
SELECT
mc.country_code,
mi.media_type,
COUNT(*) as count
FROM moviemap.media_country mc
JOIN moviemap.media_item mi ON mc.media_item_id = mi.id
"""
params = []
if type_filter:
query += " WHERE mi.media_type = ANY(%s)"
params.append(type_filter)
query += """
GROUP BY mc.country_code, mi.media_type
ORDER BY mc.country_code, mi.media_type
"""
await cur.execute(query, params if params else None)
rows = await cur.fetchall()
# Transform to nested dict structure
result = {}
for row in rows:
country_code, media_type, count = row
if country_code not in result:
result[country_code] = {}
result[country_code][media_type] = count
return result
@router.get("/by-country")
async def get_media_by_country(
country_code: str = Query(..., description="ISO 3166-1 alpha-2 country code"),
types: Optional[str] = Query(None, description="Comma-separated list: movie,show,music")
):
"""
Get list of media items for a specific country.
Returns media items with their details.
"""
await init_db()
from app.core.database import pool as db_pool
if db_pool is None:
raise HTTPException(status_code=503, detail="Database not available")
# Validate country code
if len(country_code) != 2 or not country_code.isalpha():
raise HTTPException(status_code=400, detail="Country code must be 2 letters (ISO 3166-1 alpha-2)")
country_code = country_code.upper()
# Parse types filter
type_filter = []
if types:
type_filter = [t.strip() for t in types.split(",") if t.strip() in ["movie", "show", "music"]]
async with db_pool.connection() as conn:
async with conn.cursor() as cur:
query = """
SELECT
mi.id,
mi.source_kind,
mi.source_item_id,
mi.title,
mi.year,
mi.media_type
FROM moviemap.media_country mc
JOIN moviemap.media_item mi ON mc.media_item_id = mi.id
WHERE mc.country_code = %s
"""
params = [country_code]
if type_filter:
query += " AND mi.media_type = ANY(%s)"
params.append(type_filter)
query += " ORDER BY mi.title"
await cur.execute(query, params)
rows = await cur.fetchall()
items = []
for row in rows:
items.append({
"id": str(row[0]),
"source_kind": row[1],
"source_item_id": row[2],
"title": row[3],
"year": row[4],
"media_type": row[5],
})
return {
"country_code": country_code,
"count": len(items),
"items": items
}
@router.get("/search")
async def search_owned_media(
query: str = Query(..., min_length=1, description="Search query (title substring)"),
media_type: Optional[str] = Query(None, description="Filter by media type: movie or show"),
limit: int = Query(10, ge=1, le=50),
):
"""
Search owned media (from your synced Radarr/Sonarr library) by title.
"""
await init_db()
from app.core.database import pool as db_pool
if db_pool is None:
raise HTTPException(status_code=503, detail="Database not available")
mt = None
if media_type is not None:
if media_type not in ["movie", "show"]:
raise HTTPException(status_code=400, detail="media_type must be 'movie' or 'show'")
mt = media_type
like = f"%{query.strip()}%"
async with db_pool.connection() as conn:
async with conn.cursor() as cur:
if mt:
sql = """
SELECT id, title, year, media_type
FROM moviemap.media_item
WHERE media_type = %s
AND title ILIKE %s
ORDER BY title
LIMIT %s
"""
params = (mt, like, limit)
else:
sql = """
SELECT id, title, year, media_type
FROM moviemap.media_item
WHERE media_type = ANY(%s)
AND title ILIKE %s
ORDER BY title
LIMIT %s
"""
params = (["movie", "show"], like, limit)
await cur.execute(sql, params)
rows = await cur.fetchall()
return {
"query": query,
"results": [
{
"id": str(r[0]),
"title": r[1],
"year": r[2],
"media_type": r[3],
}
for r in rows
],
}