From 81c2df84f715cba35725be2231ea84c30a5f35b2 Mon Sep 17 00:00:00 2001 From: Danilo Reyes Date: Sat, 28 Feb 2026 21:34:39 -0600 Subject: [PATCH] refractioning --- src/download/admin.py | 271 ++---------------------------------- src/download/admin_links.py | 192 +++++++++++++++++++++++++ src/download/admin_users.py | 88 ++++++++++++ src/download/db.py | 1 - src/download/download.py | 111 ++++++++------- src/download/setup.cfg | 2 + 6 files changed, 361 insertions(+), 304 deletions(-) create mode 100644 src/download/admin_links.py create mode 100644 src/download/admin_users.py diff --git a/src/download/admin.py b/src/download/admin.py index e4c46e1..cfb8864 100644 --- a/src/download/admin.py +++ b/src/download/admin.py @@ -3,260 +3,19 @@ from __future__ import annotations import argparse -import os -import shutil -from pathlib import Path -import db -from functions import load_config_variables - - -def list_users(configs: dict) -> None: - for entry in configs["users"]: - print(entry["name"]) - - -def prompt_yes_no(message: str) -> bool: - while True: - raw = input(f"{message} [y/n]: ").strip().lower() - if raw in ("y", "yes"): - return True - if raw in ("n", "no"): - return False - - -def merge_dirs(src: Path, dst: Path) -> None: - for root, _, files in os.walk(src): - rel = Path(root).relative_to(src) - target_dir = dst / rel - target_dir.mkdir(parents=True, exist_ok=True) - for filename in files: - src_file = Path(root) / filename - dst_file = target_dir / filename - if dst_file.exists(): - print(f"Skip existing file: {dst_file}") - continue - shutil.move(str(src_file), str(dst_file)) - - # Cleanup empty directories - for root, dirs, files in os.walk(src, topdown=False): - if not dirs and not files: - Path(root).rmdir() - - -def move_user_outputs(configs: dict, user_name: str, old_handle: str, new_handle: str) -> None: - user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None) - if not user_cfg: - print(f"Unknown user: {user_name}") - return - - base_dirs = [Path(user_cfg["download-dir"])] - for base in base_dirs: - old_path = base / old_handle - new_path = base / new_handle - if not old_path.exists(): - print(f"Missing: {old_path}") - continue - if new_path.exists(): - if not prompt_yes_no( - f"Merge contents from {old_path} into existing {new_path}?" - ): - continue - merge_dirs(old_path, new_path) - else: - old_path.rename(new_path) - - -def cmd_add(args: argparse.Namespace) -> None: - configs = load_config_variables() - with db.connect(configs) as conn: - result = db.add_link( - conn, args.user, args.url, assume_yes=args.assume_yes, source="manual" - ) - if result["status"] == "removed" and not args.assume_yes: - removed_at = result.get("removed_at", "unknown") - if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"): - result = db.add_link( - conn, args.user, args.url, assume_yes=True, source="manual" - ) - row = result.get("row") - if row and row["banned_at"]: - print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})") - if row and not row["enabled"]: - print("Warning: link is disabled") - conn.commit() - print(result["status"]) - - -def cmd_disable(args: argparse.Namespace) -> None: - with db.connect() as conn: - ok = db.set_enabled(conn, args.user, args.url, enabled=False) - if ok: - conn.commit() - print("ok" if ok else "not found") - - -def cmd_enable(args: argparse.Namespace) -> None: - with db.connect() as conn: - ok = db.set_enabled(conn, args.user, args.url, enabled=True) - if ok: - conn.commit() - print("ok" if ok else "not found") - - -def cmd_ban(args: argparse.Namespace) -> None: - with db.connect() as conn: - ok = db.set_banned(conn, args.user, args.url, banned=True, reason=args.reason) - if ok: - conn.commit() - print("ok" if ok else "not found") - - -def cmd_unban(args: argparse.Namespace) -> None: - with db.connect() as conn: - ok = db.set_banned(conn, args.user, args.url, banned=False) - if ok: - conn.commit() - print("ok" if ok else "not found") - - -def cmd_remove(args: argparse.Namespace) -> None: - with db.connect() as conn: - ok = db.remove_link(conn, args.user, args.url) - if ok: - conn.commit() - print("ok" if ok else "not found") - - -def cmd_rename(args: argparse.Namespace) -> None: - with db.connect() as conn: - result = db.rename_link(conn, args.user, args.old_url, args.new_url) - if result["status"] == "renamed": - conn.commit() - print(result["status"]) - - -def cmd_list(args: argparse.Namespace) -> None: - users = args.user or None - with db.connect() as conn: - rows = db.get_links( - conn, - users=users, - include_disabled=args.disabled, - include_banned=args.banned, - ) - for row in rows: - status = "enabled" if row["enabled"] else "disabled" - if row["banned_at"]: - status = "banned" - print(f"{row['user_name']} [{status}] {row['url_original']}") - - -def cmd_users(args: argparse.Namespace) -> None: - configs = load_config_variables() - list_users(configs) - - -def cmd_import(args: argparse.Namespace) -> None: - configs = load_config_variables() - with db.connect(configs) as conn: - imported_paths = [] - for entry in configs["users"]: - user = entry["name"] - lists_dir = Path(configs["global"]["lists-dir"]) / user - master = lists_dir / "watch.txt" - result = db.import_master_list(conn, user, master) - if result["status"] == "ok": - imported_paths.append(str(master)) - print(f"{user}: {result}") - if result.get("duplicates"): - print(f"{user} duplicates:") - for dup in result["duplicates"]: - print(f" {dup}") - if imported_paths: - print("Imported lists:") - for path in imported_paths: - print(f" {path}") - conn.commit() - - -def parse_list_file(path: Path) -> dict: - enabled: set[str] = set() - disabled: set[str] = set() - if not path.is_file(): - return {"enabled": enabled, "disabled": disabled} - with open(path, "r", encoding="utf-8") as r_file: - for raw in r_file: - line = raw.strip() - if not line: - continue - if line.startswith("#"): - url = line.lstrip("#").strip() - if url: - disabled.add(db.normalize_url(url)) - else: - enabled.add(db.normalize_url(line)) - return {"enabled": enabled, "disabled": disabled} - - -def cmd_validate_import(args: argparse.Namespace) -> None: - configs = load_config_variables() - with db.connect(configs) as conn: - for entry in configs["users"]: - user = entry["name"] - lists_dir = Path(configs["global"]["lists-dir"]) / user - master = lists_dir / "watch.txt" - list_sets = parse_list_file(master) - - rows = db.get_links_by_user(conn, user) - db_enabled = set() - db_disabled = set() - for row in rows: - norm = db.normalize_url(row["url_original"]) - if row["enabled"] and not row["banned_at"]: - db_enabled.add(norm) - else: - db_disabled.add(norm) - - missing_enabled = list_sets["enabled"] - db_enabled - missing_disabled = list_sets["disabled"] - db_disabled - extra_enabled = db_enabled - list_sets["enabled"] - extra_disabled = db_disabled - list_sets["disabled"] - - print(f"{user}:") - if missing_enabled: - print(" Missing enabled in DB:") - for url in sorted(missing_enabled): - print(f" {url}") - if missing_disabled: - print(" Missing disabled in DB:") - for url in sorted(missing_disabled): - print(f" {url}") - if extra_enabled: - print(" Extra enabled in DB:") - for url in sorted(extra_enabled): - print(f" {url}") - if extra_disabled: - print(" Extra disabled in DB:") - for url in sorted(extra_disabled): - print(f" {url}") - if not any([missing_enabled, missing_disabled, extra_enabled, extra_disabled]): - print(" OK") - - -def cmd_user_rename(args: argparse.Namespace) -> None: - configs = load_config_variables() - with db.connect(configs) as conn: - result = db.bulk_rename_handle( - conn, - user_name=args.user, - site=args.site, - old_handle=args.old, - new_handle=args.new, - ) - conn.commit() - print(result) - move_user_outputs(configs, args.user, args.old, args.new) +from admin_links import cmd_add +from admin_links import cmd_ban +from admin_links import cmd_disable +from admin_links import cmd_enable +from admin_links import cmd_import +from admin_links import cmd_list +from admin_links import cmd_remove +from admin_links import cmd_rename +from admin_links import cmd_unban +from admin_links import cmd_validate_import +from admin_users import cmd_user_rename +from admin_users import cmd_users def build_parser() -> argparse.ArgumentParser: @@ -313,6 +72,9 @@ def build_parser() -> argparse.ArgumentParser: p_import = sub.add_parser("import") p_import.set_defaults(func=cmd_import) + p_validate = sub.add_parser("validate-import") + p_validate.set_defaults(func=cmd_validate_import) + p_user_rename = sub.add_parser("user-rename") p_user_rename.add_argument("user") p_user_rename.add_argument("site") @@ -320,9 +82,6 @@ def build_parser() -> argparse.ArgumentParser: p_user_rename.add_argument("new") p_user_rename.set_defaults(func=cmd_user_rename) - p_validate = sub.add_parser("validate-import") - p_validate.set_defaults(func=cmd_validate_import) - return parser diff --git a/src/download/admin_links.py b/src/download/admin_links.py new file mode 100644 index 0000000..ee443ec --- /dev/null +++ b/src/download/admin_links.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +"""Admin CLI: link operations.""" +from __future__ import annotations + +import argparse +from pathlib import Path + +import db +from functions import load_config_variables + + +def prompt_yes_no(message: str) -> bool: + while True: + raw = input(f"{message} [y/n]: ").strip().lower() + if raw in ("y", "yes"): + return True + if raw in ("n", "no"): + return False + + +def parse_list_file(path: Path) -> dict: + enabled: set[str] = set() + disabled: set[str] = set() + if not path.is_file(): + return {"enabled": enabled, "disabled": disabled} + with open(path, "r", encoding="utf-8") as r_file: + for raw in r_file: + line = raw.strip() + if not line: + continue + if line.startswith("#"): + url = line.lstrip("#").strip() + if url: + disabled.add(db.normalize_url(url)) + continue + enabled.add(db.normalize_url(line)) + return {"enabled": enabled, "disabled": disabled} + + +def cmd_add(args: argparse.Namespace) -> None: + configs = load_config_variables() + with db.connect(configs) as conn: + result = db.add_link( + conn, args.user, args.url, assume_yes=args.assume_yes, source="manual" + ) + if result["status"] == "removed" and not args.assume_yes: + removed_at = result.get("removed_at", "unknown") + if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"): + result = db.add_link( + conn, args.user, args.url, assume_yes=True, source="manual" + ) + row = result.get("row") + if row and row["banned_at"]: + print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})") + if row and not row["enabled"]: + print("Warning: link is disabled") + conn.commit() + print(result["status"]) + + +def cmd_disable(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.set_enabled(conn, args.user, args.url, enabled=False) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_enable(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.set_enabled(conn, args.user, args.url, enabled=True) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_ban(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.set_banned(conn, args.user, args.url, banned=True, reason=args.reason) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_unban(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.set_banned(conn, args.user, args.url, banned=False) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_remove(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.remove_link(conn, args.user, args.url) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_rename(args: argparse.Namespace) -> None: + with db.connect() as conn: + result = db.rename_link(conn, args.user, args.old_url, args.new_url) + if result["status"] == "renamed": + conn.commit() + print(result["status"]) + + +def cmd_list(args: argparse.Namespace) -> None: + users = args.user or None + with db.connect() as conn: + rows = db.get_links( + conn, + users=users, + include_disabled=args.disabled, + include_banned=args.banned, + ) + for row in rows: + status = "enabled" if row["enabled"] else "disabled" + if row["banned_at"]: + status = "banned" + print(f"{row['user_name']} [{status}] {row['url_original']}") + + +def cmd_import(_: argparse.Namespace) -> None: + configs = load_config_variables() + with db.connect(configs) as conn: + imported_paths = [] + for entry in configs["users"]: + user = entry["name"] + lists_dir = Path(configs["global"]["lists-dir"]) / user + master = lists_dir / "watch.txt" + result = db.import_master_list(conn, user, master) + if result["status"] == "ok": + imported_paths.append(str(master)) + print(f"{user}: {result}") + if result.get("duplicates"): + print(f"{user} duplicates:") + for dup in result["duplicates"]: + print(f" {dup}") + if imported_paths: + print("Imported lists:") + for path in imported_paths: + print(f" {path}") + conn.commit() + + +def cmd_validate_import(_: argparse.Namespace) -> None: + configs = load_config_variables() + with db.connect(configs) as conn: + for entry in configs["users"]: + user = entry["name"] + lists_dir = Path(configs["global"]["lists-dir"]) / user + master = lists_dir / "watch.txt" + list_sets = parse_list_file(master) + + rows = db.get_links_by_user(conn, user) + db_enabled = set() + db_disabled = set() + for row in rows: + norm = db.normalize_url(row["url_original"]) + if row["enabled"] and not row["banned_at"]: + db_enabled.add(norm) + else: + db_disabled.add(norm) + + missing_enabled = list_sets["enabled"] - db_enabled + missing_disabled = list_sets["disabled"] - db_disabled + extra_enabled = db_enabled - list_sets["enabled"] + extra_disabled = db_disabled - list_sets["disabled"] + + print(f"{user}:") + if missing_enabled: + print(" Missing enabled in DB:") + for url in sorted(missing_enabled): + print(f" {url}") + if missing_disabled: + print(" Missing disabled in DB:") + for url in sorted(missing_disabled): + print(f" {url}") + if extra_enabled: + print(" Extra enabled in DB:") + for url in sorted(extra_enabled): + print(f" {url}") + if extra_disabled: + print(" Extra disabled in DB:") + for url in sorted(extra_disabled): + print(f" {url}") + if not any( + [missing_enabled, missing_disabled, extra_enabled, extra_disabled] + ): + print(" OK") diff --git a/src/download/admin_users.py b/src/download/admin_users.py new file mode 100644 index 0000000..84b4d98 --- /dev/null +++ b/src/download/admin_users.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +"""Admin CLI: user operations.""" +from __future__ import annotations + +import argparse +import os +import shutil +from pathlib import Path + +import db +from functions import load_config_variables + + +def list_users(configs: dict) -> None: + for entry in configs["users"]: + print(entry["name"]) + + +def prompt_yes_no(message: str) -> bool: + while True: + raw = input(f"{message} [y/n]: ").strip().lower() + if raw in ("y", "yes"): + return True + if raw in ("n", "no"): + return False + + +def merge_dirs(src: Path, dst: Path) -> None: + for root, _, files in os.walk(src): + rel = Path(root).relative_to(src) + target_dir = dst / rel + target_dir.mkdir(parents=True, exist_ok=True) + for filename in files: + src_file = Path(root) / filename + dst_file = target_dir / filename + if dst_file.exists(): + print(f"Skip existing file: {dst_file}") + continue + shutil.move(str(src_file), str(dst_file)) + + for root, dirs, files in os.walk(src, topdown=False): + if not dirs and not files: + Path(root).rmdir() + + +def move_user_outputs( + configs: dict, user_name: str, old_handle: str, new_handle: str +) -> None: + user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None) + if not user_cfg: + print(f"Unknown user: {user_name}") + return + + base_dirs = [Path(user_cfg["download-dir"])] + for base in base_dirs: + old_path = base / old_handle + new_path = base / new_handle + if not old_path.exists(): + print(f"Missing: {old_path}") + continue + if not new_path.exists(): + old_path.rename(new_path) + continue + if not prompt_yes_no( + f"Merge contents from {old_path} into existing {new_path}?" + ): + continue + merge_dirs(old_path, new_path) + + +def cmd_users(_: argparse.Namespace) -> None: + configs = load_config_variables() + list_users(configs) + + +def cmd_user_rename(args: argparse.Namespace) -> None: + configs = load_config_variables() + with db.connect(configs) as conn: + result = db.bulk_rename_handle( + conn, + user_name=args.user, + site=args.site, + old_handle=args.old, + new_handle=args.new, + ) + conn.commit() + print(result) + move_user_outputs(configs, args.user, args.old, args.new) diff --git a/src/download/db.py b/src/download/db.py index 1e67f32..7e4c74a 100644 --- a/src/download/db.py +++ b/src/download/db.py @@ -146,7 +146,6 @@ def add_link( "SELECT removed_at FROM link_tombstones WHERE user_name = ? AND url_normalized = ?", (user_name, url_norm), ).fetchone() - if tombstone and not assume_yes and source != "push": return {"status": "removed", "removed_at": tombstone["removed_at"]} diff --git a/src/download/download.py b/src/download/download.py index 32104de..d641039 100644 --- a/src/download/download.py +++ b/src/download/download.py @@ -95,6 +95,66 @@ def parse_instagram(link: str, post_type: list[str] | str | None = None) -> list return ["-o", f"include={use_type}"] +def _comic_skip_arg(link: str, flag_skip: bool) -> str: + if not flag_skip: + return "" + if re.search(r"readcomiconline", link): + return " --chapter-range 1" + if re.search(r"manganato|mangahere|webtoons", link): + return " --chapter-range 1-5" + return "" + + +def _handle_gallery_link(user: User, link: str, args, conn) -> None: + add_res = db.add_link(conn, user.name, parse_link(link), assume_yes=True, source="push") + row = add_res.get("row") + if row and row["banned_at"]: + LOG.warning("Link is banned, skipping: %s", link) + return + if row and not row["enabled"]: + LOG.warning("Link is disabled, skipping: %s", link) + return + + gallery = Gallery() + gallery.archive = args.flag_archive + gallery.skip_arg = " -o skip=true" if not args.flag_skip else "" + gallery.link = parse_link(link) + gallery.dest = "download" + gallery.opt_args = parse_instagram(link) + gallery.generate_command(user) + gallery.run_command(args.flag_verbose) + + +def _handle_comic_link(link: str, args) -> None: + gallery = Gallery() + gallery.archive = args.flag_archive + gallery.skip_arg = _comic_skip_arg(link, args.flag_skip) + gallery.link = link + gallery.generate_command(is_comic=True) + gallery.run_command(args.flag_verbose) + save_comic(link) + + +def _handle_video_link(user: User, link: str, args) -> None: + video = Video() + video.use_archive = args.flag_archive + video.link = link + video.dest = str(user.directories["media"]) + video.database = str(user.dbs["media"]) + run(video_command(video), args.flag_verbose) + + +def _handle_other_link(user: User, link: str, args) -> None: + LOG.info("Other type of download %s", link) + gallery = Gallery() + gallery.archive = False + gallery.skip_arg = " -o directory='[]'" + gallery.link = link + gallery.dest = "push" + gallery.generate_command(user) + gallery.run_command(args.flag_verbose) + + def video_command(video: Video): """Filters and processes the required command to download videos""" command = ["yt-dlp"] @@ -266,61 +326,18 @@ def push_manager(user: User, links: list[str] | None = None) -> None: with db.connect() as conn: for link in links_galleries: - add_res = db.add_link( - conn, user.name, parse_link(link), assume_yes=True, source="push" - ) - row = add_res.get("row") - if row and row["banned_at"]: - LOG.warning("Link is banned, skipping: %s", link) - continue - if row and not row["enabled"]: - LOG.warning("Link is disabled, skipping: %s", link) - continue - - gallery = Gallery() - gallery.archive = args.flag_archive - gallery.skip_arg = " -o skip=true" if not args.flag_skip else "" - gallery.link = parse_link(link) - gallery.dest = "download" - gallery.opt_args = parse_instagram(link) - gallery.generate_command(user) - gallery.run_command(args.flag_verbose) + _handle_gallery_link(user, link, args, conn) conn.commit() for link in links_comics: - if args.flag_skip and re.search(r"readcomiconline", link): - skip_arg = " --chapter-range 1" - elif args.flag_skip and re.search(r"manganato|mangahere|webtoons", link): - skip_arg = " --chapter-range 1-5" - else: - skip_arg = "" - - gallery = Gallery() - gallery.archive = args.flag_archive - gallery.skip_arg = skip_arg - gallery.link = link - gallery.generate_command(is_comic=True) - gallery.run_command(args.flag_verbose) - save_comic(link) + _handle_comic_link(link, args) for link in links_videos: - video = Video() - video.use_archive = args.flag_archive - video.link = link - video.dest = str(user.directories["media"]) - video.database = str(user.dbs["media"]) - run(video_command(video), args.flag_verbose) + _handle_video_link(user, link, args) for link in links_other: - LOG.info("Other type of download %s", link) - gallery = Gallery() - gallery.archive = False - gallery.skip_arg = " -o directory='[]'" - gallery.link = link - gallery.dest = "push" - gallery.generate_command(user) - gallery.run_command(args.flag_verbose) + _handle_other_link(user, link, args) # Flush the push list, cleans all the contents with open(user.lists["push"], "w", encoding="utf-8") as w_file: diff --git a/src/download/setup.cfg b/src/download/setup.cfg index ab1832c..eb630aa 100644 --- a/src/download/setup.cfg +++ b/src/download/setup.cfg @@ -8,6 +8,8 @@ py_modules = argparser db admin + admin_links + admin_users classes.gallery classes.user