Compare commits

...

21 Commits

Author SHA1 Message Date
Danilo Reyes
e985e359a7 clean exit 2026-03-02 22:29:04 -06:00
Danilo Reyes
ba42689aa9 fixing download dir 2026-03-01 17:45:58 -06:00
Danilo Reyes
2a55d92f19 bugfix 2026-03-01 17:42:46 -06:00
Danilo Reyes
949f5a94c3 gallery-clean + autocompletions 2026-03-01 17:35:05 -06:00
Danilo Reyes
899543309f download 3.0 2026-03-01 00:22:56 -06:00
Danilo Reyes
5000304a8a fixed requires-revision 2026-03-01 00:22:11 -06:00
Danilo Reyes
76e3d72643 bugfix 2026-02-28 23:59:08 -06:00
Danilo Reyes
e73b4c8083 -sa to gallery/comic 2026-02-28 23:52:48 -06:00
Danilo Reyes
3f44f710b1 fix // and other logic flaws 2026-02-28 23:47:01 -06:00
Danilo Reyes
9da87b68e9 revision logic revisited 2026-02-28 23:33:06 -06:00
Danilo Reyes
766eca4a2f enable renamed links 2026-02-28 23:19:53 -06:00
Danilo Reyes
bda8105928 fix list download admin 2026-02-28 23:01:36 -06:00
Danilo Reyes
45b78ce76a logs display 2026-02-28 22:58:56 -06:00
Danilo Reyes
88e4ac04df lowered error rate 2026-02-28 22:46:59 -06:00
Danilo Reyes
7aab65a73a fzf into download 2026-02-28 22:20:11 -06:00
Danilo Reyes
adab652feb error logic to cancel / disable link 2026-02-28 22:05:01 -06:00
Danilo Reyes
81c2df84f7 refractioning 2026-02-28 21:34:39 -06:00
Danilo Reyes
7a64034f8a tests 2026-02-28 21:25:46 -06:00
Danilo Reyes
2ccdd713ea admin import/validate 2026-02-28 21:17:46 -06:00
Danilo Reyes
da87b6f9d2 download-admin (sqlite db) init 2026-02-28 20:53:48 -06:00
Danilo Reyes
ebb27daf0c turn sort functions fully pythonic 2026-02-28 20:05:33 -06:00
16 changed files with 1949 additions and 84 deletions

View File

@@ -126,5 +126,32 @@
ext = "py";
handler = scriptBin;
};
apps.x86_64-linux = {
download = {
type = "app";
program = "${pkgs.download}/bin/download";
};
download-admin = {
type = "app";
program = "${pkgs.download}/bin/download-admin";
};
download-tests = {
type = "app";
program = "${
pkgs.writeShellApplication {
name = "download-tests";
runtimeInputs = [
(pkgs.python3.withPackages (ps: [ ps.pyyaml ]))
];
text = ''
set -euo pipefail
export PYTHONPATH="${inputs.self}/src/download"
python -m unittest discover -s "${inputs.self}/src/download/tests" -p "test_*.py"
'';
}
}/bin/download-tests";
};
};
};
}

View File

@@ -7,11 +7,12 @@
gallery-dl,
ffmpeg,
webcomix,
fzf,
...
}:
let
pname = "download";
version = "2.6";
version = "3.0";
in
buildPythonApplication {
inherit pname version;
@@ -32,5 +33,13 @@ buildPythonApplication {
types-pyyaml
yt-dlp
webcomix
fzf
];
postInstall = ''
install -Dm644 completions/download.bash \
$out/share/bash-completion/completions/download
install -Dm644 completions/download.bash \
$out/share/bash-completion/completions/download-admin
'';
}

104
src/download/admin.py Normal file
View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""Administrative CLI for download link database."""
from __future__ import annotations
import argparse
from admin_links import cmd_add
from admin_links import cmd_ban
from admin_links import cmd_disable
from admin_links import cmd_enable
from admin_links import cmd_import
from admin_links import cmd_list
from admin_links import cmd_remove
from admin_links import cmd_rename
from admin_links import cmd_unban
from admin_links import cmd_validate_import
from admin_links import cmd_fix_revision
from admin_links import cmd_fix_x_media
from admin_users import cmd_user_rename
from admin_users import cmd_users
def build_parser() -> argparse.ArgumentParser:
    """Build the download-admin CLI parser.

    Every subcommand registers its handler via ``set_defaults(func=...)``
    so ``main`` can dispatch with a single ``args.func(args)`` call.
    For disable/enable/ban/unban/remove/rename the url argument is
    optional; when omitted the handler falls back to an interactive
    fzf selection.
    """
    parser = argparse.ArgumentParser(prog="download-admin")
    sub = parser.add_subparsers(dest="cmd", required=True)
    # add: register a new link for a user.
    p_add = sub.add_parser("add")
    p_add.add_argument("user")
    p_add.add_argument("url")
    p_add.add_argument("--assume-yes", action="store_true")
    p_add.set_defaults(func=cmd_add)
    p_disable = sub.add_parser("disable")
    p_disable.add_argument("user")
    p_disable.add_argument("url", nargs="?")
    p_disable.set_defaults(func=cmd_disable)
    p_enable = sub.add_parser("enable")
    p_enable.add_argument("user")
    p_enable.add_argument("url", nargs="?")
    p_enable.set_defaults(func=cmd_enable)
    p_ban = sub.add_parser("ban")
    p_ban.add_argument("user")
    p_ban.add_argument("url", nargs="?")
    p_ban.add_argument("--reason")
    p_ban.set_defaults(func=cmd_ban)
    p_unban = sub.add_parser("unban")
    p_unban.add_argument("user")
    p_unban.add_argument("url", nargs="?")
    p_unban.set_defaults(func=cmd_unban)
    p_remove = sub.add_parser("remove")
    p_remove.add_argument("user")
    p_remove.add_argument("url", nargs="?")
    p_remove.set_defaults(func=cmd_remove)
    p_rename = sub.add_parser("rename")
    p_rename.add_argument("user")
    p_rename.add_argument("old_url", nargs="?")
    p_rename.add_argument("new_url", nargs="?")
    p_rename.set_defaults(func=cmd_rename)
    # list: filters may be combined; --user is repeatable.
    p_list = sub.add_parser("list")
    p_list.add_argument("--user", action="append")
    p_list.add_argument("--disabled", action="store_true")
    p_list.add_argument("--banned", action="store_true")
    p_list.add_argument("--requires-revision", action="store_true")
    p_list.set_defaults(func=cmd_list)
    p_users = sub.add_parser("users")
    p_users.set_defaults(func=cmd_users)
    p_import = sub.add_parser("import")
    p_import.set_defaults(func=cmd_import)
    p_validate = sub.add_parser("validate-import")
    p_validate.set_defaults(func=cmd_validate_import)
    # Maintenance commands: one-off database repairs.
    p_fix_rev = sub.add_parser("fix-revision")
    p_fix_rev.set_defaults(func=cmd_fix_revision)
    p_fix_media = sub.add_parser("fix-x-media")
    p_fix_media.set_defaults(func=cmd_fix_x_media)
    p_user_rename = sub.add_parser("user-rename")
    p_user_rename.add_argument("user")
    p_user_rename.add_argument("site")
    p_user_rename.add_argument("old")
    p_user_rename.add_argument("new")
    p_user_rename.set_defaults(func=cmd_user_rename)
    return parser
def main() -> None:
    """CLI entry point: parse argv and dispatch to the chosen handler."""
    arguments = build_parser().parse_args()
    arguments.func(arguments)


if __name__ == "__main__":
    main()

318
src/download/admin_links.py Normal file
View File

@@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""Admin CLI: link operations."""
from __future__ import annotations
import argparse
import shutil
import subprocess
from pathlib import Path
import db
from functions import load_config_variables
def prompt_yes_no(message: str) -> bool:
    """Keep asking until the operator answers yes or no; True for yes."""
    answers = {"y": True, "yes": True, "n": False, "no": False}
    while True:
        reply = input(f"{message} [y/n]: ").strip().lower()
        if reply in answers:
            return answers[reply]
def parse_list_file(path: Path) -> dict:
    """Split a watch-list file into normalized enabled/disabled URL sets.

    Lines starting with ``#`` count as disabled entries; blank lines are
    skipped.  A missing file yields two empty sets.
    """
    sets: dict = {"enabled": set(), "disabled": set()}
    if not path.is_file():
        return sets
    with open(path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            text = raw_line.strip()
            if not text:
                continue
            if text.startswith("#"):
                candidate = text.lstrip("#").strip()
                if candidate:
                    sets["disabled"].add(db.normalize_url(candidate))
            else:
                sets["enabled"].add(db.normalize_url(text))
    return sets
def cmd_add(args: argparse.Namespace) -> None:
    """Handle ``download-admin add``: insert a link for a user.

    If the link was previously removed (a tombstone exists) and
    --assume-yes was not given, ask before re-adding.  Warn when the
    pre-existing row is banned or disabled so the operator knows the
    link will not be downloaded.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        result = db.add_link(
            conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
        )
        # "removed" means a tombstone blocks the add; confirm resurrection.
        if result["status"] == "removed" and not args.assume_yes:
            removed_at = result.get("removed_at", "unknown")
            if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
                result = db.add_link(
                    conn, args.user, args.url, assume_yes=True, source="manual"
                )
        # "row" is only present when the link already exists.
        row = result.get("row")
        if row and row["banned_at"]:
            print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
        if row and not row["enabled"]:
            print("Warning: link is disabled")
        conn.commit()
    print(result["status"])
def cmd_disable(args: argparse.Namespace) -> None:
    """Disable the given link (or interactively chosen links)."""
    def disable_one(conn, user, url):
        return db.set_enabled(conn, user, url, enabled=False)

    _apply_to_links(args, disable_one, selector_filter="disable")
def cmd_enable(args: argparse.Namespace) -> None:
    """Re-enable the given link (or interactively chosen links)."""
    def enable_one(conn, user, url):
        return db.set_enabled(conn, user, url, enabled=True)

    _apply_to_links(args, enable_one, selector_filter="enable")
def cmd_ban(args: argparse.Namespace) -> None:
    """Ban the given link (or interactively chosen links), with a reason."""
    def ban_one(conn, user, url):
        return db.set_banned(conn, user, url, banned=True, reason=args.reason)

    _apply_to_links(args, ban_one, selector_filter="ban")
def cmd_unban(args: argparse.Namespace) -> None:
    """Lift the ban on the given link (or interactively chosen links)."""
    def unban_one(conn, user, url):
        return db.set_banned(conn, user, url, banned=False)

    _apply_to_links(args, unban_one, selector_filter="unban")
def cmd_remove(args: argparse.Namespace) -> None:
    """Remove the given link (or interactively chosen links)."""
    def remove_one(conn, user, url):
        return db.remove_link(conn, user, url)

    _apply_to_links(args, remove_one, "any")
def cmd_rename(args: argparse.Namespace) -> None:
    """Rename a link's URL.

    A missing old URL falls back to an fzf pick; a missing new URL is
    prompted for on stdin.  Commits only on a successful rename.
    """
    old_url = args.old_url
    if not old_url:
        # No explicit URL: let the operator choose one interactively.
        selection = _select_links(args.user, multi=False, selector_filter="any")
        if not selection:
            print("not found")
            return
        old_url = selection[0]
    new_url = args.new_url or input("New URL: ").strip()
    with db.connect() as conn:
        result = db.rename_link(conn, args.user, old_url, new_url)
        if result["status"] == "renamed":
            conn.commit()
    print(result["status"])
def cmd_list(args: argparse.Namespace) -> None:
    """List links, optionally filtered by user / disabled / banned state."""
    users = args.user or None
    # --requires-revision implies fetching disabled and banned rows too,
    # since flagged links are stored disabled.
    include_disabled = args.disabled or args.requires_revision
    include_banned = args.banned or args.requires_revision
    with db.connect() as conn:
        rows = db.get_links(
            conn,
            users=users,
            include_disabled=include_disabled,
            include_banned=include_banned,
            requires_revision_only=args.requires_revision,
        )
        for row in rows:
            # Narrow the output to only-disabled / only-banned when asked.
            if args.disabled and row["enabled"]:
                continue
            if args.banned and not row["banned_at"]:
                continue
            # A ban takes display precedence over the enabled flag.
            status = "enabled" if row["enabled"] else "disabled"
            if row["banned_at"]:
                status = "banned"
            print(f"{row['user_name']} [{status}] {row['url_original']}")
def cmd_import(_: argparse.Namespace) -> None:
    """Import every user's master watch.txt list into the database.

    Prints per-user import statistics, any duplicate URLs found, and
    which list files imported successfully.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        imported_paths = []
        for entry in configs["users"]:
            user = entry["name"]
            # Each user keeps a master list at <lists-dir>/<user>/watch.txt.
            lists_dir = Path(configs["global"]["lists-dir"]) / user
            master = lists_dir / "watch.txt"
            result = db.import_master_list(conn, user, master)
            if result["status"] == "ok":
                imported_paths.append(str(master))
            print(f"{user}: {result}")
            if result.get("duplicates"):
                print(f"{user} duplicates:")
                for dup in result["duplicates"]:
                    print(f" {dup}")
        if imported_paths:
            print("Imported lists:")
            for path in imported_paths:
                print(f" {path}")
        conn.commit()
def cmd_validate_import(_: argparse.Namespace) -> None:
    """Compare each user's watch.txt against the database and report drift.

    A DB link counts as "enabled" only when enabled and not banned;
    everything else counts as "disabled" for the comparison.  Prints the
    URLs present on one side but not the other, or OK when they agree.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        for entry in configs["users"]:
            user = entry["name"]
            lists_dir = Path(configs["global"]["lists-dir"]) / user
            master = lists_dir / "watch.txt"
            list_sets = parse_list_file(master)
            rows = db.get_links_by_user(conn, user)
            db_enabled = set()
            db_disabled = set()
            for row in rows:
                # Compare on normalized URLs so formatting differences
                # between the text list and DB rows do not count as drift.
                norm = db.normalize_url(row["url_original"])
                if row["enabled"] and not row["banned_at"]:
                    db_enabled.add(norm)
                else:
                    db_disabled.add(norm)
            # Set differences in both directions, for both states.
            missing_enabled = list_sets["enabled"] - db_enabled
            missing_disabled = list_sets["disabled"] - db_disabled
            extra_enabled = db_enabled - list_sets["enabled"]
            extra_disabled = db_disabled - list_sets["disabled"]
            print(f"{user}:")
            if missing_enabled:
                print(" Missing enabled in DB:")
                for url in sorted(missing_enabled):
                    print(f" {url}")
            if missing_disabled:
                print(" Missing disabled in DB:")
                for url in sorted(missing_disabled):
                    print(f" {url}")
            if extra_enabled:
                print(" Extra enabled in DB:")
                for url in sorted(extra_enabled):
                    print(f" {url}")
            if extra_disabled:
                print(" Extra disabled in DB:")
                for url in sorted(extra_disabled):
                    print(f" {url}")
            if not any(
                [missing_enabled, missing_disabled, extra_enabled, extra_disabled]
            ):
                print(" OK")
def cmd_fix_revision(_: argparse.Namespace) -> None:
    """One-off repair: clear the requires_revision flag on healthy links."""
    with db.connect() as conn:
        # NOTE(review): the OR clears the flag for any link that is enabled
        # *or* not banned — which is nearly every row, including disabled
        # ones that were flagged for revision.  Confirm whether AND was
        # intended here.
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 0
            WHERE enabled = 1 OR banned_at IS NULL
            """
        )
        conn.commit()
    print("ok")
def cmd_fix_x_media(_: argparse.Namespace) -> None:
    """One-off repair for x.com links with a doubled slash before "media".

    Rewrites ``x.com/<user>//media`` to ``x.com/<user>/media``.  When the
    corrected URL already exists for the same user, the broken duplicate
    row is deleted instead of renamed.
    """
    with db.connect() as conn:
        rows = conn.execute(
            """
            SELECT id, user_name, url_original FROM links
            WHERE url_original LIKE '%x.com/%//media%'
            """
        ).fetchall()
        for row in rows:
            fixed = row["url_original"].replace("//media", "/media")
            norm = db.normalize_url(fixed)
            # Another row already holds the corrected URL -> drop this one.
            conflict = conn.execute(
                """
                SELECT id FROM links
                WHERE user_name = ? AND url_normalized = ? AND id != ?
                """,
                (row["user_name"], norm, row["id"]),
            ).fetchone()
            if conflict:
                conn.execute("DELETE FROM links WHERE id = ?", (row["id"],))
                continue
            conn.execute(
                """
                UPDATE links
                SET url_original = ?, url_normalized = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (fixed, norm, row["id"]),
            )
        conn.commit()
    print("ok")
def _fzf_select(lines: list[str], multi: bool) -> list[str]:
if not lines:
return []
if shutil.which("fzf") is None:
print("fzf not found.")
return []
args = ["fzf"]
if multi:
args.append("--multi")
proc = subprocess.run(
args,
input="\n".join(lines),
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
return []
return [ln for ln in proc.stdout.splitlines() if ln.strip()]
def _select_links(user: str, multi: bool, selector_filter: str) -> list[str]:
    """Let the operator pick links for *user* through fzf.

    *selector_filter* hides rows the operation could not affect anyway
    (e.g. already-enabled links when enabling); "any" shows everything.
    Returns the chosen original URLs, possibly empty.
    """
    with db.connect() as conn:
        rows = db.get_links(conn, users=[user], include_disabled=True, include_banned=True)
        links = []
        for row in rows:
            enabled = bool(row["enabled"])
            banned = bool(row["banned_at"])
            # Skip rows that the pending operation would be a no-op on.
            if selector_filter == "enable" and enabled:
                continue
            if selector_filter == "disable" and not enabled:
                continue
            if selector_filter == "ban" and banned:
                continue
            if selector_filter == "unban" and not banned:
                continue
            links.append(row["url_original"])
    return _fzf_select(links, multi=multi)
def _apply_to_links(args: argparse.Namespace, fn, selector_filter: str) -> None:
    """Apply ``fn(conn, user, url)`` to an explicit URL or to fzf picks.

    With ``args.url`` set, the operation targets that single link;
    otherwise the operator chooses one or more links interactively.
    Commits only when at least one link was actually changed.
    """
    if args.url:
        with db.connect() as conn:
            ok = fn(conn, args.user, args.url)
            if ok:
                conn.commit()
        print("ok" if ok else "not found")
        return
    selections = _select_links(args.user, multi=True, selector_filter=selector_filter)
    if not selections:
        print("not found")
        return
    with db.connect() as conn:
        changed = 0
        for url in selections:
            ok = fn(conn, args.user, url)
            if ok:
                changed += 1
        if changed:
            conn.commit()
    print(f"ok ({changed})")

View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""Admin CLI: user operations."""
from __future__ import annotations
import argparse
import os
import shutil
from pathlib import Path
import db
from functions import load_config_variables
def list_users(configs: dict) -> None:
    """Print the name of every configured user, one per line."""
    for user in configs["users"]:
        print(user["name"])
def prompt_yes_no(message: str) -> bool:
    """Ask *message* until a yes/no answer is given; True means yes."""
    while True:
        answer = input(f"{message} [y/n]: ").strip().lower()
        if answer in {"y", "yes", "n", "no"}:
            return answer.startswith("y")
def merge_dirs(src: Path, dst: Path) -> None:
    """Move every file under *src* into *dst*, preserving relative paths.

    Files that already exist at the destination are skipped (and left in
    *src*).  Afterwards every directory left empty — including *src*
    itself — is removed.

    Bug fix: the previous cleanup pass checked ``os.walk``'s cached
    ``dirs``/``files`` listings, which do not reflect deletions made
    during the same walk, so a parent whose children had just been
    removed was never considered empty and lingered on disk.  A
    bottom-up rmdir-and-ignore removes every now-empty directory.
    """
    for root, _, files in os.walk(src):
        rel = Path(root).relative_to(src)
        target_dir = dst / rel
        target_dir.mkdir(parents=True, exist_ok=True)
        for filename in files:
            src_file = Path(root) / filename
            dst_file = target_dir / filename
            if dst_file.exists():
                # Never clobber existing output; leave the source copy.
                print(f"Skip existing file: {dst_file}")
                continue
            shutil.move(str(src_file), str(dst_file))
    # Bottom-up so children go before parents; rmdir only succeeds on an
    # empty directory, non-empty ones raise OSError and are kept.
    for root, _, _ in os.walk(src, topdown=False):
        try:
            Path(root).rmdir()
        except OSError:
            pass
def move_user_outputs(
    configs: dict, user_name: str, old_handle: str, new_handle: str
) -> None:
    """Rename a handle's download directory after a user-rename.

    Moves ``<download-dir>/<old_handle>`` to ``<download-dir>/<new_handle>``;
    when the target already exists, offers to merge the contents instead.
    """
    user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None)
    if not user_cfg:
        print(f"Unknown user: {user_name}")
        return
    base_dirs = [Path(user_cfg["download-dir"])]
    for base in base_dirs:
        old_path = base / old_handle
        new_path = base / new_handle
        if not old_path.exists():
            print(f"Missing: {old_path}")
            continue
        if not new_path.exists():
            # Simple case: the target is free, plain rename.
            old_path.rename(new_path)
            continue
        if not prompt_yes_no(
            f"Merge contents from {old_path} into existing {new_path}?"
        ):
            continue
        merge_dirs(old_path, new_path)
def cmd_users(_: argparse.Namespace) -> None:
    """Handle ``download-admin users``: print all configured user names."""
    list_users(load_config_variables())
def cmd_user_rename(args: argparse.Namespace) -> None:
    """Handle ``download-admin user-rename``: rename an account handle.

    Updates every matching link in the database for the given user/site,
    then moves the on-disk download directory to the new handle.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        result = db.bulk_rename_handle(
            conn,
            user_name=args.user,
            site=args.site,
            old_handle=args.old,
            new_handle=args.new,
        )
        conn.commit()
    print(result)
    move_user_outputs(configs, args.user, args.old, args.new)

View File

@@ -72,5 +72,5 @@ class Gallery:
LOG.debug(command)
self.command = command
def run_command(self, verbose: bool):
run(self.command, verbose)
def run_command(self, verbose: bool, on_line=None, log_failure: bool = True):
run(self.command, verbose, on_line=on_line, log_failure=log_failure)

View File

@@ -8,6 +8,7 @@ from functions import validate_x_link
from functions import parse_link
from functions import clean_cache
from functions import LOG
import db
class User:
@@ -61,6 +62,11 @@ class User:
for lst in filter(lambda x: not self.lists[x].is_file(), ["master", "push"]):
self.lists[lst].touch()
for lst in filter(
lambda x: not self.lists[x].is_file(),
["instagram", "kemono", "main"],
):
self.lists[lst].touch()
def append_list(self, name: str, line: str) -> None:
"""Appends a line into the given list"""
@@ -83,8 +89,8 @@ class User:
def list_manager(self) -> None:
"""Manage all the user list and create sub-lists"""
self._create_directories() # Call the function to create necesary cache dirs
with open(self.lists["master"], "r", encoding="utf-8") as r_file:
master_content = list(map(lambda x: x.rstrip(), r_file))
with db.connect() as conn:
master_content = db.get_active_links(conn, self.name)
# Create temporary list files segmented per scrapper
shuffle(master_content)
@@ -94,12 +100,10 @@ class User:
def save_link(self, link: str) -> None:
"""Checks the master list against a new link
if unmatched, appends it to the end of the list"""
with open(self.lists["master"], "r", encoding="utf-8") as r_file:
links = r_file.read().lower()
if parse_link(link).lower() in links:
with db.connect() as conn:
result = db.add_link(conn, self.name, parse_link(link), assume_yes=True)
conn.commit()
if result["status"] == "added":
LOG.info("New gallery, saving")
else:
LOG.info("Gallery repeated, not saving")
return
LOG.info("New gallery, saving")
self.append_list("master", parse_link(link))

View File

@@ -0,0 +1,102 @@
# Bash completion for download and download-admin.
# Source this file or install it in your bash_completion.d directory.
# Print the space-separated list of configured user names by reading
# ~/.config/jawz/config.yaml with a small embedded Python script.
# Prints an empty string (and exits 0) when PyYAML or the config file
# is missing, so completion degrades silently.
__download_users() {
python3 - <<'PY' 2>/dev/null
import pathlib
try:
    import yaml
except Exception:
    print("")
    raise SystemExit(0)
cfg = pathlib.Path("~/.config/jawz/config.yaml").expanduser()
if not cfg.is_file():
    print("")
    raise SystemExit(0)
data = yaml.safe_load(cfg.read_text(encoding="utf-8")) or {}
users = [u.get("name") for u in data.get("users", []) if isinstance(u, dict)]
print(" ".join([u for u in users if u]))
PY
}
# Completion for the `download` CLI: offers option flags, user names,
# post types, and — until one has been typed — the scrapper names.
_download() {
    local cur prev words cword
    _init_completion -n : || return
    local scrappers="push main instagram kemono comic manga webcomic"
    local opts="-u --user -i --input -l --list -a --no-archive -s --no_skip -v --verbose -t --type-post"
    local post_types="posts reels stories highlights avatar"
    # Current word looks like a flag: complete option names.
    if [[ "$cur" == -* ]]; then
        COMPREPLY=( $(compgen -W "$opts" -- "$cur") )
        return
    fi
    # Argument completion for the option preceding the cursor.
    case "$prev" in
        -u|--user)
            COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            return
            ;;
        -t|--type-post)
            COMPREPLY=( $(compgen -W "$post_types" -- "$cur") )
            return
            ;;
        -i|--input)
            # Free-form value: no suggestions.
            return
            ;;
    esac
    # Offer scrapper names only if none appears on the line yet.
    local have_scrapper=0
    local w
    for w in "${words[@]:1}"; do
        [[ "$w" == -* ]] && continue
        if [[ " $scrappers " == *" $w "* ]]; then
            have_scrapper=1
            break
        fi
    done
    if [[ $have_scrapper -eq 0 ]]; then
        COMPREPLY=( $(compgen -W "$scrappers" -- "$cur") )
    fi
}
# Completion for `download-admin`: the first word is the subcommand;
# user-taking subcommands complete a user name in second position, and
# `list` additionally completes its filter flags.
_download_admin() {
    local cur prev words cword
    _init_completion -n : || return
    local cmds="add disable enable ban unban remove rename list users import validate-import fix-revision fix-x-media user-rename"
    local list_opts="--user --disabled --banned --requires-revision"
    if [[ "$cur" == -* ]]; then
        # Only the `list` subcommand has option flags.
        if [[ "${words[1]}" == "list" ]]; then
            COMPREPLY=( $(compgen -W "$list_opts" -- "$cur") )
        else
            COMPREPLY=()
        fi
        return
    fi
    case "$prev" in
        --user)
            COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            return
            ;;
    esac
    # First positional word: the subcommand itself.
    if [[ $cword -eq 1 ]]; then
        COMPREPLY=( $(compgen -W "$cmds" -- "$cur") )
        return
    fi
    # Second positional word of user-taking subcommands: a user name.
    case "${words[1]}" in
        add|disable|enable|ban|unban|remove|rename|user-rename)
            if [[ $cword -eq 2 ]]; then
                COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            fi
            ;;
    esac
}
# Register the completion functions for both CLI entry points.
complete -F _download download
complete -F _download_admin download-admin

647
src/download/db.py Normal file
View File

@@ -0,0 +1,647 @@
#!/usr/bin/env python3
"""SQLite persistence for download links."""
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Iterable
from urllib.parse import urlsplit, urlunsplit
from functions import LOG
from functions import load_config_variables
def get_db_path(configs: dict | None = None) -> Path:
    """Return the filesystem path of the links SQLite database.

    Loads the user configuration only when *configs* is not supplied.
    """
    settings = configs or load_config_variables()
    return Path(settings["global"]["databases-dir"]) / "links.sqlite3"
def connect(configs: dict | None = None) -> sqlite3.Connection:
    """Open a connection and ensure schema exists.

    Creates the database's parent directory when missing, upgrades the
    schema in place, and sets sqlite3.Row so callers can index columns
    by name.
    """
    db_path = get_db_path(configs)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    ensure_schema(conn)
    return conn
def ensure_schema(conn: sqlite3.Connection) -> None:
    """Create schema if missing.

    Idempotent: tables and indexes use IF NOT EXISTS, and columns added
    after the first release are migrated in place via ALTER TABLE.
    Tables: links (the watch list), link_history (audit trail), and
    link_tombstones (records of removed links that block silent re-adds).
    """
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS links (
        id INTEGER PRIMARY KEY,
        user_name TEXT NOT NULL,
        url_original TEXT NOT NULL,
        url_normalized TEXT NOT NULL,
        site TEXT,
        enabled INTEGER NOT NULL DEFAULT 1,
        keep INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        disabled_at TEXT,
        disabled_reason TEXT,
        banned_at TEXT,
        banned_reason TEXT,
        requires_revision INTEGER NOT NULL DEFAULT 0
        );
        CREATE UNIQUE INDEX IF NOT EXISTS links_user_url_norm
        ON links (user_name, url_normalized);
        CREATE TABLE IF NOT EXISTS link_history (
        id INTEGER PRIMARY KEY,
        link_id INTEGER,
        user_name TEXT NOT NULL,
        event TEXT NOT NULL,
        old_url TEXT,
        new_url TEXT,
        note TEXT,
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS link_tombstones (
        id INTEGER PRIMARY KEY,
        user_name TEXT NOT NULL,
        url_normalized TEXT NOT NULL,
        url_original TEXT NOT NULL,
        removed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE UNIQUE INDEX IF NOT EXISTS tombstones_user_url_norm
        ON link_tombstones (user_name, url_normalized);
        """
    )
    # Columns introduced after the initial schema; added one at a time so
    # pre-existing databases are migrated in place.
    _ensure_column(
        conn,
        "links",
        "requires_revision",
        "ALTER TABLE links ADD COLUMN requires_revision INTEGER NOT NULL DEFAULT 0",
    )
    _ensure_column(
        conn,
        "links",
        "keep",
        "ALTER TABLE links ADD COLUMN keep INTEGER NOT NULL DEFAULT 0",
    )
    _ensure_column(
        conn,
        "links",
        "disabled_reason",
        "ALTER TABLE links ADD COLUMN disabled_reason TEXT",
    )
def _ensure_column(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> None:
cols = [row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()]
if column in cols:
return
conn.execute(ddl)
def normalize_url(url: str) -> str:
    """Normalize *url* for duplicate detection only.

    Forces the https scheme, lowercases the host, strips a leading
    "www.", folds twitter.com into x.com, drops a trailing slash and the
    fragment, and keeps the query string.  Any port number is discarded
    (``urlsplit().hostname`` excludes it).

    Cleanup: the original also compared against "www.twitter.com", which
    was unreachable after the "www." prefix had already been stripped.
    """
    raw = url.strip()
    if "://" not in raw:
        raw = f"https://{raw}"
    parts = urlsplit(raw)
    host = (parts.hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    if host == "twitter.com":
        host = "x.com"
    return urlunsplit(("https", host, parts.path.rstrip("/"), parts.query, ""))
def get_site(url: str) -> str:
    """Return the normalized host of *url*.

    Lowercases the host, strips a leading "www.", and folds twitter.com
    into x.com — mirroring ``normalize_url``.  Cleanup: the original's
    extra "www.twitter.com" comparison was unreachable after the "www."
    prefix had already been stripped.
    """
    raw = url.strip()
    if "://" not in raw:
        raw = f"https://{raw}"
    host = (urlsplit(raw).hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    if host == "twitter.com":
        host = "x.com"
    return host
def add_history(
    conn: sqlite3.Connection,
    user_name: str,
    event: str,
    link_id: int | None = None,
    old_url: str | None = None,
    new_url: str | None = None,
    note: str | None = None,
) -> None:
    """Append one audit row to link_history (caller commits)."""
    statement = (
        "INSERT INTO link_history"
        " (link_id, user_name, event, old_url, new_url, note)"
        " VALUES (?, ?, ?, ?, ?, ?)"
    )
    conn.execute(statement, (link_id, user_name, event, old_url, new_url, note))
def add_link(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    assume_yes: bool = False,
    source: str = "manual",
) -> dict:
    """Add a link or return existing status.

    Returns a dict whose "status" is one of:
      * "exists"  - the user already has this normalized URL; the row is
                    returned under "row".
      * "removed" - a tombstone blocks the add (neither assume_yes nor a
                    "push" source); removal time under "removed_at".
      * "added"   - inserted; new row id under "id".
    The caller is responsible for committing.
    """
    url_norm = normalize_url(url_original)
    site = get_site(url_original)
    row = conn.execute(
        "SELECT * FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if row:
        return {"status": "exists", "row": row}
    tombstone = conn.execute(
        "SELECT removed_at FROM link_tombstones WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if tombstone and not assume_yes and source != "push":
        return {"status": "removed", "removed_at": tombstone["removed_at"]}
    cur = conn.execute(
        """
        INSERT INTO links (user_name, url_original, url_normalized, site)
        VALUES (?, ?, ?, ?)
        """,
        (user_name, url_original, url_norm, site),
    )
    if tombstone:
        # NOTE(review): requires_revision already defaults to 0 on insert,
        # so this UPDATE appears redundant; the tombstone is also not
        # deleted on re-add — confirm both are intentional.
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 0
            WHERE id = ?
            """,
            (cur.lastrowid,),
        )
    add_history(
        conn,
        user_name=user_name,
        event="add",
        link_id=cur.lastrowid,
        new_url=url_original,
        note=f"source={source}",
    )
    return {"status": "added", "id": cur.lastrowid}
def set_enabled(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    enabled: bool,
    reason: str | None = None,
) -> bool:
    """Enable or disable a user's link; False when the link is unknown.

    Enabling also clears the disabled metadata and the requires_revision
    flag; disabling records the timestamp and optional *reason*.  Every
    change is mirrored into link_history.  The caller commits.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    if enabled:
        conn.execute(
            """
            UPDATE links
            SET enabled = 1,
            disabled_at = NULL,
            disabled_reason = NULL,
            requires_revision = 0,
            updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(conn, user_name, "enable", link_id=row["id"], old_url=row["url_original"])
    else:
        conn.execute(
            """
            UPDATE links
            SET enabled = 0,
            disabled_at = CURRENT_TIMESTAMP,
            disabled_reason = ?,
            updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (reason, row["id"]),
        )
        add_history(
            conn,
            user_name,
            "disable",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    return True
def set_banned(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    banned: bool,
    reason: str | None = None,
) -> bool:
    """Ban or unban a user's link; False when the link is unknown.

    Banning stamps banned_at/banned_reason; unbanning clears both plus
    the requires_revision flag.  Note the enabled flag is left untouched
    in both directions.  Changes are mirrored into link_history; the
    caller commits.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    if banned:
        conn.execute(
            """
            UPDATE links
            SET banned_at = CURRENT_TIMESTAMP, banned_reason = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (reason, row["id"]),
        )
        add_history(
            conn,
            user_name,
            "ban",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    else:
        conn.execute(
            """
            UPDATE links
            SET banned_at = NULL, banned_reason = NULL, requires_revision = 0, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(conn, user_name, "unban", link_id=row["id"], old_url=row["url_original"])
    return True
def mark_requires_revision(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    reason: str,
) -> bool:
    """Flag a user's link as needing manual revision and disable it.

    Keeps any earlier disabled_at timestamp (COALESCE).  Returns False
    when no matching link exists.  The caller commits.
    """
    url_norm = normalize_url(url_original)
    rows = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchall()
    if not rows:
        return False
    for row in rows:
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 1,
            enabled = 0,
            disabled_at = COALESCE(disabled_at, CURRENT_TIMESTAMP),
            updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(
            conn,
            user_name,
            "requires_revision",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    return True
def mark_requires_revision_by_norm(
    conn: sqlite3.Connection, url_norm: str, reason: str
) -> int:
    """Flag every user's copy of an already-normalized URL for revision.

    Same effect as mark_requires_revision but across all users; returns
    the number of affected rows.  The caller commits.
    """
    rows = conn.execute(
        "SELECT id, user_name, url_original FROM links WHERE url_normalized = ?",
        (url_norm,),
    ).fetchall()
    if not rows:
        return 0
    for row in rows:
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 1,
            enabled = 0,
            disabled_at = COALESCE(disabled_at, CURRENT_TIMESTAMP),
            updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(
            conn,
            row["user_name"],
            "requires_revision",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    return len(rows)
def rename_link(
    conn: sqlite3.Connection,
    user_name: str,
    old_url: str,
    new_url: str,
) -> dict:
    """Change a link's URL in place.

    Returns {"status": "missing"} when the old URL is unknown,
    {"status": "conflict"} when another row already holds the new URL,
    otherwise {"status": "renamed"}.  A rename also re-enables the link
    and clears its requires_revision flag (a renamed link is assumed to
    be fixed).  The caller commits.
    """
    old_norm = normalize_url(old_url)
    new_norm = normalize_url(new_url)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, old_norm),
    ).fetchone()
    if not row:
        return {"status": "missing"}
    conflict = conn.execute(
        "SELECT id FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, new_norm),
    ).fetchone()
    if conflict and conflict["id"] != row["id"]:
        return {"status": "conflict"}
    conn.execute(
        """
        UPDATE links
        SET url_original = ?, url_normalized = ?, site = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (new_url, new_norm, get_site(new_url), row["id"]),
    )
    # Re-activate: a renamed link no longer needs revision.
    conn.execute(
        """
        UPDATE links
        SET enabled = 1, disabled_at = NULL, requires_revision = 0
        WHERE id = ?
        """,
        (row["id"],),
    )
    add_history(
        conn,
        user_name,
        "rename",
        link_id=row["id"],
        old_url=row["url_original"],
        new_url=new_url,
    )
    return {"status": "renamed"}
def remove_link(conn: sqlite3.Connection, user_name: str, url_original: str) -> bool:
    """Delete a user's link, leaving a tombstone behind.

    The tombstone (INSERT OR IGNORE, so an older one wins) later blocks
    silent re-adding in add_link.  Returns False when the link is
    unknown.  The caller commits.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    conn.execute(
        """
        INSERT OR IGNORE INTO link_tombstones (user_name, url_normalized, url_original)
        VALUES (?, ?, ?)
        """,
        (user_name, url_norm, row["url_original"]),
    )
    add_history(conn, user_name, "remove", link_id=row["id"], old_url=row["url_original"])
    conn.execute("DELETE FROM links WHERE id = ?", (row["id"],))
    return True
def get_active_links(conn: sqlite3.Connection, user_name: str) -> list[str]:
    """Return the URLs of a user's enabled, unbanned links, oldest first."""
    query = """
        SELECT url_original FROM links
        WHERE user_name = ?
        AND enabled = 1
        AND banned_at IS NULL
        ORDER BY id ASC
    """
    return [record["url_original"] for record in conn.execute(query, (user_name,))]
def get_links(
    conn: sqlite3.Connection,
    users: Iterable[str] | None = None,
    include_disabled: bool = False,
    include_banned: bool = False,
    requires_revision_only: bool = False,
) -> list[sqlite3.Row]:
    """Fetch link rows with optional filters.

    By default only enabled, unbanned links are returned; the include_*
    flags widen the result, requires_revision_only narrows it to flagged
    rows, and *users* restricts it to the given user names.
    """
    conditions: list[str] = []
    params: list = []
    selected = list(users) if users else []
    if selected:
        placeholders = ",".join("?" for _ in selected)
        conditions.append(f"user_name IN ({placeholders})")
        params.extend(selected)
    if not include_disabled:
        conditions.append("enabled = 1")
    if not include_banned:
        conditions.append("banned_at IS NULL")
    if requires_revision_only:
        conditions.append("requires_revision = 1")
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    sql = f"SELECT * FROM links {where} ORDER BY user_name, id"
    return conn.execute(sql, params).fetchall()
def get_links_for_cleaning(
    conn: sqlite3.Connection,
    users: Iterable[str] | None = None,
) -> list[sqlite3.Row]:
    """Return enabled, unbanned x.com links not marked keep.

    *users* optionally restricts the result to the given user names.
    """
    conditions = ["site = ?", "enabled = 1", "banned_at IS NULL", "keep = 0"]
    params: list = ["x.com"]
    chosen = list(users) if users else []
    if chosen:
        conditions.append(f"user_name IN ({','.join('?' for _ in chosen)})")
        params.extend(chosen)
    sql = (
        "SELECT * FROM links WHERE "
        + " AND ".join(conditions)
        + " ORDER BY user_name, id"
    )
    return conn.execute(sql, params).fetchall()
def set_keep(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    keep: bool,
    reason: str | None = None,
) -> bool:
    """Set or clear a link's keep flag (protects it from cleaning).

    Returns False when the link is unknown.  The change is mirrored into
    link_history as "keep"/"unkeep"; the caller commits.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    if keep:
        conn.execute(
            """
            UPDATE links
            SET keep = 1, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(
            conn,
            user_name,
            "keep",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    else:
        conn.execute(
            """
            UPDATE links
            SET keep = 0, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(
            conn,
            user_name,
            "unkeep",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    return True
def get_links_by_user(conn: sqlite3.Connection, user_name: str) -> list[sqlite3.Row]:
    """Return every link row belonging to *user_name*, ordered by id."""
    query = "SELECT * FROM links WHERE user_name = ? ORDER BY id"
    return conn.execute(query, (user_name,)).fetchall()
def import_master_list(conn: sqlite3.Connection, user_name: str, path: Path) -> dict:
    """Import a plain-text link list into the database for *user_name*.

    Lines prefixed with '#' are added but immediately disabled.  Returns a
    summary dict: status plus added/exists/removed counts and the list of
    duplicate URLs encountered.
    """
    if not path.is_file():
        return {"status": "missing", "path": str(path)}
    with open(path, "r", encoding="utf-8") as r_file:
        entries = [ln.strip() for ln in r_file if ln.strip()]
    counts = {"added": 0, "exists": 0, "removed": 0}
    duplicates: list[str] = []
    for entry in entries:
        is_disabled = entry.startswith("#")
        url = entry.lstrip("#").strip() if is_disabled else entry
        if not url:
            # A line that was only '#' characters carries no URL.
            continue
        outcome = add_link(conn, user_name, url, assume_yes=True, source="import")
        status = outcome["status"]
        if status == "added":
            counts["added"] += 1
            if is_disabled:
                set_enabled(conn, user_name, url, enabled=False)
        elif status == "exists":
            counts["exists"] += 1
            duplicates.append(url)
        elif status == "removed":
            counts["removed"] += 1
    return {
        "status": "ok",
        "added": counts["added"],
        "exists": counts["exists"],
        "removed": counts["removed"],
        "duplicates": duplicates,
    }
def bulk_rename_handle(
    conn: sqlite3.Connection,
    user_name: str,
    site: str,
    old_handle: str,
    new_handle: str,
) -> dict:
    """Rename account handle within a site for a user.

    Every stored link for (user, site) whose first path segment equals
    *old_handle* is rewritten to use *new_handle* via rename_link.

    Returns:
        dict with "updated", "skipped" and "conflicts" counts.
    """
    # BUG FIX: lstrip("www.") strips any leading run of the characters
    # 'w' and '.', so e.g. "weasyl.com" became "easyl.com".  Use
    # removeprefix to drop only the literal "www." prefix; this also makes
    # the old "www.twitter.com" special case redundant.
    site_norm = site.lower().removeprefix("www.")
    if site_norm == "twitter.com":
        site_norm = "x.com"
    rows = conn.execute(
        """
        SELECT id, url_original FROM links
        WHERE user_name = ? AND site = ?
        """,
        (user_name, site_norm),
    ).fetchall()
    updated = 0
    skipped = 0
    conflicts = 0
    for row in rows:
        raw = row["url_original"]
        parts = urlsplit(raw if "://" in raw else f"https://{raw}")
        segments = parts.path.split("/")
        # segments[0] is the empty string before the leading '/'; the handle
        # is the first real path segment.
        if len(segments) < 2 or segments[1] != old_handle:
            skipped += 1
            continue
        segments[1] = new_handle
        new_path = "/".join(segments)
        new_url = urlunsplit((parts.scheme, parts.netloc, new_path, parts.query, parts.fragment))
        result = rename_link(conn, user_name, raw, new_url)
        if result["status"] == "renamed":
            updated += 1
        elif result["status"] == "conflict":
            conflicts += 1
        else:
            skipped += 1
    return {"updated": updated, "skipped": skipped, "conflicts": conflicts}
def warn(msg: str) -> None:
    """Emit *msg* through the module logger at WARNING level."""
    LOG.warning(msg)

View File

@@ -13,6 +13,7 @@ import re
from pathlib import Path
import argparse
import yaml
import db
from typing import Dict
from functions import LOG
from functions import run
@@ -72,25 +73,139 @@ def get_index(name: str) -> int:
def parse_gallery(gdl_list: str, user: User) -> None:
"""Processes the gallery-dl command based on the selected gallery"""
args = get_args()
gallery = Gallery()
gallery.archive = args.flag_archive
gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
gallery.dest = "download"
gallery.list = gdl_list
gallery.opt_args = parse_instagram(gdl_list)
list_path = user.lists[gdl_list]
if not list_path.is_file():
LOG.warning("List file missing: %s", list_path)
return
with open(list_path, "r", encoding="utf-8") as r_file:
links = list(map(lambda x: x.rstrip(), r_file))
for link in filter(None, links):
gallery = Gallery()
gallery.archive = args.flag_archive
gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
gallery.dest = "download"
gallery.link = link
gallery.opt_args = parse_instagram(link)
gallery.generate_command(user)
gallery.run_command(args.flag_verbose)
gallery.generate_command(user)
handler = _make_gallery_error_handler(link)
gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
def parse_instagram(link: str) -> list[str]:
def parse_instagram(link: str, post_type: list[str] | str | None = None) -> list[str]:
"""Fix instagram links"""
args = get_args()
if "instagram" not in link:
return []
if isinstance(args.post_type, list):
return ["-o", f"include={','.join(args.post_type)}"]
return ["-o", f"include={args.post_type}"]
use_type = args.post_type if post_type is None else post_type
if isinstance(use_type, list):
return ["-o", f"include={','.join(use_type)}"]
return ["-o", f"include={use_type}"]
# Exact "[error]" payloads from gallery-dl that indicate the remote account
# is gone, renamed, or empty; links hitting these are flagged
# requires_revision in the database (see _make_gallery_error_handler).
REVISION_ERRORS = {
    "NotFoundError: Requested user could not be found",
    "Unable to retrieve Tweets from this timeline",
    "No results for",
}
# Substrings indicating a likely transient failure (rate limiting, network
# hiccups); matched by substring, and only logged — the link is untouched.
TRANSIENT_ERRORS = {
    "User input required (password)",
    "429",
    "rate limit",
    "timed out",
    "timeout",
    "Network",
    "connection",
}
def _make_gallery_error_handler(link: str):
    """Build an on_line callback that inspects gallery-dl output for *link*.

    The callback flags the link's DB row as requires_revision when output
    matches a known permanent failure, and only logs transient errors.
    """
    norm = db.normalize_url(link)
    def handle(line: str) -> None:
        if "[error]" in line:
            # Everything after the "[error]" tag is treated as the reason.
            reason = line.split("[error]", 1)[1].strip()
            LOG.warning("Error for %s: %s", link, reason)
            if reason in REVISION_ERRORS:
                # Exact-match permanent failure: flag for manual revision.
                with db.connect() as conn:
                    db.mark_requires_revision_by_norm(conn, norm, reason)
                    conn.commit()
                LOG.warning("Marked requires_revision for %s", link)
            # NOTE(review): a reason can match both sets and be logged twice;
            # intentional-looking, but confirm.
            if any(tok in reason for tok in TRANSIENT_ERRORS):
                LOG.warning("Transient error for %s: %s", link, reason)
            return
        # "No results for" may also appear outside an [error]-tagged line.
        if "No results for" in line:
            with db.connect() as conn:
                db.mark_requires_revision_by_norm(conn, norm, "No results for")
                conn.commit()
            LOG.warning("Marked requires_revision for %s", link)
            return
    return handle
def _comic_skip_arg(link: str, flag_skip: bool) -> str:
if not flag_skip:
return ""
if re.search(r"readcomiconline", link):
return " --chapter-range 1"
if re.search(r"manganato|mangahere|webtoons", link):
return " --chapter-range 1-5"
return ""
def _handle_gallery_link(user: User, link: str, args, conn) -> None:
    """Register *link* in the DB, then download it with gallery-dl.

    Skips the download when the stored row is banned or disabled.
    """
    add_res = db.add_link(conn, user.name, parse_link(link), assume_yes=True, source="push")
    row = add_res.get("row")
    # Banned/disabled rows stay in the DB but are never downloaded.
    if row and row["banned_at"]:
        LOG.warning("Link is banned, skipping: %s", link)
        return
    if row and not row["enabled"]:
        LOG.warning("Link is disabled, skipping: %s", link)
        return
    gallery = Gallery()
    gallery.archive = args.flag_archive
    gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
    gallery.link = parse_link(link)
    gallery.dest = "download"
    gallery.opt_args = parse_instagram(link)
    gallery.generate_command(user)
    # Route output through the error handler so dead accounts get flagged
    # requires_revision; the generic failure log is suppressed.
    handler = _make_gallery_error_handler(link)
    gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
def _handle_comic_link(link: str, args) -> None:
    """Download a comic/manga link via gallery-dl and record it afterwards."""
    job = Gallery()
    job.archive = args.flag_archive
    job.skip_arg = _comic_skip_arg(link, args.flag_skip)
    job.link = link
    job.generate_command(is_comic=True)
    # Per-line error handler flags dead links for revision in the DB.
    job.run_command(
        args.flag_verbose,
        on_line=_make_gallery_error_handler(link),
        log_failure=False,
    )
    save_comic(link)
def _handle_video_link(user: User, link: str, args) -> None:
    """Download a video link with yt-dlp into the user's media directory."""
    job = Video()
    job.use_archive = args.flag_archive
    job.link = link
    job.dest = str(user.directories["media"])
    job.database = str(user.dbs["media"])
    run(video_command(job), args.flag_verbose)
def _handle_other_link(user: User, link: str, args) -> None:
    """Fallback downloader: hand unrecognized links to gallery-dl unarchived."""
    LOG.info("Other type of download %s", link)
    job = Gallery()
    job.archive = False
    job.skip_arg = " -o directory='[]'"
    job.link = link
    job.dest = "push"
    job.generate_command(user)
    job.run_command(
        args.flag_verbose,
        on_line=_make_gallery_error_handler(link),
        log_failure=False,
    )
def video_command(video: Video):
@@ -218,7 +333,7 @@ def save_comic(link: str) -> None:
w_file.write(link + "\n")
def push_manager(user: User):
def push_manager(user: User, links: list[str] | None = None) -> None:
"""Filters out the URL to use the appropiate downloader"""
args = get_args()
# Creates an array which will store any links that should use youtube-dl
@@ -249,8 +364,9 @@ def push_manager(user: User):
rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
with open(user.lists["push"], "r", encoding="utf-8") as r_file:
links = list(map(lambda x: x.rstrip(), r_file))
if links is None:
with open(user.lists["push"], "r", encoding="utf-8") as r_file:
links = list(map(lambda x: x.rstrip(), r_file))
links_galleries = filter(rgx_gallery.search, links)
links_videos = filter(rgx_video.search, links)
links_comics = filter(rgx_comic.search, links)
@@ -261,50 +377,20 @@ def push_manager(user: User):
links,
)
for link in links_galleries:
gallery = Gallery()
gallery.archive = args.flag_archive
gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
gallery.link = parse_link(link)
gallery.dest = "download"
gallery.opt_args = parse_instagram(link)
gallery.generate_command(user)
gallery.run_command(args.flag_verbose)
user.save_link(link)
with db.connect() as conn:
for link in links_galleries:
_handle_gallery_link(user, link, args, conn)
conn.commit()
for link in links_comics:
if args.flag_skip and re.search(r"readcomiconline", link):
skip_arg = " --chapter-range 1"
elif args.flag_skip and re.search(r"manganato|mangahere|webtoons", link):
skip_arg = " --chapter-range 1-5"
else:
skip_arg = ""
gallery = Gallery()
gallery.archive = args.flag_archive
gallery.skip_arg = skip_arg
gallery.link = link
gallery.generate_command(is_comic=True)
gallery.run_command(args.flag_verbose)
save_comic(link)
_handle_comic_link(link, args)
for link in links_videos:
video = Video()
video.use_archive = args.flag_archive
video.link = link
video.dest = str(user.directories["media"])
video.database = str(user.dbs["media"])
run(video_command(video), args.flag_verbose)
_handle_video_link(user, link, args)
for link in links_other:
LOG.info("Other type of download %s", link)
gallery = Gallery()
gallery.archive = False
gallery.skip_arg = " -o directory='[]'"
gallery.link = link
gallery.dest = "push"
gallery.generate_command(user)
gallery.run_command(args.flag_verbose)
_handle_other_link(user, link, args)
# Flush the push list, cleans all the contents
with open(user.lists["push"], "w", encoding="utf-8") as w_file:
@@ -349,10 +435,8 @@ def main():
elif args.link:
is_admin = args.user in ("everyone", "jawz")
user = User(get_index("jawz" if is_admin else args.user))
for arg_link in [lnk for grp in args.link for lnk in grp]:
user.append_list("push", parse_link(arg_link))
push_manager(user)
links = [parse_link(lnk) for grp in args.link for lnk in grp]
push_manager(user, links=links)
if __name__ == "__main__":

View File

@@ -9,6 +9,7 @@ import logging
import shlex
import subprocess
import shutil
import random
from typing import Sequence
from pathlib import Path
import yaml
@@ -31,7 +32,7 @@ def validate_x_link(line: str) -> str:
if re.search(r"\/media$", line):
return line
# if does not contain /media at the end then add /media
return f"{line}/media"
return f"{line.rstrip('/')}/media"
def parse_link(link: str) -> str:
@@ -65,6 +66,8 @@ def run(
verbose: bool,
cwd: Path | None = None,
check: bool = False,
on_line=None,
log_failure: bool = True,
) -> None:
"""Run command in a subprocess"""
# pylint: disable=subprocess-run-check
@@ -82,9 +85,28 @@ def run(
else:
args = list(command)
result = subprocess.run(args, check=check, cwd=cwd)
if not check and result.returncode != 0:
LOG.warning("Command failed (%s): %s", result.returncode, args)
if on_line is None:
result = subprocess.run(args, check=check, cwd=cwd)
if log_failure and not check and result.returncode != 0:
LOG.warning("Command failed (%s): %s", result.returncode, args)
return
proc = subprocess.Popen(
args,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
assert proc.stdout is not None
for line in proc.stdout:
print(line, end="")
on_line(line)
returncode = proc.wait()
if check and returncode != 0:
raise subprocess.CalledProcessError(returncode, args)
if log_failure and not check and returncode != 0:
LOG.warning("Command failed (%s): %s", returncode, args)
def list_lines(i: int, line: str) -> str:
@@ -97,20 +119,36 @@ def list_lines(i: int, line: str) -> str:
def sort_txt_file(file_path: Path):
"""Sort every line alphabetically
remove duplicated and empty lines"""
file = str(file_path.resolve())
run(["sort", "-u", file, "-o", file], VERBOSE_G)
run(["sed", "-i", "/^$/d", file], VERBOSE_G)
run(["sed", "-i", "-e", "s,http:,https:,", file], VERBOSE_G)
# fix this using strip on python
# line.strip("/")
run(["sed", "-i", "-e", "s,/$,,", file], VERBOSE_G) # trailing /
path = file_path.resolve()
with open(path, "r", encoding="utf-8") as open_file:
lines = [ln.strip() for ln in open_file]
normalized = []
for ln in lines:
if not ln:
continue
ln = ln.replace("http://", "https://")
ln = ln.rstrip("/")
normalized.append(ln)
unique_sorted = sorted(set(normalized))
with open(path, "w", encoding="utf-8") as open_file:
open_file.write("\n".join(unique_sorted))
if unique_sorted:
open_file.write("\n")
def randomize_txt_file(file_path: Path):
"""Randomize the order of the
lines of the txt file"""
file = str(file_path.resolve())
run(["sort", "-R", file, "-o", file], VERBOSE_G)
path = file_path.resolve()
with open(path, "r", encoding="utf-8") as open_file:
lines = [ln.rstrip("\n") for ln in open_file]
random.shuffle(lines)
with open(path, "w", encoding="utf-8") as open_file:
open_file.write("\n".join(lines))
if lines:
open_file.write("\n")
def parse_list(file):

View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""Interactive cleaner for x.com galleries."""
from __future__ import annotations
import argparse
import shutil
import subprocess
from pathlib import Path
from urllib.parse import urlsplit
import db
from classes.user import User
from functions import load_config_variables
def _extract_handle(url: str) -> str | None:
parts = urlsplit(url if "://" in url else f"https://{url}")
segments = [seg for seg in parts.path.split("/") if seg]
if not segments:
return None
return segments[0]
def _resolve_folder(user: User, handle: str | None) -> Path | None:
base = user.directories.get("download")
if base is None:
return None
if not base.exists():
return None
if not handle:
return base
candidates = [
base / handle,
]
for cand in candidates:
if cand.exists():
return cand
return None
def _open_folder(path: Path) -> None:
    """Open *path* in the desktop file manager via xdg-open, when available."""
    if shutil.which("xdg-open"):
        subprocess.run(["xdg-open", str(path)], check=False)
    else:
        print("xdg-open not found; skipping folder open.")
def _prompt() -> str:
    """Read one review decision from stdin, stripped and lowercased."""
    return input("Keep? [y] keep / [n] disable / [s] skip / [q] quit: ").strip().lower()
def _build_user_index(configs: dict) -> dict[str, int]:
return {entry["name"]: idx for idx, entry in enumerate(configs["users"])}
def _validate_users(user_index: dict[str, int], users: list[str] | None) -> bool:
if not users:
return True
unknown = [u for u in users if u not in user_index]
if not unknown:
return True
print(f"Unknown users: {', '.join(unknown)}")
return False
def _print_context(user_name: str, url: str, handle: str | None, folder: Path | None) -> None:
    """Display the link under review and open its folder when known."""
    print(f"\nUser: {user_name}")
    print(f"URL: {url}")
    if handle:
        print(f"Handle: {handle}")
    if folder is None:
        print("Folder: <unknown>")
        return
    print(f"Folder: {folder}")
    _open_folder(folder)
def _apply_choice(
conn,
user_name: str,
url: str,
choice: str,
reason: str,
) -> bool | None:
if choice in ("y", "yes"):
ok = db.set_keep(conn, user_name, url, keep=True, reason=reason)
if ok:
conn.commit()
return True
if choice in ("n", "no"):
ok = db.set_enabled(conn, user_name, url, enabled=False, reason=reason)
if ok:
conn.commit()
return True
if choice in ("s", "skip", ""):
return True
if choice in ("q", "quit"):
return None
print("Please enter y, n, s, or q.")
return False
def main() -> None:
    """Interactively review x.com links for the hard-coded 'jawz' user.

    For each candidate link the download folder is opened (via xdg-open)
    and the reviewer decides to keep, disable, skip, or quit.
    """
    parser = argparse.ArgumentParser(prog="gallery-clean")
    parser.add_argument(
        "session",
        nargs="?",
        type=int,
        default=10,
        help="Number of links to review this session (default: 10)",
    )
    parser.add_argument(
        "--reason",
        default="gallery-clean",
        help="Reason stored when disabling or keeping",
    )
    args = parser.parse_args()
    configs = load_config_variables()
    user_index = _build_user_index(configs)
    # Single-user install for now; widen this list to review other users.
    users_filter = ["jawz"]
    if not _validate_users(user_index, users_filter):
        return
    with db.connect(configs) as conn:
        rows = db.get_links_for_cleaning(conn, users=users_filter)
        # max(..., 0) guards against a negative session count.
        for row in rows[: max(args.session, 0)]:
            user_name = row["user_name"]
            url = row["url_original"]
            handle = _extract_handle(url)
            folder = _resolve_folder(User(user_index[user_name]), handle)
            _print_context(user_name, url, handle, folder)
            # Re-prompt until a valid decision; None means quit entirely.
            while True:
                result = _apply_choice(conn, user_name, url, _prompt(), args.reason)
                if result is None:
                    return
                if result:
                    break
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""fzf-based selectors for comic and gallery links."""
from __future__ import annotations
import argparse
import re
import subprocess
import db
USER = "jawz"
RGX_COMIC = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
def _select_links(urls: list[str]) -> list[str]:
if not urls:
return []
proc = subprocess.run(
["fzf", "--multi", "--exact", "-i"],
input="\n".join(urls),
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
return []
return [ln for ln in proc.stdout.splitlines() if ln.strip()]
def _run_download(selected: list[str], extra_args: list[str]) -> None:
if not selected:
return
subprocess.run(["download", "-u", USER, *extra_args, "-i", *selected], check=False)
def _parse_args() -> argparse.Namespace:
    """Parse the shared selector flags (-s disables skip, -a disables archive)."""
    parser = argparse.ArgumentParser(add_help=False)
    flag_specs = (
        ("-s", "--no_skip", "flag_skip"),
        ("-a", "--no-archive", "flag_archive"),
    )
    for short_opt, long_opt, dest in flag_specs:
        parser.add_argument(short_opt, long_opt, dest=dest, action="store_false")
    return parser.parse_args()
def _extra_args_from_flags(args: argparse.Namespace) -> list[str]:
extra = []
if args.flag_skip is False:
extra.append("-s")
if args.flag_archive is False:
extra.append("-a")
return extra
def comic_main() -> None:
    """Entry point for `comic`: fzf-pick active comic links and download them."""
    args = _parse_args()
    extra_args = _extra_args_from_flags(args)
    with db.connect() as conn:
        rows = db.get_links(conn, users=[USER], include_disabled=False, include_banned=False)
        # Comic links are those matching the comic-site regex.
        urls = [row["url_original"] for row in rows if RGX_COMIC.search(row["url_original"])]
        _run_download(_select_links(urls), extra_args)
def gallery_main() -> None:
    """Entry point for `gallery`: fzf-pick active non-comic links and download."""
    args = _parse_args()
    extra_args = _extra_args_from_flags(args)
    with db.connect() as conn:
        rows = db.get_links(conn, users=[USER], include_disabled=False, include_banned=False)
        # Complement of comic_main: everything NOT matching the comic regex.
        urls = [row["url_original"] for row in rows if not RGX_COMIC.search(row["url_original"])]
        _run_download(_select_links(urls), extra_args)
if __name__ == "__main__":
gallery_main()

View File

@@ -6,10 +6,19 @@ py_modules =
download
functions
argparser
db
admin
admin_links
admin_users
select_links
gallery_clean
classes.gallery
classes.user
[options.entry_points]
console_scripts =
download = download:main
download-admin = admin:main
comic = select_links:comic_main
gallery = select_links:gallery_main
gallery-clean = gallery_clean:main

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
import tempfile
import unittest
import sqlite3
from pathlib import Path
import db
class TestDB(unittest.TestCase):
    """Unit tests for the db module against an in-memory SQLite database."""

    def setUp(self) -> None:
        # Fresh in-memory DB per test; Row factory mirrors production setup.
        self.conn = sqlite3.connect(":memory:")
        self.conn.row_factory = sqlite3.Row
        db.ensure_schema(self.conn)

    def tearDown(self) -> None:
        self.conn.close()

    def test_normalize_url(self):
        """URLs are canonicalized to https://x.com without trailing slash."""
        self.assertEqual(
            db.normalize_url("http://Twitter.com/User/"),
            "https://x.com/User",
        )
        self.assertEqual(
            db.normalize_url("x.com/SomeUser/media/"),
            "https://x.com/SomeUser/media",
        )

    def test_add_link_dedupe(self):
        """Adding the same normalized URL twice reports 'exists'."""
        res1 = db.add_link(self.conn, "jawz", "https://x.com/Test/")
        res2 = db.add_link(self.conn, "jawz", "https://x.com/Test")
        self.assertEqual(res1["status"], "added")
        self.assertEqual(res2["status"], "exists")

    def test_remove_tombstone(self):
        """Removed links stay tombstoned until re-added with assume_yes."""
        db.add_link(self.conn, "jawz", "https://x.com/Test")
        ok = db.remove_link(self.conn, "jawz", "https://x.com/Test")
        self.assertTrue(ok)
        res = db.add_link(self.conn, "jawz", "https://x.com/Test")
        self.assertEqual(res["status"], "removed")
        res2 = db.add_link(self.conn, "jawz", "https://x.com/Test", assume_yes=True)
        self.assertEqual(res2["status"], "added")

    def test_disable_and_ban(self):
        """Disabled and banned links disappear from the active link set."""
        db.add_link(self.conn, "jawz", "https://x.com/Test")
        ok = db.set_enabled(self.conn, "jawz", "https://x.com/Test", enabled=False)
        self.assertTrue(ok)
        active = db.get_active_links(self.conn, "jawz")
        self.assertEqual(active, [])
        ok = db.set_banned(self.conn, "jawz", "https://x.com/Test", banned=True, reason="bad")
        self.assertTrue(ok)
        active = db.get_active_links(self.conn, "jawz")
        self.assertEqual(active, [])

    def test_import_master_list(self):
        """Importing honors '#'-disabled lines and counts duplicates."""
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "watch.txt"
            path.write_text(
                "\n".join(
                    [
                        "https://x.com/User",
                        "# https://x.com/DisabledUser",
                        "https://x.com/User",
                    ]
                )
                + "\n",
                encoding="utf-8",
            )
            result = db.import_master_list(self.conn, "jawz", path)
            self.assertEqual(result["added"], 2)
            self.assertEqual(result["exists"], 1)
            rows = db.get_links_by_user(self.conn, "jawz")
            by_norm = {db.normalize_url(r["url_original"]): r for r in rows}
            self.assertTrue(by_norm["https://x.com/User"]["enabled"])
            self.assertFalse(by_norm["https://x.com/DisabledUser"]["enabled"])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,140 @@
#!/usr/bin/env python3
import types
import unittest
from pathlib import Path
import tempfile
import download
class DummyArgs:
    """Stand-in for parsed CLI arguments used by the download tests."""

    def __init__(self):
        defaults = {
            "post_type": ["posts", "reels"],
            "flag_archive": True,
            "flag_skip": True,
            "flag_verbose": True,
        }
        self.__dict__.update(defaults)
class DummyUser:
    """Minimal User substitute exposing the attributes download.py touches."""

    def __init__(self):
        self.name = "jawz"
        self.sleep = 0
        self.lists = {"push": Path("/tmp/instant.txt")}
        self.dbs = {"gallery": Path("/tmp/g.sqlite3"), "media": Path("/tmp/m.txt")}
        self.directories = {"media": Path("/tmp/media"), "download": Path("/tmp/dl")}
class TestDownload(unittest.TestCase):
    """Tests for download.py link routing with all side effects stubbed."""

    def setUp(self) -> None:
        # Inject deterministic args/config; remember originals for tearDown.
        download.ARGS = DummyArgs()
        download.CONFIGS = {
            "users": [{"name": "jawz"}],
            "global": {},
            "comic": {"comic-list": "/tmp/comic.txt"},
        }
        self.orig_gallery = download.Gallery
        self.orig_video_command = download.video_command
        self.orig_run = download.run
        self.orig_db_connect = download.db.connect
        self.orig_db_add_link = download.db.add_link
        self.orig_save_comic = download.save_comic
        self.orig_make_handler = download._make_gallery_error_handler

    def tearDown(self) -> None:
        # Restore every patched attribute so tests stay independent.
        download.Gallery = self.orig_gallery
        download.video_command = self.orig_video_command
        download.run = self.orig_run
        download.db.connect = self.orig_db_connect
        download.db.add_link = self.orig_db_add_link
        download.save_comic = self.orig_save_comic
        download._make_gallery_error_handler = self.orig_make_handler

    def test_parse_instagram(self):
        """Instagram links get an include= option; other sites get nothing."""
        res = download.parse_instagram("https://instagram.com/user")
        self.assertEqual(res, ["-o", "include=posts,reels"])
        res2 = download.parse_instagram("https://x.com/user")
        self.assertEqual(res2, [])

    def test_video_command(self):
        """yt-dlp commands are built per link type (video vs music audio)."""
        v = download.Video()
        v.link = "https://youtu.be/abc"
        v.dest = "/tmp"
        cmd = download.video_command(v)
        self.assertIn("yt-dlp", cmd[0])
        self.assertIn("https://youtu.be/abc", cmd)
        v2 = download.Video()
        v2.link = "https://music.youtube.com/watch?v=xyz"
        v2.dest = "/tmp"
        v2.use_archive = False
        cmd2 = download.video_command(v2)
        self.assertIn("--audio-format", cmd2)

    def test_push_manager_routing(self):
        """push_manager routes each link to exactly one downloader bucket."""
        user = DummyUser()
        captured = {"gallery": [], "video": [], "comic": [], "other": []}

        def fake_generate(self, *args, **kwargs):
            return None

        def fake_run(self, *args, **kwargs):
            # Classify by the link the Gallery instance was given.
            link = getattr(self, "link", "")
            if "mangadex" in link:
                captured["comic"].append(link)
            elif "x.com" in link:
                captured["gallery"].append(link)
            else:
                captured["other"].append(link)

        def fake_video_command(video):
            captured["video"].append(video.link)
            return ["echo", "ok"]

        # Patch Gallery methods and video_command/run
        class FakeGallery(self.orig_gallery):
            def generate_command(self, *args, **kwargs):
                return fake_generate(self, *args, **kwargs)

            def run_command(self, *args, **kwargs):
                return fake_run(self, *args, **kwargs)

        download.Gallery = FakeGallery
        download.video_command = fake_video_command
        download.run = lambda *args, **kwargs: None
        download.save_comic = lambda *_args, **_kwargs: None
        download._make_gallery_error_handler = lambda *_args, **_kwargs: None
        links = [
            "https://x.com/someuser",
            "https://youtu.be/abc",
            "https://mangadex.org/title/123",
            "https://example.com/page",
        ]

        # Disable DB write path for this test
        class FakeConn:
            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def commit(self):
                return None

        download.db.connect = lambda *a, **k: FakeConn()
        download.db.add_link = lambda *a, **k: {"status": "added"}
        download.push_manager(user, links=links)
        self.assertEqual(len(captured["gallery"]), 1)
        self.assertEqual(len(captured["video"]), 1)
        self.assertEqual(len(captured["comic"]), 1)
        self.assertEqual(len(captured["other"]), 1)
        # restore handled in tearDown
if __name__ == "__main__":
unittest.main()