#!/usr/bin/env python3
"""
lidarr_cleanup_singles

Identifies single-track releases that can safely be removed because the
same track already exists on a full album in Lidarr.
"""

import argparse
import logging
import os
import subprocess
import sys
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

import requests
from dotenv import load_dotenv

# Pull LIDARR_URL / LIDARR_API_KEY / DOCKER_MOUNT from a .env file, if present.
load_dotenv()

logger = logging.getLogger(__name__)


def normalize_title(title: str) -> str:
    """Normalize a track title for comparison (lowercase, collapse whitespace)."""
    return " ".join(title.lower().split())


def get_json(
    url: str, headers: Dict[str, str], params: Optional[Dict[str, object]] = None
) -> List[Dict]:
    """Wrapper around requests.get with basic error handling.

    Returns the decoded JSON body (every Lidarr endpoint used here returns a
    list). Logs and re-raises requests.exceptions.RequestException on failure.
    """
    try:
        resp = requests.get(url, headers=headers, params=params, timeout=60)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching {url}: {e}")
        raise


def get_trackfile_info(
    base_url: str, track_file_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get trackfile information including file path.

    Returns None (after logging a warning) rather than raising, because a
    single unreadable trackfile should not abort the whole scan.
    """
    try:
        resp = requests.get(
            f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}",
            headers=headers,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.warning(f"Could not fetch trackfile {track_file_id}: {e}")
        return None


def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str:
    """Map a Docker container path to the corresponding host path.

    ``docker_mount`` has the form ``container_path:host_path``; only a
    *leading* occurrence of ``container_path`` is rewritten. A malformed
    mapping without a ``:`` is ignored with a warning instead of raising.
    """
    if not docker_mount:
        return file_path
    # Guard: a mapping without ':' would make the unpack below raise ValueError.
    if ":" not in docker_mount:
        logger.warning(f"Ignoring malformed docker mount mapping: {docker_mount!r}")
        return file_path
    container_path, host_path = docker_mount.split(":", 1)
    if file_path.startswith(container_path):
        # Replace only the first occurrence so a path segment that happens to
        # repeat the container prefix later is left untouched.
        return file_path.replace(container_path, host_path, 1)
    return file_path


def get_audio_fingerprint(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[str]:
    """Generate an audio fingerprint using fpcalc (chromaprint).

    Returns the raw comma-separated fingerprint string, or None when the
    file is missing, fpcalc is unavailable/fails, or the run times out.
    """
    mapped_path = map_docker_path(file_path, docker_mount)
    logger.info(f"Generating fingerprint for: {mapped_path} (original: {file_path})")
    if not os.path.exists(mapped_path):
        logger.warning(f"File not found: {mapped_path} (original: {file_path})")
        return None
    try:
        logger.debug(f"Running fpcalc on: {mapped_path}")
        result = subprocess.run(
            ["fpcalc", "-raw", mapped_path],
            capture_output=True,
            text=True,
            timeout=60,
            check=False,
        )
        if result.returncode != 0:
            logger.warning(f"fpcalc failed for {mapped_path}: {result.stderr}")
            return None
        # fpcalc emits KEY=VALUE lines; we only care about FINGERPRINT=.
        for line in result.stdout.split("\n"):
            if line.startswith("FINGERPRINT="):
                fingerprint = line.split("=", 1)[1]
                logger.info(
                    f"Successfully generated fingerprint for {mapped_path} (length: {len(fingerprint)})"
                )
                return fingerprint
        logger.warning(f"fpcalc output did not contain FINGERPRINT= for {mapped_path}")
        return None
    except subprocess.TimeoutExpired:
        logger.warning(f"fpcalc timeout for {mapped_path}")
        return None
    except FileNotFoundError:
        logger.warning(
            "fpcalc not found. Install chromaprint to enable audio verification."
        )
        return None
    except Exception as e:
        logger.warning(f"Error generating fingerprint for {mapped_path}: {e}")
        return None


def compare_fingerprints(
    fp1: Optional[str], fp2: Optional[str], log_context: Optional[str] = None
) -> bool:
    """Compare two audio fingerprints for similarity.

    ``fpcalc -raw`` fingerprints are comma-separated integers; two tracks are
    considered a match when at least 95% of the positions (over the shorter
    fingerprint) are identical.
    """
    # Compute the log suffix once instead of rebuilding it in every branch.
    context_msg = f" ({log_context})" if log_context else ""
    if not fp1 or not fp2:
        logger.debug(f"Fingerprint comparison failed: missing fingerprint{context_msg}")
        return False
    if fp1 == fp2:
        logger.info(f"Fingerprint comparison: exact match{context_msg}")
        return True

    def decode_fingerprint(fp: str) -> List[int]:
        # Tolerate stray empty segments (e.g. trailing commas).
        return [int(x) for x in fp.split(",") if x.strip()]

    try:
        f1 = decode_fingerprint(fp1)
        f2 = decode_fingerprint(fp2)
        if not f1 or not f2:
            logger.debug(
                f"Fingerprint comparison failed: empty decoded fingerprint{context_msg}"
            )
            return False
        min_len = min(len(f1), len(f2))
        if min_len == 0:
            logger.debug(f"Fingerprint comparison failed: zero length{context_msg}")
            return False
        # zip stops at the shorter fingerprint, i.e. after min_len positions.
        matches = sum(1 for a, b in zip(f1, f2) if a == b)
        similarity = matches / min_len
        match = similarity >= 0.95
        logger.info(
            f"Fingerprint comparison: similarity={similarity:.3f}, match={match}{context_msg}"
        )
        return match
    except (ValueError, ZeroDivisionError) as e:
        logger.debug(
            f"Fingerprint comparison failed: exception {type(e).__name__}{context_msg}"
        )
        return False


# DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY
# def delete_track_file(base_url: str, track_file_id: int, headers: Dict[str, str]) -> None:
#     """Delete a track file by ID"""
#     delete_url = f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}"
#     resp = requests.delete(delete_url, headers=headers, timeout=60)
#     resp.raise_for_status()


def _fetch_tracks_with_files(
    api_base: str, headers: Dict[str, str], album_id: int
) -> List[Dict]:
    """Fetch an album's tracks and keep only those that have a downloaded file.

    ``api_base`` is the base URL already stripped of any trailing slash.
    Shared by build_album_track_map and find_duplicate_singles.
    """
    tracks = get_json(
        f"{api_base}/api/v1/track",
        headers,
        params={"albumId": album_id},
    )
    return [track for track in tracks if track.get("hasFile")]


def build_album_track_map(
    base_url: str, headers: Dict[str, str], albums: List[Dict]
) -> Dict[Tuple[int, str], List[Dict]]:
    """Create a mapping of tracks present on full albums.

    Keys are (artist_id, normalized_title); values list the full albums
    (id, title, trackFileId) on which that track already exists.
    """
    album_track_map: Dict[Tuple[int, str], List[Dict]] = defaultdict(list)
    api_base = base_url.rstrip("/")  # hoisted out of the per-album loop
    album_albums = [
        album for album in albums if album.get("albumType", "").lower() == "album"
    ]
    for album in album_albums:
        album_id = album.get("id")
        artist_id = album.get("artistId")
        album_title = album.get("title", "Unknown")
        if not album_id or not artist_id:
            continue
        for track in _fetch_tracks_with_files(api_base, headers, album_id):
            title = track.get("title")
            track_file_id = track.get("trackFileId")
            if not title or not track_file_id:
                continue
            key = (artist_id, normalize_title(title))
            album_track_map[key].append(
                {
                    "album_id": album_id,
                    "album_title": album_title,
                    "track_file_id": track_file_id,
                }
            )
    return album_track_map


def verify_audio_match(
    base_url: str,
    headers: Dict[str, str],
    single_track_file_id: int,
    album_track_file_id: int,
    docker_mount: Optional[str] = None,
) -> bool:
    """Verify that two track files contain the same audio.

    Fetches both file paths from Lidarr, fingerprints each with fpcalc, and
    compares the fingerprints. Any failure along the way yields False.
    """
    logger.info(
        f"Verifying audio match: single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
    )
    single_file_info = get_trackfile_info(base_url, single_track_file_id, headers)
    album_file_info = get_trackfile_info(base_url, album_track_file_id, headers)
    if not single_file_info or not album_file_info:
        logger.debug(
            f"Could not fetch track file info: single={single_file_info is not None}, album={album_file_info is not None}"
        )
        return False
    single_path = single_file_info.get("path")
    album_path = album_file_info.get("path")
    if not single_path or not album_path:
        logger.debug(
            f"Missing file paths: single_path={single_path is not None}, album_path={album_path is not None}"
        )
        return False
    logger.info(f"Fetching fingerprints: single={single_path}, album={album_path}")
    single_fp = get_audio_fingerprint(single_path, docker_mount)
    album_fp = get_audio_fingerprint(album_path, docker_mount)
    log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
    return compare_fingerprints(single_fp, album_fp, log_context)


def find_duplicate_singles(
    base_url: str,
    headers: Dict[str, str],
    albums: List[Dict],
    album_track_map: Dict[Tuple[int, str], List[Dict]],
    verify_audio: bool = True,
    docker_mount: Optional[str] = None,
) -> List[Dict]:
    """Identify single tracks that duplicate album tracks.

    For every track on a release of type "single", looks up the
    (artist, normalized title) key in ``album_track_map``; when
    ``verify_audio`` is True, candidate matches are additionally confirmed
    by audio fingerprint before being reported.
    """
    duplicates: List[Dict] = []
    api_base = base_url.rstrip("/")  # hoisted out of the per-album loop
    single_albums = [
        album for album in albums if album.get("albumType", "").lower() == "single"
    ]
    for album in single_albums:
        album_id = album.get("id")
        artist_id = album.get("artistId")
        album_title = album.get("title", "")
        if not album_id or not artist_id:
            continue
        for track in _fetch_tracks_with_files(api_base, headers, album_id):
            title = track.get("title")
            track_file_id = track.get("trackFileId")
            if not title or not track_file_id:
                continue
            key = (artist_id, normalize_title(title))
            if key not in album_track_map:
                continue
            duplicate_albums = album_track_map[key]
            verified_albums = []
            if verify_audio:
                logger.info(
                    f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..."
                )
                for album_track in duplicate_albums:
                    album_track_file_id = album_track["track_file_id"]
                    if verify_audio_match(
                        base_url,
                        headers,
                        track_file_id,
                        album_track_file_id,
                        docker_mount,
                    ):
                        verified_albums.append(album_track)
                        logger.debug(
                            f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id}"
                        )
                    else:
                        logger.debug(
                            f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id}"
                        )
            else:
                # Title-only mode: accept every title match without fingerprints.
                verified_albums = duplicate_albums
            if verified_albums:
                duplicates.append(
                    {
                        "artist_id": artist_id,
                        "single_album_id": album_id,
                        "single_album_title": album_title,
                        "track_title": title,
                        "single_track_file_id": track_file_id,
                        "duplicate_albums": verified_albums,
                    }
                )
    return duplicates


def main() -> None:
    """CLI entry point: fetch the library, scan for duplicates, report them."""
    parser = argparse.ArgumentParser(
        description="Identify single tracks that are duplicates of album tracks in Lidarr."
    )
    parser.add_argument(
        "--base-url",
        default=os.getenv("LIDARR_URL"),
        help="Base URL of the Lidarr instance (e.g. https://music.example.org). Can also be set via LIDARR_URL env var.",
    )
    parser.add_argument(
        "--api-key",
        default=os.getenv("LIDARR_API_KEY"),
        help="API key for Lidarr with sufficient permissions. Can also be set via LIDARR_API_KEY env var.",
    )
    parser.add_argument(
        "--no-audio-verify",
        action="store_true",
        help="Skip audio fingerprint verification (faster but less accurate)",
    )
    parser.add_argument(
        "--docker-mount",
        default=os.getenv("DOCKER_MOUNT"),
        help="Docker mount mapping in format 'container_path:host_path' (e.g. '/music:/srv/pool/multimedia/media/Music'). Can also be set via DOCKER_MOUNT env var.",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging",
    )
    # DELETE FLAG COMMENTED OUT FOR SAFETY
    # parser.add_argument(
    #     "--delete",
    #     action="store_true",
    #     help="If set, delete the duplicate single track files instead of just listing them",
    # )
    # parser.add_argument(
    #     "--force",
    #     action="store_true",
    #     help="If set together with --delete, do not prompt for confirmation before deletion",
    # )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="[%(levelname)s] %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    if not args.base_url:
        logger.error(
            "LIDARR_URL not set. Provide --base-url or set LIDARR_URL environment variable."
        )
        sys.exit(1)
    if not args.api_key:
        logger.error(
            "LIDARR_API_KEY not set. Provide --api-key or set LIDARR_API_KEY environment variable."
        )
        sys.exit(1)

    base_url = args.base_url.rstrip("/")
    headers = {"X-Api-Key": args.api_key}

    logger.info("Fetching artists...")
    artists = get_json(f"{base_url}/api/v1/artist", headers)
    if not artists:
        logger.warning("No artists found. Exiting.")
        return
    artist_map = {
        artist.get("id"): artist.get("artistName", "Unknown")
        for artist in artists
        if artist.get("id")
    }

    logger.info("Fetching albums for each artist...")
    albums: List[Dict] = []
    for artist in artists:
        artist_id = artist.get("id")
        if not artist_id:
            continue
        artist_albums = get_json(
            f"{base_url}/api/v1/album",
            headers,
            params={"artistId": artist_id},
        )
        albums.extend(artist_albums)
    if not albums:
        logger.warning("No albums found in the library.")
        return

    logger.info("Building album track map...")
    album_track_map = build_album_track_map(base_url, headers, albums)

    verify_audio = not args.no_audio_verify
    docker_mount = args.docker_mount if args.docker_mount else None
    if verify_audio:
        if docker_mount:
            logger.info(
                f"Scanning for duplicate singles with audio verification (Docker mount: {docker_mount})..."
            )
        else:
            logger.info("Scanning for duplicate singles with audio verification...")
        logger.info(
            "NOTE: Audio verification requires 'fpcalc' (chromaprint) to be installed"
        )
    else:
        logger.info(
            "Scanning for duplicate singles (audio verification disabled - using title matching only)..."
        )

    duplicates = find_duplicate_singles(
        base_url,
        headers,
        albums,
        album_track_map,
        verify_audio=verify_audio,
        docker_mount=docker_mount,
    )
    if not duplicates:
        logger.info("No duplicate singles found. The library appears clean.")
        return

    logger.info(
        f"Found {len(duplicates)} single track(s) that are duplicates of album tracks:"
    )
    for dup in duplicates:
        artist_id = dup["artist_id"]
        artist_name = artist_map.get(artist_id, f"Unknown (ID: {artist_id})")
        duplicate_albums = dup["duplicate_albums"]
        album_names = [album["album_title"] for album in duplicate_albums]
        logger.info(f"Artist: {artist_name}")
        logger.info(f"  Single: '{dup['single_album_title']}'")
        logger.info(
            f"  Track: '{dup['track_title']}' (trackFileId: {dup['single_track_file_id']})"
        )
        logger.info(
            f"  Already present on {len(duplicate_albums)} album(s): {', '.join(album_names)}"
        )
        logger.info("")

    # DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY
    # if args.delete:
    #     if not args.force:
    #         confirm = input(
    #             f"\nAre you sure you want to delete these {len(duplicates)} single track file(s)? [y/N]: "
    #         ).strip().lower()
    #         if confirm not in ("y", "yes"):
    #             logger.info("Aborting deletion.")
    #             return
    #     logger.info("Deleting duplicate single track files...")
    #     for dup in duplicates:
    #         track_file_id = dup["single_track_file_id"]
    #         try:
    #             delete_track_file(base_url, track_file_id, headers)
    #             logger.info(
    #                 f"Deleted trackFileId {track_file_id} (track '{dup['track_title']}' from single '{dup['single_album_title']}')."
    #             )
    #         except Exception as exc:
    #             logger.error(f"Failed to delete trackFileId {track_file_id}: {exc}")


if __name__ == "__main__":
    main()