From da87b6f9d2e9563755bf391fd76694441d499590 Mon Sep 17 00:00:00 2001 From: Danilo Reyes Date: Sat, 28 Feb 2026 20:53:48 -0600 Subject: [PATCH] download-admin (sqlite db) init --- flake.nix | 11 + src/download/admin.py | 258 +++++++++++++++++++++ src/download/classes/user.py | 19 +- src/download/db.py | 418 +++++++++++++++++++++++++++++++++++ src/download/download.py | 47 ++-- src/download/setup.cfg | 4 +- 6 files changed, 729 insertions(+), 28 deletions(-) create mode 100644 src/download/admin.py create mode 100644 src/download/db.py diff --git a/flake.nix b/flake.nix index 89884bc..be8d6b4 100644 --- a/flake.nix +++ b/flake.nix @@ -126,5 +126,16 @@ ext = "py"; handler = scriptBin; }; + + apps.x86_64-linux = { + download = { + type = "app"; + program = "${pkgs.download}/bin/download"; + }; + download-admin = { + type = "app"; + program = "${pkgs.download}/bin/download-admin"; + }; + }; }; } diff --git a/src/download/admin.py b/src/download/admin.py new file mode 100644 index 0000000..f3ca382 --- /dev/null +++ b/src/download/admin.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +"""Administrative CLI for download link database.""" +from __future__ import annotations + +import argparse +import os +import shutil +from pathlib import Path + +import db +from functions import load_config_variables + + +def list_users(configs: dict) -> None: + for entry in configs["users"]: + print(entry["name"]) + + +def prompt_yes_no(message: str) -> bool: + while True: + raw = input(f"{message} [y/n]: ").strip().lower() + if raw in ("y", "yes"): + return True + if raw in ("n", "no"): + return False + + +def merge_dirs(src: Path, dst: Path) -> None: + for root, _, files in os.walk(src): + rel = Path(root).relative_to(src) + target_dir = dst / rel + target_dir.mkdir(parents=True, exist_ok=True) + for filename in files: + src_file = Path(root) / filename + dst_file = target_dir / filename + if dst_file.exists(): + print(f"Skip existing file: {dst_file}") + continue + 
shutil.move(str(src_file), str(dst_file)) + + # Cleanup empty directories + for root, dirs, files in os.walk(src, topdown=False): + if not os.listdir(root): + Path(root).rmdir() + + +def move_user_outputs(configs: dict, user_name: str, old_handle: str, new_handle: str) -> None: + user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None) + if not user_cfg: + print(f"Unknown user: {user_name}") + return + + base_dirs = [Path(user_cfg["download-dir"])] + for base in base_dirs: + old_path = base / old_handle + new_path = base / new_handle + if not old_path.exists(): + print(f"Missing: {old_path}") + continue + if new_path.exists(): + if not prompt_yes_no( + f"Merge contents from {old_path} into existing {new_path}?" + ): + continue + merge_dirs(old_path, new_path) + else: + old_path.rename(new_path) + + +def cmd_add(args: argparse.Namespace) -> None: + configs = load_config_variables() + with db.connect(configs) as conn: + result = db.add_link( + conn, args.user, args.url, assume_yes=args.assume_yes, source="manual" + ) + if result["status"] == "removed" and not args.assume_yes: + removed_at = result.get("removed_at", "unknown") + if prompt_yes_no(f"Link was deleted on {removed_at}. 
Re-add?"): + result = db.add_link( + conn, args.user, args.url, assume_yes=True, source="manual" + ) + row = result.get("row") + if row and row["banned_at"]: + print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})") + if row and not row["enabled"]: + print("Warning: link is disabled") + conn.commit() + print(result["status"]) + + +def cmd_disable(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.set_enabled(conn, args.user, args.url, enabled=False) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_enable(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.set_enabled(conn, args.user, args.url, enabled=True) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_ban(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.set_banned(conn, args.user, args.url, banned=True, reason=args.reason) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_unban(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.set_banned(conn, args.user, args.url, banned=False) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_remove(args: argparse.Namespace) -> None: + with db.connect() as conn: + ok = db.remove_link(conn, args.user, args.url) + if ok: + conn.commit() + print("ok" if ok else "not found") + + +def cmd_rename(args: argparse.Namespace) -> None: + with db.connect() as conn: + result = db.rename_link(conn, args.user, args.old_url, args.new_url) + if result["status"] == "renamed": + conn.commit() + print(result["status"]) + + +def cmd_list(args: argparse.Namespace) -> None: + users = args.user or None + with db.connect() as conn: + rows = db.get_links( + conn, + users=users, + include_disabled=args.disabled, + include_banned=args.banned, + ) + for row in rows: + status = "enabled" if row["enabled"] else "disabled" + if row["banned_at"]: + status = "banned" + print(f"{row['user_name']} 
[{status}] {row['url_original']}") + + +def cmd_users(args: argparse.Namespace) -> None: + configs = load_config_variables() + list_users(configs) + + +def cmd_import(args: argparse.Namespace) -> None: + configs = load_config_variables() + with db.connect(configs) as conn: + for entry in configs["users"]: + user = entry["name"] + lists_dir = Path(configs["global"]["lists-dir"]) / user + master = lists_dir / "watch.txt" + result = db.import_master_list(conn, user, master) + print(f"{user}: {result}") + conn.commit() + + +def cmd_user_rename(args: argparse.Namespace) -> None: + configs = load_config_variables() + with db.connect(configs) as conn: + result = db.bulk_rename_handle( + conn, + user_name=args.user, + site=args.site, + old_handle=args.old, + new_handle=args.new, + ) + conn.commit() + print(result) + move_user_outputs(configs, args.user, args.old, args.new) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="download-admin") + sub = parser.add_subparsers(dest="cmd", required=True) + + p_add = sub.add_parser("add") + p_add.add_argument("user") + p_add.add_argument("url") + p_add.add_argument("--assume-yes", action="store_true") + p_add.set_defaults(func=cmd_add) + + p_disable = sub.add_parser("disable") + p_disable.add_argument("user") + p_disable.add_argument("url") + p_disable.set_defaults(func=cmd_disable) + + p_enable = sub.add_parser("enable") + p_enable.add_argument("user") + p_enable.add_argument("url") + p_enable.set_defaults(func=cmd_enable) + + p_ban = sub.add_parser("ban") + p_ban.add_argument("user") + p_ban.add_argument("url") + p_ban.add_argument("--reason") + p_ban.set_defaults(func=cmd_ban) + + p_unban = sub.add_parser("unban") + p_unban.add_argument("user") + p_unban.add_argument("url") + p_unban.set_defaults(func=cmd_unban) + + p_remove = sub.add_parser("remove") + p_remove.add_argument("user") + p_remove.add_argument("url") + p_remove.set_defaults(func=cmd_remove) + + p_rename = 
sub.add_parser("rename") + p_rename.add_argument("user") + p_rename.add_argument("old_url") + p_rename.add_argument("new_url") + p_rename.set_defaults(func=cmd_rename) + + p_list = sub.add_parser("list") + p_list.add_argument("--user", action="append") + p_list.add_argument("--disabled", action="store_true") + p_list.add_argument("--banned", action="store_true") + p_list.set_defaults(func=cmd_list) + + p_users = sub.add_parser("users") + p_users.set_defaults(func=cmd_users) + + p_import = sub.add_parser("import") + p_import.set_defaults(func=cmd_import) + + p_user_rename = sub.add_parser("user-rename") + p_user_rename.add_argument("user") + p_user_rename.add_argument("site") + p_user_rename.add_argument("old") + p_user_rename.add_argument("new") + p_user_rename.set_defaults(func=cmd_user_rename) + + return parser + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/src/download/classes/user.py b/src/download/classes/user.py index 14079af..9adc13e 100644 --- a/src/download/classes/user.py +++ b/src/download/classes/user.py @@ -8,6 +8,7 @@ from functions import validate_x_link from functions import parse_link from functions import clean_cache from functions import LOG +import db class User: @@ -83,8 +84,8 @@ class User: def list_manager(self) -> None: """Manage all the user list and create sub-lists""" self._create_directories() # Call the function to create necesary cache dirs - with open(self.lists["master"], "r", encoding="utf-8") as r_file: - master_content = list(map(lambda x: x.rstrip(), r_file)) + with db.connect() as conn: + master_content = db.get_active_links(conn, self.name) # Create temporary list files segmented per scrapper shuffle(master_content) @@ -94,12 +95,10 @@ class User: def save_link(self, link: str) -> None: """Checks the master list against a new link if unmatched, appends it to the end of the list""" - with open(self.lists["master"], "r", 
encoding="utf-8") as r_file: - links = r_file.read().lower() - - if parse_link(link).lower() in links: + with db.connect() as conn: + result = db.add_link(conn, self.name, parse_link(link), assume_yes=True) + conn.commit() + if result["status"] == "added": + LOG.info("New gallery, saving") + else: LOG.info("Gallery repeated, not saving") - return - - LOG.info("New gallery, saving") - self.append_list("master", parse_link(link)) diff --git a/src/download/db.py b/src/download/db.py new file mode 100644 index 0000000..4ea5f47 --- /dev/null +++ b/src/download/db.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python3 +"""SQLite persistence for download links.""" +from __future__ import annotations + +import sqlite3 +from pathlib import Path +from typing import Iterable +from urllib.parse import urlsplit, urlunsplit + +from functions import LOG +from functions import load_config_variables + + +def get_db_path(configs: dict | None = None) -> Path: + """Return the database path for links.""" + cfg = configs or load_config_variables() + base = Path(cfg["global"]["databases-dir"]) + return base / "links.sqlite3" + + +def connect(configs: dict | None = None) -> sqlite3.Connection: + """Open a connection and ensure schema exists.""" + db_path = get_db_path(configs) + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + ensure_schema(conn) + return conn + + +def ensure_schema(conn: sqlite3.Connection) -> None: + """Create schema if missing.""" + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS links ( + id INTEGER PRIMARY KEY, + user_name TEXT NOT NULL, + url_original TEXT NOT NULL, + url_normalized TEXT NOT NULL, + site TEXT, + enabled INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + disabled_at TEXT, + banned_at TEXT, + banned_reason TEXT + ); + + CREATE UNIQUE INDEX IF NOT EXISTS links_user_url_norm + ON links (user_name, 
url_normalized); + + CREATE TABLE IF NOT EXISTS link_history ( + id INTEGER PRIMARY KEY, + link_id INTEGER, + user_name TEXT NOT NULL, + event TEXT NOT NULL, + old_url TEXT, + new_url TEXT, + note TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS link_tombstones ( + id INTEGER PRIMARY KEY, + user_name TEXT NOT NULL, + url_normalized TEXT NOT NULL, + url_original TEXT NOT NULL, + removed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE UNIQUE INDEX IF NOT EXISTS tombstones_user_url_norm + ON link_tombstones (user_name, url_normalized); + """ + ) + + +def normalize_url(url: str) -> str: + """Normalize URL for dedupe only.""" + raw = url.strip() + if "://" not in raw: + raw = f"https://{raw}" + + parts = urlsplit(raw) + scheme = "https" + host = (parts.hostname or "").lower() + if host.startswith("www."): + host = host[4:] + if host in ("twitter.com", "www.twitter.com"): + host = "x.com" + + path = parts.path.rstrip("/") + query = parts.query + return urlunsplit((scheme, host, path, query, "")) + + +def get_site(url: str) -> str: + """Return normalized host name.""" + raw = url.strip() + if "://" not in raw: + raw = f"https://{raw}" + host = (urlsplit(raw).hostname or "").lower() + if host.startswith("www."): + host = host[4:] + if host in ("twitter.com", "www.twitter.com"): + host = "x.com" + return host + + +def add_history( + conn: sqlite3.Connection, + user_name: str, + event: str, + link_id: int | None = None, + old_url: str | None = None, + new_url: str | None = None, + note: str | None = None, +) -> None: + conn.execute( + """ + INSERT INTO link_history (link_id, user_name, event, old_url, new_url, note) + VALUES (?, ?, ?, ?, ?, ?) 
+ """, + (link_id, user_name, event, old_url, new_url, note), + ) + + +def add_link( + conn: sqlite3.Connection, + user_name: str, + url_original: str, + assume_yes: bool = False, + source: str = "manual", +) -> dict: + """Add a link or return existing status.""" + url_norm = normalize_url(url_original) + site = get_site(url_original) + + row = conn.execute( + "SELECT * FROM links WHERE user_name = ? AND url_normalized = ?", + (user_name, url_norm), + ).fetchone() + if row: + return {"status": "exists", "row": row} + + tombstone = conn.execute( + "SELECT removed_at FROM link_tombstones WHERE user_name = ? AND url_normalized = ?", + (user_name, url_norm), + ).fetchone() + + if tombstone and not assume_yes and source != "push": + return {"status": "removed", "removed_at": tombstone["removed_at"]} + + cur = conn.execute( + """ + INSERT INTO links (user_name, url_original, url_normalized, site) + VALUES (?, ?, ?, ?) + """, + (user_name, url_original, url_norm, site), + ) + add_history( + conn, + user_name=user_name, + event="add", + link_id=cur.lastrowid, + new_url=url_original, + note=f"source={source}", + ) + return {"status": "added", "id": cur.lastrowid} + + +def set_enabled( + conn: sqlite3.Connection, + user_name: str, + url_original: str, + enabled: bool, +) -> bool: + url_norm = normalize_url(url_original) + row = conn.execute( + "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?", + (user_name, url_norm), + ).fetchone() + if not row: + return False + if enabled: + conn.execute( + """ + UPDATE links + SET enabled = 1, disabled_at = NULL, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + """, + (row["id"],), + ) + add_history(conn, user_name, "enable", link_id=row["id"], old_url=row["url_original"]) + else: + conn.execute( + """ + UPDATE links + SET enabled = 0, disabled_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP + WHERE id = ? 
+ """, + (row["id"],), + ) + add_history(conn, user_name, "disable", link_id=row["id"], old_url=row["url_original"]) + return True + + +def set_banned( + conn: sqlite3.Connection, + user_name: str, + url_original: str, + banned: bool, + reason: str | None = None, +) -> bool: + url_norm = normalize_url(url_original) + row = conn.execute( + "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?", + (user_name, url_norm), + ).fetchone() + if not row: + return False + if banned: + conn.execute( + """ + UPDATE links + SET banned_at = CURRENT_TIMESTAMP, banned_reason = ?, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + """, + (reason, row["id"]), + ) + add_history( + conn, + user_name, + "ban", + link_id=row["id"], + old_url=row["url_original"], + note=reason, + ) + else: + conn.execute( + """ + UPDATE links + SET banned_at = NULL, banned_reason = NULL, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + """, + (row["id"],), + ) + add_history(conn, user_name, "unban", link_id=row["id"], old_url=row["url_original"]) + return True + + +def rename_link( + conn: sqlite3.Connection, + user_name: str, + old_url: str, + new_url: str, +) -> dict: + old_norm = normalize_url(old_url) + new_norm = normalize_url(new_url) + + row = conn.execute( + "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?", + (user_name, old_norm), + ).fetchone() + if not row: + return {"status": "missing"} + + conflict = conn.execute( + "SELECT id FROM links WHERE user_name = ? AND url_normalized = ?", + (user_name, new_norm), + ).fetchone() + if conflict and conflict["id"] != row["id"]: + return {"status": "conflict"} + + conn.execute( + """ + UPDATE links + SET url_original = ?, url_normalized = ?, site = ?, updated_at = CURRENT_TIMESTAMP + WHERE id = ? 
+ """, + (new_url, new_norm, get_site(new_url), row["id"]), + ) + add_history( + conn, + user_name, + "rename", + link_id=row["id"], + old_url=row["url_original"], + new_url=new_url, + ) + return {"status": "renamed"} + + +def remove_link(conn: sqlite3.Connection, user_name: str, url_original: str) -> bool: + url_norm = normalize_url(url_original) + row = conn.execute( + "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?", + (user_name, url_norm), + ).fetchone() + if not row: + return False + + conn.execute( + """ + INSERT OR IGNORE INTO link_tombstones (user_name, url_normalized, url_original) + VALUES (?, ?, ?) + """, + (user_name, url_norm, row["url_original"]), + ) + add_history(conn, user_name, "remove", link_id=row["id"], old_url=row["url_original"]) + conn.execute("DELETE FROM links WHERE id = ?", (row["id"],)) + return True + + +def get_active_links(conn: sqlite3.Connection, user_name: str) -> list[str]: + rows = conn.execute( + """ + SELECT url_original FROM links + WHERE user_name = ? 
+ AND enabled = 1 + AND banned_at IS NULL + ORDER BY id ASC + """, + (user_name,), + ).fetchall() + return [row["url_original"] for row in rows] + + +def get_links( + conn: sqlite3.Connection, + users: Iterable[str] | None = None, + include_disabled: bool = False, + include_banned: bool = False, +) -> list[sqlite3.Row]: + params: list = [] + where = [] + user_list = list(users) if users else [] + if user_list: + where.append(f"user_name IN ({','.join(['?'] * len(user_list))})") + params.extend(user_list) + if not include_disabled: + where.append("enabled = 1") + if not include_banned: + where.append("banned_at IS NULL") + clause = " AND ".join(where) + if clause: + clause = "WHERE " + clause + return conn.execute(f"SELECT * FROM links {clause} ORDER BY user_name, id", params).fetchall() + + +def import_master_list(conn: sqlite3.Connection, user_name: str, path: Path) -> dict: + if not path.is_file(): + return {"status": "missing", "path": str(path)} + with open(path, "r", encoding="utf-8") as r_file: + lines = [ln.strip() for ln in r_file if ln.strip()] + + added = 0 + exists = 0 + removed = 0 + for line in lines: + result = add_link(conn, user_name, line, assume_yes=True, source="import") + if result["status"] == "added": + added += 1 + elif result["status"] == "exists": + exists += 1 + elif result["status"] == "removed": + removed += 1 + return {"status": "ok", "added": added, "exists": exists, "removed": removed} + + +def bulk_rename_handle( + conn: sqlite3.Connection, + user_name: str, + site: str, + old_handle: str, + new_handle: str, +) -> dict: + """Rename account handle within a site for a user.""" + site_norm = site.lower().removeprefix("www.") + if site_norm == "twitter.com": + site_norm = "x.com" + if site_norm == "www.twitter.com": + site_norm = "x.com" + + rows = conn.execute( + """ + SELECT id, url_original FROM links + WHERE user_name = ? AND site = ? 
+ """, + (user_name, site_norm), + ).fetchall() + + updated = 0 + skipped = 0 + conflicts = 0 + for row in rows: + raw = row["url_original"] + parts = urlsplit(raw if "://" in raw else f"https://{raw}") + path = parts.path + segments = path.split("/") + if len(segments) < 2 or segments[1] != old_handle: + skipped += 1 + continue + segments[1] = new_handle + new_path = "/".join(segments) + new_url = urlunsplit((parts.scheme, parts.netloc, new_path, parts.query, parts.fragment)) + result = rename_link(conn, user_name, raw, new_url) + if result["status"] == "renamed": + updated += 1 + elif result["status"] == "conflict": + conflicts += 1 + else: + skipped += 1 + + return {"updated": updated, "skipped": skipped, "conflicts": conflicts} + + +def warn(msg: str) -> None: + LOG.warning(msg) diff --git a/src/download/download.py b/src/download/download.py index 8e07c38..32104de 100644 --- a/src/download/download.py +++ b/src/download/download.py @@ -13,6 +13,7 @@ import re from pathlib import Path import argparse import yaml +import db from typing import Dict from functions import LOG from functions import run @@ -219,7 +220,7 @@ def save_comic(link: str) -> None: w_file.write(link + "\n") -def push_manager(user: User) -> None: +def push_manager(user: User, links: list[str] | None = None) -> None: """Filters out the URL to use the appropiate downloader""" args = get_args() # Creates an array which will store any links that should use youtube-dl @@ -250,8 +251,9 @@ def push_manager(user: User) -> None: rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate") rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato") - with open(user.lists["push"], "r", encoding="utf-8") as r_file: - links = list(map(lambda x: x.rstrip(), r_file)) + if links is None: + with open(user.lists["push"], "r", encoding="utf-8") as r_file: + links = list(map(lambda x: x.rstrip(), r_file)) links_galleries = filter(rgx_gallery.search, links) links_videos = 
filter(rgx_video.search, links) links_comics = filter(rgx_comic.search, links) @@ -262,16 +264,29 @@ def push_manager(user: User) -> None: links, ) - for link in links_galleries: - gallery = Gallery() - gallery.archive = args.flag_archive - gallery.skip_arg = " -o skip=true" if not args.flag_skip else "" - gallery.link = parse_link(link) - gallery.dest = "download" - gallery.opt_args = parse_instagram(link) - gallery.generate_command(user) - gallery.run_command(args.flag_verbose) - user.save_link(link) + with db.connect() as conn: + for link in links_galleries: + add_res = db.add_link( + conn, user.name, parse_link(link), assume_yes=True, source="push" + ) + row = add_res.get("row") + if row and row["banned_at"]: + LOG.warning("Link is banned, skipping: %s", link) + continue + if row and not row["enabled"]: + LOG.warning("Link is disabled, skipping: %s", link) + continue + + gallery = Gallery() + gallery.archive = args.flag_archive + gallery.skip_arg = " -o skip=true" if not args.flag_skip else "" + gallery.link = parse_link(link) + gallery.dest = "download" + gallery.opt_args = parse_instagram(link) + gallery.generate_command(user) + gallery.run_command(args.flag_verbose) + + conn.commit() for link in links_comics: if args.flag_skip and re.search(r"readcomiconline", link): @@ -350,10 +365,8 @@ def main(): elif args.link: is_admin = args.user in ("everyone", "jawz") user = User(get_index("jawz" if is_admin else args.user)) - for arg_link in [lnk for grp in args.link for lnk in grp]: - user.append_list("push", parse_link(arg_link)) - - push_manager(user) + links = [parse_link(lnk) for grp in args.link for lnk in grp] + push_manager(user, links=links) if __name__ == "__main__": diff --git a/src/download/setup.cfg b/src/download/setup.cfg index 3cbcbbb..ab1832c 100644 --- a/src/download/setup.cfg +++ b/src/download/setup.cfg @@ -6,10 +6,12 @@ py_modules = download functions argparser + db + admin classes.gallery classes.user [options.entry_points] console_scripts = 
download = download:main - + download-admin = admin:main