315 lines
10 KiB
Python
315 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Rewriting of the download manager script
|
|
with the intention to make it
|
|
more modular with the use of flags
|
|
in order to avoid unnecessary modifications
|
|
to the config files.
|
|
It also follows POSIX and Python conventions more closely.
|
|
"""
|
|
|
|
import re
|
|
from pathlib import Path
|
|
import yaml
|
|
from typing import Dict
|
|
from functions import LOG
|
|
from functions import run
|
|
from functions import list_lines
|
|
from functions import load_config_variables
|
|
from functions import parse_link
|
|
from argparser import argparser
|
|
from classes.user import User
|
|
from classes.gallery import Gallery
|
|
|
|
# GLOBAL VARIABLE SECTION
# The configuration is loaded once, when the module is imported.
CONFIGS = load_config_variables()
# "everyone" is always offered first, so that actions such as downloading a
# gallery can target all the configured users at once.
USERS = ["everyone", *(entry["name"] for entry in CONFIGS["users"])]
# The argument parser receives the selectable user names.
ARGS = argparser(USERS)
|
|
|
|
|
|
class Video:
    """Plain container that groups the parameters of one video download."""

    def __init__(self) -> None:
        # Whether the download archive (see ``database``) should be used.
        self.use_archive: bool = True
        # URL of the video; filled in by the caller after construction.
        self.link: str = ""
        # Directory used in the output template of the download command.
        self.dest: str = ""
        # Path given to the downloader as its download archive.
        self.database: str = ""
|
|
|
|
|
|
def get_index(name: str) -> int:
    """Return the position of the user called *name* in CONFIGS["users"].

    The first entry whose "name" value equals *name* wins. When no entry
    matches, -1 is returned.
    """
    for position, entry in enumerate(CONFIGS["users"]):
        if entry["name"] == name:
            return position
    return -1
|
|
|
|
|
|
def parse_gallery(gdl_list: str, user: User) -> None:
    """Configure a Gallery for the list named *gdl_list* and run it for *user*."""
    gallery = Gallery()
    gallery.archive = ARGS.flag_archive

    # The skip option is only passed along when the skip flag is off.
    if ARGS.flag_skip:
        gallery.skip_arg = ""
    else:
        gallery.skip_arg = " -o skip=true"

    gallery.dest = "download"
    gallery.list = gdl_list
    # Instagram lists need an extra "include" option; other lists get none.
    gallery.opt_args = parse_instagram(gdl_list)

    gallery.generate_command(user)
    gallery.run_command(ARGS.flag_verbose)
|
|
|
|
|
|
def parse_instagram(link: str) -> list[str]:
    """Return the extra options required when *link* points to instagram.

    Links that do not contain "instagram" need nothing and yield an empty
    list. Otherwise the result is ``["-o", "include=<types>"]`` where
    ``<types>`` comes from ``ARGS.post_type``: a list is joined with commas,
    any other value is formatted as-is.
    """
    if "instagram" not in link:
        return []

    post_type = ARGS.post_type
    include = ",".join(post_type) if isinstance(post_type, list) else post_type
    return ["-o", f"include={include}"]
|
|
|
|
|
|
def video_command(video: Video) -> list[str]:
    """Build the command line used to download ``video.link``.

    Chaturbate links are handed to ``stream-dl`` together with the last path
    segment of the link. Every other link is downloaded with ``yt-dlp``,
    using options that depend on the kind of link:

    * YouTube video links embed subtitles, thumbnail, metadata and chapters.
    * YouTube Music links extract the audio and, when ``video.use_archive``
      is set, record downloads in ``video.database``.
    * Any other link is downloaded as mp4.

    Args:
        video: Object exposing ``link``, ``dest``, ``database`` and
            ``use_archive``.

    Returns:
        The command as a list of arguments. For ``yt-dlp`` commands the
        link is the last element.
    """
    if "chaturbate" in video.link:
        # Trailing slashes are dropped so the last segment is never empty.
        return ["stream-dl", video.link.rstrip("/").split("/")[-1]]

    # Dots are escaped so they only match a literal "." in the host name.
    rgx_yt = re.compile(r"https://(youtube|www\.youtube|youtu\.be)")
    rgx_music = re.compile(r"https://music\.youtube")
    output_template = f"{video.dest}/%(title)s.%(ext)s"

    command = ["yt-dlp"]
    if rgx_yt.search(video.link):
        command += [
            "--embed-subs",
            "--embed-thumbnail",
            "--embed-metadata",
            "--embed-chapters",
            "-o",
            output_template,
        ]
    elif rgx_music.search(video.link):
        if video.use_archive:
            command += ["--download-archive", video.database]
        command += [
            "--no-playlist",
            "--newline",
            "-x",
            "--audio-format",
            "best",
            "--add-metadata",
            "--audio-quality",
            "0",
            "-o",
            output_template,
        ]
    else:  # Any other video link, just do it generic
        command += ["-f", "mp4", "-o", output_template]

    LOG.info("%s %s", " ".join(command), video.link)
    return command + [video.link]
|
|
|
|
|
|
def comic_manager(skip_arg: str, category: str) -> None:
    """Download every saved comic link that belongs to *category*.

    The links are read from the file at ``CONFIGS["comic"]["comic-list"]``.
    When *category* is "manga", the lines matching ``manga|webtoon`` are
    used; for any other value the lines matching ``readcomiconline`` are.

    Args:
        skip_arg: Value assigned to ``Gallery.skip_arg`` for every link.
        category: Name of the category to download.
    """
    re_cat = "manga|webtoon" if category == "manga" else "readcomiconline"
    rgx_cat = re.compile(re_cat)
    with open(CONFIGS["comic"]["comic-list"], "r", encoding="utf-8") as r_file:
        # Lines obtained by iterating a file keep their trailing newline;
        # strip it so the link handed to the gallery is clean, the same way
        # push_manager does for its list.
        links = [line.rstrip() for line in r_file if rgx_cat.search(line)]

    for link in links:
        gallery = Gallery()
        gallery.archive = ARGS.flag_archive
        gallery.skip_arg = skip_arg
        gallery.link = link
        gallery.generate_command(is_comic=True)
        gallery.run_command(ARGS.flag_verbose)
|
|
|
|
|
|
def print_webcomics(webcomics: Dict[str, Dict]) -> int:
    """Show the available webcomics and ask which one to use.

    One line is printed per entry of ``webcomics["webcomics"]``, built from
    the entry position and its "name". The answer typed by the user is
    returned as an integer; ``int`` raises ValueError when it is not one.
    """
    names = (entry["name"] for entry in webcomics["webcomics"])
    for position, name in enumerate(names):
        print(list_lines(position, name))

    answer = input("Select a webcomic: ")
    return int(answer)
|
|
|
|
|
|
def webcomic_manager():
    """Let the user pick a webcomic from the list and download it.

    The list is the YAML file at ``CONFIGS["comic"]["webcomic-list"]``. The
    selected entry provides the name, start url and xpath expressions given
    to ``webcomix custom``; its "type" selects the directory the command is
    run in.
    """
    with open(CONFIGS["comic"]["webcomic-list"], "r", encoding="utf-8") as r_file:
        webcomics = yaml.safe_load(r_file)

    usr_input = print_webcomics(webcomics)
    entry = webcomics["webcomics"][usr_input]

    # The "type" of the entry names the key of the destination directory.
    rating = entry["type"]
    dest = webcomics["global"][f"{rating}_directory"]
    name = entry["name"]
    link = entry["url"]
    nxt_code = entry["next_code"]
    img_code = entry["image_code"]

    LOG.info("The webcomic is %s", dest)

    command = ["webcomix", "custom", name, "--start-url", link]
    command += [f"--next-page-xpath={nxt_code}", f"--image-xpath={img_code}"]
    command += ["-y", "--cbz"]

    run(command, ARGS.flag_verbose, cwd=Path(dest))
|
|
|
|
|
|
def save_comic(link: str) -> None:
    """Append *link* to the comic list unless it is already there.

    The check is case-insensitive and looks for the result of
    ``parse_link(link)`` anywhere in the contents of the list file; the
    line written to the file is *link* itself.
    """
    list_comic = CONFIGS["comic"]["comic-list"]
    with open(list_comic, "r", encoding="utf-8") as r_file:
        known_links = r_file.read().lower()

    already_saved = parse_link(link).lower() in known_links
    if already_saved:
        LOG.info("Graphic novel repeated, not saving")
        return

    LOG.info("New graphic novel, saving")
    with open(list_comic, "a", encoding="utf-8") as w_file:
        w_file.write(f"{link}\n")
|
|
|
|
|
|
def push_manager(user: User) -> None:
    """Download every link of the user's push list with the matching tool.

    Each non-blank line of the file at ``user.lists["push"]`` is classified
    with regular expressions and processed in this order:

    1. Gallery links: run through a Gallery with destination "download",
       then stored with ``user.save_link``.
    2. Comic links: run through a Gallery in comic mode, then stored with
       ``save_comic``.
    3. Video links: downloaded with the command built by ``video_command``.
    4. Anything else: run through a Gallery with destination "push" and
       without archive.

    The categories are evaluated independently, so a link that matches more
    than one of the first three patterns is processed once per match. Once
    every link has been handled the push list file is emptied.

    Args:
        user: The user whose push list, directories and databases are used.
    """
    # Links that match this pattern point to a whole gallery or profile.
    rgx_gallery = re.compile(
        r"(x\.com\/\w+((?=.*media)|(?!.*status)))"
        r"|(men\.wikifeet)"
        r"|(furaffinity\.net\/user\/)"
        r"|((deviantart\.com\/\w+(?!.*\/art\/)))"
        r"|(furaffinity\.net\/gallery\/)"
        r"|(furaffinity\.net\/scraps\/)"
        r"|(furaffinity\.net\/favorites\/)"
        r"|(instagram.com(?!\/p\/)\/\w+)"
        r"|(e621\.net((?=\/post\/)|(?!\/posts\/)))"
        r"|(flickr\.com\/photos\/\w+\/(?!\d+))"
        r"|(tumblr\.com(?!\/post\/))"
        r"|(kemono\.party\/(fanbox|gumroad|patreon)(?!\/user\/\d+\/post))"
        r"|(blogspot\.com(?!\/))"
        r"|(rule34\.paheal\.net\/post\/(?!view))"
        r"|(rule34\.xxx\/index\.php\?page\=post&s=(?!view))"
        r"|(pixiv\.net\/(en\/)?((?=users)|(?!artwork)))"
        r"|(fanbox\.cc\/@\w+(?!.*posts\/\d+))"
        r"|(reddit\.com\/(user|u))"
        r"|(baraag\.net\/((@\w+)|(?!\/\d+)))"
        r"|(pinterest\.com\/(?!pin\/\d+))"
        r"|(redgifs\.com\/(users|u|(?!watch)))"
        r"|(bsky\.app\/profile\/(?!.*\/post\/))"
    )
    rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
    rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")

    with open(user.lists["push"], "r", encoding="utf-8") as r_file:
        stripped = (line.rstrip() for line in r_file)
        # Blank lines match no pattern and would otherwise be run as an
        # "other" download with an empty link, so they are dropped here.
        links = [link for link in stripped if link]

    links_galleries = [link for link in links if rgx_gallery.search(link)]
    links_videos = [link for link in links if rgx_video.search(link)]
    links_comics = [link for link in links if rgx_comic.search(link)]
    links_other = [
        link
        for link in links
        if not (
            rgx_video.search(link)
            or rgx_gallery.search(link)
            or rgx_comic.search(link)
        )
    ]

    gallery_skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
    for link in links_galleries:
        gallery = Gallery()
        gallery.archive = ARGS.flag_archive
        gallery.skip_arg = gallery_skip_arg
        gallery.link = parse_link(link)
        gallery.dest = "download"
        gallery.opt_args = parse_instagram(link)
        gallery.generate_command(user)
        gallery.run_command(ARGS.flag_verbose)
        user.save_link(link)

    for link in links_comics:
        # With the skip flag set only the first chapter(s) are requested.
        if ARGS.flag_skip and re.search(r"readcomiconline", link):
            skip_arg = " --chapter-range 1"
        elif ARGS.flag_skip and re.search(r"manganato|mangahere|webtoons", link):
            skip_arg = " --chapter-range 1-5"
        else:
            skip_arg = ""

        gallery = Gallery()
        gallery.archive = ARGS.flag_archive
        gallery.skip_arg = skip_arg
        gallery.link = link
        gallery.generate_command(is_comic=True)
        gallery.run_command(ARGS.flag_verbose)
        save_comic(link)

    for link in links_videos:
        video = Video()
        video.use_archive = ARGS.flag_archive
        video.link = link
        video.dest = str(user.directories["media"])
        video.database = str(user.dbs["media"])
        run(video_command(video), ARGS.flag_verbose)

    for link in links_other:
        LOG.info("Other type of download %s", link)
        gallery = Gallery()
        gallery.archive = False
        gallery.skip_arg = " -o directory='[]'"
        gallery.link = link
        gallery.dest = "push"
        gallery.generate_command(user)
        gallery.run_command(ARGS.flag_verbose)

    # Flush the push list: opening it for writing truncates it, and the
    # context manager closes it again.
    with open(user.lists["push"], "w", encoding="utf-8"):
        pass
|
|
|
|
|
|
def scrapper_manager(user: User) -> None:
    """Run the scrapper selected by ``ARGS.scrapper`` for *user*.

    ``user.list_manager()`` is always called first. Then:

    * names containing "main", "instagram" or "kemono" go to parse_gallery;
    * "push" goes to push_manager;
    * names starting with "comic" or containing "manga" go to comic_manager;
    * names containing "webcomic" go to webcomic_manager.

    Args:
        user: The user the scrapper is run for.
    """
    user.list_manager()
    scrapper = ARGS.scrapper

    if re.search(r"main|instagram|kemono", scrapper):
        parse_gallery(scrapper, user)
    elif scrapper == "push":
        push_manager(user)
    elif re.search("^comic|manga", scrapper):
        # The chapter range is only limited when the skip flag is set:
        # the first five chapters for manga, the first one otherwise.
        # Without the flag no range argument is passed at all.
        if not ARGS.flag_skip:
            skip_arg = ""
        elif scrapper == "manga":
            skip_arg = " --chapter-range 1-5"
        else:
            skip_arg = " --chapter-range 1"
        comic_manager(skip_arg, scrapper)
    elif re.search("webcomic", scrapper):
        webcomic_manager()
|
|
|
|
|
|
def scrap_everyone() -> None:
    """Run the selected scrapper once for each user in CONFIGS["users"]."""
    for entry in CONFIGS["users"]:
        name = entry["name"]
        user = User(get_index(name))
        LOG.info("Scrapping %s for %s", ARGS.scrapper, name)
        scrapper_manager(user)
|
|
|
|
|
|
def main() -> None:
    """Decide what to do based on the parsed arguments.

    With a scrapper selected, the scrapper is run for every user when the
    user is "everyone" and the scrapper is one of the shared ones
    (push, main, instagram, kemono); otherwise it is run for ``ARGS.user``.

    Without a scrapper but with links, the links are appended to the push
    list of the chosen user and that list is processed. Links given for
    "everyone" or "jawz" are handled with the "jawz" user.
    """
    if ARGS.scrapper:
        rgx_shared = re.compile("push|main|instagram|kemono")
        # Compare for equality: a membership test against the string would
        # also accept any user whose name is a substring of "everyone".
        if ARGS.user == "everyone" and rgx_shared.search(ARGS.scrapper):
            scrap_everyone()
        else:
            scrapper_manager(User(get_index(ARGS.user)))
    elif ARGS.link:
        is_admin = re.search(r"everyone|jawz", ARGS.user)
        user = User(get_index("jawz" if is_admin else ARGS.user))
        # NOTE(review): only the first element of ARGS.link is iterated;
        # presumably the parser nests the links in a list — confirm there.
        for arg_link in ARGS.link[0]:
            user.append_list("push", parse_link(arg_link))

        push_manager(user)
|
|
|
|
|
|
# Run the command-line entry point only when the file is executed as a script.
if __name__ == "__main__":
    main()
|