Files
scripts/src/download/admin_links.py
2026-02-28 23:33:06 -06:00

284 lines
8.9 KiB
Python

#!/usr/bin/env python3
"""Admin CLI: link operations."""
from __future__ import annotations
import argparse
import shutil
import subprocess
from pathlib import Path
import db
from functions import load_config_variables
def prompt_yes_no(message: str) -> bool:
while True:
raw = input(f"{message} [y/n]: ").strip().lower()
if raw in ("y", "yes"):
return True
if raw in ("n", "no"):
return False
def parse_list_file(path: Path) -> dict:
enabled: set[str] = set()
disabled: set[str] = set()
if not path.is_file():
return {"enabled": enabled, "disabled": disabled}
with open(path, "r", encoding="utf-8") as r_file:
for raw in r_file:
line = raw.strip()
if not line:
continue
if line.startswith("#"):
url = line.lstrip("#").strip()
if url:
disabled.add(db.normalize_url(url))
continue
enabled.add(db.normalize_url(line))
return {"enabled": enabled, "disabled": disabled}
def cmd_add(args: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
result = db.add_link(
conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
)
if result["status"] == "removed" and not args.assume_yes:
removed_at = result.get("removed_at", "unknown")
if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
result = db.add_link(
conn, args.user, args.url, assume_yes=True, source="manual"
)
row = result.get("row")
if row and row["banned_at"]:
print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
if row and not row["enabled"]:
print("Warning: link is disabled")
conn.commit()
print(result["status"])
def cmd_disable(args: argparse.Namespace) -> None:
_apply_to_links(
args,
lambda conn, user, url: db.set_enabled(conn, user, url, enabled=False),
selector_filter="disable",
)
def cmd_enable(args: argparse.Namespace) -> None:
_apply_to_links(
args,
lambda conn, user, url: db.set_enabled(conn, user, url, enabled=True),
selector_filter="enable",
)
def cmd_ban(args: argparse.Namespace) -> None:
_apply_to_links(
args,
lambda conn, user, url: db.set_banned(
conn, user, url, banned=True, reason=args.reason
),
selector_filter="ban",
)
def cmd_unban(args: argparse.Namespace) -> None:
_apply_to_links(
args,
lambda conn, user, url: db.set_banned(conn, user, url, banned=False),
selector_filter="unban",
)
def cmd_remove(args: argparse.Namespace) -> None:
_apply_to_links(args, lambda conn, user, url: db.remove_link(conn, user, url), "any")
def cmd_rename(args: argparse.Namespace) -> None:
old_url = args.old_url
if not old_url:
selection = _select_links(args.user, multi=False, selector_filter="any")
if not selection:
print("not found")
return
old_url = selection[0]
new_url = args.new_url or input("New URL: ").strip()
with db.connect() as conn:
result = db.rename_link(conn, args.user, old_url, new_url)
if result["status"] == "renamed":
conn.commit()
print(result["status"])
def cmd_list(args: argparse.Namespace) -> None:
users = args.user or None
with db.connect() as conn:
rows = db.get_links(
conn,
users=users,
include_disabled=args.disabled,
include_banned=args.banned,
requires_revision_only=args.requires_revision,
)
for row in rows:
if args.disabled and row["enabled"]:
continue
if args.banned and not row["banned_at"]:
continue
status = "enabled" if row["enabled"] else "disabled"
if row["banned_at"]:
status = "banned"
print(f"{row['user_name']} [{status}] {row['url_original']}")
def cmd_import(_: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
imported_paths = []
for entry in configs["users"]:
user = entry["name"]
lists_dir = Path(configs["global"]["lists-dir"]) / user
master = lists_dir / "watch.txt"
result = db.import_master_list(conn, user, master)
if result["status"] == "ok":
imported_paths.append(str(master))
print(f"{user}: {result}")
if result.get("duplicates"):
print(f"{user} duplicates:")
for dup in result["duplicates"]:
print(f" {dup}")
if imported_paths:
print("Imported lists:")
for path in imported_paths:
print(f" {path}")
conn.commit()
def cmd_validate_import(_: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
for entry in configs["users"]:
user = entry["name"]
lists_dir = Path(configs["global"]["lists-dir"]) / user
master = lists_dir / "watch.txt"
list_sets = parse_list_file(master)
rows = db.get_links_by_user(conn, user)
db_enabled = set()
db_disabled = set()
for row in rows:
norm = db.normalize_url(row["url_original"])
if row["enabled"] and not row["banned_at"]:
db_enabled.add(norm)
else:
db_disabled.add(norm)
missing_enabled = list_sets["enabled"] - db_enabled
missing_disabled = list_sets["disabled"] - db_disabled
extra_enabled = db_enabled - list_sets["enabled"]
extra_disabled = db_disabled - list_sets["disabled"]
print(f"{user}:")
if missing_enabled:
print(" Missing enabled in DB:")
for url in sorted(missing_enabled):
print(f" {url}")
if missing_disabled:
print(" Missing disabled in DB:")
for url in sorted(missing_disabled):
print(f" {url}")
if extra_enabled:
print(" Extra enabled in DB:")
for url in sorted(extra_enabled):
print(f" {url}")
if extra_disabled:
print(" Extra disabled in DB:")
for url in sorted(extra_disabled):
print(f" {url}")
if not any(
[missing_enabled, missing_disabled, extra_enabled, extra_disabled]
):
print(" OK")
def cmd_fix_revision(_: argparse.Namespace) -> None:
with db.connect() as conn:
conn.execute(
"""
UPDATE links
SET requires_revision = 0
WHERE enabled = 1 OR banned_at IS NULL
"""
)
conn.commit()
print("ok")
def _fzf_select(lines: list[str], multi: bool) -> list[str]:
if not lines:
return []
if shutil.which("fzf") is None:
print("fzf not found.")
return []
args = ["fzf"]
if multi:
args.append("--multi")
proc = subprocess.run(
args,
input="\n".join(lines),
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
return []
return [ln for ln in proc.stdout.splitlines() if ln.strip()]
def _select_links(user: str, multi: bool, selector_filter: str) -> list[str]:
with db.connect() as conn:
rows = db.get_links(conn, users=[user], include_disabled=True, include_banned=True)
links = []
for row in rows:
enabled = bool(row["enabled"])
banned = bool(row["banned_at"])
if selector_filter == "enable" and enabled:
continue
if selector_filter == "disable" and not enabled:
continue
if selector_filter == "ban" and banned:
continue
if selector_filter == "unban" and not banned:
continue
links.append(row["url_original"])
return _fzf_select(links, multi=multi)
def _apply_to_links(args: argparse.Namespace, fn, selector_filter: str) -> None:
if args.url:
with db.connect() as conn:
ok = fn(conn, args.user, args.url)
if ok:
conn.commit()
print("ok" if ok else "not found")
return
selections = _select_links(args.user, multi=True, selector_filter=selector_filter)
if not selections:
print("not found")
return
with db.connect() as conn:
changed = 0
for url in selections:
ok = fn(conn, args.user, url)
if ok:
changed += 1
if changed:
conn.commit()
print(f"ok ({changed})")