Compare commits

...

4 Commits

Author         SHA1        Message                           Date
Danilo Reyes   81c2df84f7  refactoring                       2026-02-28 21:34:39 -06:00
Danilo Reyes   7a64034f8a  tests                             2026-02-28 21:25:46 -06:00
Danilo Reyes   2ccdd713ea  admin import/validate             2026-02-28 21:17:46 -06:00
Danilo Reyes   da87b6f9d2  download-admin (sqlite db) init   2026-02-28 20:53:48 -06:00
10 changed files with 1149 additions and 56 deletions

flake.nix

@@ -126,5 +126,32 @@
ext = "py"; ext = "py";
handler = scriptBin; handler = scriptBin;
}; };
apps.x86_64-linux = {
download = {
type = "app";
program = "${pkgs.download}/bin/download";
};
download-admin = {
type = "app";
program = "${pkgs.download}/bin/download-admin";
};
download-tests = {
type = "app";
program = "${
pkgs.writeShellApplication {
name = "download-tests";
runtimeInputs = [
(pkgs.python3.withPackages (ps: [ ps.pyyaml ]))
];
text = ''
set -euo pipefail
export PYTHONPATH="${inputs.self}/src/download"
python -m unittest discover -s "${inputs.self}/src/download/tests" -p "test_*.py"
'';
}
}/bin/download-tests";
};
};
};
}
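
A quick usage sketch for the three apps registered above, assuming the flake is run from the repository root; the `.#` attribute names come from this hunk, everything else is illustrative:

nix run .#download                 # main downloader entry point
nix run .#download-admin -- list   # admin CLI; arguments after `--` are passed through to the program
nix run .#download-tests           # unittest discovery over src/download/tests, with PyYAML on the path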

src/download/admin.py (new file, 95 lines)

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""Administrative CLI for download link database."""
from __future__ import annotations
import argparse
from admin_links import cmd_add
from admin_links import cmd_ban
from admin_links import cmd_disable
from admin_links import cmd_enable
from admin_links import cmd_import
from admin_links import cmd_list
from admin_links import cmd_remove
from admin_links import cmd_rename
from admin_links import cmd_unban
from admin_links import cmd_validate_import
from admin_users import cmd_user_rename
from admin_users import cmd_users
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="download-admin")
sub = parser.add_subparsers(dest="cmd", required=True)
p_add = sub.add_parser("add")
p_add.add_argument("user")
p_add.add_argument("url")
p_add.add_argument("--assume-yes", action="store_true")
p_add.set_defaults(func=cmd_add)
p_disable = sub.add_parser("disable")
p_disable.add_argument("user")
p_disable.add_argument("url")
p_disable.set_defaults(func=cmd_disable)
p_enable = sub.add_parser("enable")
p_enable.add_argument("user")
p_enable.add_argument("url")
p_enable.set_defaults(func=cmd_enable)
p_ban = sub.add_parser("ban")
p_ban.add_argument("user")
p_ban.add_argument("url")
p_ban.add_argument("--reason")
p_ban.set_defaults(func=cmd_ban)
p_unban = sub.add_parser("unban")
p_unban.add_argument("user")
p_unban.add_argument("url")
p_unban.set_defaults(func=cmd_unban)
p_remove = sub.add_parser("remove")
p_remove.add_argument("user")
p_remove.add_argument("url")
p_remove.set_defaults(func=cmd_remove)
p_rename = sub.add_parser("rename")
p_rename.add_argument("user")
p_rename.add_argument("old_url")
p_rename.add_argument("new_url")
p_rename.set_defaults(func=cmd_rename)
p_list = sub.add_parser("list")
p_list.add_argument("--user", action="append")
p_list.add_argument("--disabled", action="store_true")
p_list.add_argument("--banned", action="store_true")
p_list.set_defaults(func=cmd_list)
p_users = sub.add_parser("users")
p_users.set_defaults(func=cmd_users)
p_import = sub.add_parser("import")
p_import.set_defaults(func=cmd_import)
p_validate = sub.add_parser("validate-import")
p_validate.set_defaults(func=cmd_validate_import)
p_user_rename = sub.add_parser("user-rename")
p_user_rename.add_argument("user")
p_user_rename.add_argument("site")
p_user_rename.add_argument("old")
p_user_rename.add_argument("new")
p_user_rename.set_defaults(func=cmd_user_rename)
return parser
def main() -> None:
parser = build_parser()
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()
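
A few illustrative invocations of the parser built above; the subcommands and flags are defined in this file, while the user name and URLs are placeholder values:

download-admin add jawz https://x.com/SomeArtist --assume-yes
download-admin ban jawz https://x.com/SomeArtist --reason "reposts only"
download-admin list --user jawz --disabled --banned
download-admin user-rename jawz x.com OldHandle NewHandle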

src/download/admin_links.py (new file, 192 lines)

@@ -0,0 +1,192 @@
#!/usr/bin/env python3
"""Admin CLI: link operations."""
from __future__ import annotations
import argparse
from pathlib import Path
import db
from functions import load_config_variables
def prompt_yes_no(message: str) -> bool:
while True:
raw = input(f"{message} [y/n]: ").strip().lower()
if raw in ("y", "yes"):
return True
if raw in ("n", "no"):
return False
def parse_list_file(path: Path) -> dict:
enabled: set[str] = set()
disabled: set[str] = set()
if not path.is_file():
return {"enabled": enabled, "disabled": disabled}
with open(path, "r", encoding="utf-8") as r_file:
for raw in r_file:
line = raw.strip()
if not line:
continue
if line.startswith("#"):
url = line.lstrip("#").strip()
if url:
disabled.add(db.normalize_url(url))
continue
enabled.add(db.normalize_url(line))
return {"enabled": enabled, "disabled": disabled}
def cmd_add(args: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
result = db.add_link(
conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
)
if result["status"] == "removed" and not args.assume_yes:
removed_at = result.get("removed_at", "unknown")
if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
result = db.add_link(
conn, args.user, args.url, assume_yes=True, source="manual"
)
row = result.get("row")
if row and row["banned_at"]:
print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
if row and not row["enabled"]:
print("Warning: link is disabled")
conn.commit()
print(result["status"])
def cmd_disable(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.set_enabled(conn, args.user, args.url, enabled=False)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_enable(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.set_enabled(conn, args.user, args.url, enabled=True)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_ban(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.set_banned(conn, args.user, args.url, banned=True, reason=args.reason)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_unban(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.set_banned(conn, args.user, args.url, banned=False)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_remove(args: argparse.Namespace) -> None:
with db.connect() as conn:
ok = db.remove_link(conn, args.user, args.url)
if ok:
conn.commit()
print("ok" if ok else "not found")
def cmd_rename(args: argparse.Namespace) -> None:
with db.connect() as conn:
result = db.rename_link(conn, args.user, args.old_url, args.new_url)
if result["status"] == "renamed":
conn.commit()
print(result["status"])
def cmd_list(args: argparse.Namespace) -> None:
users = args.user or None
with db.connect() as conn:
rows = db.get_links(
conn,
users=users,
include_disabled=args.disabled,
include_banned=args.banned,
)
for row in rows:
status = "enabled" if row["enabled"] else "disabled"
if row["banned_at"]:
status = "banned"
print(f"{row['user_name']} [{status}] {row['url_original']}")
def cmd_import(_: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
imported_paths = []
for entry in configs["users"]:
user = entry["name"]
lists_dir = Path(configs["global"]["lists-dir"]) / user
master = lists_dir / "watch.txt"
result = db.import_master_list(conn, user, master)
if result["status"] == "ok":
imported_paths.append(str(master))
print(f"{user}: {result}")
if result.get("duplicates"):
print(f"{user} duplicates:")
for dup in result["duplicates"]:
print(f" {dup}")
if imported_paths:
print("Imported lists:")
for path in imported_paths:
print(f" {path}")
conn.commit()
def cmd_validate_import(_: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
for entry in configs["users"]:
user = entry["name"]
lists_dir = Path(configs["global"]["lists-dir"]) / user
master = lists_dir / "watch.txt"
list_sets = parse_list_file(master)
rows = db.get_links_by_user(conn, user)
db_enabled = set()
db_disabled = set()
for row in rows:
norm = db.normalize_url(row["url_original"])
if row["enabled"] and not row["banned_at"]:
db_enabled.add(norm)
else:
db_disabled.add(norm)
missing_enabled = list_sets["enabled"] - db_enabled
missing_disabled = list_sets["disabled"] - db_disabled
extra_enabled = db_enabled - list_sets["enabled"]
extra_disabled = db_disabled - list_sets["disabled"]
print(f"{user}:")
if missing_enabled:
print(" Missing enabled in DB:")
for url in sorted(missing_enabled):
print(f" {url}")
if missing_disabled:
print(" Missing disabled in DB:")
for url in sorted(missing_disabled):
print(f" {url}")
if extra_enabled:
print(" Extra enabled in DB:")
for url in sorted(extra_enabled):
print(f" {url}")
if extra_disabled:
print(" Extra disabled in DB:")
for url in sorted(extra_disabled):
print(f" {url}")
if not any(
[missing_enabled, missing_disabled, extra_enabled, extra_disabled]
):
print(" OK")

src/download/admin_users.py (new file, 88 lines)

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""Admin CLI: user operations."""
from __future__ import annotations
import argparse
import os
import shutil
from pathlib import Path
import db
from functions import load_config_variables
def list_users(configs: dict) -> None:
for entry in configs["users"]:
print(entry["name"])
def prompt_yes_no(message: str) -> bool:
while True:
raw = input(f"{message} [y/n]: ").strip().lower()
if raw in ("y", "yes"):
return True
if raw in ("n", "no"):
return False
def merge_dirs(src: Path, dst: Path) -> None:
for root, _, files in os.walk(src):
rel = Path(root).relative_to(src)
target_dir = dst / rel
target_dir.mkdir(parents=True, exist_ok=True)
for filename in files:
src_file = Path(root) / filename
dst_file = target_dir / filename
if dst_file.exists():
print(f"Skip existing file: {dst_file}")
continue
shutil.move(str(src_file), str(dst_file))
for root, dirs, files in os.walk(src, topdown=False):
if not dirs and not files:
Path(root).rmdir()
def move_user_outputs(
configs: dict, user_name: str, old_handle: str, new_handle: str
) -> None:
user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None)
if not user_cfg:
print(f"Unknown user: {user_name}")
return
base_dirs = [Path(user_cfg["download-dir"])]
for base in base_dirs:
old_path = base / old_handle
new_path = base / new_handle
if not old_path.exists():
print(f"Missing: {old_path}")
continue
if not new_path.exists():
old_path.rename(new_path)
continue
if not prompt_yes_no(
f"Merge contents from {old_path} into existing {new_path}?"
):
continue
merge_dirs(old_path, new_path)
def cmd_users(_: argparse.Namespace) -> None:
configs = load_config_variables()
list_users(configs)
def cmd_user_rename(args: argparse.Namespace) -> None:
configs = load_config_variables()
with db.connect(configs) as conn:
result = db.bulk_rename_handle(
conn,
user_name=args.user,
site=args.site,
old_handle=args.old,
new_handle=args.new,
)
conn.commit()
print(result)
move_user_outputs(configs, args.user, args.old, args.new)
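
An illustrative rename flow for the helpers above; "jawz", "OldHandle", and "NewHandle" are placeholder values. The command rewrites matching x.com URLs in the database, then renames or merges the on-disk output directory for that handle:

download-admin user-rename jawz x.com OldHandle NewHandle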

src/download/classes/user.py

@@ -8,6 +8,7 @@ from functions import validate_x_link
from functions import parse_link
from functions import clean_cache
from functions import LOG
+import db


class User:
@@ -83,8 +84,8 @@ class User:
    def list_manager(self) -> None:
        """Manage all the user list and create sub-lists"""
        self._create_directories()  # Call the function to create necesary cache dirs
-        with open(self.lists["master"], "r", encoding="utf-8") as r_file:
-            master_content = list(map(lambda x: x.rstrip(), r_file))
+        with db.connect() as conn:
+            master_content = db.get_active_links(conn, self.name)

        # Create temporary list files segmented per scrapper
        shuffle(master_content)
@@ -94,12 +95,10 @@ class User:
    def save_link(self, link: str) -> None:
        """Checks the master list against a new link
        if unmatched, appends it to the end of the list"""
-        with open(self.lists["master"], "r", encoding="utf-8") as r_file:
-            links = r_file.read().lower()
-
-        if parse_link(link).lower() in links:
-            LOG.info("Gallery repeated, not saving")
-            return
-        LOG.info("New gallery, saving")
-        self.append_list("master", parse_link(link))
+        with db.connect() as conn:
+            result = db.add_link(conn, self.name, parse_link(link), assume_yes=True)
+            conn.commit()
+        if result["status"] == "added":
+            LOG.info("New gallery, saving")
+        else:
+            LOG.info("Gallery repeated, not saving")

src/download/db.py (new file, 442 lines)

@@ -0,0 +1,442 @@
#!/usr/bin/env python3
"""SQLite persistence for download links."""
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Iterable
from urllib.parse import urlsplit, urlunsplit
from functions import LOG
from functions import load_config_variables
def get_db_path(configs: dict | None = None) -> Path:
"""Return the database path for links."""
cfg = configs or load_config_variables()
base = Path(cfg["global"]["databases-dir"])
return base / "links.sqlite3"
def connect(configs: dict | None = None) -> sqlite3.Connection:
"""Open a connection and ensure schema exists."""
db_path = get_db_path(configs)
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
ensure_schema(conn)
return conn
def ensure_schema(conn: sqlite3.Connection) -> None:
"""Create schema if missing."""
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS links (
id INTEGER PRIMARY KEY,
user_name TEXT NOT NULL,
url_original TEXT NOT NULL,
url_normalized TEXT NOT NULL,
site TEXT,
enabled INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
disabled_at TEXT,
banned_at TEXT,
banned_reason TEXT
);
CREATE UNIQUE INDEX IF NOT EXISTS links_user_url_norm
ON links (user_name, url_normalized);
CREATE TABLE IF NOT EXISTS link_history (
id INTEGER PRIMARY KEY,
link_id INTEGER,
user_name TEXT NOT NULL,
event TEXT NOT NULL,
old_url TEXT,
new_url TEXT,
note TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS link_tombstones (
id INTEGER PRIMARY KEY,
user_name TEXT NOT NULL,
url_normalized TEXT NOT NULL,
url_original TEXT NOT NULL,
removed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX IF NOT EXISTS tombstones_user_url_norm
ON link_tombstones (user_name, url_normalized);
"""
)
def normalize_url(url: str) -> str:
"""Normalize URL for dedupe only."""
raw = url.strip()
if "://" not in raw:
raw = f"https://{raw}"
parts = urlsplit(raw)
scheme = "https"
host = (parts.hostname or "").lower()
if host.startswith("www."):
host = host[4:]
if host in ("twitter.com", "www.twitter.com"):
host = "x.com"
path = parts.path.rstrip("/")
query = parts.query
return urlunsplit((scheme, host, path, query, ""))
def get_site(url: str) -> str:
"""Return normalized host name."""
raw = url.strip()
if "://" not in raw:
raw = f"https://{raw}"
host = (urlsplit(raw).hostname or "").lower()
if host.startswith("www."):
host = host[4:]
if host in ("twitter.com", "www.twitter.com"):
host = "x.com"
return host
def add_history(
conn: sqlite3.Connection,
user_name: str,
event: str,
link_id: int | None = None,
old_url: str | None = None,
new_url: str | None = None,
note: str | None = None,
) -> None:
conn.execute(
"""
INSERT INTO link_history (link_id, user_name, event, old_url, new_url, note)
VALUES (?, ?, ?, ?, ?, ?)
""",
(link_id, user_name, event, old_url, new_url, note),
)
def add_link(
conn: sqlite3.Connection,
user_name: str,
url_original: str,
assume_yes: bool = False,
source: str = "manual",
) -> dict:
"""Add a link or return existing status."""
url_norm = normalize_url(url_original)
site = get_site(url_original)
row = conn.execute(
"SELECT * FROM links WHERE user_name = ? AND url_normalized = ?",
(user_name, url_norm),
).fetchone()
if row:
return {"status": "exists", "row": row}
tombstone = conn.execute(
"SELECT removed_at FROM link_tombstones WHERE user_name = ? AND url_normalized = ?",
(user_name, url_norm),
).fetchone()
if tombstone and not assume_yes and source != "push":
return {"status": "removed", "removed_at": tombstone["removed_at"]}
cur = conn.execute(
"""
INSERT INTO links (user_name, url_original, url_normalized, site)
VALUES (?, ?, ?, ?)
""",
(user_name, url_original, url_norm, site),
)
add_history(
conn,
user_name=user_name,
event="add",
link_id=cur.lastrowid,
new_url=url_original,
note=f"source={source}",
)
return {"status": "added", "id": cur.lastrowid}
def set_enabled(
conn: sqlite3.Connection,
user_name: str,
url_original: str,
enabled: bool,
) -> bool:
url_norm = normalize_url(url_original)
row = conn.execute(
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
(user_name, url_norm),
).fetchone()
if not row:
return False
if enabled:
conn.execute(
"""
UPDATE links
SET enabled = 1, disabled_at = NULL, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(row["id"],),
)
add_history(conn, user_name, "enable", link_id=row["id"], old_url=row["url_original"])
else:
conn.execute(
"""
UPDATE links
SET enabled = 0, disabled_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(row["id"],),
)
add_history(conn, user_name, "disable", link_id=row["id"], old_url=row["url_original"])
return True
def set_banned(
conn: sqlite3.Connection,
user_name: str,
url_original: str,
banned: bool,
reason: str | None = None,
) -> bool:
url_norm = normalize_url(url_original)
row = conn.execute(
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
(user_name, url_norm),
).fetchone()
if not row:
return False
if banned:
conn.execute(
"""
UPDATE links
SET banned_at = CURRENT_TIMESTAMP, banned_reason = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(reason, row["id"]),
)
add_history(
conn,
user_name,
"ban",
link_id=row["id"],
old_url=row["url_original"],
note=reason,
)
else:
conn.execute(
"""
UPDATE links
SET banned_at = NULL, banned_reason = NULL, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(row["id"],),
)
add_history(conn, user_name, "unban", link_id=row["id"], old_url=row["url_original"])
return True
def rename_link(
conn: sqlite3.Connection,
user_name: str,
old_url: str,
new_url: str,
) -> dict:
old_norm = normalize_url(old_url)
new_norm = normalize_url(new_url)
row = conn.execute(
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
(user_name, old_norm),
).fetchone()
if not row:
return {"status": "missing"}
conflict = conn.execute(
"SELECT id FROM links WHERE user_name = ? AND url_normalized = ?",
(user_name, new_norm),
).fetchone()
if conflict and conflict["id"] != row["id"]:
return {"status": "conflict"}
conn.execute(
"""
UPDATE links
SET url_original = ?, url_normalized = ?, site = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(new_url, new_norm, get_site(new_url), row["id"]),
)
add_history(
conn,
user_name,
"rename",
link_id=row["id"],
old_url=row["url_original"],
new_url=new_url,
)
return {"status": "renamed"}
def remove_link(conn: sqlite3.Connection, user_name: str, url_original: str) -> bool:
url_norm = normalize_url(url_original)
row = conn.execute(
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
(user_name, url_norm),
).fetchone()
if not row:
return False
conn.execute(
"""
INSERT OR IGNORE INTO link_tombstones (user_name, url_normalized, url_original)
VALUES (?, ?, ?)
""",
(user_name, url_norm, row["url_original"]),
)
add_history(conn, user_name, "remove", link_id=row["id"], old_url=row["url_original"])
conn.execute("DELETE FROM links WHERE id = ?", (row["id"],))
return True
def get_active_links(conn: sqlite3.Connection, user_name: str) -> list[str]:
rows = conn.execute(
"""
SELECT url_original FROM links
WHERE user_name = ?
AND enabled = 1
AND banned_at IS NULL
ORDER BY id ASC
""",
(user_name,),
).fetchall()
return [row["url_original"] for row in rows]
def get_links(
conn: sqlite3.Connection,
users: Iterable[str] | None = None,
include_disabled: bool = False,
include_banned: bool = False,
) -> list[sqlite3.Row]:
params: list = []
where = []
user_list = list(users) if users else []
if user_list:
where.append(f"user_name IN ({','.join(['?'] * len(user_list))})")
params.extend(user_list)
if not include_disabled:
where.append("enabled = 1")
if not include_banned:
where.append("banned_at IS NULL")
clause = " AND ".join(where)
if clause:
clause = "WHERE " + clause
return conn.execute(f"SELECT * FROM links {clause} ORDER BY user_name, id", params).fetchall()
def get_links_by_user(conn: sqlite3.Connection, user_name: str) -> list[sqlite3.Row]:
return conn.execute(
"SELECT * FROM links WHERE user_name = ? ORDER BY id",
(user_name,),
).fetchall()
def import_master_list(conn: sqlite3.Connection, user_name: str, path: Path) -> dict:
if not path.is_file():
return {"status": "missing", "path": str(path)}
with open(path, "r", encoding="utf-8") as r_file:
lines = [ln.strip() for ln in r_file if ln.strip()]
added = 0
exists = 0
removed = 0
duplicates: list[str] = []
for line in lines:
disabled = False
raw = line
if raw.startswith("#"):
disabled = True
raw = raw.lstrip("#").strip()
if not raw:
continue
result = add_link(conn, user_name, raw, assume_yes=True, source="import")
if result["status"] == "added":
added += 1
if disabled:
set_enabled(conn, user_name, raw, enabled=False)
elif result["status"] == "exists":
exists += 1
duplicates.append(raw)
elif result["status"] == "removed":
removed += 1
return {
"status": "ok",
"added": added,
"exists": exists,
"removed": removed,
"duplicates": duplicates,
}
def bulk_rename_handle(
conn: sqlite3.Connection,
user_name: str,
site: str,
old_handle: str,
new_handle: str,
) -> dict:
"""Rename account handle within a site for a user."""
# strip an optional "www." prefix; lstrip("www.") would also eat leading w/. characters
site_norm = site.lower().removeprefix("www.")
if site_norm == "twitter.com":
site_norm = "x.com"
rows = conn.execute(
"""
SELECT id, url_original FROM links
WHERE user_name = ? AND site = ?
""",
(user_name, site_norm),
).fetchall()
updated = 0
skipped = 0
conflicts = 0
for row in rows:
raw = row["url_original"]
parts = urlsplit(raw if "://" in raw else f"https://{raw}")
path = parts.path
segments = path.split("/")
if len(segments) < 2 or segments[1] != old_handle:
skipped += 1
continue
segments[1] = new_handle
new_path = "/".join(segments)
new_url = urlunsplit((parts.scheme, parts.netloc, new_path, parts.query, parts.fragment))
result = rename_link(conn, user_name, raw, new_url)
if result["status"] == "renamed":
updated += 1
elif result["status"] == "conflict":
conflicts += 1
else:
skipped += 1
return {"updated": updated, "skipped": skipped, "conflicts": conflicts}
def warn(msg: str) -> None:
LOG.warning(msg)
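
The links database lives at <databases-dir>/links.sqlite3, so it can also be inspected directly with the sqlite3 CLI; a hedged sketch where $DATABASES_DIR stands in for the configured databases-dir value:

sqlite3 "$DATABASES_DIR/links.sqlite3" \
  "SELECT user_name, url_original, enabled, banned_at FROM links ORDER BY user_name, id;"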

src/download/download.py

@@ -13,6 +13,7 @@ import re
from pathlib import Path
import argparse
import yaml
import db
from typing import Dict
from functions import LOG
from functions import run
@@ -94,6 +95,66 @@ def parse_instagram(link: str, post_type: list[str] | str | None = None) -> list
return ["-o", f"include={use_type}"]
def _comic_skip_arg(link: str, flag_skip: bool) -> str:
if not flag_skip:
return ""
if re.search(r"readcomiconline", link):
return " --chapter-range 1"
if re.search(r"manganato|mangahere|webtoons", link):
return " --chapter-range 1-5"
return ""
def _handle_gallery_link(user: User, link: str, args, conn) -> None:
add_res = db.add_link(conn, user.name, parse_link(link), assume_yes=True, source="push")
row = add_res.get("row")
if row and row["banned_at"]:
LOG.warning("Link is banned, skipping: %s", link)
return
if row and not row["enabled"]:
LOG.warning("Link is disabled, skipping: %s", link)
return
gallery = Gallery()
gallery.archive = args.flag_archive
gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
gallery.link = parse_link(link)
gallery.dest = "download"
gallery.opt_args = parse_instagram(link)
gallery.generate_command(user)
gallery.run_command(args.flag_verbose)
def _handle_comic_link(link: str, args) -> None:
gallery = Gallery()
gallery.archive = args.flag_archive
gallery.skip_arg = _comic_skip_arg(link, args.flag_skip)
gallery.link = link
gallery.generate_command(is_comic=True)
gallery.run_command(args.flag_verbose)
save_comic(link)
def _handle_video_link(user: User, link: str, args) -> None:
video = Video()
video.use_archive = args.flag_archive
video.link = link
video.dest = str(user.directories["media"])
video.database = str(user.dbs["media"])
run(video_command(video), args.flag_verbose)
def _handle_other_link(user: User, link: str, args) -> None:
LOG.info("Other type of download %s", link)
gallery = Gallery()
gallery.archive = False
gallery.skip_arg = " -o directory='[]'"
gallery.link = link
gallery.dest = "push"
gallery.generate_command(user)
gallery.run_command(args.flag_verbose)
def video_command(video: Video):
    """Filters and processes the required command to download videos"""
    command = ["yt-dlp"]
@@ -219,7 +280,7 @@ def save_comic(link: str) -> None:
        w_file.write(link + "\n")

-def push_manager(user: User) -> None:
+def push_manager(user: User, links: list[str] | None = None) -> None:
    """Filters out the URL to use the appropiate downloader"""
    args = get_args()
    # Creates an array which will store any links that should use youtube-dl
@@ -250,6 +311,7 @@ def push_manager(user: User) -> None:
    rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
    rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
-    with open(user.lists["push"], "r", encoding="utf-8") as r_file:
-        links = list(map(lambda x: x.rstrip(), r_file))
+    if links is None:
+        with open(user.lists["push"], "r", encoding="utf-8") as r_file:
+            links = list(map(lambda x: x.rstrip(), r_file))
    links_galleries = filter(rgx_gallery.search, links)
@@ -262,50 +324,20 @@ def push_manager(user: User) -> None:
        links,
    )

-    for link in links_galleries:
-        gallery = Gallery()
-        gallery.archive = args.flag_archive
-        gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
-        gallery.link = parse_link(link)
-        gallery.dest = "download"
-        gallery.opt_args = parse_instagram(link)
-        gallery.generate_command(user)
-        gallery.run_command(args.flag_verbose)
-        user.save_link(link)
+    with db.connect() as conn:
+        for link in links_galleries:
+            _handle_gallery_link(user, link, args, conn)
+        conn.commit()

    for link in links_comics:
-        if args.flag_skip and re.search(r"readcomiconline", link):
-            skip_arg = " --chapter-range 1"
-        elif args.flag_skip and re.search(r"manganato|mangahere|webtoons", link):
-            skip_arg = " --chapter-range 1-5"
-        else:
-            skip_arg = ""
-        gallery = Gallery()
-        gallery.archive = args.flag_archive
-        gallery.skip_arg = skip_arg
-        gallery.link = link
-        gallery.generate_command(is_comic=True)
-        gallery.run_command(args.flag_verbose)
-        save_comic(link)
+        _handle_comic_link(link, args)

    for link in links_videos:
-        video = Video()
-        video.use_archive = args.flag_archive
-        video.link = link
-        video.dest = str(user.directories["media"])
-        video.database = str(user.dbs["media"])
-        run(video_command(video), args.flag_verbose)
+        _handle_video_link(user, link, args)

    for link in links_other:
-        LOG.info("Other type of download %s", link)
-        gallery = Gallery()
-        gallery.archive = False
-        gallery.skip_arg = " -o directory='[]'"
-        gallery.link = link
-        gallery.dest = "push"
-        gallery.generate_command(user)
-        gallery.run_command(args.flag_verbose)
+        _handle_other_link(user, link, args)

    # Flush the push list, cleans all the contents
    with open(user.lists["push"], "w", encoding="utf-8") as w_file:
@@ -350,10 +382,8 @@ def main():
    elif args.link:
        is_admin = args.user in ("everyone", "jawz")
        user = User(get_index("jawz" if is_admin else args.user))
-        for arg_link in [lnk for grp in args.link for lnk in grp]:
-            user.append_list("push", parse_link(arg_link))
-        push_manager(user)
+        links = [parse_link(lnk) for grp in args.link for lnk in grp]
+        push_manager(user, links=links)

if __name__ == "__main__":

setup.cfg

@@ -6,10 +6,14 @@ py_modules =
download
functions
argparser
db
admin
admin_links
admin_users
classes.gallery
classes.user

[options.entry_points]
console_scripts =
download = download:main
download-admin = admin:main
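
With the new console_scripts entry, an editable install exposes the admin CLI next to the existing download command; a minimal sketch assuming the standard setuptools workflow this setup.cfg implies:

pip install -e .
download-admin users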

src/download/tests/test_db.py (new file, 79 lines)

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
import tempfile
import unittest
import sqlite3
from pathlib import Path
import db
class TestDB(unittest.TestCase):
def setUp(self) -> None:
self.conn = sqlite3.connect(":memory:")
self.conn.row_factory = sqlite3.Row
db.ensure_schema(self.conn)
def tearDown(self) -> None:
self.conn.close()
def test_normalize_url(self):
self.assertEqual(
db.normalize_url("http://Twitter.com/User/"),
"https://x.com/User",
)
self.assertEqual(
db.normalize_url("x.com/SomeUser/media/"),
"https://x.com/SomeUser/media",
)
def test_add_link_dedupe(self):
res1 = db.add_link(self.conn, "jawz", "https://x.com/Test/")
res2 = db.add_link(self.conn, "jawz", "https://x.com/Test")
self.assertEqual(res1["status"], "added")
self.assertEqual(res2["status"], "exists")
def test_remove_tombstone(self):
db.add_link(self.conn, "jawz", "https://x.com/Test")
ok = db.remove_link(self.conn, "jawz", "https://x.com/Test")
self.assertTrue(ok)
res = db.add_link(self.conn, "jawz", "https://x.com/Test")
self.assertEqual(res["status"], "removed")
res2 = db.add_link(self.conn, "jawz", "https://x.com/Test", assume_yes=True)
self.assertEqual(res2["status"], "added")
def test_disable_and_ban(self):
db.add_link(self.conn, "jawz", "https://x.com/Test")
ok = db.set_enabled(self.conn, "jawz", "https://x.com/Test", enabled=False)
self.assertTrue(ok)
active = db.get_active_links(self.conn, "jawz")
self.assertEqual(active, [])
ok = db.set_banned(self.conn, "jawz", "https://x.com/Test", banned=True, reason="bad")
self.assertTrue(ok)
active = db.get_active_links(self.conn, "jawz")
self.assertEqual(active, [])
def test_import_master_list(self):
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "watch.txt"
path.write_text(
"\n".join(
[
"https://x.com/User",
"# https://x.com/DisabledUser",
"https://x.com/User",
]
)
+ "\n",
encoding="utf-8",
)
result = db.import_master_list(self.conn, "jawz", path)
self.assertEqual(result["added"], 2)
self.assertEqual(result["exists"], 1)
rows = db.get_links_by_user(self.conn, "jawz")
by_norm = {db.normalize_url(r["url_original"]): r for r in rows}
self.assertTrue(by_norm["https://x.com/User"]["enabled"])
self.assertFalse(by_norm["https://x.com/DisabledUser"]["enabled"])
if __name__ == "__main__":
unittest.main()

src/download/tests/test_download.py (new file, 137 lines)

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
import types
import unittest
from pathlib import Path
import tempfile
import download
class DummyArgs:
def __init__(self):
self.post_type = ["posts", "reels"]
self.flag_archive = True
self.flag_skip = True
self.flag_verbose = True
class DummyUser:
def __init__(self):
self.name = "jawz"
self.sleep = 0
self.directories = {"media": Path("/tmp/media"), "download": Path("/tmp/dl")}
self.dbs = {"gallery": Path("/tmp/g.sqlite3"), "media": Path("/tmp/m.txt")}
self.lists = {"push": Path("/tmp/instant.txt")}
class TestDownload(unittest.TestCase):
def setUp(self) -> None:
download.ARGS = DummyArgs()
download.CONFIGS = {
"users": [{"name": "jawz"}],
"global": {},
"comic": {"comic-list": "/tmp/comic.txt"},
}
self.orig_gallery = download.Gallery
self.orig_video_command = download.video_command
self.orig_run = download.run
self.orig_db_connect = download.db.connect
self.orig_db_add_link = download.db.add_link
self.orig_save_comic = download.save_comic
def tearDown(self) -> None:
download.Gallery = self.orig_gallery
download.video_command = self.orig_video_command
download.run = self.orig_run
download.db.connect = self.orig_db_connect
download.db.add_link = self.orig_db_add_link
download.save_comic = self.orig_save_comic
def test_parse_instagram(self):
res = download.parse_instagram("https://instagram.com/user")
self.assertEqual(res, ["-o", "include=posts,reels"])
res2 = download.parse_instagram("https://x.com/user")
self.assertEqual(res2, [])
def test_video_command(self):
v = download.Video()
v.link = "https://youtu.be/abc"
v.dest = "/tmp"
cmd = download.video_command(v)
self.assertIn("yt-dlp", cmd[0])
self.assertIn("https://youtu.be/abc", cmd)
v2 = download.Video()
v2.link = "https://music.youtube.com/watch?v=xyz"
v2.dest = "/tmp"
v2.use_archive = False
cmd2 = download.video_command(v2)
self.assertIn("--audio-format", cmd2)
def test_push_manager_routing(self):
user = DummyUser()
captured = {"gallery": [], "video": [], "comic": [], "other": []}
def fake_generate(self, *args, **kwargs):
return None
def fake_run(self, *args, **kwargs):
link = getattr(self, "link", "")
if "mangadex" in link:
captured["comic"].append(link)
elif "x.com" in link:
captured["gallery"].append(link)
else:
captured["other"].append(link)
def fake_video_command(video):
captured["video"].append(video.link)
return ["echo", "ok"]
# Patch Gallery methods and video_command/run
class FakeGallery(self.orig_gallery):
def generate_command(self, *args, **kwargs):
return fake_generate(self, *args, **kwargs)
def run_command(self, *args, **kwargs):
return fake_run(self, *args, **kwargs)
download.Gallery = FakeGallery
download.video_command = fake_video_command
download.run = lambda *args, **kwargs: None
download.save_comic = lambda *_args, **_kwargs: None
links = [
"https://x.com/someuser",
"https://youtu.be/abc",
"https://mangadex.org/title/123",
"https://example.com/page",
]
# Disable DB write path for this test
class FakeConn:
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def commit(self):
return None
download.db.connect = lambda *a, **k: FakeConn()
download.db.add_link = lambda *a, **k: {"status": "added"}
download.push_manager(user, links=links)
self.assertEqual(len(captured["gallery"]), 1)
self.assertEqual(len(captured["video"]), 1)
self.assertEqual(len(captured["comic"]), 1)
self.assertEqual(len(captured["other"]), 1)
# restore handled in tearDown
if __name__ == "__main__":
unittest.main()
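
To run the new suites outside the Nix wrapper, the equivalent of the download-tests app is roughly the following, assuming PyYAML is installed and the commands run from the repository root:

export PYTHONPATH=src/download
python -m unittest discover -s src/download/tests -p "test_*.py"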