diff --git a/flake.nix b/flake.nix index be8d6b4..3f05c86 100644 --- a/flake.nix +++ b/flake.nix @@ -136,6 +136,22 @@ type = "app"; program = "${pkgs.download}/bin/download-admin"; }; + download-tests = { + type = "app"; + program = "${ + pkgs.writeShellApplication { + name = "download-tests"; + runtimeInputs = [ + (pkgs.python3.withPackages (ps: [ ps.pyyaml ])) + ]; + text = '' + set -euo pipefail + export PYTHONPATH="${inputs.self}/src/download" + python -m unittest discover -s "${inputs.self}/src/download/tests" -p "test_*.py" + ''; + } + }/bin/download-tests"; + }; }; }; } diff --git a/src/download/tests/test_db.py b/src/download/tests/test_db.py new file mode 100644 index 0000000..309c6a0 --- /dev/null +++ b/src/download/tests/test_db.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import tempfile +import unittest +import sqlite3 +from pathlib import Path + +import db + + +class TestDB(unittest.TestCase): + def setUp(self) -> None: + self.conn = sqlite3.connect(":memory:") + self.conn.row_factory = sqlite3.Row + db.ensure_schema(self.conn) + + def tearDown(self) -> None: + self.conn.close() + + def test_normalize_url(self): + self.assertEqual( + db.normalize_url("http://Twitter.com/User/"), + "https://x.com/User", + ) + self.assertEqual( + db.normalize_url("x.com/SomeUser/media/"), + "https://x.com/SomeUser/media", + ) + + def test_add_link_dedupe(self): + res1 = db.add_link(self.conn, "jawz", "https://x.com/Test/") + res2 = db.add_link(self.conn, "jawz", "https://x.com/Test") + self.assertEqual(res1["status"], "added") + self.assertEqual(res2["status"], "exists") + + def test_remove_tombstone(self): + db.add_link(self.conn, "jawz", "https://x.com/Test") + ok = db.remove_link(self.conn, "jawz", "https://x.com/Test") + self.assertTrue(ok) + res = db.add_link(self.conn, "jawz", "https://x.com/Test") + self.assertEqual(res["status"], "removed") + res2 = db.add_link(self.conn, "jawz", "https://x.com/Test", assume_yes=True) + self.assertEqual(res2["status"], "added") + + def test_disable_and_ban(self): + db.add_link(self.conn, "jawz", "https://x.com/Test") + ok = db.set_enabled(self.conn, "jawz", "https://x.com/Test", enabled=False) + self.assertTrue(ok) + active = db.get_active_links(self.conn, "jawz") + self.assertEqual(active, []) + ok = db.set_banned(self.conn, "jawz", "https://x.com/Test", banned=True, reason="bad") + self.assertTrue(ok) + active = db.get_active_links(self.conn, "jawz") + self.assertEqual(active, []) + + def test_import_master_list(self): + with tempfile.TemporaryDirectory() as tmp: + path = Path(tmp) / "watch.txt" + path.write_text( + "\n".join( + [ + "https://x.com/User", + "# https://x.com/DisabledUser", + "https://x.com/User", + ] + ) + + "\n", + encoding="utf-8", + ) + result = db.import_master_list(self.conn, "jawz", path) + self.assertEqual(result["added"], 2) + self.assertEqual(result["exists"], 1) + rows = db.get_links_by_user(self.conn, "jawz") + by_norm = {db.normalize_url(r["url_original"]): r for r in rows} + self.assertTrue(by_norm["https://x.com/User"]["enabled"]) + self.assertFalse(by_norm["https://x.com/DisabledUser"]["enabled"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/download/tests/test_download.py b/src/download/tests/test_download.py new file mode 100644 index 0000000..5b620d2 --- /dev/null +++ b/src/download/tests/test_download.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +import types +import unittest +from pathlib import Path +import tempfile + +import download + + +class DummyArgs: + def __init__(self): + self.post_type = ["posts", "reels"] + self.flag_archive = True + self.flag_skip = True + self.flag_verbose = True + + +class DummyUser: + def __init__(self): + self.name = "jawz" + self.sleep = 0 + self.directories = {"media": Path("/tmp/media"), "download": Path("/tmp/dl")} + self.dbs = {"gallery": Path("/tmp/g.sqlite3"), "media": Path("/tmp/m.txt")} + self.lists = {"push": Path("/tmp/instant.txt")} + + +class TestDownload(unittest.TestCase): + def setUp(self) -> None: + download.ARGS = DummyArgs() + download.CONFIGS = { + "users": [{"name": "jawz"}], + "global": {}, + "comic": {"comic-list": "/tmp/comic.txt"}, + } + self.orig_gallery = download.Gallery + self.orig_video_command = download.video_command + self.orig_run = download.run + self.orig_db_connect = download.db.connect + self.orig_db_add_link = download.db.add_link + self.orig_save_comic = download.save_comic + + def tearDown(self) -> None: + download.Gallery = self.orig_gallery + download.video_command = self.orig_video_command + download.run = self.orig_run + download.db.connect = self.orig_db_connect + download.db.add_link = self.orig_db_add_link + download.save_comic = self.orig_save_comic + + def test_parse_instagram(self): + res = download.parse_instagram("https://instagram.com/user") + self.assertEqual(res, ["-o", "include=posts,reels"]) + res2 = download.parse_instagram("https://x.com/user") + self.assertEqual(res2, []) + + def test_video_command(self): + v = download.Video() + v.link = "https://youtu.be/abc" + v.dest = "/tmp" + cmd = download.video_command(v) + self.assertIn("yt-dlp", cmd[0]) + self.assertIn("https://youtu.be/abc", cmd) + + v2 = download.Video() + v2.link = "https://music.youtube.com/watch?v=xyz" + v2.dest = "/tmp" + v2.use_archive = False + cmd2 = download.video_command(v2) + self.assertIn("--audio-format", cmd2) + + def test_push_manager_routing(self): + user = DummyUser() + + captured = {"gallery": [], "video": [], "comic": [], "other": []} + + def fake_generate(self, *args, **kwargs): + return None + + def fake_run(self, *args, **kwargs): + link = getattr(self, "link", "") + if "mangadex" in link: + captured["comic"].append(link) + elif "x.com" in link: + captured["gallery"].append(link) + else: + captured["other"].append(link) + + def fake_video_command(video): + captured["video"].append(video.link) + return ["echo", "ok"] + + # Patch Gallery methods and video_command/run + class FakeGallery(self.orig_gallery): + def generate_command(self, *args, **kwargs): + return fake_generate(self, *args, **kwargs) + + def run_command(self, *args, **kwargs): + return fake_run(self, *args, **kwargs) + + download.Gallery = FakeGallery + download.video_command = fake_video_command + download.run = lambda *args, **kwargs: None + download.save_comic = lambda *_args, **_kwargs: None + + links = [ + "https://x.com/someuser", + "https://youtu.be/abc", + "https://mangadex.org/title/123", + "https://example.com/page", + ] + + # Disable DB write path for this test + class FakeConn: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def commit(self): + return None + + download.db.connect = lambda *a, **k: FakeConn() + download.db.add_link = lambda *a, **k: {"status": "added"} + + download.push_manager(user, links=links) + + self.assertEqual(len(captured["gallery"]), 1) + self.assertEqual(len(captured["video"]), 1) + self.assertEqual(len(captured["comic"]), 1) + self.assertEqual(len(captured["other"]), 1) + + # restore handled in tearDown + + +if __name__ == "__main__": + unittest.main()