Files
media-map/backend/tests/conftest.py
Danilo Reyes 2b1a92fb49
Some checks failed
Test Suite / test (push) Has been cancelled
Add new API endpoints for media retrieval by country and enhance configuration
- Introduced `/api/tmdb` and `/api/collection/missing-locations` endpoints to the backend for improved media management.
- Added a new `get_media_by_country` function in the collection API to fetch media items based on country codes.
- Updated configuration to allow overriding *arr base URLs via environment variables for better flexibility.
- Enhanced frontend with a new `MissingLocations` component and integrated it into the routing structure.
- Improved the `CollectionMap` component to handle country selection and display media items accordingly.
- Added testing dependencies in `requirements.txt` and updated frontend configuration for testing support.
2025-12-28 22:35:06 -06:00

201 lines
6.5 KiB
Python

"""Pytest configuration and fixtures"""
import pytest
import asyncio
import os
from typing import AsyncGenerator
from fastapi.testclient import TestClient
from httpx import AsyncClient
from app.core.database import init_db, close_db, pool as db_pool
from app.core.config import settings
from app.main import app
import psycopg
from psycopg_pool import AsyncConnectionPool
# Connection settings for the dedicated test database. Each value can be
# overridden through its TEST_POSTGRES_* environment variable; the user
# additionally falls back to $USER before the hard-coded default.
TEST_DB = os.environ.get("TEST_POSTGRES_DB", "moviemap_test")
TEST_USER = os.environ.get(
    "TEST_POSTGRES_USER",
    os.environ.get("USER", "jawz"),
)
TEST_SOCKET = os.environ.get("TEST_POSTGRES_SOCKET_PATH", "/run/postgresql")
@pytest.fixture(scope="session")
def event_loop():
    """Provide one event loop shared by the whole test session.

    A fresh loop is created from the current event loop policy, handed to
    the tests, and closed once the session is over.
    """
    policy = asyncio.get_event_loop_policy()
    session_loop = policy.new_event_loop()
    yield session_loop
    session_loop.close()
# DDL executed, in order, to bring the test database up to the schema the
# tests rely on. Every statement is written to be idempotent, so running it
# against an already-prepared database is harmless.
_TEST_SCHEMA_DDL = (
    # Schema
    "CREATE SCHEMA IF NOT EXISTS moviemap",
    # Enums: creation is wrapped in a DO block that ignores duplicate_object
    # so an existing type does not abort the setup.
    """
    DO $$ BEGIN
        CREATE TYPE moviemap.source_kind AS ENUM ('radarr', 'sonarr', 'lidarr');
    EXCEPTION
        WHEN duplicate_object THEN null;
    END $$;
    """,
    """
    DO $$ BEGIN
        CREATE TYPE moviemap.media_type AS ENUM ('movie', 'show', 'music');
    EXCEPTION
        WHEN duplicate_object THEN null;
    END $$;
    """,
    """
    DO $$ BEGIN
        CREATE TYPE moviemap.watched_media_type AS ENUM ('movie', 'show');
    EXCEPTION
        WHEN duplicate_object THEN null;
    END $$;
    """,
    # Tables
    """
    CREATE TABLE IF NOT EXISTS moviemap.source (
        id SERIAL PRIMARY KEY,
        kind moviemap.source_kind NOT NULL UNIQUE,
        base_url TEXT NOT NULL,
        enabled BOOLEAN NOT NULL DEFAULT true,
        last_sync_at TIMESTAMPTZ
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS moviemap.media_item (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        source_kind moviemap.source_kind NOT NULL,
        source_item_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        year INTEGER,
        media_type moviemap.media_type NOT NULL,
        arr_raw JSONB,
        UNIQUE (source_kind, source_item_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS moviemap.media_country (
        media_item_id UUID NOT NULL REFERENCES moviemap.media_item(id) ON DELETE CASCADE,
        country_code CHAR(2) NOT NULL,
        weight SMALLINT NOT NULL DEFAULT 1,
        PRIMARY KEY (media_item_id, country_code)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS moviemap.watched_item (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        media_type moviemap.watched_media_type NOT NULL,
        title TEXT NOT NULL,
        year INTEGER,
        country_code CHAR(2) NOT NULL,
        watched_at TIMESTAMPTZ,
        notes TEXT,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS moviemap.manual_pin (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        country_code CHAR(2) NOT NULL,
        label TEXT,
        pinned_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    )
    """,
)

# Statements run after each test: all rows are removed, the schema is kept.
_TEST_TRUNCATE_STATEMENTS = (
    "TRUNCATE TABLE moviemap.media_country CASCADE",
    "TRUNCATE TABLE moviemap.media_item CASCADE",
    "TRUNCATE TABLE moviemap.watched_item CASCADE",
    "TRUNCATE TABLE moviemap.manual_pin CASCADE",
    "TRUNCATE TABLE moviemap.source CASCADE",
)


async def _run_statements(pool, statements) -> None:
    """Execute *statements* in order on one pooled connection, then commit."""
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            for statement in statements:
                await cur.execute(statement)
        await conn.commit()


@pytest.fixture(scope="function")
async def test_db_pool() -> AsyncGenerator[AsyncConnectionPool, None]:
    """Yield an open connection pool bound to the test database.

    Setup connects over the Unix socket described by ``TEST_SOCKET`` as
    ``TEST_USER`` to ``TEST_DB`` and makes sure the ``moviemap`` schema,
    enums and tables exist. Teardown truncates every table (keeping the
    schema) and closes the pool.

    Yields:
        The opened ``AsyncConnectionPool``.

    Raises:
        Any error raised while opening the pool or executing the setup or
        cleanup SQL is propagated; the pool is closed in every case.
    """
    test_db_url = f"postgresql://{TEST_USER}@/{TEST_DB}?host={TEST_SOCKET}"
    pool = AsyncConnectionPool(
        conninfo=test_db_url,
        min_size=1,
        max_size=5,
        open=False,
    )
    await pool.open()
    try:
        # Run migrations
        await _run_statements(pool, _TEST_SCHEMA_DDL)
        yield pool
        # Cleanup: drop all data but keep schema
        await _run_statements(pool, _TEST_TRUNCATE_STATEMENTS)
    finally:
        # Always release the pool, even when setup or cleanup SQL fails;
        # otherwise its connections would stay open for the rest of the
        # test run.
        await pool.close()
@pytest.fixture(scope="function")
async def test_client(test_db_pool) -> AsyncGenerator[AsyncClient, None]:
    """Yield an HTTP client wired to the FastAPI app and the test pool.

    While the fixture is active, ``app.core.database.pool`` is replaced by
    ``test_db_pool``; the previous value is put back on teardown.

    Args:
        test_db_pool: Connection pool for the test database.

    Yields:
        An ``AsyncClient`` that sends requests directly to the ASGI app
        under the base URL ``http://test``.
    """
    # Import under an alias on purpose: a plain ``import app.core.database``
    # inside this function would bind the local name ``app`` to the
    # top-level package and shadow the FastAPI ``app`` instance imported at
    # module level, so the client would be handed a module instead of the
    # ASGI application.
    import app.core.database as database_module
    from httpx import ASGITransport

    # Temporarily replace the global pool
    original_pool = database_module.pool
    database_module.pool = test_db_pool
    try:
        # An explicit transport is used instead of the ``app=`` shortcut,
        # which recent httpx releases deprecate.
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            yield client
    finally:
        # Restore original pool even if the client or the test blew up.
        database_module.pool = original_pool
@pytest.fixture
def mock_radarr_response():
    """Return a canned Radarr API payload containing a single movie."""
    movie = {
        "id": 1,
        "title": "Test Movie",
        "year": 2020,
        "tmdbId": 12345,
        "productionCountries": [{"iso_3166_1": "US"}],
    }
    return [movie]
@pytest.fixture
def mock_sonarr_response():
    """Return a canned Sonarr API payload containing a single show."""
    show = {
        "id": 1,
        "title": "Test Show",
        "year": 2020,
        "tmdbId": 67890,
        "seriesMetadata": {"originCountry": ["US"]},
    }
    return [show]
@pytest.fixture
def mock_lidarr_response():
    """Return a canned Lidarr API payload containing a single artist."""
    artist = {
        "id": 1,
        "artistName": "Test Artist",
        "country": "US",
        "foreignArtistId": "test-mbid-123",
    }
    return [artist]