gallery-clean + autocompletions
This commit is contained in:
@@ -35,4 +35,11 @@ buildPythonApplication {
|
||||
webcomix
|
||||
fzf
|
||||
];
|
||||
|
||||
postInstall = ''
|
||||
install -Dm644 completions/download.bash \
|
||||
$out/share/bash-completion/completions/download
|
||||
install -Dm644 completions/download.bash \
|
||||
$out/share/bash-completion/completions/download-admin
|
||||
'';
|
||||
}
|
||||
|
||||
102
src/download/completions/download.bash
Normal file
102
src/download/completions/download.bash
Normal file
@@ -0,0 +1,102 @@
|
||||
# Bash completion for download and download-admin.
|
||||
# Source this file or install it in your bash_completion.d directory.
|
||||
|
||||
__download_users() {
|
||||
python3 - <<'PY' 2>/dev/null
|
||||
import pathlib
|
||||
try:
|
||||
import yaml
|
||||
except Exception:
|
||||
print("")
|
||||
raise SystemExit(0)
|
||||
|
||||
cfg = pathlib.Path("~/.config/jawz/config.yaml").expanduser()
|
||||
if not cfg.is_file():
|
||||
print("")
|
||||
raise SystemExit(0)
|
||||
data = yaml.safe_load(cfg.read_text(encoding="utf-8")) or {}
|
||||
users = [u.get("name") for u in data.get("users", []) if isinstance(u, dict)]
|
||||
print(" ".join([u for u in users if u]))
|
||||
PY
|
||||
}
|
||||
|
||||
_download() {
|
||||
local cur prev words cword
|
||||
_init_completion -n : || return
|
||||
|
||||
local scrappers="push main instagram kemono comic manga webcomic"
|
||||
local opts="-u --user -i --input -l --list -a --no-archive -s --no_skip -v --verbose -t --type-post"
|
||||
local post_types="posts reels stories highlights avatar"
|
||||
|
||||
if [[ "$cur" == -* ]]; then
|
||||
COMPREPLY=( $(compgen -W "$opts" -- "$cur") )
|
||||
return
|
||||
fi
|
||||
|
||||
case "$prev" in
|
||||
-u|--user)
|
||||
COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
|
||||
return
|
||||
;;
|
||||
-t|--type-post)
|
||||
COMPREPLY=( $(compgen -W "$post_types" -- "$cur") )
|
||||
return
|
||||
;;
|
||||
-i|--input)
|
||||
return
|
||||
;;
|
||||
esac
|
||||
|
||||
local have_scrapper=0
|
||||
local w
|
||||
for w in "${words[@]:1}"; do
|
||||
[[ "$w" == -* ]] && continue
|
||||
if [[ " $scrappers " == *" $w "* ]]; then
|
||||
have_scrapper=1
|
||||
break
|
||||
fi
|
||||
done
|
||||
if [[ $have_scrapper -eq 0 ]]; then
|
||||
COMPREPLY=( $(compgen -W "$scrappers" -- "$cur") )
|
||||
fi
|
||||
}
|
||||
|
||||
_download_admin() {
|
||||
local cur prev words cword
|
||||
_init_completion -n : || return
|
||||
|
||||
local cmds="add disable enable ban unban remove rename list users import validate-import fix-revision fix-x-media user-rename"
|
||||
local list_opts="--user --disabled --banned --requires-revision"
|
||||
|
||||
if [[ "$cur" == -* ]]; then
|
||||
if [[ "${words[1]}" == "list" ]]; then
|
||||
COMPREPLY=( $(compgen -W "$list_opts" -- "$cur") )
|
||||
else
|
||||
COMPREPLY=()
|
||||
fi
|
||||
return
|
||||
fi
|
||||
|
||||
case "$prev" in
|
||||
--user)
|
||||
COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
|
||||
return
|
||||
;;
|
||||
esac
|
||||
|
||||
if [[ $cword -eq 1 ]]; then
|
||||
COMPREPLY=( $(compgen -W "$cmds" -- "$cur") )
|
||||
return
|
||||
fi
|
||||
|
||||
case "${words[1]}" in
|
||||
add|disable|enable|ban|unban|remove|rename|user-rename)
|
||||
if [[ $cword -eq 2 ]]; then
|
||||
COMPREPLY=( $(compgen -W "$(__download_users)" -- "$cur") )
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
complete -F _download download
|
||||
complete -F _download_admin download-admin
|
||||
@@ -39,9 +39,11 @@ def ensure_schema(conn: sqlite3.Connection) -> None:
|
||||
url_normalized TEXT NOT NULL,
|
||||
site TEXT,
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
keep INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
disabled_at TEXT,
|
||||
disabled_reason TEXT,
|
||||
banned_at TEXT,
|
||||
banned_reason TEXT,
|
||||
requires_revision INTEGER NOT NULL DEFAULT 0
|
||||
@@ -79,6 +81,18 @@ def ensure_schema(conn: sqlite3.Connection) -> None:
|
||||
"requires_revision",
|
||||
"ALTER TABLE links ADD COLUMN requires_revision INTEGER NOT NULL DEFAULT 0",
|
||||
)
|
||||
_ensure_column(
|
||||
conn,
|
||||
"links",
|
||||
"keep",
|
||||
"ALTER TABLE links ADD COLUMN keep INTEGER NOT NULL DEFAULT 0",
|
||||
)
|
||||
_ensure_column(
|
||||
conn,
|
||||
"links",
|
||||
"disabled_reason",
|
||||
"ALTER TABLE links ADD COLUMN disabled_reason TEXT",
|
||||
)
|
||||
|
||||
|
||||
def _ensure_column(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> None:
|
||||
@@ -195,6 +209,7 @@ def set_enabled(
|
||||
user_name: str,
|
||||
url_original: str,
|
||||
enabled: bool,
|
||||
reason: str | None = None,
|
||||
) -> bool:
|
||||
url_norm = normalize_url(url_original)
|
||||
row = conn.execute(
|
||||
@@ -207,7 +222,11 @@ def set_enabled(
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE links
|
||||
SET enabled = 1, disabled_at = NULL, requires_revision = 0, updated_at = CURRENT_TIMESTAMP
|
||||
SET enabled = 1,
|
||||
disabled_at = NULL,
|
||||
disabled_reason = NULL,
|
||||
requires_revision = 0,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = ?
|
||||
""",
|
||||
(row["id"],),
|
||||
@@ -217,12 +236,22 @@ def set_enabled(
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE links
|
||||
SET enabled = 0, disabled_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
|
||||
SET enabled = 0,
|
||||
disabled_at = CURRENT_TIMESTAMP,
|
||||
disabled_reason = ?,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = ?
|
||||
""",
|
||||
(row["id"],),
|
||||
(reason, row["id"]),
|
||||
)
|
||||
add_history(
|
||||
conn,
|
||||
user_name,
|
||||
"disable",
|
||||
link_id=row["id"],
|
||||
old_url=row["url_original"],
|
||||
note=reason,
|
||||
)
|
||||
add_history(conn, user_name, "disable", link_id=row["id"], old_url=row["url_original"])
|
||||
return True
|
||||
|
||||
|
||||
@@ -448,6 +477,80 @@ def get_links(
|
||||
return conn.execute(f"SELECT * FROM links {clause} ORDER BY user_name, id", params).fetchall()
|
||||
|
||||
|
||||
def get_links_for_cleaning(
|
||||
conn: sqlite3.Connection,
|
||||
users: Iterable[str] | None = None,
|
||||
) -> list[sqlite3.Row]:
|
||||
params: list = []
|
||||
where = [
|
||||
"site = ?",
|
||||
"enabled = 1",
|
||||
"banned_at IS NULL",
|
||||
"keep = 0",
|
||||
]
|
||||
params.append("x.com")
|
||||
user_list = list(users) if users else []
|
||||
if user_list:
|
||||
where.append(f"user_name IN ({','.join(['?'] * len(user_list))})")
|
||||
params.extend(user_list)
|
||||
clause = " AND ".join(where)
|
||||
return conn.execute(
|
||||
f"SELECT * FROM links WHERE {clause} ORDER BY user_name, id",
|
||||
params,
|
||||
).fetchall()
|
||||
|
||||
|
||||
def set_keep(
|
||||
conn: sqlite3.Connection,
|
||||
user_name: str,
|
||||
url_original: str,
|
||||
keep: bool,
|
||||
reason: str | None = None,
|
||||
) -> bool:
|
||||
url_norm = normalize_url(url_original)
|
||||
row = conn.execute(
|
||||
"SELECT id, url_original FROM links WHERE user_name = ? AND url_normalized = ?",
|
||||
(user_name, url_norm),
|
||||
).fetchone()
|
||||
if not row:
|
||||
return False
|
||||
if keep:
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE links
|
||||
SET keep = 1, updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = ?
|
||||
""",
|
||||
(row["id"],),
|
||||
)
|
||||
add_history(
|
||||
conn,
|
||||
user_name,
|
||||
"keep",
|
||||
link_id=row["id"],
|
||||
old_url=row["url_original"],
|
||||
note=reason,
|
||||
)
|
||||
else:
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE links
|
||||
SET keep = 0, updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = ?
|
||||
""",
|
||||
(row["id"],),
|
||||
)
|
||||
add_history(
|
||||
conn,
|
||||
user_name,
|
||||
"unkeep",
|
||||
link_id=row["id"],
|
||||
old_url=row["url_original"],
|
||||
note=reason,
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
def get_links_by_user(conn: sqlite3.Connection, user_name: str) -> list[sqlite3.Row]:
|
||||
return conn.execute(
|
||||
"SELECT * FROM links WHERE user_name = ? ORDER BY id",
|
||||
|
||||
145
src/download/gallery_clean.py
Normal file
145
src/download/gallery_clean.py
Normal file
@@ -0,0 +1,145 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Interactive cleaner for x.com galleries."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
import db
|
||||
from classes.user import User
|
||||
from functions import load_config_variables
|
||||
|
||||
|
||||
def _extract_handle(url: str) -> str | None:
|
||||
parts = urlsplit(url if "://" in url else f"https://{url}")
|
||||
segments = [seg for seg in parts.path.split("/") if seg]
|
||||
if not segments:
|
||||
return None
|
||||
return segments[0]
|
||||
|
||||
|
||||
def _resolve_folder(user: User, handle: str | None) -> Path | None:
|
||||
base = user.directories.get("download")
|
||||
if base is None:
|
||||
return None
|
||||
if not handle:
|
||||
return base
|
||||
candidates = [
|
||||
base / "x.com" / handle,
|
||||
base / "twitter" / handle,
|
||||
base / handle,
|
||||
]
|
||||
for cand in candidates:
|
||||
if cand.exists():
|
||||
return cand
|
||||
return base
|
||||
|
||||
|
||||
def _open_folder(path: Path) -> None:
|
||||
if shutil.which("xdg-open") is None:
|
||||
print("xdg-open not found; skipping folder open.")
|
||||
return
|
||||
subprocess.run(["xdg-open", str(path)], check=False)
|
||||
|
||||
|
||||
def _prompt() -> str:
|
||||
return input("Keep? [y] keep / [n] disable / [s] skip / [q] quit: ").strip().lower()
|
||||
|
||||
|
||||
def _build_user_index(configs: dict) -> dict[str, int]:
|
||||
return {entry["name"]: idx for idx, entry in enumerate(configs["users"])}
|
||||
|
||||
|
||||
def _validate_users(user_index: dict[str, int], users: list[str] | None) -> bool:
|
||||
if not users:
|
||||
return True
|
||||
unknown = [u for u in users if u not in user_index]
|
||||
if not unknown:
|
||||
return True
|
||||
print(f"Unknown users: {', '.join(unknown)}")
|
||||
return False
|
||||
|
||||
|
||||
def _print_context(user_name: str, url: str, handle: str | None, folder: Path | None) -> None:
|
||||
print(f"\nUser: {user_name}")
|
||||
print(f"URL: {url}")
|
||||
if handle:
|
||||
print(f"Handle: {handle}")
|
||||
if folder:
|
||||
print(f"Folder: {folder}")
|
||||
_open_folder(folder)
|
||||
return
|
||||
print("Folder: <unknown>")
|
||||
|
||||
|
||||
def _apply_choice(
|
||||
conn,
|
||||
user_name: str,
|
||||
url: str,
|
||||
choice: str,
|
||||
reason: str,
|
||||
) -> bool | None:
|
||||
if choice in ("y", "yes"):
|
||||
ok = db.set_keep(conn, user_name, url, keep=True, reason=reason)
|
||||
if ok:
|
||||
conn.commit()
|
||||
return True
|
||||
if choice in ("n", "no"):
|
||||
ok = db.set_enabled(conn, user_name, url, enabled=False, reason=reason)
|
||||
if ok:
|
||||
conn.commit()
|
||||
return True
|
||||
if choice in ("s", "skip", ""):
|
||||
return True
|
||||
if choice in ("q", "quit"):
|
||||
return None
|
||||
print("Please enter y, n, s, or q.")
|
||||
return False
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(prog="gallery-clean")
|
||||
parser.add_argument(
|
||||
"session",
|
||||
nargs="?",
|
||||
type=int,
|
||||
default=10,
|
||||
help="Number of links to review this session (default: 10)",
|
||||
)
|
||||
parser.add_argument("-u", "--user", action="append", help="Filter by user")
|
||||
parser.add_argument(
|
||||
"--reason",
|
||||
default="gallery-clean",
|
||||
help="Reason stored when disabling or keeping",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
configs = load_config_variables()
|
||||
user_index = _build_user_index(configs)
|
||||
users_filter = args.user or None
|
||||
if not _validate_users(user_index, users_filter):
|
||||
return
|
||||
|
||||
with db.connect(configs) as conn:
|
||||
rows = db.get_links_for_cleaning(conn, users=users_filter)
|
||||
for row in rows[: max(args.session, 0)]:
|
||||
user_name = row["user_name"]
|
||||
url = row["url_original"]
|
||||
handle = _extract_handle(url)
|
||||
folder = _resolve_folder(User(user_index[user_name]), handle)
|
||||
|
||||
_print_context(user_name, url, handle, folder)
|
||||
|
||||
while True:
|
||||
result = _apply_choice(conn, user_name, url, _prompt(), args.reason)
|
||||
if result is None:
|
||||
return
|
||||
if result:
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -11,6 +11,7 @@ py_modules =
|
||||
admin_links
|
||||
admin_users
|
||||
select_links
|
||||
gallery_clean
|
||||
classes.gallery
|
||||
classes.user
|
||||
|
||||
@@ -20,3 +21,4 @@ console_scripts =
|
||||
download-admin = admin:main
|
||||
comic = select_links:comic_main
|
||||
gallery = select_links:gallery_main
|
||||
gallery-clean = gallery_clean:main
|
||||
|
||||
Reference in New Issue
Block a user