#!/usr/bin/env python3
"""
lidarr_cleanup_singles

Identifies single-track releases that can safely be removed because the same
track already exists on a full album in Lidarr.
"""
import argparse
import hashlib
import json
import logging
import os
import subprocess
import sys
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union

import requests
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)


def normalize_title(title: str) -> str:
    """Normalize a track title for comparison."""
    return " ".join(title.lower().split())


def get_json(
    url: str, headers: Dict[str, str], params: Optional[Dict[str, object]] = None
) -> List[Dict]:
    """Wrapper around requests.get with basic error handling."""
    try:
        resp = requests.get(url, headers=headers, params=params, timeout=60)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching {url}: {e}")
        raise


def get_trackfile_info(
    base_url: str, track_file_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get trackfile information including file path and quality."""
    try:
        resp = requests.get(
            f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}",
            headers=headers,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.warning(f"Could not fetch trackfile {track_file_id}: {e}")
        return None


def get_track_info(
    base_url: str, track_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get track information including the MusicBrainz recording ID."""
    try:
        resp = requests.get(
            f"{base_url.rstrip('/')}/api/v1/track/{track_id}",
            headers=headers,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.warning(f"Could not fetch track {track_id}: {e}")
        return None


def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str:
    """Map a Docker container path to the corresponding host path."""
    if not docker_mount:
        return file_path
    if ":" not in docker_mount:
        logger.warning(
            f"Invalid docker mount '{docker_mount}' (expected 'container_path:host_path'); ignoring"
        )
        return file_path
    container_path, host_path = docker_mount.split(":", 1)
    if not file_path.startswith(container_path):
        return file_path
    return file_path.replace(container_path, host_path, 1)
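
# Example (hypothetical paths): with docker_mount="/music:/srv/media/Music",
# a path Lidarr reports as "/music/Artist/Album/01 Track.flac" is mapped to
# "/srv/media/Music/Artist/Album/01 Track.flac" before any local file access;
# paths outside the container prefix are returned unchanged.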

def get_file_hash(
    file_path: str, docker_mount: Optional[str] = None, bytes_to_read: int = 1048576
) -> Optional[str]:
    """Get a partial file hash (first N bytes) for quick exact-duplicate detection."""
    mapped_path = map_docker_path(file_path, docker_mount)
    if not os.path.exists(mapped_path):
        return None
    try:
        hasher = hashlib.md5()
        with open(mapped_path, "rb") as f:
            chunk = f.read(bytes_to_read)
            hasher.update(chunk)
        return hasher.hexdigest()
    except Exception as e:
        logger.debug(f"Could not compute hash for {mapped_path}: {e}")
        return None


def get_audio_fingerprint(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Tuple[str, float]]:
    """Generate an audio fingerprint using fpcalc (chromaprint).

    Returns (fingerprint, duration_seconds) or None on failure.
    """
    mapped_path = map_docker_path(file_path, docker_mount)
    logger.debug(f"Generating fingerprint for: {mapped_path} (original: {file_path})")
    if not os.path.exists(mapped_path):
        logger.warning(f"File not found: {mapped_path} (original: {file_path})")
        return None
    try:
        logger.debug(f"Running fpcalc on: {mapped_path}")
        result = subprocess.run(
            ["fpcalc", "-json", "-length", "180", mapped_path],
            capture_output=True,
            text=True,
            timeout=60,
            check=False,
        )
        if result.returncode != 0:
            logger.warning(f"fpcalc failed for {mapped_path}: {result.stderr}")
            return None
        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse fpcalc JSON output for {mapped_path}: {e}")
            return None
        fingerprint = data.get("fingerprint")
        duration = data.get("duration")
        if not fingerprint or duration is None:
            logger.warning(
                f"fpcalc output missing fingerprint or duration for {mapped_path}"
            )
            return None
        # The fingerprint is already a string in the JSON output; no conversion needed.
        logger.debug(
            f"Successfully generated fingerprint for {mapped_path} (duration: {duration}s)"
        )
        return fingerprint, duration
    except subprocess.TimeoutExpired:
        logger.warning(f"fpcalc timeout for {mapped_path}")
        return None
    except FileNotFoundError:
        logger.warning(
            "fpcalc not found. Install chromaprint to enable audio verification."
        )
        return None
    except Exception as e:
        logger.warning(f"Error generating fingerprint for {mapped_path}: {e}")
        return None


def _format_context(log_context: Optional[str]) -> str:
    """Format an optional log-context suffix."""
    return f" ({log_context})" if log_context else ""
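
# A typical `fpcalc -json` payload (values hypothetical, fingerprint truncated)
# looks like:
#   {"duration": 213.86, "fingerprint": "AQADtEmUaEkSRZEG..."}
# compare_fingerprints() below consumes the (fingerprint, duration) tuples that
# get_audio_fingerprint() extracts from that output.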

def compare_fingerprints(
    fp1_data: Optional[Tuple[str, float]],
    fp2_data: Optional[Tuple[str, float]],
    log_context: Optional[str] = None,
    return_message: bool = False,
) -> Union[bool, Tuple[bool, str]]:
    """Compare two audio fingerprints for similarity.

    Returns match, or (match, message) if return_message=True.
    """
    if not fp1_data or not fp2_data:
        message = "Fingerprint comparison failed: missing fingerprint"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False

    fp1, duration1 = fp1_data
    fp2, duration2 = fp2_data
    duration_diff = abs(duration1 - duration2)
    if duration_diff > 5:
        message = f"Fingerprint comparison: duration mismatch ({duration1}s vs {duration2}s, diff: {duration_diff:.1f}s)"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False

    if fp1 == fp2:
        message = "Fingerprint comparison: exact match"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (True, message) if return_message else True

    # Fingerprints from `fpcalc -json` are base64-encoded strings, so a simple
    # string-similarity metric serves as a rough approximation here.
    try:
        from difflib import SequenceMatcher

        similarity = SequenceMatcher(None, fp1, fp2).ratio()

        # Tighten the threshold as the duration difference grows.
        if duration_diff <= 1:
            threshold = 0.90
        elif duration_diff <= 3:
            threshold = 0.93
        else:
            threshold = 0.95

        match = similarity >= threshold
        message = f"Fingerprint comparison: similarity={similarity:.3f}, duration_diff={duration_diff:.1f}s, threshold={threshold:.2f}, match={match}"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (match, message) if return_message else match
    except Exception as e:
        message = (
            f"Fingerprint comparison failed: exception {type(e).__name__}: {str(e)}"
        )
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False


# DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY
# def delete_track_file(base_url: str, track_file_id: int, headers: Dict[str, str]) -> None:
#     """Delete a track file by ID"""
#     delete_url = f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}"
#     resp = requests.delete(delete_url, headers=headers, timeout=60)
#     resp.raise_for_status()


def build_album_track_map(
    base_url: str, headers: Dict[str, str], albums: List[Dict]
) -> Dict[Tuple[int, str], List[Dict]]:
    """Create a mapping of tracks present on full albums."""
    album_track_map: Dict[Tuple[int, str], List[Dict]] = defaultdict(list)
    album_albums = [
        album for album in albums if album.get("albumType", "").lower() == "album"
    ]
    for album in album_albums:
        album_id = album.get("id")
        artist_id = album.get("artistId")
        album_title = album.get("title", "Unknown")
        if not album_id or not artist_id:
            continue
        tracks = get_json(
            f"{base_url.rstrip('/')}/api/v1/track",
            headers,
            params={"albumId": album_id},
        )
        tracks_with_files = (track for track in tracks if track.get("hasFile"))
        for track in tracks_with_files:
            title = track.get("title")
            track_id = track.get("id")
            track_file_id = track.get("trackFileId")
            if not title or not track_file_id or not track_id:
                continue
            key = (artist_id, normalize_title(title))
            album_track_map[key].append(
                {
                    "album_id": album_id,
                    "album_title": album_title,
                    "track_id": track_id,
                    "track_file_id": track_file_id,
                }
            )
    return album_track_map
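
# The resulting map is keyed on (artistId, normalized title); each value lists
# the album tracks carrying a file for that title. Shape only, IDs hypothetical:
#   (42, "my song") -> [{"album_id": 7, "album_title": "Full Album",
#                        "track_id": 1001, "track_file_id": 2002}]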

def get_file_properties(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Dict]:
    """Get audio file properties using ffprobe."""
    mapped_path = map_docker_path(file_path, docker_mount)
    if not os.path.exists(mapped_path):
        return None
    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "quiet",
                "-print_format",
                "json",
                "-show_format",
                "-show_streams",
                mapped_path,
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode != 0:
            return None
        data = json.loads(result.stdout)
        audio_stream = next(
            (s for s in data.get("streams", []) if s.get("codec_type") == "audio"),
            None,
        )
        format_info = data.get("format", {})
        if not audio_stream:
            return None
        return {
            "duration": float(format_info.get("duration", 0)),
            "size": int(format_info.get("size", 0)),
            "bitrate": int(format_info.get("bit_rate", 0)),
            "sample_rate": int(audio_stream.get("sample_rate", 0)),
            "channels": int(audio_stream.get("channels", 0)),
            "codec": audio_stream.get("codec_name", ""),
            "bit_depth": int(audio_stream.get("bits_per_raw_sample", 0)),
        }
    except Exception as e:
        logger.debug(f"Could not get file properties for {mapped_path}: {e}")
        return None
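
# verify_audio_match() below aggregates a 0-100 confidence score from several
# independent signals (the threshold for declaring a match is 70):
#   MusicBrainz recording ID   +50 match / -30 mismatch
#   Lidarr quality name        +10 match
#   Duration                   +15 (<=1s diff) / +5 (<=3s) / -10 (>3s)
#   File size ratio            +15 (>=95%) / +5 (>=85%)
#   Bitrate ratio              +10 (>=90%)
#   Sample rate                 +5 match
#   Codec                       +5 match
#   Channels                    +5 match
#   Bit depth                   +5 match
#   Partial MD5 file hash      +30 match (exact duplicate)
#   Chromaprint fingerprint    +20 match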

def verify_audio_match(
    base_url: str,
    headers: Dict[str, str],
    single_track_id: int,
    single_track_file_id: int,
    album_track_id: int,
    album_track_file_id: int,
    docker_mount: Optional[str] = None,
) -> Tuple[bool, Optional[str], int]:
    """
    Verify that two track files contain the same audio using multiple
    verification methods.

    Returns (match, result_message, confidence_score).
    Confidence: 0-100, where 100 = definitely same, 0 = definitely different.
    """
    logger.debug(
        f"Verifying audio match: single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
    )
    confidence_score = 0
    verification_results = []

    # Verification 1: MusicBrainz recording ID (most reliable, 50 points)
    single_track_info = get_track_info(base_url, single_track_id, headers)
    album_track_info = get_track_info(base_url, album_track_id, headers)
    if single_track_info and album_track_info:
        single_mb_id = single_track_info.get("foreignRecordingId")
        album_mb_id = album_track_info.get("foreignRecordingId")
        if single_mb_id and album_mb_id:
            if single_mb_id == album_mb_id:
                confidence_score += 50
                verification_results.append("✓ MusicBrainz Recording ID match (+50)")
            else:
                confidence_score -= 30
                verification_results.append(
                    "✗ Different MusicBrainz Recording IDs (-30)"
                )
        else:
            verification_results.append("⚠ MusicBrainz Recording ID unavailable")

    single_file_info = get_trackfile_info(base_url, single_track_file_id, headers)
    album_file_info = get_trackfile_info(base_url, album_track_file_id, headers)
    if not single_file_info or not album_file_info:
        return False, "Could not fetch track file info", 0

    single_path = single_file_info.get("path")
    album_path = album_file_info.get("path")
    if not single_path or not album_path:
        return False, "Missing file paths", 0

    # Verification 2: Lidarr quality profile comparison (10 points)
    single_quality = single_file_info.get("quality", {}).get("quality", {})
    album_quality = album_file_info.get("quality", {}).get("quality", {})
    if single_quality and album_quality:
        single_quality_name = single_quality.get("name", "")
        album_quality_name = album_quality.get("name", "")
        if single_quality_name and album_quality_name:
            if single_quality_name == album_quality_name:
                confidence_score += 10
                verification_results.append(
                    f"✓ Same quality ({single_quality_name}) (+10)"
                )
            else:
                verification_results.append(
                    f"⚠ Different quality ({single_quality_name} vs {album_quality_name})"
                )

    # Verification 3: file properties via ffprobe (duration, size, bitrate, codec, ...)
    single_props = get_file_properties(single_path, docker_mount)
    album_props = get_file_properties(album_path, docker_mount)
    if single_props and album_props:
        # Duration check (15 points)
        duration_diff = abs(single_props["duration"] - album_props["duration"])
        if duration_diff <= 1:
            confidence_score += 15
            verification_results.append(
                f"✓ Duration match ({duration_diff:.1f}s diff) (+15)"
            )
        elif duration_diff <= 3:
            confidence_score += 5
            verification_results.append(
                f"⚠ Close duration ({duration_diff:.1f}s diff) (+5)"
            )
        else:
            confidence_score -= 10
            verification_results.append(
                f"✗ Duration mismatch ({duration_diff:.1f}s diff) (-10)"
            )

        # File size check (15 points)
        size_ratio = min(single_props["size"], album_props["size"]) / max(
            single_props["size"], album_props["size"]
        )
        if size_ratio >= 0.95:
            confidence_score += 15
            verification_results.append(f"✓ File size match ({size_ratio:.2%}) (+15)")
        elif size_ratio >= 0.85:
            confidence_score += 5
            verification_results.append(f"⚠ Similar file size ({size_ratio:.2%}) (+5)")
        else:
            verification_results.append(f"⚠ Different file sizes ({size_ratio:.2%})")

        # Bitrate check (10 points)
        if single_props["bitrate"] > 0 and album_props["bitrate"] > 0:
            bitrate_ratio = min(single_props["bitrate"], album_props["bitrate"]) / max(
                single_props["bitrate"], album_props["bitrate"]
            )
            if bitrate_ratio >= 0.90:
                confidence_score += 10
                verification_results.append(
                    f"✓ Bitrate match ({bitrate_ratio:.2%}) (+10)"
                )

        # Sample rate check (5 points)
        if single_props["sample_rate"] == album_props["sample_rate"]:
            confidence_score += 5
            verification_results.append(
                f"✓ Sample rate match ({single_props['sample_rate']}Hz) (+5)"
            )

        # Codec check (5 points)
        if single_props["codec"] and album_props["codec"]:
            if single_props["codec"] == album_props["codec"]:
                confidence_score += 5
                verification_results.append(
                    f"✓ Same codec ({single_props['codec']}) (+5)"
                )
            else:
                verification_results.append(
                    f"⚠ Different codecs ({single_props['codec']} vs {album_props['codec']})"
                )

        # Channels check (5 points)
        if single_props["channels"] == album_props["channels"]:
            confidence_score += 5
            verification_results.append(
                f"✓ Same channels ({single_props['channels']}) (+5)"
            )
        else:
            verification_results.append(
                f"⚠ Different channels ({single_props['channels']} vs {album_props['channels']})"
            )

        # Bit depth check (5 points) - helps identify remasters
        if single_props["bit_depth"] > 0 and album_props["bit_depth"] > 0:
            if single_props["bit_depth"] == album_props["bit_depth"]:
                confidence_score += 5
                verification_results.append(
                    f"✓ Same bit depth ({single_props['bit_depth']}-bit) (+5)"
                )
            else:
                verification_results.append(
                    f"⚠ Different bit depths ({single_props['bit_depth']}-bit vs {album_props['bit_depth']}-bit)"
                )

    # Verification 4: partial file hash (30 points) - quick exact-duplicate check
    single_hash = get_file_hash(single_path, docker_mount)
    album_hash = get_file_hash(album_path, docker_mount)
    if single_hash and album_hash:
        if single_hash == album_hash:
            confidence_score += 30
            verification_results.append("✓ File hash match (exact duplicate) (+30)")
        else:
            verification_results.append("⚠ Different file hashes")

    # Verification 5: Chromaprint fingerprint (20 points)
    single_fp = get_audio_fingerprint(single_path, docker_mount)
    album_fp = get_audio_fingerprint(album_path, docker_mount)
    if single_fp and album_fp:
        log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
        fp_match, fp_message = compare_fingerprints(
            single_fp, album_fp, log_context, return_message=True
        )
        if fp_match:
            confidence_score += 20
            verification_results.append("✓ Audio fingerprint match (+20)")
        else:
            verification_results.append(f"⚠ {fp_message}")
    else:
        verification_results.append("⚠ Audio fingerprint unavailable")

    # Final decision
    match = confidence_score >= 70
    result_message = f"Confidence: {confidence_score}/100 | " + " | ".join(
        verification_results
    )
    return match, result_message, confidence_score
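
# A result_message for a verified pair might read (values hypothetical):
#   Confidence: 85/100 | ✓ MusicBrainz Recording ID match (+50) |
#   ✓ Duration match (0.2s diff) (+15) | ✓ Audio fingerprint match (+20)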

def find_duplicate_singles(
    base_url: str,
    headers: Dict[str, str],
    albums: List[Dict],
    album_track_map: Dict[Tuple[int, str], List[Dict]],
    verify_audio: bool = True,
    docker_mount: Optional[str] = None,
) -> List[Dict]:
    """Identify single tracks that duplicate album tracks."""
    duplicates: List[Dict] = []
    single_albums = [
        album for album in albums if album.get("albumType", "").lower() == "single"
    ]
    for album in single_albums:
        album_id = album.get("id")
        artist_id = album.get("artistId")
        album_title = album.get("title", "")
        if not album_id or not artist_id:
            continue
        tracks = get_json(
            f"{base_url.rstrip('/')}/api/v1/track",
            headers,
            params={"albumId": album_id},
        )
        tracks_with_files = (track for track in tracks if track.get("hasFile"))
        for track in tracks_with_files:
            title = track.get("title")
            track_id = track.get("id")
            track_file_id = track.get("trackFileId")
            if not title or not track_file_id or not track_id:
                continue
            key = (artist_id, normalize_title(title))
            if key not in album_track_map:
                continue
            duplicate_albums = album_track_map[key]
            if not duplicate_albums:
                continue

            if not verify_audio:
                duplicates.append(
                    {
                        "artist_id": artist_id,
                        "single_album_id": album_id,
                        "single_album_title": album_title,
                        "track_title": title,
                        "single_track_file_id": track_file_id,
                        "duplicate_albums": duplicate_albums,
                        "verified_albums": duplicate_albums,
                        "verification_results": ["Audio verification disabled"],
                        "confidence_scores": [0],
                    }
                )
                continue

            logger.debug(
                f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..."
            )
            verified_albums = []
            verification_results = []
            confidence_scores = []
            for album_track in duplicate_albums:
                album_track_id = album_track["track_id"]
                album_track_file_id = album_track["track_file_id"]
                match, result_message, confidence = verify_audio_match(
                    base_url,
                    headers,
                    track_id,
                    track_file_id,
                    album_track_id,
                    album_track_file_id,
                    docker_mount,
                )
                verification_results.append(result_message)
                confidence_scores.append(confidence)
                if not match:
                    logger.debug(
                        f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)"
                    )
                    continue
                verified_albums.append(album_track)
                logger.debug(
                    f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)"
                )
            duplicates.append(
                {
                    "artist_id": artist_id,
                    "single_album_id": album_id,
                    "single_album_title": album_title,
                    "track_title": title,
                    "single_track_file_id": track_file_id,
                    "duplicate_albums": duplicate_albums,
                    "verified_albums": verified_albums,
                    "verification_results": verification_results,
                    "confidence_scores": confidence_scores,
                }
            )
    return duplicates
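
# Each duplicates entry pairs one single-track file with the album tracks that
# carry the same normalized title; "verified_albums" is the subset that also
# passed audio verification, with one verification_results/confidence_scores
# entry per candidate album track.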

def main() -> None:
    parser = argparse.ArgumentParser(
        description="Identify single tracks that are duplicates of album tracks in Lidarr."
    )
    parser.add_argument(
        "--base-url",
        default=os.getenv("LIDARR_URL"),
        help="Base URL of the Lidarr instance (e.g. https://music.example.org). Can also be set via LIDARR_URL env var.",
    )
    parser.add_argument(
        "--api-key",
        default=os.getenv("LIDARR_API_KEY"),
        help="API key for Lidarr with sufficient permissions. Can also be set via LIDARR_API_KEY env var.",
    )
    parser.add_argument(
        "--no-audio-verify",
        action="store_true",
        help="Skip audio fingerprint verification (faster but less accurate)",
    )
    parser.add_argument(
        "--docker-mount",
        default=os.getenv("DOCKER_MOUNT"),
        help="Docker mount mapping in format 'container_path:host_path' (e.g. '/music:/srv/pool/multimedia/media/Music'). Can also be set via DOCKER_MOUNT env var.",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging",
    )
    # DELETE FLAG COMMENTED OUT FOR SAFETY
    # parser.add_argument(
    #     "--delete",
    #     action="store_true",
    #     help="If set, delete the duplicate single track files instead of just listing them",
    # )
    # parser.add_argument(
    #     "--force",
    #     action="store_true",
    #     help="If set together with --delete, do not prompt for confirmation before deletion",
    # )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="[%(levelname)s] %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    if not args.base_url:
        logger.error(
            "LIDARR_URL not set. Provide --base-url or set LIDARR_URL environment variable."
        )
        sys.exit(1)
    if not args.api_key:
        logger.error(
            "LIDARR_API_KEY not set. Provide --api-key or set LIDARR_API_KEY environment variable."
        )
        sys.exit(1)

    base_url = args.base_url.rstrip("/")
    headers = {"X-Api-Key": args.api_key}

    logger.info("Fetching artists...")
    artists = get_json(f"{base_url}/api/v1/artist", headers)
    if not artists:
        logger.warning("No artists found. Exiting.")
        return
    artist_map = {
        artist.get("id"): artist.get("artistName", "Unknown")
        for artist in artists
        if artist.get("id")
    }

    logger.info("Fetching albums for each artist...")
    albums: List[Dict] = []
    for artist in artists:
        artist_id = artist.get("id")
        if not artist_id:
            continue
        albums.extend(
            get_json(
                f"{base_url}/api/v1/album", headers, params={"artistId": artist_id}
            )
        )
    if not albums:
        logger.warning("No albums found in the library.")
        return

    logger.info("Building album track map...")
    album_track_map = build_album_track_map(base_url, headers, albums)

    verify_audio = not args.no_audio_verify
    docker_mount = args.docker_mount if args.docker_mount else None
    if not verify_audio:
        logger.info(
            "Scanning for duplicate singles (audio verification disabled - using title matching only)..."
        )
    else:
        mount_msg = f" (Docker mount: {docker_mount})" if docker_mount else ""
        logger.info(
            f"Scanning for duplicate singles with audio verification{mount_msg}..."
        )
        logger.info(
            "NOTE: Audio verification requires 'fpcalc' (chromaprint) to be installed"
        )

    duplicates = find_duplicate_singles(
        base_url,
        headers,
        albums,
        album_track_map,
        verify_audio=verify_audio,
        docker_mount=docker_mount,
    )
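
# Example invocation (flags optional; LIDARR_URL/LIDARR_API_KEY may instead
# come from the environment or a .env file via python-dotenv):
#   python3 lidarr_cleanup_singles.py \
#       --base-url https://music.example.org --api-key <key> \
#       --docker-mount /music:/srv/pool/multimedia/media/Music --debug
# External tools used: fpcalc (chromaprint) and ffprobe (ffmpeg). Both are
# optional, but without them the file-property and fingerprint checks are
# skipped and the confidence score relies on Lidarr metadata and file hashes.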
    if not duplicates:
        logger.info("No duplicate singles found. The library appears clean.")
        return

    verified_count = sum(1 for dup in duplicates if dup.get("verified_albums"))
    logger.info(
        f"Found {len(duplicates)} single track(s) that are duplicates of album tracks ({verified_count} verified by audio fingerprint):"
    )
    for dup in duplicates:
        artist_id = dup["artist_id"]
        artist_name = artist_map.get(artist_id, f"Unknown (ID: {artist_id})")
        duplicate_albums = dup["duplicate_albums"]
        verified_albums = dup.get("verified_albums", duplicate_albums)
        verification_results = dup.get("verification_results", [])
        album_names = [album["album_title"] for album in duplicate_albums]

        logger.info(f"Artist: {artist_name}")
        logger.info(f"  Single: '{dup['single_album_title']}'")
        logger.info(
            f"  Track: '{dup['track_title']}' (trackFileId: {dup['single_track_file_id']})"
        )
        for result in verification_results:
            logger.info(f"    {result}")
        logger.info(
            f"  Already present on {len(duplicate_albums)} album(s): {', '.join(album_names)}"
        )
        if verify_audio:
            if verified_albums:
                verified_names = [album["album_title"] for album in verified_albums]
                logger.info(
                    f"  ✓ LIKELY safe to delete (audio verified on {len(verified_albums)} album(s): {', '.join(verified_names)})"
                )
                logger.info(
                    "  ⚠ CAUTION: Always check for different versions (remaster, radio edit, live, etc.)"
                )
            else:
                logger.info("  ⚠ NOT safe to delete (audio verification failed)")
        logger.info("")

    # DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY
    # if args.delete:
    #     if not args.force:
    #         confirm = input(
    #             f"\nAre you sure you want to delete these {len(duplicates)} single track file(s)? [y/N]: "
    #         ).strip().lower()
    #         if confirm not in ("y", "yes"):
    #             logger.info("Aborting deletion.")
    #             return
    #     logger.info("Deleting duplicate single track files...")
    #     for dup in duplicates:
    #         track_file_id = dup["single_track_file_id"]
    #         try:
    #             delete_track_file(base_url, track_file_id, headers)
    #             logger.info(
    #                 f"Deleted trackFileId {track_file_id} (track '{dup['track_title']}' from single '{dup['single_album_title']}')."
    #             )
    #         except Exception as exc:
    #             logger.error(f"Failed to delete trackFileId {track_file_id}: {exc}")


if __name__ == "__main__":
    main()