295 lines
10 KiB
Python
295 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Rewriting of the download manager script
|
|
with the intention to make it
|
|
more modular with the use of flags
|
|
in order to avoid unnecesary modifications
|
|
to the cofig files.
|
|
Also following in line more posix and python rules.
|
|
"""
|
|
|
|
import re
|
|
import yaml
|
|
from typing import Dict
|
|
from functions import LOG
|
|
from functions import run
|
|
from functions import quote
|
|
from functions import list_lines
|
|
from functions import load_config_variables
|
|
from functions import parse_link
|
|
from argparser import argparser
|
|
from classes.user import User
|
|
from classes.gallery import Gallery
|
|
|
|
# GLOBAL VARIABLE SECTION
|
|
CONFIGS = load_config_variables()
|
|
# Enable a default "everyone" flag for when running stuff like download gallery
|
|
USERS = ["everyone"] + [user["name"] for user in CONFIGS["users"]]
|
|
ARGS = argparser(USERS)
|
|
|
|
|
|
class Video:
|
|
"""Just a simple class to unify the Video parameters into a single one."""
|
|
|
|
def __init__(self) -> None:
|
|
self.use_archive: bool = True
|
|
self.link: str = ""
|
|
self.dest: str = ""
|
|
self.database: str = ""
|
|
|
|
|
|
def get_index(name: str) -> int:
|
|
"""Find the index in the config file"""
|
|
return next((i for i, d in enumerate(CONFIGS["users"]) if d["name"] == name), -1)
|
|
|
|
|
|
def parse_gallery(gdl_list: str, user: User) -> None:
|
|
"""Processes the gallery-dl command based on the selected gallery"""
|
|
gallery = Gallery()
|
|
gallery.archive = ARGS.flag_archive
|
|
gallery.skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
|
|
gallery.dest = "download"
|
|
gallery.list = gdl_list
|
|
gallery.opt_args = parse_instagram(gdl_list)
|
|
|
|
gallery.generate_command(user)
|
|
gallery.run_command(ARGS.flag_verbose)
|
|
|
|
|
|
def parse_instagram(link: str) -> str:
|
|
"""Fix instagram links"""
|
|
if "instagram" not in link:
|
|
return ""
|
|
if isinstance(ARGS.post_type, list):
|
|
return f" -o include={quote(','.join(ARGS.post_type))}"
|
|
return f" -o include={quote(ARGS.post_type)}"
|
|
|
|
|
|
def video_command(video: Video) -> str:
|
|
"""Filters and processes the required command to download videos"""
|
|
command = "yt-dlp"
|
|
rgx_yt = re.compile(r"(https:\/\/youtube|https:\/\/www.youtube|https:\/\/youtu.be)")
|
|
rgx_music = re.compile(r"(https:\/\/music.youtube.*)")
|
|
|
|
if re.search(r"chaturbate", video.link):
|
|
return f"chat-dl {video.link}"
|
|
|
|
if rgx_yt.search(video.link):
|
|
command += " --embed-subs --embed-thumbnail"
|
|
command += " --embed-metadata --embed-chapters"
|
|
command += f" -o {quote(video.dest + '/%(title)s.%(ext)s')}"
|
|
|
|
elif rgx_music.search(video.link):
|
|
command += f" --download-archive {video.database}" if video.use_archive else ""
|
|
command += " --no-playlist --newline -x"
|
|
command += " --audio-format best --add-metadata --audio-quality 0 -o"
|
|
command += f" {quote(video.dest + '/%(title)s.%(ext)s')}"
|
|
|
|
else: # Any other video link, just do it generic
|
|
command += f" -f mp4 -o {quote(video.dest + '/%(title)s.%(ext)s')}"
|
|
|
|
LOG.info("%s %s", command, video.link)
|
|
return f"{command} {quote(video.link)}"
|
|
|
|
|
|
def comic_manager(skip_arg: str, category: str) -> None:
|
|
"""Process the information to download manga"""
|
|
re_cat = "manga|webtoon" if category == "manga" else "readcomiconline"
|
|
with open(CONFIGS["comic"]["comic-list"], "r", encoding="utf-8") as r_file:
|
|
links = list(filter(lambda x: re.search(re_cat, x), r_file))
|
|
|
|
for link in links:
|
|
gallery = Gallery()
|
|
gallery.archive = ARGS.flag_archive
|
|
gallery.skip_arg = skip_arg
|
|
gallery.link = link
|
|
gallery.generate_command(is_comic=True)
|
|
gallery.run_command(ARGS.flag_verbose)
|
|
|
|
|
|
def print_webcomics(webcomics: Dict[str, Dict]) -> int:
|
|
"""Prints a list of webcomics, and returns an index."""
|
|
for index, entry in enumerate(webcomics["webcomics"]):
|
|
print(list_lines(index, entry["name"]))
|
|
|
|
return int(input("Select a webcomic: "))
|
|
|
|
|
|
def webcomic_manager():
|
|
"""Process the information to download webcomics"""
|
|
with open(CONFIGS["comic"]["webcomic-list"], "r", encoding="utf-8") as r_file:
|
|
webcomics = yaml.safe_load(r_file)
|
|
|
|
usr_input = print_webcomics(webcomics)
|
|
|
|
# Determines where the webcomic will be downloaded
|
|
rating = webcomics["webcomics"][usr_input]["type"]
|
|
dest = webcomics["global"][f"{rating}_directory"]
|
|
name = webcomics["webcomics"][usr_input]["name"]
|
|
link = webcomics["webcomics"][usr_input]["url"]
|
|
nxt_code = webcomics["webcomics"][usr_input]["next_code"]
|
|
img_code = webcomics["webcomics"][usr_input]["image_code"]
|
|
|
|
LOG.info("The webcomic is %s", dest)
|
|
|
|
command = f"cd {quote(dest)} && webcomix custom"
|
|
command += f" {quote(name)}"
|
|
command += " --start-url"
|
|
command += f" {quote(link)}"
|
|
command += f" --next-page-xpath={quote(nxt_code)}"
|
|
command += f" --image-xpath={quote(img_code)}"
|
|
command += " -y --cbz"
|
|
|
|
run(command, ARGS.flag_verbose)
|
|
|
|
|
|
def save_comic(link: str) -> None:
|
|
"""Add comic/manga link to the list"""
|
|
list_comic = CONFIGS["comic"]["comic-list"]
|
|
with open(list_comic, "r", encoding="utf-8") as r_file:
|
|
links = r_file.read().lower()
|
|
if parse_link(link).lower() in links:
|
|
LOG.info("Graphic novel repeated, not saving")
|
|
return
|
|
LOG.info("New graphic novel, saving")
|
|
|
|
with open(list_comic, "a", encoding="utf-8") as w_file:
|
|
w_file.write(link + "\n")
|
|
|
|
|
|
def push_manager(user: User):
|
|
"""Filters out the URL to use the appropiate downloader"""
|
|
# Creates an array which will store any links that should use youtube-dl
|
|
rgx_gallery = re.compile(
|
|
r"(twitter\.com\/\w+((?=.*media)|(?!.*status)))"
|
|
r"|(men\.wikifeet)"
|
|
r"|(furaffinity\.net\/user\/)"
|
|
r"|((deviantart\.com\/\w+(?!.*\/art\/)))"
|
|
r"|(furaffinity\.net\/gallery\/)"
|
|
r"|(furaffinity\.net\/scraps\/)"
|
|
r"|(furaffinity\.net\/favorites\/)"
|
|
r"|(instagram.com(?!\/p\/)\/\w+)"
|
|
r"|(e621\.net((?=\/post\/)|(?!\/posts\/)))"
|
|
r"|(flickr\.com\/photos\/\w+\/(?!\d+))"
|
|
r"|(tumblr\.com(?!\/post\/))"
|
|
r"|(kemono\.party\/(fanbox|gumroad|patreon)(?!\/user\/\d+\/post))"
|
|
r"|(blogspot\.com(?!\/))"
|
|
r"|(rule34\.paheal\.net\/post\/(?!view))"
|
|
r"|(rule34\.xxx\/index\.php\?page\=post&s=(?!view))"
|
|
r"|(pixiv\.net\/(en\/)?((?=users)|(?!artwork)))"
|
|
r"|(fanbox\.cc\/@\w+(?!.*posts\/\d+))"
|
|
r"|(reddit\.com\/(user|u))"
|
|
r"|(baraag\.net\/((@\w+)|(?!\/\d+)))"
|
|
r"|(pinterest\.com\/(?!pin\/\d+))"
|
|
r"|(redgifs\.com\/(users|u|(?!watch)))",
|
|
)
|
|
rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
|
|
rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons")
|
|
|
|
with open(user.lists["push"], "r", encoding="utf-8") as r_file:
|
|
links = list(map(lambda x: x.rstrip(), r_file))
|
|
links_galleries = filter(rgx_gallery.search, links)
|
|
links_videos = filter(rgx_video.search, links)
|
|
links_comics = filter(rgx_comic.search, links)
|
|
links_other = filter(
|
|
lambda x: (not rgx_video.search(x))
|
|
and (not rgx_gallery.search(x))
|
|
and (not rgx_comic.search(x)),
|
|
links,
|
|
)
|
|
|
|
for link in links_galleries:
|
|
gallery = Gallery()
|
|
gallery.archive = ARGS.flag_archive
|
|
gallery.skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
|
|
gallery.link = parse_link(link)
|
|
gallery.dest = "download"
|
|
gallery.opt_args = parse_instagram(link)
|
|
gallery.generate_command(user)
|
|
gallery.run_command(ARGS.flag_verbose)
|
|
user.save_link(link)
|
|
|
|
for link in links_comics:
|
|
if ARGS.flag_skip and re.search(r"readcomiconline", link):
|
|
skip_arg = " --chapter-range 1"
|
|
elif ARGS.flag_skip and re.search(r"mangahere|webtoons", link):
|
|
skip_arg = " --chapter-range 1-5"
|
|
else:
|
|
skip_arg = ""
|
|
|
|
gallery = Gallery()
|
|
gallery.archive = ARGS.flag_archive
|
|
gallery.skip_arg = skip_arg
|
|
gallery.link = link
|
|
gallery.generate_command(is_comic=True)
|
|
gallery.run_command(ARGS.flag_verbose)
|
|
save_comic(link)
|
|
|
|
for link in links_videos:
|
|
video = Video()
|
|
video.use_archive = ARGS.flag_archive
|
|
video.link = link
|
|
video.dest = f"{user.directories['media']}"
|
|
video.database = quote(f"{user.dbs['media']}")
|
|
run(video_command(video), ARGS.flag_verbose)
|
|
|
|
for link in links_other:
|
|
LOG.info("Other type of download %s", link)
|
|
gallery = Gallery()
|
|
gallery.archive = False
|
|
gallery.skip_arg = " -o directory='[]'"
|
|
gallery.link = link
|
|
gallery.dest = "push"
|
|
gallery.generate_command(user)
|
|
gallery.run_command(ARGS.flag_verbose)
|
|
|
|
# Flush the push list, cleans all the contents
|
|
with open(user.lists["push"], "w", encoding="utf-8") as w_file:
|
|
w_file.close()
|
|
|
|
|
|
def scrapper_manager(user: User) -> None:
|
|
"""Analyze the user arguments and call in functions"""
|
|
user.list_manager()
|
|
if re.search(r"main|instagram|kemono", ARGS.scrapper):
|
|
skip_arg = "" if ARGS.flag_skip else " -o skip=true"
|
|
parse_gallery(ARGS.scrapper, user)
|
|
elif ARGS.scrapper in "push":
|
|
push_manager(user)
|
|
elif re.search("^comic|manga", ARGS.scrapper):
|
|
skip_arg = " --chapter-range 1" if ARGS.flag_skip else ""
|
|
skip_arg += "-5" if ARGS.scrapper in "manga" else ""
|
|
comic_manager(skip_arg, ARGS.scrapper)
|
|
elif re.search("webcomic", ARGS.scrapper):
|
|
webcomic_manager()
|
|
|
|
|
|
def scrap_everyone() -> None:
|
|
"""Iterates over every user of my scrapper"""
|
|
for current_user in CONFIGS["users"]:
|
|
user = User(get_index(current_user["name"]))
|
|
LOG.info("Scrapping %s for %s", ARGS.scrapper, current_user["name"])
|
|
scrapper_manager(user)
|
|
|
|
|
|
def main():
|
|
"""Main module to decide what to do based on the parsed arguments"""
|
|
if ARGS.scrapper:
|
|
rgx_shared = re.compile("push|main|instagram|kemono")
|
|
if (ARGS.user in "everyone") and (rgx_shared.search(ARGS.scrapper)):
|
|
scrap_everyone()
|
|
else:
|
|
scrapper_manager(User(get_index(ARGS.user)))
|
|
elif ARGS.link:
|
|
is_admin = re.search(r"everyone|jawz", ARGS.user)
|
|
user = User(get_index("jawz" if is_admin else ARGS.user))
|
|
for arg_link in ARGS.link[0]:
|
|
user.append_list("push", parse_link(arg_link))
|
|
|
|
push_manager(user)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|