refractioning

This commit is contained in:
Danilo Reyes
2026-02-28 21:34:39 -06:00
parent 7a64034f8a
commit 81c2df84f7
6 changed files with 361 additions and 304 deletions

View File

@@ -3,260 +3,19 @@
from __future__ import annotations
import argparse
import os
import shutil
from pathlib import Path
import db
from functions import load_config_variables
def list_users(configs: dict) -> None:
for entry in configs["users"]:
print(entry["name"])
def prompt_yes_no(message: str) -> bool:
while True:
raw = input(f"{message} [y/n]: ").strip().lower()
if raw in ("y", "yes"):
return True
if raw in ("n", "no"):
return False
def merge_dirs(src: Path, dst: Path) -> None:
for root, _, files in os.walk(src):
rel = Path(root).relative_to(src)
target_dir = dst / rel
target_dir.mkdir(parents=True, exist_ok=True)
for filename in files:
src_file = Path(root) / filename
dst_file = target_dir / filename
if dst_file.exists():
print(f"Skip existing file: {dst_file}")
continue
shutil.move(str(src_file), str(dst_file))
# Cleanup empty directories
for root, dirs, files in os.walk(src, topdown=False):
if not dirs and not files:
Path(root).rmdir()
def move_user_outputs(configs: dict, user_name: str, old_handle: str, new_handle: str) -> None:
user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None)
if not user_cfg:
print(f"Unknown user: {user_name}")
return
base_dirs = [Path(user_cfg["download-dir"])]
for base in base_dirs:
old_path = base / old_handle
new_path = base / new_handle
if not old_path.exists():
print(f"Missing: {old_path}")
continue
if new_path.exists():
if not prompt_yes_no(
f"Merge contents from {old_path} into existing {new_path}?"
):
continue
merge_dirs(old_path, new_path)
else:
old_path.rename(new_path)
def cmd_add(args: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
result = db.add_link(
conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
)
if result["status"] == "removed" and not args.assume_yes:
removed_at = result.get("removed_at", "unknown")
if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
result = db.add_link(
conn, args.user, args.url, assume_yes=True, source="manual"
)
row = result.get("row")
if row and row["banned_at"]:
print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
if row and not row["enabled"]:
print("Warning: link is disabled")
conn.commit()
print(result["status"])
def cmd_disable(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.set_enabled(conn, args.user, args.url, enabled=False)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_enable(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.set_enabled(conn, args.user, args.url, enabled=True)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_ban(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.set_banned(conn, args.user, args.url, banned=True, reason=args.reason)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_unban(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.set_banned(conn, args.user, args.url, banned=False)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_remove(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.remove_link(conn, args.user, args.url)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_rename(args: argparse.Namespace) -> None:
with db.connect() as conn:
result = db.rename_link(conn, args.user, args.old_url, args.new_url)
if result["status"] == "renamed":
conn.commit()
print(result["status"])
def cmd_list(args: argparse.Namespace) -> None:
users = args.user or None
with db.connect() as conn:
rows = db.get_links(
conn,
users=users,
include_disabled=args.disabled,
include_banned=args.banned,
)
for row in rows:
status = "enabled" if row["enabled"] else "disabled"
if row["banned_at"]:
status = "banned"
print(f"{row['user_name']} [{status}] {row['url_original']}")
def cmd_users(args: argparse.Namespace) -> None:
configs = load_config_variables()
list_users(configs)
def cmd_import(args: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
imported_paths = []
for entry in configs["users"]:
user = entry["name"]
lists_dir = Path(configs["global"]["lists-dir"]) / user
master = lists_dir / "watch.txt"
result = db.import_master_list(conn, user, master)
if result["status"] == "ok":
imported_paths.append(str(master))
print(f"{user}: {result}")
if result.get("duplicates"):
print(f"{user} duplicates:")
for dup in result["duplicates"]:
print(f" {dup}")
if imported_paths:
print("Imported lists:")
for path in imported_paths:
print(f" {path}")
conn.commit()
def parse_list_file(path: Path) -> dict:
enabled: set[str] = set()
disabled: set[str] = set()
if not path.is_file():
return {"enabled": enabled, "disabled": disabled}
with open(path, "r", encoding="utf-8") as r_file:
for raw in r_file:
line = raw.strip()
if not line:
continue
if line.startswith("#"):
url = line.lstrip("#").strip()
if url:
disabled.add(db.normalize_url(url))
else:
enabled.add(db.normalize_url(line))
return {"enabled": enabled, "disabled": disabled}
def cmd_validate_import(args: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
for entry in configs["users"]:
user = entry["name"]
lists_dir = Path(configs["global"]["lists-dir"]) / user
master = lists_dir / "watch.txt"
list_sets = parse_list_file(master)
rows = db.get_links_by_user(conn, user)
db_enabled = set()
db_disabled = set()
for row in rows:
norm = db.normalize_url(row["url_original"])
if row["enabled"] and not row["banned_at"]:
db_enabled.add(norm)
else:
db_disabled.add(norm)
missing_enabled = list_sets["enabled"] - db_enabled
missing_disabled = list_sets["disabled"] - db_disabled
extra_enabled = db_enabled - list_sets["enabled"]
extra_disabled = db_disabled - list_sets["disabled"]
print(f"{user}:")
if missing_enabled:
print(" Missing enabled in DB:")
for url in sorted(missing_enabled):
print(f" {url}")
if missing_disabled:
print(" Missing disabled in DB:")
for url in sorted(missing_disabled):
print(f" {url}")
if extra_enabled:
print(" Extra enabled in DB:")
for url in sorted(extra_enabled):
print(f" {url}")
if extra_disabled:
print(" Extra disabled in DB:")
for url in sorted(extra_disabled):
print(f" {url}")
if not any([missing_enabled, missing_disabled, extra_enabled, extra_disabled]):
print(" OK")
def cmd_user_rename(args: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
result = db.bulk_rename_handle(
conn,
user_name=args.user,
site=args.site,
old_handle=args.old,
new_handle=args.new,
)
conn.commit()
print(result)
move_user_outputs(configs, args.user, args.old, args.new)
from admin_links import cmd_add
from admin_links import cmd_ban
from admin_links import cmd_disable
from admin_links import cmd_enable
from admin_links import cmd_import
from admin_links import cmd_list
from admin_links import cmd_remove
from admin_links import cmd_rename
from admin_links import cmd_unban
from admin_links import cmd_validate_import
from admin_users import cmd_user_rename
from admin_users import cmd_users
def build_parser() -> argparse.ArgumentParser:
@@ -313,6 +72,9 @@ def build_parser() -> argparse.ArgumentParser:
p_import = sub.add_parser("import")
p_import.set_defaults(func=cmd_import)
p_validate = sub.add_parser("validate-import")
p_validate.set_defaults(func=cmd_validate_import)
p_user_rename = sub.add_parser("user-rename")
p_user_rename.add_argument("user")
p_user_rename.add_argument("site")
@@ -320,9 +82,6 @@ def build_parser() -> argparse.ArgumentParser:
p_user_rename.add_argument("new")
p_user_rename.set_defaults(func=cmd_user_rename)
p_validate = sub.add_parser("validate-import")
p_validate.set_defaults(func=cmd_validate_import)
return parser