migrated tasks and downloads to this flake

This commit is contained in:
Danilo Reyes 2024-12-14 14:08:50 -06:00
parent b33b69a698
commit c12e26ae7d
18 changed files with 953 additions and 25 deletions

View File

@ -1,18 +1,18 @@
{
description = "Nix flake for the activity logging script";
outputs =
{ self, nixpkgs, ... }@inputs:
{ nixpkgs, ... }:
{
packages.x86_64-linux =
let
inherit (self) outputs;
system = "x86_64-linux";
pkgs = import nixpkgs { inherit system; };
in
{
tuh-activity-logger = import ./pkgs/tuh-activity-logger/default.nix {
inherit (pkgs) python3Packages sqlite;
};
tasks = pkgs.writeScriptBin "tasks" (builtins.readFile ./src/tasks.sh);
download = pkgs.callPackage ./pkgs/download.nix { };
webcomix = pkgs.callPackage ./pkgs/webcomix.nix { };
tuh-activity-logger = pkgs.callPackage ./pkgs/tuh-activity-logger.nix { };
};
};
}

37
pkgs/download.nix Normal file
View File

@ -0,0 +1,37 @@
{
python3Packages,
gallery-dl,
ffmpeg,
callPackage,
...
}:
let
pname = "download";
version = "2.5";
in
python3Packages.buildPythonApplication {
inherit pname version;
src = builtins.path {
path = ../src/download/.;
name = "${pname}-${version}";
};
pyproject = true;
build-system = [ python3Packages.setuptools ];
dependencies =
[
ffmpeg
gallery-dl
(callPackage ./webcomix.nix { })
]
++ builtins.attrValues {
inherit (python3Packages)
pyyaml
types-pyyaml
yt-dlp
;
};
}

53
pkgs/webcomix.nix Normal file
View File

@ -0,0 +1,53 @@
{
lib,
fetchFromGitHub,
python3Packages,
}:
python3Packages.buildPythonApplication rec {
pname = "webcomix";
version = "3.11.0";
pyproject = true;
src = fetchFromGitHub {
owner = "J-CPelletier";
repo = "webcomix";
rev = version;
hash = "sha256-Y16+/9TnECMkppgI/BeAbTLWt0M4V/xn1+hM4ILp/+g=";
};
postPatch = ''
substituteInPlace pyproject.toml \
--replace-fail "poetry>=1.2.0" poetry-core \
--replace-fail "poetry.masonry.api" "poetry.core.masonry.api" \
--replace-fail 'pytest-rerunfailures = "^11.1.2"' 'pytest-rerunfailures = "14.0"'
'';
build-system = [
python3Packages.poetry-core
];
dependencies = with python3Packages; [
click
tqdm
scrapy
scrapy-splash
scrapy-fake-useragent
pytest-rerunfailures
docker
];
preCheck = ''
export HOME=$(mktemp -d)
'';
doCheck = false;
meta = {
description = "Webcomic downloader";
homepage = "https://github.com/J-CPelletier/webcomix";
license = lib.licenses.mit;
maintainers = with lib.maintainers; [ CaptainJawZ ];
mainProgram = "webcomix";
};
}

1
src/download/.env Normal file
View File

@ -0,0 +1 @@
CONFIG_FILE = "/home/jawz/.config/jawz/config.yaml"

1
src/download/.envrc Normal file
View File

@ -0,0 +1 @@
use nix

96
src/download/argparser.py Normal file
View File

@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""Setup the argparser"""
import argparse
scrapper_types = (
"push",
"main",
"instagram",
"kemono",
"comic",
"manga",
"webcomic",
)
# Define types of instagram stories
instagram_types = ["posts", "reels", "stories", "highlights", "avatar"]
def argparser(users: list) -> argparse.Namespace:
"""Returns an argparser to evaluate user input"""
# ARG PARSER
parser = argparse.ArgumentParser(
prog="Downloader",
description="Download images and galleries from a wide array of websites"
" either by using links or chosing from user define lists."
" This program also takes care of archiving tasks,"
" that keep the run time fast and prevents downloading duplicates.",
)
# Chose the type of scrapper
parser.add_argument(
choices=scrapper_types,
nargs="?",
dest="scrapper",
help="Select a scrapper.",
)
# Parse user list
parser.add_argument(
"-u",
"--user",
choices=users,
dest="user",
help="Selects the personal user list to process. Defaults to everyone",
default="everyone",
type=str,
)
# Parse individual links
parser.add_argument(
"-i",
"--input",
nargs="*",
dest="link",
action="append",
help="Download the provided links",
type=str,
)
# Set the print list flag
parser.add_argument(
"-l",
"--list",
dest="flag_list",
action="store_true",
help="Prints a list of all the added links and prompts for a choice",
)
# Set the use archiver flag
parser.add_argument(
"-a",
"--no-archive",
dest="flag_archive",
action="store_false",
help="Disables the archiver flag",
)
# Set the skip flag
parser.add_argument(
"-s",
"--no_skip",
dest="flag_skip",
action="store_false",
help="Disables the skip function, downloads the entire gallery",
)
parser.add_argument(
"-v",
"--verbose",
dest="flag_verbose",
action="store_true",
help="Prints the generated commands instead of running them",
)
parser.add_argument(
"-t",
"--type-post",
choices=instagram_types,
nargs="*",
dest="post_type",
help="Filters posts on instagram by type",
default=instagram_types,
type=str,
)
return parser.parse_args()

Binary file not shown.

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
from classes.user import User
from functions import LOG
from functions import load_config_variables
from functions import quote
from functions import run
class Gallery:
def __init__(self) -> None:
self.archive: bool = True
self.skip_arg: str = ""
self.link: str = ""
self.dest: str = ""
self.list: str = ""
self.opt_args: str = ""
self.command: str = ""
def generate_command(self, user: User = User(1), is_comic: bool = False) -> None:
"""Generates a command string."""
if is_comic:
configs = load_config_variables()
directory = quote(configs["comic"]["download-dir"])
database = quote(configs["comic"]["database"])
queue = quote(configs["comic"][f"{self.list}-list"]) if self.list else ""
else:
directory = quote(str(user.directories[self.dest]))
database = quote(str(user.dbs["gallery"]))
queue = quote(str(user.lists[self.list])) if self.list else ""
command = f"gallery-dl --sleep {str(user.sleep)}"
command += self.skip_arg if self.skip_arg else ""
command += f" --dest {directory}" if self.dest or is_comic else ""
command += f" --download-archive {database}" if self.archive else ""
command += self.opt_args if self.opt_args else ""
if self.link and not self.list:
command += f" {quote(self.link)}"
if self.list and not self.link:
command += f" -i {queue}"
LOG.debug(command)
self.command = command
def run_command(self, verbose: bool):
run(self.command, verbose)

View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""Define the user class to populate and setup the download environment"""
import re
from random import shuffle
from pathlib import Path
from functions import load_config_variables
from functions import validate_x_link
from functions import parse_link
from functions import clean_cache
from functions import LOG
class User:
"""Populate the directory for each user"""
# pylint: disable=too-many-instance-attributes
def __init__(self, index) -> None:
config = load_config_variables()
self.config = config["users"][index] | config["global"]
self.name = self.config["name"]
self.sleep = self.config["sleep"]
# Directories
self.directories = {
str(key).replace("-dir", ""): Path(self.config[f"{key}"])
for key in filter(lambda x: re.search("-dir", x), self.config.keys())
}
self.directories["cache"] = self.directories["cache"] / self.name
self.directories["lists"] = self.directories["lists"] / self.name
# Files
self.dbs = {
"gallery": self.directories["databases"] / f"{self.name}.sqlite3",
"media": self.directories["databases"] / f"{self.name}_ytdl.txt",
}
# Lists
self.lists = {
"master": self.directories["lists"] / "watch.txt",
"push": self.directories["lists"] / "instant.txt",
"instagram": self.directories["cache"] / "instagram.txt",
"kemono": self.directories["cache"] / "kemono.txt",
"main": self.directories["cache"] / "main.txt",
}
def _create_directories(self) -> None:
"""Create user directories if they don't exist"""
clean_cache(self.directories["cache"])
# Create directories
for directory in self.directories.keys():
self.directories[directory].mkdir(parents=True, exist_ok=True)
# Check for the existence of core files
if not self.directories["lists"].is_dir():
LOG.error("Lists directory for user %s doesn't exist", self.name)
# dbs stands for databases, the archives.
for db in filter(lambda x: not self.dbs[x].is_file(), self.dbs.keys()):
self.dbs[db].touch()
for lst in filter(lambda x: not self.lists[x].is_file(), ["master", "push"]):
self.lists[lst].touch()
def append_list(self, name: str, line: str) -> None:
"""Appends a line into the given list"""
with open(self.lists[name], "a+", encoding="utf-8") as a_file:
a_file.write(line + "\n")
def _append_cache_list(self, line) -> None:
"""Writes the input line into it's respective list,
depending on what website it belongs to."""
if re.search("x", line):
self.append_list("main", validate_x_link(line))
elif re.search(r"kemono\.party", line):
self.append_list("kemono", line)
elif re.search("instagram", line):
self.append_list("instagram", line)
else:
self.append_list("main", line)
def list_manager(self) -> None:
"""Manage all the user list and create sub-lists"""
self._create_directories() # Call the function to create necesary cache dirs
with open(self.lists["master"], "r", encoding="utf-8") as r_file:
master_content = list(map(lambda x: x.rstrip(), r_file))
# Create temporary list files segmented per scrapper
shuffle(master_content)
for line in master_content:
self._append_cache_list(line)
def save_link(self, link: str) -> None:
"""Checks the master list against a new link
if unmatched, appends it to the end of the list"""
with open(self.lists["master"], "r", encoding="utf-8") as r_file:
links = r_file.read().lower()
if parse_link(link).lower() in links:
LOG.info("Gallery repeated, not saving")
return
LOG.info("New gallery, saving")
self.append_list("master", parse_link(link))

295
src/download/download.py Normal file
View File

@ -0,0 +1,295 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Rewriting of the download manager script
with the intention to make it
more modular with the use of flags
in order to avoid unnecesary modifications
to the cofig files.
Also following in line more posix and python rules.
"""
import re
import yaml
from typing import Dict
from functions import LOG
from functions import run
from functions import quote
from functions import list_lines
from functions import load_config_variables
from functions import parse_link
from argparser import argparser
from classes.user import User
from classes.gallery import Gallery
# GLOBAL VARIABLE SECTION
CONFIGS = load_config_variables()
# Enable a default "everyone" flag for when running stuff like download gallery
USERS = ["everyone"] + [user["name"] for user in CONFIGS["users"]]
ARGS = argparser(USERS)
class Video:
"""Just a simple class to unify the Video parameters into a single one."""
def __init__(self) -> None:
self.use_archive: bool = True
self.link: str = ""
self.dest: str = ""
self.database: str = ""
def get_index(name: str) -> int:
"""Find the index in the config file"""
return next((i for i, d in enumerate(CONFIGS["users"]) if d["name"] == name), -1)
def parse_gallery(gdl_list: str, user: User) -> None:
"""Processes the gallery-dl command based on the selected gallery"""
gallery = Gallery()
gallery.archive = ARGS.flag_archive
gallery.skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
gallery.dest = "download"
gallery.list = gdl_list
gallery.opt_args = parse_instagram(gdl_list)
gallery.generate_command(user)
gallery.run_command(ARGS.flag_verbose)
def parse_instagram(link: str) -> str:
"""Fix instagram links"""
if "instagram" not in link:
return ""
if isinstance(ARGS.post_type, list):
return f" -o include={quote(','.join(ARGS.post_type))}"
return f" -o include={quote(ARGS.post_type)}"
def video_command(video: Video) -> str:
"""Filters and processes the required command to download videos"""
command = "yt-dlp"
rgx_yt = re.compile(r"(https:\/\/youtube|https:\/\/www.youtube|https:\/\/youtu.be)")
rgx_music = re.compile(r"(https:\/\/music.youtube.*)")
if re.search(r"chaturbate", video.link):
return f"chat-dl {video.link}"
if rgx_yt.search(video.link):
command += " --embed-subs --embed-thumbnail"
command += " --embed-metadata --embed-chapters"
command += f" -o {quote(video.dest + '/%(title)s.%(ext)s')}"
elif rgx_music.search(video.link):
command += f" --download-archive {video.database}" if video.use_archive else ""
command += " --no-playlist --newline -x"
command += " --audio-format best --add-metadata --audio-quality 0 -o"
command += f" {quote(video.dest + '/%(title)s.%(ext)s')}"
else: # Any other video link, just do it generic
command += f" -f mp4 -o {quote(video.dest + '/%(title)s.%(ext)s')}"
LOG.info("%s %s", command, video.link)
return f"{command} {quote(video.link)}"
def comic_manager(skip_arg: str, category: str) -> None:
"""Process the information to download manga"""
re_cat = "manga|webtoon" if category == "manga" else "readcomiconline"
with open(CONFIGS["comic"]["comic-list"], "r", encoding="utf-8") as r_file:
links = list(filter(lambda x: re.search(re_cat, x), r_file))
for link in links:
gallery = Gallery()
gallery.archive = ARGS.flag_archive
gallery.skip_arg = skip_arg
gallery.link = link
gallery.generate_command(is_comic=True)
gallery.run_command(ARGS.flag_verbose)
def print_webcomics(webcomics: Dict[str, Dict]) -> int:
"""Prints a list of webcomics, and returns an index."""
for index, entry in enumerate(webcomics["webcomics"]):
print(list_lines(index, entry["name"]))
return int(input("Select a webcomic: "))
def webcomic_manager():
"""Process the information to download webcomics"""
with open(CONFIGS["comic"]["webcomic-list"], "r", encoding="utf-8") as r_file:
webcomics = yaml.safe_load(r_file)
usr_input = print_webcomics(webcomics)
# Determines where the webcomic will be downloaded
rating = webcomics["webcomics"][usr_input]["type"]
dest = webcomics["global"][f"{rating}_directory"]
name = webcomics["webcomics"][usr_input]["name"]
link = webcomics["webcomics"][usr_input]["url"]
nxt_code = webcomics["webcomics"][usr_input]["next_code"]
img_code = webcomics["webcomics"][usr_input]["image_code"]
LOG.info("The webcomic is %s", dest)
command = f"cd {quote(dest)} && webcomix custom"
command += f" {quote(name)}"
command += " --start-url"
command += f" {quote(link)}"
command += f" --next-page-xpath={quote(nxt_code)}"
command += f" --image-xpath={quote(img_code)}"
command += " -y --cbz"
run(command, ARGS.flag_verbose)
def save_comic(link: str) -> None:
"""Add comic/manga link to the list"""
list_comic = CONFIGS["comic"]["comic-list"]
with open(list_comic, "r", encoding="utf-8") as r_file:
links = r_file.read().lower()
if parse_link(link).lower() in links:
LOG.info("Graphic novel repeated, not saving")
return
LOG.info("New graphic novel, saving")
with open(list_comic, "a", encoding="utf-8") as w_file:
w_file.write(link + "\n")
def push_manager(user: User):
"""Filters out the URL to use the appropiate downloader"""
# Creates an array which will store any links that should use youtube-dl
rgx_gallery = re.compile(
r"(x\.com\/\w+((?=.*media)|(?!.*status)))"
r"|(men\.wikifeet)"
r"|(furaffinity\.net\/user\/)"
r"|((deviantart\.com\/\w+(?!.*\/art\/)))"
r"|(furaffinity\.net\/gallery\/)"
r"|(furaffinity\.net\/scraps\/)"
r"|(furaffinity\.net\/favorites\/)"
r"|(instagram.com(?!\/p\/)\/\w+)"
r"|(e621\.net((?=\/post\/)|(?!\/posts\/)))"
r"|(flickr\.com\/photos\/\w+\/(?!\d+))"
r"|(tumblr\.com(?!\/post\/))"
r"|(kemono\.party\/(fanbox|gumroad|patreon)(?!\/user\/\d+\/post))"
r"|(blogspot\.com(?!\/))"
r"|(rule34\.paheal\.net\/post\/(?!view))"
r"|(rule34\.xxx\/index\.php\?page\=post&s=(?!view))"
r"|(pixiv\.net\/(en\/)?((?=users)|(?!artwork)))"
r"|(fanbox\.cc\/@\w+(?!.*posts\/\d+))"
r"|(reddit\.com\/(user|u))"
r"|(baraag\.net\/((@\w+)|(?!\/\d+)))"
r"|(pinterest\.com\/(?!pin\/\d+))"
r"|(redgifs\.com\/(users|u|(?!watch)))"
r"|(bsky\.app\/profile\/(?!.*\/post\/))"
)
rgx_video = re.compile("youtu.be|youtube|pornhub|xtube|xvideos|chaturbate")
rgx_comic = re.compile("readcomiconline|mangahere|mangadex|webtoons|manganato")
with open(user.lists["push"], "r", encoding="utf-8") as r_file:
links = list(map(lambda x: x.rstrip(), r_file))
links_galleries = filter(rgx_gallery.search, links)
links_videos = filter(rgx_video.search, links)
links_comics = filter(rgx_comic.search, links)
links_other = filter(
lambda x: (not rgx_video.search(x))
and (not rgx_gallery.search(x))
and (not rgx_comic.search(x)),
links,
)
for link in links_galleries:
gallery = Gallery()
gallery.archive = ARGS.flag_archive
gallery.skip_arg = " -o skip=true" if not ARGS.flag_skip else ""
gallery.link = parse_link(link)
gallery.dest = "download"
gallery.opt_args = parse_instagram(link)
gallery.generate_command(user)
gallery.run_command(ARGS.flag_verbose)
user.save_link(link)
for link in links_comics:
if ARGS.flag_skip and re.search(r"readcomiconline", link):
skip_arg = " --chapter-range 1"
elif ARGS.flag_skip and re.search(r"manganato|mangahere|webtoons", link):
skip_arg = " --chapter-range 1-5"
else:
skip_arg = ""
gallery = Gallery()
gallery.archive = ARGS.flag_archive
gallery.skip_arg = skip_arg
gallery.link = link
gallery.generate_command(is_comic=True)
gallery.run_command(ARGS.flag_verbose)
save_comic(link)
for link in links_videos:
video = Video()
video.use_archive = ARGS.flag_archive
video.link = link
video.dest = f"{user.directories['media']}"
video.database = quote(f"{user.dbs['media']}")
run(video_command(video), ARGS.flag_verbose)
for link in links_other:
LOG.info("Other type of download %s", link)
gallery = Gallery()
gallery.archive = False
gallery.skip_arg = " -o directory='[]'"
gallery.link = link
gallery.dest = "push"
gallery.generate_command(user)
gallery.run_command(ARGS.flag_verbose)
# Flush the push list, cleans all the contents
with open(user.lists["push"], "w", encoding="utf-8") as w_file:
w_file.close()
def scrapper_manager(user: User) -> None:
"""Analyze the user arguments and call in functions"""
user.list_manager()
if re.search(r"main|instagram|kemono", ARGS.scrapper):
skip_arg = "" if ARGS.flag_skip else " -o skip=true"
parse_gallery(ARGS.scrapper, user)
elif ARGS.scrapper in "push":
push_manager(user)
elif re.search("^comic|manga", ARGS.scrapper):
skip_arg = " --chapter-range 1" if ARGS.flag_skip else ""
skip_arg += "-5" if ARGS.scrapper in "manga" else ""
comic_manager(skip_arg, ARGS.scrapper)
elif re.search("webcomic", ARGS.scrapper):
webcomic_manager()
def scrap_everyone() -> None:
"""Iterates over every user of my scrapper"""
for current_user in CONFIGS["users"]:
user = User(get_index(current_user["name"]))
LOG.info("Scrapping %s for %s", ARGS.scrapper, current_user["name"])
scrapper_manager(user)
def main():
"""Main module to decide what to do based on the parsed arguments"""
if ARGS.scrapper:
rgx_shared = re.compile("push|main|instagram|kemono")
if (ARGS.user in "everyone") and (rgx_shared.search(ARGS.scrapper)):
scrap_everyone()
else:
scrapper_manager(User(get_index(ARGS.user)))
elif ARGS.link:
is_admin = re.search(r"everyone|jawz", ARGS.user)
user = User(get_index("jawz" if is_admin else ARGS.user))
for arg_link in ARGS.link[0]:
user.append_list("push", parse_link(arg_link))
push_manager(user)
if __name__ == "__main__":
main()

112
src/download/functions.py Normal file
View File

@ -0,0 +1,112 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Personal functions to aid on multiple scripts"""
import sys
import fileinput
import re
import os
import logging
from pathlib import Path
import yaml
VERBOSE_G = False
LOG = logging.getLogger()
HANDLER = logging.StreamHandler()
FORMATTER = logging.Formatter(
"[%(filename)s][%(levelname)s] %(funcName)s '%(message)s'"
)
HANDLER.setFormatter(FORMATTER)
LOG.addHandler(HANDLER)
LOG.setLevel(logging.INFO)
def validate_x_link(line: str) -> str:
"""returns a fixed link, which ends with /media"""
# if url contains /media at the end just write the line
if re.search(r"\/media$", line):
return line
# if does not contain /media at the end then add /media
return f"{line}/media"
def parse_link(link: str) -> str:
"""Fixes links"""
if not re.search(r"(x\.com\/\w+(\/)?(?!.*status))", link):
LOG.debug("No modifications needed for the link %s", link)
return link
# if url contains /media at the end just write the line
fixed_link = validate_x_link(link)
LOG.debug("Processed link %s", fixed_link)
return fixed_link
def load_config_variables():
"""Loads all the variables from the config file"""
config_file = Path("~/.config/jawz/config.yaml")
with open(config_file.expanduser(), encoding="utf-8") as open_file:
return yaml.safe_load(open_file)
def clean_cache(directory: Path):
"""Recursively deletes all the content of a directory,
including the directory itself."""
if not directory.is_dir():
return
for file in filter(lambda x: x.is_file(), directory.iterdir()):
file.unlink()
for dir in filter(lambda x: x.is_dir(), directory.iterdir()):
dir.rmdir()
directory.rmdir()
def run(command: str, verbose: bool):
"""Run command in a subprocess"""
# pylint: disable=subprocess-run-check
# This toggle allows for a really wasy debug when using -v
if verbose:
print(command)
else:
os.system(command)
def list_lines(i: int, line: str) -> str:
"""Create a numbered list"""
return f"{i}) {line}"
def quote(line: str) -> str:
"""Quote the line"""
return f'"{line}"'
def sort_txt_file(file_path: Path):
"""Sort every line alphabetically
remove duplicated and empty lines"""
file = str(file_path.resolve())
run(f"sort -u {quote(file)} -o {quote(file)}", VERBOSE_G)
run(f"sed -i '/^$/d' {quote(file)}", VERBOSE_G)
run(f'sed -i -e "s,http:,https:," {quote(file)}', VERBOSE_G)
# fix this using strip on python
# line.strip("/")
run(f'sed -i -e "s,/$,," {quote(file)}', VERBOSE_G) # trailing /
def randomize_txt_file(file_path: Path):
"""Randomize the order of the
lines of the txt file"""
file = str(file_path.resolve())
run(f"sort -R {quote(file)} -o {quote(file)}", VERBOSE_G)
def parse_list(file):
"""Replace http with https and remove trailing /"""
for line in fileinput.input(file, inplace=True):
sys.stdout.write(str(line).replace("http://", "https://"))
with open(file, "r+", encoding="utf-8") as open_file:
f_content = open_file.read()
f_content = re.compile(r"\/$", 0).sub(r"\/$", "")
open_file.seek(0)
open_file.truncate()
print(f_content)
sort_txt_file(file)

15
src/download/setup.cfg Normal file
View File

@ -0,0 +1,15 @@
[metadata]
name = download
[options]
py_modules =
download
functions
argparser
classes.gallery
classes.user
[options.entry_points]
console_scripts =
download = download:main

3
src/download/setup.py Normal file
View File

@ -0,0 +1,3 @@
from setuptools import setup
setup()

19
src/download/shell.nix Normal file
View File

@ -0,0 +1,19 @@
{
pkgs ? import <nixpkgs> { },
}:
let
pythonPackages = builtins.attrValues {
inherit (pkgs.python3Packages) setuptools pyyaml types-pyyaml;
};
otherPackages = builtins.attrValues {
inherit (pkgs) yt-dlp gallery-dl ffmpeg;
};
in
pkgs.mkShell {
packages = [
(pkgs.python3.withPackages (_ps: pythonPackages))
] ++ otherPackages;
buildInputs = [ ];
}

165
src/tasks.sh Executable file
View File

@ -0,0 +1,165 @@
#! /usr/bin/env nix-shell
#! nix-shell -i bash -p bash trashy fd ripgrep file
directories=("$HOME/Pictures/To Organize/" "$HOME/Downloads/" "$HOME/Downloads/cache")
replace_extension() {
local file_basename
file_basename=$(basename "$1")
echo "${file_basename%.*}.$2"
}
generate_random_number() {
local min=0
local max=9999999999
printf "%010d\n" $((min + RANDOM % max))
}
test_name() {
local random_number
random_number=$(generate_random_number)
while (($(fd "$random_number"* "$HOME/Pictures/" "$HOME/Downloads/" -tf | wc -l) > 0)); do
echo "Conflicts found, generating a new filename"
random_number=$(generate_random_number)
echo "$random_number"
done
echo "$random_number"
}
while IFS= read -r file; do
regex_str='source|tenor|media|duckduckgo\.com|giphy|'
regex_str+='(?<!app)image|^download|unknown|zoom|'
regex_str+='new_canvas|untitled|drawpile|OIG|'
regex_str+='imgpsh_'
if ! basename "$file" | rg --pcre2 -q "$regex_str"; then
continue
fi
new_name=$(test_name)
echo renaming
echo "$file"
echo into
echo "$(dirname "$file")"/"$new_name"
echo ---------------
command mv -n "$(dirname "$file")"/{"$(basename "$file")","$new_name"}
if ! basename "$file" | rg -q 'Screenshot_\d{8}'; then
continue
fi
echo "moving screenshot $file into $HOME/Pictures/Screenshots/"
command mv -n "$file" "$HOME/Pictures/Screenshots/"
done < <(fd . "${directories[@]}" -d 1 -tf --absolute-path)
# screenshots=$HOME/Pictures/Screenshots
classify_directories=("$HOME/Pictures/Screenshots" "$HOME/Pictures/Photos/Camera")
if (($(fd . "${classify_directories[@]}" -tf -d 1 | wc -l) > 0)); then
while IFS= read -r file; do
date=$(stat -c "%y" "$file" | rg -o "\d{4}-\d{2}-\d{2}")
year=$(echo "$date" | rg -o "\d{4}")
month=$(echo "$date" | rg -o "\d{4}-\d{2}" | rg -o --pcre2 "(?<=-)\d{2}")
parent_dir=$(dirname "$(realpath "$file")")
dest_dir=$(realpath "$parent_dir")/$year/$month
echo "Moving screenshot $(basename "$file") into $dest_dir"
mkdir -vp "$dest_dir"
command mv -n "$file" "$dest_dir/"
done < <(fd . "${classify_directories[@]}" --absolute-path -tf -d 1)
fi
# Where steam screenshots are stored, may need to replace with ur ID
dir_steam=$XDG_DATA_HOME/Steam/userdata/107446271/760/remote
declare -A games
# Insert here new games, put between [] the ID of the game
# You can find it by visiting the $dir_steam directory
# the ID is simply the name of the folder in there.
games+=(
[386360]=Smite
[960090]="Bloons Tower Defense 6"
[648800]=Raft
[262060]="Darkest Dungeon"
[234140]="Mad Max"
[433340]="Slime Rancher"
[1190460]="Death Stranding"
[1850570]="Death Stranding"
[440900]="Conan Exiles"
[679110]="Werewolf Apocalypse"
[2221490]="The Division 2"
)
for key in "${!games[@]}"; do
# Modify this to store your screenshots somewhere else
dir_dest=$(realpath "$HOME/Pictures/Screenshots/Games")/${games[$key]}
dir_game=$(realpath "$dir_steam")/$key/screenshots
# If there are not screenshots currently stored, why bother lol
if ! [[ -d $dir_game ]]; then #
continue
fi
# If screenshots exist however...
if ! (($(fd . "$dir_game" -d 1 -tf | wc -l) > 0)); then
continue
fi
# Create destination directory
mkdir -vp "$dir_dest"
echo "Moving ${games[$key]} screenshots..."
fd . "$dir_game" -d 1 -tf -x mv -n {} "$dir_dest"/
# Delete thumnnails
echo "Deleting ${games[$key]} thumbnails..."
rm -rf "$dir_game"/thumbnails
done
# Clearing up empty directories
fd . "$dir_steam" -td -te -x trash {}
cyberpunk_dir=$HOME/Games/gog/cyberpunk-2077/drive_c/users/jawz/Pictures/"Cyberpunk 2077"
if [[ -d $cyberpunk_dir ]]; then
while IFS= read -r file; do
echo "Moving cyberpunk screenshots $(basename "$file")"
command mv -n "$file" "$HOME/Pictures/Screenshots/Games/Cyberpunk 2077/"
done < <(fd . "$cyberpunk_dir" -tf)
fi
proton_dir=$HOME/.steam/steam/compatibilitytools.d
if [[ -d "$proton_dir" ]]; then
while IFS= read -r protonver; do
lutrisdir=$XDG_DATA_HOME/lutris/runners/wine/$(basename "$protonver")
if [ -d "$lutrisdir" ] && [ -L "$lutrisdir" ]; then
continue
fi
echo "Symlink $lutrisdir doesn't exist, creating link..."
ln -s "$(realpath "$protonver")"/files "$lutrisdir"
done < <(fd . "$proton_dir" -d 1 -td)
fi
fd . "$XDG_DATA_HOME/lutris/runners/wine" -d1 -tl -Lx trash {}
while IFS= read -r file; do
ext=$(file --mime-type "$file" | rg -o '\w+$')
correct_ext=${ext,,}
filename=$(basename -- "$file")
current_ext="${filename##*.}"
filename="${filename%.*}"
if ! echo "$correct_ext" | rg -q 'jpe|jpg|jpeg|png|gif|webp'; then
continue
fi
if [ "$current_ext" == "$correct_ext" ]; then
continue
fi
echo "The file $(basename "$file")" \
"will be renamed, the propper extension is $correct_ext"
new_name="$filename".$correct_ext
if command mv -n "$(dirname "$file")"/{"$(basename "$file")","$new_name"}; then
continue
fi
file_hash="$(sha256sum "$file" | gawk '{ print $1 }')"
if ! echo "$file_hash $(dirname "$file")/$new_name" | sha256sum -c; then
continue
fi
echo "deleting duplicated: $file"
rm "$file"
done < <(fd . "${directories[@]}" -d 1 -tf)
files_home_clean=(.pki HuionCore.pid DriverUI.pid huion.log)
for file in "${files_home_clean[@]}"; do
file=$HOME/$file
if [ ! -e "$file" ]; then
continue
fi
rm -rf "$file"
done

View File

@ -1,20 +0,0 @@
-- Create the table if it doesn't exist
CREATE TABLE IF NOT EXISTS activity_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL
);
-- Insert dummy data for one year
DELETE FROM activity_log; -- Clear existing data
WITH RECURSIVE dates(date) AS (
SELECT datetime('2023-12-01 00:00:00')
UNION ALL
SELECT datetime(date, '+1 hour')
FROM dates
WHERE date < datetime('2024-12-01 00:00:00')
)
INSERT INTO activity_log (timestamp)
SELECT date
FROM dates
WHERE random() % 4 = 0; -- Randomly select approximately 25% of hours for activity