#!/usr/bin/env python3
"""Administrative CLI for download link database."""

from __future__ import annotations

import argparse
import os
import shutil
from pathlib import Path

import db
from functions import load_config_variables


def list_users(configs: dict) -> None:
    """Print the ``name`` of every entry in ``configs["users"]``, one per line."""
    for entry in configs["users"]:
        print(entry["name"])


def prompt_yes_no(message: str) -> bool:
    """Ask a yes/no question on stdin until a recognisable answer is given.

    ``y``/``yes`` return True and ``n``/``no`` return False (case-insensitive,
    surrounding whitespace ignored); any other answer re-prompts.

    A closed stdin (``EOFError``) is treated as "no" so that a
    non-interactive run declines the action instead of dying with a
    traceback.
    """
    while True:
        try:
            raw = input(f"{message} [y/n]: ").strip().lower()
        except EOFError:
            # Terminate the dangling prompt line before returning.
            print()
            return False
        if raw in ("y", "yes"):
            return True
        if raw in ("n", "no"):
            return False


def merge_dirs(src: Path, dst: Path) -> None:
    """Move every file under *src* to the same relative path under *dst*.

    Destination directories are created as needed.  A file whose destination
    already exists is left where it is and reported on stdout.  Afterwards,
    every directory under *src* (and *src* itself) that is empty is removed.
    """
    for root, _, files in os.walk(src):
        current = Path(root)
        target_dir = dst / current.relative_to(src)
        target_dir.mkdir(parents=True, exist_ok=True)
        for filename in files:
            src_file = current / filename
            dst_file = target_dir / filename
            if dst_file.exists():
                print(f"Skip existing file: {dst_file}")
                continue
            shutil.move(str(src_file), str(dst_file))

    # Cleanup empty directories, deepest first.  The ``dirs`` list yielded by
    # a bottom-up walk is captured before the children are visited, so it
    # still names sub-directories this loop has already removed; inspect the
    # directory on disk instead, otherwise parents are never cleaned up.
    for root, _, _ in os.walk(src, topdown=False):
        current = Path(root)
        if not any(current.iterdir()):
            current.rmdir()


def move_user_outputs(configs: dict, user_name: str, old_handle: str, new_handle: str) -> None:
    """Rename a handle's output directory inside a user's download directory.

    Looks up *user_name* in ``configs["users"]`` and, under that user's
    ``download-dir``, moves ``<old_handle>`` to ``<new_handle>``.  If the new
    directory already exists the operator is asked whether to merge; on
    confirmation the contents are merged with :func:`merge_dirs`.  Unknown
    users and missing source directories are reported and skipped.
    """
    user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None)
    if not user_cfg:
        print(f"Unknown user: {user_name}")
        return

    base_dirs = [Path(user_cfg["download-dir"])]
    for base in base_dirs:
        old_path = base / old_handle
        new_path = base / new_handle
        if not old_path.exists():
            print(f"Missing: {old_path}")
            continue
        if new_path.exists():
            if not prompt_yes_no(
                f"Merge contents from {old_path} into existing {new_path}?"
            ):
                continue
            merge_dirs(old_path, new_path)
        else:
            old_path.rename(new_path)


def cmd_add(args: argparse.Namespace) -> None:
    """Handle ``add``: register a link for a user and print the resulting status.

    When ``db.add_link`` reports status ``"removed"`` and ``--assume-yes`` was
    not given, the operator is asked whether to re-add the link; on
    confirmation the call is repeated with ``assume_yes=True``.  Warnings are
    printed when the returned row is banned or disabled.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        result = db.add_link(
            conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
        )
        if result["status"] == "removed" and not args.assume_yes:
            removed_at = result.get("removed_at", "unknown")
            if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
                result = db.add_link(
                    conn, args.user, args.url, assume_yes=True, source="manual"
                )
        row = result.get("row")
        if row and row["banned_at"]:
            print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
        if row and not row["enabled"]:
            print("Warning: link is disabled")
        conn.commit()
        print(result["status"])


def cmd_disable(args: argparse.Namespace) -> None:
    """Handle ``disable``: mark a user's link as not enabled; print ok/not found."""
    with db.connect() as conn:
        ok = db.set_enabled(conn, args.user, args.url, enabled=False)
        if ok:
            conn.commit()
        print("ok" if ok else "not found")


def cmd_enable(args: argparse.Namespace) -> None:
    """Handle ``enable``: mark a user's link as enabled; print ok/not found."""
    with db.connect() as conn:
        ok = db.set_enabled(conn, args.user, args.url, enabled=True)
        if ok:
            conn.commit()
        print("ok" if ok else "not found")


def cmd_ban(args: argparse.Namespace) -> None:
    """Handle ``ban``: ban a user's link with the optional ``--reason``."""
    with db.connect() as conn:
        ok = db.set_banned(conn, args.user, args.url, banned=True, reason=args.reason)
        if ok:
            conn.commit()
        print("ok" if ok else "not found")


def cmd_unban(args: argparse.Namespace) -> None:
    """Handle ``unban``: lift the ban on a user's link; print ok/not found."""
    with db.connect() as conn:
        ok = db.set_banned(conn, args.user, args.url, banned=False)
        if ok:
            conn.commit()
        print("ok" if ok else "not found")


def cmd_remove(args: argparse.Namespace) -> None:
    """Handle ``remove``: remove a user's link; print ok/not found."""
    with db.connect() as conn:
        ok = db.remove_link(conn, args.user, args.url)
        if ok:
            conn.commit()
        print("ok" if ok else "not found")


def cmd_rename(args: argparse.Namespace) -> None:
    """Handle ``rename``: change a link's URL and print the resulting status.

    The transaction is committed only when the status is ``"renamed"``.
    """
    with db.connect() as conn:
        result = db.rename_link(conn, args.user, args.old_url, args.new_url)
        if result["status"] == "renamed":
            conn.commit()
        print(result["status"])


def cmd_list(args: argparse.Namespace) -> None:
    """Handle ``list``: print ``<user> [<status>] <url>`` for each link.

    ``--user`` may be repeated to restrict the listing; without it ``None``
    is passed to ``db.get_links``.  A row with ``banned_at`` set is shown as
    ``banned`` regardless of its enabled flag.
    """
    users = args.user or None
    with db.connect() as conn:
        rows = db.get_links(
            conn,
            users=users,
            include_disabled=args.disabled,
            include_banned=args.banned,
        )
        for row in rows:
            status = "enabled" if row["enabled"] else "disabled"
            if row["banned_at"]:
                status = "banned"
            print(f"{row['user_name']} [{status}] {row['url_original']}")


def cmd_users(args: argparse.Namespace) -> None:
    """Handle ``users``: print the configured user names."""
    configs = load_config_variables()
    list_users(configs)


def cmd_import(args: argparse.Namespace) -> None:
    """Handle ``import``: load each user's ``watch.txt`` into the database.

    For every configured user the file ``<lists-dir>/<user>/watch.txt`` is
    passed to ``db.import_master_list``.  The per-user result and any
    reported duplicates are printed, followed by the paths whose import
    returned status ``"ok"``.  One commit is issued at the end.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        imported_paths = []
        for entry in configs["users"]:
            user = entry["name"]
            lists_dir = Path(configs["global"]["lists-dir"]) / user
            master = lists_dir / "watch.txt"
            result = db.import_master_list(conn, user, master)
            if result["status"] == "ok":
                imported_paths.append(str(master))
            print(f"{user}: {result}")
            if result.get("duplicates"):
                print(f"{user} duplicates:")
                for dup in result["duplicates"]:
                    print(f" {dup}")
        if imported_paths:
            print("Imported lists:")
            for path in imported_paths:
                print(f" {path}")
        conn.commit()


def parse_list_file(path: Path) -> dict:
    """Split a list file into normalized enabled and disabled URL sets.

    Blank lines are ignored.  A line starting with ``#`` counts as a disabled
    entry (leading ``#`` characters and surrounding whitespace are stripped;
    a bare ``#`` is ignored); every other line counts as enabled.  Each URL
    is passed through ``db.normalize_url``.

    Returns a dict with keys ``"enabled"`` and ``"disabled"``; both sets are
    empty when *path* is not an existing file.
    """
    enabled: set[str] = set()
    disabled: set[str] = set()
    if not path.is_file():
        return {"enabled": enabled, "disabled": disabled}

    with open(path, "r", encoding="utf-8") as r_file:
        for raw in r_file:
            line = raw.strip()
            if not line:
                continue
            if line.startswith("#"):
                url = line.lstrip("#").strip()
                if url:
                    disabled.add(db.normalize_url(url))
            else:
                enabled.add(db.normalize_url(line))
    return {"enabled": enabled, "disabled": disabled}


def cmd_validate_import(args: argparse.Namespace) -> None:
    """Handle ``validate-import``: compare each ``watch.txt`` with the database.

    For every configured user the list file is parsed with
    :func:`parse_list_file` and compared against that user's rows.  A row
    counts as enabled only when it is enabled and not banned; every other
    row counts as disabled.  URLs present on one side only are printed per
    category, or ``OK`` when there is no difference.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        for entry in configs["users"]:
            user = entry["name"]
            lists_dir = Path(configs["global"]["lists-dir"]) / user
            master = lists_dir / "watch.txt"
            list_sets = parse_list_file(master)

            rows = db.get_links_by_user(conn, user)
            db_enabled = set()
            db_disabled = set()
            for row in rows:
                norm = db.normalize_url(row["url_original"])
                if row["enabled"] and not row["banned_at"]:
                    db_enabled.add(norm)
                else:
                    db_disabled.add(norm)

            missing_enabled = list_sets["enabled"] - db_enabled
            missing_disabled = list_sets["disabled"] - db_disabled
            extra_enabled = db_enabled - list_sets["enabled"]
            extra_disabled = db_disabled - list_sets["disabled"]

            print(f"{user}:")
            if missing_enabled:
                print(" Missing enabled in DB:")
                for url in sorted(missing_enabled):
                    print(f" {url}")
            if missing_disabled:
                print(" Missing disabled in DB:")
                for url in sorted(missing_disabled):
                    print(f" {url}")
            if extra_enabled:
                print(" Extra enabled in DB:")
                for url in sorted(extra_enabled):
                    print(f" {url}")
            if extra_disabled:
                print(" Extra disabled in DB:")
                for url in sorted(extra_disabled):
                    print(f" {url}")
            if not any([missing_enabled, missing_disabled, extra_enabled, extra_disabled]):
                print(" OK")


def cmd_user_rename(args: argparse.Namespace) -> None:
    """Handle ``user-rename``: rename a handle in the database and on disk.

    Calls ``db.bulk_rename_handle`` for the given user and site, commits,
    prints the result, then moves the matching output directory with
    :func:`move_user_outputs`.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        result = db.bulk_rename_handle(
            conn,
            user_name=args.user,
            site=args.site,
            old_handle=args.old,
            new_handle=args.new,
        )
        conn.commit()
        print(result)
        move_user_outputs(configs, args.user, args.old, args.new)


def build_parser() -> argparse.ArgumentParser:
    """Build the ``download-admin`` argument parser with all sub-commands.

    Every sub-parser stores its handler in ``func`` so :func:`main` can
    dispatch with ``args.func(args)``.  A sub-command is mandatory.
    """
    parser = argparse.ArgumentParser(prog="download-admin")
    sub = parser.add_subparsers(dest="cmd", required=True)

    def link_command(name, func):
        # Sub-command taking the two positionals shared by most link actions.
        cmd_parser = sub.add_parser(name)
        cmd_parser.add_argument("user")
        cmd_parser.add_argument("url")
        cmd_parser.set_defaults(func=func)
        return cmd_parser

    p_add = link_command("add", cmd_add)
    p_add.add_argument("--assume-yes", action="store_true")

    link_command("disable", cmd_disable)
    link_command("enable", cmd_enable)

    p_ban = link_command("ban", cmd_ban)
    p_ban.add_argument("--reason")

    link_command("unban", cmd_unban)
    link_command("remove", cmd_remove)

    p_rename = sub.add_parser("rename")
    p_rename.add_argument("user")
    p_rename.add_argument("old_url")
    p_rename.add_argument("new_url")
    p_rename.set_defaults(func=cmd_rename)

    p_list = sub.add_parser("list")
    p_list.add_argument("--user", action="append")
    p_list.add_argument("--disabled", action="store_true")
    p_list.add_argument("--banned", action="store_true")
    p_list.set_defaults(func=cmd_list)

    p_users = sub.add_parser("users")
    p_users.set_defaults(func=cmd_users)

    p_import = sub.add_parser("import")
    p_import.set_defaults(func=cmd_import)

    p_user_rename = sub.add_parser("user-rename")
    p_user_rename.add_argument("user")
    p_user_rename.add_argument("site")
    p_user_rename.add_argument("old")
    p_user_rename.add_argument("new")
    p_user_rename.set_defaults(func=cmd_user_rename)

    p_validate = sub.add_parser("validate-import")
    p_validate.set_defaults(func=cmd_validate_import)

    return parser


def main() -> None:
    """Parse the command line and run the selected sub-command handler."""
    parser = build_parser()
    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()