#!/usr/bin/env python3
"""Find duplicate ROMs exposed by a ROM-manager HTTP API and delete the extras.

The script fetches every ROM from ``{base}/api/roms``, groups likely
duplicates (same hash, then same metadata ID, then same normalized name),
keeps the highest-scoring ROM of each group and posts the remaining IDs to
``{base}/api/roms/delete``.  Groups that look like multi-disc games are never
touched.
"""
import os
import re
from collections import defaultdict
from itertools import chain
from typing import Dict, List, Set, Optional, Tuple, Iterator

import requests

# NOTE(review): credentials were hard-coded in the original script.  They are
# kept as defaults so existing usage keeps working, but can (and should) be
# supplied through the environment instead of living in source control.
base = os.environ.get("ROMM_BASE_URL", "http://localhost:4444")
session = requests.Session()
session.auth = (
    os.environ.get("ROMM_USERNAME", "jawz"),
    os.environ.get(
        "ROMM_PASSWORD", "overall-tuition-utensil-lecturer-fantastic-deferral"
    ),
)

# Bracketed / parenthesised annotations stripped by normalize_name().
_BRACKET_TAG_RE = re.compile(r"\[.*?\]")
_PAREN_TAG_RE = re.compile(r"\(.*?\)")

# Regexes (matched against lower-cased text) that mark a ROM as one disc of a
# multi-disc release.  One list per field that is inspected.
_TAG_DISC_PATTERNS: List[str] = [
    r"disc\s*\d+",
    r"disk\s*\d+",
    r"cd\s*\d+",
    r"dvd\s*\d+",
    r"part\s*\d+",
    r"disc\s*[a-z]",
    r"disk\s*[a-z]",
]
_NAME_DISC_PATTERNS: List[str] = [
    r"\(disc\s*\d+\)",
    r"\(disk\s*\d+\)",
    r"\[disc\s*\d+\]",
    r"\[disk\s*\d+\]",
    r"disc\s*\d+",
    r"disk\s*\d+",
    r"\(cd\s*\d+\)",
    r"\(dvd\s*\d+\)",
    r"part\s*\d+",
]
_FILENAME_DISC_PATTERNS: List[str] = [
    r"disc\s*\d+",
    r"disk\s*\d+",
    r"cd\s*\d+",
    r"dvd\s*\d+",
    r"\(disc\s*\d+\)",
    r"\(disk\s*\d+\)",
    r"\[disc\s*\d+\]",
    r"\[disk\s*\d+\]",
]


def fetch_roms_page(
    session: requests.Session, base: str, offset: int, limit: int = 500
) -> Dict:
    """Fetch a single page of ROMs.

    Args:
        session: Authenticated HTTP session.
        base: Base URL of the server (no trailing slash).
        offset: Index of the first ROM to return.
        limit: Maximum number of ROMs requested for this page.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    r = session.get(f"{base}/api/roms", params={"limit": limit, "offset": offset})
    r.raise_for_status()
    return r.json()


def fetch_all_roms(session: requests.Session, base: str) -> List[Dict]:
    """Fetch every ROM by walking the paginated ``/api/roms`` endpoint.

    Paging stops on the first empty page or once the number of ROMs seen
    reaches the ``total`` reported by the first page (a missing or null
    ``total`` is treated as 0, i.e. only the first page is read).
    """
    page_size = 500

    def fetch_pages() -> Iterator[List[Dict]]:
        offset = 0
        total = None
        while True:
            page = fetch_roms_page(session, base, offset, page_size)
            items = page.get("items", [])
            yield items
            if total is None:
                total = page.get("total") or 0
            if not items or offset + len(items) >= total:
                break
            # Advance by what was actually received: a server that caps the
            # page size below `page_size` must not make us skip records.
            offset += len(items)

    return list(chain.from_iterable(fetch_pages()))


def normalize_name(name: Optional[str]) -> str:
    """Normalize a ROM name for comparison.

    Removes ``[...]`` and ``(...)`` annotations, trims surrounding whitespace
    and lower-cases the result.  ``None`` or an empty name yields ``""``.
    """
    if not name:
        return ""
    without_brackets = _BRACKET_TAG_RE.sub("", name)
    return _PAREN_TAG_RE.sub("", without_brackets).strip().lower()


def get_metadata_id(rom: Dict) -> Optional[Tuple[str, int]]:
    """Return ``(source, id)`` for the first metadata ID the ROM carries.

    Sources are tried in the order igdb, moby, ss, launchbox; ``None`` is
    returned when none of them holds a truthy value.
    """
    metadata_sources = [
        ("igdb", "igdb_id"),
        ("moby", "moby_id"),
        ("ss", "ss_id"),
        ("launchbox", "launchbox_id"),
    ]
    for source_name, key in metadata_sources:
        if value := rom.get(key):
            return (source_name, value)
    return None


def get_hash(rom: Dict) -> Optional[str]:
    """Return ``"<type>:<value>"`` for the first hash present on the ROM.

    Hashes are tried in the order sha1, md5, crc; ``None`` if none is set.
    """
    hash_sources = [
        ("sha1", "sha1_hash"),
        ("md5", "md5_hash"),
        ("crc", "crc_hash"),
    ]
    for hash_type, key in hash_sources:
        if value := rom.get(key):
            return f"{hash_type}:{value}"
    return None


def has_region(rom: Dict, region: str) -> bool:
    """Check whether any entry of ``rom["regions"]`` contains *region*.

    The comparison is a case-insensitive substring match; empty or missing
    region entries never match.
    """
    regions = rom.get("regions", [])
    if not regions:
        return False
    region_lower = region.lower()
    return any(region_lower in str(r).lower() for r in regions if r)


def is_eur(rom: Dict) -> bool:
    """Check if ROM is European region."""
    return any(has_region(rom, marker) for marker in ("eur", "europe", "pal"))


def is_usa(rom: Dict) -> bool:
    """Check if ROM is USA region."""
    return any(
        has_region(rom, marker) for marker in ("usa", "north america", "ntsc-u")
    )


def is_japan(rom: Dict) -> bool:
    """Check if ROM is Japanese region."""
    return any(has_region(rom, marker) for marker in ("jpn", "japan", "ntsc-j"))


def get_language_count(rom: Dict) -> int:
    """Return the number of entries in ``rom["languages"]`` (0 if absent)."""
    languages = rom.get("languages", [])
    return len(languages) if languages else 0


def _matches_disc_pattern(text: str, patterns: List[str]) -> bool:
    """Check if the lower-cased *text* matches any of the regex *patterns*."""
    lowered = text.lower()
    return any(re.search(pattern, lowered) for pattern in patterns)


def _check_tags_for_disc(tags: List) -> bool:
    """Check if any truthy tag contains a disc indicator."""
    return any(
        _matches_disc_pattern(str(tag), _TAG_DISC_PATTERNS)
        for tag in filter(None, tags)
    )


def _check_name_for_disc(name: str) -> bool:
    """Check if the display name contains a disc indicator."""
    return bool(name) and _matches_disc_pattern(str(name), _NAME_DISC_PATTERNS)


def _check_filename_for_disc(fs_name: str) -> bool:
    """Check if the file name contains a disc indicator."""
    return bool(fs_name) and _matches_disc_pattern(
        str(fs_name), _FILENAME_DISC_PATTERNS
    )


def is_multi_disc_game(rom: Dict) -> bool:
    """Check if ROM looks like one disc of a multi-disc game.

    Inspects the ``tags``, ``name`` and ``fs_name`` fields; a missing or
    ``None`` field simply does not match.
    """
    return (
        # `or []` guards against an explicit None, which filter() rejects.
        _check_tags_for_disc(rom.get("tags") or [])
        or _check_name_for_disc(rom.get("name", ""))
        or _check_filename_for_disc(rom.get("fs_name", ""))
    )


def has_multi_disc_roms(group: List[Dict]) -> bool:
    """Check if a group contains any multi-disc game ROMs."""
    return any(is_multi_disc_game(rom) for rom in group)


def get_metadata_score(rom: Dict) -> int:
    """Sum the weights of every metadata field that holds a truthy value."""
    score_map = {
        "igdb_id": 10,
        "moby_id": 5,
        "ss_id": 5,
        "name": 3,
        "summary": 2,
        "alternative_names": 1,
    }
    return sum(score for key, score in score_map.items() if rom.get(key))


def get_region_score(rom: Dict) -> int:
    """Get region preference score: EUR (30) > USA (20) > Japan (10) > other (0)."""
    if is_eur(rom):
        return 30
    if is_usa(rom):
        return 20
    if is_japan(rom):
        return 10
    return 0


def rom_preference_score(rom: Dict) -> Tuple[int, int, int, int]:
    """Calculate the preference score used to decide which ROM to keep.

    Returns:
        ``(region_score, language_score, metadata_score, total_score)`` where
        the language score is 5 points per language and the total is the sum
        of the other three.  Higher is better.
    """
    region_score = get_region_score(rom)
    language_score = get_language_count(rom) * 5
    metadata_score = get_metadata_score(rom)
    total_score = region_score + language_score + metadata_score
    return (region_score, language_score, metadata_score, total_score)


def group_by_hash(roms: List[Dict]) -> Dict[str, List[Dict]]:
    """Group ROMs that share a hash (exact duplicates).

    Returns:
        ``{"exact_hash_<type>:<value>": [rom, ...]}`` for every hash shared
        by at least two ROMs.
    """
    hash_groups = defaultdict(list)
    for rom in roms:
        rom_hash = get_hash(rom)
        if rom_hash is not None:
            hash_groups[rom_hash].append(rom)
    return {
        f"exact_hash_{h}": group for h, group in hash_groups.items() if len(group) > 1
    }


def group_by_metadata(
    roms: List[Dict], existing_groups: Dict[str, List[Dict]]
) -> Dict[str, List[Dict]]:
    """Group ROMs by metadata ID, excluding those already grouped.

    A group is only reported when it has at least two ROMs and its members
    either carry more than one distinct hash or carry no hash at all.
    """
    existing_ids = {r["id"] for group in existing_groups.values() for r in group}
    metadata_groups = defaultdict(list)
    for rom in roms:
        meta_id = get_metadata_id(rom)
        if meta_id is not None and rom["id"] not in existing_ids:
            metadata_groups[meta_id].append(rom)

    def should_add_group(group: List[Dict]) -> bool:
        if len(group) <= 1:
            return False
        group_hashes = {h for h in map(get_hash, group) if h}
        return len(group_hashes) > 1 or not group_hashes

    return {
        f"metadata_{meta_id[0]}_{meta_id[1]}": group
        for meta_id, group in metadata_groups.items()
        if should_add_group(group)
    }


def group_by_name(
    roms: List[Dict], existing_groups: Dict[str, List[Dict]]
) -> Dict[str, List[Dict]]:
    """Group ROMs by normalized name, excluding those already grouped."""
    existing_ids = {r["id"] for group in existing_groups.values() for r in group}
    name_groups = defaultdict(list)
    for rom in roms:
        normalized = normalize_name(rom.get("name"))
        if normalized and rom["id"] not in existing_ids:
            name_groups[normalized].append(rom)
    # The full normalized name is used as the key: truncating it made distinct
    # long names collide and silently overwrite each other's group.
    return {
        f"name_{name}": group for name, group in name_groups.items() if len(group) > 1
    }


def find_duplicates(roms: List[Dict]) -> Dict[str, List[Dict]]:
    """Group ROMs by hash, then metadata ID, then normalized name.

    Each later stage ignores ROMs already placed in a group by an earlier
    stage, so the returned groups do not share ROMs.
    """
    # NOTE(review): none of the grouping keys include the platform, so the
    # same title on two systems presumably ends up in one group — confirm this
    # is intended before running against a multi-platform library.
    hash_groups = group_by_hash(roms)
    metadata_groups = group_by_metadata(roms, hash_groups)
    name_groups = group_by_name(roms, {**hash_groups, **metadata_groups})
    return {**hash_groups, **metadata_groups, **name_groups}


def score_roms(group: List[Dict]) -> List[Tuple[Dict, Tuple[int, int, int, int]]]:
    """Pair every ROM in a group with its preference score."""
    return [(rom, rom_preference_score(rom)) for rom in group]


def select_best_rom(
    scored_roms: List[Tuple[Dict, Tuple[int, int, int, int]]],
) -> Tuple[Dict, List[Dict]]:
    """Select the best ROM and return it with the rest to delete.

    ROMs are ranked by total score (last tuple element), highest first; ties
    keep their input order because the sort is stable.
    """
    ranked = sorted(scored_roms, key=lambda pair: pair[1][3], reverse=True)
    best_rom, _ = ranked[0]
    losers = [rom for rom, _ in ranked[1:]]
    return best_rom, losers


def print_rom_info(
    rom: Dict, score: Tuple[int, int, int, int], prefix: str = " "
) -> None:
    """Print name, ID, regions, languages, score and file name of a ROM."""
    print(f"{prefix}Name: {rom.get('name', 'Unknown')} (ID: {rom['id']})")
    print(f"{prefix} Region: {rom.get('regions', [])}")
    print(f"{prefix} Languages: {rom.get('languages', [])}")
    print(f"{prefix} Score: {score}")
    print(f"{prefix} File: {rom.get('fs_name', 'N/A')}")


def process_group(
    group_name: str, group: List[Dict], processed_ids: Set[int]
) -> List[Dict]:
    """Report on one duplicate group and return the ROMs to delete.

    Args:
        group_name: Label printed for the group.
        group: ROMs considered duplicates of each other.
        processed_ids: IDs already scheduled for deletion; updated in place
            with the IDs this call schedules.

    Returns:
        The ROMs to delete; empty when the group has fewer than two ROMs,
        overlaps ``processed_ids`` or contains a multi-disc ROM.
    """
    if len(group) <= 1:
        return []

    if any(rom["id"] in processed_ids for rom in group):
        return []

    print(f"Group: {group_name}")

    # Skip deletion if this group contains multi-disc games
    if has_multi_disc_roms(group):
        print(" ⚠️ SKIPPED: Contains multi-disc game ROMs (all discs are needed)")
        for rom in group:
            print_rom_info(rom, rom_preference_score(rom), " Keeping: ")
        print()
        return []

    best_rom, duplicates = select_best_rom(score_roms(group))

    # Score the winner itself; the first entry of the unsorted score list is
    # not necessarily the ROM that was selected.
    print_rom_info(best_rom, rom_preference_score(best_rom), " Keeping: ")
    for rom in duplicates:
        print_rom_info(rom, rom_preference_score(rom), " Delete: ")
        processed_ids.add(rom["id"])
    print()
    return duplicates


def get_kept_roms(groups: Dict[str, List[Dict]]) -> List[Dict]:
    """Get the ROMs that are kept from each duplicate group.

    For an ordinary group that is the best-scoring ROM; for a group containing
    multi-disc ROMs every member is kept, matching process_group().
    """
    kept = []
    for group in groups.values():
        if len(group) <= 1:
            continue
        if has_multi_disc_roms(group):
            kept.extend(group)
            continue
        best_rom, _ = select_best_rom(score_roms(group))
        kept.append(best_rom)
    return kept


def recommend_roms_to_delete(roms: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Find and recommend ROMs to delete based on duplicate analysis.

    Prints a report for every group as a side effect.

    Returns:
        ``(roms_to_delete, roms_kept)``
    """
    groups = find_duplicates(roms)
    processed_ids: Set[int] = set()

    print(f"\nFound {len(groups)} duplicate groups\n")

    to_delete: List[Dict] = []
    for group_name, group in groups.items():
        to_delete.extend(process_group(group_name, group, processed_ids))

    kept = get_kept_roms(groups)
    return to_delete, kept


def delete_roms(session: requests.Session, base: str, rom_ids: List[int]) -> Dict:
    """Delete ROMs through ``POST {base}/api/roms/delete``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    url = f"{base}/api/roms/delete"
    # NOTE(review): `delete_from_fs` is sent as the same ID list as `roms`,
    # presumably meaning "also remove these files from disk" — confirm against
    # the server's API documentation.
    payload = {"roms": rom_ids, "delete_from_fs": rom_ids}
    r = session.post(url, json=payload)
    r.raise_for_status()
    return r.json()


def format_rom_list(roms: List[Dict]) -> str:
    """Format a list of ROMs, sorted by name, one per line."""
    if not roms:
        return " (none)"
    # `or ""` keeps the sort working when a ROM's name is present but None.
    ordered = sorted(roms, key=lambda r: r.get("name") or "")
    return "\n".join(
        f" - {rom.get('name', 'Unknown')} (ID: {rom['id']}) - {rom.get('fs_name', 'N/A')}"
        for rom in ordered
    )


def format_summary(
    roms: List[Dict],
    roms_to_delete: List[Dict],
    roms_kept: List[Dict],
) -> str:
    """Format the pre-deletion summary (counts and the IDs to delete)."""
    delete_ids = [r["id"] for r in roms_to_delete]
    separator = "=" * 60
    return f"""
{separator}
SUMMARY
{separator}
Total ROMs analyzed: {len(roms)}
ROMs recommended for deletion: {len(roms_to_delete)}
ROMs kept (originals): {len(roms_kept)}
{separator}
ROM IDs to delete: {delete_ids}
{separator}
"""


def format_deletion_results(deleted: List[Dict], kept: List[Dict]) -> str:
    """Format the lists of deleted and kept ROMs shown after deletion."""
    separator = "=" * 60
    return f"""
{separator}
DELETION RESULTS
{separator}
ROMs DELETED ({len(deleted)}):
{format_rom_list(deleted)}

ROMs KEPT (originals) ({len(kept)}):
{format_rom_list(kept)}
{separator}
"""


def main() -> None:
    """Fetch all ROMs, report duplicates and delete the recommended ones."""
    print("Fetching all ROMs...")
    roms = fetch_all_roms(session, base)
    print(f"Fetched {len(roms)} ROMs")

    roms_to_delete, roms_kept = recommend_roms_to_delete(roms)
    print(format_summary(roms, roms_to_delete, roms_kept))

    if not roms_to_delete:
        print("No ROMs to delete.")
        return

    delete_ids = [r["id"] for r in roms_to_delete]
    print(f"Deleting {len(roms_to_delete)} ROMs...")
    try:
        delete_roms(session, base, delete_ids)
    except (requests.RequestException, ValueError) as e:
        # Transport/HTTP failures and undecodable response bodies.
        print(f"✗ Error during deletion: {e}")
        print("ROMs were NOT deleted. Please check the error above.")
    else:
        print("✓ Deletion successful!")
        print(format_deletion_results(roms_to_delete, roms_kept))


if __name__ == "__main__":
    main()