Compare commits

..

27 Commits

Author SHA1 Message Date
Danilo Reyes
e985e359a7 clean exit 2026-03-02 22:29:04 -06:00
Danilo Reyes
ba42689aa9 fixing download dir 2026-03-01 17:45:58 -06:00
Danilo Reyes
2a55d92f19 bugfix 2026-03-01 17:42:46 -06:00
Danilo Reyes
949f5a94c3 gallery-clean + autocompletions 2026-03-01 17:35:05 -06:00
Danilo Reyes
899543309f download 3.0 2026-03-01 00:22:56 -06:00
Danilo Reyes
5000304a8a fixed requires-revision 2026-03-01 00:22:11 -06:00
Danilo Reyes
76e3d72643 bugfix 2026-02-28 23:59:08 -06:00
Danilo Reyes
e73b4c8083 -sa to gallery/comic 2026-02-28 23:52:48 -06:00
Danilo Reyes
3f44f710b1 fix // and other logic flaws 2026-02-28 23:47:01 -06:00
Danilo Reyes
9da87b68e9 revision logic revisited 2026-02-28 23:33:06 -06:00
Danilo Reyes
766eca4a2f enable renamed links 2026-02-28 23:19:53 -06:00
Danilo Reyes
bda8105928 fix list download admin 2026-02-28 23:01:36 -06:00
Danilo Reyes
45b78ce76a logs display 2026-02-28 22:58:56 -06:00
Danilo Reyes
88e4ac04df lowered error rate 2026-02-28 22:46:59 -06:00
Danilo Reyes
7aab65a73a fzf into download 2026-02-28 22:20:11 -06:00
Danilo Reyes
adab652feb error logic to cancel / disable link 2026-02-28 22:05:01 -06:00
Danilo Reyes
81c2df84f7 refractioning 2026-02-28 21:34:39 -06:00
Danilo Reyes
7a64034f8a tests 2026-02-28 21:25:46 -06:00
Danilo Reyes
2ccdd713ea admin import/validate 2026-02-28 21:17:46 -06:00
Danilo Reyes
da87b6f9d2 download-admin (sqlite db) init 2026-02-28 20:53:48 -06:00
Danilo Reyes
ebb27daf0c turn sort functions fully pythonic 2026-02-28 20:05:33 -06:00
Danilo Reyes
fcd898873c init reinitializing 2026-02-28 20:00:47 -06:00
Danilo Reyes
fa8f2a825b allows the use of multiple -i 2026-02-28 19:54:06 -06:00
Danilo Reyes
274edf1668 preventing instances where user eve is treated as everyone 2026-02-28 19:51:05 -06:00
Danilo Reyes
e189b619ef removed unused function 2026-02-28 19:49:32 -06:00
Danilo Reyes
c71ff53b23 init variable order, so that tests can be built. 2026-02-28 19:48:49 -06:00
Danilo Reyes
83210d4356 check if both list and link are provided 2026-02-28 19:42:27 -06:00
16 changed files with 2017 additions and 116 deletions

View File

@@ -126,5 +126,32 @@
ext = "py"; ext = "py";
handler = scriptBin; handler = scriptBin;
}; };
apps.x86_64-linux = {
download = {
type = "app";
program = "${pkgs.download}/bin/download";
};
download-admin = {
type = "app";
program = "${pkgs.download}/bin/download-admin";
};
download-tests = {
type = "app";
program = "${
pkgs.writeShellApplication {
name = "download-tests";
runtimeInputs = [
(pkgs.python3.withPackages (ps: [ ps.pyyaml ]))
];
text = ''
set -euo pipefail
export PYTHONPATH="${inputs.self}/src/download"
python -m unittest discover -s "${inputs.self}/src/download/tests" -p "test_*.py"
'';
}
}/bin/download-tests";
};
};
}; };
} }

View File

@@ -7,11 +7,12 @@
gallery-dl, gallery-dl,
ffmpeg, ffmpeg,
webcomix, webcomix,
fzf,
... ...
}: }:
let let
pname = "download"; pname = "download";
version = "2.6"; version = "3.0";
in in
buildPythonApplication { buildPythonApplication {
inherit pname version; inherit pname version;
@@ -32,5 +33,13 @@ buildPythonApplication {
types-pyyaml types-pyyaml
yt-dlp yt-dlp
webcomix webcomix
fzf
]; ];
postInstall = ''
install -Dm644 completions/download.bash \
$out/share/bash-completion/completions/download
install -Dm644 completions/download.bash \
$out/share/bash-completion/completions/download-admin
'';
} }

104
src/download/admin.py Normal file
View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""Administrative CLI for download link database."""
from __future__ import annotations
import argparse
from admin_links import cmd_add
from admin_links import cmd_ban
from admin_links import cmd_disable
from admin_links import cmd_enable
from admin_links import cmd_import
from admin_links import cmd_list
from admin_links import cmd_remove
from admin_links import cmd_rename
from admin_links import cmd_unban
from admin_links import cmd_validate_import
from admin_links import cmd_fix_revision
from admin_links import cmd_fix_x_media
from admin_users import cmd_user_rename
from admin_users import cmd_users
def build_parser() -> argparse.ArgumentParser:
    """Build the download-admin argument parser with all subcommands."""
    parser = argparse.ArgumentParser(prog="download-admin")
    sub = parser.add_subparsers(dest="cmd", required=True)
    p_add = sub.add_parser("add")
    p_add.add_argument("user")
    p_add.add_argument("url")
    p_add.add_argument("--assume-yes", action="store_true")
    p_add.set_defaults(func=cmd_add)
    # These subcommands all take a user plus an optional URL; "ban" also
    # accepts a free-form reason.
    for cmd_name, handler in (
        ("disable", cmd_disable),
        ("enable", cmd_enable),
        ("ban", cmd_ban),
        ("unban", cmd_unban),
        ("remove", cmd_remove),
    ):
        sp = sub.add_parser(cmd_name)
        sp.add_argument("user")
        sp.add_argument("url", nargs="?")
        if cmd_name == "ban":
            sp.add_argument("--reason")
        sp.set_defaults(func=handler)
    p_rename = sub.add_parser("rename")
    p_rename.add_argument("user")
    p_rename.add_argument("old_url", nargs="?")
    p_rename.add_argument("new_url", nargs="?")
    p_rename.set_defaults(func=cmd_rename)
    p_list = sub.add_parser("list")
    p_list.add_argument("--user", action="append")
    p_list.add_argument("--disabled", action="store_true")
    p_list.add_argument("--banned", action="store_true")
    p_list.add_argument("--requires-revision", action="store_true")
    p_list.set_defaults(func=cmd_list)
    # Maintenance subcommands that take no positional arguments.
    for cmd_name, handler in (
        ("users", cmd_users),
        ("import", cmd_import),
        ("validate-import", cmd_validate_import),
        ("fix-revision", cmd_fix_revision),
        ("fix-x-media", cmd_fix_x_media),
    ):
        sub.add_parser(cmd_name).set_defaults(func=handler)
    p_user_rename = sub.add_parser("user-rename")
    p_user_rename.add_argument("user")
    p_user_rename.add_argument("site")
    p_user_rename.add_argument("old")
    p_user_rename.add_argument("new")
    p_user_rename.set_defaults(func=cmd_user_rename)
    return parser
def main() -> None:
    """CLI entry point: parse arguments and dispatch to the chosen subcommand."""
    args = build_parser().parse_args()
    args.func(args)


if __name__ == "__main__":
    main()

318
src/download/admin_links.py Normal file
View File

@@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""Admin CLI: link operations."""
from __future__ import annotations
import argparse
import shutil
import subprocess
from pathlib import Path
import db
from functions import load_config_variables
def prompt_yes_no(message: str) -> bool:
    """Ask *message* on stdin repeatedly until the answer is a clear yes or no."""
    answers = {"y": True, "yes": True, "n": False, "no": False}
    while True:
        reply = input(f"{message} [y/n]: ").strip().lower()
        if reply in answers:
            return answers[reply]
def parse_list_file(path: Path) -> dict:
    """Parse a watch-list file into normalized enabled/disabled URL sets.

    Lines starting with '#' count as disabled entries; blank lines are
    skipped.  Returns {"enabled": set[str], "disabled": set[str]} and the
    empty sets when *path* does not exist.
    """
    sets: dict = {"enabled": set(), "disabled": set()}
    if not path.is_file():
        return sets
    with open(path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            entry = raw_line.strip()
            if not entry:
                continue
            bucket = "enabled"
            if entry.startswith("#"):
                bucket = "disabled"
                entry = entry.lstrip("#").strip()
                if not entry:
                    continue
            sets[bucket].add(db.normalize_url(entry))
    return sets
def cmd_add(args: argparse.Namespace) -> None:
    """Add a link for a user, prompting to resurrect tombstoned links.

    Warns (but does not abort) when the stored row is banned or disabled.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        result = db.add_link(
            conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
        )
        # "removed" means a tombstone exists: the link was deleted before.
        # Without --assume-yes, ask the operator before re-adding it.
        if result["status"] == "removed" and not args.assume_yes:
            removed_at = result.get("removed_at", "unknown")
            if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
                result = db.add_link(
                    conn, args.user, args.url, assume_yes=True, source="manual"
                )
        # Only the "exists" result carries a row; surface its status flags.
        row = result.get("row")
        if row and row["banned_at"]:
            print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
        if row and not row["enabled"]:
            print("Warning: link is disabled")
        conn.commit()
    print(result["status"])
def cmd_disable(args: argparse.Namespace) -> None:
    """Disable one link (args.url) or fzf-selected links for a user."""

    def _disable(conn, user, url):
        return db.set_enabled(conn, user, url, enabled=False)

    _apply_to_links(args, _disable, selector_filter="disable")
def cmd_enable(args: argparse.Namespace) -> None:
    """Enable one link (args.url) or fzf-selected links for a user."""

    def _enable(conn, user, url):
        return db.set_enabled(conn, user, url, enabled=True)

    _apply_to_links(args, _enable, selector_filter="enable")
def cmd_ban(args: argparse.Namespace) -> None:
    """Ban one link (args.url) or fzf-selected links, recording --reason."""

    def _ban(conn, user, url):
        return db.set_banned(conn, user, url, banned=True, reason=args.reason)

    _apply_to_links(args, _ban, selector_filter="ban")
def cmd_unban(args: argparse.Namespace) -> None:
    """Unban one link (args.url) or fzf-selected links for a user."""

    def _unban(conn, user, url):
        return db.set_banned(conn, user, url, banned=False)

    _apply_to_links(args, _unban, selector_filter="unban")
def cmd_remove(args: argparse.Namespace) -> None:
    """Remove one link (args.url) or fzf-selected links, leaving tombstones."""

    def _remove(conn, user, url):
        return db.remove_link(conn, user, url)

    _apply_to_links(args, _remove, "any")
def cmd_rename(args: argparse.Namespace) -> None:
    """Rename a user's link, picking the old URL via fzf when omitted.

    Prompts for the new URL when it was not given on the command line and
    refuses to rename to an empty string (previously an empty answer was
    passed straight to the database and could clobber the URL).
    """
    old_url = args.old_url
    if not old_url:
        selection = _select_links(args.user, multi=False, selector_filter="any")
        if not selection:
            print("not found")
            return
        old_url = selection[0]
    new_url = args.new_url or input("New URL: ").strip()
    if not new_url:
        # An empty replacement would normalize to a bogus URL; bail out.
        print("empty URL, aborting")
        return
    with db.connect() as conn:
        result = db.rename_link(conn, args.user, old_url, new_url)
        if result["status"] == "renamed":
            conn.commit()
    print(result["status"])
def cmd_list(args: argparse.Namespace) -> None:
    """Print links as "<user> [status] <url>", filtered by the CLI flags.

    --disabled / --banned widen the query to include those rows and then
    restrict the printout to them; --requires-revision implies both.
    """
    users = args.user or None
    # Revision candidates are disabled (and possibly banned), so the query
    # must include both categories for --requires-revision to find them.
    include_disabled = args.disabled or args.requires_revision
    include_banned = args.banned or args.requires_revision
    with db.connect() as conn:
        rows = db.get_links(
            conn,
            users=users,
            include_disabled=include_disabled,
            include_banned=include_banned,
            requires_revision_only=args.requires_revision,
        )
        for row in rows:
            # When a status flag was passed, show only rows in that state.
            if args.disabled and row["enabled"]:
                continue
            if args.banned and not row["banned_at"]:
                continue
            status = "enabled" if row["enabled"] else "disabled"
            if row["banned_at"]:
                status = "banned"
            print(f"{row['user_name']} [{status}] {row['url_original']}")
def cmd_import(_: argparse.Namespace) -> None:
    """Import every user's master watch list (watch.txt) into the DB.

    Prints each user's import result, any duplicate lines found in the
    list, and finally the paths that imported cleanly.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        imported_paths = []
        for entry in configs["users"]:
            user = entry["name"]
            lists_dir = Path(configs["global"]["lists-dir"]) / user
            master = lists_dir / "watch.txt"
            result = db.import_master_list(conn, user, master)
            if result["status"] == "ok":
                imported_paths.append(str(master))
            print(f"{user}: {result}")
            if result.get("duplicates"):
                print(f"{user} duplicates:")
                for dup in result["duplicates"]:
                    print(f" {dup}")
        if imported_paths:
            print("Imported lists:")
            for path in imported_paths:
                print(f" {path}")
        # Single commit so a failed user import leaves nothing half-written.
        conn.commit()
def cmd_validate_import(_: argparse.Namespace) -> None:
    """Compare each user's watch.txt against the DB and report drift.

    A link counts as "enabled" in the DB only when it is enabled and not
    banned; everything else is grouped with the disabled set.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        for entry in configs["users"]:
            user = entry["name"]
            lists_dir = Path(configs["global"]["lists-dir"]) / user
            master = lists_dir / "watch.txt"
            list_sets = parse_list_file(master)
            rows = db.get_links_by_user(conn, user)
            db_enabled = set()
            db_disabled = set()
            for row in rows:
                # Compare on normalized URLs so formatting differences
                # between file and DB do not produce false drift.
                norm = db.normalize_url(row["url_original"])
                if row["enabled"] and not row["banned_at"]:
                    db_enabled.add(norm)
                else:
                    db_disabled.add(norm)
            # Set differences in both directions reveal file/DB drift.
            missing_enabled = list_sets["enabled"] - db_enabled
            missing_disabled = list_sets["disabled"] - db_disabled
            extra_enabled = db_enabled - list_sets["enabled"]
            extra_disabled = db_disabled - list_sets["disabled"]
            print(f"{user}:")
            if missing_enabled:
                print(" Missing enabled in DB:")
                for url in sorted(missing_enabled):
                    print(f" {url}")
            if missing_disabled:
                print(" Missing disabled in DB:")
                for url in sorted(missing_disabled):
                    print(f" {url}")
            if extra_enabled:
                print(" Extra enabled in DB:")
                for url in sorted(extra_enabled):
                    print(f" {url}")
            if extra_disabled:
                print(" Extra disabled in DB:")
                for url in sorted(extra_disabled):
                    print(f" {url}")
            if not any(
                [missing_enabled, missing_disabled, extra_enabled, extra_disabled]
            ):
                print(" OK")
def cmd_fix_revision(_: argparse.Namespace) -> None:
    """One-shot maintenance: clear stale requires_revision flags.

    NOTE(review): with this OR predicate the flag survives only on rows
    that are both disabled AND banned — confirm that is the intended
    invariant (commit message says "fixed requires-revision").
    """
    with db.connect() as conn:
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 0
            WHERE enabled = 1 OR banned_at IS NULL
            """
        )
        conn.commit()
    print("ok")
def cmd_fix_x_media(_: argparse.Namespace) -> None:
    """Repair x.com links containing an accidental '//media' path segment.

    When the corrected URL already exists for the same user, the broken
    duplicate row is deleted instead of updated.
    """
    with db.connect() as conn:
        rows = conn.execute(
            """
            SELECT id, user_name, url_original FROM links
            WHERE url_original LIKE '%x.com/%//media%'
            """
        ).fetchall()
        for row in rows:
            fixed = row["url_original"].replace("//media", "/media")
            norm = db.normalize_url(fixed)
            # The unique (user_name, url_normalized) index would reject the
            # UPDATE if the fixed URL already exists; check first.
            conflict = conn.execute(
                """
                SELECT id FROM links
                WHERE user_name = ? AND url_normalized = ? AND id != ?
                """,
                (row["user_name"], norm, row["id"]),
            ).fetchone()
            if conflict:
                # The fixed URL already exists; drop the malformed twin.
                conn.execute("DELETE FROM links WHERE id = ?", (row["id"],))
                continue
            conn.execute(
                """
                UPDATE links
                SET url_original = ?, url_normalized = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (fixed, norm, row["id"]),
            )
        conn.commit()
    print("ok")
def _fzf_select(lines: list[str], multi: bool) -> list[str]:
if not lines:
return []
if shutil.which("fzf") is None:
print("fzf not found.")
return []
args = ["fzf"]
if multi:
args.append("--multi")
proc = subprocess.run(
args,
input="\n".join(lines),
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
return []
return [ln for ln in proc.stdout.splitlines() if ln.strip()]
def _select_links(user: str, multi: bool, selector_filter: str) -> list[str]:
    """Fetch a user's links, drop rows the action cannot apply to, run fzf."""
    # A row is skipped when it is already in the state the action creates;
    # the "any" filter (or any unknown value) keeps everything.
    skip_when = {
        "enable": lambda enabled, banned: enabled,
        "disable": lambda enabled, banned: not enabled,
        "ban": lambda enabled, banned: banned,
        "unban": lambda enabled, banned: not banned,
    }.get(selector_filter, lambda enabled, banned: False)
    with db.connect() as conn:
        rows = db.get_links(
            conn, users=[user], include_disabled=True, include_banned=True
        )
        candidates = [
            row["url_original"]
            for row in rows
            if not skip_when(bool(row["enabled"]), bool(row["banned_at"]))
        ]
    return _fzf_select(candidates, multi=multi)
def _apply_to_links(args: argparse.Namespace, fn, selector_filter: str) -> None:
    """Apply fn(conn, user, url) to an explicit URL or an fzf selection.

    Commits only when at least one row actually changed; prints "ok" /
    "ok (<n>)" on success and "not found" otherwise.
    """
    # Explicit URL on the command line: single-shot, no fzf involved.
    if args.url:
        with db.connect() as conn:
            ok = fn(conn, args.user, args.url)
            if ok:
                conn.commit()
        print("ok" if ok else "not found")
        return
    selections = _select_links(args.user, multi=True, selector_filter=selector_filter)
    if not selections:
        print("not found")
        return
    with db.connect() as conn:
        changed = 0
        for url in selections:
            ok = fn(conn, args.user, url)
            if ok:
                changed += 1
        # Skip the commit entirely when nothing matched.
        if changed:
            conn.commit()
    print(f"ok ({changed})")

View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""Admin CLI: user operations."""
from __future__ import annotations
import argparse
import os
import shutil
from pathlib import Path
import db
from functions import load_config_variables
def list_users(configs: dict) -> None:
    """Print the name of every configured user, one per line."""
    for user_entry in configs["users"]:
        print(user_entry["name"])
def prompt_yes_no(message: str) -> bool:
    """Keep asking *message* on stdin until the answer is a clear yes or no."""
    while True:
        answer = input(f"{message} [y/n]: ").strip().lower()
        if answer in {"y", "yes"}:
            return True
        if answer in {"n", "no"}:
            return False
def merge_dirs(src: Path, dst: Path) -> None:
    """Move the file tree under *src* into *dst*, then prune empty dirs.

    Files that already exist in *dst* are left in place (and reported),
    so *src* may keep non-empty directories after the merge.
    """
    for root, _, files in os.walk(src):
        rel = Path(root).relative_to(src)
        target_dir = dst / rel
        target_dir.mkdir(parents=True, exist_ok=True)
        for filename in files:
            src_file = Path(root) / filename
            dst_file = target_dir / filename
            if dst_file.exists():
                print(f"Skip existing file: {dst_file}")
                continue
            shutil.move(str(src_file), str(dst_file))
    # Bottom-up cleanup.  rmdir is attempted unconditionally and the OSError
    # for non-empty directories is ignored: the previous "no dirs and no
    # files" test used os.walk's cached listings, so directories that only
    # became empty after their children were removed were never pruned.
    for root, _, _ in os.walk(src, topdown=False):
        try:
            Path(root).rmdir()
        except OSError:
            pass
def move_user_outputs(
    configs: dict, user_name: str, old_handle: str, new_handle: str
) -> None:
    """Move a user's downloaded output dir from old_handle to new_handle.

    Renames when the target does not exist yet; otherwise asks the
    operator before merging the two trees.
    """
    user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None)
    if not user_cfg:
        print(f"Unknown user: {user_name}")
        return
    base_dirs = [Path(user_cfg["download-dir"])]
    for base in base_dirs:
        old_path = base / old_handle
        new_path = base / new_handle
        if not old_path.exists():
            print(f"Missing: {old_path}")
            continue
        if not new_path.exists():
            # Cheap case: nothing at the destination, a rename suffices.
            old_path.rename(new_path)
            continue
        # Destination exists: merging can silently interleave two galleries,
        # so require explicit confirmation.
        if not prompt_yes_no(
            f"Merge contents from {old_path} into existing {new_path}?"
        ):
            continue
        merge_dirs(old_path, new_path)
def cmd_users(_: argparse.Namespace) -> None:
    """Subcommand: print every configured user name."""
    list_users(load_config_variables())
def cmd_user_rename(args: argparse.Namespace) -> None:
    """Rename a user's site handle in the DB, then move their output dirs."""
    configs = load_config_variables()
    with db.connect(configs) as conn:
        result = db.bulk_rename_handle(
            conn,
            user_name=args.user,
            site=args.site,
            old_handle=args.old,
            new_handle=args.new,
        )
        conn.commit()
        print(result)
    # Keep the on-disk layout in sync with the renamed handle.
    move_user_outputs(configs, args.user, args.old, args.new)

View File

@@ -61,13 +61,16 @@ class Gallery:
for key, env_var in auth_env.items(): for key, env_var in auth_env.items():
command += ["-o", f"{key}={os.environ.get(env_var, '')}"] command += ["-o", f"{key}={os.environ.get(env_var, '')}"]
if self.link and not self.list: if self.link and self.list:
LOG.warning("Both link and list set; using link and ignoring list.")
command.append(self.link) command.append(self.link)
if self.list and not self.link: elif self.link:
command.append(self.link)
elif self.list:
command += ["-i", queue] command += ["-i", queue]
LOG.debug(command) LOG.debug(command)
self.command = command self.command = command
def run_command(self, verbose: bool): def run_command(self, verbose: bool, on_line=None, log_failure: bool = True):
run(self.command, verbose) run(self.command, verbose, on_line=on_line, log_failure=log_failure)

View File

@@ -8,6 +8,7 @@ from functions import validate_x_link
from functions import parse_link from functions import parse_link
from functions import clean_cache from functions import clean_cache
from functions import LOG from functions import LOG
import db
class User: class User:
@@ -61,6 +62,11 @@ class User:
for lst in filter(lambda x: not self.lists[x].is_file(), ["master", "push"]): for lst in filter(lambda x: not self.lists[x].is_file(), ["master", "push"]):
self.lists[lst].touch() self.lists[lst].touch()
for lst in filter(
lambda x: not self.lists[x].is_file(),
["instagram", "kemono", "main"],
):
self.lists[lst].touch()
def append_list(self, name: str, line: str) -> None: def append_list(self, name: str, line: str) -> None:
"""Appends a line into the given list""" """Appends a line into the given list"""
@@ -83,8 +89,8 @@ class User:
def list_manager(self) -> None: def list_manager(self) -> None:
"""Manage all the user list and create sub-lists""" """Manage all the user list and create sub-lists"""
self._create_directories() # Call the function to create necesary cache dirs self._create_directories() # Call the function to create necesary cache dirs
with open(self.lists["master"], "r", encoding="utf-8") as r_file: with db.connect() as conn:
master_content = list(map(lambda x: x.rstrip(), r_file)) master_content = db.get_active_links(conn, self.name)
# Create temporary list files segmented per scrapper # Create temporary list files segmented per scrapper
shuffle(master_content) shuffle(master_content)
@@ -94,12 +100,10 @@ class User:
def save_link(self, link: str) -> None: def save_link(self, link: str) -> None:
"""Checks the master list against a new link """Checks the master list against a new link
if unmatched, appends it to the end of the list""" if unmatched, appends it to the end of the list"""
with open(self.lists["master"], "r", encoding="utf-8") as r_file: with db.connect() as conn:
links = r_file.read().lower() result = db.add_link(conn, self.name, parse_link(link), assume_yes=True)
conn.commit()
if parse_link(link).lower() in links: if result["status"] == "added":
LOG.info("Gallery repeated, not saving")
return
LOG.info("New gallery, saving") LOG.info("New gallery, saving")
self.append_list("master", parse_link(link)) else:
LOG.info("Gallery repeated, not saving")

View File

@@ -0,0 +1,102 @@
# Bash completion for download and download-admin.
# Source this file or install it in your bash_completion.d directory.
# Print the space-separated list of user names from the jawz config file.
# Delegates to an inline Python snippet (heredoc); prints an empty string
# when PyYAML or the config file is missing, and silences stderr so broken
# environments never pollute the completion output.
__download_users() {
    python3 - <<'PY' 2>/dev/null
import pathlib
try:
    import yaml
except Exception:
    print("")
    raise SystemExit(0)
cfg = pathlib.Path("~/.config/jawz/config.yaml").expanduser()
if not cfg.is_file():
    print("")
    raise SystemExit(0)
data = yaml.safe_load(cfg.read_text(encoding="utf-8")) or {}
users = [u.get("name") for u in data.get("users", []) if isinstance(u, dict)]
print(" ".join([u for u in users if u]))
PY
}
# Completion for the "download" command: completes option flags, user names,
# post types, and (at most once) a scrapper name.
_download() {
    local cur prev words cword
    _init_completion -n : || return
    local scrappers="push main instagram kemono comic manga webcomic"
    local opts="-u --user -i --input -l --list -a --no-archive -s --no_skip -v --verbose -t --type-post"
    local post_types="posts reels stories highlights avatar"
    # Current word starts with a dash: offer option names.
    if [[ "$cur" == -* ]]; then
        COMPREPLY=( $(compgen -W "$opts" -- "$cur") )
        return
    fi
    # Option-argument completion depends on the previous word.
    case "$prev" in
        -u|--user)
            COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            return
            ;;
        -t|--type-post)
            COMPREPLY=( $(compgen -W "$post_types" -- "$cur") )
            return
            ;;
        -i|--input)
            # Free-form URL input; nothing sensible to complete.
            return
            ;;
    esac
    # Offer a scrapper name only if none appears among the words typed so far
    # (option words are skipped when scanning).
    local have_scrapper=0
    local w
    for w in "${words[@]:1}"; do
        [[ "$w" == -* ]] && continue
        if [[ " $scrappers " == *" $w "* ]]; then
            have_scrapper=1
            break
        fi
    done
    if [[ $have_scrapper -eq 0 ]]; then
        COMPREPLY=( $(compgen -W "$scrappers" -- "$cur") )
    fi
}
# Completion for the "download-admin" command: completes the subcommand in
# position 1, the list-specific flags, and user names where expected.
_download_admin() {
    local cur prev words cword
    _init_completion -n : || return
    local cmds="add disable enable ban unban remove rename list users import validate-import fix-revision fix-x-media user-rename"
    local list_opts="--user --disabled --banned --requires-revision"
    # Flags only exist for the "list" subcommand.
    if [[ "$cur" == -* ]]; then
        if [[ "${words[1]}" == "list" ]]; then
            COMPREPLY=( $(compgen -W "$list_opts" -- "$cur") )
        else
            COMPREPLY=()
        fi
        return
    fi
    case "$prev" in
        --user)
            COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            return
            ;;
    esac
    # First positional word: the subcommand itself.
    if [[ $cword -eq 1 ]]; then
        COMPREPLY=( $(compgen -W "$cmds" -- "$cur") )
        return
    fi
    # Subcommands whose first positional argument is a user name.
    case "${words[1]}" in
        add|disable|enable|ban|unban|remove|rename|user-rename)
            if [[ $cword -eq 2 ]]; then
                COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            fi
            ;;
    esac
}
# Register the completion functions for both CLI entry points.
complete -F _download download
complete -F _download_admin download-admin

647
src/download/db.py Normal file
View File

@@ -0,0 +1,647 @@
#!/usr/bin/env python3
"""SQLite persistence for download links."""
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Iterable
from urllib.parse import urlsplit, urlunsplit
from functions import LOG
from functions import load_config_variables
def get_db_path(configs: dict | None = None) -> Path:
"""Return the database path for links."""
cfg = configs or load_config_variables()
base = Path(cfg["global"]["databases-dir"])
return base / "links.sqlite3"
def connect(configs: dict | None = None) -> sqlite3.Connection:
    """Open the links DB with Row access, creating parents and schema."""
    target = get_db_path(configs)
    target.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(target))
    # Rows behave like dicts downstream (row["url_original"] etc.).
    connection.row_factory = sqlite3.Row
    ensure_schema(connection)
    return connection
def ensure_schema(conn: sqlite3.Connection) -> None:
    """Create tables/indexes if missing and backfill late-added columns.

    Idempotent: safe to call on every connect.  link_history is an
    append-only audit log; link_tombstones remembers removed links so they
    are not silently re-added.
    """
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS links (
            id INTEGER PRIMARY KEY,
            user_name TEXT NOT NULL,
            url_original TEXT NOT NULL,
            url_normalized TEXT NOT NULL,
            site TEXT,
            enabled INTEGER NOT NULL DEFAULT 1,
            keep INTEGER NOT NULL DEFAULT 0,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            disabled_at TEXT,
            disabled_reason TEXT,
            banned_at TEXT,
            banned_reason TEXT,
            requires_revision INTEGER NOT NULL DEFAULT 0
        );
        CREATE UNIQUE INDEX IF NOT EXISTS links_user_url_norm
        ON links (user_name, url_normalized);
        CREATE TABLE IF NOT EXISTS link_history (
            id INTEGER PRIMARY KEY,
            link_id INTEGER,
            user_name TEXT NOT NULL,
            event TEXT NOT NULL,
            old_url TEXT,
            new_url TEXT,
            note TEXT,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS link_tombstones (
            id INTEGER PRIMARY KEY,
            user_name TEXT NOT NULL,
            url_normalized TEXT NOT NULL,
            url_original TEXT NOT NULL,
            removed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE UNIQUE INDEX IF NOT EXISTS tombstones_user_url_norm
        ON link_tombstones (user_name, url_normalized);
        """
    )
    # Databases created before these columns existed get them ALTERed in;
    # _ensure_column is a no-op when the column is already present.
    _ensure_column(
        conn,
        "links",
        "requires_revision",
        "ALTER TABLE links ADD COLUMN requires_revision INTEGER NOT NULL DEFAULT 0",
    )
    _ensure_column(
        conn,
        "links",
        "keep",
        "ALTER TABLE links ADD COLUMN keep INTEGER NOT NULL DEFAULT 0",
    )
    _ensure_column(
        conn,
        "links",
        "disabled_reason",
        "ALTER TABLE links ADD COLUMN disabled_reason TEXT",
    )
def _ensure_column(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> None:
cols = [row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()]
if column in cols:
return
conn.execute(ddl)
def normalize_url(url: str) -> str:
    """Normalize *url* for duplicate detection only.

    Forces https, lower-cases the host, strips a leading "www.", folds
    twitter.com into x.com, trims trailing slashes from the path and drops
    the fragment.  Path case and the query string are preserved.
    """
    raw = url.strip()
    if "://" not in raw:
        raw = f"https://{raw}"
    parts = urlsplit(raw)
    host = (parts.hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    # "www.twitter.com" is unreachable here — the prefix was just stripped —
    # so the old two-value membership test collapsed to a single compare.
    if host == "twitter.com":
        host = "x.com"
    path = parts.path.rstrip("/")
    return urlunsplit(("https", host, path, parts.query, ""))
def get_site(url: str) -> str:
    """Return the normalized host of *url* (no "www.", twitter → x.com)."""
    raw = url.strip()
    if "://" not in raw:
        raw = f"https://{raw}"
    host = (urlsplit(raw).hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    # "www.twitter.com" is unreachable after the prefix strip above, so the
    # old two-value membership test collapsed to a single compare.
    if host == "twitter.com":
        host = "x.com"
    return host
def add_history(
conn: sqlite3.Connection,
user_name: str,
event: str,
link_id: int | None = None,
old_url: str | None = None,
new_url: str | None = None,
note: str | None = None,
) -> None:
conn.execute(
"""
INSERT INTO link_history (link_id, user_name, event, old_url, new_url, note)
VALUES (?, ?, ?, ?, ?, ?)
""",
(link_id, user_name, event, old_url, new_url, note),
)
def add_link(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    assume_yes: bool = False,
    source: str = "manual",
) -> dict:
    """Insert a link for a user unless it already exists or was removed.

    Returns one of:
      {"status": "exists", "row": row}         — duplicate normalized URL
      {"status": "removed", "removed_at": ts}  — tombstoned; retry with
                                                 assume_yes=True to force
      {"status": "added", "id": rowid}         — inserted
    The caller is responsible for committing.
    """
    url_norm = normalize_url(url_original)
    site = get_site(url_original)
    row = conn.execute(
        "SELECT * FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if row:
        return {"status": "exists", "row": row}
    tombstone = conn.execute(
        "SELECT removed_at FROM link_tombstones WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    # Pushed links bypass the tombstone stop; manual adds halt here unless
    # the caller explicitly assumed yes.
    if tombstone and not assume_yes and source != "push":
        return {"status": "removed", "removed_at": tombstone["removed_at"]}
    cur = conn.execute(
        """
        INSERT INTO links (user_name, url_original, url_normalized, site)
        VALUES (?, ?, ?, ?)
        """,
        (user_name, url_original, url_norm, site),
    )
    if tombstone:
        # NOTE(review): requires_revision already defaults to 0 on insert,
        # so this extra UPDATE looks redundant — confirm intent.
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 0
            WHERE id = ?
            """,
            (cur.lastrowid,),
        )
    add_history(
        conn,
        user_name=user_name,
        event="add",
        link_id=cur.lastrowid,
        new_url=url_original,
        note=f"source={source}",
    )
    return {"status": "added", "id": cur.lastrowid}
def set_enabled(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    enabled: bool,
    reason: str | None = None,
) -> bool:
    """Enable or disable a user's link, logging the change to history.

    Enabling also clears disabled_at/disabled_reason and the
    requires_revision flag; disabling stamps disabled_at and stores
    *reason*.  Returns False when the link does not exist.  The caller
    commits.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    if enabled:
        conn.execute(
            """
            UPDATE links
            SET enabled = 1,
                disabled_at = NULL,
                disabled_reason = NULL,
                requires_revision = 0,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(conn, user_name, "enable", link_id=row["id"], old_url=row["url_original"])
    else:
        conn.execute(
            """
            UPDATE links
            SET enabled = 0,
                disabled_at = CURRENT_TIMESTAMP,
                disabled_reason = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (reason, row["id"]),
        )
        add_history(
            conn,
            user_name,
            "disable",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    return True
def set_banned(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    banned: bool,
    reason: str | None = None,
) -> bool:
    """Ban or unban a user's link, logging the change to history.

    Banning stamps banned_at/banned_reason; unbanning clears them and the
    requires_revision flag (the enabled flag is left untouched either
    way).  Returns False when the link does not exist.  The caller
    commits.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    if banned:
        conn.execute(
            """
            UPDATE links
            SET banned_at = CURRENT_TIMESTAMP, banned_reason = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (reason, row["id"]),
        )
        add_history(
            conn,
            user_name,
            "ban",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    else:
        conn.execute(
            """
            UPDATE links
            SET banned_at = NULL, banned_reason = NULL, requires_revision = 0, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(conn, user_name, "unban", link_id=row["id"], old_url=row["url_original"])
    return True
def mark_requires_revision(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    reason: str,
) -> bool:
    """Flag a user's link for manual review and disable it.

    disabled_at is only stamped when not already set (COALESCE), so the
    first failure's timestamp survives repeated flagging.  Returns False
    when no matching row exists.  The caller commits.
    """
    url_norm = normalize_url(url_original)
    rows = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchall()
    if not rows:
        return False
    for row in rows:
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 1,
                enabled = 0,
                disabled_at = COALESCE(disabled_at, CURRENT_TIMESTAMP),
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(
            conn,
            user_name,
            "requires_revision",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    return True
def mark_requires_revision_by_norm(
    conn: sqlite3.Connection, url_norm: str, reason: str
) -> int:
    """Flag every user's copy of an already-normalized URL for review.

    Same effect as mark_requires_revision but keyed on url_normalized
    across all users.  Returns the number of rows flagged (0 when none
    matched).  The caller commits.
    """
    rows = conn.execute(
        "SELECT id, user_name, url_original FROM links WHERE url_normalized = ?",
        (url_norm,),
    ).fetchall()
    if not rows:
        return 0
    for row in rows:
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 1,
                enabled = 0,
                disabled_at = COALESCE(disabled_at, CURRENT_TIMESTAMP),
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(
            conn,
            row["user_name"],
            "requires_revision",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    return len(rows)
def rename_link(
    conn: sqlite3.Connection,
    user_name: str,
    old_url: str,
    new_url: str,
) -> dict:
    """Point a user's link at a new URL.

    Returns {"status": "missing"} when the old URL is unknown,
    {"status": "conflict"} when the new URL already exists for the user,
    and {"status": "renamed"} on success.  A successful rename also
    re-enables the row and clears requires_revision — renaming is the fix
    for links flagged as broken.  The caller commits.
    """
    old_norm = normalize_url(old_url)
    new_norm = normalize_url(new_url)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, old_norm),
    ).fetchone()
    if not row:
        return {"status": "missing"}
    conflict = conn.execute(
        "SELECT id FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, new_norm),
    ).fetchone()
    # A self-rename (same normalized URL) is not a conflict.
    if conflict and conflict["id"] != row["id"]:
        return {"status": "conflict"}
    conn.execute(
        """
        UPDATE links
        SET url_original = ?, url_normalized = ?, site = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (new_url, new_norm, get_site(new_url), row["id"]),
    )
    conn.execute(
        """
        UPDATE links
        SET enabled = 1, disabled_at = NULL, requires_revision = 0
        WHERE id = ?
        """,
        (row["id"],),
    )
    add_history(
        conn,
        user_name,
        "rename",
        link_id=row["id"],
        old_url=row["url_original"],
        new_url=new_url,
    )
    return {"status": "renamed"}
def remove_link(conn: sqlite3.Connection, user_name: str, url_original: str) -> bool:
    """Delete a user's link, leaving a tombstone so future adds can warn.

    Returns False when the link does not exist.  The caller commits.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    # OR IGNORE: a pre-existing tombstone for this URL keeps its original
    # removed_at timestamp.
    conn.execute(
        """
        INSERT OR IGNORE INTO link_tombstones (user_name, url_normalized, url_original)
        VALUES (?, ?, ?)
        """,
        (user_name, url_norm, row["url_original"]),
    )
    add_history(conn, user_name, "remove", link_id=row["id"], old_url=row["url_original"])
    conn.execute("DELETE FROM links WHERE id = ?", (row["id"],))
    return True
def get_active_links(conn: sqlite3.Connection, user_name: str) -> list[str]:
    """Return original URLs of enabled, unbanned links for a user, oldest first."""
    query = (
        "SELECT url_original FROM links "
        "WHERE user_name = ? AND enabled = 1 AND banned_at IS NULL "
        "ORDER BY id ASC"
    )
    return [record["url_original"] for record in conn.execute(query, (user_name,))]
def get_links(
conn: sqlite3.Connection,
users: Iterable[str] | None = None,
include_disabled: bool = False,
include_banned: bool = False,
requires_revision_only: bool = False,
) -> list[sqlite3.Row]:
params: list = []
where = []
user_list = list(users) if users else []
if user_list:
where.append(f"user_name IN ({','.join(['?'] * len(user_list))})")
params.extend(user_list)
if not include_disabled:
where.append("enabled = 1")
if not include_banned:
where.append("banned_at IS NULL")
if requires_revision_only:
where.append("requires_revision = 1")
clause = " AND ".join(where)
if clause:
clause = "WHERE " + clause
return conn.execute(f"SELECT * FROM links {clause} ORDER BY user_name, id", params).fetchall()
def get_links_for_cleaning(
conn: sqlite3.Connection,
users: Iterable[str] | None = None,
) -> list[sqlite3.Row]:
params: list = []
where = [
"site = ?",
"enabled = 1",
"banned_at IS NULL",
"keep = 0",
]
params.append("x.com")
user_list = list(users) if users else []
if user_list:
where.append(f"user_name IN ({','.join(['?'] * len(user_list))})")
params.extend(user_list)
clause = " AND ".join(where)
return conn.execute(
f"SELECT * FROM links WHERE {clause} ORDER BY user_name, id",
params,
).fetchall()
def set_keep(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    keep: bool,
    reason: str | None = None,
) -> bool:
    """Set or clear the ``keep`` flag on a user's link.

    The keep/unkeep branches were duplicated; they differ only in the flag
    value and the history action name, so both are parameterized here.

    Returns True when a matching link was found and updated, False otherwise.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    conn.execute(
        """
        UPDATE links
        SET keep = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (1 if keep else 0, row["id"]),
    )
    add_history(
        conn,
        user_name,
        "keep" if keep else "unkeep",
        link_id=row["id"],
        old_url=row["url_original"],
        note=reason,
    )
    return True
def get_links_by_user(conn: sqlite3.Connection, user_name: str) -> list[sqlite3.Row]:
    """Return every link row owned by *user_name*, ordered by insertion id."""
    query = "SELECT * FROM links WHERE user_name = ? ORDER BY id"
    return conn.execute(query, (user_name,)).fetchall()
def import_master_list(conn: sqlite3.Connection, user_name: str, path: Path) -> dict:
    """Import a plain-text watch list at *path* into the database.

    Lines starting with '#' are added but immediately disabled. Returns a
    summary dict with counts of added / already-present / tombstoned links
    plus the list of duplicate URLs encountered.
    """
    if not path.is_file():
        return {"status": "missing", "path": str(path)}
    with open(path, "r", encoding="utf-8") as handle:
        entries = [entry.strip() for entry in handle if entry.strip()]
    added = exists = removed = 0
    duplicates: list[str] = []
    for entry in entries:
        is_disabled = entry.startswith("#")
        # Strip any leading run of '#' plus surrounding whitespace.
        url = entry.lstrip("#").strip() if is_disabled else entry
        if not url:
            continue
        result = add_link(conn, user_name, url, assume_yes=True, source="import")
        status = result["status"]
        if status == "added":
            added += 1
            if is_disabled:
                set_enabled(conn, user_name, url, enabled=False)
        elif status == "exists":
            exists += 1
            duplicates.append(url)
        elif status == "removed":
            removed += 1
    return {
        "status": "ok",
        "added": added,
        "exists": exists,
        "removed": removed,
        "duplicates": duplicates,
    }
def bulk_rename_handle(
    conn: sqlite3.Connection,
    user_name: str,
    site: str,
    old_handle: str,
    new_handle: str,
) -> dict:
    """Rename an account handle within a site for a user.

    Returns a dict with counts: ``updated`` (renamed), ``skipped`` (handle did
    not match or rename otherwise failed) and ``conflicts`` (target already
    exists).
    """
    # BUG FIX: str.lstrip("www.") strips any leading 'w'/'.' CHARACTERS, so
    # e.g. "weibo.com" became "eibo.com". removeprefix removes the literal
    # prefix only. It also already handles "www.twitter.com" -> "twitter.com",
    # making the previous second check redundant.
    site_norm = site.lower().removeprefix("www.")
    if site_norm == "twitter.com":
        site_norm = "x.com"
    rows = conn.execute(
        """
        SELECT id, url_original FROM links
        WHERE user_name = ? AND site = ?
        """,
        (user_name, site_norm),
    ).fetchall()
    updated = 0
    skipped = 0
    conflicts = 0
    for row in rows:
        raw = row["url_original"]
        parts = urlsplit(raw if "://" in raw else f"https://{raw}")
        segments = parts.path.split("/")
        # segments[0] is "" (paths begin with "/"); segments[1] is the handle.
        if len(segments) < 2 or segments[1] != old_handle:
            skipped += 1
            continue
        segments[1] = new_handle
        new_url = urlunsplit(
            (parts.scheme, parts.netloc, "/".join(segments), parts.query, parts.fragment)
        )
        result = rename_link(conn, user_name, raw, new_url)
        if result["status"] == "renamed":
            updated += 1
        elif result["status"] == "conflict":
            conflicts += 1
        else:
            skipped += 1
    return {"updated": updated, "skipped": skipped, "conflicts": conflicts}
def warn(msg: str) -> None:
    """Forward *msg* to the module logger at WARNING level."""
    LOG.warning(msg)

View File

@@ -11,7 +11,9 @@ Also following in line more posix and python rules.
import re import re
from pathlib import Path from pathlib import Path
import argparse
import yaml import yaml
import db
from typing import Dict from typing import Dict
from functions import LOG from functions import LOG
from functions import run from functions import run
@@ -23,12 +25,35 @@ from classes.user import User
from classes.gallery import Gallery from classes.gallery import Gallery
# GLOBAL VARIABLE SECTION # GLOBAL VARIABLE SECTION
CONFIGS = load_config_variables() CONFIGS = None
# Enable a default "everyone" flag for when running stuff like download gallery # Enable a default "everyone" flag for when running stuff like download gallery
USERS = []
ARGS = None
def init_globals() -> None:
"""Initialize global config and CLI args."""
global CONFIGS, USERS, ARGS
if CONFIGS is None:
CONFIGS = load_config_variables()
USERS = ["everyone"] + [user["name"] for user in CONFIGS["users"]] USERS = ["everyone"] + [user["name"] for user in CONFIGS["users"]]
ARGS = argparser(USERS) ARGS = argparser(USERS)
def get_args() -> argparse.Namespace:
"""Return initialized CLI args."""
init_globals()
assert ARGS is not None
return ARGS
def get_configs() -> dict:
"""Return initialized config."""
init_globals()
assert CONFIGS is not None
return CONFIGS
class Video: class Video:
"""Just a simple class to unify the Video parameters into a single one.""" """Just a simple class to unify the Video parameters into a single one."""
@@ -41,29 +66,146 @@ class Video:
def get_index(name: str) -> int: def get_index(name: str) -> int:
"""Find the index in the config file""" """Find the index in the config file"""
return next((i for i, d in enumerate(CONFIGS["users"]) if d["name"] == name), -1) configs = get_configs()
return next((i for i, d in enumerate(configs["users"]) if d["name"] == name), -1)
def parse_gallery(gdl_list: str, user: User) -> None: def parse_gallery(gdl_list: str, user: User) -> None:
"""Processes the gallery-dl command based on the selected gallery""" """Processes the gallery-dl command based on the selected gallery"""
args = get_args()
list_path = user.lists[gdl_list]
if not list_path.is_file():
LOG.warning("List file missing: %s", list_path)
return
with open(list_path, "r", encoding="utf-8") as r_file:
links = list(map(lambda x: x.rstrip(), r_file))
for link in filter(None, links):
gallery = Gallery() gallery = Gallery()
gallery.archive = ARGS.flag_archive gallery.archive = args.flag_archive
gallery.skip_arg = " -o skip=true" if not ARGS.flag_skip else "" gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
gallery.dest = "download" gallery.dest = "download"
gallery.list = gdl_list gallery.link = link
gallery.opt_args = parse_instagram(gdl_list) gallery.opt_args = parse_instagram(link)
gallery.generate_command(user) gallery.generate_command(user)
gallery.run_command(ARGS.flag_verbose) handler = _make_gallery_error_handler(link)
gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
def parse_instagram(link: str) -> list[str]: def parse_instagram(link: str, post_type: list[str] | str | None = None) -> list[str]:
"""Fix instagram links""" """Fix instagram links"""
args = get_args()
if "instagram" not in link: if "instagram" not in link:
return [] return []
if isinstance(ARGS.post_type, list): use_type = args.post_type if post_type is None else post_type
return ["-o", f"include={','.join(ARGS.post_type)}"] if isinstance(use_type, list):
return ["-o", f"include={ARGS.post_type}"] return ["-o", f"include={','.join(use_type)}"]
return ["-o", f"include={use_type}"]
REVISION_ERRORS = {
"NotFoundError: Requested user could not be found",
"Unable to retrieve Tweets from this timeline",
"No results for",
}
TRANSIENT_ERRORS = {
"User input required (password)",
"429",
"rate limit",
"timed out",
"timeout",
"Network",
"connection",
}
def _make_gallery_error_handler(link: str):
norm = db.normalize_url(link)
def handle(line: str) -> None:
if "[error]" in line:
reason = line.split("[error]", 1)[1].strip()
LOG.warning("Error for %s: %s", link, reason)
if reason in REVISION_ERRORS:
with db.connect() as conn:
db.mark_requires_revision_by_norm(conn, norm, reason)
conn.commit()
LOG.warning("Marked requires_revision for %s", link)
if any(tok in reason for tok in TRANSIENT_ERRORS):
LOG.warning("Transient error for %s: %s", link, reason)
return
if "No results for" in line:
with db.connect() as conn:
db.mark_requires_revision_by_norm(conn, norm, "No results for")
conn.commit()
LOG.warning("Marked requires_revision for %s", link)
return
return handle
def _comic_skip_arg(link: str, flag_skip: bool) -> str:
if not flag_skip:
return ""
if re.search(r"readcomiconline", link):
return " --chapter-range 1"
if re.search(r"manganato|mangahere|webtoons", link):
return " --chapter-range 1-5"
return ""
def _handle_gallery_link(user: User, link: str, args, conn) -> None:
add_res = db.add_link(conn, user.name, parse_link(link), assume_yes=True, source="push")
row = add_res.get("row")
if row and row["banned_at"]:
LOG.warning("Link is banned, skipping: %s", link)
return
if row and not row["enabled"]:
LOG.warning("Link is disabled, skipping: %s", link)
return
gallery = Gallery()
gallery.archive = args.flag_archive
gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
gallery.link = parse_link(link)
gallery.dest = "download"
gallery.opt_args = parse_instagram(link)
gallery.generate_command(user)
handler = _make_gallery_error_handler(link)
gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
def _handle_comic_link(link: str, args) -> None:
gallery = Gallery()
gallery.archive = args.flag_archive
gallery.skip_arg = _comic_skip_arg(link, args.flag_skip)
gallery.link = link
gallery.generate_command(is_comic=True)
handler = _make_gallery_error_handler(link)
gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
save_comic(link)
def _handle_video_link(user: User, link: str, args) -> None:
video = Video()
video.use_archive = args.flag_archive
video.link = link
video.dest = str(user.directories["media"])
video.database = str(user.dbs["media"])
run(video_command(video), args.flag_verbose)
def _handle_other_link(user: User, link: str, args) -> None:
LOG.info("Other type of download %s", link)
gallery = Gallery()
gallery.archive = False
gallery.skip_arg = " -o directory='[]'"
gallery.link = link
gallery.dest = "push"
gallery.generate_command(user)
handler = _make_gallery_error_handler(link)
gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
def video_command(video: Video): def video_command(video: Video):
@@ -110,17 +252,19 @@ def video_command(video: Video):
def comic_manager(skip_arg: str, category: str) -> None: def comic_manager(skip_arg: str, category: str) -> None:
"""Process the information to download manga""" """Process the information to download manga"""
args = get_args()
configs = get_configs()
re_cat = "manga|webtoon" if category == "manga" else "readcomiconline" re_cat = "manga|webtoon" if category == "manga" else "readcomiconline"
with open(CONFIGS["comic"]["comic-list"], "r", encoding="utf-8") as r_file: with open(configs["comic"]["comic-list"], "r", encoding="utf-8") as r_file:
links = list(filter(lambda x: re.search(re_cat, x), r_file)) links = list(filter(lambda x: re.search(re_cat, x), r_file))
for link in links: for link in links:
gallery = Gallery() gallery = Gallery()
gallery.archive = ARGS.flag_archive gallery.archive = args.flag_archive
gallery.skip_arg = skip_arg gallery.skip_arg = skip_arg
gallery.link = link gallery.link = link
gallery.generate_command(is_comic=True) gallery.generate_command(is_comic=True)
gallery.run_command(ARGS.flag_verbose) gallery.run_command(args.flag_verbose)
def print_webcomics(webcomics: Dict[str, Dict]) -> int: def print_webcomics(webcomics: Dict[str, Dict]) -> int:
@@ -142,7 +286,9 @@ def print_webcomics(webcomics: Dict[str, Dict]) -> int:
def webcomic_manager(): def webcomic_manager():
"""Process the information to download webcomics""" """Process the information to download webcomics"""
with open(CONFIGS["comic"]["webcomic-list"], "r", encoding="utf-8") as r_file: args = get_args()
configs = get_configs()
with open(configs["comic"]["webcomic-list"], "r", encoding="utf-8") as r_file:
webcomics = yaml.safe_load(r_file) webcomics = yaml.safe_load(r_file)
usr_input = print_webcomics(webcomics) usr_input = print_webcomics(webcomics)
@@ -169,12 +315,13 @@ def webcomic_manager():
"--cbz", "--cbz",
] ]
run(command, ARGS.flag_verbose, cwd=Path(dest)) run(command, args.flag_verbose, cwd=Path(dest))
def save_comic(link: str) -> None: def save_comic(link: str) -> None:
"""Add comic/manga link to the list""" """Add comic/manga link to the list"""
list_comic = CONFIGS["comic"]["comic-list"] configs = get_configs()
list_comic = configs["comic"]["comic-list"]
with open(list_comic, "r", encoding="utf-8") as r_file: with open(list_comic, "r", encoding="utf-8") as r_file:
links = r_file.read().lower() links = r_file.read().lower()
if parse_link(link).lower() in links: if parse_link(link).lower() in links:
@@ -186,8 +333,9 @@ def save_comic(link: str) -> None:
w_file.write(link + "\n") w_file.write(link + "\n")
def push_manager(user: User): def push_manager(user: User, links: list[str] | None = None) -> None:
"""Filters out the URL to use the appropiate downloader""" """Filters out the URL to use the appropiate downloader"""
args = get_args()
# Creates an array which will store any links that should use youtube-dl # Creates an array which will store any links that should use youtube-dl
rgx_gallery = re.compile( rgx_gallery = re.compile(
r"(x\.com\/\w+((?=.*media)|(?!.*status)))" r"(x\.com\/\w+((?=.*media)|(?!.*status)))"
@@ -216,6 +364,7 @@ def push_manager(user: User):
rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate") rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato") rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
if links is None:
with open(user.lists["push"], "r", encoding="utf-8") as r_file: with open(user.lists["push"], "r", encoding="utf-8") as r_file:
links = list(map(lambda x: x.rstrip(), r_file)) links = list(map(lambda x: x.rstrip(), r_file))
links_galleries = filter(rgx_gallery.search, links) links_galleries = filter(rgx_gallery.search, links)
@@ -228,50 +377,20 @@ def push_manager(user: User):
links, links,
) )
with db.connect() as conn:
for link in links_galleries: for link in links_galleries:
gallery = Gallery() _handle_gallery_link(user, link, args, conn)
gallery.archive = ARGS.flag_archive
gallery.skip_arg = " -o skip=true" if not ARGS.flag_skip else "" conn.commit()
gallery.link = parse_link(link)
gallery.dest = "download"
gallery.opt_args = parse_instagram(link)
gallery.generate_command(user)
gallery.run_command(ARGS.flag_verbose)
user.save_link(link)
for link in links_comics: for link in links_comics:
if ARGS.flag_skip and re.search(r"readcomiconline", link): _handle_comic_link(link, args)
skip_arg = " --chapter-range 1"
elif ARGS.flag_skip and re.search(r"manganato|mangahere|webtoons", link):
skip_arg = " --chapter-range 1-5"
else:
skip_arg = ""
gallery = Gallery()
gallery.archive = ARGS.flag_archive
gallery.skip_arg = skip_arg
gallery.link = link
gallery.generate_command(is_comic=True)
gallery.run_command(ARGS.flag_verbose)
save_comic(link)
for link in links_videos: for link in links_videos:
video = Video() _handle_video_link(user, link, args)
video.use_archive = ARGS.flag_archive
video.link = link
video.dest = str(user.directories["media"])
video.database = str(user.dbs["media"])
run(video_command(video), ARGS.flag_verbose)
for link in links_other: for link in links_other:
LOG.info("Other type of download %s", link) _handle_other_link(user, link, args)
gallery = Gallery()
gallery.archive = False
gallery.skip_arg = " -o directory='[]'"
gallery.link = link
gallery.dest = "push"
gallery.generate_command(user)
gallery.run_command(ARGS.flag_verbose)
# Flush the push list, cleans all the contents # Flush the push list, cleans all the contents
with open(user.lists["push"], "w", encoding="utf-8") as w_file: with open(user.lists["push"], "w", encoding="utf-8") as w_file:
@@ -280,43 +399,44 @@ def push_manager(user: User):
def scrapper_manager(user: User) -> None: def scrapper_manager(user: User) -> None:
"""Analyze the user arguments and call in functions""" """Analyze the user arguments and call in functions"""
args = get_args()
user.list_manager() user.list_manager()
if re.search(r"main|instagram|kemono", ARGS.scrapper): if re.search(r"main|instagram|kemono", args.scrapper):
skip_arg = "" if ARGS.flag_skip else " -o skip=true" parse_gallery(args.scrapper, user)
parse_gallery(ARGS.scrapper, user) elif args.scrapper == "push":
elif ARGS.scrapper == "push":
push_manager(user) push_manager(user)
elif re.search("^comic|manga", ARGS.scrapper): elif re.search("^comic|manga", args.scrapper):
skip_arg = " --chapter-range 1" if ARGS.flag_skip else "" skip_arg = " --chapter-range 1" if args.flag_skip else ""
skip_arg += "-5" if ARGS.scrapper == "manga" else "" skip_arg += "-5" if args.scrapper == "manga" else ""
comic_manager(skip_arg, ARGS.scrapper) comic_manager(skip_arg, args.scrapper)
elif re.search("webcomic", ARGS.scrapper): elif re.search("webcomic", args.scrapper):
webcomic_manager() webcomic_manager()
def scrap_everyone() -> None: def scrap_everyone() -> None:
"""Iterates over every user of my scrapper""" """Iterates over every user of my scrapper"""
for current_user in CONFIGS["users"]: args = get_args()
configs = get_configs()
for current_user in configs["users"]:
user = User(get_index(current_user["name"])) user = User(get_index(current_user["name"]))
LOG.info("Scrapping %s for %s", ARGS.scrapper, current_user["name"]) LOG.info("Scrapping %s for %s", args.scrapper, current_user["name"])
scrapper_manager(user) scrapper_manager(user)
def main(): def main():
"""Main module to decide what to do based on the parsed arguments""" """Main module to decide what to do based on the parsed arguments"""
if ARGS.scrapper: args = get_args()
if args.scrapper:
rgx_shared = re.compile("push|main|instagram|kemono") rgx_shared = re.compile("push|main|instagram|kemono")
if (ARGS.user in "everyone") and (rgx_shared.search(ARGS.scrapper)): if (args.user == "everyone") and (rgx_shared.search(args.scrapper)):
scrap_everyone() scrap_everyone()
else: else:
scrapper_manager(User(get_index(ARGS.user))) scrapper_manager(User(get_index(args.user)))
elif ARGS.link: elif args.link:
is_admin = re.search(r"everyone|jawz", ARGS.user) is_admin = args.user in ("everyone", "jawz")
user = User(get_index("jawz" if is_admin else ARGS.user)) user = User(get_index("jawz" if is_admin else args.user))
for arg_link in ARGS.link[0]: links = [parse_link(lnk) for grp in args.link for lnk in grp]
user.append_list("push", parse_link(arg_link)) push_manager(user, links=links)
push_manager(user)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -9,6 +9,7 @@ import logging
import shlex import shlex
import subprocess import subprocess
import shutil import shutil
import random
from typing import Sequence from typing import Sequence
from pathlib import Path from pathlib import Path
import yaml import yaml
@@ -31,7 +32,7 @@ def validate_x_link(line: str) -> str:
if re.search(r"\/media$", line): if re.search(r"\/media$", line):
return line return line
# if does not contain /media at the end then add /media # if does not contain /media at the end then add /media
return f"{line}/media" return f"{line.rstrip('/')}/media"
def parse_link(link: str) -> str: def parse_link(link: str) -> str:
@@ -65,6 +66,8 @@ def run(
verbose: bool, verbose: bool,
cwd: Path | None = None, cwd: Path | None = None,
check: bool = False, check: bool = False,
on_line=None,
log_failure: bool = True,
) -> None: ) -> None:
"""Run command in a subprocess""" """Run command in a subprocess"""
# pylint: disable=subprocess-run-check # pylint: disable=subprocess-run-check
@@ -82,9 +85,28 @@ def run(
else: else:
args = list(command) args = list(command)
if on_line is None:
result = subprocess.run(args, check=check, cwd=cwd) result = subprocess.run(args, check=check, cwd=cwd)
if not check and result.returncode != 0: if log_failure and not check and result.returncode != 0:
LOG.warning("Command failed (%s): %s", result.returncode, args) LOG.warning("Command failed (%s): %s", result.returncode, args)
return
proc = subprocess.Popen(
args,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
assert proc.stdout is not None
for line in proc.stdout:
print(line, end="")
on_line(line)
returncode = proc.wait()
if check and returncode != 0:
raise subprocess.CalledProcessError(returncode, args)
if log_failure and not check and returncode != 0:
LOG.warning("Command failed (%s): %s", returncode, args)
def list_lines(i: int, line: str) -> str: def list_lines(i: int, line: str) -> str:
@@ -92,28 +114,41 @@ def list_lines(i: int, line: str) -> str:
return f"{i}) {line}" return f"{i}) {line}"
def quote(line: str) -> str:
"""Quote the line"""
return f'"{line}"'
def sort_txt_file(file_path: Path): def sort_txt_file(file_path: Path):
"""Sort every line alphabetically """Sort every line alphabetically
remove duplicated and empty lines""" remove duplicated and empty lines"""
file = str(file_path.resolve()) path = file_path.resolve()
run(["sort", "-u", file, "-o", file], VERBOSE_G) with open(path, "r", encoding="utf-8") as open_file:
run(["sed", "-i", "/^$/d", file], VERBOSE_G) lines = [ln.strip() for ln in open_file]
run(["sed", "-i", "-e", "s,http:,https:,", file], VERBOSE_G)
# fix this using strip on python normalized = []
# line.strip("/") for ln in lines:
run(["sed", "-i", "-e", "s,/$,,", file], VERBOSE_G) # trailing / if not ln:
continue
ln = ln.replace("http://", "https://")
ln = ln.rstrip("/")
normalized.append(ln)
unique_sorted = sorted(set(normalized))
with open(path, "w", encoding="utf-8") as open_file:
open_file.write("\n".join(unique_sorted))
if unique_sorted:
open_file.write("\n")
def randomize_txt_file(file_path: Path): def randomize_txt_file(file_path: Path):
"""Randomize the order of the """Randomize the order of the
lines of the txt file""" lines of the txt file"""
file = str(file_path.resolve()) path = file_path.resolve()
run(["sort", "-R", file, "-o", file], VERBOSE_G) with open(path, "r", encoding="utf-8") as open_file:
lines = [ln.rstrip("\n") for ln in open_file]
random.shuffle(lines)
with open(path, "w", encoding="utf-8") as open_file:
open_file.write("\n".join(lines))
if lines:
open_file.write("\n")
def parse_list(file): def parse_list(file):

View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""Interactive cleaner for x.com galleries."""
from __future__ import annotations
import argparse
import shutil
import subprocess
from pathlib import Path
from urllib.parse import urlsplit
import db
from classes.user import User
from functions import load_config_variables
def _extract_handle(url: str) -> str | None:
parts = urlsplit(url if "://" in url else f"https://{url}")
segments = [seg for seg in parts.path.split("/") if seg]
if not segments:
return None
return segments[0]
def _resolve_folder(user: User, handle: str | None) -> Path | None:
base = user.directories.get("download")
if base is None:
return None
if not base.exists():
return None
if not handle:
return base
candidates = [
base / handle,
]
for cand in candidates:
if cand.exists():
return cand
return None
def _open_folder(path: Path) -> None:
if shutil.which("xdg-open") is None:
print("xdg-open not found; skipping folder open.")
return
subprocess.run(["xdg-open", str(path)], check=False)
def _prompt() -> str:
return input("Keep? [y] keep / [n] disable / [s] skip / [q] quit: ").strip().lower()
def _build_user_index(configs: dict) -> dict[str, int]:
return {entry["name"]: idx for idx, entry in enumerate(configs["users"])}
def _validate_users(user_index: dict[str, int], users: list[str] | None) -> bool:
if not users:
return True
unknown = [u for u in users if u not in user_index]
if not unknown:
return True
print(f"Unknown users: {', '.join(unknown)}")
return False
def _print_context(user_name: str, url: str, handle: str | None, folder: Path | None) -> None:
print(f"\nUser: {user_name}")
print(f"URL: {url}")
if handle:
print(f"Handle: {handle}")
if folder:
print(f"Folder: {folder}")
_open_folder(folder)
return
print("Folder: <unknown>")
def _apply_choice(
conn,
user_name: str,
url: str,
choice: str,
reason: str,
) -> bool | None:
if choice in ("y", "yes"):
ok = db.set_keep(conn, user_name, url, keep=True, reason=reason)
if ok:
conn.commit()
return True
if choice in ("n", "no"):
ok = db.set_enabled(conn, user_name, url, enabled=False, reason=reason)
if ok:
conn.commit()
return True
if choice in ("s", "skip", ""):
return True
if choice in ("q", "quit"):
return None
print("Please enter y, n, s, or q.")
return False
def main() -> None:
parser = argparse.ArgumentParser(prog="gallery-clean")
parser.add_argument(
"session",
nargs="?",
type=int,
default=10,
help="Number of links to review this session (default: 10)",
)
parser.add_argument(
"--reason",
default="gallery-clean",
help="Reason stored when disabling or keeping",
)
args = parser.parse_args()
configs = load_config_variables()
user_index = _build_user_index(configs)
users_filter = ["jawz"]
if not _validate_users(user_index, users_filter):
return
with db.connect(configs) as conn:
rows = db.get_links_for_cleaning(conn, users=users_filter)
for row in rows[: max(args.session, 0)]:
user_name = row["user_name"]
url = row["url_original"]
handle = _extract_handle(url)
folder = _resolve_folder(User(user_index[user_name]), handle)
_print_context(user_name, url, handle, folder)
while True:
result = _apply_choice(conn, user_name, url, _prompt(), args.reason)
if result is None:
return
if result:
break
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""fzf-based selectors for comic and gallery links."""
from __future__ import annotations
import argparse
import re
import subprocess
import db
USER = "jawz"
RGX_COMIC = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
def _select_links(urls: list[str]) -> list[str]:
if not urls:
return []
proc = subprocess.run(
["fzf", "--multi", "--exact", "-i"],
input="\n".join(urls),
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
return []
return [ln for ln in proc.stdout.splitlines() if ln.strip()]
def _run_download(selected: list[str], extra_args: list[str]) -> None:
if not selected:
return
subprocess.run(["download", "-u", USER, *extra_args, "-i", *selected], check=False)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("-s", "--no_skip", dest="flag_skip", action="store_false")
parser.add_argument("-a", "--no-archive", dest="flag_archive", action="store_false")
return parser.parse_args()
def _extra_args_from_flags(args: argparse.Namespace) -> list[str]:
extra = []
if args.flag_skip is False:
extra.append("-s")
if args.flag_archive is False:
extra.append("-a")
return extra
def comic_main() -> None:
args = _parse_args()
extra_args = _extra_args_from_flags(args)
with db.connect() as conn:
rows = db.get_links(conn, users=[USER], include_disabled=False, include_banned=False)
urls = [row["url_original"] for row in rows if RGX_COMIC.search(row["url_original"])]
_run_download(_select_links(urls), extra_args)
def gallery_main() -> None:
args = _parse_args()
extra_args = _extra_args_from_flags(args)
with db.connect() as conn:
rows = db.get_links(conn, users=[USER], include_disabled=False, include_banned=False)
urls = [row["url_original"] for row in rows if not RGX_COMIC.search(row["url_original"])]
_run_download(_select_links(urls), extra_args)
if __name__ == "__main__":
gallery_main()

View File

@@ -6,10 +6,19 @@ py_modules =
download download
functions functions
argparser argparser
db
admin
admin_links
admin_users
select_links
gallery_clean
classes.gallery classes.gallery
classes.user classes.user
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
download = download:main download = download:main
download-admin = admin:main
comic = select_links:comic_main
gallery = select_links:gallery_main
gallery-clean = gallery_clean:main

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
import tempfile
import unittest
import sqlite3
from pathlib import Path
import db
class TestDB(unittest.TestCase):
def setUp(self) -> None:
self.conn = sqlite3.connect(":memory:")
self.conn.row_factory = sqlite3.Row
db.ensure_schema(self.conn)
def tearDown(self) -> None:
self.conn.close()
def test_normalize_url(self):
self.assertEqual(
db.normalize_url("http://Twitter.com/User/"),
"https://x.com/User",
)
self.assertEqual(
db.normalize_url("x.com/SomeUser/media/"),
"https://x.com/SomeUser/media",
)
def test_add_link_dedupe(self):
res1 = db.add_link(self.conn, "jawz", "https://x.com/Test/")
res2 = db.add_link(self.conn, "jawz", "https://x.com/Test")
self.assertEqual(res1["status"], "added")
self.assertEqual(res2["status"], "exists")
def test_remove_tombstone(self):
db.add_link(self.conn, "jawz", "https://x.com/Test")
ok = db.remove_link(self.conn, "jawz", "https://x.com/Test")
self.assertTrue(ok)
res = db.add_link(self.conn, "jawz", "https://x.com/Test")
self.assertEqual(res["status"], "removed")
res2 = db.add_link(self.conn, "jawz", "https://x.com/Test", assume_yes=True)
self.assertEqual(res2["status"], "added")
def test_disable_and_ban(self):
db.add_link(self.conn, "jawz", "https://x.com/Test")
ok = db.set_enabled(self.conn, "jawz", "https://x.com/Test", enabled=False)
self.assertTrue(ok)
active = db.get_active_links(self.conn, "jawz")
self.assertEqual(active, [])
ok = db.set_banned(self.conn, "jawz", "https://x.com/Test", banned=True, reason="bad")
self.assertTrue(ok)
active = db.get_active_links(self.conn, "jawz")
self.assertEqual(active, [])
def test_import_master_list(self):
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "watch.txt"
path.write_text(
"\n".join(
[
"https://x.com/User",
"# https://x.com/DisabledUser",
"https://x.com/User",
]
)
+ "\n",
encoding="utf-8",
)
result = db.import_master_list(self.conn, "jawz", path)
self.assertEqual(result["added"], 2)
self.assertEqual(result["exists"], 1)
rows = db.get_links_by_user(self.conn, "jawz")
by_norm = {db.normalize_url(r["url_original"]): r for r in rows}
self.assertTrue(by_norm["https://x.com/User"]["enabled"])
self.assertFalse(by_norm["https://x.com/DisabledUser"]["enabled"])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,140 @@
#!/usr/bin/env python3
import types
import unittest
from pathlib import Path
import tempfile
import download
class DummyArgs:
def __init__(self):
self.post_type = ["posts", "reels"]
self.flag_archive = True
self.flag_skip = True
self.flag_verbose = True
class DummyUser:
def __init__(self):
self.name = "jawz"
self.sleep = 0
self.directories = {"media": Path("/tmp/media"), "download": Path("/tmp/dl")}
self.dbs = {"gallery": Path("/tmp/g.sqlite3"), "media": Path("/tmp/m.txt")}
self.lists = {"push": Path("/tmp/instant.txt")}
class TestDownload(unittest.TestCase):
    """Tests for download.py command builders and push_manager routing."""

    def setUp(self) -> None:
        download.ARGS = DummyArgs()
        download.CONFIGS = {
            "users": [{"name": "jawz"}],
            "global": {},
            "comic": {"comic-list": "/tmp/comic.txt"},
        }
        # Snapshot every (owner, attribute) the tests monkey-patch so
        # tearDown can restore them mechanically.
        targets = [
            (download, "Gallery"),
            (download, "video_command"),
            (download, "run"),
            (download.db, "connect"),
            (download.db, "add_link"),
            (download, "save_comic"),
            (download, "_make_gallery_error_handler"),
        ]
        self._saved = [(obj, name, getattr(obj, name)) for obj, name in targets]

    def tearDown(self) -> None:
        # Restore every patched attribute to its pre-test value.
        for obj, name, value in self._saved:
            setattr(obj, name, value)

    def test_parse_instagram(self):
        """Instagram URLs yield include= options; other hosts yield nothing."""
        self.assertEqual(
            download.parse_instagram("https://instagram.com/user"),
            ["-o", "include=posts,reels"],
        )
        self.assertEqual(download.parse_instagram("https://x.com/user"), [])

    def test_video_command(self):
        """yt-dlp command contains the link; music URLs add audio flags."""
        plain = download.Video()
        plain.link = "https://youtu.be/abc"
        plain.dest = "/tmp"
        command = download.video_command(plain)
        self.assertIn("yt-dlp", command[0])
        self.assertIn("https://youtu.be/abc", command)

        music = download.Video()
        music.link = "https://music.youtube.com/watch?v=xyz"
        music.dest = "/tmp"
        music.use_archive = False
        self.assertIn("--audio-format", download.video_command(music))

    def test_push_manager_routing(self):
        """push_manager should dispatch each link type to its backend."""
        user = DummyUser()
        routed = {"gallery": [], "video": [], "comic": [], "other": []}

        # download.Gallery has not been patched yet, so this subclasses
        # the real implementation, as the routing code expects.
        class RecordingGallery(download.Gallery):
            def generate_command(self, *args, **kwargs):
                return None

            def run_command(self, *args, **kwargs):
                url = getattr(self, "link", "")
                if "mangadex" in url:
                    routed["comic"].append(url)
                elif "x.com" in url:
                    routed["gallery"].append(url)
                else:
                    routed["other"].append(url)

        def recording_video_command(video):
            routed["video"].append(video.link)
            return ["echo", "ok"]

        class NullConn:
            """Context-manager stand-in that disables the DB write path."""

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def commit(self):
                return None

        download.Gallery = RecordingGallery
        download.video_command = recording_video_command
        download.run = lambda *args, **kwargs: None
        download.save_comic = lambda *_args, **_kwargs: None
        download._make_gallery_error_handler = lambda *_args, **_kwargs: None
        download.db.connect = lambda *a, **k: NullConn()
        download.db.add_link = lambda *a, **k: {"status": "added"}

        download.push_manager(
            user,
            links=[
                "https://x.com/someuser",
                "https://youtu.be/abc",
                "https://mangadex.org/title/123",
                "https://example.com/page",
            ],
        )

        # Exactly one link must land in each bucket; restore is in tearDown.
        for bucket in ("gallery", "video", "comic", "other"):
            self.assertEqual(len(routed[bucket]), 1)
# Allow running this test module directly (e.g. `python test_download.py`).
if __name__ == "__main__":
    unittest.main()