This commit is contained in:
Danilo Reyes
2026-02-28 21:25:26 -06:00
parent 2ccdd713ea
commit 7a64034f8a
3 changed files with 232 additions and 0 deletions

View File

@@ -136,6 +136,22 @@
type = "app";
program = "${pkgs.download}/bin/download-admin";
};
download-tests = {
type = "app";
program = "${
pkgs.writeShellApplication {
name = "download-tests";
runtimeInputs = [
(pkgs.python3.withPackages (ps: [ ps.pyyaml ]))
];
text = ''
set -euo pipefail
export PYTHONPATH="${inputs.self}/src/download"
python -m unittest discover -s "${inputs.self}/src/download/tests" -p "test_*.py"
'';
}
}/bin/download-tests";
};
};
};
}

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
import tempfile
import unittest
import sqlite3
from pathlib import Path
import db
class TestDB(unittest.TestCase):
def setUp(self) -> None:
self.conn = sqlite3.connect(":memory:")
self.conn.row_factory = sqlite3.Row
db.ensure_schema(self.conn)
def tearDown(self) -> None:
self.conn.close()
def test_normalize_url(self):
self.assertEqual(
db.normalize_url("http://Twitter.com/User/"),
"https://x.com/User",
)
self.assertEqual(
db.normalize_url("x.com/SomeUser/media/"),
"https://x.com/SomeUser/media",
)
def test_add_link_dedupe(self):
res1 = db.add_link(self.conn, "jawz", "https://x.com/Test/")
res2 = db.add_link(self.conn, "jawz", "https://x.com/Test")
self.assertEqual(res1["status"], "added")
self.assertEqual(res2["status"], "exists")
def test_remove_tombstone(self):
db.add_link(self.conn, "jawz", "https://x.com/Test")
ok = db.remove_link(self.conn, "jawz", "https://x.com/Test")
self.assertTrue(ok)
res = db.add_link(self.conn, "jawz", "https://x.com/Test")
self.assertEqual(res["status"], "removed")
res2 = db.add_link(self.conn, "jawz", "https://x.com/Test", assume_yes=True)
self.assertEqual(res2["status"], "added")
def test_disable_and_ban(self):
db.add_link(self.conn, "jawz", "https://x.com/Test")
ok = db.set_enabled(self.conn, "jawz", "https://x.com/Test", enabled=False)
self.assertTrue(ok)
active = db.get_active_links(self.conn, "jawz")
self.assertEqual(active, [])
ok = db.set_banned(self.conn, "jawz", "https://x.com/Test", banned=True, reason="bad")
self.assertTrue(ok)
active = db.get_active_links(self.conn, "jawz")
self.assertEqual(active, [])
def test_import_master_list(self):
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "watch.txt"
path.write_text(
"\n".join(
[
"https://x.com/User",
"# https://x.com/DisabledUser",
"https://x.com/User",
]
)
+ "\n",
encoding="utf-8",
)
result = db.import_master_list(self.conn, "jawz", path)
self.assertEqual(result["added"], 2)
self.assertEqual(result["exists"], 1)
rows = db.get_links_by_user(self.conn, "jawz")
by_norm = {db.normalize_url(r["url_original"]): r for r in rows}
self.assertTrue(by_norm["https://x.com/User"]["enabled"])
self.assertFalse(by_norm["https://x.com/DisabledUser"]["enabled"])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
import types
import unittest
from pathlib import Path
import tempfile
import download
class DummyArgs:
def __init__(self):
self.post_type = ["posts", "reels"]
self.flag_archive = True
self.flag_skip = True
self.flag_verbose = True
class DummyUser:
def __init__(self):
self.name = "jawz"
self.sleep = 0
self.directories = {"media": Path("/tmp/media"), "download": Path("/tmp/dl")}
self.dbs = {"gallery": Path("/tmp/g.sqlite3"), "media": Path("/tmp/m.txt")}
self.lists = {"push": Path("/tmp/instant.txt")}
class TestDownload(unittest.TestCase):
def setUp(self) -> None:
download.ARGS = DummyArgs()
download.CONFIGS = {
"users": [{"name": "jawz"}],
"global": {},
"comic": {"comic-list": "/tmp/comic.txt"},
}
self.orig_gallery = download.Gallery
self.orig_video_command = download.video_command
self.orig_run = download.run
self.orig_db_connect = download.db.connect
self.orig_db_add_link = download.db.add_link
self.orig_save_comic = download.save_comic
def tearDown(self) -> None:
download.Gallery = self.orig_gallery
download.video_command = self.orig_video_command
download.run = self.orig_run
download.db.connect = self.orig_db_connect
download.db.add_link = self.orig_db_add_link
download.save_comic = self.orig_save_comic
def test_parse_instagram(self):
res = download.parse_instagram("https://instagram.com/user")
self.assertEqual(res, ["-o", "include=posts,reels"])
res2 = download.parse_instagram("https://x.com/user")
self.assertEqual(res2, [])
def test_video_command(self):
v = download.Video()
v.link = "https://youtu.be/abc"
v.dest = "/tmp"
cmd = download.video_command(v)
self.assertIn("yt-dlp", cmd[0])
self.assertIn("https://youtu.be/abc", cmd)
v2 = download.Video()
v2.link = "https://music.youtube.com/watch?v=xyz"
v2.dest = "/tmp"
v2.use_archive = False
cmd2 = download.video_command(v2)
self.assertIn("--audio-format", cmd2)
def test_push_manager_routing(self):
user = DummyUser()
captured = {"gallery": [], "video": [], "comic": [], "other": []}
def fake_generate(self, *args, **kwargs):
return None
def fake_run(self, *args, **kwargs):
link = getattr(self, "link", "")
if "mangadex" in link:
captured["comic"].append(link)
elif "x.com" in link:
captured["gallery"].append(link)
else:
captured["other"].append(link)
def fake_video_command(video):
captured["video"].append(video.link)
return ["echo", "ok"]
# Patch Gallery methods and video_command/run
class FakeGallery(self.orig_gallery):
def generate_command(self, *args, **kwargs):
return fake_generate(self, *args, **kwargs)
def run_command(self, *args, **kwargs):
return fake_run(self, *args, **kwargs)
download.Gallery = FakeGallery
download.video_command = fake_video_command
download.run = lambda *args, **kwargs: None
download.save_comic = lambda *_args, **_kwargs: None
links = [
"https://x.com/someuser",
"https://youtu.be/abc",
"https://mangadex.org/title/123",
"https://example.com/page",
]
# Disable DB write path for this test
class FakeConn:
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def commit(self):
return None
download.db.connect = lambda *a, **k: FakeConn()
download.db.add_link = lambda *a, **k: {"status": "added"}
download.push_manager(user, links=links)
self.assertEqual(len(captured["gallery"]), 1)
self.assertEqual(len(captured["video"]), 1)
self.assertEqual(len(captured["comic"]), 1)
self.assertEqual(len(captured["other"]), 1)
# restore handled in tearDown
if __name__ == "__main__":
unittest.main()