download-admin (sqlite db) init
This commit is contained in:
258
src/download/admin.py
Normal file
258
src/download/admin.py
Normal file
@@ -0,0 +1,258 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Administrative CLI for download link database."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import db
|
||||
from functions import load_config_variables
|
||||
|
||||
|
||||
def list_users(configs: dict) -> None:
|
||||
for entry in configs["users"]:
|
||||
print(entry["name"])
|
||||
|
||||
|
||||
def prompt_yes_no(message: str) -> bool:
|
||||
while True:
|
||||
raw = input(f"{message} [y/n]: ").strip().lower()
|
||||
if raw in ("y", "yes"):
|
||||
return True
|
||||
if raw in ("n", "no"):
|
||||
return False
|
||||
|
||||
|
||||
def merge_dirs(src: Path, dst: Path) -> None:
|
||||
for root, _, files in os.walk(src):
|
||||
rel = Path(root).relative_to(src)
|
||||
target_dir = dst / rel
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
for filename in files:
|
||||
src_file = Path(root) / filename
|
||||
dst_file = target_dir / filename
|
||||
if dst_file.exists():
|
||||
print(f"Skip existing file: {dst_file}")
|
||||
continue
|
||||
shutil.move(str(src_file), str(dst_file))
|
||||
|
||||
# Cleanup empty directories
|
||||
for root, dirs, files in os.walk(src, topdown=False):
|
||||
if not dirs and not files:
|
||||
Path(root).rmdir()
|
||||
|
||||
|
||||
def move_user_outputs(configs: dict, user_name: str, old_handle: str, new_handle: str) -> None:
|
||||
user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None)
|
||||
if not user_cfg:
|
||||
print(f"Unknown user: {user_name}")
|
||||
return
|
||||
|
||||
base_dirs = [Path(user_cfg["download-dir"])]
|
||||
for base in base_dirs:
|
||||
old_path = base / old_handle
|
||||
new_path = base / new_handle
|
||||
if not old_path.exists():
|
||||
print(f"Missing: {old_path}")
|
||||
continue
|
||||
if new_path.exists():
|
||||
if not prompt_yes_no(
|
||||
f"Merge contents from {old_path} into existing {new_path}?"
|
||||
):
|
||||
continue
|
||||
merge_dirs(old_path, new_path)
|
||||
else:
|
||||
old_path.rename(new_path)
|
||||
|
||||
|
||||
def cmd_add(args: argparse.Namespace) -> None:
|
||||
configs = load_config_variables()
|
||||
with db.connect(configs) as conn:
|
||||
result = db.add_link(
|
||||
conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
|
||||
)
|
||||
if result["status"] == "removed" and not args.assume_yes:
|
||||
removed_at = result.get("removed_at", "unknown")
|
||||
if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
|
||||
result = db.add_link(
|
||||
conn, args.user, args.url, assume_yes=True, source="manual"
|
||||
)
|
||||
row = result.get("row")
|
||||
if row and row["banned_at"]:
|
||||
print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
|
||||
if row and not row["enabled"]:
|
||||
print("Warning: link is disabled")
|
||||
conn.commit()
|
||||
print(result["status"])
|
||||
|
||||
|
||||
def cmd_disable(args: argparse.Namespace) -> None:
|
||||
with db.connect() as conn:
|
||||
ok = db.set_enabled(conn, args.user, args.url, enabled=False)
|
||||
if ok:
|
||||
conn.commit()
|
||||
print("ok" if ok else "not found")
|
||||
|
||||
|
||||
def cmd_enable(args: argparse.Namespace) -> None:
|
||||
with db.connect() as conn:
|
||||
ok = db.set_enabled(conn, args.user, args.url, enabled=True)
|
||||
if ok:
|
||||
conn.commit()
|
||||
print("ok" if ok else "not found")
|
||||
|
||||
|
||||
def cmd_ban(args: argparse.Namespace) -> None:
|
||||
with db.connect() as conn:
|
||||
ok = db.set_banned(conn, args.user, args.url, banned=True, reason=args.reason)
|
||||
if ok:
|
||||
conn.commit()
|
||||
print("ok" if ok else "not found")
|
||||
|
||||
|
||||
def cmd_unban(args: argparse.Namespace) -> None:
|
||||
with db.connect() as conn:
|
||||
ok = db.set_banned(conn, args.user, args.url, banned=False)
|
||||
if ok:
|
||||
conn.commit()
|
||||
print("ok" if ok else "not found")
|
||||
|
||||
|
||||
def cmd_remove(args: argparse.Namespace) -> None:
|
||||
with db.connect() as conn:
|
||||
ok = db.remove_link(conn, args.user, args.url)
|
||||
if ok:
|
||||
conn.commit()
|
||||
print("ok" if ok else "not found")
|
||||
|
||||
|
||||
def cmd_rename(args: argparse.Namespace) -> None:
|
||||
with db.connect() as conn:
|
||||
result = db.rename_link(conn, args.user, args.old_url, args.new_url)
|
||||
if result["status"] == "renamed":
|
||||
conn.commit()
|
||||
print(result["status"])
|
||||
|
||||
|
||||
def cmd_list(args: argparse.Namespace) -> None:
|
||||
users = args.user or None
|
||||
with db.connect() as conn:
|
||||
rows = db.get_links(
|
||||
conn,
|
||||
users=users,
|
||||
include_disabled=args.disabled,
|
||||
include_banned=args.banned,
|
||||
)
|
||||
for row in rows:
|
||||
status = "enabled" if row["enabled"] else "disabled"
|
||||
if row["banned_at"]:
|
||||
status = "banned"
|
||||
print(f"{row['user_name']} [{status}] {row['url_original']}")
|
||||
|
||||
|
||||
def cmd_users(args: argparse.Namespace) -> None:
|
||||
configs = load_config_variables()
|
||||
list_users(configs)
|
||||
|
||||
|
||||
def cmd_import(args: argparse.Namespace) -> None:
|
||||
configs = load_config_variables()
|
||||
with db.connect(configs) as conn:
|
||||
for entry in configs["users"]:
|
||||
user = entry["name"]
|
||||
lists_dir = Path(configs["global"]["lists-dir"]) / user
|
||||
master = lists_dir / "watch.txt"
|
||||
result = db.import_master_list(conn, user, master)
|
||||
print(f"{user}: {result}")
|
||||
conn.commit()
|
||||
|
||||
|
||||
def cmd_user_rename(args: argparse.Namespace) -> None:
|
||||
configs = load_config_variables()
|
||||
with db.connect(configs) as conn:
|
||||
result = db.bulk_rename_handle(
|
||||
conn,
|
||||
user_name=args.user,
|
||||
site=args.site,
|
||||
old_handle=args.old,
|
||||
new_handle=args.new,
|
||||
)
|
||||
conn.commit()
|
||||
print(result)
|
||||
move_user_outputs(configs, args.user, args.old, args.new)
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(prog="download-admin")
|
||||
sub = parser.add_subparsers(dest="cmd", required=True)
|
||||
|
||||
p_add = sub.add_parser("add")
|
||||
p_add.add_argument("user")
|
||||
p_add.add_argument("url")
|
||||
p_add.add_argument("--assume-yes", action="store_true")
|
||||
p_add.set_defaults(func=cmd_add)
|
||||
|
||||
p_disable = sub.add_parser("disable")
|
||||
p_disable.add_argument("user")
|
||||
p_disable.add_argument("url")
|
||||
p_disable.set_defaults(func=cmd_disable)
|
||||
|
||||
p_enable = sub.add_parser("enable")
|
||||
p_enable.add_argument("user")
|
||||
p_enable.add_argument("url")
|
||||
p_enable.set_defaults(func=cmd_enable)
|
||||
|
||||
p_ban = sub.add_parser("ban")
|
||||
p_ban.add_argument("user")
|
||||
p_ban.add_argument("url")
|
||||
p_ban.add_argument("--reason")
|
||||
p_ban.set_defaults(func=cmd_ban)
|
||||
|
||||
p_unban = sub.add_parser("unban")
|
||||
p_unban.add_argument("user")
|
||||
p_unban.add_argument("url")
|
||||
p_unban.set_defaults(func=cmd_unban)
|
||||
|
||||
p_remove = sub.add_parser("remove")
|
||||
p_remove.add_argument("user")
|
||||
p_remove.add_argument("url")
|
||||
p_remove.set_defaults(func=cmd_remove)
|
||||
|
||||
p_rename = sub.add_parser("rename")
|
||||
p_rename.add_argument("user")
|
||||
p_rename.add_argument("old_url")
|
||||
p_rename.add_argument("new_url")
|
||||
p_rename.set_defaults(func=cmd_rename)
|
||||
|
||||
p_list = sub.add_parser("list")
|
||||
p_list.add_argument("--user", action="append")
|
||||
p_list.add_argument("--disabled", action="store_true")
|
||||
p_list.add_argument("--banned", action="store_true")
|
||||
p_list.set_defaults(func=cmd_list)
|
||||
|
||||
p_users = sub.add_parser("users")
|
||||
p_users.set_defaults(func=cmd_users)
|
||||
|
||||
p_import = sub.add_parser("import")
|
||||
p_import.set_defaults(func=cmd_import)
|
||||
|
||||
p_user_rename = sub.add_parser("user-rename")
|
||||
p_user_rename.add_argument("user")
|
||||
p_user_rename.add_argument("site")
|
||||
p_user_rename.add_argument("old")
|
||||
p_user_rename.add_argument("new")
|
||||
p_user_rename.set_defaults(func=cmd_user_rename)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = build_parser()
|
||||
args = parser.parse_args()
|
||||
args.func(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user