diff --git a/flake.nix b/flake.nix
index d0448b2..021de6f 100644
--- a/flake.nix
+++ b/flake.nix
@@ -52,11 +52,13 @@
           ))
           pkgs.black
           pkgs.chromaprint
+          pkgs.ffmpeg
         ];
         shellHook = ''
           echo "Python environment ready!"
           echo "Run: python src/main.py"
           echo "Format code with: black src/"
+          echo "Audio verification tools: ffprobe (ffmpeg), fpcalc (chromaprint)"
         '';
       };
     };
diff --git a/nix/package-cleanup.nix b/nix/package-cleanup.nix
index 2f12281..829ccb5 100644
--- a/nix/package-cleanup.nix
+++ b/nix/package-cleanup.nix
@@ -12,9 +12,17 @@ pkgs.python3Packages.buildPythonApplication {
     requests
     python-dotenv
   ];
+
+  # Runtime dependencies for audio verification
   buildInputs = [
     pkgs.chromaprint
+    pkgs.ffmpeg
   ];
+
+  makeWrapperArgs = [
+    "--prefix PATH : ${pkgs.lib.makeBinPath [ pkgs.ffmpeg pkgs.chromaprint ]}"
+  ];
+
   meta = {
     mainProgram = "lidarr-cleanup-singles";
     description = "Identify duplicate single tracks in Lidarr";
diff --git a/src-cleanup/main.py b/src-cleanup/main.py
index e7aba9e..92906a2 100644
--- a/src-cleanup/main.py
+++ b/src-cleanup/main.py
@@ -12,7 +12,7 @@ import os
 import subprocess
 import sys
 from collections import defaultdict
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional, Tuple, Union
 
 import requests
 from dotenv import load_dotenv
@@ -43,7 +43,7 @@ def get_json(
 def get_trackfile_info(
     base_url: str, track_file_id: int, headers: Dict[str, str]
 ) -> Optional[Dict]:
-    """Get trackfile information including file path"""
+    """Get trackfile information including file path and quality"""
     try:
         resp = requests.get(
             f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}",
@@ -57,24 +57,62 @@
         return None
 
 
+def get_track_info(
+    base_url: str, track_id: int, headers: Dict[str, str]
+) -> Optional[Dict]:
+    """Get track information including MusicBrainz recording ID"""
+    try:
+        resp = requests.get(
+            f"{base_url.rstrip('/')}/api/v1/track/{track_id}",
+            headers=headers,
+            timeout=30,
+        )
+        resp.raise_for_status()
+        return resp.json()
+    except requests.exceptions.RequestException as e:
+        logger.warning(f"Could not fetch track {track_id}: {e}")
+        return None
+
+
 def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str:
     """Map Docker container path to host path"""
     if not docker_mount:
         return file_path
 
     container_path, host_path = docker_mount.split(":", 1)
-    if file_path.startswith(container_path):
-        mapped_path = file_path.replace(container_path, host_path, 1)
-        return mapped_path
-    return file_path
+    if not file_path.startswith(container_path):
+        return file_path
+
+    return file_path.replace(container_path, host_path, 1)
+
+
+def get_file_hash(
+    file_path: str, docker_mount: Optional[str] = None, bytes_to_read: int = 1048576
+) -> Optional[str]:
+    """Get partial file hash (first N bytes) for quick exact duplicate detection"""
+    mapped_path = map_docker_path(file_path, docker_mount)
+    if not os.path.exists(mapped_path):
+        return None
+
+    try:
+        import hashlib
+
+        hasher = hashlib.md5()
+        with open(mapped_path, "rb") as f:
+            chunk = f.read(bytes_to_read)
+            hasher.update(chunk)
+        return hasher.hexdigest()
+    except Exception as e:
+        logger.debug(f"Could not compute hash for {mapped_path}: {e}")
+        return None
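+
+
+# For reference, `fpcalc -json` prints a single JSON object, e.g.
+#   {"duration": 210.85, "fingerprint": "AQADtMmSRE..."}
+# (values illustrative). Without `-raw`, the fingerprint is Chromaprint's
+# compressed base64 form, so it is handled below as an opaque string.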
 
 
 def get_audio_fingerprint(
     file_path: str, docker_mount: Optional[str] = None
-) -> Optional[str]:
-    """Generate audio fingerprint using fpcalc (chromaprint)"""
+) -> Optional[Tuple[str, int]]:
+    """Generate audio fingerprint using fpcalc (chromaprint). Returns (fingerprint, duration_seconds)."""
     mapped_path = map_docker_path(file_path, docker_mount)
-    logger.info(f"Generating fingerprint for: {mapped_path} (original: {file_path})")
+    logger.debug(f"Generating fingerprint for: {mapped_path} (original: {file_path})")
 
     if not os.path.exists(mapped_path):
         logger.warning(f"File not found: {mapped_path} (original: {file_path})")
@@ -83,7 +121,7 @@ def get_audio_fingerprint(
     try:
         logger.debug(f"Running fpcalc on: {mapped_path}")
         result = subprocess.run(
-            ["fpcalc", "-raw", mapped_path],
+            ["fpcalc", "-json", "-length", "180", mapped_path],
             capture_output=True,
             text=True,
             timeout=60,
@@ -93,15 +131,29 @@
             logger.warning(f"fpcalc failed for {mapped_path}: {result.stderr}")
             return None
 
-        for line in result.stdout.split("\n"):
-            if line.startswith("FINGERPRINT="):
-                fingerprint = line.split("=", 1)[1]
-                logger.info(
-                    f"Successfully generated fingerprint for {mapped_path} (length: {len(fingerprint)})"
-                )
-                return fingerprint
-        logger.warning(f"fpcalc output did not contain FINGERPRINT= for {mapped_path}")
-        return None
+        import json
+
+        try:
+            data = json.loads(result.stdout)
+        except json.JSONDecodeError as e:
+            logger.warning(f"Failed to parse fpcalc JSON output for {mapped_path}: {e}")
+            return None
+
+        fingerprint = data.get("fingerprint")
+        duration = data.get("duration")
+
+        if not fingerprint or duration is None:
+            logger.warning(
+                f"fpcalc output missing fingerprint or duration for {mapped_path}"
+            )
+            return None
+
+        # Fingerprint is already a string in JSON format, no conversion needed
+
+        logger.debug(
+            f"Successfully generated fingerprint for {mapped_path} (duration: {duration}s)"
+        )
+        return fingerprint, duration
     except subprocess.TimeoutExpired:
         logger.warning(f"fpcalc timeout for {mapped_path}")
         return None
@@ -115,55 +167,64 @@ def get_audio_fingerprint(
         return None
 
 
+def _format_context(log_context: Optional[str]) -> str:
+    """Format log context message"""
+    return f" ({log_context})" if log_context else ""
+
+
 def compare_fingerprints(
-    fp1: Optional[str], fp2: Optional[str], log_context: Optional[str] = None
-) -> bool:
-    """Compare two audio fingerprints for similarity"""
-    if not fp1 or not fp2:
-        context_msg = f" ({log_context})" if log_context else ""
-        logger.debug(f"Fingerprint comparison failed: missing fingerprint{context_msg}")
-        return False
+    fp1_data: Optional[Tuple[str, int]],
+    fp2_data: Optional[Tuple[str, int]],
+    log_context: Optional[str] = None,
+    return_message: bool = False,
+) -> Union[bool, Tuple[bool, str]]:
+    """Compare two audio fingerprints for similarity. Returns match or (match, message) if return_message=True."""
+    if not fp1_data or not fp2_data:
+        message = "Fingerprint comparison failed: missing fingerprint"
+        logger.debug(f"{message}{_format_context(log_context)}")
+        return (False, message) if return_message else False
+
+    fp1, duration1 = fp1_data
+    fp2, duration2 = fp2_data
+
+    duration_diff = abs(duration1 - duration2)
+    if duration_diff > 5:
+        message = f"Fingerprint comparison: duration mismatch ({duration1}s vs {duration2}s, diff: {duration_diff}s)"
+        logger.debug(f"{message}{_format_context(log_context)}")
+        return (False, message) if return_message else False
 
     if fp1 == fp2:
-        context_msg = f" ({log_context})" if log_context else ""
-        logger.info(f"Fingerprint comparison: exact match{context_msg}")
-        return True
-
-    def decode_fingerprint(fp: str) -> List[int]:
-        return [int(x) for x in fp.split(",") if x.strip()]
+        message = "Fingerprint comparison: exact match"
+        logger.debug(f"{message}{_format_context(log_context)}")
+        return (True, message) if return_message else True
 
+    # Fingerprints are base64-encoded strings from fpcalc -json
+    # For similarity, we can use simple string similarity metrics
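+    # Note: string similarity over the compressed form is only a rough proxy
+    # for acoustic similarity; the canonical comparison decodes the raw
+    # 32-bit fingerprint values (`fpcalc -raw`) and scores the bit-error
+    # rate. The thresholds below are heuristics, not calibrated constants.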
     try:
-        f1 = decode_fingerprint(fp1)
-        f2 = decode_fingerprint(fp2)
+        # Calculate similarity based on string edit distance
+        from difflib import SequenceMatcher
 
-        if not f1 or not f2:
-            context_msg = f" ({log_context})" if log_context else ""
-            logger.debug(
-                f"Fingerprint comparison failed: empty decoded fingerprint{context_msg}"
-            )
-            return False
+        # Use SequenceMatcher for string similarity
+        similarity = SequenceMatcher(None, fp1, fp2).ratio()
 
-        min_len = min(len(f1), len(f2))
-        if min_len == 0:
-            context_msg = f" ({log_context})" if log_context else ""
-            logger.debug(f"Fingerprint comparison failed: zero length{context_msg}")
-            return False
+        # Adjust threshold based on duration difference
+        if duration_diff <= 1:
+            threshold = 0.90
+        elif duration_diff <= 3:
+            threshold = 0.93
+        else:
+            threshold = 0.95
 
-        matches = sum(1 for i in range(min_len) if f1[i] == f2[i])
-        similarity = matches / min_len
-        match = similarity >= 0.95
-
-        context_msg = f" ({log_context})" if log_context else ""
-        logger.info(
-            f"Fingerprint comparison: similarity={similarity:.3f}, match={match}{context_msg}"
+        match = similarity >= threshold
+        message = f"Fingerprint comparison: similarity={similarity:.3f}, duration_diff={duration_diff}s, threshold={threshold:.2f}, match={match}"
+        logger.debug(f"{message}{_format_context(log_context)}")
+        return (match, message) if return_message else match
+    except Exception as e:
+        message = (
+            f"Fingerprint comparison failed: exception {type(e).__name__}: {str(e)}"
         )
-        return match
-    except (ValueError, ZeroDivisionError) as e:
-        context_msg = f" ({log_context})" if log_context else ""
-        logger.debug(
-            f"Fingerprint comparison failed: exception {type(e).__name__}{context_msg}"
-        )
-        return False
+        logger.debug(f"{message}{_format_context(log_context)}")
+        return (False, message) if return_message else False
 
 
 # DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY
@@ -201,14 +262,16 @@ def build_album_track_map(
     for track in tracks_with_files:
         title = track.get("title")
+        track_id = track.get("id")
         track_file_id = track.get("trackFileId")
-        if not title or not track_file_id:
+        if not title or not track_file_id or not track_id:
            continue
 
         key = (artist_id, normalize_title(title))
         album_track_map[key].append(
             {
                 "album_id": album_id,
                 "album_title": album_title,
+                "track_id": track_id,
                 "track_file_id": track_file_id,
             }
         )
@@ -216,41 +279,254 @@ build_album_track_map
     return album_track_map
 
 
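+# ffprobe with `-print_format json -show_format -show_streams` emits a JSON
+# object shaped like {"format": {...}, "streams": [...]}; numeric fields
+# arrive as strings, hence the int()/float() casts below. For lossy codecs
+# bits_per_raw_sample is often absent, so "bit_depth" may legitimately be 0.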
+def get_file_properties(
+    file_path: str, docker_mount: Optional[str] = None
+) -> Optional[Dict]:
+    """Get audio file properties using ffprobe"""
+    mapped_path = map_docker_path(file_path, docker_mount)
+    if not os.path.exists(mapped_path):
+        return None
+
+    try:
+        import json
+
+        result = subprocess.run(
+            [
+                "ffprobe",
+                "-v",
+                "quiet",
+                "-print_format",
+                "json",
+                "-show_format",
+                "-show_streams",
+                mapped_path,
+            ],
+            capture_output=True,
+            text=True,
+            timeout=30,
+        )
+        if result.returncode != 0:
+            return None
+
+        data = json.loads(result.stdout)
+        audio_stream = next(
+            (s for s in data.get("streams", []) if s.get("codec_type") == "audio"), None
+        )
+        format_info = data.get("format", {})
+
+        if not audio_stream:
+            return None
+
+        return {
+            "duration": float(format_info.get("duration", 0)),
+            "size": int(format_info.get("size", 0)),
+            "bitrate": int(format_info.get("bit_rate", 0)),
+            "sample_rate": int(audio_stream.get("sample_rate", 0)),
+            "channels": int(audio_stream.get("channels", 0)),
+            "codec": audio_stream.get("codec_name", ""),
+            "bit_depth": int(audio_stream.get("bits_per_raw_sample", 0)),
+        }
+    except Exception as e:
+        logger.debug(f"Could not get file properties for {mapped_path}: {e}")
+        return None
+
+
 def verify_audio_match(
     base_url: str,
     headers: Dict[str, str],
+    single_track_id: int,
     single_track_file_id: int,
+    album_track_id: int,
     album_track_file_id: int,
     docker_mount: Optional[str] = None,
-) -> bool:
-    """Verify that two track files contain the same audio"""
-    logger.info(
+) -> Tuple[bool, Optional[str], int]:
+    """
+    Verify that two track files contain the same audio using multiple verification methods.
+    Returns (match, result_message, confidence_score).
+    The confidence score is roughly 0-100; scores >= 70 are treated as a match.
+    """
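+    # Signal weights used below: MusicBrainz recording ID +50 (or -30 on an
+    # explicit mismatch); Lidarr quality name +10; duration +15/+5/-10;
+    # file size +15/+5; bitrate +10; sample rate +5; codec +5; channels +5;
+    # bit depth +5; partial file hash +30; Chromaprint fingerprint +20.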
False, "Missing file paths", 0 - logger.info(f"Fetching fingerprints: single={single_path}, album={album_path}") + # Verification 1.5: Lidarr quality profile comparison (10 points) + single_quality = single_file_info.get("quality", {}).get("quality", {}) + album_quality = album_file_info.get("quality", {}).get("quality", {}) + + if single_quality and album_quality: + single_quality_name = single_quality.get("name", "") + album_quality_name = album_quality.get("name", "") + + if single_quality_name and album_quality_name: + if single_quality_name == album_quality_name: + confidence_score += 10 + verification_results.append( + f"✓ Same quality ({single_quality_name}) (+10)" + ) + else: + verification_results.append( + f"⚠ Different quality ({single_quality_name} vs {album_quality_name})" + ) + + single_props = get_file_properties(single_path, docker_mount) + album_props = get_file_properties(album_path, docker_mount) + + if single_props and album_props: + # Duration check (15 points) + duration_diff = abs(single_props["duration"] - album_props["duration"]) + if duration_diff <= 1: + confidence_score += 15 + verification_results.append( + f"✓ Duration match ({duration_diff:.1f}s diff) (+15)" + ) + elif duration_diff <= 3: + confidence_score += 5 + verification_results.append( + f"⚠ Close duration ({duration_diff:.1f}s diff) (+5)" + ) + else: + confidence_score -= 10 + verification_results.append( + f"✗ Duration mismatch ({duration_diff:.1f}s diff) (-10)" + ) + + # File size check (15 points) + size_ratio = min(single_props["size"], album_props["size"]) / max( + single_props["size"], album_props["size"] + ) + if size_ratio >= 0.95: + confidence_score += 15 + verification_results.append(f"✓ File size match ({size_ratio:.2%}) (+15)") + elif size_ratio >= 0.85: + confidence_score += 5 + verification_results.append(f"⚠ Similar file size ({size_ratio:.2%}) (+5)") + else: + verification_results.append(f"⚠ Different file sizes ({size_ratio:.2%})") + + # Bitrate check (10 points) + if single_props["bitrate"] > 0 and album_props["bitrate"] > 0: + bitrate_ratio = min(single_props["bitrate"], album_props["bitrate"]) / max( + single_props["bitrate"], album_props["bitrate"] + ) + if bitrate_ratio >= 0.90: + confidence_score += 10 + verification_results.append( + f"✓ Bitrate match ({bitrate_ratio:.2%}) (+10)" + ) + + # Sample rate check (5 points) + if single_props["sample_rate"] == album_props["sample_rate"]: + confidence_score += 5 + verification_results.append( + f"✓ Sample rate match ({single_props['sample_rate']}Hz) (+5)" + ) + + # Codec check (5 points) + if single_props["codec"] and album_props["codec"]: + if single_props["codec"] == album_props["codec"]: + confidence_score += 5 + verification_results.append( + f"✓ Same codec ({single_props['codec']}) (+5)" + ) + else: + verification_results.append( + f"⚠ Different codecs ({single_props['codec']} vs {album_props['codec']})" + ) + + # Channels check (5 points) + if single_props["channels"] == album_props["channels"]: + confidence_score += 5 + verification_results.append( + f"✓ Same channels ({single_props['channels']}) (+5)" + ) + else: + verification_results.append( + f"⚠ Different channels ({single_props['channels']} vs {album_props['channels']})" + ) + + # Bit depth check (5 points) - helps identify remasters + if single_props["bit_depth"] > 0 and album_props["bit_depth"] > 0: + if single_props["bit_depth"] == album_props["bit_depth"]: + confidence_score += 5 + verification_results.append( + f"✓ Same bit depth ({single_props['bit_depth']}-bit) 
(+5)" + ) + else: + verification_results.append( + f"⚠ Different bit depths ({single_props['bit_depth']}-bit vs {album_props['bit_depth']}-bit)" + ) + + # Verification 3: File hash comparison (30 points) - quick exact duplicate check + single_hash = get_file_hash(single_path, docker_mount) + album_hash = get_file_hash(album_path, docker_mount) + + if single_hash and album_hash: + if single_hash == album_hash: + confidence_score += 30 + verification_results.append(f"✓ File hash match (exact duplicate) (+30)") + else: + verification_results.append(f"⚠ Different file hashes") + + # Verification 4: Chromaprint fingerprint (20 points) single_fp = get_audio_fingerprint(single_path, docker_mount) album_fp = get_audio_fingerprint(album_path, docker_mount) - log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}" - return compare_fingerprints(single_fp, album_fp, log_context) + if single_fp and album_fp: + log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}" + fp_match, fp_message = compare_fingerprints( + single_fp, album_fp, log_context, return_message=True + ) + + if fp_match: + confidence_score += 20 + verification_results.append(f"✓ Audio fingerprint match (+20)") + else: + verification_results.append(f"⚠ {fp_message}") + else: + verification_results.append("⚠ Audio fingerprint unavailable") + + # Final decision + match = confidence_score >= 70 + result_message = f"Confidence: {confidence_score}/100 | " + " | ".join( + verification_results + ) + + return match, result_message, confidence_score def find_duplicate_singles( @@ -285,8 +561,9 @@ def find_duplicate_singles( for track in tracks_with_files: title = track.get("title") + track_id = track.get("id") track_file_id = track.get("trackFileId") - if not title or not track_file_id: + if not title or not track_file_id or not track_id: continue key = (artist_id, normalize_title(title)) @@ -294,33 +571,10 @@ def find_duplicate_singles( continue duplicate_albums = album_track_map[key] - verified_albums = [] + if not duplicate_albums: + continue - if verify_audio: - logger.info( - f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..." - ) - for album_track in duplicate_albums: - album_track_file_id = album_track["track_file_id"] - if verify_audio_match( - base_url, - headers, - track_file_id, - album_track_file_id, - docker_mount, - ): - verified_albums.append(album_track) - logger.debug( - f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id}" - ) - else: - logger.debug( - f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id}" - ) - else: - verified_albums = duplicate_albums - - if verified_albums: + if not verify_audio: duplicates.append( { "artist_id": artist_id, @@ -328,9 +582,60 @@ def find_duplicate_singles( "single_album_title": album_title, "track_title": title, "single_track_file_id": track_file_id, - "duplicate_albums": verified_albums, + "duplicate_albums": duplicate_albums, + "verified_albums": duplicate_albums, + "verification_results": ["Audio verification disabled"], + "confidence_scores": [0], } ) + continue + + logger.debug( + f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..." 
+    # Final decision
+    match = confidence_score >= 70
+    result_message = f"Confidence: {confidence_score}/100 | " + " | ".join(
+        verification_results
+    )
+
+    return match, result_message, confidence_score
 
 
 def find_duplicate_singles(
@@ -285,8 +561,9 @@
     for track in tracks_with_files:
         title = track.get("title")
+        track_id = track.get("id")
         track_file_id = track.get("trackFileId")
-        if not title or not track_file_id:
+        if not title or not track_file_id or not track_id:
            continue
 
@@ -294,33 +571,10 @@
            continue
 
        duplicate_albums = album_track_map[key]
-        verified_albums = []
+        if not duplicate_albums:
+            continue
 
-        if verify_audio:
-            logger.info(
-                f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..."
-            )
-            for album_track in duplicate_albums:
-                album_track_file_id = album_track["track_file_id"]
-                if verify_audio_match(
-                    base_url,
-                    headers,
-                    track_file_id,
-                    album_track_file_id,
-                    docker_mount,
-                ):
-                    verified_albums.append(album_track)
-                    logger.debug(
-                        f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id}"
-                    )
-                else:
-                    logger.debug(
-                        f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id}"
-                    )
-        else:
-            verified_albums = duplicate_albums
-
-        if verified_albums:
+        if not verify_audio:
            duplicates.append(
                {
                    "artist_id": artist_id,
@@ -328,9 +582,60 @@
                    "single_album_title": album_title,
                    "track_title": title,
                    "single_track_file_id": track_file_id,
-                    "duplicate_albums": verified_albums,
+                    "duplicate_albums": duplicate_albums,
+                    "verified_albums": duplicate_albums,
+                    "verification_results": ["Audio verification disabled"],
+                    "confidence_scores": [0],
                }
            )
+            continue
+
+        logger.debug(
+            f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..."
+        )
+        verified_albums = []
+        verification_results = []
+        confidence_scores = []
+
+        for album_track in duplicate_albums:
+            album_track_id = album_track["track_id"]
+            album_track_file_id = album_track["track_file_id"]
+            match, result_message, confidence = verify_audio_match(
+                base_url,
+                headers,
+                track_id,
+                track_file_id,
+                album_track_id,
+                album_track_file_id,
+                docker_mount,
+            )
+            verification_results.append(result_message)
+            confidence_scores.append(confidence)
+
+            if not match:
+                logger.debug(
+                    f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)"
+                )
+                continue
+
+            verified_albums.append(album_track)
+            logger.debug(
+                f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)"
+            )
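+
+        # Record the entry even when no album track verified, so the report
+        # can show why each candidate failed instead of silently dropping it.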
+        duplicates.append(
+            {
+                "artist_id": artist_id,
+                "single_album_id": album_id,
+                "single_album_title": album_title,
+                "track_title": title,
+                "single_track_file_id": track_file_id,
+                "duplicate_albums": duplicate_albums,
+                "verified_albums": verified_albums,
+                "verification_results": verification_results,
+                "confidence_scores": confidence_scores,
+            }
+        )
 
     return duplicates
@@ -416,12 +721,11 @@
         artist_id = artist.get("id")
         if not artist_id:
             continue
-        artist_albums = get_json(
-            f"{base_url}/api/v1/album",
-            headers,
-            params={"artistId": artist_id},
+        albums.extend(
+            get_json(
+                f"{base_url}/api/v1/album", headers, params={"artistId": artist_id}
+            )
         )
-        albums.extend(artist_albums)
 
     if not albums:
         logger.warning("No albums found in the library.")
@@ -433,20 +737,18 @@
     verify_audio = not args.no_audio_verify
     docker_mount = args.docker_mount if args.docker_mount else None
 
-    if verify_audio:
-        if docker_mount:
-            logger.info(
-                f"Scanning for duplicate singles with audio verification (Docker mount: {docker_mount})..."
-            )
-        else:
-            logger.info("Scanning for duplicate singles with audio verification...")
-        logger.info(
-            "NOTE: Audio verification requires 'fpcalc' (chromaprint) to be installed"
-        )
-    else:
+    if not verify_audio:
         logger.info(
             "Scanning for duplicate singles (audio verification disabled - using title matching only)..."
         )
+    else:
+        mount_msg = f" (Docker mount: {docker_mount})" if docker_mount else ""
+        logger.info(
+            f"Scanning for duplicate singles with audio verification{mount_msg}..."
+        )
+        logger.info(
+            "NOTE: Audio verification requires 'fpcalc' (chromaprint) and 'ffprobe' (ffmpeg) to be installed"
+        )
 
     duplicates = find_duplicate_singles(
         base_url,
@@ -461,13 +763,17 @@
         logger.info("No duplicate singles found. The library appears clean.")
         return
 
+    verified_count = sum(1 for dup in duplicates if dup.get("verified_albums"))
     logger.info(
-        f"Found {len(duplicates)} single track(s) that are duplicates of album tracks:"
+        f"Found {len(duplicates)} single track(s) that are duplicates of album tracks ({verified_count} passed audio verification):"
    )
    for dup in duplicates:
        artist_id = dup["artist_id"]
        artist_name = artist_map.get(artist_id, f"Unknown (ID: {artist_id})")
        duplicate_albums = dup["duplicate_albums"]
+        verified_albums = dup.get("verified_albums", duplicate_albums)
+        verification_results = dup.get("verification_results", [])
        album_names = [album["album_title"] for album in duplicate_albums]
 
        logger.info(f"Artist: {artist_name}")
@@ -475,9 +781,23 @@
        logger.info(
            f"  Track: '{dup['track_title']}' (trackFileId: {dup['single_track_file_id']})"
        )
+        for result in verification_results:
+            logger.info(f"    {result}")
        logger.info(
            f"  Already present on {len(duplicate_albums)} album(s): {', '.join(album_names)}"
        )
+        if verify_audio:
+            if verified_albums:
+                verified_names = [album["album_title"] for album in verified_albums]
+                logger.info(
+                    f"  ✓ LIKELY safe to delete (audio verified on {len(verified_albums)} album(s): {', '.join(verified_names)})"
+                )
+                logger.info(
+                    "  ⚠ CAUTION: Always check for different versions (remaster, radio edit, live, etc.)"
+                )
+            else:
+                logger.info("  ⚠ NOT safe to delete (audio verification failed)")
        logger.info("")
 
    # DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY