split configs into two systems...

2023-09-06 23:46:45 -06:00
parent b161ac589c
commit 19e87b429e
56 changed files with 4280 additions and 4 deletions

View File

@@ -0,0 +1 @@
CONFIG_FILE = "/home/jawz/.config/jawz/config.yaml"
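
For reference, a minimal config.yaml consistent with the keys this commit reads (users, global, comic) might look like the sketch below; every path and value here is a placeholder, not the real config.

users:
  - name: jawz
    download-directory: /home/jawz/Downloads/gallery
    media-directory: /home/jawz/Downloads/media
    push-directory: /home/jawz/Downloads/push
global:
  sleep: 2
  cache-directory: /home/jawz/.cache/jawz
  log-directory: /home/jawz/.local/share/jawz/logs
  archive-directory: /home/jawz/.local/share/jawz/archives
  list-dir: /home/jawz/.local/share/jawz/lists
comic:
  list: /home/jawz/.local/share/jawz/lists/comics.txt
  webcomic-list: /home/jawz/.local/share/jawz/lists/webcomics.yaml
  download-directory: /home/jawz/Downloads/comics
  archive: /home/jawz/.local/share/jawz/archives/comics.sqlite3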

View File

@@ -0,0 +1 @@
use nix

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""Setup the argparser"""
import argparse
scrapper_types = (
"push",
"gallery",
"instagram",
"kemono",
"comic",
"manga",
"webcomic",
)
# Define the types of Instagram posts
instagram_types = ["posts", "reels", "channel", "stories", "highlights"]
def argparser(users: list) -> argparse.Namespace:
"""Returns an argparser to evaluate user input"""
# ARG PARSER
parser = argparse.ArgumentParser(
prog="Downloader",
description="Download images and galleries from a wide array of websites"
" either by using links or chosing from user define lists."
" This program also takes care of archiving tasks,"
" that keep the run time fast and prevents downloading duplicates.",
)
# Choose the type of scrapper
parser.add_argument(
choices=scrapper_types,
nargs="?",
dest="scrapper",
help="Select a scrapper.",
)
# Parse user list
parser.add_argument(
"-u",
"--user",
choices=users,
dest="user",
help="Selects the personal user list to process. Defaults to everyone",
default="everyone",
type=str,
)
# Parse individual links
parser.add_argument(
"-i",
"--input",
nargs="*",
dest="link",
action="append",
help="Download the provided links",
type=str,
)
# Set the print list flag
parser.add_argument(
"-l",
"--list",
dest="flag_list",
action="store_true",
help="Prints a list of all the added links and prompts for a choice",
)
# Set the use archiver flag
parser.add_argument(
"-a",
"--no-archive",
dest="flag_archive",
action="store_false",
help="Disables the archiver flag",
)
# Set the skip flag
parser.add_argument(
"-s",
"--no_skip",
dest="flag_skip",
action="store_false",
help="Disables the skip function, downloads the entire gallery",
)
parser.add_argument(
"-v",
"--verbose",
dest="flag_verbose",
action="store_true",
help="Prints the generated commands instead of running them",
)
parser.add_argument(
"-t",
"--type-post",
choices=instagram_types,
nargs="*",
dest="post_type",
help="Filters posts on instagram by type",
default=instagram_types,
type=str,
)
return parser.parse_args()
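
Once the console script from setup.cfg is installed, typical invocations might look like this (list names and links are hypothetical):

download gallery -u jawz                 # scrape jawz's main gallery list
download instagram -t stories reels      # only stories and reels
download push --no-archive               # process queued links, skip the archive db
download -i https://example.com/post -v  # queue a link, print commands only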

View File

@@ -0,0 +1,417 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Rewriting of the download manager script
with the intention to make it
more modular with the use of flags
in order to avoid unnecesary modifications
to the cofig files.
Also following in line more posix and python rules.
"""
import re
import time
import logging
import yaml
from functions import run
from functions import quote
from functions import list_lines
from functions import load_config_variables
from argparser import argparser
from gdl_classes import User
# GLOBAL VARIABLE SECTION
# Store the name of the main binaries early in the code
BIN_GALLERY = "gallery-dl"
BIN_YOUTUBE = "yt-dlp"
# SKIP = "3"
CONFIGS = load_config_variables()
LOGGER = logging.getLogger()
HANDLER = logging.StreamHandler()
FORMATTER = logging.Formatter(
"[%(filename)s][%(levelname)s] %(funcName)s '%(message)s'"
)
HANDLER.setFormatter(FORMATTER)
LOGGER.addHandler(HANDLER)
LOGGER.setLevel(logging.INFO)
# Enable a default "everyone" flag for when running stuff like download gallery
USERS = ["everyone"]
for dictionary in CONFIGS["users"]:
USERS.append(dictionary["name"])
ARGS = argparser(USERS)
def get_index(value: str) -> int:
"""Find the index in the config file"""
for i, dic in enumerate(CONFIGS["users"]):
if dic["name"] == value:
LOGGER.debug("%s is %s", dic["name"], i)
return i
return -1
def parse_gallery(gdl_list: str, user: User):
"""Processes the gallery-dl command based on the selected gallery"""
# skip_arg = f" -A {SKIP}" if ARGS.flag_skip else ""
skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
LOGGER.debug(skip_arg)
# Send the list to gallery-dl
download_gallery(
ARGS.flag_archive,
skip_arg,
"",
str(user.sleep),
quote(f"{user.dir_download}"),
quote(f"{user.archive_gallery}"),
quote(gdl_list),
parse_instagram(gdl_list),
)
def parse_instagram(link: str) -> str:
"""Fix instagram links"""
if "instagram" not in link:
return ""
if isinstance(ARGS.post_type, list):
string = f" -o include={quote(','.join(ARGS.post_type))}"
LOGGER.debug(string)
return string
string = f" -o include={quote(ARGS.post_type)}"
LOGGER.debug(string)
return string
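
As a sketch of what this produces (hypothetical link), the default post types collapse into a single include option, and non-Instagram links yield an empty string:

# parse_instagram("https://instagram.com/someone")
#   -> ' -o include="posts,reels,channel,stories,highlights"'
# parse_instagram("https://example.com/gallery") -> ""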
def parse_link(link: str) -> str:
"""Fixes links"""
if not re.search(r"(twitter\.com\/\w+(\/)?(?!.*status))", link):
LOGGER.debug("No modifications needed for the link %s", link)
return link
# Strip any trailing "/" or "/media" before appending "/media"
fixed_link = re.sub(r"\/$|\/media(\/?)$", "", link) + "/media"
LOGGER.debug("Processed link %s", fixed_link)
return fixed_link
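
An assumed illustration of the rewrite (handles made up): profile links gain a /media suffix, status links pass through untouched.

# parse_link("https://twitter.com/someartist/")
#   -> "https://twitter.com/someartist/media"
# parse_link("https://twitter.com/someartist/status/123456")
#   -> "https://twitter.com/someartist/status/123456"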
def download_gallery(
use_archive: bool,
skip_arg: str = "",
link: str = "",
sleep: str = "0",
destination: str = "",
database: str = "",
queue: str = "",
opt_args: str = "",
):
"""Processes the command string to run the gallery archiver"""
command = f"{BIN_GALLERY} --sleep {sleep}"
if skip_arg != "":
command += skip_arg
if destination != "":
command += f" --dest {destination}"
if use_archive:
command += f" --download-archive {database}"
if opt_args != "":
command += opt_args
if link != "" and queue == "":
LOGGER.info("link: %s", quote(link))
command += f" {link}"
if queue != "" and link == "":
LOGGER.info("queue: %s", queue)
command += f" -i {queue}"
LOGGER.debug(command)
run(command, ARGS.flag_verbose)
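
As a rough sketch with placeholder paths, a queue-based call assembles a command like the following:

# download_gallery(True, " -o skip=true", "", "2",
#                  '"/downloads/jawz"', '"/archives/jawz.sqlite3"',
#                  '"/cache/jawz/main.txt"', "")
# builds and runs:
# gallery-dl --sleep 2 -o skip=true --dest "/downloads/jawz"
#   --download-archive "/archives/jawz.sqlite3" -i "/cache/jawz/main.txt"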
def download_youtube(
use_archive: bool,
link: str = "",
destination: str = "",
database: str = "",
):
"""Filters and processes the required command to download videos"""
command = BIN_YOUTUBE
if re.search(r"(https:\/\/youtube|https:\/\/www.youtube|https:\/\/youtu.be)", link):
command += f' -o {quote(destination + "/%(title)s.%(ext)s")}'
elif re.search(r"(https:\/\/music.youtube.*)", link):
if use_archive:
command += f" --download-archive {database}"
command += f""" \
--no-playlist --newline -x \
--audio-format best --add-metadata --audio-quality 0 -o \
{quote(destination + '/%(title)s.%(ext)s')} \
"""
elif re.search(r"chaturbate", link):
# Re-runs the program every 30 seconds in case the stream goes private or disconnects
for i in range(1, 41): # For a 20 minute total
run(
f"""
{BIN_YOUTUBE} \
--hls-use-mpegts --prefer-ffmpeg \
-o {quote(destination + '/%(title)s.%(ext)s')} \
{link}
""",
ARGS.flag_verbose,
)
time.sleep(30)
LOGGER.info("waited for %s minutes", i * 30 / 60)
else: # Any other video link, just do it generic
command += f" -f mp4 -o {quote(destination + '/%(title)s.%(ext)s')}"
LOGGER.info("%s %s", command, link)
run(f"{command} {link}", ARGS.flag_verbose)
def comic_manager(skip_arg: str, category: str):
"""Process the information to download manga"""
re_cat = ""
if category == "manga":
re_cat = "manga|webtoon"
elif category == "comic":
re_cat = "readcomiconline"
with open(CONFIGS["comic"]["list"], encoding="utf-8") as list_comic:
for graphic_novel in [line.rstrip() for line in list_comic]:
# Only process entries that match the selected category
if not re.search(re_cat, graphic_novel):
LOGGER.debug("%s does not match regex espression", graphic_novel)
continue
download_gallery(
ARGS.flag_archive,
skip_arg,
quote(graphic_novel),
"0",
CONFIGS["comic"]["download-directory"],
CONFIGS["comic"]["archive"],
"",
"",
)
def webcomic_manager():
"""Process the information to download webcomics"""
webcomic_list = CONFIGS["comic"]["webcomic-list"]
with open(webcomic_list, encoding="utf-8") as open_list:
webcomic_file = yaml.safe_load(open_list)
# Create a list of all the available webcomics for the user to choose from
for index, entry in enumerate(webcomic_file["Webcomics"]):
print(list_lines(index, entry["name"]))
# Prompt for a choice
usr_input = int(input("Select your comic: "))
# Determines where the webcomic will be downloaded
rating = webcomic_file["Webcomics"][usr_input]["type"]
webcomic_category = webcomic_file["Global"][f"{rating}_directory"]
LOGGER.debug("The webcomic is %s", webcomic_category)
command = f"""cd {quote(webcomic_category)} && webcomix custom \
{quote(webcomic_file["Webcomics"][usr_input]["name"])} \
--start-url \
{quote(webcomic_file["Webcomics"][usr_input]["url"])} \
--next-page-xpath={quote(webcomic_file["Webcomics"][usr_input]["next_code"])} \
--image-xpath={quote(webcomic_file["Webcomics"][usr_input]["image_code"])} \
-y --cbz"""
LOGGER.debug(command)
run(command, ARGS.flag_verbose)
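
The webcomic list is expected to carry entries shaped roughly like this sketch (names, URLs, XPaths and the sfw/nsfw ratings are assumptions):

Global:
  sfw_directory: /home/jawz/Downloads/webcomics
  nsfw_directory: /home/jawz/Downloads/webcomics-nsfw
Webcomics:
  - name: Example Comic
    type: sfw
    url: https://example.com/comic/page-1
    next_code: //a[@class="next"]/@href
    image_code: //img[@id="comic"]/@src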
def push_manager(user: User):
"""Filters out the URL to use the appropiate downloader"""
# Creates an array which will store any links that should use youtube-dl
link_video_cache = []
re_links = re.compile(
r"(twitter\.com\/\w+((?=.*media)|(?!.*status)))"
r"|(men\.wikifeet)"
r"|(furaffinity\.net\/user\/)"
r"|((deviantart\.com\/\w+(?!.*\/art\/)))"
r"|(furaffinity\.net\/gallery\/)"
r"|(furaffinity\.net\/scraps\/)"
r"|(furaffinity\.net\/favorites\/)"
r"|(instagram.com(?!\/p\/)\/\w+)"
r"|(e621\.net((?=\/post\/)|(?!\/posts\/)))"
r"|(flickr\.com\/photos\/\w+\/(?!\d+))"
r"|(tumblr\.com(?!\/post\/))"
r"|(kemono\.party\/(fanbox|gumroad|patreon)(?!\/user\/\d+\/post))"
r"|(blogspot\.com(?!\/))"
r"|(rule34\.paheal\.net\/post\/(?!view))"
r"|(rule34\.xxx\/index\.php\?page\=post&s=(?!view))"
r"|(pixiv\.net\/(en\/)?((?=users)|(?!artwork)))"
r"|(reddit\.com\/(user|u))"
r"|(baraag\.net\/((@\w+)|(?!\/\d+)))"
r"|(pinterest\.com\/(?!pin\/\d+))"
r"|(redgifs\.com\/(users|u|(?!watch)))",
)
with open(user.list_push, encoding="utf-8") as list_push:
for link in [line.rstrip() for line in list_push]:
LOGGER.debug("Processing %s", link)
# Flush the push list, clearing all its contents
with open(user.list_push, "w", encoding="utf-8"):
pass  # opening in "w" mode truncates the file
# VIDEOS
if re.search(r"youtu.be|youtube|pornhub|xtube|xvideos|chaturbate", link):
LOGGER.debug("Matched type yt-dlp")
link_video_cache.append(link)
# Search for gallery links, these will be added to a list after downloading
elif re.search(re_links, link):
LOGGER.debug("Matched type gallery-dl")
# skip_arg = f" -A {SKIP}" if ARGS.flag_skip else ""
skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
LOGGER.debug("Skip: %s, link: %s", skip_arg, parse_instagram(link))
download_gallery(
ARGS.flag_archive,
skip_arg,
quote(f"{parse_link(link)}"),
f"{user.sleep}",
quote(f"{user.dir_download}"),
quote(f"{user.archive_gallery}"),
"",
f"{parse_instagram(link)}",
)
# Record the gallery link, so it remains on the watch list
with open(user.list_master, "a", encoding="utf-8") as w_file, open(
user.list_master, "r", encoding="utf-8"
) as r_file:
content = r_file.read().lower()
if parse_link(link).lower() in content:
LOGGER.info("Gallery repeated, not saving")
continue
LOGGER.info("New gallery, saving")
w_file.write(parse_link(str(link)) + "\n")
# Searches for comic/manga links
elif re.search(r"readcomiconline|mangahere|mangadex|webtoons", link):
# Toggle for comic/manga skip flag
if ARGS.flag_skip and re.search(r"readcomiconline", link):
skip_arg = " --chapter-range 1"
elif ARGS.flag_skip and re.search(r"mangahere|webtoons", link):
skip_arg = " --chapter-range 1-5"
else:
skip_arg = ""
LOGGER.debug(skip_arg)
download_gallery(
ARGS.flag_archive,
skip_arg,
quote(link),
"0",
CONFIGS["comic"]["download-directory"],
CONFIGS["comic"]["archive"],
"",
"",
)
# Add comic/manga link to the list
list_gn = CONFIGS["comic"]["list"]
with open(list_gn, "a", encoding="utf-8") as w_file, open(
list_gn, "r", encoding="utf-8"
) as r_file:
content = r_file.read().lower()
if parse_link(link).lower() in content:
LOGGER.info("Graphic novel repeated, not saving")
continue
LOGGER.info("New graphic novel, saving")
w_file.write(link + "\n")
# Download generic links; the -o flag overrides the config file and
# downloads the files into the root destination
else:
LOGGER.info("Other type of download %s", link)
download_gallery(
False,
" -o directory='[]'",
quote(link),
"0",
quote(str(user.dir_push)),
"",
"",
"",
)
# Send the video links to yt-dlp
for link in link_video_cache:
download_youtube(
ARGS.flag_archive,
quote(link),
f"{user.dir_media_download}",
quote(f"{user.archive_media}"),
)
def scrapper_manager(user: User):
# pylint: disable=too-many-branches
"""Analyze the user arguments and call in functions"""
if not ARGS.scrapper: # Check if a scrapper was selected
return
if re.search(r"gallery|instagram|kemono", ARGS.scrapper):
# skip_arg = f" -A {SKIP}" if ARGS.flag_skip else ""
skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
LOGGER.debug(skip_arg)
if ARGS.scrapper == "gallery":
parse_gallery(f"{user.list_main}", user)
elif ARGS.scrapper == "instagram":
parse_gallery(f"{user.list_instagram}", user)
elif ARGS.scrapper == "kemono":
parse_gallery(f"{user.list_kemono}", user)
elif ARGS.scrapper in "push":
push_manager(user)
elif ARGS.scrapper in "comic":
skip_arg = " --chapter-range 1" if ARGS.flag_skip else ""
LOGGER.debug(skip_arg)
comic_manager(skip_arg, "comic")
elif ARGS.scrapper in "manga":
skip_arg = " --chapter-range 1-5" if ARGS.flag_skip else ""
LOGGER.debug(skip_arg)
comic_manager(skip_arg, "manga")
elif ARGS.scrapper in "webcomic":
webcomic_manager()
def main():
"""Main module to decide what to do based on the parsed arguments"""
if ARGS.scrapper:
if (ARGS.user in "everyone") and (
re.search(r"push|gallery|instagram|kemono", ARGS.scrapper)
):
for current_user in CONFIGS["users"]:
user = User(get_index(current_user["name"]))
user.list_manager()
LOGGER.info("Scrapping %s for %s", ARGS.scrapper, current_user["name"])
scrapper_manager(user)
elif re.search(r"comic|manga|webcomic", ARGS.scrapper):
user = User(get_index("jawz"))
user.list_manager()
LOGGER.info("Scrapping %s", ARGS.scrapper)
scrapper_manager(user)
else:
# Create the lists to scrap
user = User(get_index(ARGS.user))
user.list_manager()
scrapper_manager(user)
elif ARGS.link:
LOGGER.debug(ARGS.link)
if re.search(r"everyone|jawz", ARGS.user):
# Create the lists to scrap
user = User(get_index("jawz"))
user.list_manager()
else:
# Create the lists to scrap
user = User(get_index(ARGS.user))
user.list_manager()
for arg_link in ARGS.link[0]:
LOGGER.debug(arg_link)
if ARGS.flag_verbose:
LOGGER.debug(
"%s >> %s", quote(parse_link(arg_link)), quote(user.list_push)
)
else:
with open(user.list_push, "a", encoding="utf-8") as open_file:
open_file.write(parse_link(arg_link) + "\n")
push_manager(user)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Personal functions to aid on multiple scripts"""
import sys
import fileinput
import re
import os
from pathlib import Path
import yaml
VERBOSE_G = False
def load_config_variables():
"""Loads all the variables from the config file"""
config_file = Path("~/.config/jawz/config.yaml")
with open(config_file.expanduser(), encoding="utf-8") as open_file:
return yaml.safe_load(open_file)
def run(command: str, verbose: bool):
"""Run command in a subprocess"""
# pylint: disable=subprocess-run-check
# This toggle allows for really easy debugging when using -v
if verbose:
print(command)
else:
os.system(command)
def list_lines(i: int, line: str) -> str:
"""Create a numbered list"""
return f"{i}) {line}"
def quote(line: str) -> str:
"""Quote the line"""
return f'"{line}"'
def sort_txt_file(file_path: Path):
"""Sort every line alphabetically
remove duplicated and empty lines"""
file = str(file_path.resolve())
run(f"sort -u {quote(file)} -o {quote(file)}", VERBOSE_G)
run(f"sed -i '/^$/d' {quote(file)}", VERBOSE_G)
run(f'sed -i -e "s,http:,https:," {quote(file)}', VERBOSE_G)
# TODO: do this in Python with line.rstrip("/")
run(f'sed -i -e "s,/$,," {quote(file)}', VERBOSE_G)  # strip trailing /
def randomize_txt_file(file_path: Path):
"""Randomize the order of the
lines of the txt file"""
file = str(file_path.resolve())
run(f"sort -R {quote(file)} -o {quote(file)}", VERBOSE_G)
def parse_list(file):
"""Replace http with https and remove trailing /"""
for line in fileinput.input(file, inplace=True):
sys.stdout.write(str(line).replace("http://", "https://"))
with open(file, "r+", encoding="utf-8") as open_file:
f_content = open_file.read()
# Strip the trailing slash from every line
f_content = re.sub(r"/$", "", f_content, flags=re.MULTILINE)
open_file.seek(0)
open_file.truncate()
open_file.write(f_content)
sort_txt_file(file)

View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""Define the user class to populate and setup the download environment"""
import re
from pathlib import Path
from functions import sort_txt_file, randomize_txt_file, load_config_variables
config_variables = load_config_variables()
class User:
"""Populate the directory for each user"""
# pylint: disable=too-many-instance-attributes
def __init__(self, index):
self.user = config_variables["users"][index]
self.config = config_variables["global"]
self.name = self.user["name"]
self.sleep = self.config["sleep"]
# Directories
self.dir_cache = Path(self.config["cache-directory"]) / self.name
self.dir_log = Path(self.config["log-directory"])
self.dir_archive = Path(self.config["archive-directory"])
self.dir_download = Path(self.user["download-directory"])
self.dir_media_download = Path(self.user["media-directory"])
self.dir_push = Path(self.user["push-directory"])
self.dir_master_list = Path(self.config["list-dir"]) / self.name
# Files
self.archive_gallery = self.dir_archive / f"{self.name}.sqlite3"
self.archive_media = self.dir_archive / f"{self.name}_ytdl.txt"
# Lists
self.list_master = self.dir_master_list / "watch.txt"
self.list_push = self.dir_master_list / "instant.txt"
self.list_instagram = self.dir_cache / "instagram.txt"
self.list_kemono = self.dir_cache / "kemono.txt"
self.list_main = self.dir_cache / "main.txt"
def create_directories(self):
"""Create user directories if they don't exist"""
if self.dir_cache.is_dir():
for file in self.dir_cache.iterdir():
if file.is_file():
file.unlink()
for file in self.dir_cache.iterdir():
if file.is_dir():
file.rmdir()
self.dir_cache.rmdir()
# Create directories
self.dir_cache.mkdir(parents=True, exist_ok=True)
self.dir_log.mkdir(parents=True, exist_ok=True)
self.dir_archive.mkdir(parents=True, exist_ok=True)
self.dir_download.mkdir(parents=True, exist_ok=True)
self.dir_media_download.mkdir(parents=True, exist_ok=True)
self.dir_push.mkdir(parents=True, exist_ok=True)
# Check for the existence of core files
if not Path(self.archive_gallery).is_file():
self.archive_gallery.touch()
if not Path(self.archive_media).is_file():
self.archive_media.touch()
if not self.dir_master_list.is_dir():
print(f"ERROR: Directory for user {self.name} doesn't exist")
if not Path(self.list_master).is_file():
self.list_master.touch()
if not Path(self.list_push).is_file():
self.list_push.touch()
# Create temporary lists
for gdl_list in ("instagram", "kemono", "main"):
Path(self.dir_cache.resolve() / f"{gdl_list}.txt").touch()
def list_manager(self):
"""Manage all the user list and create sub-lists"""
# sort_txt_file(self.list_master)
self.create_directories()  # Call the function to create necessary cache dirs
with open(self.list_master, encoding="utf-8") as list_master:
# Create temporary list files segmented per scrapper
for line in [line.rstrip() for line in list_master]:
# WIKIFEET
with open(self.list_main, "a", encoding="utf-8") as list_main, open(
self.list_kemono, "a", encoding="utf-8"
) as list_kemono, open(
self.list_instagram, "a", encoding="utf-8"
) as list_instagram:
if re.search(r"kemono.party", line):
list_kemono.write(line + "\n")
elif re.search(r"instagram", line):
list_instagram.write(line + "\n")
elif re.search(r"wikifeet", line):
continue
# list_main.write(line + "\n")
elif re.search(r"furaffinity", line):
list_main.write(line + "\n")
elif re.search(r"twitter", line):
# if url contains /media at the end just write the line
if re.search(r"\/media$", line):
list_main.write(line + "\n")
else:
# if it does not contain /media at the end then add /media
list_main.write(line + "/media" + "\n")
else:
list_main.write(line + "\n")
sort_txt_file(self.list_kemono)
# Try to avoid getting banned by shuffling download order
randomize_txt_file(self.list_instagram)
randomize_txt_file(self.list_main)
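
An assumed illustration of the routing: a watch.txt holding the lines below would be split as noted.

# https://kemono.party/patreon/user/123  -> kemono.txt (sorted)
# https://www.instagram.com/someone      -> instagram.txt (shuffled)
# https://twitter.com/someone            -> main.txt, rewritten to .../someone/media
# https://wikifeet.com/Some_Name         -> skipped
# anything else                          -> main.txt (shuffled)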

View File

@@ -0,0 +1,17 @@
[metadata]
name = download
version = 1.5
[options]
py_modules =
download
functions
argparser
gdl_classes
[options.entry_points]
console_scripts =
download = download:main
# [aliases]
# test = pytest
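
Assuming a standard setuptools workflow, the entry point can be installed and smoke-tested along these lines:

# pip install .
# download --help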

View File

@@ -0,0 +1,24 @@
from setuptools import setup
setup()
# import os
# from setuptools import find_packages
# from distutils.core import setup
# import setuptools
# # User-friendly description from README.md
# current_directory = os.path.dirname(os.path.abspath(__file__))
# try:
# with open(os.path.join(current_directory, "README.md"), encoding="utf-8") as f:
# long_description = f.read()
# except Exception:
# long_description = ""
# setup(
# name="download",
# # packages=["argparser", "functions"],
# version="1.5.0",
# scripts=["download.py"],
# # entry_points={"console_scripts": ["download = download:main"]},
# )

View File

@@ -0,0 +1,28 @@
{ pkgs ? import <nixpkgs> { } }:
with pkgs;
mkShell {
packages = [
(python3.withPackages (ps:
with ps; [
setuptools
pyyaml
types-pyyaml
# (buildPythonApplication rec {
# pname = "webcomix";
# version = "3.6.6";
# src = fetchPypi {
# inherit pname version;
# sha256 = "sha256-hCnic8Rd81qY1R1XMrSME5ntYTSvZu4/ANp03nCmLKU=";
# };
# doCheck = false;
# propagatedBuildInputs =
# [ click scrapy scrapy-splash scrapy-fake-useragent tqdm ];
# })
]))
];
buildInputs = [
];
}
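
With the .envrc above ("use nix"), direnv loads this shell automatically; a manual equivalent (assumed) would be:

# nix-shell --run "python download.py --help"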