init
This commit is contained in:
382
main.py
Normal file
382
main.py
Normal file
@@ -0,0 +1,382 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import os
import re
from collections import defaultdict
from itertools import chain
from typing import Dict, List, Set, Optional, Tuple, Iterator

import requests
|
||||
|
||||
# Connection settings for the ROM API.  Every value can be overridden through
# the environment so that credentials do not have to live in the source; the
# literals are only fallbacks that keep the previous behavior when the
# variables are unset.
# NOTE(review): the fallback password is committed in plain text -- rotate it
# and remove the default once ROM_API_PASSWORD is provisioned.
session = requests.Session()
base = os.environ.get("ROM_API_URL", "http://localhost:4444")
session.auth = (
    os.environ.get("ROM_API_USER", "jawz"),
    os.environ.get(
        "ROM_API_PASSWORD",
        "overall-tuition-utensil-lecturer-fantastic-deferral",
    ),
)
|
||||
|
||||
|
||||
def fetch_roms_page(session: requests.Session, base: str, offset: int) -> Dict:
    """Request one page of ROM records and return the decoded JSON body.

    Sends ``GET {base}/api/roms`` with ``limit=500`` and the given ``offset``.
    ``raise_for_status`` turns an HTTP error response into an exception
    before the body is decoded.
    """
    query = {"limit": 500, "offset": offset}
    response = session.get(f"{base}/api/roms", params=query)
    response.raise_for_status()
    payload = response.json()
    return payload
|
||||
|
||||
|
||||
def fetch_all_roms(session: requests.Session, base: str) -> List[Dict]:
    """Collect every ROM record by walking the paginated ROM listing.

    Pages are requested through ``fetch_roms_page`` until a page comes back
    empty or the number of records received reaches the ``total`` reported by
    the first page.

    Returns:
        A flat list holding the ``items`` of every page, in arrival order.
    """

    def fetch_pages() -> Iterator[List[Dict]]:
        offset = 0
        total = None

        while True:
            page = fetch_roms_page(session, base, offset)
            items = page.get("items", [])
            yield items

            if total is None:
                # Read once, from the first page.  A missing key counts as 0,
                # which ends the walk after this page.
                total = page.get("total", 0)

            # Advance by what was actually received rather than by the
            # requested page size, so a server that returns shorter pages
            # does not cause records to be skipped.
            offset += len(items)
            if not items or offset >= total:
                break

    return list(chain.from_iterable(fetch_pages()))
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Download the complete ROM list up front.  The module-level ``roms``
    # name is read again by the later ``__main__`` section of this script.
    print("Fetching all ROMs...")
    roms = fetch_all_roms(session=session, base=base)
    print(f"Fetched {len(roms)} ROMs")
|
||||
|
||||
|
||||
def normalize_name(name: Optional[str]) -> str:
    """Reduce a ROM title to a key suitable for comparison.

    Removes every ``[...]`` tag, then every ``(...)`` tag, trims the
    surrounding whitespace and lower-cases what is left.  ``None`` or an
    empty string yields ``""``.
    """
    if not name:
        return ""
    without_brackets = re.sub(r"\[.*?\]", "", name)
    without_parens = re.sub(r"\(.*?\)", "", without_brackets)
    return without_parens.strip().lower()
|
||||
|
||||
|
||||
def get_metadata_id(rom: Dict) -> Optional[Tuple[str, int]]:
    """Return ``(source, id)`` for the first metadata source that has an ID.

    Sources are tried in priority order: igdb, moby, ss, launchbox.  A
    missing or falsy ID is skipped; ``None`` is returned when no source
    provides one.
    """
    fields_by_source = {
        "igdb": "igdb_id",
        "moby": "moby_id",
        "ss": "ss_id",
        "launchbox": "launchbox_id",
    }
    for source, field in fields_by_source.items():
        identifier = rom.get(field)
        if identifier:
            return (source, identifier)
    return None
|
||||
|
||||
|
||||
def get_hash(rom: Dict) -> Optional[str]:
    """Return the preferred content hash as ``"<type>:<value>"``.

    Looks at ``sha1_hash``, then ``md5_hash``, then ``crc_hash`` and uses the
    first one that is present and truthy.  Returns ``None`` when the ROM has
    none of them.
    """
    preference = (
        ("sha1", "sha1_hash"),
        ("md5", "md5_hash"),
        ("crc", "crc_hash"),
    )
    found = ((label, rom.get(field)) for label, field in preference)
    return next(
        (f"{label}:{digest}" for label, digest in found if digest),
        None,
    )
|
||||
|
||||
|
||||
def has_region(rom: Dict, region: str) -> bool:
    """Tell whether any entry of ``rom["regions"]`` contains ``region``.

    The comparison is a case-insensitive substring match.  A missing, empty
    or otherwise falsy ``regions`` value gives ``False``.
    """
    entries = rom.get("regions", [])
    if entries:
        needle = region.lower()
        for entry in entries:
            if needle in entry.lower():
                return True
    return False
|
||||
|
||||
|
||||
def is_eur(rom: Dict) -> bool:
    """Return True when ``has_region`` matches "eur", "europe" or "pal"."""
    european_tags = ("eur", "europe", "pal")
    return any(has_region(rom, tag) for tag in european_tags)
|
||||
|
||||
|
||||
def is_usa(rom: Dict) -> bool:
    """Return True when ``has_region`` matches "usa", "north america" or "ntsc-u"."""
    for tag in ("usa", "north america", "ntsc-u"):
        if has_region(rom, tag):
            return True
    return False
|
||||
|
||||
|
||||
def is_japan(rom: Dict) -> bool:
    """Return True when ``has_region`` matches "jpn", "japan" or "ntsc-j"."""
    japanese_tags = ("jpn", "japan", "ntsc-j")
    return any(has_region(rom, tag) for tag in japanese_tags)
|
||||
|
||||
|
||||
def get_language_count(rom: Dict) -> int:
    """Count the entries of ``rom["languages"]``; missing or falsy gives 0."""
    languages = rom.get("languages", [])
    if not languages:
        return 0
    return len(languages)
|
||||
|
||||
|
||||
def get_metadata_score(rom: Dict) -> int:
    """Score how complete a ROM's metadata is.

    Each field that is present and truthy contributes a fixed weight:
    ``igdb_id`` 10, ``moby_id`` 5, ``ss_id`` 5, ``name`` 3, ``summary`` 2,
    ``alternative_names`` 1.  The result is the sum of those weights.
    """
    weights = (
        ("igdb_id", 10),
        ("moby_id", 5),
        ("ss_id", 5),
        ("name", 3),
        ("summary", 2),
        ("alternative_names", 1),
    )
    total = 0
    for field, points in weights:
        if rom.get(field):
            total += points
    return total
|
||||
|
||||
|
||||
def get_region_score(rom: Dict) -> int:
    """Return the region preference score: EUR 30, USA 20, Japan 10, else 0.

    The checks run in that order, so a ROM matching several regions gets the
    score of the highest-ranked one.
    """
    ranking = (
        (is_eur, 30),
        (is_usa, 20),
        (is_japan, 10),
    )
    for matches, points in ranking:
        if matches(rom):
            return points
    return 0
|
||||
|
||||
|
||||
def rom_preference_score(rom: Dict) -> Tuple[int, int, int, int]:
    """Compute how desirable a ROM is to keep; higher is better.

    Returns:
        ``(region_score, language_score, metadata_score, total_score)`` where
        the language score is five points per language and the total is the
        sum of the first three components.
    """
    components = (
        get_region_score(rom),
        get_language_count(rom) * 5,
        get_metadata_score(rom),
    )
    return (*components, sum(components))
|
||||
|
||||
|
||||
def group_by_hash(roms: List[Dict]) -> Dict[str, List[Dict]]:
    """Bucket ROMs that share the same content hash (exact duplicates).

    ROMs for which ``get_hash`` returns ``None`` are ignored.  Only buckets
    with more than one ROM are returned, keyed as ``exact_hash_<hash>``.
    """
    buckets = defaultdict(list)
    for rom in roms:
        digest = get_hash(rom)
        if digest is not None:
            buckets[digest].append(rom)

    duplicates = {}
    for digest, members in buckets.items():
        if len(members) > 1:
            duplicates[f"exact_hash_{digest}"] = members
    return duplicates
|
||||
|
||||
|
||||
def group_by_metadata(
    roms: List[Dict], existing_groups: Dict[str, List[Dict]]
) -> Dict[str, List[Dict]]:
    """Bucket ROMs that share a metadata ID, skipping ROMs already grouped.

    A ROM is considered only if ``get_metadata_id`` yields a value and its
    ``id`` does not appear in any group of ``existing_groups``.  A bucket is
    returned when it holds more than one ROM and its members do not all
    resolve to one single hash (either several distinct hashes, or no hash at
    all).  Keys have the form ``metadata_<source>_<id>``.
    """
    already_grouped = set()
    for members in existing_groups.values():
        for member in members:
            already_grouped.add(member["id"])

    buckets = defaultdict(list)
    for rom in roms:
        meta_id = get_metadata_id(rom)
        if meta_id is None or rom["id"] in already_grouped:
            continue
        buckets[meta_id].append(rom)

    def is_duplicate_set(members: List[Dict]) -> bool:
        if len(members) <= 1:
            return False
        distinct_hashes = set()
        for member in members:
            digest = get_hash(member)
            if digest:
                distinct_hashes.add(digest)
        # Exactly one distinct hash is the only case that is rejected.
        return len(distinct_hashes) != 1

    result = {}
    for (source, identifier), members in buckets.items():
        if is_duplicate_set(members):
            result[f"metadata_{source}_{identifier}"] = members
    return result
|
||||
|
||||
|
||||
def group_by_name(
    roms: List[Dict], existing_groups: Dict[str, List[Dict]]
) -> Dict[str, List[Dict]]:
    """Bucket ROMs by normalized name, skipping ROMs already grouped.

    ROMs whose normalized name is empty, or whose ``id`` appears in any group
    of ``existing_groups``, are left out.  Buckets with more than one ROM are
    returned under ``name_<first 50 characters of the normalized name>``.
    """
    already_grouped = set()
    for members in existing_groups.values():
        for member in members:
            already_grouped.add(member["id"])

    buckets = defaultdict(list)
    for rom in roms:
        key = normalize_name(rom.get("name"))
        if not key or rom["id"] in already_grouped:
            continue
        buckets[key].append(rom)

    result = {}
    for key, members in buckets.items():
        if len(members) > 1:
            result[f"name_{key[:50]}"] = members
    return result
|
||||
|
||||
|
||||
def find_duplicates(roms: List[Dict]) -> Dict[str, List[Dict]]:
    """Build all duplicate groups: by hash, then metadata ID, then name.

    Each later stage receives the groups found so far and excludes their
    ROMs.  The returned dict lists hash groups first, then metadata groups,
    then name groups.
    """
    merged = dict(group_by_hash(roms))
    merged.update(group_by_metadata(roms, merged))
    merged.update(group_by_name(roms, merged))
    return merged
|
||||
|
||||
|
||||
def score_roms(group: List[Dict]) -> List[Tuple[Dict, Tuple[int, int, int, int]]]:
    """Pair every ROM of ``group`` with its ``rom_preference_score`` tuple."""
    scored = []
    for rom in group:
        scored.append((rom, rom_preference_score(rom)))
    return scored
|
||||
|
||||
|
||||
def select_best_rom(
    scored_roms: List[Tuple[Dict, Tuple[int, int, int, int]]],
) -> Tuple[Dict, List[Dict]]:
    """Split scored ROMs into the one to keep and the ones to delete.

    Entries are ranked by the fourth element of their score tuple (the total
    score), highest first; ties keep their input order.  Returns the top ROM
    and a list of the remaining ROMs in ranked order.
    """

    def total_of(entry: Tuple[Dict, Tuple[int, int, int, int]]) -> int:
        _, score = entry
        return score[3]

    ranked = sorted(scored_roms, key=total_of, reverse=True)
    winner = ranked[0][0]
    losers = [entry[0] for entry in ranked[1:]]
    return winner, losers
|
||||
|
||||
|
||||
def print_rom_info(rom: Dict, score: Tuple[int, int, int, int], prefix: str = " "):
    """Print a five-line description of ``rom`` to stdout.

    Each line starts with ``prefix``.  The lines show the name and ID, the
    regions, the languages, the given ``score`` tuple and the file name.
    Missing optional fields fall back to 'Unknown', ``[]`` or 'N/A'.
    """
    report = (
        f"{prefix}Name: {rom.get('name', 'Unknown')} (ID: {rom['id']})",
        f"{prefix} Region: {rom.get('regions', [])}",
        f"{prefix} Languages: {rom.get('languages', [])}",
        f"{prefix} Score: {score}",
        f"{prefix} File: {rom.get('fs_name', 'N/A')}",
    )
    for line in report:
        print(line)
|
||||
|
||||
|
||||
def process_group(
    group_name: str, group: List[Dict], processed_ids: Set[int]
) -> List[Dict]:
    """Report one duplicate group and return the ROMs that should be deleted.

    Args:
        group_name: Label printed as the group heading.
        group: ROM dicts considered duplicates of each other.
        processed_ids: IDs of ROMs handled by earlier groups.  This set is
            updated in place with every ID of ``group`` once it is processed.

    Returns:
        The ROMs to delete (every member except the best-scoring one), or an
        empty list when the group has at most one ROM or shares a ROM with an
        earlier group.
    """
    if len(group) <= 1:
        return []

    group_ids = {r["id"] for r in group}
    if group_ids & processed_ids:
        return []

    best_rom, delete_roms = select_best_rom(score_roms(group))

    print(f"Group: {group_name}")
    # Show the keeper's own score.  Taking the first scored entry would show
    # the score of whichever ROM happened to be listed first in the group.
    print_rom_info(best_rom, rom_preference_score(best_rom), " Keeping: ")

    for rom in delete_roms:
        print_rom_info(rom, rom_preference_score(rom), " Delete: ")

    # Record the whole group, keeper included, so that a later overlapping
    # group can neither delete the ROM kept here nor re-delete a member.
    processed_ids.update(group_ids)

    print()
    return delete_roms
|
||||
|
||||
|
||||
def get_kept_roms(groups: Dict[str, List[Dict]]) -> List[Dict]:
    """Return the best-scoring ROM of every group that has more than one ROM."""
    return [
        select_best_rom(score_roms(members))[0]
        for members in groups.values()
        if len(members) > 1
    ]
|
||||
|
||||
|
||||
def recommend_roms_to_delete(roms: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Run the duplicate analysis and decide which ROMs to delete.

    Prints the number of duplicate groups found, then lets ``process_group``
    report each group in turn.

    Returns:
        ``(roms_to_delete, roms_kept)``.
    """
    groups = find_duplicates(roms)
    seen_ids = set()

    print(f"\nFound {len(groups)} duplicate groups\n")

    doomed = []
    for label, members in groups.items():
        doomed.extend(process_group(label, members, seen_ids))

    survivors = get_kept_roms(groups)
    return doomed, survivors
|
||||
|
||||
|
||||
def delete_roms(session: requests.Session, base: str, rom_ids: List[int]) -> Dict:
    """Ask the API to delete the given ROMs and return its JSON reply.

    Posts to ``{base}/api/roms/delete`` with the ID list sent under both the
    ``roms`` and ``delete_from_fs`` keys.  ``raise_for_status`` turns an HTTP
    error response into an exception before the body is decoded.
    NOTE(review): ``delete_from_fs`` presumably makes the server remove the
    files from disk as well -- confirm against the API documentation.
    """
    endpoint = f"{base}/api/roms/delete"
    body = {"roms": rom_ids, "delete_from_fs": rom_ids}
    response = session.post(endpoint, json=body)
    response.raise_for_status()
    return response.json()
|
||||
|
||||
|
||||
def format_rom_list(roms: List[Dict]) -> str:
    """Render ROMs as a bulleted, name-sorted block of text.

    Returns ``" (none)"`` for an empty list.  Otherwise returns one line per
    ROM showing its name, ID and file name, with 'Unknown' / 'N/A' standing
    in for missing keys.
    """
    if not roms:
        return " (none)"

    # ``get("name", "")`` still yields None when the key exists with a None
    # value, and None cannot be ordered against str.  Sort such ROMs as if
    # their name were empty.
    ordered = sorted(roms, key=lambda r: r.get("name") or "")
    return "\n".join(
        f" - {rom.get('name', 'Unknown')} (ID: {rom['id']}) - {rom.get('fs_name', 'N/A')}"
        for rom in ordered
    )
|
||||
|
||||
|
||||
def format_summary(
    roms: List[Dict],
    roms_to_delete: List[Dict],
    roms_kept: List[Dict],
) -> str:
    """Build the summary banner shown before deletion.

    The text reports how many ROMs were analyzed, how many are recommended
    for deletion, how many are kept, and the list of IDs to delete, framed by
    60-character ``=`` rules.  It starts and ends with a newline.
    """
    rule = "=" * 60
    delete_ids = [r["id"] for r in roms_to_delete]
    lines = [
        "",
        rule,
        "SUMMARY",
        rule,
        f"Total ROMs analyzed: {len(roms)}",
        f"ROMs recommended for deletion: {len(roms_to_delete)}",
        f"ROMs kept (originals): {len(roms_kept)}",
        "",
        rule,
        f"ROM IDs to delete: {delete_ids}",
        rule,
        "",
    ]
    return "\n".join(lines)
|
||||
|
||||
|
||||
def format_deletion_results(deleted: List[Dict], kept: List[Dict]) -> str:
    """Build the report shown after deletion.

    Lists the deleted ROMs and then the kept ROMs, each rendered with
    ``format_rom_list`` under a heading that carries the count, framed by
    60-character ``=`` rules.  The text starts and ends with a newline.
    """
    rule = "=" * 60
    lines = [
        "",
        rule,
        "DELETION RESULTS",
        rule,
        "",
        f"ROMs DELETED ({len(deleted)}):",
        format_rom_list(deleted),
        "",
        f"ROMs KEPT (originals) ({len(kept)}):",
        format_rom_list(kept),
        "",
        rule,
        "",
    ]
    return "\n".join(lines)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Analyze the ROM list loaded by the earlier ``__main__`` section, print
    # the recommendation, then delete the recommended ROMs through the API.
    roms_to_delete, roms_kept = recommend_roms_to_delete(roms)
    # format_summary takes exactly three arguments; a fourth one raises
    # TypeError before any deletion can run.
    print(format_summary(roms, roms_to_delete, roms_kept))

    if roms_to_delete:
        delete_ids = [r["id"] for r in roms_to_delete]
        print(f"Deleting {len(roms_to_delete)} ROMs...")
        try:
            # Only the API call is guarded: a failure while formatting the
            # report afterwards must not be reported as "NOT deleted".
            delete_roms(session, base, delete_ids)
        except Exception as e:
            print(f"✗ Error during deletion: {e}")
            print("ROMs were NOT deleted. Please check the error above.")
        else:
            print("✓ Deletion successful!")
            print(format_deletion_results(roms_to_delete, roms_kept))
    else:
        print("No ROMs to delete.")
|
||||
Reference in New Issue
Block a user