Add audio verification and duplicate tracking features

- Integrated `plexapi` and `python-dotenv` as dependencies in `flake.nix` and `pyproject.toml` for enhanced functionality.
- Implemented new modules for audio verification and duplicate tracking, including `audio_verification.py`, `duplicate_finder.py`, and `track_verification.py`.
- Updated `main.py` to utilize the new modules for identifying and managing duplicate single tracks in Lidarr, with detailed logging and confidence scoring.
- Enhanced the `find_duplicate_singles` function to support audio verification results and metadata migration to Plex.
- Refactored existing code for improved structure and maintainability, ensuring better integration of new features.
This commit is contained in:
Danilo Reyes
2025-11-14 01:32:41 -06:00
parent 03e8eb6f4e
commit af5a2bf825
10 changed files with 1090 additions and 680 deletions

View File

@@ -48,6 +48,7 @@
ps: with ps; [
requests
python-dotenv
plexapi
]
))
pkgs.black

View File

@@ -11,6 +11,7 @@ pkgs.python3Packages.buildPythonApplication {
propagatedBuildInputs = with pkgs.python3Packages; [
requests
python-dotenv
plexapi
];
# Runtime dependencies for audio verification

View File

@@ -1,2 +1 @@
"""Lidarr Cleanup Singles - Remove duplicate single tracks"""

View File

@@ -0,0 +1,281 @@
"""Audio verification using multiple methods"""
import json
import logging
import os
import subprocess
from difflib import SequenceMatcher
from typing import Dict, List, Optional, Tuple, Union
logger = logging.getLogger(__name__)
def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str:
    """Translate a Docker-container path to its host-side equivalent.

    ``docker_mount`` is a "container:host" prefix pair.  When it is absent,
    or the path does not live under the container prefix, the path is
    returned unchanged.
    """
    if not docker_mount:
        return file_path
    container_prefix, host_prefix = docker_mount.split(":", 1)
    if file_path.startswith(container_prefix):
        # Replace only the leading occurrence so a repeated prefix deeper in
        # the path is left alone.
        return file_path.replace(container_prefix, host_prefix, 1)
    return file_path
def get_audio_fingerprint(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Tuple[str, int]]:
    """Generate audio fingerprint using fpcalc. Returns (fingerprint, duration).

    The file path is first mapped through ``map_docker_path``.  Any failure
    (missing file, missing fpcalc binary, timeout, bad output) logs a warning
    and returns None rather than raising.
    """
    mapped_path = map_docker_path(file_path, docker_mount)
    logger.debug(f"Generating fingerprint for: {mapped_path}")
    if not os.path.exists(mapped_path):
        logger.warning(f"File not found: {mapped_path}")
        return None
    try:
        # Fingerprint only the first 180 s; enough to identify a track.
        result = subprocess.run(
            ["fpcalc", "-json", "-length", "180", mapped_path],
            capture_output=True,
            text=True,
            timeout=60,
            check=False,
        )
        if result.returncode != 0:
            logger.warning(f"fpcalc failed for {mapped_path}: {result.stderr}")
            return None
        data = json.loads(result.stdout)
        fingerprint = data.get("fingerprint")
        duration = data.get("duration")
        if not fingerprint or duration is None:
            logger.warning(
                f"fpcalc output missing fingerprint or duration for {mapped_path}"
            )
            return None
        logger.debug(f"Successfully generated fingerprint (duration: {duration}s)")
        return fingerprint, duration
    # Specific handlers first so the log message names the actual cause;
    # listing these types inside one tuple together with Exception (as the
    # original did) made them dead entries in a single broad catch.
    except subprocess.TimeoutExpired:
        logger.warning(f"fpcalc timeout for {mapped_path}")
        return None
    except FileNotFoundError:
        # The audio file was checked above, so this means fpcalc is missing.
        logger.warning(
            "fpcalc not found. Install chromaprint to enable audio verification."
        )
        return None
    except json.JSONDecodeError as e:
        logger.warning(f"Failed to parse fpcalc JSON output for {mapped_path}: {e}")
        return None
    except Exception as e:
        logger.warning(f"Error generating fingerprint for {mapped_path}: {e}")
        return None
def get_file_properties(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Dict]:
    """Probe an audio file with ffprobe and return its key properties.

    Returns a dict with duration, size, bitrate, sample_rate, channels,
    codec and bit_depth, or None when the file is missing, ffprobe fails,
    or no audio stream is present.
    """
    mapped_path = map_docker_path(file_path, docker_mount)
    if not os.path.exists(mapped_path):
        return None
    command = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        mapped_path,
    ]
    try:
        probe = subprocess.run(command, capture_output=True, text=True, timeout=30)
        if probe.returncode != 0:
            return None
        parsed = json.loads(probe.stdout)
        streams = parsed.get("streams", [])
        audio = next((s for s in streams if s.get("codec_type") == "audio"), None)
        fmt = parsed.get("format", {})
        if not audio:
            return None
        return {
            "duration": float(fmt.get("duration", 0)),
            "size": int(fmt.get("size", 0)),
            "bitrate": int(fmt.get("bit_rate", 0)),
            "sample_rate": int(audio.get("sample_rate", 0)),
            "channels": int(audio.get("channels", 0)),
            "codec": audio.get("codec_name", ""),
            "bit_depth": int(audio.get("bits_per_raw_sample", 0)),
        }
    except Exception as e:
        # Best-effort probe: any failure just degrades to "properties unknown".
        logger.debug(f"Could not get file properties for {mapped_path}: {e}")
        return None
def _format_context(log_context: Optional[str]) -> str:
"""Format log context message"""
return f" ({log_context})" if log_context else ""
def compare_fingerprints(
    fp1_data: Optional[Tuple[str, int]],
    fp2_data: Optional[Tuple[str, int]],
    log_context: Optional[str] = None,
    return_message: bool = False,
) -> Union[bool, Tuple[bool, str]]:
    """Compare audio fingerprints. Returns match or (match, message) if return_message=True"""

    def _finish(match: bool, message: str) -> Union[bool, Tuple[bool, str]]:
        # Single exit point: log once, then honor the return_message flag.
        logger.debug(f"{message}{_format_context(log_context)}")
        return (match, message) if return_message else match

    if not fp1_data or not fp2_data:
        return _finish(False, "Fingerprint comparison failed: missing fingerprint")
    fp1, duration1 = fp1_data
    fp2, duration2 = fp2_data
    duration_diff = abs(duration1 - duration2)
    if duration_diff > 5:
        return _finish(
            False,
            f"Fingerprint comparison: duration mismatch ({duration1}s vs {duration2}s, diff: {duration_diff}s)",
        )
    if fp1 == fp2:
        return _finish(True, "Fingerprint comparison: exact match")
    try:
        similarity = SequenceMatcher(None, fp1, fp2).ratio()
        # The further the durations drift apart, the higher the bar.
        if duration_diff <= 1:
            threshold = 0.90
        elif duration_diff <= 3:
            threshold = 0.93
        else:
            threshold = 0.95
        match = similarity >= threshold
        return _finish(
            match,
            f"Fingerprint comparison: similarity={similarity:.3f}, duration_diff={duration_diff}s, threshold={threshold:.2f}, match={match}",
        )
    except Exception as e:
        return _finish(
            False,
            f"Fingerprint comparison failed: exception {type(e).__name__}: {str(e)}",
        )
def check_mb_recording_id(single_track_info, album_track_info) -> Tuple[int, str]:
    """Check MusicBrainz Recording ID match. Returns (score_delta, message)"""
    unavailable = (0, "⚠ MusicBrainz Recording ID unavailable")
    if not single_track_info or not album_track_info:
        return unavailable
    single_mb_id = single_track_info.get("foreignRecordingId")
    album_mb_id = album_track_info.get("foreignRecordingId")
    if not single_mb_id or not album_mb_id:
        return unavailable
    # A shared recording ID is the strongest evidence of identical audio;
    # differing IDs actively count against a match.
    if single_mb_id == album_mb_id:
        return 50, "✓ MusicBrainz Recording ID match (+50)"
    return -30, "✗ Different MusicBrainz Recording IDs (-30)"
def check_quality_profile(
    single_file_info, album_file_info
) -> Tuple[int, Optional[str]]:
    """Check Lidarr quality profile match. Returns (score_delta, message)"""

    def _quality_name(file_info) -> str:
        # Lidarr nests the profile name two levels deep under "quality".
        return file_info.get("quality", {}).get("quality", {}).get("name", "")

    single_quality = _quality_name(single_file_info)
    album_quality = _quality_name(album_file_info)
    if not single_quality or not album_quality:
        return 0, None
    if single_quality == album_quality:
        return 10, f"✓ Same quality ({single_quality}) (+10)"
    return 0, f"⚠ Different quality ({single_quality} vs {album_quality})"
def check_file_properties(single_props, album_props) -> List[Tuple[int, str]]:
    """Check file properties. Returns list of (score_delta, message) tuples"""
    if not single_props or not album_props:
        return []
    results: List[Tuple[int, str]] = []

    # Duration: a tight match scores highest, a large drift is penalized.
    duration_diff = abs(single_props["duration"] - album_props["duration"])
    if duration_diff <= 1:
        results.append((15, f"✓ Duration match ({duration_diff:.1f}s diff) (+15)"))
    elif duration_diff <= 3:
        results.append((5, f"⚠ Close duration ({duration_diff:.1f}s diff) (+5)"))
    else:
        results.append((-10, f"✗ Duration mismatch ({duration_diff:.1f}s diff) (-10)"))

    # File size compared as min/max ratio so argument order is irrelevant.
    sizes = (single_props["size"], album_props["size"])
    size_ratio = min(sizes) / max(sizes)
    if size_ratio >= 0.95:
        results.append((15, f"✓ File size match ({size_ratio:.2%}) (+15)"))
    elif size_ratio >= 0.85:
        results.append((5, f"⚠ Similar file size ({size_ratio:.2%}) (+5)"))
    else:
        results.append((0, f"⚠ Different file sizes ({size_ratio:.2%})"))

    # Bitrate scored only when both files report one (ffprobe may return 0).
    bitrates = (single_props["bitrate"], album_props["bitrate"])
    if min(bitrates) > 0:
        bitrate_ratio = min(bitrates) / max(bitrates)
        if bitrate_ratio >= 0.90:
            results.append((10, f"✓ Bitrate match ({bitrate_ratio:.2%}) (+10)"))

    if single_props["sample_rate"] == album_props["sample_rate"]:
        results.append(
            (5, f"✓ Sample rate match ({single_props['sample_rate']}Hz) (+5)")
        )

    single_codec, album_codec = single_props["codec"], album_props["codec"]
    if single_codec and album_codec:
        if single_codec == album_codec:
            results.append((5, f"✓ Same codec ({single_codec}) (+5)"))
        else:
            results.append(
                (0, f"⚠ Different codecs ({single_codec} vs {album_codec})")
            )

    if single_props["channels"] == album_props["channels"]:
        results.append((5, f"✓ Same channels ({single_props['channels']}) (+5)"))
    else:
        results.append(
            (
                0,
                f"⚠ Different channels ({single_props['channels']} vs {album_props['channels']})",
            )
        )

    # Bit depth helps tell remasters apart; 0 means "unknown", so skip then.
    depths = (single_props["bit_depth"], album_props["bit_depth"])
    if min(depths) > 0:
        if depths[0] == depths[1]:
            results.append((5, f"✓ Same bit depth ({depths[0]}-bit) (+5)"))
        else:
            results.append(
                (
                    0,
                    f"⚠ Different bit depths ({depths[0]}-bit vs {depths[1]}-bit)",
                )
            )
    return results

View File

@@ -0,0 +1,267 @@
"""Functions to find duplicate singles in Lidarr"""
import logging
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from lidarr_client import fetch_tracks_for_album, get_trackfile_info
from track_verification import verify_audio_match
logger = logging.getLogger(__name__)
def normalize_title(title: str) -> str:
    """Lowercase a title and collapse all whitespace runs to single spaces."""
    lowered = title.lower()
    return " ".join(lowered.split())
def build_album_track_map(
    base_url: str, headers: Dict[str, str], albums: List[Dict]
) -> Dict[Tuple[int, str], List[Dict]]:
    """Create a mapping of tracks present on full albums.

    Keys are (artistId, normalized title); each value lists every full-album
    occurrence of that title with its album/track/trackFile identifiers.
    Albums whose tracks cannot be fetched are skipped with a debug log.
    """
    album_track_map: Dict[Tuple[int, str], List[Dict]] = defaultdict(list)
    # Plain loops instead of list(map(...))/filter with side-effecting
    # closures: the original built and discarded lists purely to iterate.
    full_albums = (a for a in albums if a.get("albumType", "").lower() == "album")
    for album in full_albums:
        album_id = album.get("id")
        artist_id = album.get("artistId")
        album_title = album.get("title", "Unknown")
        if not (album_id and artist_id):
            continue
        tracks = fetch_tracks_for_album(base_url, headers, album_id)
        if not tracks:
            logger.debug(
                f"Skipping album '{album_title}' (albumId: {album_id}) - could not fetch tracks"
            )
            continue
        for track in tracks:
            if not track.get("hasFile"):
                continue
            title = track.get("title")
            track_id = track.get("id")
            track_file_id = track.get("trackFileId")
            if not (title and track_file_id and track_id):
                continue
            key = (artist_id, normalize_title(title))
            album_track_map[key].append(
                {
                    "album_id": album_id,
                    "album_title": album_title,
                    "track_id": track_id,
                    "track_file_id": track_file_id,
                }
            )
    return album_track_map
def create_unverified_duplicate(
    artist_id, album_id, album_title, title, track_file_id, duplicate_albums
) -> Dict:
    """Build a duplicate record for a track when audio verification is off.

    Every candidate album is treated as verified, with a zero-confidence
    placeholder result noting that verification was skipped.
    """
    record = {
        "artist_id": artist_id,
        "single_album_id": album_id,
        "single_album_title": album_title,
        "track_title": title,
        "single_track_file_id": track_file_id,
        "duplicate_albums": duplicate_albums,
    }
    record["verified_albums"] = duplicate_albums
    record["verification_results"] = ["Audio verification disabled"]
    record["confidence_scores"] = [0]
    return record
def verify_and_mark_album_track(
    base_url,
    headers,
    track_id,
    track_file_id,
    album_track,
    docker_mount,
    single_file_path,
) -> Tuple[bool, Optional[Dict], str, int]:
    """Verify album track and mark for migration if perfect match.

    NOTE: mutates ``album_track`` in place on a match, adding "confidence",
    "migration_status" and (when eligible) the two file-path keys.
    Returns (match, album_track_or_None, result_message, confidence).
    """
    album_track_id = album_track["track_id"]
    album_track_file_id = album_track["track_file_id"]
    # Fetch the album-side file path up front; needed for migration eligibility.
    album_track_file_info = get_trackfile_info(base_url, album_track_file_id, headers)
    album_file_path = (
        album_track_file_info.get("path") if album_track_file_info else None
    )
    match, result_message, confidence = verify_audio_match(
        base_url,
        headers,
        track_id,
        track_file_id,
        album_track_id,
        album_track_file_id,
        docker_mount,
    )
    if not match:
        logger.debug(
            f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)"
        )
        return False, None, result_message, confidence
    album_track["confidence"] = confidence
    # Migration requires near-certainty (>=95) plus both file paths resolved.
    album_track["migration_status"] = (
        "eligible"
        if confidence >= 95 and single_file_path and album_file_path
        else "not_eligible"
    )
    if album_track["migration_status"] == "eligible":
        album_track["single_file_path"] = single_file_path
        album_track["album_file_path"] = album_file_path
    logger.debug(
        f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)"
    )
    return True, album_track, result_message, confidence
def process_single_track(
    base_url,
    headers,
    album_id,
    artist_id,
    album_title,
    track,
    album_track_map,
    verify_audio,
    docker_mount,
) -> Optional[Dict]:
    """Process a single track and return duplicate info or None.

    Returns None when the track is incomplete or has no full-album
    counterpart.  With verify_audio=False the candidates are returned
    unverified; otherwise each candidate is audio-verified.
    """
    title = track.get("title")
    track_id = track.get("id")
    track_file_id = track.get("trackFileId")
    if not (title and track_file_id and track_id):
        return None
    key = (artist_id, normalize_title(title))
    if key not in album_track_map:
        return None
    duplicate_albums = album_track_map[key]
    if not duplicate_albums:
        return None
    if not verify_audio:
        return create_unverified_duplicate(
            artist_id, album_id, album_title, title, track_file_id, duplicate_albums
        )
    logger.debug(
        f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..."
    )
    single_track_file_info = get_trackfile_info(base_url, track_file_id, headers)
    single_file_path = (
        single_track_file_info.get("path") if single_track_file_info else None
    )
    # List comprehension instead of list(map(lambda ...)): clearer and avoids
    # the multi-line lambda.  Also renamed the comprehension variables so
    # they no longer shadow the `track` parameter.
    verification_data = [
        verify_and_mark_album_track(
            base_url,
            headers,
            track_id,
            track_file_id,
            album_track,
            docker_mount,
            single_file_path,
        )
        for album_track in duplicate_albums
    ]
    verified_albums = [
        verified for matched, verified, _, _ in verification_data if matched and verified
    ]
    verification_results = [result for _, _, result, _ in verification_data]
    confidence_scores = [conf for _, _, _, conf in verification_data]
    return {
        "artist_id": artist_id,
        "single_album_id": album_id,
        "single_album_title": album_title,
        "track_title": title,
        "single_track_file_id": track_file_id,
        "duplicate_albums": duplicate_albums,
        "verified_albums": verified_albums,
        "verification_results": verification_results,
        "confidence_scores": confidence_scores,
    }
def process_single_album(
    base_url, headers, album, album_track_map, verify_audio, docker_mount
) -> List[Dict]:
    """Process a single album and return list of duplicates found.

    Returns an empty list when the album record is incomplete or its
    tracks cannot be fetched.
    """
    album_id = album.get("id")
    artist_id = album.get("artistId")
    album_title = album.get("title", "")
    if not (album_id and artist_id):
        return []
    tracks = fetch_tracks_for_album(base_url, headers, album_id)
    if not tracks:
        logger.debug(
            f"Skipping single album '{album_title}' (albumId: {album_id}) - could not fetch tracks"
        )
        return []
    # Plain loop instead of a named lambda (PEP 8 E731) fed through
    # map/filter; behavior and ordering are unchanged.
    duplicates: List[Dict] = []
    for track in tracks:
        if not track.get("hasFile"):
            continue
        duplicate_info = process_single_track(
            base_url,
            headers,
            album_id,
            artist_id,
            album_title,
            track,
            album_track_map,
            verify_audio,
            docker_mount,
        )
        if duplicate_info is not None:
            duplicates.append(duplicate_info)
    return duplicates
def find_duplicate_singles(
    base_url: str,
    headers: Dict[str, str],
    albums: List[Dict],
    album_track_map: Dict[Tuple[int, str], List[Dict]],
    verify_audio: bool = True,
    docker_mount: Optional[str] = None,
) -> List[Dict]:
    """Identify single tracks that duplicate album tracks"""
    # Flatten the per-album duplicate lists for every "single"-type album.
    return [
        duplicate
        for album in albums
        if album.get("albumType", "").lower() == "single"
        for duplicate in process_single_album(
            base_url, headers, album, album_track_map, verify_audio, docker_mount
        )
    ]

View File

@@ -0,0 +1,89 @@
"""Lidarr API client functions"""
import logging
from typing import Dict, List, Optional
import requests
logger = logging.getLogger(__name__)
def get_json(
    url: str,
    headers: Dict[str, str],
    params: Optional[Dict[str, object]] = None,
    raise_on_error: bool = True,
) -> List[Dict]:
    """Fetch JSON from URL with error handling.

    On a request failure the error is logged; the exception is re-raised
    when raise_on_error is True, otherwise an empty list is returned.
    """
    try:
        response = requests.get(url, headers=headers, params=params, timeout=60)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as exc:
        logger.warning(f"Error fetching {url}: {exc}")
        if not raise_on_error:
            return []
        raise
def get_trackfile_info(
    base_url: str, track_file_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get trackfile information including file path and quality"""
    url = f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}"
    try:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as exc:
        # Non-fatal: callers treat None as "trackfile unavailable".
        logger.warning(f"Could not fetch trackfile {track_file_id}: {exc}")
        return None
def get_track_info(
    base_url: str, track_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get track information including MusicBrainz recording ID"""
    url = f"{base_url.rstrip('/')}/api/v1/track/{track_id}"
    try:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as exc:
        # Non-fatal: callers treat None as "track info unavailable".
        logger.warning(f"Could not fetch track {track_id}: {exc}")
        return None
def fetch_all_artists(base_url: str, headers: Dict[str, str]) -> List[Dict]:
    """Fetch all artists from Lidarr.

    base_url is normalized with rstrip("/") for consistency with the other
    endpoint helpers in this module, so a trailing slash in the configured
    URL cannot produce a double-slash endpoint.
    """
    return get_json(f"{base_url.rstrip('/')}/api/v1/artist", headers)
def fetch_albums_for_artist(
    base_url: str, headers: Dict[str, str], artist_id: int
) -> List[Dict]:
    """Fetch all albums for an artist (best-effort: empty list on failure).

    base_url is normalized with rstrip("/") for consistency with the other
    endpoint helpers in this module, so a trailing slash in the configured
    URL cannot produce a double-slash endpoint.
    """
    return get_json(
        f"{base_url.rstrip('/')}/api/v1/album",
        headers,
        params={"artistId": artist_id},
        raise_on_error=False,
    )
def fetch_tracks_for_album(
    base_url: str, headers: Dict[str, str], album_id: int
) -> List[Dict]:
    """Fetch all tracks for an album (best-effort: empty list on failure)."""
    url = f"{base_url.rstrip('/')}/api/v1/track"
    return get_json(url, headers, params={"albumId": album_id}, raise_on_error=False)

View File

@@ -9,637 +9,19 @@ the same track already exists on a full album in Lidarr.
import argparse
import logging
import os
import subprocess
import sys
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union
import requests
from dotenv import load_dotenv
from duplicate_finder import build_album_track_map, find_duplicate_singles
from lidarr_client import fetch_all_artists, fetch_albums_for_artist
from plex_metadata import get_plex_server, migrate_plex_metadata
load_dotenv()
logger = logging.getLogger(__name__)
def normalize_title(title: str) -> str:
    """Normalize a track title for comparison"""
    # Lowercase, then collapse any whitespace runs into single spaces.
    words = title.lower().split()
    return " ".join(words)
def get_json(
    url: str, headers: Dict[str, str], params: Optional[Dict[str, object]] = None
) -> List[Dict]:
    """Wrapper around requests.get with basic error handling"""
    try:
        response = requests.get(url, headers=headers, params=params, timeout=60)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as exc:
        # Log, then surface the failure to the caller unchanged.
        logger.error(f"Error fetching {url}: {exc}")
        raise
def get_trackfile_info(
    base_url: str, track_file_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get trackfile information including file path and quality"""
    endpoint = f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}"
    try:
        resp = requests.get(endpoint, headers=headers, timeout=30)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Best-effort lookup: callers treat None as "unknown trackfile".
        logger.warning(f"Could not fetch trackfile {track_file_id}: {e}")
        return None
    return resp.json()
def get_track_info(
    base_url: str, track_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get track information including MusicBrainz recording ID"""
    endpoint = f"{base_url.rstrip('/')}/api/v1/track/{track_id}"
    try:
        resp = requests.get(endpoint, headers=headers, timeout=30)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Best-effort lookup: callers treat None as "unknown track".
        logger.warning(f"Could not fetch track {track_id}: {e}")
        return None
    return resp.json()
def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str:
    """Map Docker container path to host path"""
    if docker_mount:
        # docker_mount is a "container:host" prefix pair.
        container_prefix, host_prefix = docker_mount.split(":", 1)
        if file_path.startswith(container_prefix):
            return file_path.replace(container_prefix, host_prefix, 1)
    return file_path
def get_file_hash(
    file_path: str, docker_mount: Optional[str] = None, bytes_to_read: int = 1048576
) -> Optional[str]:
    """Get partial file hash (first N bytes) for quick exact duplicate detection"""
    mapped_path = map_docker_path(file_path, docker_mount)
    if not os.path.exists(mapped_path):
        return None
    try:
        import hashlib

        with open(mapped_path, "rb") as handle:
            head = handle.read(bytes_to_read)
        # MD5 is fine here: this is duplicate detection, not security.
        return hashlib.md5(head).hexdigest()
    except Exception as e:
        logger.debug(f"Could not compute hash for {mapped_path}: {e}")
        return None
def get_audio_fingerprint(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Tuple[str, int]]:
    """Generate audio fingerprint using fpcalc (chromaprint). Returns (fingerprint, duration_seconds)"""
    mapped_path = map_docker_path(file_path, docker_mount)
    logger.debug(f"Generating fingerprint for: {mapped_path} (original: {file_path})")
    if not os.path.exists(mapped_path):
        logger.warning(f"File not found: {mapped_path} (original: {file_path})")
        return None
    try:
        logger.debug(f"Running fpcalc on: {mapped_path}")
        # First 180 seconds are plenty for identification.
        proc = subprocess.run(
            ["fpcalc", "-json", "-length", "180", mapped_path],
            capture_output=True,
            text=True,
            timeout=60,
            check=False,
        )
    except subprocess.TimeoutExpired:
        logger.warning(f"fpcalc timeout for {mapped_path}")
        return None
    except FileNotFoundError:
        # The file itself was checked above, so this is the missing binary.
        logger.warning(
            "fpcalc not found. Install chromaprint to enable audio verification."
        )
        return None
    except Exception as e:
        logger.warning(f"Error generating fingerprint for {mapped_path}: {e}")
        return None
    if proc.returncode != 0:
        logger.warning(f"fpcalc failed for {mapped_path}: {proc.stderr}")
        return None
    import json

    try:
        data = json.loads(proc.stdout)
    except json.JSONDecodeError as e:
        logger.warning(f"Failed to parse fpcalc JSON output for {mapped_path}: {e}")
        return None
    fingerprint = data.get("fingerprint")
    duration = data.get("duration")
    if not fingerprint or duration is None:
        logger.warning(
            f"fpcalc output missing fingerprint or duration for {mapped_path}"
        )
        return None
    # fpcalc -json already returns the fingerprint as a string.
    logger.debug(
        f"Successfully generated fingerprint for {mapped_path} (duration: {duration}s)"
    )
    return fingerprint, duration
def _format_context(log_context: Optional[str]) -> str:
"""Format log context message"""
return f" ({log_context})" if log_context else ""
def compare_fingerprints(
    fp1_data: Optional[Tuple[str, int]],
    fp2_data: Optional[Tuple[str, int]],
    log_context: Optional[str] = None,
    return_message: bool = False,
) -> Union[bool, Tuple[bool, str]]:
    """Compare two audio fingerprints for similarity. Returns match or (match, message) if return_message=True

    Each *_data argument is the (fingerprint, duration_seconds) tuple
    produced by get_audio_fingerprint, or None when fingerprinting failed.
    """
    # Without both fingerprints there is nothing to compare.
    if not fp1_data or not fp2_data:
        message = "Fingerprint comparison failed: missing fingerprint"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False
    fp1, duration1 = fp1_data
    fp2, duration2 = fp2_data
    duration_diff = abs(duration1 - duration2)
    # More than 5 seconds apart: assume different audio, skip the expensive compare.
    if duration_diff > 5:
        message = f"Fingerprint comparison: duration mismatch ({duration1}s vs {duration2}s, diff: {duration_diff}s)"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False
    if fp1 == fp2:
        message = "Fingerprint comparison: exact match"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (True, message) if return_message else True
    # Fingerprints are base64-encoded strings from fpcalc -json, so plain
    # string similarity is used as an approximation of audio similarity.
    try:
        # Calculate similarity based on string edit distance
        from difflib import SequenceMatcher

        similarity = SequenceMatcher(None, fp1, fp2).ratio()
        # Require a tighter similarity the further the durations drift apart.
        if duration_diff <= 1:
            threshold = 0.90
        elif duration_diff <= 3:
            threshold = 0.93
        else:
            threshold = 0.95
        match = similarity >= threshold
        message = f"Fingerprint comparison: similarity={similarity:.3f}, duration_diff={duration_diff}s, threshold={threshold:.2f}, match={match}"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (match, message) if return_message else match
    except Exception as e:
        # Best-effort: any comparison failure is reported as "no match".
        message = (
            f"Fingerprint comparison failed: exception {type(e).__name__}: {str(e)}"
        )
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False
# DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY
# def delete_track_file(base_url: str, track_file_id: int, headers: Dict[str, str]) -> None:
# """Delete a track file by ID"""
# delete_url = f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}"
# resp = requests.delete(delete_url, headers=headers, timeout=60)
# resp.raise_for_status()
def build_album_track_map(
    base_url: str, headers: Dict[str, str], albums: List[Dict]
) -> Dict[Tuple[int, str], List[Dict]]:
    """Create a mapping of tracks present on full albums"""
    album_track_map: Dict[Tuple[int, str], List[Dict]] = defaultdict(list)
    for album in albums:
        # Only full albums contribute; singles are the comparison targets.
        if album.get("albumType", "").lower() != "album":
            continue
        album_id = album.get("id")
        artist_id = album.get("artistId")
        album_title = album.get("title", "Unknown")
        if not album_id or not artist_id:
            continue
        tracks = get_json(
            f"{base_url.rstrip('/')}/api/v1/track",
            headers,
            params={"albumId": album_id},
        )
        for track in tracks:
            if not track.get("hasFile"):
                continue
            title = track.get("title")
            track_id = track.get("id")
            track_file_id = track.get("trackFileId")
            if not title or not track_file_id or not track_id:
                continue
            # Key on (artist, normalized title) so singles can be matched later.
            album_track_map[(artist_id, normalize_title(title))].append(
                {
                    "album_id": album_id,
                    "album_title": album_title,
                    "track_id": track_id,
                    "track_file_id": track_file_id,
                }
            )
    return album_track_map
def get_file_properties(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Dict]:
    """Get audio file properties using ffprobe"""
    mapped_path = map_docker_path(file_path, docker_mount)
    if not os.path.exists(mapped_path):
        return None
    try:
        import json

        command = [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_format",
            "-show_streams",
            mapped_path,
        ]
        probe = subprocess.run(command, capture_output=True, text=True, timeout=30)
        if probe.returncode != 0:
            return None
        parsed = json.loads(probe.stdout)
        streams = parsed.get("streams", [])
        audio = next((s for s in streams if s.get("codec_type") == "audio"), None)
        fmt = parsed.get("format", {})
        if not audio:
            return None
        return {
            "duration": float(fmt.get("duration", 0)),
            "size": int(fmt.get("size", 0)),
            "bitrate": int(fmt.get("bit_rate", 0)),
            "sample_rate": int(audio.get("sample_rate", 0)),
            "channels": int(audio.get("channels", 0)),
            "codec": audio.get("codec_name", ""),
            "bit_depth": int(audio.get("bits_per_raw_sample", 0)),
        }
    except Exception as e:
        # Best-effort probe: any failure degrades to "properties unknown".
        logger.debug(f"Could not get file properties for {mapped_path}: {e}")
        return None
def verify_audio_match(
    base_url: str,
    headers: Dict[str, str],
    single_track_id: int,
    single_track_file_id: int,
    album_track_id: int,
    album_track_file_id: int,
    docker_mount: Optional[str] = None,
) -> Tuple[bool, Optional[str], int]:
    """
    Verify that two track files contain the same audio using multiple verification methods.
    Returns (match, result_message, confidence_score)
    Confidence: 0-100, where 100 = definitely same, 0 = definitely different

    Evidence is accumulated additively: MusicBrainz ID (+-50/-30), Lidarr
    quality (+10), ffprobe properties (up to +60/-10), partial file hash
    (+30) and chromaprint fingerprint (+20); a total of 70+ counts as a match.
    """
    logger.debug(
        f"Verifying audio match: single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
    )
    confidence_score = 0
    verification_results = []
    # Verification 1: MusicBrainz Recording ID (most reliable - 50 points)
    single_track_info = get_track_info(base_url, single_track_id, headers)
    album_track_info = get_track_info(base_url, album_track_id, headers)
    if single_track_info and album_track_info:
        single_mb_id = single_track_info.get("foreignRecordingId")
        album_mb_id = album_track_info.get("foreignRecordingId")
        if single_mb_id and album_mb_id:
            if single_mb_id == album_mb_id:
                confidence_score += 50
                verification_results.append("✓ MusicBrainz Recording ID match (+50)")
            else:
                # Differing recording IDs actively argue against a match.
                confidence_score -= 30
                verification_results.append(
                    "✗ Different MusicBrainz Recording IDs (-30)"
                )
        else:
            verification_results.append("⚠ MusicBrainz Recording ID unavailable")
    # Verification 2: File Properties (duration, size, bitrate, codec - 50 points)
    single_file_info = get_trackfile_info(base_url, single_track_file_id, headers)
    album_file_info = get_trackfile_info(base_url, album_track_file_id, headers)
    # Without both trackfile records there is nothing further to compare.
    if not single_file_info or not album_file_info:
        return False, "Could not fetch track file info", 0
    single_path = single_file_info.get("path")
    album_path = album_file_info.get("path")
    if not single_path or not album_path:
        return False, "Missing file paths", 0
    # Verification 1.5: Lidarr quality profile comparison (10 points)
    single_quality = single_file_info.get("quality", {}).get("quality", {})
    album_quality = album_file_info.get("quality", {}).get("quality", {})
    if single_quality and album_quality:
        single_quality_name = single_quality.get("name", "")
        album_quality_name = album_quality.get("name", "")
        if single_quality_name and album_quality_name:
            if single_quality_name == album_quality_name:
                confidence_score += 10
                verification_results.append(
                    f"✓ Same quality ({single_quality_name}) (+10)"
                )
            else:
                verification_results.append(
                    f"⚠ Different quality ({single_quality_name} vs {album_quality_name})"
                )
    # ffprobe-based comparison; silently skipped when either probe fails.
    single_props = get_file_properties(single_path, docker_mount)
    album_props = get_file_properties(album_path, docker_mount)
    if single_props and album_props:
        # Duration check (15 points)
        duration_diff = abs(single_props["duration"] - album_props["duration"])
        if duration_diff <= 1:
            confidence_score += 15
            verification_results.append(
                f"✓ Duration match ({duration_diff:.1f}s diff) (+15)"
            )
        elif duration_diff <= 3:
            confidence_score += 5
            verification_results.append(
                f"⚠ Close duration ({duration_diff:.1f}s diff) (+5)"
            )
        else:
            confidence_score -= 10
            verification_results.append(
                f"✗ Duration mismatch ({duration_diff:.1f}s diff) (-10)"
            )
        # File size check (15 points) - min/max ratio so order is irrelevant
        size_ratio = min(single_props["size"], album_props["size"]) / max(
            single_props["size"], album_props["size"]
        )
        if size_ratio >= 0.95:
            confidence_score += 15
            verification_results.append(f"✓ File size match ({size_ratio:.2%}) (+15)")
        elif size_ratio >= 0.85:
            confidence_score += 5
            verification_results.append(f"⚠ Similar file size ({size_ratio:.2%}) (+5)")
        else:
            verification_results.append(f"⚠ Different file sizes ({size_ratio:.2%})")
        # Bitrate check (10 points) - only when both files report a bitrate
        if single_props["bitrate"] > 0 and album_props["bitrate"] > 0:
            bitrate_ratio = min(single_props["bitrate"], album_props["bitrate"]) / max(
                single_props["bitrate"], album_props["bitrate"]
            )
            if bitrate_ratio >= 0.90:
                confidence_score += 10
                verification_results.append(
                    f"✓ Bitrate match ({bitrate_ratio:.2%}) (+10)"
                )
        # Sample rate check (5 points)
        if single_props["sample_rate"] == album_props["sample_rate"]:
            confidence_score += 5
            verification_results.append(
                f"✓ Sample rate match ({single_props['sample_rate']}Hz) (+5)"
            )
        # Codec check (5 points)
        if single_props["codec"] and album_props["codec"]:
            if single_props["codec"] == album_props["codec"]:
                confidence_score += 5
                verification_results.append(
                    f"✓ Same codec ({single_props['codec']}) (+5)"
                )
            else:
                verification_results.append(
                    f"⚠ Different codecs ({single_props['codec']} vs {album_props['codec']})"
                )
        # Channels check (5 points)
        if single_props["channels"] == album_props["channels"]:
            confidence_score += 5
            verification_results.append(
                f"✓ Same channels ({single_props['channels']}) (+5)"
            )
        else:
            verification_results.append(
                f"⚠ Different channels ({single_props['channels']} vs {album_props['channels']})"
            )
        # Bit depth check (5 points) - helps identify remasters
        if single_props["bit_depth"] > 0 and album_props["bit_depth"] > 0:
            if single_props["bit_depth"] == album_props["bit_depth"]:
                confidence_score += 5
                verification_results.append(
                    f"✓ Same bit depth ({single_props['bit_depth']}-bit) (+5)"
                )
            else:
                verification_results.append(
                    f"⚠ Different bit depths ({single_props['bit_depth']}-bit vs {album_props['bit_depth']}-bit)"
                )
    # Verification 3: File hash comparison (30 points) - quick exact duplicate check
    single_hash = get_file_hash(single_path, docker_mount)
    album_hash = get_file_hash(album_path, docker_mount)
    if single_hash and album_hash:
        if single_hash == album_hash:
            confidence_score += 30
            verification_results.append(f"✓ File hash match (exact duplicate) (+30)")
        else:
            verification_results.append(f"⚠ Different file hashes")
    # Verification 4: Chromaprint fingerprint (20 points)
    single_fp = get_audio_fingerprint(single_path, docker_mount)
    album_fp = get_audio_fingerprint(album_path, docker_mount)
    if single_fp and album_fp:
        log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
        fp_match, fp_message = compare_fingerprints(
            single_fp, album_fp, log_context, return_message=True
        )
        if fp_match:
            confidence_score += 20
            verification_results.append(f"✓ Audio fingerprint match (+20)")
        else:
            verification_results.append(f"{fp_message}")
    else:
        verification_results.append("⚠ Audio fingerprint unavailable")
    # Final decision: 70/100 is the match threshold.
    match = confidence_score >= 70
    result_message = f"Confidence: {confidence_score}/100 | " + " | ".join(
        verification_results
    )
    return match, result_message, confidence_score
def find_duplicate_singles(
    base_url: str,
    headers: Dict[str, str],
    albums: List[Dict],
    album_track_map: Dict[Tuple[int, str], List[Dict]],
    verify_audio: bool = True,
    docker_mount: Optional[str] = None,
) -> List[Dict]:
    """Find tracks on 'single' releases that also appear on regular albums.

    For every single-type album, each track that has a file on disk is looked
    up (by artist id + normalized title) in ``album_track_map``. When
    ``verify_audio`` is true, every candidate album track is additionally
    checked with ``verify_audio_match`` and the per-candidate confidence
    scores and messages are recorded; otherwise all candidates are accepted
    as-is. Returns a list of dicts describing each duplicate single track.
    """
    duplicates: List[Dict] = []
    singles = [a for a in albums if a.get("albumType", "").lower() == "single"]
    for single in singles:
        single_id = single.get("id")
        artist_id = single.get("artistId")
        single_title = single.get("title", "")
        if not single_id or not artist_id:
            continue
        tracks = get_json(
            f"{base_url.rstrip('/')}/api/v1/track",
            headers,
            params={"albumId": single_id},
        )
        for track in tracks:
            if not track.get("hasFile"):
                continue
            title = track.get("title")
            track_id = track.get("id")
            track_file_id = track.get("trackFileId")
            if not title or not track_file_id or not track_id:
                continue
            candidates = album_track_map.get((artist_id, normalize_title(title)))
            if not candidates:
                continue
            entry = {
                "artist_id": artist_id,
                "single_album_id": single_id,
                "single_album_title": single_title,
                "track_title": title,
                "single_track_file_id": track_file_id,
                "duplicate_albums": candidates,
            }
            if not verify_audio:
                # Verification disabled: treat every candidate as confirmed.
                entry["verified_albums"] = candidates
                entry["verification_results"] = ["Audio verification disabled"]
                entry["confidence_scores"] = [0]
                duplicates.append(entry)
                continue
            logger.debug(
                f"Verifying audio for '{title}' from single '{single_title}' against {len(candidates)} album track(s)..."
            )
            confirmed: List[Dict] = []
            messages: List[str] = []
            scores: List[int] = []
            for candidate in candidates:
                matched, message, confidence = verify_audio_match(
                    base_url,
                    headers,
                    track_id,
                    track_file_id,
                    candidate["track_id"],
                    candidate["track_file_id"],
                    docker_mount,
                )
                messages.append(message)
                scores.append(confidence)
                if matched:
                    confirmed.append(candidate)
                    logger.debug(
                        f"Audio match confirmed: single trackFileId {track_file_id} matches album '{candidate['album_title']}' trackFileId {candidate['track_file_id']} (confidence: {confidence}/100)"
                    )
                else:
                    logger.debug(
                        f"Audio mismatch: single trackFileId {track_file_id} does not match album '{candidate['album_title']}' trackFileId {candidate['track_file_id']} (confidence: {confidence}/100)"
                    )
            entry["verified_albums"] = confirmed
            entry["verification_results"] = messages
            entry["confidence_scores"] = scores
            duplicates.append(entry)
    return duplicates
def main() -> None:
parser = argparse.ArgumentParser(
description="Identify single tracks that are duplicates of album tracks in Lidarr."
@@ -647,12 +29,12 @@ def main() -> None:
parser.add_argument(
"--base-url",
default=os.getenv("LIDARR_URL"),
help="Base URL of the Lidarr instance (e.g. https://music.example.org). Can also be set via LIDARR_URL env var.",
help="Base URL of the Lidarr instance. Can also be set via LIDARR_URL env var.",
)
parser.add_argument(
"--api-key",
default=os.getenv("LIDARR_API_KEY"),
help="API key for Lidarr with sufficient permissions. Can also be set via LIDARR_API_KEY env var.",
help="API key for Lidarr. Can also be set via LIDARR_API_KEY env var.",
)
parser.add_argument(
"--no-audio-verify",
@@ -662,24 +44,18 @@ def main() -> None:
parser.add_argument(
"--docker-mount",
default=os.getenv("DOCKER_MOUNT"),
help="Docker mount mapping in format 'container_path:host_path' (e.g. '/music:/srv/pool/multimedia/media/Music'). Can also be set via DOCKER_MOUNT env var.",
help="Docker mount mapping in format 'container_path:host_path'. Can also be set via DOCKER_MOUNT env var.",
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug logging",
)
# DELETE FLAG COMMENTED OUT FOR SAFETY
# parser.add_argument(
# "--delete",
# action="store_true",
# help="If set, delete the duplicate single track files instead of just listing them",
# )
# parser.add_argument(
# "--force",
# action="store_true",
# help="If set together with --delete, do not prompt for confirmation before deletion",
# )
parser.add_argument(
"--migrate-metadata",
action="store_true",
help="Migrate metadata (ratings, play counts) from singles to album tracks. Only applies to perfect matches (confidence >= 95).",
)
args = parser.parse_args()
logging.basicConfig(
@@ -704,7 +80,7 @@ def main() -> None:
headers = {"X-Api-Key": args.api_key}
logger.info("Fetching artists...")
artists = get_json(f"{base_url}/api/v1/artist", headers)
artists = fetch_all_artists(base_url, headers)
if not artists:
logger.warning("No artists found. Exiting.")
return
@@ -716,16 +92,12 @@ def main() -> None:
}
logger.info("Fetching albums for each artist...")
albums: List[Dict] = []
for artist in artists:
artist_id = artist.get("id")
if not artist_id:
continue
albums.extend(
get_json(
f"{base_url}/api/v1/album", headers, params={"artistId": artist_id}
)
)
albums = [
album
for artist in artists
if artist.get("id")
for album in fetch_albums_for_artist(base_url, headers, artist["id"])
]
if not albums:
logger.warning("No albums found in the library.")
@@ -763,10 +135,61 @@ def main() -> None:
logger.info("No duplicate singles found. The library appears clean.")
return
if args.migrate_metadata:
plex_url = os.getenv("PLEX_URL")
plex_token = os.getenv("PLEX_TOKEN")
if not (plex_url and plex_token):
logger.error(
"PLEX_URL and PLEX_TOKEN environment variables required for metadata migration"
)
logger.error("Set them in your .env file or environment")
return
logger.info(f"Connecting to Plex server at {plex_url}...")
plex_server = get_plex_server(plex_url, plex_token)
if not plex_server:
logger.error(
"Failed to connect to Plex server. Skipping metadata migration."
)
return
logger.info("Migrating Plex metadata for perfect matches (confidence >= 95)...")
migration_count = 0
for dup in duplicates:
for album_track in dup.get("verified_albums", []):
if album_track.get("migration_status") != "eligible":
continue
single_file_path = album_track.get("single_file_path")
album_file_path = album_track.get("album_file_path")
logger.info(
f"Migrating Plex metadata for '{dup['track_title']}' to album '{album_track['album_title']}'..."
)
success, message = migrate_plex_metadata(
plex_server, single_file_path, album_file_path, docker_mount
)
album_track["migration_message"] = message
album_track["migration_success"] = success
if success:
migration_count += 1
logger.info(f"{message}")
else:
logger.warning(f"{message}")
logger.info(f"Completed Plex metadata migration for {migration_count} track(s)")
logger.info("")
verified_count = sum(1 for dup in duplicates if dup.get("verified_albums"))
logger.info(
f"Found {len(duplicates)} single track(s) that are duplicates of album tracks ({verified_count} verified by audio fingerprint):"
)
for dup in duplicates:
artist_id = dup["artist_id"]
artist_name = artist_map.get(artist_id, f"Unknown (ID: {artist_id})")
@@ -781,44 +204,39 @@ def main() -> None:
logger.info(
f" Track: '{dup['track_title']}' (trackFileId: {dup['single_track_file_id']})"
)
for i, result in enumerate(verification_results):
confidence = confidence_scores[i] if i < len(confidence_scores) else 0
logger.info(f" {result}")
logger.info(
f" Already present on {len(duplicate_albums)} album(s): {', '.join(album_names)}"
)
if verify_audio:
if verified_albums:
if verify_audio and not verified_albums:
logger.info(" ⚠ NOT safe to delete (audio verification failed)")
elif verify_audio:
verified_names = [album["album_title"] for album in verified_albums]
max_confidence = max(confidence_scores) if confidence_scores else 0
logger.info(
f" ✓ LIKELY safe to delete (audio verified on {len(verified_albums)} album(s): {', '.join(verified_names)})"
)
logger.info(
f" ⚠ CAUTION: Always check for different versions (remaster, radio edit, live, etc)"
)
else:
logger.info(f" ⚠ NOT safe to delete (audio verification failed)")
logger.info("")
logger.info(f" Max confidence: {max_confidence}/100")
# DELETE FUNCTIONALITY COMMENTED OUT FOR SAFETY
# if args.delete:
# if not args.force:
# confirm = input(
# f"\nAre you sure you want to delete these {len(duplicates)} single track file(s)? [y/N]: "
# ).strip().lower()
# if confirm not in ("y", "yes"):
# logger.info("Aborting deletion.")
# return
# logger.info("Deleting duplicate single track files...")
# for dup in duplicates:
# track_file_id = dup["single_track_file_id"]
# try:
# delete_track_file(base_url, track_file_id, headers)
# logger.info(
# f"Deleted trackFileId {track_file_id} (track '{dup['track_title']}' from single '{dup['single_album_title']}')."
# )
# except Exception as exc:
# logger.error(f"Failed to delete trackFileId {track_file_id}: {exc}")
perfect_matches = [
a for a in verified_albums if a.get("confidence", 0) >= 95
]
for album_track in perfect_matches:
migration_msg = album_track.get("migration_message", "")
if migration_msg:
logger.info(f" Metadata: {migration_msg}")
logger.info(
" ⚠ CAUTION: Always check for different versions (remaster, radio edit, live, etc)"
)
logger.info("")
if __name__ == "__main__":

View File

@@ -0,0 +1,267 @@
"""Plex metadata migration functions"""
import logging
from typing import List, Optional, Tuple
logger = logging.getLogger(__name__)
def get_plex_server(plex_url: str, plex_token: str):
    """Create a connection to the Plex server at *plex_url*.

    Returns a plexapi ``PlexServer`` instance on success. Returns None (and
    logs the reason) when the plexapi package is not installed or the
    connection attempt fails.
    """
    try:
        from plexapi.server import PlexServer

        return PlexServer(plex_url, plex_token)
    except ImportError:
        # plexapi is an optional dependency; point the user at the fix.
        logger.error("python-plexapi not installed. Install with: pip install plexapi")
        return None
    except Exception as exc:
        logger.error(f"Failed to connect to Plex server: {exc}")
        return None
def find_plex_track_by_path(
    plex_server, file_path: str, docker_mount: Optional[str] = None
):
    """Locate the Plex track whose media file matches *file_path*.

    Tries, in order: an exact path search with the docker-mapped path, an
    exact search with the original path (Plex may see the container path),
    and finally a linear scan of every track comparing media parts (exact
    path or filename-suffix match). Returns the track, or None when nothing
    matches or an error occurs (logged at debug level).
    """
    from audio_verification import map_docker_path
    import os

    try:
        mapped_path = map_docker_path(file_path, docker_mount)
        music_sections = [
            section
            for section in plex_server.library.sections()
            if section.type == "artist"
        ]
        # Passes 1 and 2: exact path searches, mapped path first.
        for label, candidate_path in (("mapped", mapped_path), ("original", file_path)):
            for section in music_sections:
                hits = section.search(filters={"track.file": candidate_path})
                if hits:
                    logger.debug(f"Found track by {label} path: {candidate_path}")
                    return hits[0]
        # Pass 3: brute-force scan comparing each media part's file.
        filename = os.path.basename(file_path)
        for section in music_sections:
            for track in section.searchTracks():
                for media in track.media:
                    for part in media.parts:
                        if not part.file:
                            continue
                        if (
                            part.file == mapped_path
                            or part.file == file_path
                            or part.file.endswith(filename)
                        ):
                            logger.debug(f"Found track by filename match: {part.file}")
                            return track
        logger.warning(
            f"Could not find Plex track for path: {file_path} (mapped: {mapped_path})"
        )
        return None
    except Exception as exc:
        logger.debug(f"Could not find Plex track for path {file_path}: {exc}")
        return None
def get_plex_playlists_for_track(plex_server, track) -> List:
    """Return every audio playlist on *plex_server* that contains *track*.

    Membership is decided by comparing ratingKeys. Any error while talking
    to Plex is logged at debug level and an empty list is returned.
    """
    matches: List = []
    try:
        target_key = track.ratingKey
        for playlist in plex_server.playlists():
            if playlist.playlistType != "audio":
                continue
            if any(item.ratingKey == target_key for item in playlist.items()):
                matches.append(playlist)
    except Exception as exc:
        logger.debug(f"Could not get playlists: {exc}")
        return []
    return matches
def migrate_rating(
    single_track, album_track, single_rating, original_album_rating
) -> Tuple[List[str], List[str], List[str]]:
    """Copy the single's user rating onto the album track.

    The rating is written only when the single has one and the album track
    does not; an existing album rating is never overwritten. After writing,
    the track is reloaded and the stored value is verified.

    Note: ``single_track`` is not referenced by the current implementation;
    it is kept for signature symmetry with the other migrate_* helpers.

    Returns (changes, already_has, failures) message lists.
    """
    if not single_rating:
        # Nothing to copy.
        return [], [], []
    if original_album_rating:
        logger.info(f" Album already has rating: {original_album_rating}/10")
        return [], [f"rating ({original_album_rating}/10)"], []
    try:
        logger.info(f" Setting rating to {single_rating}/10...")
        album_track.rate(single_rating)
        album_track.reload()
        stored = getattr(album_track, "userRating", None)
        if stored == single_rating:
            logger.info(f" ✓ Rating verified: {stored}/10")
            return [f"rating ({single_rating}/10) ✓ verified"], [], []
        logger.warning(
            f" ⚠ Rating mismatch: expected {single_rating}, got {stored}"
        )
        return [], [], [f"rating (set to {single_rating} but got {stored})"]
    except Exception as exc:
        logger.error(f"Failed to migrate rating: {exc}")
        return [], [], [f"rating (error: {exc})"]
def migrate_play_count(
    album_track, single_plays, album_plays
) -> Tuple[List[str], List[str], List[str]]:
    """Add the single's play count onto the album track.

    Plex exposes no "set view count" API, so the count is migrated by calling
    ``markPlayed()`` once per play recorded on the single, then re-reading
    the track to verify the final total.

    Args:
        album_track: Plex track object that receives the plays.
        single_plays: Play count recorded on the single; <= 0 means nothing
            to migrate.
        album_plays: Current play count on the album track.

    Returns:
        (changes, already_has, failures) message lists; ``already_has`` is
        always empty for play counts.
    """
    if single_plays <= 0:
        return [], [], []
    expected_count = album_plays + single_plays
    logger.info(
        f" Migrating play count: single={single_plays}, album={album_plays}, expected={expected_count}"
    )
    try:
        # Plain loop instead of list(map(...)) with a side-effecting lambda:
        # each markPlayed() bumps the count by one; log progress every 10.
        for i in range(single_plays):
            album_track.markPlayed()
            if (i + 1) % 10 == 0:
                logger.debug(f" Marked played {i + 1}/{single_plays} times...")
        album_track.reload()
        new_count = getattr(album_track, "viewCount", 0) or 0
        if new_count != expected_count:
            logger.warning(
                f" ⚠ Play count mismatch: expected {expected_count}, got {new_count}"
            )
            return (
                [],
                [],
                [f"play count (expected {expected_count} but got {new_count})"],
            )
        logger.info(f" ✓ Play count verified: {new_count}")
        return (
            [f"play count ({album_plays} + {single_plays} = {new_count}) ✓ verified"],
            [],
            [],
        )
    except Exception as e:
        logger.error(f"Failed to migrate play count: {e}")
        return [], [], [f"play count (error: {e})"]
def migrate_playlist(playlist, album_track) -> Tuple[List[str], List[str], List[str]]:
    """Ensure *album_track* is a member of *playlist*.

    A no-op when the album track is already present; otherwise the track is
    added and the addition is verified by re-reading the playlist.

    Returns (changes, already_has, failures) message lists.
    """
    playlist_name = playlist.title

    def _contains_album(items) -> bool:
        # Membership by ratingKey, the stable Plex identifier.
        return any(item.ratingKey == album_track.ratingKey for item in items)

    try:
        if _contains_album(playlist.items()):
            logger.info(f" Album already in playlist: '{playlist_name}'")
            return [], [f"playlist '{playlist_name}'"], []
        logger.info(f" Adding to playlist: '{playlist_name}'...")
        playlist.addItems(album_track)
        playlist.reload()
        if not _contains_album(playlist.items()):
            logger.warning(f" ⚠ Playlist '{playlist_name}' add failed verification")
            return [], [], [f"playlist '{playlist_name}' (add failed)"]
        logger.info(f" ✓ Playlist '{playlist_name}' verified")
        return [f"added to playlist '{playlist_name}' ✓ verified"], [], []
    except Exception as exc:
        logger.error(f"Failed to add to playlist '{playlist_name}': {exc}")
        return [], [], [f"playlist '{playlist_name}' (error: {exc})"]
def format_migration_message(
    changes: List[str], already_has: List[str], failures: List[str]
) -> str:
    """Build a one-line summary of a metadata migration.

    Each non-empty category contributes one segment; segments are joined
    with ' | '. With nothing at all to report a fixed placeholder is
    returned.
    """
    segments: List[str] = []
    if changes:
        segments.append(f"✅ Migrated: {', '.join(changes)}")
    if already_has:
        segments.append(f" Already has: {', '.join(already_has)}")
    if failures:
        segments.append(f"❌ Failed: {', '.join(failures)}")
    if not segments:
        return "No metadata to migrate"
    return " | ".join(segments)
def migrate_plex_metadata(
    plex_server,
    single_file_path: str,
    album_file_path: str,
    docker_mount: Optional[str] = None,
) -> Tuple[bool, str]:
    """Copy rating, play count and playlist membership from the single's
    Plex entry onto the album track's entry.

    Both tracks are located in Plex by file path (optionally docker-mapped).
    Returns (success, message): success is True only when no individual
    migration step failed.
    """
    if not plex_server:
        return False, "Plex server not connected"
    source = find_plex_track_by_path(plex_server, single_file_path, docker_mount)
    target = find_plex_track_by_path(plex_server, album_file_path, docker_mount)
    if not source:
        return False, "Could not find single track in Plex"
    if not target:
        return False, "Could not find album track in Plex"
    source_rating = getattr(source, "userRating", None)
    source_plays = getattr(source, "viewCount", 0) or 0
    source_playlists = get_plex_playlists_for_track(plex_server, source)
    logger.info(
        f" Single track metadata: rating={source_rating or 'none'}, plays={source_plays}, playlists={len(source_playlists)}"
    )
    if source_playlists:
        logger.info(
            f" Single is in playlists: {', '.join(p.title for p in source_playlists)}"
        )
    target_rating = getattr(target, "userRating", None)
    target_plays = getattr(target, "viewCount", 0) or 0
    changes: List[str] = []
    already: List[str] = []
    failures: List[str] = []
    # Run each migration step in the same order as before and fold the
    # (changes, already_has, failures) triples into the three accumulators.
    steps = [
        migrate_rating(source, target, source_rating, target_rating),
        migrate_play_count(target, source_plays, target_plays),
    ]
    steps.extend(migrate_playlist(p, target) for p in source_playlists)
    for step_changes, step_already, step_failures in steps:
        changes.extend(step_changes)
        already.extend(step_already)
        failures.extend(step_failures)
    message = format_migration_message(changes, already, failures)
    return not failures, message

View File

@@ -3,7 +3,7 @@ requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
py-modules = ["main"]
py-modules = ["main", "lidarr_client", "audio_verification", "track_verification", "plex_metadata", "duplicate_finder"]
[project]
name = "lidarr-cleanup-singles"
@@ -12,6 +12,8 @@ description = "Identify and optionally delete duplicate single tracks in Lidarr"
requires-python = ">=3.8"
dependencies = [
"requests",
"python-dotenv",
"plexapi",
]
[project.scripts]

View File

@@ -0,0 +1,85 @@
"""Track verification using multiple methods"""
import logging
from typing import Dict, Optional, Tuple
from audio_verification import (
check_file_properties,
check_mb_recording_id,
check_quality_profile,
compare_fingerprints,
get_audio_fingerprint,
get_file_properties,
)
from lidarr_client import get_track_info, get_trackfile_info
logger = logging.getLogger(__name__)
def verify_audio_match(
    base_url: str,
    headers: Dict[str, str],
    single_track_id: int,
    single_track_file_id: int,
    album_track_id: int,
    album_track_file_id: int,
    docker_mount: Optional[str] = None,
) -> Tuple[bool, Optional[str], int]:
    """Score how likely two Lidarr track files contain the same recording.

    Combines several weighted checks (MusicBrainz recording id, quality
    profile, file properties, chromaprint fingerprint) into a 0-100
    confidence score; a score of 70 or more counts as a match.

    Returns (match, summary_message, confidence_score).
    """
    logger.debug(
        f"Verifying audio match: single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
    )
    single_file_info = get_trackfile_info(base_url, single_track_file_id, headers)
    album_file_info = get_trackfile_info(base_url, album_track_file_id, headers)
    if not single_file_info or not album_file_info:
        return False, "Could not fetch track file info", 0
    single_path = single_file_info.get("path")
    album_path = album_file_info.get("path")
    if not single_path or not album_path:
        return False, "Missing file paths", 0
    single_track_info = get_track_info(base_url, single_track_id, headers)
    album_track_info = get_track_info(base_url, album_track_id, headers)
    # Accumulate (score, message) pairs in the order the checks run.
    mb_score, mb_msg = check_mb_recording_id(single_track_info, album_track_info)
    checks = [(mb_score, mb_msg)]
    quality_score, quality_msg = check_quality_profile(
        single_file_info, album_file_info
    )
    if quality_msg:
        # The quality check only contributes when it produced a message.
        checks.append((quality_score, quality_msg))
    single_props = get_file_properties(single_path, docker_mount)
    album_props = get_file_properties(album_path, docker_mount)
    checks.extend(check_file_properties(single_props, album_props))
    single_fp = get_audio_fingerprint(single_path, docker_mount)
    album_fp = get_audio_fingerprint(album_path, docker_mount)
    log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
    if single_fp and album_fp:
        fp_match, fp_message = compare_fingerprints(
            single_fp, album_fp, log_context, return_message=True
        )
        if fp_match:
            checks.append((20, "✓ Audio fingerprint match (+20)"))
        else:
            checks.append((0, f"{fp_message}"))
    else:
        checks.append((0, "⚠ Audio fingerprint unavailable"))
    # check_file_properties may contribute None entries; drop them.
    valid_checks = [check for check in checks if check is not None]
    confidence_score = sum(score for score, _ in valid_checks)
    summary = f"Confidence: {confidence_score}/100 | " + " | ".join(
        msg for _, msg in valid_checks
    )
    return confidence_score >= 70, summary, confidence_score