From af5a2bf825a6a957a77094117058a3316d06a80d Mon Sep 17 00:00:00 2001 From: Danilo Reyes Date: Fri, 14 Nov 2025 01:32:41 -0600 Subject: [PATCH] Add audio verification and duplicate tracking features - Integrated `plexapi` and `python-dotenv` as dependencies in `flake.nix` and `pyproject.toml` for enhanced functionality. - Implemented new modules for audio verification and duplicate tracking, including `audio_verification.py`, `duplicate_finder.py`, and `track_verification.py`. - Updated `main.py` to utilize the new modules for identifying and managing duplicate single tracks in Lidarr, with detailed logging and confidence scoring. - Enhanced the `find_duplicate_singles` function to support audio verification results and metadata migration to Plex. - Refactored existing code for improved structure and maintainability, ensuring better integration of new features. --- flake.nix | 1 + nix/package-cleanup.nix | 1 + src-cleanup/__init__.py | 1 - src-cleanup/audio_verification.py | 281 +++++++++++ src-cleanup/duplicate_finder.py | 267 +++++++++++ src-cleanup/lidarr_client.py | 89 ++++ src-cleanup/main.py | 774 ++++-------------------------- src-cleanup/plex_metadata.py | 267 +++++++++++ src-cleanup/pyproject.toml | 4 +- src-cleanup/track_verification.py | 85 ++++ 10 files changed, 1090 insertions(+), 680 deletions(-) create mode 100644 src-cleanup/audio_verification.py create mode 100644 src-cleanup/duplicate_finder.py create mode 100644 src-cleanup/lidarr_client.py create mode 100644 src-cleanup/plex_metadata.py create mode 100644 src-cleanup/track_verification.py diff --git a/flake.nix b/flake.nix index 021de6f..7bcefd8 100644 --- a/flake.nix +++ b/flake.nix @@ -48,6 +48,7 @@ ps: with ps; [ requests python-dotenv + plexapi ] )) pkgs.black diff --git a/nix/package-cleanup.nix b/nix/package-cleanup.nix index 829ccb5..743da7a 100644 --- a/nix/package-cleanup.nix +++ b/nix/package-cleanup.nix @@ -11,6 +11,7 @@ pkgs.python3Packages.buildPythonApplication { propagatedBuildInputs = with pkgs.python3Packages; [ requests python-dotenv + plexapi ]; # Runtime dependencies for audio verification diff --git a/src-cleanup/__init__.py b/src-cleanup/__init__.py index 2897813..ef370d3 100644 --- a/src-cleanup/__init__.py +++ b/src-cleanup/__init__.py @@ -1,2 +1 @@ """Lidarr Cleanup Singles - Remove duplicate single tracks""" - diff --git a/src-cleanup/audio_verification.py b/src-cleanup/audio_verification.py new file mode 100644 index 0000000..79e3bee --- /dev/null +++ b/src-cleanup/audio_verification.py @@ -0,0 +1,281 @@ +"""Audio verification using multiple methods""" + +import json +import logging +import os +import subprocess +from difflib import SequenceMatcher +from typing import Dict, List, Optional, Tuple, Union + +logger = logging.getLogger(__name__) + + +def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str: + """Map Docker container path to host path""" + if not docker_mount: + return file_path + + container_path, host_path = docker_mount.split(":", 1) + if not file_path.startswith(container_path): + return file_path + + return file_path.replace(container_path, host_path, 1) + + +def get_audio_fingerprint( + file_path: str, docker_mount: Optional[str] = None +) -> Optional[Tuple[str, int]]: + """Generate audio fingerprint using fpcalc. 
Returns (fingerprint, duration)""" + mapped_path = map_docker_path(file_path, docker_mount) + logger.debug(f"Generating fingerprint for: {mapped_path}") + + if not os.path.exists(mapped_path): + logger.warning(f"File not found: {mapped_path}") + return None + + try: + result = subprocess.run( + ["fpcalc", "-json", "-length", "180", mapped_path], + capture_output=True, + text=True, + timeout=60, + check=False, + ) + if result.returncode != 0: + logger.warning(f"fpcalc failed for {mapped_path}: {result.stderr}") + return None + + data = json.loads(result.stdout) + fingerprint = data.get("fingerprint") + duration = data.get("duration") + + if not fingerprint or duration is None: + logger.warning( + f"fpcalc output missing fingerprint or duration for {mapped_path}" + ) + return None + + logger.debug(f"Successfully generated fingerprint (duration: {duration}s)") + return fingerprint, duration + except ( + subprocess.TimeoutExpired, + FileNotFoundError, + json.JSONDecodeError, + Exception, + ) as e: + logger.warning(f"Error generating fingerprint for {mapped_path}: {e}") + return None + + +def get_file_properties( + file_path: str, docker_mount: Optional[str] = None +) -> Optional[Dict]: + """Get audio file properties using ffprobe""" + mapped_path = map_docker_path(file_path, docker_mount) + if not os.path.exists(mapped_path): + return None + + try: + result = subprocess.run( + [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + "-show_streams", + mapped_path, + ], + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + return None + + data = json.loads(result.stdout) + audio_stream = next( + (s for s in data.get("streams", []) if s.get("codec_type") == "audio"), None + ) + format_info = data.get("format", {}) + + if not audio_stream: + return None + + return { + "duration": float(format_info.get("duration", 0)), + "size": int(format_info.get("size", 0)), + "bitrate": int(format_info.get("bit_rate", 0)), + "sample_rate": int(audio_stream.get("sample_rate", 0)), + "channels": int(audio_stream.get("channels", 0)), + "codec": audio_stream.get("codec_name", ""), + "bit_depth": int(audio_stream.get("bits_per_raw_sample", 0)), + } + except Exception as e: + logger.debug(f"Could not get file properties for {mapped_path}: {e}") + return None + + +def _format_context(log_context: Optional[str]) -> str: + """Format log context message""" + return f" ({log_context})" if log_context else "" + + +def compare_fingerprints( + fp1_data: Optional[Tuple[str, int]], + fp2_data: Optional[Tuple[str, int]], + log_context: Optional[str] = None, + return_message: bool = False, +) -> Union[bool, Tuple[bool, str]]: + """Compare audio fingerprints. 
Returns match or (match, message) if return_message=True""" + if not fp1_data or not fp2_data: + message = "Fingerprint comparison failed: missing fingerprint" + logger.debug(f"{message}{_format_context(log_context)}") + return (False, message) if return_message else False + + fp1, duration1 = fp1_data + fp2, duration2 = fp2_data + + duration_diff = abs(duration1 - duration2) + if duration_diff > 5: + message = f"Fingerprint comparison: duration mismatch ({duration1}s vs {duration2}s, diff: {duration_diff}s)" + logger.debug(f"{message}{_format_context(log_context)}") + return (False, message) if return_message else False + + if fp1 == fp2: + message = "Fingerprint comparison: exact match" + logger.debug(f"{message}{_format_context(log_context)}") + return (True, message) if return_message else True + + try: + similarity = SequenceMatcher(None, fp1, fp2).ratio() + + if duration_diff <= 1: + threshold = 0.90 + elif duration_diff <= 3: + threshold = 0.93 + else: + threshold = 0.95 + + match = similarity >= threshold + message = f"Fingerprint comparison: similarity={similarity:.3f}, duration_diff={duration_diff}s, threshold={threshold:.2f}, match={match}" + logger.debug(f"{message}{_format_context(log_context)}") + return (match, message) if return_message else match + except Exception as e: + message = ( + f"Fingerprint comparison failed: exception {type(e).__name__}: {str(e)}" + ) + logger.debug(f"{message}{_format_context(log_context)}") + return (False, message) if return_message else False + + +def check_mb_recording_id(single_track_info, album_track_info) -> Tuple[int, str]: + """Check MusicBrainz Recording ID match. Returns (score_delta, message)""" + if not (single_track_info and album_track_info): + return 0, "⚠ MusicBrainz Recording ID unavailable" + + single_mb_id = single_track_info.get("foreignRecordingId") + album_mb_id = album_track_info.get("foreignRecordingId") + + if not (single_mb_id and album_mb_id): + return 0, "⚠ MusicBrainz Recording ID unavailable" + + if single_mb_id == album_mb_id: + return 50, "✓ MusicBrainz Recording ID match (+50)" + + return -30, "✗ Different MusicBrainz Recording IDs (-30)" + + +def check_quality_profile( + single_file_info, album_file_info +) -> Tuple[int, Optional[str]]: + """Check Lidarr quality profile match. Returns (score_delta, message)""" + single_quality = ( + single_file_info.get("quality", {}).get("quality", {}).get("name", "") + ) + album_quality = ( + album_file_info.get("quality", {}).get("quality", {}).get("name", "") + ) + + if not (single_quality and album_quality): + return 0, None + + if single_quality == album_quality: + return 10, f"✓ Same quality ({single_quality}) (+10)" + + return 0, f"⚠ Different quality ({single_quality} vs {album_quality})" + + +def check_file_properties(single_props, album_props) -> List[Tuple[int, str]]: + """Check file properties. 
Returns list of (score_delta, message) tuples""" + if not (single_props and album_props): + return [] + + results = [] + + duration_diff = abs(single_props["duration"] - album_props["duration"]) + if duration_diff <= 1: + results.append((15, f"✓ Duration match ({duration_diff:.1f}s diff) (+15)")) + elif duration_diff <= 3: + results.append((5, f"⚠ Close duration ({duration_diff:.1f}s diff) (+5)")) + else: + results.append((-10, f"✗ Duration mismatch ({duration_diff:.1f}s diff) (-10)")) + + size_ratio = min(single_props["size"], album_props["size"]) / max( + single_props["size"], album_props["size"] + ) + if size_ratio >= 0.95: + results.append((15, f"✓ File size match ({size_ratio:.2%}) (+15)")) + elif size_ratio >= 0.85: + results.append((5, f"⚠ Similar file size ({size_ratio:.2%}) (+5)")) + else: + results.append((0, f"⚠ Different file sizes ({size_ratio:.2%})")) + + if single_props["bitrate"] > 0 and album_props["bitrate"] > 0: + bitrate_ratio = min(single_props["bitrate"], album_props["bitrate"]) / max( + single_props["bitrate"], album_props["bitrate"] + ) + if bitrate_ratio >= 0.90: + results.append((10, f"✓ Bitrate match ({bitrate_ratio:.2%}) (+10)")) + + if single_props["sample_rate"] == album_props["sample_rate"]: + results.append( + (5, f"✓ Sample rate match ({single_props['sample_rate']}Hz) (+5)") + ) + + if single_props["codec"] and album_props["codec"]: + if single_props["codec"] == album_props["codec"]: + results.append((5, f"✓ Same codec ({single_props['codec']}) (+5)")) + else: + results.append( + ( + 0, + f"⚠ Different codecs ({single_props['codec']} vs {album_props['codec']})", + ) + ) + + if single_props["channels"] == album_props["channels"]: + results.append((5, f"✓ Same channels ({single_props['channels']}) (+5)")) + else: + results.append( + ( + 0, + f"⚠ Different channels ({single_props['channels']} vs {album_props['channels']})", + ) + ) + + if single_props["bit_depth"] > 0 and album_props["bit_depth"] > 0: + if single_props["bit_depth"] == album_props["bit_depth"]: + results.append( + (5, f"✓ Same bit depth ({single_props['bit_depth']}-bit) (+5)") + ) + else: + results.append( + ( + 0, + f"⚠ Different bit depths ({single_props['bit_depth']}-bit vs {album_props['bit_depth']}-bit)", + ) + ) + + return results diff --git a/src-cleanup/duplicate_finder.py b/src-cleanup/duplicate_finder.py new file mode 100644 index 0000000..44c5143 --- /dev/null +++ b/src-cleanup/duplicate_finder.py @@ -0,0 +1,267 @@ +"""Functions to find duplicate singles in Lidarr""" + +import logging +from collections import defaultdict +from typing import Dict, List, Optional, Tuple + +from lidarr_client import fetch_tracks_for_album, get_trackfile_info +from track_verification import verify_audio_match + +logger = logging.getLogger(__name__) + + +def normalize_title(title: str) -> str: + """Normalize a track title for comparison""" + return " ".join(title.lower().split()) + + +def build_album_track_map( + base_url: str, headers: Dict[str, str], albums: List[Dict] +) -> Dict[Tuple[int, str], List[Dict]]: + """Create a mapping of tracks present on full albums""" + album_track_map: Dict[Tuple[int, str], List[Dict]] = defaultdict(list) + + def process_album_for_map(album): + """Process single album and add tracks to map""" + album_id = album.get("id") + artist_id = album.get("artistId") + album_title = album.get("title", "Unknown") + + if not (album_id and artist_id): + return + + tracks = fetch_tracks_for_album(base_url, headers, album_id) + if not tracks: + logger.debug( + f"Skipping album 
'{album_title}' (albumId: {album_id}) - could not fetch tracks" + ) + return + + def add_track_to_map(track): + """Add track to album_track_map""" + title = track.get("title") + track_id = track.get("id") + track_file_id = track.get("trackFileId") + + if not (title and track_file_id and track_id): + return + + key = (artist_id, normalize_title(title)) + album_track_map[key].append( + { + "album_id": album_id, + "album_title": album_title, + "track_id": track_id, + "track_file_id": track_file_id, + } + ) + + tracks_with_files = filter(lambda track: track.get("hasFile"), tracks) + list(map(add_track_to_map, tracks_with_files)) + + album_albums = filter( + lambda album: album.get("albumType", "").lower() == "album", albums + ) + list(map(process_album_for_map, album_albums)) + + return album_track_map + + +def create_unverified_duplicate( + artist_id, album_id, album_title, title, track_file_id, duplicate_albums +) -> Dict: + """Create duplicate entry for unverified tracks""" + return { + "artist_id": artist_id, + "single_album_id": album_id, + "single_album_title": album_title, + "track_title": title, + "single_track_file_id": track_file_id, + "duplicate_albums": duplicate_albums, + "verified_albums": duplicate_albums, + "verification_results": ["Audio verification disabled"], + "confidence_scores": [0], + } + + +def verify_and_mark_album_track( + base_url, + headers, + track_id, + track_file_id, + album_track, + docker_mount, + single_file_path, +) -> Tuple[bool, Optional[Dict], str, int]: + """Verify album track and mark for migration if perfect match""" + album_track_id = album_track["track_id"] + album_track_file_id = album_track["track_file_id"] + + album_track_file_info = get_trackfile_info(base_url, album_track_file_id, headers) + album_file_path = ( + album_track_file_info.get("path") if album_track_file_info else None + ) + + match, result_message, confidence = verify_audio_match( + base_url, + headers, + track_id, + track_file_id, + album_track_id, + album_track_file_id, + docker_mount, + ) + + if not match: + logger.debug( + f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)" + ) + return False, None, result_message, confidence + + album_track["confidence"] = confidence + album_track["migration_status"] = ( + "eligible" + if confidence >= 95 and single_file_path and album_file_path + else "not_eligible" + ) + if album_track["migration_status"] == "eligible": + album_track["single_file_path"] = single_file_path + album_track["album_file_path"] = album_file_path + + logger.debug( + f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)" + ) + return True, album_track, result_message, confidence + + +def process_single_track( + base_url, + headers, + album_id, + artist_id, + album_title, + track, + album_track_map, + verify_audio, + docker_mount, +) -> Optional[Dict]: + """Process a single track and return duplicate info or None""" + title = track.get("title") + track_id = track.get("id") + track_file_id = track.get("trackFileId") + + if not (title and track_file_id and track_id): + return None + + key = (artist_id, normalize_title(title)) + if key not in album_track_map: + return None + + duplicate_albums = album_track_map[key] + if not duplicate_albums: + return None + + if not verify_audio: + return create_unverified_duplicate( + artist_id, album_id, 
album_title, title, track_file_id, duplicate_albums + ) + + logger.debug( + f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..." + ) + + single_track_file_info = get_trackfile_info(base_url, track_file_id, headers) + single_file_path = ( + single_track_file_info.get("path") if single_track_file_info else None + ) + + verification_data = list( + map( + lambda album_track: verify_and_mark_album_track( + base_url, + headers, + track_id, + track_file_id, + album_track, + docker_mount, + single_file_path, + ), + duplicate_albums, + ) + ) + + verified_albums = [ + track for match, track, _, _ in verification_data if match and track + ] + verification_results = [result for _, _, result, _ in verification_data] + confidence_scores = [conf for _, _, _, conf in verification_data] + + return { + "artist_id": artist_id, + "single_album_id": album_id, + "single_album_title": album_title, + "track_title": title, + "single_track_file_id": track_file_id, + "duplicate_albums": duplicate_albums, + "verified_albums": verified_albums, + "verification_results": verification_results, + "confidence_scores": confidence_scores, + } + + +def process_single_album( + base_url, headers, album, album_track_map, verify_audio, docker_mount +) -> List[Dict]: + """Process a single album and return list of duplicates found""" + album_id = album.get("id") + artist_id = album.get("artistId") + album_title = album.get("title", "") + + if not (album_id and artist_id): + return [] + + tracks = fetch_tracks_for_album(base_url, headers, album_id) + if not tracks: + logger.debug( + f"Skipping single album '{album_title}' (albumId: {album_id}) - could not fetch tracks" + ) + return [] + + tracks_with_files = filter(lambda track: track.get("hasFile"), tracks) + process_track = lambda track: process_single_track( + base_url, + headers, + album_id, + artist_id, + album_title, + track, + album_track_map, + verify_audio, + docker_mount, + ) + duplicate_infos = map(process_track, tracks_with_files) + + return list(filter(lambda x: x is not None, duplicate_infos)) + + +def find_duplicate_singles( + base_url: str, + headers: Dict[str, str], + albums: List[Dict], + album_track_map: Dict[Tuple[int, str], List[Dict]], + verify_audio: bool = True, + docker_mount: Optional[str] = None, +) -> List[Dict]: + """Identify single tracks that duplicate album tracks""" + single_albums = filter( + lambda album: album.get("albumType", "").lower() == "single", albums + ) + + album_duplicates = map( + lambda album: process_single_album( + base_url, headers, album, album_track_map, verify_audio, docker_mount + ), + single_albums, + ) + + return [dup for album_dups in album_duplicates for dup in album_dups] diff --git a/src-cleanup/lidarr_client.py b/src-cleanup/lidarr_client.py new file mode 100644 index 0000000..6ac059c --- /dev/null +++ b/src-cleanup/lidarr_client.py @@ -0,0 +1,89 @@ +"""Lidarr API client functions""" + +import logging +from typing import Dict, List, Optional + +import requests + +logger = logging.getLogger(__name__) + + +def get_json( + url: str, + headers: Dict[str, str], + params: Optional[Dict[str, object]] = None, + raise_on_error: bool = True, +) -> List[Dict]: + """Fetch JSON from URL with error handling""" + try: + resp = requests.get(url, headers=headers, params=params, timeout=60) + resp.raise_for_status() + return resp.json() + except requests.exceptions.RequestException as e: + logger.warning(f"Error fetching {url}: {e}") + if raise_on_error: + raise + return [] + + +def 
get_trackfile_info( + base_url: str, track_file_id: int, headers: Dict[str, str] +) -> Optional[Dict]: + """Get trackfile information including file path and quality""" + try: + resp = requests.get( + f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}", + headers=headers, + timeout=30, + ) + resp.raise_for_status() + return resp.json() + except requests.exceptions.RequestException as e: + logger.warning(f"Could not fetch trackfile {track_file_id}: {e}") + return None + + +def get_track_info( + base_url: str, track_id: int, headers: Dict[str, str] +) -> Optional[Dict]: + """Get track information including MusicBrainz recording ID""" + try: + resp = requests.get( + f"{base_url.rstrip('/')}/api/v1/track/{track_id}", + headers=headers, + timeout=30, + ) + resp.raise_for_status() + return resp.json() + except requests.exceptions.RequestException as e: + logger.warning(f"Could not fetch track {track_id}: {e}") + return None + + +def fetch_all_artists(base_url: str, headers: Dict[str, str]) -> List[Dict]: + """Fetch all artists from Lidarr""" + return get_json(f"{base_url}/api/v1/artist", headers) + + +def fetch_albums_for_artist( + base_url: str, headers: Dict[str, str], artist_id: int +) -> List[Dict]: + """Fetch all albums for an artist""" + return get_json( + f"{base_url}/api/v1/album", + headers, + params={"artistId": artist_id}, + raise_on_error=False, + ) + + +def fetch_tracks_for_album( + base_url: str, headers: Dict[str, str], album_id: int +) -> List[Dict]: + """Fetch all tracks for an album""" + return get_json( + f"{base_url.rstrip('/')}/api/v1/track", + headers, + params={"albumId": album_id}, + raise_on_error=False, + ) diff --git a/src-cleanup/main.py b/src-cleanup/main.py index 92906a2..eead51b 100644 --- a/src-cleanup/main.py +++ b/src-cleanup/main.py @@ -9,637 +9,19 @@ the same track already exists on a full album in Lidarr. 
import argparse import logging import os -import subprocess import sys -from collections import defaultdict -from typing import Dict, List, Optional, Tuple, Union -import requests from dotenv import load_dotenv +from duplicate_finder import build_album_track_map, find_duplicate_singles +from lidarr_client import fetch_all_artists, fetch_albums_for_artist +from plex_metadata import get_plex_server, migrate_plex_metadata + load_dotenv() logger = logging.getLogger(__name__) -def normalize_title(title: str) -> str: - """Normalize a track title for comparison""" - return " ".join(title.lower().split()) - - -def get_json( - url: str, headers: Dict[str, str], params: Optional[Dict[str, object]] = None -) -> List[Dict]: - """Wrapper around requests.get with basic error handling""" - try: - resp = requests.get(url, headers=headers, params=params, timeout=60) - resp.raise_for_status() - return resp.json() - except requests.exceptions.RequestException as e: - logger.error(f"Error fetching {url}: {e}") - raise - - -def get_trackfile_info( - base_url: str, track_file_id: int, headers: Dict[str, str] -) -> Optional[Dict]: - """Get trackfile information including file path and quality""" - try: - resp = requests.get( - f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}", - headers=headers, - timeout=30, - ) - resp.raise_for_status() - return resp.json() - except requests.exceptions.RequestException as e: - logger.warning(f"Could not fetch trackfile {track_file_id}: {e}") - return None - - -def get_track_info( - base_url: str, track_id: int, headers: Dict[str, str] -) -> Optional[Dict]: - """Get track information including MusicBrainz recording ID""" - try: - resp = requests.get( - f"{base_url.rstrip('/')}/api/v1/track/{track_id}", - headers=headers, - timeout=30, - ) - resp.raise_for_status() - return resp.json() - except requests.exceptions.RequestException as e: - logger.warning(f"Could not fetch track {track_id}: {e}") - return None - - -def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str: - """Map Docker container path to host path""" - if not docker_mount: - return file_path - - container_path, host_path = docker_mount.split(":", 1) - if not file_path.startswith(container_path): - return file_path - - return file_path.replace(container_path, host_path, 1) - - -def get_file_hash( - file_path: str, docker_mount: Optional[str] = None, bytes_to_read: int = 1048576 -) -> Optional[str]: - """Get partial file hash (first N bytes) for quick exact duplicate detection""" - mapped_path = map_docker_path(file_path, docker_mount) - if not os.path.exists(mapped_path): - return None - - try: - import hashlib - - hasher = hashlib.md5() - with open(mapped_path, "rb") as f: - chunk = f.read(bytes_to_read) - hasher.update(chunk) - return hasher.hexdigest() - except Exception as e: - logger.debug(f"Could not compute hash for {mapped_path}: {e}") - return None - - -def get_audio_fingerprint( - file_path: str, docker_mount: Optional[str] = None -) -> Optional[Tuple[str, int]]: - """Generate audio fingerprint using fpcalc (chromaprint). 
Returns (fingerprint, duration_seconds)""" - mapped_path = map_docker_path(file_path, docker_mount) - logger.debug(f"Generating fingerprint for: {mapped_path} (original: {file_path})") - - if not os.path.exists(mapped_path): - logger.warning(f"File not found: {mapped_path} (original: {file_path})") - return None - - try: - logger.debug(f"Running fpcalc on: {mapped_path}") - result = subprocess.run( - ["fpcalc", "-json", "-length", "180", mapped_path], - capture_output=True, - text=True, - timeout=60, - check=False, - ) - if result.returncode != 0: - logger.warning(f"fpcalc failed for {mapped_path}: {result.stderr}") - return None - - import json - - try: - data = json.loads(result.stdout) - except json.JSONDecodeError as e: - logger.warning(f"Failed to parse fpcalc JSON output for {mapped_path}: {e}") - return None - - fingerprint = data.get("fingerprint") - duration = data.get("duration") - - if not fingerprint or duration is None: - logger.warning( - f"fpcalc output missing fingerprint or duration for {mapped_path}" - ) - return None - - # Fingerprint is already a string in JSON format, no conversion needed - - logger.debug( - f"Successfully generated fingerprint for {mapped_path} (duration: {duration}s)" - ) - return fingerprint, duration - except subprocess.TimeoutExpired: - logger.warning(f"fpcalc timeout for {mapped_path}") - return None - except FileNotFoundError: - logger.warning( - "fpcalc not found. Install chromaprint to enable audio verification." - ) - return None - except Exception as e: - logger.warning(f"Error generating fingerprint for {mapped_path}: {e}") - return None - - -def _format_context(log_context: Optional[str]) -> str: - """Format log context message""" - return f" ({log_context})" if log_context else "" - - -def compare_fingerprints( - fp1_data: Optional[Tuple[str, int]], - fp2_data: Optional[Tuple[str, int]], - log_context: Optional[str] = None, - return_message: bool = False, -) -> Union[bool, Tuple[bool, str]]: - """Compare two audio fingerprints for similarity. 
Returns match or (match, message) if return_message=True""" - if not fp1_data or not fp2_data: - message = "Fingerprint comparison failed: missing fingerprint" - logger.debug(f"{message}{_format_context(log_context)}") - return (False, message) if return_message else False - - fp1, duration1 = fp1_data - fp2, duration2 = fp2_data - - duration_diff = abs(duration1 - duration2) - if duration_diff > 5: - message = f"Fingerprint comparison: duration mismatch ({duration1}s vs {duration2}s, diff: {duration_diff}s)" - logger.debug(f"{message}{_format_context(log_context)}") - return (False, message) if return_message else False - - if fp1 == fp2: - message = "Fingerprint comparison: exact match" - logger.debug(f"{message}{_format_context(log_context)}") - return (True, message) if return_message else True - - # Fingerprints are base64-encoded strings from fpcalc -json - # For similarity, we can use simple string similarity metrics - try: - # Calculate similarity based on string edit distance - from difflib import SequenceMatcher - - # Use SequenceMatcher for string similarity - similarity = SequenceMatcher(None, fp1, fp2).ratio() - - # Adjust threshold based on duration difference - if duration_diff <= 1: - threshold = 0.90 - elif duration_diff <= 3: - threshold = 0.93 - else: - threshold = 0.95 - - match = similarity >= threshold - message = f"Fingerprint comparison: similarity={similarity:.3f}, duration_diff={duration_diff}s, threshold={threshold:.2f}, match={match}" - logger.debug(f"{message}{_format_context(log_context)}") - return (match, message) if return_message else match - except Exception as e: - message = ( - f"Fingerprint comparison failed: exception {type(e).__name__}: {str(e)}" - ) - logger.debug(f"{message}{_format_context(log_context)}") - return (False, message) if return_message else False - - -# DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY -# def delete_track_file(base_url: str, track_file_id: int, headers: Dict[str, str]) -> None: -# """Delete a track file by ID""" -# delete_url = f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}" -# resp = requests.delete(delete_url, headers=headers, timeout=60) -# resp.raise_for_status() - - -def build_album_track_map( - base_url: str, headers: Dict[str, str], albums: List[Dict] -) -> Dict[Tuple[int, str], List[Dict]]: - """Create a mapping of tracks present on full albums""" - album_track_map: Dict[Tuple[int, str], List[Dict]] = defaultdict(list) - - album_albums = list( - filter(lambda album: album.get("albumType", "").lower() == "album", albums) - ) - - for album in album_albums: - album_id = album.get("id") - artist_id = album.get("artistId") - album_title = album.get("title", "Unknown") - if not album_id or not artist_id: - continue - - tracks = get_json( - f"{base_url.rstrip('/')}/api/v1/track", - headers, - params={"albumId": album_id}, - ) - - tracks_with_files = filter(lambda track: track.get("hasFile"), tracks) - - for track in tracks_with_files: - title = track.get("title") - track_id = track.get("id") - track_file_id = track.get("trackFileId") - if not title or not track_file_id or not track_id: - continue - key = (artist_id, normalize_title(title)) - album_track_map[key].append( - { - "album_id": album_id, - "album_title": album_title, - "track_id": track_id, - "track_file_id": track_file_id, - } - ) - - return album_track_map - - -def get_file_properties( - file_path: str, docker_mount: Optional[str] = None -) -> Optional[Dict]: - """Get audio file properties using ffprobe""" - mapped_path = map_docker_path(file_path, 
docker_mount) - if not os.path.exists(mapped_path): - return None - - try: - import json - - result = subprocess.run( - [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_format", - "-show_streams", - mapped_path, - ], - capture_output=True, - text=True, - timeout=30, - ) - if result.returncode != 0: - return None - - data = json.loads(result.stdout) - audio_stream = next( - (s for s in data.get("streams", []) if s.get("codec_type") == "audio"), None - ) - format_info = data.get("format", {}) - - if not audio_stream: - return None - - return { - "duration": float(format_info.get("duration", 0)), - "size": int(format_info.get("size", 0)), - "bitrate": int(format_info.get("bit_rate", 0)), - "sample_rate": int(audio_stream.get("sample_rate", 0)), - "channels": int(audio_stream.get("channels", 0)), - "codec": audio_stream.get("codec_name", ""), - "bit_depth": int(audio_stream.get("bits_per_raw_sample", 0)), - } - except Exception as e: - logger.debug(f"Could not get file properties for {mapped_path}: {e}") - return None - - -def verify_audio_match( - base_url: str, - headers: Dict[str, str], - single_track_id: int, - single_track_file_id: int, - album_track_id: int, - album_track_file_id: int, - docker_mount: Optional[str] = None, -) -> Tuple[bool, Optional[str], int]: - """ - Verify that two track files contain the same audio using multiple verification methods. - Returns (match, result_message, confidence_score) - Confidence: 0-100, where 100 = definitely same, 0 = definitely different - """ - logger.debug( - f"Verifying audio match: single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}" - ) - - confidence_score = 0 - verification_results = [] - - # Verification 1: MusicBrainz Recording ID (most reliable - 50 points) - single_track_info = get_track_info(base_url, single_track_id, headers) - album_track_info = get_track_info(base_url, album_track_id, headers) - - if single_track_info and album_track_info: - single_mb_id = single_track_info.get("foreignRecordingId") - album_mb_id = album_track_info.get("foreignRecordingId") - - if single_mb_id and album_mb_id: - if single_mb_id == album_mb_id: - confidence_score += 50 - verification_results.append("✓ MusicBrainz Recording ID match (+50)") - else: - confidence_score -= 30 - verification_results.append( - "✗ Different MusicBrainz Recording IDs (-30)" - ) - else: - verification_results.append("⚠ MusicBrainz Recording ID unavailable") - - # Verification 2: File Properties (duration, size, bitrate, codec - 50 points) - single_file_info = get_trackfile_info(base_url, single_track_file_id, headers) - album_file_info = get_trackfile_info(base_url, album_track_file_id, headers) - - if not single_file_info or not album_file_info: - return False, "Could not fetch track file info", 0 - - single_path = single_file_info.get("path") - album_path = album_file_info.get("path") - if not single_path or not album_path: - return False, "Missing file paths", 0 - - # Verification 1.5: Lidarr quality profile comparison (10 points) - single_quality = single_file_info.get("quality", {}).get("quality", {}) - album_quality = album_file_info.get("quality", {}).get("quality", {}) - - if single_quality and album_quality: - single_quality_name = single_quality.get("name", "") - album_quality_name = album_quality.get("name", "") - - if single_quality_name and album_quality_name: - if single_quality_name == album_quality_name: - confidence_score += 10 - verification_results.append( - f"✓ Same quality ({single_quality_name}) 
(+10)" - ) - else: - verification_results.append( - f"⚠ Different quality ({single_quality_name} vs {album_quality_name})" - ) - - single_props = get_file_properties(single_path, docker_mount) - album_props = get_file_properties(album_path, docker_mount) - - if single_props and album_props: - # Duration check (15 points) - duration_diff = abs(single_props["duration"] - album_props["duration"]) - if duration_diff <= 1: - confidence_score += 15 - verification_results.append( - f"✓ Duration match ({duration_diff:.1f}s diff) (+15)" - ) - elif duration_diff <= 3: - confidence_score += 5 - verification_results.append( - f"⚠ Close duration ({duration_diff:.1f}s diff) (+5)" - ) - else: - confidence_score -= 10 - verification_results.append( - f"✗ Duration mismatch ({duration_diff:.1f}s diff) (-10)" - ) - - # File size check (15 points) - size_ratio = min(single_props["size"], album_props["size"]) / max( - single_props["size"], album_props["size"] - ) - if size_ratio >= 0.95: - confidence_score += 15 - verification_results.append(f"✓ File size match ({size_ratio:.2%}) (+15)") - elif size_ratio >= 0.85: - confidence_score += 5 - verification_results.append(f"⚠ Similar file size ({size_ratio:.2%}) (+5)") - else: - verification_results.append(f"⚠ Different file sizes ({size_ratio:.2%})") - - # Bitrate check (10 points) - if single_props["bitrate"] > 0 and album_props["bitrate"] > 0: - bitrate_ratio = min(single_props["bitrate"], album_props["bitrate"]) / max( - single_props["bitrate"], album_props["bitrate"] - ) - if bitrate_ratio >= 0.90: - confidence_score += 10 - verification_results.append( - f"✓ Bitrate match ({bitrate_ratio:.2%}) (+10)" - ) - - # Sample rate check (5 points) - if single_props["sample_rate"] == album_props["sample_rate"]: - confidence_score += 5 - verification_results.append( - f"✓ Sample rate match ({single_props['sample_rate']}Hz) (+5)" - ) - - # Codec check (5 points) - if single_props["codec"] and album_props["codec"]: - if single_props["codec"] == album_props["codec"]: - confidence_score += 5 - verification_results.append( - f"✓ Same codec ({single_props['codec']}) (+5)" - ) - else: - verification_results.append( - f"⚠ Different codecs ({single_props['codec']} vs {album_props['codec']})" - ) - - # Channels check (5 points) - if single_props["channels"] == album_props["channels"]: - confidence_score += 5 - verification_results.append( - f"✓ Same channels ({single_props['channels']}) (+5)" - ) - else: - verification_results.append( - f"⚠ Different channels ({single_props['channels']} vs {album_props['channels']})" - ) - - # Bit depth check (5 points) - helps identify remasters - if single_props["bit_depth"] > 0 and album_props["bit_depth"] > 0: - if single_props["bit_depth"] == album_props["bit_depth"]: - confidence_score += 5 - verification_results.append( - f"✓ Same bit depth ({single_props['bit_depth']}-bit) (+5)" - ) - else: - verification_results.append( - f"⚠ Different bit depths ({single_props['bit_depth']}-bit vs {album_props['bit_depth']}-bit)" - ) - - # Verification 3: File hash comparison (30 points) - quick exact duplicate check - single_hash = get_file_hash(single_path, docker_mount) - album_hash = get_file_hash(album_path, docker_mount) - - if single_hash and album_hash: - if single_hash == album_hash: - confidence_score += 30 - verification_results.append(f"✓ File hash match (exact duplicate) (+30)") - else: - verification_results.append(f"⚠ Different file hashes") - - # Verification 4: Chromaprint fingerprint (20 points) - single_fp = 
get_audio_fingerprint(single_path, docker_mount) - album_fp = get_audio_fingerprint(album_path, docker_mount) - - if single_fp and album_fp: - log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}" - fp_match, fp_message = compare_fingerprints( - single_fp, album_fp, log_context, return_message=True - ) - - if fp_match: - confidence_score += 20 - verification_results.append(f"✓ Audio fingerprint match (+20)") - else: - verification_results.append(f"⚠ {fp_message}") - else: - verification_results.append("⚠ Audio fingerprint unavailable") - - # Final decision - match = confidence_score >= 70 - result_message = f"Confidence: {confidence_score}/100 | " + " | ".join( - verification_results - ) - - return match, result_message, confidence_score - - -def find_duplicate_singles( - base_url: str, - headers: Dict[str, str], - albums: List[Dict], - album_track_map: Dict[Tuple[int, str], List[Dict]], - verify_audio: bool = True, - docker_mount: Optional[str] = None, -) -> List[Dict]: - """Identify single tracks that duplicate album tracks""" - duplicates: List[Dict] = [] - - single_albums = list( - filter(lambda album: album.get("albumType", "").lower() == "single", albums) - ) - - for album in single_albums: - album_id = album.get("id") - artist_id = album.get("artistId") - album_title = album.get("title", "") - if not album_id or not artist_id: - continue - - tracks = get_json( - f"{base_url.rstrip('/')}/api/v1/track", - headers, - params={"albumId": album_id}, - ) - - tracks_with_files = filter(lambda track: track.get("hasFile"), tracks) - - for track in tracks_with_files: - title = track.get("title") - track_id = track.get("id") - track_file_id = track.get("trackFileId") - if not title or not track_file_id or not track_id: - continue - - key = (artist_id, normalize_title(title)) - if key not in album_track_map: - continue - - duplicate_albums = album_track_map[key] - if not duplicate_albums: - continue - - if not verify_audio: - duplicates.append( - { - "artist_id": artist_id, - "single_album_id": album_id, - "single_album_title": album_title, - "track_title": title, - "single_track_file_id": track_file_id, - "duplicate_albums": duplicate_albums, - "verified_albums": duplicate_albums, - "verification_results": ["Audio verification disabled"], - "confidence_scores": [0], - } - ) - continue - - logger.debug( - f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..." 
- ) - verified_albums = [] - verification_results = [] - confidence_scores = [] - - for album_track in duplicate_albums: - album_track_id = album_track["track_id"] - album_track_file_id = album_track["track_file_id"] - match, result_message, confidence = verify_audio_match( - base_url, - headers, - track_id, - track_file_id, - album_track_id, - album_track_file_id, - docker_mount, - ) - verification_results.append(result_message) - confidence_scores.append(confidence) - - if not match: - logger.debug( - f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)" - ) - continue - - verified_albums.append(album_track) - logger.debug( - f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)" - ) - - duplicates.append( - { - "artist_id": artist_id, - "single_album_id": album_id, - "single_album_title": album_title, - "track_title": title, - "single_track_file_id": track_file_id, - "duplicate_albums": duplicate_albums, - "verified_albums": verified_albums, - "verification_results": verification_results, - "confidence_scores": confidence_scores, - } - ) - - return duplicates - - def main() -> None: parser = argparse.ArgumentParser( description="Identify single tracks that are duplicates of album tracks in Lidarr." @@ -647,12 +29,12 @@ def main() -> None: parser.add_argument( "--base-url", default=os.getenv("LIDARR_URL"), - help="Base URL of the Lidarr instance (e.g. https://music.example.org). Can also be set via LIDARR_URL env var.", + help="Base URL of the Lidarr instance. Can also be set via LIDARR_URL env var.", ) parser.add_argument( "--api-key", default=os.getenv("LIDARR_API_KEY"), - help="API key for Lidarr with sufficient permissions. Can also be set via LIDARR_API_KEY env var.", + help="API key for Lidarr. Can also be set via LIDARR_API_KEY env var.", ) parser.add_argument( "--no-audio-verify", @@ -662,24 +44,18 @@ def main() -> None: parser.add_argument( "--docker-mount", default=os.getenv("DOCKER_MOUNT"), - help="Docker mount mapping in format 'container_path:host_path' (e.g. '/music:/srv/pool/multimedia/media/Music'). Can also be set via DOCKER_MOUNT env var.", + help="Docker mount mapping in format 'container_path:host_path'. Can also be set via DOCKER_MOUNT env var.", ) parser.add_argument( "--debug", action="store_true", help="Enable debug logging", ) - # DELETE FLAG COMMENTED OUT FOR SAFETY - # parser.add_argument( - # "--delete", - # action="store_true", - # help="If set, delete the duplicate single track files instead of just listing them", - # ) - # parser.add_argument( - # "--force", - # action="store_true", - # help="If set together with --delete, do not prompt for confirmation before deletion", - # ) + parser.add_argument( + "--migrate-metadata", + action="store_true", + help="Migrate metadata (ratings, play counts) from singles to album tracks. Only applies to perfect matches (confidence >= 95).", + ) args = parser.parse_args() logging.basicConfig( @@ -704,7 +80,7 @@ def main() -> None: headers = {"X-Api-Key": args.api_key} logger.info("Fetching artists...") - artists = get_json(f"{base_url}/api/v1/artist", headers) + artists = fetch_all_artists(base_url, headers) if not artists: logger.warning("No artists found. 
Exiting.") return @@ -716,16 +92,12 @@ def main() -> None: } logger.info("Fetching albums for each artist...") - albums: List[Dict] = [] - for artist in artists: - artist_id = artist.get("id") - if not artist_id: - continue - albums.extend( - get_json( - f"{base_url}/api/v1/album", headers, params={"artistId": artist_id} - ) - ) + albums = [ + album + for artist in artists + if artist.get("id") + for album in fetch_albums_for_artist(base_url, headers, artist["id"]) + ] if not albums: logger.warning("No albums found in the library.") @@ -763,10 +135,61 @@ def main() -> None: logger.info("No duplicate singles found. The library appears clean.") return + if args.migrate_metadata: + plex_url = os.getenv("PLEX_URL") + plex_token = os.getenv("PLEX_TOKEN") + + if not (plex_url and plex_token): + logger.error( + "PLEX_URL and PLEX_TOKEN environment variables required for metadata migration" + ) + logger.error("Set them in your .env file or environment") + return + + logger.info(f"Connecting to Plex server at {plex_url}...") + plex_server = get_plex_server(plex_url, plex_token) + + if not plex_server: + logger.error( + "Failed to connect to Plex server. Skipping metadata migration." + ) + return + + logger.info("Migrating Plex metadata for perfect matches (confidence >= 95)...") + migration_count = 0 + + for dup in duplicates: + for album_track in dup.get("verified_albums", []): + if album_track.get("migration_status") != "eligible": + continue + + single_file_path = album_track.get("single_file_path") + album_file_path = album_track.get("album_file_path") + + logger.info( + f"Migrating Plex metadata for '{dup['track_title']}' to album '{album_track['album_title']}'..." + ) + success, message = migrate_plex_metadata( + plex_server, single_file_path, album_file_path, docker_mount + ) + + album_track["migration_message"] = message + album_track["migration_success"] = success + + if success: + migration_count += 1 + logger.info(f" ✓ {message}") + else: + logger.warning(f" ✗ {message}") + + logger.info(f"Completed Plex metadata migration for {migration_count} track(s)") + logger.info("") + verified_count = sum(1 for dup in duplicates if dup.get("verified_albums")) logger.info( f"Found {len(duplicates)} single track(s) that are duplicates of album tracks ({verified_count} verified by audio fingerprint):" ) + for dup in duplicates: artist_id = dup["artist_id"] artist_name = artist_map.get(artist_id, f"Unknown (ID: {artist_id})") @@ -781,44 +204,39 @@ def main() -> None: logger.info( f" Track: '{dup['track_title']}' (trackFileId: {dup['single_track_file_id']})" ) + for i, result in enumerate(verification_results): confidence = confidence_scores[i] if i < len(confidence_scores) else 0 logger.info(f" {result}") + logger.info( f" Already present on {len(duplicate_albums)} album(s): {', '.join(album_names)}" ) - if verify_audio: - if verified_albums: - verified_names = [album["album_title"] for album in verified_albums] - logger.info( - f" ✓ LIKELY safe to delete (audio verified on {len(verified_albums)} album(s): {', '.join(verified_names)})" - ) - logger.info( - f" ⚠ CAUTION: Always check for different versions (remaster, radio edit, live, etc)" - ) - else: - logger.info(f" ⚠ NOT safe to delete (audio verification failed)") - logger.info("") - # DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY - # if args.delete: - # if not args.force: - # confirm = input( - # f"\nAre you sure you want to delete these {len(duplicates)} single track file(s)? 
[y/N]: " - # ).strip().lower() - # if confirm not in ("y", "yes"): - # logger.info("Aborting deletion.") - # return - # logger.info("Deleting duplicate single track files...") - # for dup in duplicates: - # track_file_id = dup["single_track_file_id"] - # try: - # delete_track_file(base_url, track_file_id, headers) - # logger.info( - # f"Deleted trackFileId {track_file_id} (track '{dup['track_title']}' from single '{dup['single_album_title']}')." - # ) - # except Exception as exc: - # logger.error(f"Failed to delete trackFileId {track_file_id}: {exc}") + if verify_audio and not verified_albums: + logger.info(" ⚠ NOT safe to delete (audio verification failed)") + elif verify_audio: + verified_names = [album["album_title"] for album in verified_albums] + max_confidence = max(confidence_scores) if confidence_scores else 0 + + logger.info( + f" ✓ LIKELY safe to delete (audio verified on {len(verified_albums)} album(s): {', '.join(verified_names)})" + ) + logger.info(f" Max confidence: {max_confidence}/100") + + perfect_matches = [ + a for a in verified_albums if a.get("confidence", 0) >= 95 + ] + for album_track in perfect_matches: + migration_msg = album_track.get("migration_message", "") + if migration_msg: + logger.info(f" Metadata: {migration_msg}") + + logger.info( + " ⚠ CAUTION: Always check for different versions (remaster, radio edit, live, etc)" + ) + + logger.info("") if __name__ == "__main__": diff --git a/src-cleanup/plex_metadata.py b/src-cleanup/plex_metadata.py new file mode 100644 index 0000000..49a922d --- /dev/null +++ b/src-cleanup/plex_metadata.py @@ -0,0 +1,267 @@ +"""Plex metadata migration functions""" + +import logging +from typing import List, Optional, Tuple + +logger = logging.getLogger(__name__) + + +def get_plex_server(plex_url: str, plex_token: str): + """Connect to Plex server""" + try: + from plexapi.server import PlexServer + + return PlexServer(plex_url, plex_token) + except ImportError: + logger.error("python-plexapi not installed. 
Install with: pip install plexapi") + return None + except Exception as e: + logger.error(f"Failed to connect to Plex server: {e}") + return None + + +def find_plex_track_by_path( + plex_server, file_path: str, docker_mount: Optional[str] = None +): + """Find a Plex track by its file path""" + from audio_verification import map_docker_path + import os + + try: + mapped_path = map_docker_path(file_path, docker_mount) + music_sections = [ + s for s in plex_server.library.sections() if s.type == "artist" + ] + + # Try searching by exact mapped path first + for section in music_sections: + results = section.search(filters={"track.file": mapped_path}) + if results: + logger.debug(f"Found track by mapped path: {mapped_path}") + return results[0] + + # Try original path (might be what Plex sees in Docker) + for section in music_sections: + results = section.search(filters={"track.file": file_path}) + if results: + logger.debug(f"Found track by original path: {file_path}") + return results[0] + + # Fallback: search by filename in all tracks + filename = os.path.basename(file_path) + for section in music_sections: + all_tracks = section.searchTracks() + for track in all_tracks: + for media in track.media: + for part in media.parts: + if part.file and ( + part.file == mapped_path + or part.file == file_path + or part.file.endswith(filename) + ): + logger.debug(f"Found track by filename match: {part.file}") + return track + + logger.warning( + f"Could not find Plex track for path: {file_path} (mapped: {mapped_path})" + ) + return None + except Exception as e: + logger.debug(f"Could not find Plex track for path {file_path}: {e}") + return None + + +def get_plex_playlists_for_track(plex_server, track) -> List: + """Get all playlists containing this track""" + try: + return [ + playlist + for playlist in plex_server.playlists() + if playlist.playlistType == "audio" + and any(item.ratingKey == track.ratingKey for item in playlist.items()) + ] + except Exception as e: + logger.debug(f"Could not get playlists: {e}") + return [] + + +def migrate_rating( + single_track, album_track, single_rating, original_album_rating +) -> Tuple[List[str], List[str], List[str]]: + """Migrate rating. Returns (changes, already_has, failures)""" + if not single_rating: + return [], [], [] + + if original_album_rating: + logger.info(f" Album already has rating: {original_album_rating}/10") + return [], [f"rating ({original_album_rating}/10)"], [] + + try: + logger.info(f" Setting rating to {single_rating}/10...") + album_track.rate(single_rating) + album_track.reload() + new_rating = getattr(album_track, "userRating", None) + + if new_rating != single_rating: + logger.warning( + f" ⚠ Rating mismatch: expected {single_rating}, got {new_rating}" + ) + return [], [], [f"rating (set to {single_rating} but got {new_rating})"] + + logger.info(f" ✓ Rating verified: {new_rating}/10") + return [f"rating ({single_rating}/10) ✓ verified"], [], [] + except Exception as e: + logger.error(f"Failed to migrate rating: {e}") + return [], [], [f"rating (error: {e})"] + + +def migrate_play_count( + album_track, single_plays, album_plays +) -> Tuple[List[str], List[str], List[str]]: + """Migrate play count. 
Returns (changes, already_has, failures)""" + if single_plays <= 0: + return [], [], [] + + expected_count = album_plays + single_plays + logger.info( + f" Migrating play count: single={single_plays}, album={album_plays}, expected={expected_count}" + ) + + try: + list( + map( + lambda i: ( + album_track.markPlayed(), + ( + logger.debug( + f" Marked played {i + 1}/{single_plays} times..." + ) + if (i + 1) % 10 == 0 + else None + ), + )[0], + range(single_plays), + ) + ) + + album_track.reload() + new_count = getattr(album_track, "viewCount", 0) or 0 + + if new_count != expected_count: + logger.warning( + f" ⚠ Play count mismatch: expected {expected_count}, got {new_count}" + ) + return ( + [], + [], + [f"play count (expected {expected_count} but got {new_count})"], + ) + + logger.info(f" ✓ Play count verified: {new_count}") + return ( + [f"play count ({album_plays} + {single_plays} = {new_count}) ✓ verified"], + [], + [], + ) + except Exception as e: + logger.error(f"Failed to migrate play count: {e}") + return [], [], [f"play count (error: {e})"] + + +def migrate_playlist(playlist, album_track) -> Tuple[List[str], List[str], List[str]]: + """Migrate single playlist. Returns (changes, already_has, failures)""" + playlist_name = playlist.title + + try: + if any(item.ratingKey == album_track.ratingKey for item in playlist.items()): + logger.info(f" Album already in playlist: '{playlist_name}'") + return [], [f"playlist '{playlist_name}'"], [] + + logger.info(f" Adding to playlist: '{playlist_name}'...") + playlist.addItems(album_track) + playlist.reload() + + if not any( + item.ratingKey == album_track.ratingKey for item in playlist.items() + ): + logger.warning(f" ⚠ Playlist '{playlist_name}' add failed verification") + return [], [], [f"playlist '{playlist_name}' (add failed)"] + + logger.info(f" ✓ Playlist '{playlist_name}' verified") + return [f"added to playlist '{playlist_name}' ✓ verified"], [], [] + except Exception as e: + logger.error(f"Failed to add to playlist '{playlist_name}': {e}") + return [], [], [f"playlist '{playlist_name}' (error: {e})"] + + +def format_migration_message( + changes: List[str], already_has: List[str], failures: List[str] +) -> str: + """Format migration result message""" + parts = list( + filter( + None, + [ + f"✅ Migrated: {', '.join(changes)}" if changes else None, + f"ℹ️ Already has: {', '.join(already_has)}" if already_has else None, + f"❌ Failed: {', '.join(failures)}" if failures else None, + ], + ) + ) + return " | ".join(parts) if parts else "No metadata to migrate" + + +def migrate_plex_metadata( + plex_server, + single_file_path: str, + album_file_path: str, + docker_mount: Optional[str] = None, +) -> Tuple[bool, str]: + """Migrate Plex metadata from single to album track. 
Returns (success, message)""" + if not plex_server: + return False, "Plex server not connected" + + single_track = find_plex_track_by_path(plex_server, single_file_path, docker_mount) + album_track = find_plex_track_by_path(plex_server, album_file_path, docker_mount) + + if not single_track: + return False, "Could not find single track in Plex" + if not album_track: + return False, "Could not find album track in Plex" + + single_rating = getattr(single_track, "userRating", None) + single_plays = getattr(single_track, "viewCount", 0) or 0 + single_playlists = get_plex_playlists_for_track(plex_server, single_track) + + logger.info( + f" Single track metadata: rating={single_rating or 'none'}, plays={single_plays}, playlists={len(single_playlists)}" + ) + if single_playlists: + logger.info( + f" Single is in playlists: {', '.join(p.title for p in single_playlists)}" + ) + + original_album_rating = getattr(album_track, "userRating", None) + album_plays = getattr(album_track, "viewCount", 0) or 0 + + rating_changes, rating_already, rating_failures = migrate_rating( + single_track, album_track, single_rating, original_album_rating + ) + + plays_changes, plays_already, plays_failures = migrate_play_count( + album_track, single_plays, album_plays + ) + + playlist_results = list( + map(lambda p: migrate_playlist(p, album_track), single_playlists) + ) + playlist_changes = [c for result in playlist_results for c in result[0]] + playlist_already = [a for result in playlist_results for a in result[1]] + playlist_failures = [f for result in playlist_results for f in result[2]] + + all_changes = rating_changes + plays_changes + playlist_changes + all_already = rating_already + plays_already + playlist_already + all_failures = rating_failures + plays_failures + playlist_failures + + message = format_migration_message(all_changes, all_already, all_failures) + return len(all_failures) == 0, message diff --git a/src-cleanup/pyproject.toml b/src-cleanup/pyproject.toml index 883b03e..c7b0e91 100644 --- a/src-cleanup/pyproject.toml +++ b/src-cleanup/pyproject.toml @@ -3,7 +3,7 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.setuptools] -py-modules = ["main"] +py-modules = ["main", "lidarr_client", "audio_verification", "track_verification", "plex_metadata", "duplicate_finder"] [project] name = "lidarr-cleanup-singles" @@ -12,6 +12,8 @@ description = "Identify and optionally delete duplicate single tracks in Lidarr" requires-python = ">=3.8" dependencies = [ "requests", + "python-dotenv", + "plexapi", ] [project.scripts] diff --git a/src-cleanup/track_verification.py b/src-cleanup/track_verification.py new file mode 100644 index 0000000..728161e --- /dev/null +++ b/src-cleanup/track_verification.py @@ -0,0 +1,85 @@ +"""Track verification using multiple methods""" + +import logging +from typing import Dict, Optional, Tuple + +from audio_verification import ( + check_file_properties, + check_mb_recording_id, + check_quality_profile, + compare_fingerprints, + get_audio_fingerprint, + get_file_properties, +) +from lidarr_client import get_track_info, get_trackfile_info + +logger = logging.getLogger(__name__) + + +def verify_audio_match( + base_url: str, + headers: Dict[str, str], + single_track_id: int, + single_track_file_id: int, + album_track_id: int, + album_track_file_id: int, + docker_mount: Optional[str] = None, +) -> Tuple[bool, Optional[str], int]: + """Verify tracks using multiple methods. 
Returns (match, message, confidence_score)"""
+    logger.debug(
+        f"Verifying audio match: single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
+    )
+
+    single_file_info = get_trackfile_info(base_url, single_track_file_id, headers)
+    album_file_info = get_trackfile_info(base_url, album_track_file_id, headers)
+
+    if not (single_file_info and album_file_info):
+        return False, "Could not fetch track file info", 0
+
+    single_path = single_file_info.get("path")
+    album_path = album_file_info.get("path")
+    if not (single_path and album_path):
+        return False, "Missing file paths", 0
+
+    single_track_info = get_track_info(base_url, single_track_id, headers)
+    album_track_info = get_track_info(base_url, album_track_id, headers)
+
+    mb_score, mb_msg = check_mb_recording_id(single_track_info, album_track_info)
+    quality_score, quality_msg = check_quality_profile(
+        single_file_info, album_file_info
+    )
+
+    single_props = get_file_properties(single_path, docker_mount)
+    album_props = get_file_properties(album_path, docker_mount)
+    prop_checks = check_file_properties(single_props, album_props)
+
+    single_fp = get_audio_fingerprint(single_path, docker_mount)
+    album_fp = get_audio_fingerprint(album_path, docker_mount)
+    log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
+
+    if single_fp and album_fp:
+        fp_match, fp_message = compare_fingerprints(
+            single_fp, album_fp, log_context, return_message=True
+        )
+        fp_score = 20 if fp_match else 0
+        fp_msg = "✓ Audio fingerprint match (+20)" if fp_match else f"⚠ {fp_message}"
+    else:
+        fp_score, fp_msg = 0, "⚠ Audio fingerprint unavailable"
+
+    all_checks = [
+        (mb_score, mb_msg),
+        (quality_score, quality_msg) if quality_msg else None,
+        *prop_checks,
+        (fp_score, fp_msg),
+    ]
+
+    valid_checks = list(filter(lambda x: x is not None, all_checks))
+    confidence_score = sum(score for score, _ in valid_checks)
+    verification_results = [msg for _, msg in valid_checks]
+
+    match = confidence_score >= 70
+    result_message = f"Confidence: {confidence_score}/100 | " + " | ".join(
+        verification_results
+    )
+
+    return match, result_message, confidence_score
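
Reviewer note (illustration only, not part of the patch): the confidence
weights introduced above are MusicBrainz recording ID +50 (or -30 on
mismatch), Lidarr quality +10, duration +15/+5/-10, file size +15/+5,
bitrate +10, sample rate +5, codec +5, channels +5, bit depth +5, and
chromaprint fingerprint +20; a total >= 70 counts as a match, and >= 95
marks the pair eligible for --migrate-metadata. A minimal sketch of how
the (score_delta, message) tuples compose, with hypothetical check results:

    # Hypothetical values; the tuple shape mirrors the check_* helpers above.
    checks = [
        (50, "MusicBrainz Recording ID match"),  # strongest single signal
        (15, "duration within 1s"),
        (15, "file size ratio >= 0.95"),
        (20, "audio fingerprint match"),
    ]
    confidence = sum(score for score, _ in checks)  # 100
    is_duplicate = confidence >= 70   # True -> reported as a duplicate
    can_migrate = confidence >= 95    # True -> Plex metadata migration eligible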