Compare commits
4 Commits
ebb27daf0c
...
81c2df84f7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81c2df84f7 | ||
|
|
7a64034f8a | ||
|
|
2ccdd713ea | ||
|
|
da87b6f9d2 |
27
flake.nix
27
flake.nix
@@ -126,5 +126,32 @@
|
|||||||
ext = "py";
|
ext = "py";
|
||||||
handler = scriptBin;
|
handler = scriptBin;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
apps.x86_64-linux = {
|
||||||
|
download = {
|
||||||
|
type = "app";
|
||||||
|
program = "${pkgs.download}/bin/download";
|
||||||
|
};
|
||||||
|
download-admin = {
|
||||||
|
type = "app";
|
||||||
|
program = "${pkgs.download}/bin/download-admin";
|
||||||
|
};
|
||||||
|
download-tests = {
|
||||||
|
type = "app";
|
||||||
|
program = "${
|
||||||
|
pkgs.writeShellApplication {
|
||||||
|
name = "download-tests";
|
||||||
|
runtimeInputs = [
|
||||||
|
(pkgs.python3.withPackages (ps: [ ps.pyyaml ]))
|
||||||
|
];
|
||||||
|
text = ''
|
||||||
|
set -euo pipefail
|
||||||
|
export PYTHONPATH="${inputs.self}/src/download"
|
||||||
|
python -m unittest discover -s "${inputs.self}/src/download/tests" -p "test_*.py"
|
||||||
|
'';
|
||||||
|
}
|
||||||
|
}/bin/download-tests";
|
||||||
|
};
|
||||||
|
};
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
95
src/download/admin.py
Normal file
95
src/download/admin.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Administrative CLI for download link database."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
from admin_links import cmd_add
|
||||||
|
from admin_links import cmd_ban
|
||||||
|
from admin_links import cmd_disable
|
||||||
|
from admin_links import cmd_enable
|
||||||
|
from admin_links import cmd_import
|
||||||
|
from admin_links import cmd_list
|
||||||
|
from admin_links import cmd_remove
|
||||||
|
from admin_links import cmd_rename
|
||||||
|
from admin_links import cmd_unban
|
||||||
|
from admin_links import cmd_validate_import
|
||||||
|
from admin_users import cmd_user_rename
|
||||||
|
from admin_users import cmd_users
|
||||||
|
|
||||||
|
|
||||||
|
def build_parser() -> argparse.ArgumentParser:
|
||||||
|
parser = argparse.ArgumentParser(prog="download-admin")
|
||||||
|
sub = parser.add_subparsers(dest="cmd", required=True)
|
||||||
|
|
||||||
|
p_add = sub.add_parser("add")
|
||||||
|
p_add.add_argument("user")
|
||||||
|
p_add.add_argument("url")
|
||||||
|
p_add.add_argument("--assume-yes", action="store_true")
|
||||||
|
p_add.set_defaults(func=cmd_add)
|
||||||
|
|
||||||
|
p_disable = sub.add_parser("disable")
|
||||||
|
p_disable.add_argument("user")
|
||||||
|
p_disable.add_argument("url")
|
||||||
|
p_disable.set_defaults(func=cmd_disable)
|
||||||
|
|
||||||
|
p_enable = sub.add_parser("enable")
|
||||||
|
p_enable.add_argument("user")
|
||||||
|
p_enable.add_argument("url")
|
||||||
|
p_enable.set_defaults(func=cmd_enable)
|
||||||
|
|
||||||
|
p_ban = sub.add_parser("ban")
|
||||||
|
p_ban.add_argument("user")
|
||||||
|
p_ban.add_argument("url")
|
||||||
|
p_ban.add_argument("--reason")
|
||||||
|
p_ban.set_defaults(func=cmd_ban)
|
||||||
|
|
||||||
|
p_unban = sub.add_parser("unban")
|
||||||
|
p_unban.add_argument("user")
|
||||||
|
p_unban.add_argument("url")
|
||||||
|
p_unban.set_defaults(func=cmd_unban)
|
||||||
|
|
||||||
|
p_remove = sub.add_parser("remove")
|
||||||
|
p_remove.add_argument("user")
|
||||||
|
p_remove.add_argument("url")
|
||||||
|
p_remove.set_defaults(func=cmd_remove)
|
||||||
|
|
||||||
|
p_rename = sub.add_parser("rename")
|
||||||
|
p_rename.add_argument("user")
|
||||||
|
p_rename.add_argument("old_url")
|
||||||
|
p_rename.add_argument("new_url")
|
||||||
|
p_rename.set_defaults(func=cmd_rename)
|
||||||
|
|
||||||
|
p_list = sub.add_parser("list")
|
||||||
|
p_list.add_argument("--user", action="append")
|
||||||
|
p_list.add_argument("--disabled", action="store_true")
|
||||||
|
p_list.add_argument("--banned", action="store_true")
|
||||||
|
p_list.set_defaults(func=cmd_list)
|
||||||
|
|
||||||
|
p_users = sub.add_parser("users")
|
||||||
|
p_users.set_defaults(func=cmd_users)
|
||||||
|
|
||||||
|
p_import = sub.add_parser("import")
|
||||||
|
p_import.set_defaults(func=cmd_import)
|
||||||
|
|
||||||
|
p_validate = sub.add_parser("validate-import")
|
||||||
|
p_validate.set_defaults(func=cmd_validate_import)
|
||||||
|
|
||||||
|
p_user_rename = sub.add_parser("user-rename")
|
||||||
|
p_user_rename.add_argument("user")
|
||||||
|
p_user_rename.add_argument("site")
|
||||||
|
p_user_rename.add_argument("old")
|
||||||
|
p_user_rename.add_argument("new")
|
||||||
|
p_user_rename.set_defaults(func=cmd_user_rename)
|
||||||
|
|
||||||
|
return parser
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
parser = build_parser()
|
||||||
|
args = parser.parse_args()
|
||||||
|
args.func(args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
192
src/download/admin_links.py
Normal file
192
src/download/admin_links.py
Normal file
@@ -0,0 +1,192 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Admin CLI: link operations."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import db
|
||||||
|
from functions import load_config_variables
|
||||||
|
|
||||||
|
|
||||||
|
def prompt_yes_no(message: str) -> bool:
|
||||||
|
while True:
|
||||||
|
raw = input(f"{message} [y/n]: ").strip().lower()
|
||||||
|
if raw in ("y", "yes"):
|
||||||
|
return True
|
||||||
|
if raw in ("n", "no"):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def parse_list_file(path: Path) -> dict:
|
||||||
|
enabled: set[str] = set()
|
||||||
|
disabled: set[str] = set()
|
||||||
|
if not path.is_file():
|
||||||
|
return {"enabled": enabled, "disabled": disabled}
|
||||||
|
with open(path, "r", encoding="utf-8") as r_file:
|
||||||
|
for raw in r_file:
|
||||||
|
line = raw.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
if line.startswith("#"):
|
||||||
|
url = line.lstrip("#").strip()
|
||||||
|
if url:
|
||||||
|
disabled.add(db.normalize_url(url))
|
||||||
|
continue
|
||||||
|
enabled.add(db.normalize_url(line))
|
||||||
|
return {"enabled": enabled, "disabled": disabled}
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_add(args: argparse.Namespace) -> None:
|
||||||
|
configs = load_config_variables()
|
||||||
|
with db.connect(configs) as conn:
|
||||||
|
result = db.add_link(
|
||||||
|
conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
|
||||||
|
)
|
||||||
|
if result["status"] == "removed" and not args.assume_yes:
|
||||||
|
removed_at = result.get("removed_at", "unknown")
|
||||||
|
if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
|
||||||
|
result = db.add_link(
|
||||||
|
conn, args.user, args.url, assume_yes=True, source="manual"
|
||||||
|
)
|
||||||
|
row = result.get("row")
|
||||||
|
if row and row["banned_at"]:
|
||||||
|
print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
|
||||||
|
if row and not row["enabled"]:
|
||||||
|
print("Warning: link is disabled")
|
||||||
|
conn.commit()
|
||||||
|
print(result["status"])
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_disable(args: argparse.Namespace) -> None:
|
||||||
|
with db.connect() as conn:
|
||||||
|
ok = db.set_enabled(conn, args.user, args.url, enabled=False)
|
||||||
|
if ok:
|
||||||
|
conn.commit()
|
||||||
|
print("ok" if ok else "not found")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_enable(args: argparse.Namespace) -> None:
|
||||||
|
with db.connect() as conn:
|
||||||
|
ok = db.set_enabled(conn, args.user, args.url, enabled=True)
|
||||||
|
if ok:
|
||||||
|
conn.commit()
|
||||||
|
print("ok" if ok else "not found")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_ban(args: argparse.Namespace) -> None:
|
||||||
|
with db.connect() as conn:
|
||||||
|
ok = db.set_banned(conn, args.user, args.url, banned=True, reason=args.reason)
|
||||||
|
if ok:
|
||||||
|
conn.commit()
|
||||||
|
print("ok" if ok else "not found")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_unban(args: argparse.Namespace) -> None:
|
||||||
|
with db.connect() as conn:
|
||||||
|
ok = db.set_banned(conn, args.user, args.url, banned=False)
|
||||||
|
if ok:
|
||||||
|
conn.commit()
|
||||||
|
print("ok" if ok else "not found")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_remove(args: argparse.Namespace) -> None:
|
||||||
|
with db.connect() as conn:
|
||||||
|
ok = db.remove_link(conn, args.user, args.url)
|
||||||
|
if ok:
|
||||||
|
conn.commit()
|
||||||
|
print("ok" if ok else "not found")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_rename(args: argparse.Namespace) -> None:
|
||||||
|
with db.connect() as conn:
|
||||||
|
result = db.rename_link(conn, args.user, args.old_url, args.new_url)
|
||||||
|
if result["status"] == "renamed":
|
||||||
|
conn.commit()
|
||||||
|
print(result["status"])
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_list(args: argparse.Namespace) -> None:
|
||||||
|
users = args.user or None
|
||||||
|
with db.connect() as conn:
|
||||||
|
rows = db.get_links(
|
||||||
|
conn,
|
||||||
|
users=users,
|
||||||
|
include_disabled=args.disabled,
|
||||||
|
include_banned=args.banned,
|
||||||
|
)
|
||||||
|
for row in rows:
|
||||||
|
status = "enabled" if row["enabled"] else "disabled"
|
||||||
|
if row["banned_at"]:
|
||||||
|
status = "banned"
|
||||||
|
print(f"{row['user_name']} [{status}] {row['url_original']}")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_import(_: argparse.Namespace) -> None:
|
||||||
|
configs = load_config_variables()
|
||||||
|
with db.connect(configs) as conn:
|
||||||
|
imported_paths = []
|
||||||
|
for entry in configs["users"]:
|
||||||
|
user = entry["name"]
|
||||||
|
lists_dir = Path(configs["global"]["lists-dir"]) / user
|
||||||
|
master = lists_dir / "watch.txt"
|
||||||
|
result = db.import_master_list(conn, user, master)
|
||||||
|
if result["status"] == "ok":
|
||||||
|
imported_paths.append(str(master))
|
||||||
|
print(f"{user}: {result}")
|
||||||
|
if result.get("duplicates"):
|
||||||
|
print(f"{user} duplicates:")
|
||||||
|
for dup in result["duplicates"]:
|
||||||
|
print(f" {dup}")
|
||||||
|
if imported_paths:
|
||||||
|
print("Imported lists:")
|
||||||
|
for path in imported_paths:
|
||||||
|
print(f" {path}")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_validate_import(_: argparse.Namespace) -> None:
|
||||||
|
configs = load_config_variables()
|
||||||
|
with db.connect(configs) as conn:
|
||||||
|
for entry in configs["users"]:
|
||||||
|
user = entry["name"]
|
||||||
|
lists_dir = Path(configs["global"]["lists-dir"]) / user
|
||||||
|
master = lists_dir / "watch.txt"
|
||||||
|
list_sets = parse_list_file(master)
|
||||||
|
|
||||||
|
rows = db.get_links_by_user(conn, user)
|
||||||
|
db_enabled = set()
|
||||||
|
db_disabled = set()
|
||||||
|
for row in rows:
|
||||||
|
norm = db.normalize_url(row["url_original"])
|
||||||
|
if row["enabled"] and not row["banned_at"]:
|
||||||
|
db_enabled.add(norm)
|
||||||
|
else:
|
||||||
|
db_disabled.add(norm)
|
||||||
|
|
||||||
|
missing_enabled = list_sets["enabled"] - db_enabled
|
||||||
|
missing_disabled = list_sets["disabled"] - db_disabled
|
||||||
|
extra_enabled = db_enabled - list_sets["enabled"]
|
||||||
|
extra_disabled = db_disabled - list_sets["disabled"]
|
||||||
|
|
||||||
|
print(f"{user}:")
|
||||||
|
if missing_enabled:
|
||||||
|
print(" Missing enabled in DB:")
|
||||||
|
for url in sorted(missing_enabled):
|
||||||
|
print(f" {url}")
|
||||||
|
if missing_disabled:
|
||||||
|
print(" Missing disabled in DB:")
|
||||||
|
for url in sorted(missing_disabled):
|
||||||
|
print(f" {url}")
|
||||||
|
if extra_enabled:
|
||||||
|
print(" Extra enabled in DB:")
|
||||||
|
for url in sorted(extra_enabled):
|
||||||
|
print(f" {url}")
|
||||||
|
if extra_disabled:
|
||||||
|
print(" Extra disabled in DB:")
|
||||||
|
for url in sorted(extra_disabled):
|
||||||
|
print(f" {url}")
|
||||||
|
if not any(
|
||||||
|
[missing_enabled, missing_disabled, extra_enabled, extra_disabled]
|
||||||
|
):
|
||||||
|
print(" OK")
|
||||||
88
src/download/admin_users.py
Normal file
88
src/download/admin_users.py
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Admin CLI: user operations."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import db
|
||||||
|
from functions import load_config_variables
|
||||||
|
|
||||||
|
|
||||||
|
def list_users(configs: dict) -> None:
|
||||||
|
for entry in configs["users"]:
|
||||||
|
print(entry["name"])
|
||||||
|
|
||||||
|
|
||||||
|
def prompt_yes_no(message: str) -> bool:
|
||||||
|
while True:
|
||||||
|
raw = input(f"{message} [y/n]: ").strip().lower()
|
||||||
|
if raw in ("y", "yes"):
|
||||||
|
return True
|
||||||
|
if raw in ("n", "no"):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def merge_dirs(src: Path, dst: Path) -> None:
|
||||||
|
for root, _, files in os.walk(src):
|
||||||
|
rel = Path(root).relative_to(src)
|
||||||
|
target_dir = dst / rel
|
||||||
|
target_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
for filename in files:
|
||||||
|
src_file = Path(root) / filename
|
||||||
|
dst_file = target_dir / filename
|
||||||
|
if dst_file.exists():
|
||||||
|
print(f"Skip existing file: {dst_file}")
|
||||||
|
continue
|
||||||
|
shutil.move(str(src_file), str(dst_file))
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(src, topdown=False):
|
||||||
|
if not dirs and not files:
|
||||||
|
Path(root).rmdir()
|
||||||
|
|
||||||
|
|
||||||
|
def move_user_outputs(
|
||||||
|
configs: dict, user_name: str, old_handle: str, new_handle: str
|
||||||
|
) -> None:
|
||||||
|
user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None)
|
||||||
|
if not user_cfg:
|
||||||
|
print(f"Unknown user: {user_name}")
|
||||||
|
return
|
||||||
|
|
||||||
|
base_dirs = [Path(user_cfg["download-dir"])]
|
||||||
|
for base in base_dirs:
|
||||||
|
old_path = base / old_handle
|
||||||
|
new_path = base / new_handle
|
||||||
|
if not old_path.exists():
|
||||||
|
print(f"Missing: {old_path}")
|
||||||
|
continue
|
||||||
|
if not new_path.exists():
|
||||||
|
old_path.rename(new_path)
|
||||||
|
continue
|
||||||
|
if not prompt_yes_no(
|
||||||
|
f"Merge contents from {old_path} into existing {new_path}?"
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
merge_dirs(old_path, new_path)
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_users(_: argparse.Namespace) -> None:
|
||||||
|
configs = load_config_variables()
|
||||||
|
list_users(configs)
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_user_rename(args: argparse.Namespace) -> None:
|
||||||
|
configs = load_config_variables()
|
||||||
|
with db.connect(configs) as conn:
|
||||||
|
result = db.bulk_rename_handle(
|
||||||
|
conn,
|
||||||
|
user_name=args.user,
|
||||||
|
site=args.site,
|
||||||
|
old_handle=args.old,
|
||||||
|
new_handle=args.new,
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
print(result)
|
||||||
|
move_user_outputs(configs, args.user, args.old, args.new)
|
||||||
@@ -8,6 +8,7 @@ from functions import validate_x_link
|
|||||||
from functions import parse_link
|
from functions import parse_link
|
||||||
from functions import clean_cache
|
from functions import clean_cache
|
||||||
from functions import LOG
|
from functions import LOG
|
||||||
|
import db
|
||||||
|
|
||||||
|
|
||||||
class User:
|
class User:
|
||||||
@@ -83,8 +84,8 @@ class User:
|
|||||||
def list_manager(self) -> None:
|
def list_manager(self) -> None:
|
||||||
"""Manage all the user list and create sub-lists"""
|
"""Manage all the user list and create sub-lists"""
|
||||||
self._create_directories() # Call the function to create necesary cache dirs
|
self._create_directories() # Call the function to create necesary cache dirs
|
||||||
with open(self.lists["master"], "r", encoding="utf-8") as r_file:
|
with db.connect() as conn:
|
||||||
master_content = list(map(lambda x: x.rstrip(), r_file))
|
master_content = db.get_active_links(conn, self.name)
|
||||||
|
|
||||||
# Create temporary list files segmented per scrapper
|
# Create temporary list files segmented per scrapper
|
||||||
shuffle(master_content)
|
shuffle(master_content)
|
||||||
@@ -94,12 +95,10 @@ class User:
|
|||||||
def save_link(self, link: str) -> None:
|
def save_link(self, link: str) -> None:
|
||||||
"""Checks the master list against a new link
|
"""Checks the master list against a new link
|
||||||
if unmatched, appends it to the end of the list"""
|
if unmatched, appends it to the end of the list"""
|
||||||
with open(self.lists["master"], "r", encoding="utf-8") as r_file:
|
with db.connect() as conn:
|
||||||
links = r_file.read().lower()
|
result = db.add_link(conn, self.name, parse_link(link), assume_yes=True)
|
||||||
|
conn.commit()
|
||||||
if parse_link(link).lower() in links:
|
if result["status"] == "added":
|
||||||
|
LOG.info("New gallery, saving")
|
||||||
|
else:
|
||||||
LOG.info("Gallery repeated, not saving")
|
LOG.info("Gallery repeated, not saving")
|
||||||
return
|
|
||||||
|
|
||||||
LOG.info("New gallery, saving")
|
|
||||||
self.append_list("master", parse_link(link))
|
|
||||||
|
|||||||
442
src/download/db.py
Normal file
442
src/download/db.py
Normal file
@@ -0,0 +1,442 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""SQLite persistence for download links."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Iterable
|
||||||
|
from urllib.parse import urlsplit, urlunsplit
|
||||||
|
|
||||||
|
from functions import LOG
|
||||||
|
from functions import load_config_variables
|
||||||
|
|
||||||
|
|
||||||
|
def get_db_path(configs: dict | None = None) -> Path:
|
||||||
|
"""Return the database path for links."""
|
||||||
|
cfg = configs or load_config_variables()
|
||||||
|
base = Path(cfg["global"]["databases-dir"])
|
||||||
|
return base / "links.sqlite3"
|
||||||
|
|
||||||
|
|
||||||
|
def connect(configs: dict | None = None) -> sqlite3.Connection:
|
||||||
|
"""Open a connection and ensure schema exists."""
|
||||||
|
db_path = get_db_path(configs)
|
||||||
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
ensure_schema(conn)
|
||||||
|
return conn
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_schema(conn: sqlite3.Connection) -> None:
|
||||||
|
"""Create schema if missing."""
|
||||||
|
conn.executescript(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS links (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
user_name TEXT NOT NULL,
|
||||||
|
url_original TEXT NOT NULL,
|
||||||
|
url_normalized TEXT NOT NULL,
|
||||||
|
site TEXT,
|
||||||
|
enabled INTEGER NOT NULL DEFAULT 1,
|
||||||
|
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
disabled_at TEXT,
|
||||||
|
banned_at TEXT,
|
||||||
|
banned_reason TEXT
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS links_user_url_norm
|
||||||
|
ON links (user_name, url_normalized);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS link_history (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
link_id INTEGER,
|
||||||
|
user_name TEXT NOT NULL,
|
||||||
|
event TEXT NOT NULL,
|
||||||
|
old_url TEXT,
|
||||||
|
new_url TEXT,
|
||||||
|
note TEXT,
|
||||||
|
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS link_tombstones (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
user_name TEXT NOT NULL,
|
||||||
|
url_normalized TEXT NOT NULL,
|
||||||
|
url_original TEXT NOT NULL,
|
||||||
|
removed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS tombstones_user_url_norm
|
||||||
|
ON link_tombstones (user_name, url_normalized);
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_url(url: str) -> str:
|
||||||
|
"""Normalize URL for dedupe only."""
|
||||||
|
raw = url.strip()
|
||||||
|
if "://" not in raw:
|
||||||
|
raw = f"https://{raw}"
|
||||||
|
|
||||||
|
parts = urlsplit(raw)
|
||||||
|
scheme = "https"
|
||||||
|
host = (parts.hostname or "").lower()
|
||||||
|
if host.startswith("www."):
|
||||||
|
host = host[4:]
|
||||||
|
if host in ("twitter.com", "www.twitter.com"):
|
||||||
|
host = "x.com"
|
||||||
|
|
||||||
|
path = parts.path.rstrip("/")
|
||||||
|
query = parts.query
|
||||||
|
return urlunsplit((scheme, host, path, query, ""))
|
||||||
|
|
||||||
|
|
||||||
|
def get_site(url: str) -> str:
|
||||||
|
"""Return normalized host name."""
|
||||||
|
raw = url.strip()
|
||||||
|
if "://" not in raw:
|
||||||
|
raw = f"https://{raw}"
|
||||||
|
host = (urlsplit(raw).hostname or "").lower()
|
||||||
|
if host.startswith("www."):
|
||||||
|
host = host[4:]
|
||||||
|
if host in ("twitter.com", "www.twitter.com"):
|
||||||
|
host = "x.com"
|
||||||
|
return host
|
||||||
|
|
||||||
|
|
||||||
|
def add_history(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
user_name: str,
|
||||||
|
event: str,
|
||||||
|
link_id: int | None = None,
|
||||||
|
old_url: str | None = None,
|
||||||
|
new_url: str | None = None,
|
||||||
|
note: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO link_history (link_id, user_name, event, old_url, new_url, note)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(link_id, user_name, event, old_url, new_url, note),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def add_link(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
user_name: str,
|
||||||
|
url_original: str,
|
||||||
|
assume_yes: bool = False,
|
||||||
|
source: str = "manual",
|
||||||
|
) -> dict:
|
||||||
|
"""Add a link or return existing status."""
|
||||||
|
url_norm = normalize_url(url_original)
|
||||||
|
site = get_site(url_original)
|
||||||
|
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT * FROM links WHERE user_name = ? AND url_normalized = ?",
|
||||||
|
(user_name, url_norm),
|
||||||
|
).fetchone()
|
||||||
|
if row:
|
||||||
|
return {"status": "exists", "row": row}
|
||||||
|
|
||||||
|
tombstone = conn.execute(
|
||||||
|
"SELECT removed_at FROM link_tombstones WHERE user_name = ? AND url_normalized = ?",
|
||||||
|
(user_name, url_norm),
|
||||||
|
).fetchone()
|
||||||
|
if tombstone and not assume_yes and source != "push":
|
||||||
|
return {"status": "removed", "removed_at": tombstone["removed_at"]}
|
||||||
|
|
||||||
|
cur = conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO links (user_name, url_original, url_normalized, site)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(user_name, url_original, url_norm, site),
|
||||||
|
)
|
||||||
|
add_history(
|
||||||
|
conn,
|
||||||
|
user_name=user_name,
|
||||||
|
event="add",
|
||||||
|
link_id=cur.lastrowid,
|
||||||
|
new_url=url_original,
|
||||||
|
note=f"source={source}",
|
||||||
|
)
|
||||||
|
return {"status": "added", "id": cur.lastrowid}
|
||||||
|
|
||||||
|
|
||||||
|
def set_enabled(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
user_name: str,
|
||||||
|
url_original: str,
|
||||||
|
enabled: bool,
|
||||||
|
) -> bool:
|
||||||
|
url_norm = normalize_url(url_original)
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
|
||||||
|
(user_name, url_norm),
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return False
|
||||||
|
if enabled:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
UPDATE links
|
||||||
|
SET enabled = 1, disabled_at = NULL, updated_at = CURRENT_TIMESTAMP
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(row["id"],),
|
||||||
|
)
|
||||||
|
add_history(conn, user_name, "enable", link_id=row["id"], old_url=row["url_original"])
|
||||||
|
else:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
UPDATE links
|
||||||
|
SET enabled = 0, disabled_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(row["id"],),
|
||||||
|
)
|
||||||
|
add_history(conn, user_name, "disable", link_id=row["id"], old_url=row["url_original"])
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def set_banned(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
user_name: str,
|
||||||
|
url_original: str,
|
||||||
|
banned: bool,
|
||||||
|
reason: str | None = None,
|
||||||
|
) -> bool:
|
||||||
|
url_norm = normalize_url(url_original)
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
|
||||||
|
(user_name, url_norm),
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return False
|
||||||
|
if banned:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
UPDATE links
|
||||||
|
SET banned_at = CURRENT_TIMESTAMP, banned_reason = ?, updated_at = CURRENT_TIMESTAMP
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(reason, row["id"]),
|
||||||
|
)
|
||||||
|
add_history(
|
||||||
|
conn,
|
||||||
|
user_name,
|
||||||
|
"ban",
|
||||||
|
link_id=row["id"],
|
||||||
|
old_url=row["url_original"],
|
||||||
|
note=reason,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
UPDATE links
|
||||||
|
SET banned_at = NULL, banned_reason = NULL, updated_at = CURRENT_TIMESTAMP
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(row["id"],),
|
||||||
|
)
|
||||||
|
add_history(conn, user_name, "unban", link_id=row["id"], old_url=row["url_original"])
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def rename_link(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
user_name: str,
|
||||||
|
old_url: str,
|
||||||
|
new_url: str,
|
||||||
|
) -> dict:
|
||||||
|
old_norm = normalize_url(old_url)
|
||||||
|
new_norm = normalize_url(new_url)
|
||||||
|
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
|
||||||
|
(user_name, old_norm),
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return {"status": "missing"}
|
||||||
|
|
||||||
|
conflict = conn.execute(
|
||||||
|
"SELECT id FROM links WHERE user_name = ? AND url_normalized = ?",
|
||||||
|
(user_name, new_norm),
|
||||||
|
).fetchone()
|
||||||
|
if conflict and conflict["id"] != row["id"]:
|
||||||
|
return {"status": "conflict"}
|
||||||
|
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
UPDATE links
|
||||||
|
SET url_original = ?, url_normalized = ?, site = ?, updated_at = CURRENT_TIMESTAMP
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(new_url, new_norm, get_site(new_url), row["id"]),
|
||||||
|
)
|
||||||
|
add_history(
|
||||||
|
conn,
|
||||||
|
user_name,
|
||||||
|
"rename",
|
||||||
|
link_id=row["id"],
|
||||||
|
old_url=row["url_original"],
|
||||||
|
new_url=new_url,
|
||||||
|
)
|
||||||
|
return {"status": "renamed"}
|
||||||
|
|
||||||
|
|
||||||
|
def remove_link(conn: sqlite3.Connection, user_name: str, url_original: str) -> bool:
|
||||||
|
url_norm = normalize_url(url_original)
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
|
||||||
|
(user_name, url_norm),
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return False
|
||||||
|
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR IGNORE INTO link_tombstones (user_name, url_normalized, url_original)
|
||||||
|
VALUES (?, ?, ?)
|
||||||
|
""",
|
||||||
|
(user_name, url_norm, row["url_original"]),
|
||||||
|
)
|
||||||
|
add_history(conn, user_name, "remove", link_id=row["id"], old_url=row["url_original"])
|
||||||
|
conn.execute("DELETE FROM links WHERE id = ?", (row["id"],))
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def get_active_links(conn: sqlite3.Connection, user_name: str) -> list[str]:
|
||||||
|
rows = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT url_original FROM links
|
||||||
|
WHERE user_name = ?
|
||||||
|
AND enabled = 1
|
||||||
|
AND banned_at IS NULL
|
||||||
|
ORDER BY id ASC
|
||||||
|
""",
|
||||||
|
(user_name,),
|
||||||
|
).fetchall()
|
||||||
|
return [row["url_original"] for row in rows]
|
||||||
|
|
||||||
|
|
||||||
|
def get_links(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
users: Iterable[str] | None = None,
|
||||||
|
include_disabled: bool = False,
|
||||||
|
include_banned: bool = False,
|
||||||
|
) -> list[sqlite3.Row]:
|
||||||
|
params: list = []
|
||||||
|
where = []
|
||||||
|
user_list = list(users) if users else []
|
||||||
|
if user_list:
|
||||||
|
where.append(f"user_name IN ({','.join(['?'] * len(user_list))})")
|
||||||
|
params.extend(user_list)
|
||||||
|
if not include_disabled:
|
||||||
|
where.append("enabled = 1")
|
||||||
|
if not include_banned:
|
||||||
|
where.append("banned_at IS NULL")
|
||||||
|
clause = " AND ".join(where)
|
||||||
|
if clause:
|
||||||
|
clause = "WHERE " + clause
|
||||||
|
return conn.execute(f"SELECT * FROM links {clause} ORDER BY user_name, id", params).fetchall()
|
||||||
|
|
||||||
|
|
||||||
|
def get_links_by_user(conn: sqlite3.Connection, user_name: str) -> list[sqlite3.Row]:
|
||||||
|
return conn.execute(
|
||||||
|
"SELECT * FROM links WHERE user_name = ? ORDER BY id",
|
||||||
|
(user_name,),
|
||||||
|
).fetchall()
|
||||||
|
|
||||||
|
|
||||||
|
def import_master_list(conn: sqlite3.Connection, user_name: str, path: Path) -> dict:
|
||||||
|
if not path.is_file():
|
||||||
|
return {"status": "missing", "path": str(path)}
|
||||||
|
with open(path, "r", encoding="utf-8") as r_file:
|
||||||
|
lines = [ln.strip() for ln in r_file if ln.strip()]
|
||||||
|
|
||||||
|
added = 0
|
||||||
|
exists = 0
|
||||||
|
removed = 0
|
||||||
|
duplicates: list[str] = []
|
||||||
|
for line in lines:
|
||||||
|
disabled = False
|
||||||
|
raw = line
|
||||||
|
if raw.startswith("#"):
|
||||||
|
disabled = True
|
||||||
|
raw = raw.lstrip("#").strip()
|
||||||
|
if not raw:
|
||||||
|
continue
|
||||||
|
|
||||||
|
result = add_link(conn, user_name, raw, assume_yes=True, source="import")
|
||||||
|
if result["status"] == "added":
|
||||||
|
added += 1
|
||||||
|
if disabled:
|
||||||
|
set_enabled(conn, user_name, raw, enabled=False)
|
||||||
|
elif result["status"] == "exists":
|
||||||
|
exists += 1
|
||||||
|
duplicates.append(raw)
|
||||||
|
elif result["status"] == "removed":
|
||||||
|
removed += 1
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"added": added,
|
||||||
|
"exists": exists,
|
||||||
|
"removed": removed,
|
||||||
|
"duplicates": duplicates,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def bulk_rename_handle(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
user_name: str,
|
||||||
|
site: str,
|
||||||
|
old_handle: str,
|
||||||
|
new_handle: str,
|
||||||
|
) -> dict:
|
||||||
|
"""Rename account handle within a site for a user."""
|
||||||
|
site_norm = site.lower().lstrip("www.")
|
||||||
|
if site_norm == "twitter.com":
|
||||||
|
site_norm = "x.com"
|
||||||
|
if site_norm == "www.twitter.com":
|
||||||
|
site_norm = "x.com"
|
||||||
|
|
||||||
|
rows = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT id, url_original FROM links
|
||||||
|
WHERE user_name = ? AND site = ?
|
||||||
|
""",
|
||||||
|
(user_name, site_norm),
|
||||||
|
).fetchall()
|
||||||
|
|
||||||
|
updated = 0
|
||||||
|
skipped = 0
|
||||||
|
conflicts = 0
|
||||||
|
for row in rows:
|
||||||
|
raw = row["url_original"]
|
||||||
|
parts = urlsplit(raw if "://" in raw else f"https://{raw}")
|
||||||
|
path = parts.path
|
||||||
|
segments = path.split("/")
|
||||||
|
if len(segments) < 2 or segments[1] != old_handle:
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
segments[1] = new_handle
|
||||||
|
new_path = "/".join(segments)
|
||||||
|
new_url = urlunsplit((parts.scheme, parts.netloc, new_path, parts.query, parts.fragment))
|
||||||
|
result = rename_link(conn, user_name, raw, new_url)
|
||||||
|
if result["status"] == "renamed":
|
||||||
|
updated += 1
|
||||||
|
elif result["status"] == "conflict":
|
||||||
|
conflicts += 1
|
||||||
|
else:
|
||||||
|
skipped += 1
|
||||||
|
|
||||||
|
return {"updated": updated, "skipped": skipped, "conflicts": conflicts}
|
||||||
|
|
||||||
|
|
||||||
|
def warn(msg: str) -> None:
|
||||||
|
LOG.warning(msg)
|
||||||
@@ -13,6 +13,7 @@ import re
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import argparse
|
import argparse
|
||||||
import yaml
|
import yaml
|
||||||
|
import db
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
from functions import LOG
|
from functions import LOG
|
||||||
from functions import run
|
from functions import run
|
||||||
@@ -94,6 +95,66 @@ def parse_instagram(link: str, post_type: list[str] | str | None = None) -> list
|
|||||||
return ["-o", f"include={use_type}"]
|
return ["-o", f"include={use_type}"]
|
||||||
|
|
||||||
|
|
||||||
|
def _comic_skip_arg(link: str, flag_skip: bool) -> str:
|
||||||
|
if not flag_skip:
|
||||||
|
return ""
|
||||||
|
if re.search(r"readcomiconline", link):
|
||||||
|
return " --chapter-range 1"
|
||||||
|
if re.search(r"manganato|mangahere|webtoons", link):
|
||||||
|
return " --chapter-range 1-5"
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_gallery_link(user: User, link: str, args, conn) -> None:
|
||||||
|
add_res = db.add_link(conn, user.name, parse_link(link), assume_yes=True, source="push")
|
||||||
|
row = add_res.get("row")
|
||||||
|
if row and row["banned_at"]:
|
||||||
|
LOG.warning("Link is banned, skipping: %s", link)
|
||||||
|
return
|
||||||
|
if row and not row["enabled"]:
|
||||||
|
LOG.warning("Link is disabled, skipping: %s", link)
|
||||||
|
return
|
||||||
|
|
||||||
|
gallery = Gallery()
|
||||||
|
gallery.archive = args.flag_archive
|
||||||
|
gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
|
||||||
|
gallery.link = parse_link(link)
|
||||||
|
gallery.dest = "download"
|
||||||
|
gallery.opt_args = parse_instagram(link)
|
||||||
|
gallery.generate_command(user)
|
||||||
|
gallery.run_command(args.flag_verbose)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_comic_link(link: str, args) -> None:
|
||||||
|
gallery = Gallery()
|
||||||
|
gallery.archive = args.flag_archive
|
||||||
|
gallery.skip_arg = _comic_skip_arg(link, args.flag_skip)
|
||||||
|
gallery.link = link
|
||||||
|
gallery.generate_command(is_comic=True)
|
||||||
|
gallery.run_command(args.flag_verbose)
|
||||||
|
save_comic(link)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_video_link(user: User, link: str, args) -> None:
|
||||||
|
video = Video()
|
||||||
|
video.use_archive = args.flag_archive
|
||||||
|
video.link = link
|
||||||
|
video.dest = str(user.directories["media"])
|
||||||
|
video.database = str(user.dbs["media"])
|
||||||
|
run(video_command(video), args.flag_verbose)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_other_link(user: User, link: str, args) -> None:
|
||||||
|
LOG.info("Other type of download %s", link)
|
||||||
|
gallery = Gallery()
|
||||||
|
gallery.archive = False
|
||||||
|
gallery.skip_arg = " -o directory='[]'"
|
||||||
|
gallery.link = link
|
||||||
|
gallery.dest = "push"
|
||||||
|
gallery.generate_command(user)
|
||||||
|
gallery.run_command(args.flag_verbose)
|
||||||
|
|
||||||
|
|
||||||
def video_command(video: Video):
|
def video_command(video: Video):
|
||||||
"""Filters and processes the required command to download videos"""
|
"""Filters and processes the required command to download videos"""
|
||||||
command = ["yt-dlp"]
|
command = ["yt-dlp"]
|
||||||
@@ -219,7 +280,7 @@ def save_comic(link: str) -> None:
|
|||||||
w_file.write(link + "\n")
|
w_file.write(link + "\n")
|
||||||
|
|
||||||
|
|
||||||
def push_manager(user: User) -> None:
|
def push_manager(user: User, links: list[str] | None = None) -> None:
|
||||||
"""Filters out the URL to use the appropiate downloader"""
|
"""Filters out the URL to use the appropiate downloader"""
|
||||||
args = get_args()
|
args = get_args()
|
||||||
# Creates an array which will store any links that should use youtube-dl
|
# Creates an array which will store any links that should use youtube-dl
|
||||||
@@ -250,8 +311,9 @@ def push_manager(user: User) -> None:
|
|||||||
rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
|
rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
|
||||||
rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
|
rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
|
||||||
|
|
||||||
with open(user.lists["push"], "r", encoding="utf-8") as r_file:
|
if links is None:
|
||||||
links = list(map(lambda x: x.rstrip(), r_file))
|
with open(user.lists["push"], "r", encoding="utf-8") as r_file:
|
||||||
|
links = list(map(lambda x: x.rstrip(), r_file))
|
||||||
links_galleries = filter(rgx_gallery.search, links)
|
links_galleries = filter(rgx_gallery.search, links)
|
||||||
links_videos = filter(rgx_video.search, links)
|
links_videos = filter(rgx_video.search, links)
|
||||||
links_comics = filter(rgx_comic.search, links)
|
links_comics = filter(rgx_comic.search, links)
|
||||||
@@ -262,50 +324,20 @@ def push_manager(user: User) -> None:
|
|||||||
links,
|
links,
|
||||||
)
|
)
|
||||||
|
|
||||||
for link in links_galleries:
|
with db.connect() as conn:
|
||||||
gallery = Gallery()
|
for link in links_galleries:
|
||||||
gallery.archive = args.flag_archive
|
_handle_gallery_link(user, link, args, conn)
|
||||||
gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
|
|
||||||
gallery.link = parse_link(link)
|
conn.commit()
|
||||||
gallery.dest = "download"
|
|
||||||
gallery.opt_args = parse_instagram(link)
|
|
||||||
gallery.generate_command(user)
|
|
||||||
gallery.run_command(args.flag_verbose)
|
|
||||||
user.save_link(link)
|
|
||||||
|
|
||||||
for link in links_comics:
|
for link in links_comics:
|
||||||
if args.flag_skip and re.search(r"readcomiconline", link):
|
_handle_comic_link(link, args)
|
||||||
skip_arg = " --chapter-range 1"
|
|
||||||
elif args.flag_skip and re.search(r"manganato|mangahere|webtoons", link):
|
|
||||||
skip_arg = " --chapter-range 1-5"
|
|
||||||
else:
|
|
||||||
skip_arg = ""
|
|
||||||
|
|
||||||
gallery = Gallery()
|
|
||||||
gallery.archive = args.flag_archive
|
|
||||||
gallery.skip_arg = skip_arg
|
|
||||||
gallery.link = link
|
|
||||||
gallery.generate_command(is_comic=True)
|
|
||||||
gallery.run_command(args.flag_verbose)
|
|
||||||
save_comic(link)
|
|
||||||
|
|
||||||
for link in links_videos:
|
for link in links_videos:
|
||||||
video = Video()
|
_handle_video_link(user, link, args)
|
||||||
video.use_archive = args.flag_archive
|
|
||||||
video.link = link
|
|
||||||
video.dest = str(user.directories["media"])
|
|
||||||
video.database = str(user.dbs["media"])
|
|
||||||
run(video_command(video), args.flag_verbose)
|
|
||||||
|
|
||||||
for link in links_other:
|
for link in links_other:
|
||||||
LOG.info("Other type of download %s", link)
|
_handle_other_link(user, link, args)
|
||||||
gallery = Gallery()
|
|
||||||
gallery.archive = False
|
|
||||||
gallery.skip_arg = " -o directory='[]'"
|
|
||||||
gallery.link = link
|
|
||||||
gallery.dest = "push"
|
|
||||||
gallery.generate_command(user)
|
|
||||||
gallery.run_command(args.flag_verbose)
|
|
||||||
|
|
||||||
# Flush the push list, cleans all the contents
|
# Flush the push list, cleans all the contents
|
||||||
with open(user.lists["push"], "w", encoding="utf-8") as w_file:
|
with open(user.lists["push"], "w", encoding="utf-8") as w_file:
|
||||||
@@ -350,10 +382,8 @@ def main():
|
|||||||
elif args.link:
|
elif args.link:
|
||||||
is_admin = args.user in ("everyone", "jawz")
|
is_admin = args.user in ("everyone", "jawz")
|
||||||
user = User(get_index("jawz" if is_admin else args.user))
|
user = User(get_index("jawz" if is_admin else args.user))
|
||||||
for arg_link in [lnk for grp in args.link for lnk in grp]:
|
links = [parse_link(lnk) for grp in args.link for lnk in grp]
|
||||||
user.append_list("push", parse_link(arg_link))
|
push_manager(user, links=links)
|
||||||
|
|
||||||
push_manager(user)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -6,10 +6,14 @@ py_modules =
|
|||||||
download
|
download
|
||||||
functions
|
functions
|
||||||
argparser
|
argparser
|
||||||
|
db
|
||||||
|
admin
|
||||||
|
admin_links
|
||||||
|
admin_users
|
||||||
classes.gallery
|
classes.gallery
|
||||||
classes.user
|
classes.user
|
||||||
|
|
||||||
[options.entry_points]
|
[options.entry_points]
|
||||||
console_scripts =
|
console_scripts =
|
||||||
download = download:main
|
download = download:main
|
||||||
|
download-admin = admin:main
|
||||||
|
|||||||
79
src/download/tests/test_db.py
Normal file
79
src/download/tests/test_db.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import db
|
||||||
|
|
||||||
|
|
||||||
|
class TestDB(unittest.TestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.conn = sqlite3.connect(":memory:")
|
||||||
|
self.conn.row_factory = sqlite3.Row
|
||||||
|
db.ensure_schema(self.conn)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.conn.close()
|
||||||
|
|
||||||
|
def test_normalize_url(self):
|
||||||
|
self.assertEqual(
|
||||||
|
db.normalize_url("http://Twitter.com/User/"),
|
||||||
|
"https://x.com/User",
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
db.normalize_url("x.com/SomeUser/media/"),
|
||||||
|
"https://x.com/SomeUser/media",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_add_link_dedupe(self):
|
||||||
|
res1 = db.add_link(self.conn, "jawz", "https://x.com/Test/")
|
||||||
|
res2 = db.add_link(self.conn, "jawz", "https://x.com/Test")
|
||||||
|
self.assertEqual(res1["status"], "added")
|
||||||
|
self.assertEqual(res2["status"], "exists")
|
||||||
|
|
||||||
|
def test_remove_tombstone(self):
|
||||||
|
db.add_link(self.conn, "jawz", "https://x.com/Test")
|
||||||
|
ok = db.remove_link(self.conn, "jawz", "https://x.com/Test")
|
||||||
|
self.assertTrue(ok)
|
||||||
|
res = db.add_link(self.conn, "jawz", "https://x.com/Test")
|
||||||
|
self.assertEqual(res["status"], "removed")
|
||||||
|
res2 = db.add_link(self.conn, "jawz", "https://x.com/Test", assume_yes=True)
|
||||||
|
self.assertEqual(res2["status"], "added")
|
||||||
|
|
||||||
|
def test_disable_and_ban(self):
|
||||||
|
db.add_link(self.conn, "jawz", "https://x.com/Test")
|
||||||
|
ok = db.set_enabled(self.conn, "jawz", "https://x.com/Test", enabled=False)
|
||||||
|
self.assertTrue(ok)
|
||||||
|
active = db.get_active_links(self.conn, "jawz")
|
||||||
|
self.assertEqual(active, [])
|
||||||
|
ok = db.set_banned(self.conn, "jawz", "https://x.com/Test", banned=True, reason="bad")
|
||||||
|
self.assertTrue(ok)
|
||||||
|
active = db.get_active_links(self.conn, "jawz")
|
||||||
|
self.assertEqual(active, [])
|
||||||
|
|
||||||
|
def test_import_master_list(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
path = Path(tmp) / "watch.txt"
|
||||||
|
path.write_text(
|
||||||
|
"\n".join(
|
||||||
|
[
|
||||||
|
"https://x.com/User",
|
||||||
|
"# https://x.com/DisabledUser",
|
||||||
|
"https://x.com/User",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
+ "\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
result = db.import_master_list(self.conn, "jawz", path)
|
||||||
|
self.assertEqual(result["added"], 2)
|
||||||
|
self.assertEqual(result["exists"], 1)
|
||||||
|
rows = db.get_links_by_user(self.conn, "jawz")
|
||||||
|
by_norm = {db.normalize_url(r["url_original"]): r for r in rows}
|
||||||
|
self.assertTrue(by_norm["https://x.com/User"]["enabled"])
|
||||||
|
self.assertFalse(by_norm["https://x.com/DisabledUser"]["enabled"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
137
src/download/tests/test_download.py
Normal file
137
src/download/tests/test_download.py
Normal file
@@ -0,0 +1,137 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import types
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import download
|
||||||
|
|
||||||
|
|
||||||
|
class DummyArgs:
|
||||||
|
def __init__(self):
|
||||||
|
self.post_type = ["posts", "reels"]
|
||||||
|
self.flag_archive = True
|
||||||
|
self.flag_skip = True
|
||||||
|
self.flag_verbose = True
|
||||||
|
|
||||||
|
|
||||||
|
class DummyUser:
|
||||||
|
def __init__(self):
|
||||||
|
self.name = "jawz"
|
||||||
|
self.sleep = 0
|
||||||
|
self.directories = {"media": Path("/tmp/media"), "download": Path("/tmp/dl")}
|
||||||
|
self.dbs = {"gallery": Path("/tmp/g.sqlite3"), "media": Path("/tmp/m.txt")}
|
||||||
|
self.lists = {"push": Path("/tmp/instant.txt")}
|
||||||
|
|
||||||
|
|
||||||
|
class TestDownload(unittest.TestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
download.ARGS = DummyArgs()
|
||||||
|
download.CONFIGS = {
|
||||||
|
"users": [{"name": "jawz"}],
|
||||||
|
"global": {},
|
||||||
|
"comic": {"comic-list": "/tmp/comic.txt"},
|
||||||
|
}
|
||||||
|
self.orig_gallery = download.Gallery
|
||||||
|
self.orig_video_command = download.video_command
|
||||||
|
self.orig_run = download.run
|
||||||
|
self.orig_db_connect = download.db.connect
|
||||||
|
self.orig_db_add_link = download.db.add_link
|
||||||
|
self.orig_save_comic = download.save_comic
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
download.Gallery = self.orig_gallery
|
||||||
|
download.video_command = self.orig_video_command
|
||||||
|
download.run = self.orig_run
|
||||||
|
download.db.connect = self.orig_db_connect
|
||||||
|
download.db.add_link = self.orig_db_add_link
|
||||||
|
download.save_comic = self.orig_save_comic
|
||||||
|
|
||||||
|
def test_parse_instagram(self):
|
||||||
|
res = download.parse_instagram("https://instagram.com/user")
|
||||||
|
self.assertEqual(res, ["-o", "include=posts,reels"])
|
||||||
|
res2 = download.parse_instagram("https://x.com/user")
|
||||||
|
self.assertEqual(res2, [])
|
||||||
|
|
||||||
|
def test_video_command(self):
|
||||||
|
v = download.Video()
|
||||||
|
v.link = "https://youtu.be/abc"
|
||||||
|
v.dest = "/tmp"
|
||||||
|
cmd = download.video_command(v)
|
||||||
|
self.assertIn("yt-dlp", cmd[0])
|
||||||
|
self.assertIn("https://youtu.be/abc", cmd)
|
||||||
|
|
||||||
|
v2 = download.Video()
|
||||||
|
v2.link = "https://music.youtube.com/watch?v=xyz"
|
||||||
|
v2.dest = "/tmp"
|
||||||
|
v2.use_archive = False
|
||||||
|
cmd2 = download.video_command(v2)
|
||||||
|
self.assertIn("--audio-format", cmd2)
|
||||||
|
|
||||||
|
def test_push_manager_routing(self):
|
||||||
|
user = DummyUser()
|
||||||
|
|
||||||
|
captured = {"gallery": [], "video": [], "comic": [], "other": []}
|
||||||
|
|
||||||
|
def fake_generate(self, *args, **kwargs):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def fake_run(self, *args, **kwargs):
|
||||||
|
link = getattr(self, "link", "")
|
||||||
|
if "mangadex" in link:
|
||||||
|
captured["comic"].append(link)
|
||||||
|
elif "x.com" in link:
|
||||||
|
captured["gallery"].append(link)
|
||||||
|
else:
|
||||||
|
captured["other"].append(link)
|
||||||
|
|
||||||
|
def fake_video_command(video):
|
||||||
|
captured["video"].append(video.link)
|
||||||
|
return ["echo", "ok"]
|
||||||
|
|
||||||
|
# Patch Gallery methods and video_command/run
|
||||||
|
class FakeGallery(self.orig_gallery):
|
||||||
|
def generate_command(self, *args, **kwargs):
|
||||||
|
return fake_generate(self, *args, **kwargs)
|
||||||
|
|
||||||
|
def run_command(self, *args, **kwargs):
|
||||||
|
return fake_run(self, *args, **kwargs)
|
||||||
|
|
||||||
|
download.Gallery = FakeGallery
|
||||||
|
download.video_command = fake_video_command
|
||||||
|
download.run = lambda *args, **kwargs: None
|
||||||
|
download.save_comic = lambda *_args, **_kwargs: None
|
||||||
|
|
||||||
|
links = [
|
||||||
|
"https://x.com/someuser",
|
||||||
|
"https://youtu.be/abc",
|
||||||
|
"https://mangadex.org/title/123",
|
||||||
|
"https://example.com/page",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Disable DB write path for this test
|
||||||
|
class FakeConn:
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def commit(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
download.db.connect = lambda *a, **k: FakeConn()
|
||||||
|
download.db.add_link = lambda *a, **k: {"status": "added"}
|
||||||
|
|
||||||
|
download.push_manager(user, links=links)
|
||||||
|
|
||||||
|
self.assertEqual(len(captured["gallery"]), 1)
|
||||||
|
self.assertEqual(len(captured["video"]), 1)
|
||||||
|
self.assertEqual(len(captured["comic"]), 1)
|
||||||
|
self.assertEqual(len(captured["other"]), 1)
|
||||||
|
|
||||||
|
# restore handled in tearDown
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user