Compare commits
27 Commits
5ad8fc0dc8
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e985e359a7 | ||
|
|
ba42689aa9 | ||
|
|
2a55d92f19 | ||
|
|
949f5a94c3 | ||
|
|
899543309f | ||
|
|
5000304a8a | ||
|
|
76e3d72643 | ||
|
|
e73b4c8083 | ||
|
|
3f44f710b1 | ||
|
|
9da87b68e9 | ||
|
|
766eca4a2f | ||
|
|
bda8105928 | ||
|
|
45b78ce76a | ||
|
|
88e4ac04df | ||
|
|
7aab65a73a | ||
|
|
adab652feb | ||
|
|
81c2df84f7 | ||
|
|
7a64034f8a | ||
|
|
2ccdd713ea | ||
|
|
da87b6f9d2 | ||
|
|
ebb27daf0c | ||
|
|
fcd898873c | ||
|
|
fa8f2a825b | ||
|
|
274edf1668 | ||
|
|
e189b619ef | ||
|
|
c71ff53b23 | ||
|
|
83210d4356 |
27
flake.nix
27
flake.nix
@@ -126,5 +126,32 @@
|
||||
ext = "py";
|
||||
handler = scriptBin;
|
||||
};
|
||||
|
||||
apps.x86_64-linux = {
|
||||
download = {
|
||||
type = "app";
|
||||
program = "${pkgs.download}/bin/download";
|
||||
};
|
||||
download-admin = {
|
||||
type = "app";
|
||||
program = "${pkgs.download}/bin/download-admin";
|
||||
};
|
||||
download-tests = {
|
||||
type = "app";
|
||||
program = "${
|
||||
pkgs.writeShellApplication {
|
||||
name = "download-tests";
|
||||
runtimeInputs = [
|
||||
(pkgs.python3.withPackages (ps: [ ps.pyyaml ]))
|
||||
];
|
||||
text = ''
|
||||
set -euo pipefail
|
||||
export PYTHONPATH="${inputs.self}/src/download"
|
||||
python -m unittest discover -s "${inputs.self}/src/download/tests" -p "test_*.py"
|
||||
'';
|
||||
}
|
||||
}/bin/download-tests";
|
||||
};
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
@@ -7,11 +7,12 @@
|
||||
gallery-dl,
|
||||
ffmpeg,
|
||||
webcomix,
|
||||
fzf,
|
||||
...
|
||||
}:
|
||||
let
|
||||
pname = "download";
|
||||
version = "2.6";
|
||||
version = "3.0";
|
||||
in
|
||||
buildPythonApplication {
|
||||
inherit pname version;
|
||||
@@ -32,5 +33,13 @@ buildPythonApplication {
|
||||
types-pyyaml
|
||||
yt-dlp
|
||||
webcomix
|
||||
fzf
|
||||
];
|
||||
|
||||
postInstall = ''
|
||||
install -Dm644 completions/download.bash \
|
||||
$out/share/bash-completion/completions/download
|
||||
install -Dm644 completions/download.bash \
|
||||
$out/share/bash-completion/completions/download-admin
|
||||
'';
|
||||
}
|
||||
|
||||
104
src/download/admin.py
Normal file
104
src/download/admin.py
Normal file
@@ -0,0 +1,104 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Administrative CLI for download link database."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
|
||||
from admin_links import cmd_add
|
||||
from admin_links import cmd_ban
|
||||
from admin_links import cmd_disable
|
||||
from admin_links import cmd_enable
|
||||
from admin_links import cmd_import
|
||||
from admin_links import cmd_list
|
||||
from admin_links import cmd_remove
|
||||
from admin_links import cmd_rename
|
||||
from admin_links import cmd_unban
|
||||
from admin_links import cmd_validate_import
|
||||
from admin_links import cmd_fix_revision
|
||||
from admin_links import cmd_fix_x_media
|
||||
from admin_users import cmd_user_rename
|
||||
from admin_users import cmd_users
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the download-admin CLI.

    Each subcommand stores its handler in the ``func`` default so main()
    can dispatch without a name switch.
    """
    parser = argparse.ArgumentParser(prog="download-admin")
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_add = sub.add_parser("add")
    p_add.add_argument("user")
    p_add.add_argument("url")
    p_add.add_argument("--assume-yes", action="store_true")
    p_add.set_defaults(func=cmd_add)

    # These link commands all share the "user + optional url" shape; when
    # url is omitted the handlers fall back to interactive selection.
    for name, handler in (
        ("disable", cmd_disable),
        ("enable", cmd_enable),
        ("ban", cmd_ban),
        ("unban", cmd_unban),
        ("remove", cmd_remove),
    ):
        p_cmd = sub.add_parser(name)
        p_cmd.add_argument("user")
        p_cmd.add_argument("url", nargs="?")
        if name == "ban":
            p_cmd.add_argument("--reason")
        p_cmd.set_defaults(func=handler)

    p_rename = sub.add_parser("rename")
    p_rename.add_argument("user")
    p_rename.add_argument("old_url", nargs="?")
    p_rename.add_argument("new_url", nargs="?")
    p_rename.set_defaults(func=cmd_rename)

    p_list = sub.add_parser("list")
    p_list.add_argument("--user", action="append")
    p_list.add_argument("--disabled", action="store_true")
    p_list.add_argument("--banned", action="store_true")
    p_list.add_argument("--requires-revision", action="store_true")
    p_list.set_defaults(func=cmd_list)

    # Maintenance commands that take no positional arguments.
    for name, handler in (
        ("users", cmd_users),
        ("import", cmd_import),
        ("validate-import", cmd_validate_import),
        ("fix-revision", cmd_fix_revision),
        ("fix-x-media", cmd_fix_x_media),
    ):
        sub.add_parser(name).set_defaults(func=handler)

    p_user_rename = sub.add_parser("user-rename")
    p_user_rename.add_argument("user")
    p_user_rename.add_argument("site")
    p_user_rename.add_argument("old")
    p_user_rename.add_argument("new")
    p_user_rename.set_defaults(func=cmd_user_rename)

    return parser
|
||||
|
||||
|
||||
def main() -> None:
    """Parse the command line and dispatch to the chosen subcommand."""
    namespace = build_parser().parse_args()
    namespace.func(namespace)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
318
src/download/admin_links.py
Normal file
318
src/download/admin_links.py
Normal file
@@ -0,0 +1,318 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Admin CLI: link operations."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
import db
|
||||
from functions import load_config_variables
|
||||
|
||||
|
||||
def prompt_yes_no(message: str) -> bool:
    """Ask *message* on stdin, looping until the answer is yes or no."""
    while True:
        answer = input(f"{message} [y/n]: ").strip().lower()
        if answer in ("y", "yes", "n", "no"):
            return answer.startswith("y")
|
||||
|
||||
|
||||
def parse_list_file(path: Path) -> dict:
    """Split a watch-list file into normalized enabled/disabled URL sets.

    Blank lines are ignored; a leading '#' marks a disabled entry. A
    missing file yields two empty sets.
    """
    sets: dict = {"enabled": set(), "disabled": set()}
    if not path.is_file():
        return sets
    with open(path, "r", encoding="utf-8") as r_file:
        for raw in r_file:
            line = raw.strip()
            if not line:
                continue
            if line.startswith("#"):
                url = line.lstrip("#").strip()
                if url:
                    sets["disabled"].add(db.normalize_url(url))
            else:
                sets["enabled"].add(db.normalize_url(line))
    return sets
|
||||
|
||||
|
||||
def cmd_add(args: argparse.Namespace) -> None:
    """Add a link for a user, confirming re-adds of previously removed links."""
    configs = load_config_variables()
    with db.connect(configs) as conn:
        result = db.add_link(
            conn, args.user, args.url, assume_yes=args.assume_yes, source="manual"
        )
        # A "removed" status means a tombstone exists; ask before re-adding
        # unless --assume-yes was given.
        if result["status"] == "removed" and not args.assume_yes:
            removed_at = result.get("removed_at", "unknown")
            if prompt_yes_no(f"Link was deleted on {removed_at}. Re-add?"):
                result = db.add_link(
                    conn, args.user, args.url, assume_yes=True, source="manual"
                )
        row = result.get("row")
        if row:
            # Warn when an already-stored link is in a non-active state.
            if row["banned_at"]:
                print(f"Warning: link is banned ({row['banned_reason'] or 'no reason'})")
            if not row["enabled"]:
                print("Warning: link is disabled")
        conn.commit()
        print(result["status"])
|
||||
|
||||
|
||||
def cmd_disable(args: argparse.Namespace) -> None:
    """Disable one link, or interactively selected links when no URL given."""
    def disable(conn, user, url):
        return db.set_enabled(conn, user, url, enabled=False)

    _apply_to_links(args, disable, selector_filter="disable")
|
||||
|
||||
|
||||
def cmd_enable(args: argparse.Namespace) -> None:
    """Enable one link, or interactively selected links when no URL given."""
    def enable(conn, user, url):
        return db.set_enabled(conn, user, url, enabled=True)

    _apply_to_links(args, enable, selector_filter="enable")
|
||||
|
||||
|
||||
def cmd_ban(args: argparse.Namespace) -> None:
    """Ban link(s) for a user, recording the optional --reason."""
    def ban(conn, user, url):
        return db.set_banned(conn, user, url, banned=True, reason=args.reason)

    _apply_to_links(args, ban, selector_filter="ban")
|
||||
|
||||
|
||||
def cmd_unban(args: argparse.Namespace) -> None:
    """Lift the ban on link(s) for a user."""
    def unban(conn, user, url):
        return db.set_banned(conn, user, url, banned=False)

    _apply_to_links(args, unban, selector_filter="unban")
|
||||
|
||||
|
||||
def cmd_remove(args: argparse.Namespace) -> None:
    """Remove link(s) for the given user."""
    def remove(conn, user, url):
        return db.remove_link(conn, user, url)

    _apply_to_links(args, remove, selector_filter="any")
|
||||
|
||||
|
||||
def cmd_rename(args: argparse.Namespace) -> None:
    """Rename a link, prompting for whichever URL was not given on the CLI."""
    target = args.old_url
    if not target:
        # No old URL supplied: let the user pick exactly one link via fzf.
        picked = _select_links(args.user, multi=False, selector_filter="any")
        if not picked:
            print("not found")
            return
        target = picked[0]
    replacement = args.new_url or input("New URL: ").strip()
    with db.connect() as conn:
        result = db.rename_link(conn, args.user, target, replacement)
        # Only persist when the rename actually happened.
        if result["status"] == "renamed":
            conn.commit()
        print(result["status"])
|
||||
|
||||
|
||||
def cmd_list(args: argparse.Namespace) -> None:
    """Print links as '<user> [status] <url>', honouring the filter flags."""
    users = args.user or None
    # --requires-revision implies fetching disabled and banned rows too.
    show_disabled = args.disabled or args.requires_revision
    show_banned = args.banned or args.requires_revision
    with db.connect() as conn:
        rows = db.get_links(
            conn,
            users=users,
            include_disabled=show_disabled,
            include_banned=show_banned,
            requires_revision_only=args.requires_revision,
        )
        for row in rows:
            # --disabled / --banned narrow the output to only those rows.
            if args.disabled and row["enabled"]:
                continue
            if args.banned and not row["banned_at"]:
                continue
            # Banned trumps enabled/disabled in the displayed status.
            if row["banned_at"]:
                status = "banned"
            elif row["enabled"]:
                status = "enabled"
            else:
                status = "disabled"
            print(f"{row['user_name']} [{status}] {row['url_original']}")
|
||||
|
||||
|
||||
def cmd_import(_: argparse.Namespace) -> None:
    """Import every configured user's master watch list into the database.

    Reads <lists-dir>/<user>/watch.txt per user, prints each import
    result (including duplicates), and commits once at the end.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        imported_paths = []
        for entry in configs["users"]:
            user = entry["name"]
            lists_dir = Path(configs["global"]["lists-dir"]) / user
            master = lists_dir / "watch.txt"
            result = db.import_master_list(conn, user, master)
            if result["status"] == "ok":
                imported_paths.append(str(master))
            print(f"{user}: {result}")
            if result.get("duplicates"):
                print(f"{user} duplicates:")
                for dup in result["duplicates"]:
                    print(f"  {dup}")
        if imported_paths:
            print("Imported lists:")
            for path in imported_paths:
                print(f"  {path}")
        # Single commit covers all users' imports.
        conn.commit()
|
||||
|
||||
|
||||
def cmd_validate_import(_: argparse.Namespace) -> None:
    """Compare each user's watch.txt against the database and report drift.

    The master list is split into enabled/disabled URL sets and diffed
    against DB state in four categories (missing/extra x enabled/disabled).
    Read-only: nothing is committed.
    """
    configs = load_config_variables()
    with db.connect(configs) as conn:
        for entry in configs["users"]:
            user = entry["name"]
            lists_dir = Path(configs["global"]["lists-dir"]) / user
            master = lists_dir / "watch.txt"
            list_sets = parse_list_file(master)

            rows = db.get_links_by_user(conn, user)
            db_enabled = set()
            db_disabled = set()
            for row in rows:
                norm = db.normalize_url(row["url_original"])
                # Banned rows are grouped with the disabled set: only
                # active, non-banned links count as "enabled".
                if row["enabled"] and not row["banned_at"]:
                    db_enabled.add(norm)
                else:
                    db_disabled.add(norm)

            missing_enabled = list_sets["enabled"] - db_enabled
            missing_disabled = list_sets["disabled"] - db_disabled
            extra_enabled = db_enabled - list_sets["enabled"]
            extra_disabled = db_disabled - list_sets["disabled"]

            print(f"{user}:")
            if missing_enabled:
                print("  Missing enabled in DB:")
                for url in sorted(missing_enabled):
                    print(f"    {url}")
            if missing_disabled:
                print("  Missing disabled in DB:")
                for url in sorted(missing_disabled):
                    print(f"    {url}")
            if extra_enabled:
                print("  Extra enabled in DB:")
                for url in sorted(extra_enabled):
                    print(f"    {url}")
            if extra_disabled:
                print("  Extra disabled in DB:")
                for url in sorted(extra_disabled):
                    print(f"    {url}")
            if not any(
                [missing_enabled, missing_disabled, extra_enabled, extra_disabled]
            ):
                print("  OK")
|
||||
|
||||
|
||||
def cmd_fix_revision(_: argparse.Namespace) -> None:
    """Clear stale requires_revision flags on healthy links.

    A link that legitimately requires revision is disabled and not banned
    (mark_requires_revision sets enabled = 0). Only links that are both
    enabled AND not banned can carry the flag by mistake, so only those
    are reset. The previous ``OR`` condition also matched the disabled,
    non-banned rows — exactly the ones the flag is meant to track — and
    wiped their flags too.
    """
    with db.connect() as conn:
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 0
            WHERE enabled = 1 AND banned_at IS NULL
            """
        )
        conn.commit()
        print("ok")
|
||||
|
||||
|
||||
def cmd_fix_x_media(_: argparse.Namespace) -> None:
    """Repair x.com links whose path contains a doubled slash before 'media'.

    Rewrites '…//media' to '…/media'. If the corrected URL already exists
    for the same user, the broken duplicate row is deleted instead of
    updated (the unique (user_name, url_normalized) index would reject
    the update).
    """
    with db.connect() as conn:
        rows = conn.execute(
            """
            SELECT id, user_name, url_original FROM links
            WHERE url_original LIKE '%x.com/%//media%'
            """
        ).fetchall()
        for row in rows:
            fixed = row["url_original"].replace("//media", "/media")
            norm = db.normalize_url(fixed)
            # Another row already holding the corrected URL wins.
            conflict = conn.execute(
                """
                SELECT id FROM links
                WHERE user_name = ? AND url_normalized = ? AND id != ?
                """,
                (row["user_name"], norm, row["id"]),
            ).fetchone()
            if conflict:
                conn.execute("DELETE FROM links WHERE id = ?", (row["id"],))
                continue
            conn.execute(
                """
                UPDATE links
                SET url_original = ?, url_normalized = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (fixed, norm, row["id"]),
            )
        conn.commit()
        print("ok")
|
||||
|
||||
|
||||
def _fzf_select(lines: list[str], multi: bool) -> list[str]:
|
||||
if not lines:
|
||||
return []
|
||||
if shutil.which("fzf") is None:
|
||||
print("fzf not found.")
|
||||
return []
|
||||
args = ["fzf"]
|
||||
if multi:
|
||||
args.append("--multi")
|
||||
proc = subprocess.run(
|
||||
args,
|
||||
input="\n".join(lines),
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
return []
|
||||
return [ln for ln in proc.stdout.splitlines() if ln.strip()]
|
||||
|
||||
|
||||
def _select_links(user: str, multi: bool, selector_filter: str) -> list[str]:
    """Pick link URLs for *user* via fzf, offering only actionable rows.

    *selector_filter* names the pending action; rows on which the action
    would be a no-op (e.g. already-enabled rows for "enable") are hidden.
    Unknown filters (e.g. "any") hide nothing.
    """
    # Map each action to a predicate over (enabled, banned) that marks
    # rows the action would not change.
    skip = {
        "enable": lambda enabled, banned: enabled,
        "disable": lambda enabled, banned: not enabled,
        "ban": lambda enabled, banned: banned,
        "unban": lambda enabled, banned: not banned,
    }.get(selector_filter, lambda enabled, banned: False)

    with db.connect() as conn:
        rows = db.get_links(conn, users=[user], include_disabled=True, include_banned=True)
        candidates = [
            row["url_original"]
            for row in rows
            if not skip(bool(row["enabled"]), bool(row["banned_at"]))
        ]
        return _fzf_select(candidates, multi=multi)
|
||||
|
||||
|
||||
def _apply_to_links(args: argparse.Namespace, fn, selector_filter: str) -> None:
    """Run *fn* on an explicit URL, or on links chosen interactively.

    With args.url: a single operation, committed only on success. Without
    it: multi-select via fzf, applying *fn* per selection and committing
    once when at least one call succeeded.
    """
    if args.url:
        with db.connect() as conn:
            ok = fn(conn, args.user, args.url)
            if ok:
                conn.commit()
            print("ok" if ok else "not found")
        return

    selections = _select_links(args.user, multi=True, selector_filter=selector_filter)
    if not selections:
        print("not found")
        return

    with db.connect() as conn:
        # fn runs for every selection; count how many actually changed.
        changed = sum(1 for url in selections if fn(conn, args.user, url))
        if changed:
            conn.commit()
        print(f"ok ({changed})")
|
||||
88
src/download/admin_users.py
Normal file
88
src/download/admin_users.py
Normal file
@@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Admin CLI: user operations."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import db
|
||||
from functions import load_config_variables
|
||||
|
||||
|
||||
def list_users(configs: dict) -> None:
    """Print the name of every configured user, one per line."""
    for user in configs["users"]:
        print(user["name"])
|
||||
|
||||
|
||||
def prompt_yes_no(message: str) -> bool:
    """Ask *message* on stdin, looping until the answer is yes or no."""
    while True:
        answer = input(f"{message} [y/n]: ").strip().lower()
        if answer in ("y", "yes", "n", "no"):
            return answer.startswith("y")
|
||||
|
||||
|
||||
def merge_dirs(src: Path, dst: Path) -> None:
    """Move files from *src* into *dst*, keeping files already in *dst*.

    Existing destination files are skipped (their source copy stays put);
    directories that the move leaves empty are removed afterwards.
    """
    for root, _, files in os.walk(src):
        relative = Path(root).relative_to(src)
        destination = dst / relative
        destination.mkdir(parents=True, exist_ok=True)
        for filename in files:
            src_file = Path(root) / filename
            dst_file = destination / filename
            if dst_file.exists():
                print(f"Skip existing file: {dst_file}")
                continue
            shutil.move(str(src_file), str(dst_file))

    # Prune emptied directories, deepest first (topdown=False).
    for root, dirs, files in os.walk(src, topdown=False):
        if not dirs and not files:
            Path(root).rmdir()
|
||||
|
||||
|
||||
def move_user_outputs(
    configs: dict, user_name: str, old_handle: str, new_handle: str
) -> None:
    """Rename *old_handle*'s output directory to *new_handle* on disk.

    A missing source directory is reported and skipped. When the target
    already exists, the user decides whether to merge into it.
    """
    user_cfg = next((u for u in configs["users"] if u["name"] == user_name), None)
    if not user_cfg:
        print(f"Unknown user: {user_name}")
        return

    for base in [Path(user_cfg["download-dir"])]:
        old_path = base / old_handle
        new_path = base / new_handle
        if not old_path.exists():
            print(f"Missing: {old_path}")
            continue
        if not new_path.exists():
            # Target free: a plain rename is enough.
            old_path.rename(new_path)
            continue
        if prompt_yes_no(f"Merge contents from {old_path} into existing {new_path}?"):
            merge_dirs(old_path, new_path)
|
||||
|
||||
|
||||
def cmd_users(_: argparse.Namespace) -> None:
    """Print all configured user names."""
    list_users(load_config_variables())
|
||||
|
||||
|
||||
def cmd_user_rename(args: argparse.Namespace) -> None:
    """Rename a site handle for a user in the DB, then move files on disk."""
    configs = load_config_variables()
    with db.connect(configs) as conn:
        outcome = db.bulk_rename_handle(
            conn,
            user_name=args.user,
            site=args.site,
            old_handle=args.old,
            new_handle=args.new,
        )
        conn.commit()
        print(outcome)
    # DB rename first, filesystem second, so the on-disk move only runs
    # once the database change is committed.
    move_user_outputs(configs, args.user, args.old, args.new)
|
||||
@@ -61,13 +61,16 @@ class Gallery:
|
||||
for key, env_var in auth_env.items():
|
||||
command += ["-o", f"{key}={os.environ.get(env_var, '')}"]
|
||||
|
||||
if self.link and not self.list:
|
||||
if self.link and self.list:
|
||||
LOG.warning("Both link and list set; using link and ignoring list.")
|
||||
command.append(self.link)
|
||||
if self.list and not self.link:
|
||||
elif self.link:
|
||||
command.append(self.link)
|
||||
elif self.list:
|
||||
command += ["-i", queue]
|
||||
|
||||
LOG.debug(command)
|
||||
self.command = command
|
||||
|
||||
def run_command(self, verbose: bool):
|
||||
run(self.command, verbose)
|
||||
def run_command(self, verbose: bool, on_line=None, log_failure: bool = True):
|
||||
run(self.command, verbose, on_line=on_line, log_failure=log_failure)
|
||||
|
||||
@@ -8,6 +8,7 @@ from functions import validate_x_link
|
||||
from functions import parse_link
|
||||
from functions import clean_cache
|
||||
from functions import LOG
|
||||
import db
|
||||
|
||||
|
||||
class User:
|
||||
@@ -61,6 +62,11 @@ class User:
|
||||
|
||||
for lst in filter(lambda x: not self.lists[x].is_file(), ["master", "push"]):
|
||||
self.lists[lst].touch()
|
||||
for lst in filter(
|
||||
lambda x: not self.lists[x].is_file(),
|
||||
["instagram", "kemono", "main"],
|
||||
):
|
||||
self.lists[lst].touch()
|
||||
|
||||
def append_list(self, name: str, line: str) -> None:
|
||||
"""Appends a line into the given list"""
|
||||
@@ -83,8 +89,8 @@ class User:
|
||||
def list_manager(self) -> None:
|
||||
"""Manage all the user list and create sub-lists"""
|
||||
self._create_directories() # Call the function to create necesary cache dirs
|
||||
with open(self.lists["master"], "r", encoding="utf-8") as r_file:
|
||||
master_content = list(map(lambda x: x.rstrip(), r_file))
|
||||
with db.connect() as conn:
|
||||
master_content = db.get_active_links(conn, self.name)
|
||||
|
||||
# Create temporary list files segmented per scrapper
|
||||
shuffle(master_content)
|
||||
@@ -94,12 +100,10 @@ class User:
|
||||
def save_link(self, link: str) -> None:
|
||||
"""Checks the master list against a new link
|
||||
if unmatched, appends it to the end of the list"""
|
||||
with open(self.lists["master"], "r", encoding="utf-8") as r_file:
|
||||
links = r_file.read().lower()
|
||||
|
||||
if parse_link(link).lower() in links:
|
||||
LOG.info("Gallery repeated, not saving")
|
||||
return
|
||||
|
||||
with db.connect() as conn:
|
||||
result = db.add_link(conn, self.name, parse_link(link), assume_yes=True)
|
||||
conn.commit()
|
||||
if result["status"] == "added":
|
||||
LOG.info("New gallery, saving")
|
||||
self.append_list("master", parse_link(link))
|
||||
else:
|
||||
LOG.info("Gallery repeated, not saving")
|
||||
|
||||
102
src/download/completions/download.bash
Normal file
102
src/download/completions/download.bash
Normal file
@@ -0,0 +1,102 @@
|
||||
# Bash completion for download and download-admin.
|
||||
# Source this file or install it in your bash_completion.d directory.
|
||||
|
||||
# Print the space-separated names of configured users.
# Runs an inline Python script against ~/.config/jawz/config.yaml; prints
# an empty string (still exiting 0) when PyYAML or the config is missing,
# so completion degrades gracefully instead of erroring.
__download_users() {
    python3 - <<'PY' 2>/dev/null
import pathlib
try:
    import yaml
except Exception:
    print("")
    raise SystemExit(0)

cfg = pathlib.Path("~/.config/jawz/config.yaml").expanduser()
if not cfg.is_file():
    print("")
    raise SystemExit(0)
data = yaml.safe_load(cfg.read_text(encoding="utf-8")) or {}
users = [u.get("name") for u in data.get("users", []) if isinstance(u, dict)]
print(" ".join([u for u in users if u]))
PY
}
|
||||
|
||||
# Completion for the `download` command: offers option flags, user names,
# post types, and (at most once per command line) a scrapper name.
_download() {
    local cur prev words cword
    _init_completion -n : || return

    local scrappers="push main instagram kemono comic manga webcomic"
    local opts="-u --user -i --input -l --list -a --no-archive -s --no_skip -v --verbose -t --type-post"
    local post_types="posts reels stories highlights avatar"

    # Current word starts with a dash: complete option names.
    if [[ "$cur" == -* ]]; then
        COMPREPLY=( $(compgen -W "$opts" -- "$cur") )
        return
    fi

    # Complete the argument of the option directly before the cursor.
    case "$prev" in
        -u|--user)
            COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            return
            ;;
        -t|--type-post)
            COMPREPLY=( $(compgen -W "$post_types" -- "$cur") )
            return
            ;;
        -i|--input)
            # Free-form value: nothing sensible to suggest.
            return
            ;;
    esac

    # Offer scrapper names only when none appears on the line yet.
    local have_scrapper=0
    local w
    for w in "${words[@]:1}"; do
        [[ "$w" == -* ]] && continue
        if [[ " $scrappers " == *" $w "* ]]; then
            have_scrapper=1
            break
        fi
    done
    if [[ $have_scrapper -eq 0 ]]; then
        COMPREPLY=( $(compgen -W "$scrappers" -- "$cur") )
    fi
}
|
||||
|
||||
# Completion for `download-admin`: first word is a subcommand; user-taking
# subcommands complete user names in position 2; only `list` has options.
_download_admin() {
    local cur prev words cword
    _init_completion -n : || return

    local cmds="add disable enable ban unban remove rename list users import validate-import fix-revision fix-x-media user-rename"
    local list_opts="--user --disabled --banned --requires-revision"

    # Dash-prefixed word: only `list` offers option flags.
    if [[ "$cur" == -* ]]; then
        if [[ "${words[1]}" == "list" ]]; then
            COMPREPLY=( $(compgen -W "$list_opts" -- "$cur") )
        else
            COMPREPLY=()
        fi
        return
    fi

    # Value for the --user option of `list`.
    case "$prev" in
        --user)
            COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            return
            ;;
    esac

    # First positional word: subcommand names.
    if [[ $cword -eq 1 ]]; then
        COMPREPLY=( $(compgen -W "$cmds" -- "$cur") )
        return
    fi

    # Second positional word of user-taking subcommands: user names.
    case "${words[1]}" in
        add|disable|enable|ban|unban|remove|rename|user-rename)
            if [[ $cword -eq 2 ]]; then
                COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
            fi
            ;;
    esac
}
|
||||
|
||||
complete -F _download download
|
||||
complete -F _download_admin download-admin
|
||||
647
src/download/db.py
Normal file
647
src/download/db.py
Normal file
@@ -0,0 +1,647 @@
|
||||
#!/usr/bin/env python3
|
||||
"""SQLite persistence for download links."""
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
from urllib.parse import urlsplit, urlunsplit
|
||||
|
||||
from functions import LOG
|
||||
from functions import load_config_variables
|
||||
|
||||
|
||||
def get_db_path(configs: dict | None = None) -> Path:
|
||||
"""Return the database path for links."""
|
||||
cfg = configs or load_config_variables()
|
||||
base = Path(cfg["global"]["databases-dir"])
|
||||
return base / "links.sqlite3"
|
||||
|
||||
|
||||
def connect(configs: dict | None = None) -> sqlite3.Connection:
    """Open the links database, creating parent dirs and schema as needed."""
    path = get_db_path(configs)
    path.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(path))
    # Row factory gives name-based column access used throughout the module.
    connection.row_factory = sqlite3.Row
    ensure_schema(connection)
    return connection
|
||||
|
||||
|
||||
def ensure_schema(conn: sqlite3.Connection) -> None:
    """Create schema if missing.

    Creates the links, link_history and link_tombstones tables plus their
    unique indexes, then applies idempotent column migrations for fields
    added after the original schema shipped.
    """
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS links (
            id INTEGER PRIMARY KEY,
            user_name TEXT NOT NULL,
            url_original TEXT NOT NULL,
            url_normalized TEXT NOT NULL,
            site TEXT,
            enabled INTEGER NOT NULL DEFAULT 1,
            keep INTEGER NOT NULL DEFAULT 0,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            disabled_at TEXT,
            disabled_reason TEXT,
            banned_at TEXT,
            banned_reason TEXT,
            requires_revision INTEGER NOT NULL DEFAULT 0
        );

        CREATE UNIQUE INDEX IF NOT EXISTS links_user_url_norm
        ON links (user_name, url_normalized);

        CREATE TABLE IF NOT EXISTS link_history (
            id INTEGER PRIMARY KEY,
            link_id INTEGER,
            user_name TEXT NOT NULL,
            event TEXT NOT NULL,
            old_url TEXT,
            new_url TEXT,
            note TEXT,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS link_tombstones (
            id INTEGER PRIMARY KEY,
            user_name TEXT NOT NULL,
            url_normalized TEXT NOT NULL,
            url_original TEXT NOT NULL,
            removed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        CREATE UNIQUE INDEX IF NOT EXISTS tombstones_user_url_norm
        ON link_tombstones (user_name, url_normalized);
        """
    )
    # Migrations: add columns introduced after the initial release to
    # databases created before them (no-ops when already present).
    _ensure_column(
        conn,
        "links",
        "requires_revision",
        "ALTER TABLE links ADD COLUMN requires_revision INTEGER NOT NULL DEFAULT 0",
    )
    _ensure_column(
        conn,
        "links",
        "keep",
        "ALTER TABLE links ADD COLUMN keep INTEGER NOT NULL DEFAULT 0",
    )
    _ensure_column(
        conn,
        "links",
        "disabled_reason",
        "ALTER TABLE links ADD COLUMN disabled_reason TEXT",
    )
|
||||
|
||||
|
||||
def _ensure_column(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> None:
|
||||
cols = [row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()]
|
||||
if column in cols:
|
||||
return
|
||||
conn.execute(ddl)
|
||||
|
||||
|
||||
def normalize_url(url: str) -> str:
    """Normalize *url* for de-duplication only.

    Forces the https scheme, lowercases the host, strips a leading
    "www.", maps twitter.com to x.com, drops a trailing slash and the
    fragment; the query string is preserved.
    """
    raw = url.strip()
    if "://" not in raw:
        raw = f"https://{raw}"

    parts = urlsplit(raw)
    scheme = "https"
    host = (parts.hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    # "www." was stripped above, so only the bare domain can match here;
    # the old "www.twitter.com" comparison was dead code.
    if host == "twitter.com":
        host = "x.com"

    path = parts.path.rstrip("/")
    query = parts.query
    return urlunsplit((scheme, host, path, query, ""))
|
||||
|
||||
|
||||
def get_site(url: str) -> str:
    """Return the normalized host of *url* (no "www.", twitter -> x.com)."""
    raw = url.strip()
    if "://" not in raw:
        raw = f"https://{raw}"
    host = (urlsplit(raw).hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    # "www." was stripped above, so only the bare domain can match here;
    # the old "www.twitter.com" comparison was dead code.
    if host == "twitter.com":
        host = "x.com"
    return host
|
||||
|
||||
|
||||
def add_history(
|
||||
conn: sqlite3.Connection,
|
||||
user_name: str,
|
||||
event: str,
|
||||
link_id: int | None = None,
|
||||
old_url: str | None = None,
|
||||
new_url: str | None = None,
|
||||
note: str | None = None,
|
||||
) -> None:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO link_history (link_id, user_name, event, old_url, new_url, note)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(link_id, user_name, event, old_url, new_url, note),
|
||||
)
|
||||
|
||||
|
||||
def add_link(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    assume_yes: bool = False,
    source: str = "manual",
) -> dict:
    """Add a link or return existing status.

    Returns a dict whose "status" key is one of:
      * "exists"  - the (user, normalized URL) pair is already stored;
                    "row" holds the existing row.
      * "removed" - a tombstone exists and neither assume_yes nor a
                    "push" source overrides it; "removed_at" is included.
      * "added"   - inserted; "id" holds the new row id.
    The caller is responsible for committing.
    """
    url_norm = normalize_url(url_original)
    site = get_site(url_original)

    row = conn.execute(
        "SELECT * FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if row:
        return {"status": "exists", "row": row}

    # A tombstone records a deliberate earlier removal: require explicit
    # confirmation unless forced or coming from the "push" source.
    tombstone = conn.execute(
        "SELECT removed_at FROM link_tombstones WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if tombstone and not assume_yes and source != "push":
        return {"status": "removed", "removed_at": tombstone["removed_at"]}

    cur = conn.execute(
        """
        INSERT INTO links (user_name, url_original, url_normalized, site)
        VALUES (?, ?, ?, ?)
        """,
        (user_name, url_original, url_norm, site),
    )
    if tombstone:
        # Re-added after removal: make sure the fresh row is unflagged.
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 0
            WHERE id = ?
            """,
            (cur.lastrowid,),
        )
    add_history(
        conn,
        user_name=user_name,
        event="add",
        link_id=cur.lastrowid,
        new_url=url_original,
        note=f"source={source}",
    )
    return {"status": "added", "id": cur.lastrowid}
|
||||
|
||||
|
||||
def set_enabled(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    enabled: bool,
    reason: str | None = None,
) -> bool:
    """Enable or disable a link; returns False when the link is unknown.

    Enabling also clears disabled_at/disabled_reason and the
    requires_revision flag; disabling stamps disabled_at and stores the
    optional *reason*. Both paths append a link_history row. The caller
    commits.
    """
    url_norm = normalize_url(url_original)
    row = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if not row:
        return False
    if enabled:
        conn.execute(
            """
            UPDATE links
            SET enabled = 1,
                disabled_at = NULL,
                disabled_reason = NULL,
                requires_revision = 0,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (row["id"],),
        )
        add_history(conn, user_name, "enable", link_id=row["id"], old_url=row["url_original"])
    else:
        conn.execute(
            """
            UPDATE links
            SET enabled = 0,
                disabled_at = CURRENT_TIMESTAMP,
                disabled_reason = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (reason, row["id"]),
        )
        add_history(
            conn,
            user_name,
            "disable",
            link_id=row["id"],
            old_url=row["url_original"],
            note=reason,
        )
    return True
|
||||
|
||||
|
||||
def set_banned(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    banned: bool,
    reason: str | None = None,
) -> bool:
    """Ban or unban a user's link.

    Returns False when the link is unknown. Unbanning also clears the
    requires_revision flag so the link becomes eligible again.
    """
    link = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, normalize_url(url_original)),
    ).fetchone()
    if link is None:
        return False

    if not banned:
        conn.execute(
            """
            UPDATE links
            SET banned_at = NULL, banned_reason = NULL, requires_revision = 0, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (link["id"],),
        )
        add_history(conn, user_name, "unban", link_id=link["id"], old_url=link["url_original"])
        return True

    conn.execute(
        """
        UPDATE links
        SET banned_at = CURRENT_TIMESTAMP, banned_reason = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (reason, link["id"]),
    )
    add_history(
        conn,
        user_name,
        "ban",
        link_id=link["id"],
        old_url=link["url_original"],
        note=reason,
    )
    return True
|
||||
|
||||
|
||||
def mark_requires_revision(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    reason: str,
) -> bool:
    """Flag every matching link of *user_name* as needing manual revision.

    The link is also disabled (preserving an existing disabled_at stamp via
    COALESCE). Returns True when at least one row matched.
    """
    matches = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, normalize_url(url_original)),
    ).fetchall()
    for match in matches:
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 1,
                enabled = 0,
                disabled_at = COALESCE(disabled_at, CURRENT_TIMESTAMP),
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (match["id"],),
        )
        add_history(
            conn,
            user_name,
            "requires_revision",
            link_id=match["id"],
            old_url=match["url_original"],
            note=reason,
        )
    return bool(matches)
|
||||
|
||||
|
||||
def mark_requires_revision_by_norm(
    conn: sqlite3.Connection, url_norm: str, reason: str
) -> int:
    """Flag *url_norm* as needing revision for every user that stores it.

    Cross-user variant of mark_requires_revision; returns the number of
    affected rows (0 when nothing matched).
    """
    matches = conn.execute(
        "SELECT id, user_name, url_original FROM links WHERE url_normalized = ?",
        (url_norm,),
    ).fetchall()
    for match in matches:
        conn.execute(
            """
            UPDATE links
            SET requires_revision = 1,
                enabled = 0,
                disabled_at = COALESCE(disabled_at, CURRENT_TIMESTAMP),
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (match["id"],),
        )
        add_history(
            conn,
            match["user_name"],
            "requires_revision",
            link_id=match["id"],
            old_url=match["url_original"],
            note=reason,
        )
    return len(matches)
|
||||
|
||||
|
||||
def rename_link(
    conn: sqlite3.Connection,
    user_name: str,
    old_url: str,
    new_url: str,
) -> dict:
    """Point an existing link of *user_name* at *new_url*.

    Returns {"status": "missing"} when the old URL is unknown,
    {"status": "conflict"} when another link already owns the new URL,
    and {"status": "renamed"} on success.
    """
    source = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, normalize_url(old_url)),
    ).fetchone()
    if source is None:
        return {"status": "missing"}

    new_norm = normalize_url(new_url)
    clash = conn.execute(
        "SELECT id FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, new_norm),
    ).fetchone()
    if clash is not None and clash["id"] != source["id"]:
        return {"status": "conflict"}

    conn.execute(
        """
        UPDATE links
        SET url_original = ?, url_normalized = ?, site = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (new_url, new_norm, get_site(new_url), source["id"]),
    )
    # A renamed link gets a fresh start: re-enabled, revision flag cleared.
    conn.execute(
        """
        UPDATE links
        SET enabled = 1, disabled_at = NULL, requires_revision = 0
        WHERE id = ?
        """,
        (source["id"],),
    )
    add_history(
        conn,
        user_name,
        "rename",
        link_id=source["id"],
        old_url=source["url_original"],
        new_url=new_url,
    )
    return {"status": "renamed"}
|
||||
|
||||
|
||||
def remove_link(conn: sqlite3.Connection, user_name: str, url_original: str) -> bool:
    """Delete a user's link, leaving a tombstone so later re-adds are detected.

    Returns False when no matching link exists.
    """
    url_norm = normalize_url(url_original)
    link = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, url_norm),
    ).fetchone()
    if link is None:
        return False

    # Tombstone before deleting so the removal survives the row itself.
    conn.execute(
        """
        INSERT OR IGNORE INTO link_tombstones (user_name, url_normalized, url_original)
        VALUES (?, ?, ?)
        """,
        (user_name, url_norm, link["url_original"]),
    )
    add_history(conn, user_name, "remove", link_id=link["id"], old_url=link["url_original"])
    conn.execute("DELETE FROM links WHERE id = ?", (link["id"],))
    return True
|
||||
|
||||
|
||||
def get_active_links(conn: sqlite3.Connection, user_name: str) -> list[str]:
    """Return the original URLs of a user's enabled, non-banned links, oldest first."""
    cursor = conn.execute(
        """
        SELECT url_original FROM links
        WHERE user_name = ?
        AND enabled = 1
        AND banned_at IS NULL
        ORDER BY id ASC
        """,
        (user_name,),
    )
    return [record["url_original"] for record in cursor]
|
||||
|
||||
|
||||
def get_links(
    conn: sqlite3.Connection,
    users: Iterable[str] | None = None,
    include_disabled: bool = False,
    include_banned: bool = False,
    requires_revision_only: bool = False,
) -> list[sqlite3.Row]:
    """Fetch link rows, optionally filtered by user and state flags.

    An empty *users* iterable behaves like None (no user filter). Disabled
    and banned rows are excluded unless explicitly included.
    """
    filters: list[str] = []
    params: list = []
    selected = list(users) if users else []
    if selected:
        placeholders = ",".join(["?"] * len(selected))
        filters.append(f"user_name IN ({placeholders})")
        params.extend(selected)
    if not include_disabled:
        filters.append("enabled = 1")
    if not include_banned:
        filters.append("banned_at IS NULL")
    if requires_revision_only:
        filters.append("requires_revision = 1")

    clause = f"WHERE {' AND '.join(filters)}" if filters else ""
    query = f"SELECT * FROM links {clause} ORDER BY user_name, id"
    return conn.execute(query, params).fetchall()
|
||||
|
||||
|
||||
def get_links_for_cleaning(
    conn: sqlite3.Connection,
    users: Iterable[str] | None = None,
) -> list[sqlite3.Row]:
    """Return enabled, unbanned, un-kept x.com links, optionally per-user.

    Only x.com rows are eligible; rows marked keep = 1 are excluded because
    they were already reviewed.
    """
    conditions = ["site = ?", "enabled = 1", "banned_at IS NULL", "keep = 0"]
    params: list = ["x.com"]
    selected = list(users) if users else []
    if selected:
        conditions.append(f"user_name IN ({','.join(['?'] * len(selected))})")
        params.extend(selected)
    query = f"SELECT * FROM links WHERE {' AND '.join(conditions)} ORDER BY user_name, id"
    return conn.execute(query, params).fetchall()
|
||||
|
||||
|
||||
def set_keep(
    conn: sqlite3.Connection,
    user_name: str,
    url_original: str,
    keep: bool,
    reason: str | None = None,
) -> bool:
    """Set or clear the manual 'keep' flag on a user's link.

    Returns False when the link is unknown. A "keep"/"unkeep" history event
    is recorded with the optional *reason*.
    """
    link = conn.execute(
        "SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
        (user_name, normalize_url(url_original)),
    ).fetchone()
    if link is None:
        return False

    conn.execute(
        """
        UPDATE links
        SET keep = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (1 if keep else 0, link["id"]),
    )
    add_history(
        conn,
        user_name,
        "keep" if keep else "unkeep",
        link_id=link["id"],
        old_url=link["url_original"],
        note=reason,
    )
    return True
|
||||
|
||||
|
||||
def get_links_by_user(conn: sqlite3.Connection, user_name: str) -> list[sqlite3.Row]:
    """Return every link row belonging to *user_name*, in insertion order."""
    query = "SELECT * FROM links WHERE user_name = ? ORDER BY id"
    return conn.execute(query, (user_name,)).fetchall()
|
||||
|
||||
|
||||
def import_master_list(conn: sqlite3.Connection, user_name: str, path: Path) -> dict:
    """Import a plain-text link list for *user_name*.

    Lines starting with "#" are imported in the disabled state. Returns a
    summary dict: status, added/exists/removed counts and the duplicate URLs.
    """
    if not path.is_file():
        return {"status": "missing", "path": str(path)}

    with open(path, "r", encoding="utf-8") as r_file:
        entries = [line.strip() for line in r_file if line.strip()]

    added = exists = removed = 0
    duplicates: list[str] = []
    for entry in entries:
        disabled = entry.startswith("#")
        # Strip the comment marker(s) to recover the URL itself.
        url = entry.lstrip("#").strip() if disabled else entry
        if not url:
            continue

        outcome = add_link(conn, user_name, url, assume_yes=True, source="import")
        if outcome["status"] == "added":
            added += 1
            if disabled:
                # Honor the "#" marker: store the link but keep it off.
                set_enabled(conn, user_name, url, enabled=False)
        elif outcome["status"] == "exists":
            exists += 1
            duplicates.append(url)
        elif outcome["status"] == "removed":
            removed += 1

    return {
        "status": "ok",
        "added": added,
        "exists": exists,
        "removed": removed,
        "duplicates": duplicates,
    }
|
||||
|
||||
|
||||
def bulk_rename_handle(
    conn: sqlite3.Connection,
    user_name: str,
    site: str,
    old_handle: str,
    new_handle: str,
) -> dict:
    """Rename an account handle within a site for a user.

    Every stored link of *user_name* on *site* whose first path segment equals
    *old_handle* is renamed to use *new_handle*. Returns counters:
    {"updated": n, "skipped": n, "conflicts": n}.
    """
    # BUG FIX: str.lstrip("www.") strips any leading 'w'/'.' characters, not
    # the literal prefix — it would mangle e.g. "weibo.com" into "eibo.com".
    # removeprefix drops only an exact leading "www.". This also makes the
    # old second check for "www.twitter.com" redundant.
    site_norm = site.lower().removeprefix("www.")
    if site_norm == "twitter.com":
        site_norm = "x.com"

    rows = conn.execute(
        """
        SELECT id, url_original FROM links
        WHERE user_name = ? AND site = ?
        """,
        (user_name, site_norm),
    ).fetchall()

    updated = 0
    skipped = 0
    conflicts = 0
    for row in rows:
        raw = row["url_original"]
        parts = urlsplit(raw if "://" in raw else f"https://{raw}")
        segments = parts.path.split("/")
        # Paths start with "/", so segments[0] is "" and the handle is segments[1].
        if len(segments) < 2 or segments[1] != old_handle:
            skipped += 1
            continue
        segments[1] = new_handle
        new_url = urlunsplit(
            (parts.scheme, parts.netloc, "/".join(segments), parts.query, parts.fragment)
        )
        result = rename_link(conn, user_name, raw, new_url)
        if result["status"] == "renamed":
            updated += 1
        elif result["status"] == "conflict":
            conflicts += 1
        else:
            skipped += 1

    return {"updated": updated, "skipped": skipped, "conflicts": conflicts}
|
||||
|
||||
|
||||
def warn(msg: str) -> None:
    """Forward *msg* to the module logger at WARNING level."""
    LOG.warning(msg)
|
||||
@@ -11,7 +11,9 @@ Also following in line more posix and python rules.
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
import argparse
|
||||
import yaml
|
||||
import db
|
||||
from typing import Dict
|
||||
from functions import LOG
|
||||
from functions import run
|
||||
@@ -23,12 +25,35 @@ from classes.user import User
|
||||
from classes.gallery import Gallery
|
||||
|
||||
# GLOBAL VARIABLE SECTION
|
||||
CONFIGS = load_config_variables()
|
||||
CONFIGS = None
|
||||
# Enable a default "everyone" flag for when running stuff like download gallery
|
||||
USERS = []
|
||||
ARGS = None
|
||||
|
||||
|
||||
def init_globals() -> None:
|
||||
"""Initialize global config and CLI args."""
|
||||
global CONFIGS, USERS, ARGS
|
||||
if CONFIGS is None:
|
||||
CONFIGS = load_config_variables()
|
||||
USERS = ["everyone"] + [user["name"] for user in CONFIGS["users"]]
|
||||
ARGS = argparser(USERS)
|
||||
|
||||
|
||||
def get_args() -> argparse.Namespace:
|
||||
"""Return initialized CLI args."""
|
||||
init_globals()
|
||||
assert ARGS is not None
|
||||
return ARGS
|
||||
|
||||
|
||||
def get_configs() -> dict:
|
||||
"""Return initialized config."""
|
||||
init_globals()
|
||||
assert CONFIGS is not None
|
||||
return CONFIGS
|
||||
|
||||
|
||||
class Video:
|
||||
"""Just a simple class to unify the Video parameters into a single one."""
|
||||
|
||||
@@ -41,29 +66,146 @@ class Video:
|
||||
|
||||
def get_index(name: str) -> int:
|
||||
"""Find the index in the config file"""
|
||||
return next((i for i, d in enumerate(CONFIGS["users"]) if d["name"] == name), -1)
|
||||
configs = get_configs()
|
||||
return next((i for i, d in enumerate(configs["users"]) if d["name"] == name), -1)
|
||||
|
||||
|
||||
def parse_gallery(gdl_list: str, user: User) -> None:
|
||||
"""Processes the gallery-dl command based on the selected gallery"""
|
||||
args = get_args()
|
||||
list_path = user.lists[gdl_list]
|
||||
if not list_path.is_file():
|
||||
LOG.warning("List file missing: %s", list_path)
|
||||
return
|
||||
with open(list_path, "r", encoding="utf-8") as r_file:
|
||||
links = list(map(lambda x: x.rstrip(), r_file))
|
||||
for link in filter(None, links):
|
||||
gallery = Gallery()
|
||||
gallery.archive = ARGS.flag_archive
|
||||
gallery.skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
|
||||
gallery.archive = args.flag_archive
|
||||
gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
|
||||
gallery.dest = "download"
|
||||
gallery.list = gdl_list
|
||||
gallery.opt_args = parse_instagram(gdl_list)
|
||||
gallery.link = link
|
||||
gallery.opt_args = parse_instagram(link)
|
||||
|
||||
gallery.generate_command(user)
|
||||
gallery.run_command(ARGS.flag_verbose)
|
||||
handler = _make_gallery_error_handler(link)
|
||||
gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
|
||||
|
||||
|
||||
def parse_instagram(link: str) -> list[str]:
|
||||
def parse_instagram(link: str, post_type: list[str] | str | None = None) -> list[str]:
|
||||
"""Fix instagram links"""
|
||||
args = get_args()
|
||||
if "instagram" not in link:
|
||||
return []
|
||||
if isinstance(ARGS.post_type, list):
|
||||
return ["-o", f"include={','.join(ARGS.post_type)}"]
|
||||
return ["-o", f"include={ARGS.post_type}"]
|
||||
use_type = args.post_type if post_type is None else post_type
|
||||
if isinstance(use_type, list):
|
||||
return ["-o", f"include={','.join(use_type)}"]
|
||||
return ["-o", f"include={use_type}"]
|
||||
|
||||
|
||||
# Exact gallery-dl error strings (checked with full-string membership,
# `reason in REVISION_ERRORS`, not substring matching) that mean the link
# should be flagged requires_revision in the database.
REVISION_ERRORS = {
    "NotFoundError: Requested user could not be found",
    "Unable to retrieve Tweets from this timeline",
    "No results for",
}

# Substrings (checked with `tok in reason`) that indicate a transient
# failure — these are only logged, never persisted to the database.
TRANSIENT_ERRORS = {
    "User input required (password)",
    "429",
    "rate limit",
    "timed out",
    "timeout",
    "Network",
    "connection",
}
|
||||
|
||||
|
||||
def _make_gallery_error_handler(link: str):
    """Build an on_line callback that inspects gallery-dl output for *link*.

    Lines carrying a known fatal "[error]" reason flag the link as
    requires_revision in the database; transient reasons are only logged.
    """
    norm = db.normalize_url(link)

    def handle(line: str) -> None:
        if "[error]" in line:
            reason = line.split("[error]", 1)[1].strip()
            LOG.warning("Error for %s: %s", link, reason)
            # Exact-match lookup: only reasons equal to a REVISION_ERRORS
            # entry trigger persistence (not substring matches).
            if reason in REVISION_ERRORS:
                with db.connect() as conn:
                    db.mark_requires_revision_by_norm(conn, norm, reason)
                    conn.commit()
                LOG.warning("Marked requires_revision for %s", link)
            if any(tok in reason for tok in TRANSIENT_ERRORS):
                LOG.warning("Transient error for %s: %s", link, reason)
            return
        # NOTE(review): "No results for" presumably appears without the
        # "[error]" tag in some gallery-dl output — confirm against logs.
        if "No results for" in line:
            with db.connect() as conn:
                db.mark_requires_revision_by_norm(conn, norm, "No results for")
                conn.commit()
            LOG.warning("Marked requires_revision for %s", link)
        return

    return handle
|
||||
|
||||
|
||||
def _comic_skip_arg(link: str, flag_skip: bool) -> str:
    """Return the chapter-range CLI fragment used when skimming comics.

    Empty string when a full download is requested (flag_skip False) or
    when the site has no skim rule.
    """
    if not flag_skip:
        return ""
    skim_rules = (
        (r"readcomiconline", " --chapter-range 1"),
        (r"manganato|mangahere|webtoons", " --chapter-range 1-5"),
    )
    for pattern, fragment in skim_rules:
        if re.search(pattern, link):
            return fragment
    return ""
|
||||
|
||||
|
||||
def _handle_gallery_link(user: User, link: str, args, conn) -> None:
    """Register *link* in the DB and download it with gallery-dl.

    Banned or disabled links are skipped with a warning. Errors emitted by
    gallery-dl are routed through the per-link error handler.
    """
    add_res = db.add_link(conn, user.name, parse_link(link), assume_yes=True, source="push")
    row = add_res.get("row")
    if row and row["banned_at"]:
        LOG.warning("Link is banned, skipping: %s", link)
        return
    if row and not row["enabled"]:
        LOG.warning("Link is disabled, skipping: %s", link)
        return

    gallery = Gallery()
    gallery.archive = args.flag_archive
    gallery.skip_arg = " -o skip=true" if not args.flag_skip else ""
    gallery.link = parse_link(link)
    gallery.dest = "download"
    gallery.opt_args = parse_instagram(link)
    gallery.generate_command(user)
    # log_failure=False: the on_line handler already logs/persists failures.
    handler = _make_gallery_error_handler(link)
    gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
|
||||
|
||||
|
||||
def _handle_comic_link(link: str, args) -> None:
    """Download a comic/manga *link* and record it in the comic list."""
    gallery = Gallery()
    gallery.archive = args.flag_archive
    gallery.skip_arg = _comic_skip_arg(link, args.flag_skip)
    gallery.link = link
    gallery.generate_command(is_comic=True)
    handler = _make_gallery_error_handler(link)
    gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
    # Remember the link so future "comic"/"manga" scrapper runs pick it up.
    save_comic(link)
|
||||
|
||||
|
||||
def _handle_video_link(user: User, link: str, args) -> None:
    """Download a single video *link* into the user's media folder."""
    video = Video()
    video.use_archive = args.flag_archive
    video.link = link
    video.dest = str(user.directories["media"])
    video.database = str(user.dbs["media"])
    run(video_command(video), args.flag_verbose)
|
||||
|
||||
|
||||
def _handle_other_link(user: User, link: str, args) -> None:
    """Fallback downloader for links that match no gallery/comic/video rule."""
    LOG.info("Other type of download %s", link)
    gallery = Gallery()
    # No archive and a flat directory layout for one-off downloads.
    gallery.archive = False
    gallery.skip_arg = " -o directory='[]'"
    gallery.link = link
    gallery.dest = "push"
    gallery.generate_command(user)
    handler = _make_gallery_error_handler(link)
    gallery.run_command(args.flag_verbose, on_line=handler, log_failure=False)
|
||||
|
||||
|
||||
def video_command(video: Video):
|
||||
@@ -110,17 +252,19 @@ def video_command(video: Video):
|
||||
|
||||
def comic_manager(skip_arg: str, category: str) -> None:
|
||||
"""Process the information to download manga"""
|
||||
args = get_args()
|
||||
configs = get_configs()
|
||||
re_cat = "manga|webtoon" if category == "manga" else "readcomiconline"
|
||||
with open(CONFIGS["comic"]["comic-list"], "r", encoding="utf-8") as r_file:
|
||||
with open(configs["comic"]["comic-list"], "r", encoding="utf-8") as r_file:
|
||||
links = list(filter(lambda x: re.search(re_cat, x), r_file))
|
||||
|
||||
for link in links:
|
||||
gallery = Gallery()
|
||||
gallery.archive = ARGS.flag_archive
|
||||
gallery.archive = args.flag_archive
|
||||
gallery.skip_arg = skip_arg
|
||||
gallery.link = link
|
||||
gallery.generate_command(is_comic=True)
|
||||
gallery.run_command(ARGS.flag_verbose)
|
||||
gallery.run_command(args.flag_verbose)
|
||||
|
||||
|
||||
def print_webcomics(webcomics: Dict[str, Dict]) -> int:
|
||||
@@ -142,7 +286,9 @@ def print_webcomics(webcomics: Dict[str, Dict]) -> int:
|
||||
|
||||
def webcomic_manager():
|
||||
"""Process the information to download webcomics"""
|
||||
with open(CONFIGS["comic"]["webcomic-list"], "r", encoding="utf-8") as r_file:
|
||||
args = get_args()
|
||||
configs = get_configs()
|
||||
with open(configs["comic"]["webcomic-list"], "r", encoding="utf-8") as r_file:
|
||||
webcomics = yaml.safe_load(r_file)
|
||||
|
||||
usr_input = print_webcomics(webcomics)
|
||||
@@ -169,12 +315,13 @@ def webcomic_manager():
|
||||
"--cbz",
|
||||
]
|
||||
|
||||
run(command, ARGS.flag_verbose, cwd=Path(dest))
|
||||
run(command, args.flag_verbose, cwd=Path(dest))
|
||||
|
||||
|
||||
def save_comic(link: str) -> None:
|
||||
"""Add comic/manga link to the list"""
|
||||
list_comic = CONFIGS["comic"]["comic-list"]
|
||||
configs = get_configs()
|
||||
list_comic = configs["comic"]["comic-list"]
|
||||
with open(list_comic, "r", encoding="utf-8") as r_file:
|
||||
links = r_file.read().lower()
|
||||
if parse_link(link).lower() in links:
|
||||
@@ -186,8 +333,9 @@ def save_comic(link: str) -> None:
|
||||
w_file.write(link + "\n")
|
||||
|
||||
|
||||
def push_manager(user: User):
|
||||
def push_manager(user: User, links: list[str] | None = None) -> None:
|
||||
"""Filters out the URL to use the appropiate downloader"""
|
||||
args = get_args()
|
||||
# Creates an array which will store any links that should use youtube-dl
|
||||
rgx_gallery = re.compile(
|
||||
r"(x\.com\/\w+((?=.*media)|(?!.*status)))"
|
||||
@@ -216,6 +364,7 @@ def push_manager(user: User):
|
||||
rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
|
||||
rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
|
||||
|
||||
if links is None:
|
||||
with open(user.lists["push"], "r", encoding="utf-8") as r_file:
|
||||
links = list(map(lambda x: x.rstrip(), r_file))
|
||||
links_galleries = filter(rgx_gallery.search, links)
|
||||
@@ -228,50 +377,20 @@ def push_manager(user: User):
|
||||
links,
|
||||
)
|
||||
|
||||
with db.connect() as conn:
|
||||
for link in links_galleries:
|
||||
gallery = Gallery()
|
||||
gallery.archive = ARGS.flag_archive
|
||||
gallery.skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
|
||||
gallery.link = parse_link(link)
|
||||
gallery.dest = "download"
|
||||
gallery.opt_args = parse_instagram(link)
|
||||
gallery.generate_command(user)
|
||||
gallery.run_command(ARGS.flag_verbose)
|
||||
user.save_link(link)
|
||||
_handle_gallery_link(user, link, args, conn)
|
||||
|
||||
conn.commit()
|
||||
|
||||
for link in links_comics:
|
||||
if ARGS.flag_skip and re.search(r"readcomiconline", link):
|
||||
skip_arg = " --chapter-range 1"
|
||||
elif ARGS.flag_skip and re.search(r"manganato|mangahere|webtoons", link):
|
||||
skip_arg = " --chapter-range 1-5"
|
||||
else:
|
||||
skip_arg = ""
|
||||
|
||||
gallery = Gallery()
|
||||
gallery.archive = ARGS.flag_archive
|
||||
gallery.skip_arg = skip_arg
|
||||
gallery.link = link
|
||||
gallery.generate_command(is_comic=True)
|
||||
gallery.run_command(ARGS.flag_verbose)
|
||||
save_comic(link)
|
||||
_handle_comic_link(link, args)
|
||||
|
||||
for link in links_videos:
|
||||
video = Video()
|
||||
video.use_archive = ARGS.flag_archive
|
||||
video.link = link
|
||||
video.dest = str(user.directories["media"])
|
||||
video.database = str(user.dbs["media"])
|
||||
run(video_command(video), ARGS.flag_verbose)
|
||||
_handle_video_link(user, link, args)
|
||||
|
||||
for link in links_other:
|
||||
LOG.info("Other type of download %s", link)
|
||||
gallery = Gallery()
|
||||
gallery.archive = False
|
||||
gallery.skip_arg = " -o directory='[]'"
|
||||
gallery.link = link
|
||||
gallery.dest = "push"
|
||||
gallery.generate_command(user)
|
||||
gallery.run_command(ARGS.flag_verbose)
|
||||
_handle_other_link(user, link, args)
|
||||
|
||||
# Flush the push list, cleans all the contents
|
||||
with open(user.lists["push"], "w", encoding="utf-8") as w_file:
|
||||
@@ -280,43 +399,44 @@ def push_manager(user: User):
|
||||
|
||||
def scrapper_manager(user: User) -> None:
|
||||
"""Analyze the user arguments and call in functions"""
|
||||
args = get_args()
|
||||
user.list_manager()
|
||||
if re.search(r"main|instagram|kemono", ARGS.scrapper):
|
||||
skip_arg = "" if ARGS.flag_skip else " -o skip=true"
|
||||
parse_gallery(ARGS.scrapper, user)
|
||||
elif ARGS.scrapper == "push":
|
||||
if re.search(r"main|instagram|kemono", args.scrapper):
|
||||
parse_gallery(args.scrapper, user)
|
||||
elif args.scrapper == "push":
|
||||
push_manager(user)
|
||||
elif re.search("^comic|manga", ARGS.scrapper):
|
||||
skip_arg = " --chapter-range 1" if ARGS.flag_skip else ""
|
||||
skip_arg += "-5" if ARGS.scrapper == "manga" else ""
|
||||
comic_manager(skip_arg, ARGS.scrapper)
|
||||
elif re.search("webcomic", ARGS.scrapper):
|
||||
elif re.search("^comic|manga", args.scrapper):
|
||||
skip_arg = " --chapter-range 1" if args.flag_skip else ""
|
||||
skip_arg += "-5" if args.scrapper == "manga" else ""
|
||||
comic_manager(skip_arg, args.scrapper)
|
||||
elif re.search("webcomic", args.scrapper):
|
||||
webcomic_manager()
|
||||
|
||||
|
||||
def scrap_everyone() -> None:
|
||||
"""Iterates over every user of my scrapper"""
|
||||
for current_user in CONFIGS["users"]:
|
||||
args = get_args()
|
||||
configs = get_configs()
|
||||
for current_user in configs["users"]:
|
||||
user = User(get_index(current_user["name"]))
|
||||
LOG.info("Scrapping %s for %s", ARGS.scrapper, current_user["name"])
|
||||
LOG.info("Scrapping %s for %s", args.scrapper, current_user["name"])
|
||||
scrapper_manager(user)
|
||||
|
||||
|
||||
def main():
|
||||
"""Main module to decide what to do based on the parsed arguments"""
|
||||
if ARGS.scrapper:
|
||||
args = get_args()
|
||||
if args.scrapper:
|
||||
rgx_shared = re.compile("push|main|instagram|kemono")
|
||||
if (ARGS.user in "everyone") and (rgx_shared.search(ARGS.scrapper)):
|
||||
if (args.user == "everyone") and (rgx_shared.search(args.scrapper)):
|
||||
scrap_everyone()
|
||||
else:
|
||||
scrapper_manager(User(get_index(ARGS.user)))
|
||||
elif ARGS.link:
|
||||
is_admin = re.search(r"everyone|jawz", ARGS.user)
|
||||
user = User(get_index("jawz" if is_admin else ARGS.user))
|
||||
for arg_link in ARGS.link[0]:
|
||||
user.append_list("push", parse_link(arg_link))
|
||||
|
||||
push_manager(user)
|
||||
scrapper_manager(User(get_index(args.user)))
|
||||
elif args.link:
|
||||
is_admin = args.user in ("everyone", "jawz")
|
||||
user = User(get_index("jawz" if is_admin else args.user))
|
||||
links = [parse_link(lnk) for grp in args.link for lnk in grp]
|
||||
push_manager(user, links=links)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -9,6 +9,7 @@ import logging
|
||||
import shlex
|
||||
import subprocess
|
||||
import shutil
|
||||
import random
|
||||
from typing import Sequence
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
@@ -31,7 +32,7 @@ def validate_x_link(line: str) -> str:
|
||||
if re.search(r"\/media$", line):
|
||||
return line
|
||||
# if does not contain /media at the end then add /media
|
||||
return f"{line}/media"
|
||||
return f"{line.rstrip('/')}/media"
|
||||
|
||||
|
||||
def parse_link(link: str) -> str:
|
||||
@@ -65,6 +66,8 @@ def run(
|
||||
verbose: bool,
|
||||
cwd: Path | None = None,
|
||||
check: bool = False,
|
||||
on_line=None,
|
||||
log_failure: bool = True,
|
||||
) -> None:
|
||||
"""Run command in a subprocess"""
|
||||
# pylint: disable=subprocess-run-check
|
||||
@@ -82,9 +85,28 @@ def run(
|
||||
else:
|
||||
args = list(command)
|
||||
|
||||
if on_line is None:
|
||||
result = subprocess.run(args, check=check, cwd=cwd)
|
||||
if not check and result.returncode != 0:
|
||||
if log_failure and not check and result.returncode != 0:
|
||||
LOG.warning("Command failed (%s): %s", result.returncode, args)
|
||||
return
|
||||
|
||||
proc = subprocess.Popen(
|
||||
args,
|
||||
cwd=cwd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
)
|
||||
assert proc.stdout is not None
|
||||
for line in proc.stdout:
|
||||
print(line, end="")
|
||||
on_line(line)
|
||||
returncode = proc.wait()
|
||||
if check and returncode != 0:
|
||||
raise subprocess.CalledProcessError(returncode, args)
|
||||
if log_failure and not check and returncode != 0:
|
||||
LOG.warning("Command failed (%s): %s", returncode, args)
|
||||
|
||||
|
||||
def list_lines(i: int, line: str) -> str:
|
||||
@@ -92,28 +114,41 @@ def list_lines(i: int, line: str) -> str:
|
||||
return f"{i}) {line}"
|
||||
|
||||
|
||||
def quote(line: str) -> str:
|
||||
"""Quote the line"""
|
||||
return f'"{line}"'
|
||||
|
||||
|
||||
def sort_txt_file(file_path: Path):
|
||||
"""Sort every line alphabetically
|
||||
remove duplicated and empty lines"""
|
||||
file = str(file_path.resolve())
|
||||
run(["sort", "-u", file, "-o", file], VERBOSE_G)
|
||||
run(["sed", "-i", "/^$/d", file], VERBOSE_G)
|
||||
run(["sed", "-i", "-e", "s,http:,https:,", file], VERBOSE_G)
|
||||
# fix this using strip on python
|
||||
# line.strip("/")
|
||||
run(["sed", "-i", "-e", "s,/$,,", file], VERBOSE_G) # trailing /
|
||||
path = file_path.resolve()
|
||||
with open(path, "r", encoding="utf-8") as open_file:
|
||||
lines = [ln.strip() for ln in open_file]
|
||||
|
||||
normalized = []
|
||||
for ln in lines:
|
||||
if not ln:
|
||||
continue
|
||||
ln = ln.replace("http://", "https://")
|
||||
ln = ln.rstrip("/")
|
||||
normalized.append(ln)
|
||||
|
||||
unique_sorted = sorted(set(normalized))
|
||||
with open(path, "w", encoding="utf-8") as open_file:
|
||||
open_file.write("\n".join(unique_sorted))
|
||||
if unique_sorted:
|
||||
open_file.write("\n")
|
||||
|
||||
|
||||
def randomize_txt_file(file_path: Path):
|
||||
"""Randomize the order of the
|
||||
lines of the txt file"""
|
||||
file = str(file_path.resolve())
|
||||
run(["sort", "-R", file, "-o", file], VERBOSE_G)
|
||||
path = file_path.resolve()
|
||||
with open(path, "r", encoding="utf-8") as open_file:
|
||||
lines = [ln.rstrip("\n") for ln in open_file]
|
||||
random.shuffle(lines)
|
||||
with open(path, "w", encoding="utf-8") as open_file:
|
||||
open_file.write("\n".join(lines))
|
||||
if lines:
|
||||
open_file.write("\n")
|
||||
|
||||
|
||||
def parse_list(file):
|
||||
|
||||
144
src/download/gallery_clean.py
Normal file
144
src/download/gallery_clean.py
Normal file
@@ -0,0 +1,144 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Interactive cleaner for x.com galleries."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
import db
|
||||
from classes.user import User
|
||||
from functions import load_config_variables
|
||||
|
||||
|
||||
def _extract_handle(url: str) -> str | None:
|
||||
parts = urlsplit(url if "://" in url else f"https://{url}")
|
||||
segments = [seg for seg in parts.path.split("/") if seg]
|
||||
if not segments:
|
||||
return None
|
||||
return segments[0]
|
||||
|
||||
|
||||
def _resolve_folder(user: User, handle: str | None) -> Path | None:
    """Locate the on-disk folder for *handle* under the user's download dir.

    Returns the base download directory when no handle is given, or None
    when nothing suitable exists on disk.
    """
    base = user.directories.get("download")
    if base is None or not base.exists():
        return None
    if not handle:
        return base
    candidate = base / handle
    return candidate if candidate.exists() else None
|
||||
|
||||
|
||||
def _open_folder(path: Path) -> None:
    """Open *path* in the desktop file manager via xdg-open, when available."""
    if shutil.which("xdg-open") is None:
        print("xdg-open not found; skipping folder open.")
        return
    # check=False: a failed viewer launch must not abort the review session.
    subprocess.run(["xdg-open", str(path)], check=False)
|
||||
|
||||
|
||||
def _prompt() -> str:
    """Ask the reviewer for a decision; returns the lowercased, stripped reply."""
    return input("Keep? [y] keep / [n] disable / [s] skip / [q] quit: ").strip().lower()
|
||||
|
||||
|
||||
def _build_user_index(configs: dict) -> dict[str, int]:
    """Map each configured user name to its index within configs["users"]."""
    names = (entry["name"] for entry in configs["users"])
    return {name: position for position, name in enumerate(names)}
|
||||
|
||||
|
||||
def _validate_users(user_index: dict[str, int], users: list[str] | None) -> bool:
    """Return True when every requested user exists in *user_index*.

    Prints the unknown names and returns False otherwise; an empty or None
    filter is always valid.
    """
    if not users:
        return True
    unknown = [name for name in users if name not in user_index]
    if unknown:
        print(f"Unknown users: {', '.join(unknown)}")
        return False
    return True
|
||||
|
||||
|
||||
def _print_context(user_name: str, url: str, handle: str | None, folder: Path | None) -> None:
    """Show the reviewer which link is up next and open its folder when known."""
    print(f"\nUser: {user_name}")
    print(f"URL: {url}")
    if handle:
        print(f"Handle: {handle}")
    if folder is None:
        print("Folder: <unknown>")
        return
    print(f"Folder: {folder}")
    _open_folder(folder)
|
||||
|
||||
|
||||
def _apply_choice(
    conn,
    user_name: str,
    url: str,
    choice: str,
    reason: str,
) -> bool | None:
    """Apply one reviewer decision for *url*.

    Returns True to advance to the next link, False to re-prompt on
    invalid input, and None when the reviewer asked to quit.
    """
    if choice in ("y", "yes"):
        # Only commit when the db call reports a change.
        if db.set_keep(conn, user_name, url, keep=True, reason=reason):
            conn.commit()
        return True
    if choice in ("n", "no"):
        if db.set_enabled(conn, user_name, url, enabled=False, reason=reason):
            conn.commit()
        return True
    if choice in ("s", "skip", ""):
        return True
    if choice in ("q", "quit"):
        return None
    print("Please enter y, n, s, or q.")
    return False
|
||||
|
||||
|
||||
def main() -> None:
    """Interactively review saved gallery links, keeping or disabling each one.

    Fetches candidate links from the database, shows each one's context
    (handle and on-disk folder, opening it when possible), and applies the
    reviewer's keep/disable/skip/quit decision.
    """
    parser = argparse.ArgumentParser(prog="gallery-clean")
    parser.add_argument(
        "session",
        nargs="?",
        type=int,
        default=10,
        help="Number of links to review this session (default: 10)",
    )
    parser.add_argument(
        "--reason",
        default="gallery-clean",
        help="Reason stored when disabling or keeping",
    )
    parser.add_argument(
        "--user",
        dest="users",
        action="append",
        help="User whose links to review; repeatable (default: jawz)",
    )
    args = parser.parse_args()

    configs = load_config_variables()
    user_index = _build_user_index(configs)
    # Previously hard-coded to "jawz"; still the default, now overridable.
    users_filter = args.users or ["jawz"]
    if not _validate_users(user_index, users_filter):
        return

    with db.connect(configs) as conn:
        rows = db.get_links_for_cleaning(conn, users=users_filter)
        # Clamp negative session counts to zero links.
        for row in rows[: max(args.session, 0)]:
            user_name = row["user_name"]
            url = row["url_original"]
            handle = _extract_handle(url)
            folder = _resolve_folder(User(user_index[user_name]), handle)

            _print_context(user_name, url, handle, folder)

            # Re-prompt until the choice is accepted (True) or quit (None).
            while True:
                result = _apply_choice(conn, user_name, url, _prompt(), args.reason)
                if result is None:
                    return
                if result:
                    break
|
||||
|
||||
|
||||
# Entry point when executed directly as a script.
if __name__ == "__main__":
    main()
|
||||
72
src/download/select_links.py
Normal file
72
src/download/select_links.py
Normal file
@@ -0,0 +1,72 @@
|
||||
#!/usr/bin/env python3
|
||||
"""fzf-based selectors for comic and gallery links."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import subprocess
|
||||
|
||||
import db
|
||||
|
||||
# Database user whose saved links are offered for selection.
USER = "jawz"

# Hosts that serve comics/manga; used to split links into comic vs. gallery sets.
RGX_COMIC = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
|
||||
|
||||
|
||||
def _select_links(urls: list[str]) -> list[str]:
|
||||
if not urls:
|
||||
return []
|
||||
proc = subprocess.run(
|
||||
["fzf", "--multi", "--exact", "-i"],
|
||||
input="\n".join(urls),
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
return []
|
||||
return [ln for ln in proc.stdout.splitlines() if ln.strip()]
|
||||
|
||||
|
||||
def _run_download(selected: list[str], extra_args: list[str]) -> None:
|
||||
if not selected:
|
||||
return
|
||||
subprocess.run(["download", "-u", USER, *extra_args, "-i", *selected], check=False)
|
||||
|
||||
|
||||
def _parse_args() -> argparse.Namespace:
    """Parse the selector flags shared by comic_main and gallery_main.

    Both flags default to True; passing the option flips the value off so
    the caller can forward "-s" / "-a" to the download CLI.
    """
    parser = argparse.ArgumentParser(add_help=False)
    # "--no-skip" added for consistency with "--no-archive"; the original
    # underscore spelling is kept for backward compatibility.
    parser.add_argument("-s", "--no_skip", "--no-skip", dest="flag_skip", action="store_false")
    parser.add_argument("-a", "--no-archive", dest="flag_archive", action="store_false")
    return parser.parse_args()
|
||||
|
||||
|
||||
def _extra_args_from_flags(args: argparse.Namespace) -> list[str]:
|
||||
extra = []
|
||||
if args.flag_skip is False:
|
||||
extra.append("-s")
|
||||
if args.flag_archive is False:
|
||||
extra.append("-a")
|
||||
return extra
|
||||
|
||||
|
||||
def _pick_and_download(*, comics: bool) -> None:
    """Shared driver: fetch active links, keep comic or non-comic ones,
    let the user select with fzf, and forward the selection to download."""
    args = _parse_args()
    extra_args = _extra_args_from_flags(args)
    with db.connect() as conn:
        rows = db.get_links(conn, users=[USER], include_disabled=False, include_banned=False)
        urls = [
            row["url_original"]
            for row in rows
            # Keep the row when its comic-ness matches the requested mode.
            if bool(RGX_COMIC.search(row["url_original"])) == comics
        ]
        _run_download(_select_links(urls), extra_args)


def comic_main() -> None:
    """Entry point for the `comic` command: select among comic/manga links."""
    _pick_and_download(comics=True)


def gallery_main() -> None:
    """Entry point for the `gallery` command: select among non-comic links."""
    _pick_and_download(comics=False)
|
||||
|
||||
|
||||
# Default to the gallery selector when run directly.
if __name__ == "__main__":
    gallery_main()
|
||||
@@ -6,10 +6,19 @@ py_modules =
|
||||
download
|
||||
functions
|
||||
argparser
|
||||
db
|
||||
admin
|
||||
admin_links
|
||||
admin_users
|
||||
select_links
|
||||
gallery_clean
|
||||
classes.gallery
|
||||
classes.user
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
download = download:main
|
||||
|
||||
download-admin = admin:main
|
||||
comic = select_links:comic_main
|
||||
gallery = select_links:gallery_main
|
||||
gallery-clean = gallery_clean:main
|
||||
|
||||
79
src/download/tests/test_db.py
Normal file
79
src/download/tests/test_db.py
Normal file
@@ -0,0 +1,79 @@
|
||||
#!/usr/bin/env python3
|
||||
import tempfile
|
||||
import unittest
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import db
|
||||
|
||||
|
||||
class TestDB(unittest.TestCase):
    """Unit tests for the db module against an in-memory SQLite database."""

    def setUp(self) -> None:
        # Fresh in-memory DB per test; sqlite3.Row gives dict-style access.
        self.conn = sqlite3.connect(":memory:")
        self.conn.row_factory = sqlite3.Row
        db.ensure_schema(self.conn)

    def tearDown(self) -> None:
        self.conn.close()

    def test_normalize_url(self):
        # Per these expectations: scheme forced to https, twitter.com host
        # rewritten to x.com, trailing slash stripped.
        self.assertEqual(
            db.normalize_url("http://Twitter.com/User/"),
            "https://x.com/User",
        )
        self.assertEqual(
            db.normalize_url("x.com/SomeUser/media/"),
            "https://x.com/SomeUser/media",
        )

    def test_add_link_dedupe(self):
        # URLs differing only by a trailing slash normalize to the same link.
        res1 = db.add_link(self.conn, "jawz", "https://x.com/Test/")
        res2 = db.add_link(self.conn, "jawz", "https://x.com/Test")
        self.assertEqual(res1["status"], "added")
        self.assertEqual(res2["status"], "exists")

    def test_remove_tombstone(self):
        # Removal leaves a tombstone: re-adding reports "removed" until the
        # caller confirms with assume_yes=True.
        db.add_link(self.conn, "jawz", "https://x.com/Test")
        ok = db.remove_link(self.conn, "jawz", "https://x.com/Test")
        self.assertTrue(ok)
        res = db.add_link(self.conn, "jawz", "https://x.com/Test")
        self.assertEqual(res["status"], "removed")
        res2 = db.add_link(self.conn, "jawz", "https://x.com/Test", assume_yes=True)
        self.assertEqual(res2["status"], "added")

    def test_disable_and_ban(self):
        # Both disabled and banned links drop out of the active set.
        db.add_link(self.conn, "jawz", "https://x.com/Test")
        ok = db.set_enabled(self.conn, "jawz", "https://x.com/Test", enabled=False)
        self.assertTrue(ok)
        active = db.get_active_links(self.conn, "jawz")
        self.assertEqual(active, [])
        ok = db.set_banned(self.conn, "jawz", "https://x.com/Test", banned=True, reason="bad")
        self.assertTrue(ok)
        active = db.get_active_links(self.conn, "jawz")
        self.assertEqual(active, [])

    def test_import_master_list(self):
        # "#"-prefixed lines import as disabled; duplicates count as "exists".
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "watch.txt"
            path.write_text(
                "\n".join(
                    [
                        "https://x.com/User",
                        "# https://x.com/DisabledUser",
                        "https://x.com/User",
                    ]
                )
                + "\n",
                encoding="utf-8",
            )
            result = db.import_master_list(self.conn, "jawz", path)
            self.assertEqual(result["added"], 2)
            self.assertEqual(result["exists"], 1)
            rows = db.get_links_by_user(self.conn, "jawz")
            by_norm = {db.normalize_url(r["url_original"]): r for r in rows}
            self.assertTrue(by_norm["https://x.com/User"]["enabled"])
            self.assertFalse(by_norm["https://x.com/DisabledUser"]["enabled"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
140
src/download/tests/test_download.py
Normal file
140
src/download/tests/test_download.py
Normal file
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env python3
|
||||
import types
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
import download
|
||||
|
||||
|
||||
class DummyArgs:
    """Stand-in for download.ARGS with permissive defaults."""

    def __init__(self):
        self.post_type = ["posts", "reels"]
        # All boolean flags on by default.
        for flag in ("flag_archive", "flag_skip", "flag_verbose"):
            setattr(self, flag, True)
|
||||
|
||||
|
||||
class DummyUser:
    """Minimal stand-in for the project's User with fixed /tmp paths."""

    def __init__(self):
        self.name = "jawz"
        self.sleep = 0
        self.directories = {
            "media": Path("/tmp/media"),
            "download": Path("/tmp/dl"),
        }
        self.dbs = {
            "gallery": Path("/tmp/g.sqlite3"),
            "media": Path("/tmp/m.txt"),
        }
        self.lists = {"push": Path("/tmp/instant.txt")}
|
||||
|
||||
|
||||
class TestDownload(unittest.TestCase):
    """Tests for download.py routing logic with its collaborators patched out."""

    def setUp(self) -> None:
        # Give the module the globals it normally gets from CLI parsing and
        # config loading.
        download.ARGS = DummyArgs()
        download.CONFIGS = {
            "users": [{"name": "jawz"}],
            "global": {},
            "comic": {"comic-list": "/tmp/comic.txt"},
        }
        # Remember originals so tearDown can restore module state between tests.
        self.orig_gallery = download.Gallery
        self.orig_video_command = download.video_command
        self.orig_run = download.run
        self.orig_db_connect = download.db.connect
        self.orig_db_add_link = download.db.add_link
        self.orig_save_comic = download.save_comic
        self.orig_make_handler = download._make_gallery_error_handler

    def tearDown(self) -> None:
        # Undo every monkeypatch applied by the tests.
        download.Gallery = self.orig_gallery
        download.video_command = self.orig_video_command
        download.run = self.orig_run
        download.db.connect = self.orig_db_connect
        download.db.add_link = self.orig_db_add_link
        download.save_comic = self.orig_save_comic
        download._make_gallery_error_handler = self.orig_make_handler

    def test_parse_instagram(self):
        # Instagram links get the configured post types; others get nothing.
        res = download.parse_instagram("https://instagram.com/user")
        self.assertEqual(res, ["-o", "include=posts,reels"])
        res2 = download.parse_instagram("https://x.com/user")
        self.assertEqual(res2, [])

    def test_video_command(self):
        # Plain video link: command starts with yt-dlp and includes the URL.
        v = download.Video()
        v.link = "https://youtu.be/abc"
        v.dest = "/tmp"
        cmd = download.video_command(v)
        self.assertIn("yt-dlp", cmd[0])
        self.assertIn("https://youtu.be/abc", cmd)

        # YouTube Music link: expect an audio-extraction command.
        v2 = download.Video()
        v2.link = "https://music.youtube.com/watch?v=xyz"
        v2.dest = "/tmp"
        v2.use_archive = False
        cmd2 = download.video_command(v2)
        self.assertIn("--audio-format", cmd2)

    def test_push_manager_routing(self):
        # One link per category; fakes record which handler each one reaches.
        user = DummyUser()

        captured = {"gallery": [], "video": [], "comic": [], "other": []}

        def fake_generate(self, *args, **kwargs):
            return None

        def fake_run(self, *args, **kwargs):
            # Classify by link substring so the test can count routing.
            link = getattr(self, "link", "")
            if "mangadex" in link:
                captured["comic"].append(link)
            elif "x.com" in link:
                captured["gallery"].append(link)
            else:
                captured["other"].append(link)

        def fake_video_command(video):
            captured["video"].append(video.link)
            return ["echo", "ok"]

        # Patch Gallery methods and video_command/run
        class FakeGallery(self.orig_gallery):
            def generate_command(self, *args, **kwargs):
                return fake_generate(self, *args, **kwargs)

            def run_command(self, *args, **kwargs):
                return fake_run(self, *args, **kwargs)

        download.Gallery = FakeGallery
        download.video_command = fake_video_command
        download.run = lambda *args, **kwargs: None
        download.save_comic = lambda *_args, **_kwargs: None
        download._make_gallery_error_handler = lambda *_args, **_kwargs: None

        links = [
            "https://x.com/someuser",
            "https://youtu.be/abc",
            "https://mangadex.org/title/123",
            "https://example.com/page",
        ]

        # Disable DB write path for this test
        class FakeConn:
            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def commit(self):
                return None

        download.db.connect = lambda *a, **k: FakeConn()
        download.db.add_link = lambda *a, **k: {"status": "added"}

        download.push_manager(user, links=links)

        # Each category should have received exactly its one link.
        self.assertEqual(len(captured["gallery"]), 1)
        self.assertEqual(len(captured["video"]), 1)
        self.assertEqual(len(captured["comic"]), 1)
        self.assertEqual(len(captured["other"]), 1)

        # restore handled in tearDown
|
||||
|
||||
|
||||
# Allow running this test module directly without pytest.
if __name__ == "__main__":
    unittest.main()
|
||||
Reference in New Issue
Block a user