#!/usr/bin/env python3
"""SQLite persistence for download links.

All functions that take a ``conn`` read result rows by column name, so the
connection must use ``sqlite3.Row`` as its row factory (``connect`` sets this
up). None of the functions in this module call ``commit``.
"""

from __future__ import annotations

import sqlite3
from pathlib import Path
from typing import Iterable
from urllib.parse import SplitResult, urlsplit, urlunsplit

from functions import LOG
from functions import load_config_variables


def get_db_path(configs: dict | None = None) -> Path:
    """Return the database path for links.

    Args:
        configs: Configuration mapping; when falsy, the result of
            ``load_config_variables()`` is used instead.

    Returns:
        ``<configs["global"]["databases-dir"]>/links.sqlite3``.
    """
    cfg = configs or load_config_variables()
    base = Path(cfg["global"]["databases-dir"])
    return base / "links.sqlite3"


def connect(configs: dict | None = None) -> sqlite3.Connection:
    """Open a connection and ensure schema exists.

    The parent directory of the database file is created when missing and the
    connection is configured to return ``sqlite3.Row`` objects.
    """
    db_path = get_db_path(configs)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        ensure_schema(conn)
    except Exception:
        # Do not leak the handle when schema creation/migration fails.
        conn.close()
        raise
    return conn


def ensure_schema(conn: sqlite3.Connection) -> None:
    """Create schema if missing."""
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS links (
            id INTEGER PRIMARY KEY,
            user_name TEXT NOT NULL,
            url_original TEXT NOT NULL,
            url_normalized TEXT NOT NULL,
            site TEXT,
            enabled INTEGER NOT NULL DEFAULT 1,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            disabled_at TEXT,
            banned_at TEXT,
            banned_reason TEXT,
            requires_revision INTEGER NOT NULL DEFAULT 0
        );
        CREATE UNIQUE INDEX IF NOT EXISTS links_user_url_norm
            ON links (user_name, url_normalized);

        CREATE TABLE IF NOT EXISTS link_history (
            id INTEGER PRIMARY KEY,
            link_id INTEGER,
            user_name TEXT NOT NULL,
            event TEXT NOT NULL,
            old_url TEXT,
            new_url TEXT,
            note TEXT,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS link_tombstones (
            id INTEGER PRIMARY KEY,
            user_name TEXT NOT NULL,
            url_normalized TEXT NOT NULL,
            url_original TEXT NOT NULL,
            removed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE UNIQUE INDEX IF NOT EXISTS tombstones_user_url_norm
            ON link_tombstones (user_name, url_normalized);
        """
    )
    # CREATE TABLE IF NOT EXISTS leaves an already existing "links" table
    # untouched, so add the column separately when it is absent.
    _ensure_column(
        conn,
        "links",
        "requires_revision",
        "ALTER TABLE links ADD COLUMN requires_revision INTEGER NOT NULL DEFAULT 0",
    )


def _ensure_column(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> None:
    """Execute ``ddl`` unless ``table`` already has a column named ``column``.

    ``table`` is interpolated into the PRAGMA statement verbatim, so it must
    be a trusted identifier.
    """
    # Index 1 of each PRAGMA table_info row is the column name.
    cols = [row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()]
    if column in cols:
        return
    conn.execute(ddl)


def _split_url(url: str) -> SplitResult:
    """Strip ``url``, prepend ``https://`` when it has no scheme, and split it."""
    raw = url.strip()
    if "://" not in raw:
        raw = f"https://{raw}"
    return urlsplit(raw)


def _normalize_host(host: str) -> str:
    """Lower-case ``host``, drop one leading ``www.`` and map twitter.com to x.com."""
    host = host.lower()
    if host.startswith("www."):
        host = host[4:]
    if host in ("twitter.com", "www.twitter.com"):
        host = "x.com"
    return host


def normalize_url(url: str) -> str:
    """Normalize URL for dedupe only.

    The result always uses the ``https`` scheme and the normalized host name,
    has trailing slashes removed from the path, keeps the query string, and
    drops the fragment. Port and credentials are not carried over because only
    the parsed host name is used.
    """
    parts = _split_url(url)
    host = _normalize_host(parts.hostname or "")
    return urlunsplit(("https", host, parts.path.rstrip("/"), parts.query, ""))


def get_site(url: str) -> str:
    """Return normalized host name."""
    return _normalize_host(_split_url(url).hostname or "")


def add_history(
    conn: sqlite3.Connection,
    user_name: str,
    event: str,
    link_id: int | None = None,
    old_url: str | None = None,
    new_url: str | None = None,
    note: str | None = None,
) -> None:
    """Insert one row into ``link_history`` describing ``event``."""
    conn.execute(
        """
        INSERT INTO link_history (link_id, user_name, event, old_url, new_url, note)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        (link_id, user_name, event, old_url, new_url, note),
    )


def _find_link(
    conn: sqlite3.Connection, user_name: str, url_norm: str
) -> sqlite3.Row | None:
    """Return ``id`` and ``url_original`` of the user's link, or ``None``."""
    return conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()


def add_link(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    assume_yes: bool = False,
    source: str = "manual",
) -> dict:
    """Add a link or return existing status.

    Args:
        conn: Open database connection.
        user_name: Owner of the link.
        url_original: URL as supplied; stored verbatim next to its normalized
            form.
        assume_yes: When true, a tombstone does not block re-adding the link.
        source: Origin label recorded in the history note; the value
            ``"push"`` also bypasses the tombstone check.

    Returns:
        ``{"status": "exists", "row": row}`` when the user already has the
        normalized URL, ``{"status": "removed", "removed_at": ...}`` when a
        tombstone blocks the insert, otherwise
        ``{"status": "added", "id": new_id}``.
    """
    url_norm = normalize_url(url_original)
    site = get_site(url_original)

    row = conn.execute(
        "SELECT * FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if row:
        return {"status": "exists", "row": row}

    tombstone = conn.execute(
        "SELECT removed_at FROM link_tombstones WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if tombstone and not assume_yes and source != "push":
        return {"status": "removed", "removed_at": tombstone["removed_at"]}

    # The new row gets requires_revision = 0 from the column default, so no
    # follow-up UPDATE is needed even when the link is re-added over a
    # tombstone.
    cur = conn.execute(
        """
        INSERT INTO links (user_name, url_original, url_normalized, site)
        VALUES (?, ?, ?, ?)
        """,
        (user_name, url_original, url_norm, site),
    )
    add_history(
        conn,
        user_name=user_name,
        event="add",
        link_id=cur.lastrowid,
        new_url=url_original,
        note=f"source={source}",
    )
    return {"status": "added", "id": cur.lastrowid}


def set_enabled(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    enabled: bool,
) -> bool:
    """Enable or disable a user's link and record the change in the history.

    Enabling also clears ``disabled_at`` and ``requires_revision``.

    Returns:
        ``False`` when the user has no link with that normalized URL,
        ``True`` otherwise.
    """
    row = _find_link(conn, user_name, normalize_url(url_original))
    if not row:
        return False

    if enabled:
        conn.execute(
            """
            UPDATE links
            SET enabled = 1,
                disabled_at = NULL,
                requires_revision = 0,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(conn, user_name, "enable", link_id=row["id"], old_url=row["url_original"])
    else:
        conn.execute(
            """
            UPDATE links
            SET enabled = 0,
                disabled_at = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(conn, user_name, "disable", link_id=row["id"], old_url=row["url_original"])
    return True


def set_banned(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    banned: bool,
    reason: str | None = None,
) -> bool:
    """Ban or unban a user's link and record the change in the history.

    Banning stores ``reason`` in ``banned_reason``; unbanning clears the ban
    columns and ``requires_revision``.

    Returns:
        ``False`` when the user has no link with that normalized URL,
        ``True`` otherwise.
    """
    row = _find_link(conn, user_name, normalize_url(url_original))
    if not row:
        return False

    if banned:
        conn.execute(
            """
            UPDATE links
            SET banned_at = CURRENT_TIMESTAMP,
                banned_reason = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (reason, row["id"]),
        )
        add_history(
            conn,
            user_name,
            "ban",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    else:
        conn.execute(
            """
            UPDATE links
            SET banned_at = NULL,
                banned_reason = NULL,
                requires_revision = 0,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(conn, user_name, "unban", link_id=row["id"], old_url=row["url_original"])
    return True


def _flag_for_revision(
    conn: sqlite3.Connection,
    link_id: int,
    user_name: str,
    url_original: str,
    reason: str,
) -> None:
    """Disable one link, set ``requires_revision`` and log the reason."""
    conn.execute(
        """
        UPDATE links
        SET requires_revision = 1,
            enabled = 0,
            disabled_at = COALESCE(disabled_at, CURRENT_TIMESTAMP),
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (link_id,),
    )
    add_history(
        conn,
        user_name,
        "requires_revision",
        link_id=link_id,
        old_url=url_original,
        note=reason,
    )


def mark_requires_revision(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    reason: str,
) -> bool:
    """Flag a user's link as requiring revision and disable it.

    An existing ``disabled_at`` timestamp is kept.

    Returns:
        ``False`` when no matching link exists, ``True`` otherwise.
    """
    rows = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, normalize_url(url_original)),
    ).fetchall()
    if not rows:
        return False
    for row in rows:
        _flag_for_revision(conn, row["id"], user_name, row["url_original"], reason)
    return True


def mark_requires_revision_by_norm(
    conn: sqlite3.Connection, url_norm: str, reason: str
) -> int:
    """Flag every user's link with the given normalized URL for revision.

    ``url_norm`` is compared against ``url_normalized`` as is; it is not
    normalized again here.

    Returns:
        Number of links that were flagged.
    """
    rows = conn.execute(
        "SELECT id, user_name, url_original FROM links WHERE url_normalized = ?",
        (url_norm,),
    ).fetchall()
    for row in rows:
        _flag_for_revision(conn, row["id"], row["user_name"], row["url_original"], reason)
    return len(rows)


def rename_link(
    conn: sqlite3.Connection,
    user_name: str,
    old_url: str,
    new_url: str,
) -> dict:
    """Point an existing link at ``new_url`` and re-enable it.

    Returns:
        ``{"status": "missing"}`` when the user has no link for ``old_url``,
        ``{"status": "conflict"}`` when a different link of the same user
        already has the new normalized URL, otherwise
        ``{"status": "renamed"}``.
    """
    new_norm = normalize_url(new_url)

    row = _find_link(conn, user_name, normalize_url(old_url))
    if not row:
        return {"status": "missing"}

    conflict = conn.execute(
        "SELECT id FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, new_norm),
    ).fetchone()
    # Matching the row itself is fine: old and new URL normalize identically.
    if conflict and conflict["id"] != row["id"]:
        return {"status": "conflict"}

    conn.execute(
        """
        UPDATE links
        SET url_original = ?,
            url_normalized = ?,
            site = ?,
            enabled = 1,
            disabled_at = NULL,
            requires_revision = 0,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (new_url, new_norm, get_site(new_url), row["id"]),
    )
    add_history(
        conn,
        user_name,
        "rename",
        link_id=row["id"],
        old_url=row["url_original"],
        new_url=new_url,
    )
    return {"status": "renamed"}


def remove_link(conn: sqlite3.Connection, user_name: str, url_original: str) -> bool:
    """Delete a user's link, leaving a tombstone and a history entry behind.

    Returns:
        ``False`` when the user has no link with that normalized URL,
        ``True`` otherwise.
    """
    url_norm = normalize_url(url_original)
    row = _find_link(conn, user_name, url_norm)
    if not row:
        return False

    # OR IGNORE: a tombstone from an earlier removal of the same URL stays.
    conn.execute(
        """
        INSERT OR IGNORE INTO link_tombstones (user_name, url_normalized, url_original)
        VALUES (?, ?, ?)
        """,
        (user_name, url_norm, row["url_original"]),
    )
    add_history(conn, user_name, "remove", link_id=row["id"], old_url=row["url_original"])
    conn.execute("DELETE FROM links WHERE id = ?", (row["id"],))
    return True


def get_active_links(conn: sqlite3.Connection, user_name: str) -> list[str]:
    """Return the original URLs of a user's enabled, non-banned links by id."""
    rows = conn.execute(
        """
        SELECT url_original
        FROM links
        WHERE user_name = ? AND enabled = 1 AND banned_at IS NULL
        ORDER BY id ASC
        """,
        (user_name,),
    ).fetchall()
    return [row["url_original"] for row in rows]


def get_links(
    conn: sqlite3.Connection,
    users: Iterable[str] | None = None,
    include_disabled: bool = False,
    include_banned: bool = False,
    requires_revision_only: bool = False,
) -> list[sqlite3.Row]:
    """Return link rows matching the given filters.

    Args:
        conn: Open database connection.
        users: Restrict the result to these user names; falsy means all users.
        include_disabled: Also return rows with ``enabled = 0``.
        include_banned: Also return rows with a ``banned_at`` value.
        requires_revision_only: Return only rows flagged for revision.

    Returns:
        Full ``links`` rows ordered by ``user_name`` and ``id``.
    """
    params: list = []
    where = []

    user_list = list(users) if users else []
    if user_list:
        # One placeholder per user keeps the values parameterized.
        where.append(f"user_name IN ({','.join(['?'] * len(user_list))})")
        params.extend(user_list)
    if not include_disabled:
        where.append("enabled = 1")
    if not include_banned:
        where.append("banned_at IS NULL")
    if requires_revision_only:
        where.append("requires_revision = 1")

    clause = " AND ".join(where)
    if clause:
        clause = "WHERE " + clause
    return conn.execute(
        f"SELECT * FROM links {clause} ORDER BY user_name, id", params
    ).fetchall()


def get_links_by_user(conn: sqlite3.Connection, user_name: str) -> list[sqlite3.Row]:
    """Return every link row of ``user_name`` ordered by id, without filtering."""
    return conn.execute(
        "SELECT * FROM links WHERE user_name = ? ORDER BY id",
        (user_name,),
    ).fetchall()


def import_master_list(conn: sqlite3.Connection, user_name: str, path: Path) -> dict:
    """Import links for ``user_name`` from a text file with one URL per line.

    Blank lines are skipped. A line starting with ``#`` is treated as a
    disabled entry: the leading ``#`` characters are removed and, if the link
    is newly added, it is disabled right away.

    Returns:
        ``{"status": "missing", "path": ...}`` when ``path`` is not a file,
        otherwise a dict with ``"status": "ok"``, the ``added`` / ``exists`` /
        ``removed`` counters and the list of ``duplicates`` (URLs that already
        existed).
    """
    if not path.is_file():
        return {"status": "missing", "path": str(path)}

    with path.open("r", encoding="utf-8") as r_file:
        lines = [ln.strip() for ln in r_file if ln.strip()]

    added = 0
    exists = 0
    removed = 0
    duplicates: list[str] = []

    for line in lines:
        disabled = line.startswith("#")
        raw = line.lstrip("#").strip() if disabled else line
        if not raw:
            continue

        result = add_link(conn, user_name, raw, assume_yes=True, source="import")
        status = result["status"]
        if status == "added":
            added += 1
            if disabled:
                set_enabled(conn, user_name, raw, enabled=False)
        elif status == "exists":
            exists += 1
            duplicates.append(raw)
        elif status == "removed":
            # Not produced while assume_yes=True is passed above; kept so the
            # counter stays meaningful if that call ever changes.
            removed += 1

    return {
        "status": "ok",
        "added": added,
        "exists": exists,
        "removed": removed,
        "duplicates": duplicates,
    }


def bulk_rename_handle(
    conn: sqlite3.Connection,
    user_name: str,
    site: str,
    old_handle: str,
    new_handle: str,
) -> dict:
    """Rename account handle within a site for a user.

    Every link of ``user_name`` stored under ``site`` whose first path segment
    equals ``old_handle`` is renamed via ``rename_link`` so that the segment
    becomes ``new_handle``.

    Returns:
        Counters ``{"updated": ..., "skipped": ..., "conflicts": ...}``.
    """
    # Normalize exactly like get_site() does for the stored "site" column.
    # str.lstrip("www.") must not be used here: it strips any leading "w" and
    # "." characters (e.g. "weibo.com" -> "eibo.com"), so no rows would match.
    site_norm = _normalize_host(site)

    rows = conn.execute(
        """
        SELECT id, url_original
        FROM links
        WHERE user_name = ? AND site = ?
        """,
        (user_name, site_norm),
    ).fetchall()

    updated = 0
    skipped = 0
    conflicts = 0

    for row in rows:
        raw = row["url_original"]
        parts = urlsplit(raw if "://" in raw else f"https://{raw}")
        # The path starts with "/", so segments[0] is "" and segments[1] is
        # the first real path component (the handle).
        segments = parts.path.split("/")
        if len(segments) < 2 or segments[1] != old_handle:
            skipped += 1
            continue

        segments[1] = new_handle
        new_url = urlunsplit(
            (parts.scheme, parts.netloc, "/".join(segments), parts.query, parts.fragment)
        )

        status = rename_link(conn, user_name, raw, new_url)["status"]
        if status == "renamed":
            updated += 1
        elif status == "conflict":
            conflicts += 1
        else:
            skipped += 1

    return {"updated": updated, "skipped": skipped, "conflicts": conflicts}


def warn(msg: str) -> None:
    """Log ``msg`` at warning level through the shared ``LOG`` logger."""
    LOG.warning(msg)