#!/usr/bin/env python3 """Admin CLI: link operations.""" from __future__ import annotations import argparse import shutil import subprocess from pathlib import Path import db from functions import load_config_variables def prompt_yes_no(message: str) -> bool: while True: raw = input(f"{message} [y/n]: ").strip().lower() if raw in ("y", "yes"): return True if raw in ("n", "no"): return False def parse_list_file(path: Path) -> dict: enabled: set[str] = set() disabled: set[str] = set() if not path.is_file(): return {"enabled": enabled, "disabled": disabled} with open(path, "r", encoding="utf-8") as r_file: for raw in r_file: line = raw.strip() if not line: continue if line.startswith("#"): url = line.lstrip("#").strip() if url: disabled.add(db.normalize_url(url)) continue enabled.add(db.normalize_url(line)) return {"enabled": enabled, "disabled": disabled} def cmd_add(args: argparse.Namespace) -> None: configs = load_config_variables() with db.connect(configs) as conn: result = db.add_link( conn, args.user, args.url, assume_yes=args.assume_yes, source="manual" ) if result["status"] == "removed" and not args.assume_yes: removed_at = result.get("removed_at", "unknown") if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"): result = db.add_link( conn, args.user, args.url, assume_yes=True, source="manual" ) row = result.get("row") if row and row["banned_at"]: print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})") if row and not row["enabled"]: print("Warning: link is disabled") conn.commit() print(result["status"]) def cmd_disable(args: argparse.Namespace) -> None: _apply_to_links( args, lambda conn, user, url: db.set_enabled(conn, user, url, enabled=False), selector_filter="disable", ) def cmd_enable(args: argparse.Namespace) -> None: _apply_to_links( args, lambda conn, user, url: db.set_enabled(conn, user, url, enabled=True), selector_filter="enable", ) def cmd_ban(args: argparse.Namespace) -> None: _apply_to_links( args, lambda conn, user, url: db.set_banned( conn, user, url, banned=True, reason=args.reason ), selector_filter="ban", ) def cmd_unban(args: argparse.Namespace) -> None: _apply_to_links( args, lambda conn, user, url: db.set_banned(conn, user, url, banned=False), selector_filter="unban", ) def cmd_remove(args: argparse.Namespace) -> None: _apply_to_links(args, lambda conn, user, url: db.remove_link(conn, user, url), "any") def cmd_rename(args: argparse.Namespace) -> None: old_url = args.old_url if not old_url: selection = _select_links(args.user, multi=False, selector_filter="any") if not selection: print("not found") return old_url = selection[0] new_url = args.new_url or input("New URL: ").strip() with db.connect() as conn: result = db.rename_link(conn, args.user, old_url, new_url) if result["status"] == "renamed": conn.commit() print(result["status"]) def cmd_list(args: argparse.Namespace) -> None: users = args.user or None with db.connect() as conn: rows = db.get_links( conn, users=users, include_disabled=args.disabled, include_banned=args.banned, requires_revision_only=args.requires_revision, ) for row in rows: if args.disabled and row["enabled"]: continue if args.banned and not row["banned_at"]: continue status = "enabled" if row["enabled"] else "disabled" if row["banned_at"]: status = "banned" print(f"{row['user_name']} [{status}] {row['url_original']}") def cmd_import(_: argparse.Namespace) -> None: configs = load_config_variables() with db.connect(configs) as conn: imported_paths = [] for entry in configs["users"]: user = entry["name"] lists_dir = Path(configs["global"]["lists-dir"]) / user master = lists_dir / "watch.txt" result = db.import_master_list(conn, user, master) if result["status"] == "ok": imported_paths.append(str(master)) print(f"{user}: {result}") if result.get("duplicates"): print(f"{user} duplicates:") for dup in result["duplicates"]: print(f" {dup}") if imported_paths: print("Imported lists:") for path in imported_paths: print(f" {path}") conn.commit() def cmd_validate_import(_: argparse.Namespace) -> None: configs = load_config_variables() with db.connect(configs) as conn: for entry in configs["users"]: user = entry["name"] lists_dir = Path(configs["global"]["lists-dir"]) / user master = lists_dir / "watch.txt" list_sets = parse_list_file(master) rows = db.get_links_by_user(conn, user) db_enabled = set() db_disabled = set() for row in rows: norm = db.normalize_url(row["url_original"]) if row["enabled"] and not row["banned_at"]: db_enabled.add(norm) else: db_disabled.add(norm) missing_enabled = list_sets["enabled"] - db_enabled missing_disabled = list_sets["disabled"] - db_disabled extra_enabled = db_enabled - list_sets["enabled"] extra_disabled = db_disabled - list_sets["disabled"] print(f"{user}:") if missing_enabled: print(" Missing enabled in DB:") for url in sorted(missing_enabled): print(f" {url}") if missing_disabled: print(" Missing disabled in DB:") for url in sorted(missing_disabled): print(f" {url}") if extra_enabled: print(" Extra enabled in DB:") for url in sorted(extra_enabled): print(f" {url}") if extra_disabled: print(" Extra disabled in DB:") for url in sorted(extra_disabled): print(f" {url}") if not any( [missing_enabled, missing_disabled, extra_enabled, extra_disabled] ): print(" OK") def _fzf_select(lines: list[str], multi: bool) -> list[str]: if not lines: return [] if shutil.which("fzf") is None: print("fzf not found.") return [] args = ["fzf"] if multi: args.append("--multi") proc = subprocess.run( args, input="\n".join(lines), text=True, capture_output=True, check=False, ) if proc.returncode != 0: return [] return [ln for ln in proc.stdout.splitlines() if ln.strip()] def _select_links(user: str, multi: bool, selector_filter: str) -> list[str]: with db.connect() as conn: rows = db.get_links(conn, users=[user], include_disabled=True, include_banned=True) links = [] for row in rows: enabled = bool(row["enabled"]) banned = bool(row["banned_at"]) if selector_filter == "enable" and enabled: continue if selector_filter == "disable" and not enabled: continue if selector_filter == "ban" and banned: continue if selector_filter == "unban" and not banned: continue links.append(row["url_original"]) return _fzf_select(links, multi=multi) def _apply_to_links(args: argparse.Namespace, fn, selector_filter: str) -> None: if args.url: with db.connect() as conn: ok = fn(conn, args.user, args.url) if ok: conn.commit() print("ok" if ok else "not found") return selections = _select_links(args.user, multi=True, selector_filter=selector_filter) if not selections: print("not found") return with db.connect() as conn: changed = 0 for url in selections: ok = fn(conn, args.user, url) if ok: changed += 1 if changed: conn.commit() print(f"ok ({changed})")