Files
lidarr-mb-gap/src-cleanup/audio_verification.py
Danilo Reyes af5a2bf825 Add audio verification and duplicate tracking features
- Integrated `plexapi` and `python-dotenv` as dependencies in `flake.nix` and `pyproject.toml` for enhanced functionality.
- Implemented new modules for audio verification and duplicate tracking, including `audio_verification.py`, `duplicate_finder.py`, and `track_verification.py`.
- Updated `main.py` to utilize the new modules for identifying and managing duplicate single tracks in Lidarr, with detailed logging and confidence scoring.
- Enhanced the `find_duplicate_singles` function to support audio verification results and metadata migration to Plex.
- Refactored existing code for improved structure and maintainability, ensuring better integration of new features.
2025-11-14 01:32:41 -06:00

282 lines
9.7 KiB
Python

"""Audio verification using multiple methods"""
import json
import logging
import os
import subprocess
from difflib import SequenceMatcher
from typing import Dict, List, Optional, Tuple, Union
logger = logging.getLogger(__name__)
def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str:
    """Map a Docker container path to the corresponding host path.

    Args:
        file_path: Path as seen inside the container.
        docker_mount: Mount spec in ``container_path:host_path`` form. When
            falsy, no mapping is performed.

    Returns:
        ``file_path`` with its leading ``container_path`` replaced by
        ``host_path``. The input is returned unchanged when no mount is
        given, when the mount spec has no ``:`` separator, or when
        ``file_path`` is not located under ``container_path``.
    """
    if not docker_mount:
        return file_path

    container_path, separator, host_path = docker_mount.partition(":")
    if not separator:
        # A spec without ":" cannot be split into its two halves; skip the
        # mapping instead of failing with an opaque unpacking error.
        logging.getLogger(__name__).warning(
            "Ignoring malformed docker mount %r (expected 'container:host')",
            docker_mount,
        )
        return file_path

    if not file_path.startswith(container_path):
        return file_path

    remainder = file_path[len(container_path):]
    # Only map on a path-component boundary, so that a mount of "/music"
    # does not rewrite an unrelated sibling such as "/music2/...".
    on_boundary = (
        not remainder
        or not container_path
        or container_path.endswith("/")
        or remainder.startswith("/")
    )
    if not on_boundary:
        return file_path
    return host_path + remainder
def get_audio_fingerprint(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Tuple[str, int]]:
    """Generate an audio fingerprint by running the external ``fpcalc`` tool.

    Args:
        file_path: Path to the audio file, possibly as seen inside a
            container.
        docker_mount: Optional mount spec handed to ``map_docker_path`` to
            translate ``file_path`` before the file is accessed.

    Returns:
        ``(fingerprint, duration)`` as read from fpcalc's JSON output, or
        ``None`` when the file does not exist, fpcalc exits non-zero, times
        out, cannot be started, or its output is unusable. Failures are
        logged as warnings rather than raised.
    """
    mapped_path = map_docker_path(file_path, docker_mount)
    logger.debug(f"Generating fingerprint for: {mapped_path}")

    if not os.path.exists(mapped_path):
        logger.warning(f"File not found: {mapped_path}")
        return None

    try:
        result = subprocess.run(
            ["fpcalc", "-json", "-length", "180", mapped_path],
            capture_output=True,
            text=True,
            timeout=60,
            check=False,
        )
        if result.returncode != 0:
            logger.warning(f"fpcalc failed for {mapped_path}: {result.stderr}")
            return None

        data = json.loads(result.stdout)
        # Valid JSON that is not an object (e.g. a list) has no fields to read.
        if not isinstance(data, dict):
            logger.warning(
                f"fpcalc output missing fingerprint or duration for {mapped_path}"
            )
            return None

        fingerprint = data.get("fingerprint")
        # NOTE(review): fpcalc's JSON duration may be fractional; the value is
        # returned as parsed and the annotation is kept for compatibility.
        duration = data.get("duration")
        if not fingerprint or duration is None:
            logger.warning(
                f"fpcalc output missing fingerprint or duration for {mapped_path}"
            )
            return None

        logger.debug(f"Successfully generated fingerprint (duration: {duration}s)")
        return fingerprint, duration
    except Exception as e:
        # Deliberate best-effort boundary: a timeout, a missing fpcalc binary
        # or malformed JSON must not abort the caller. ``Exception`` already
        # covers TimeoutExpired, FileNotFoundError and JSONDecodeError, so
        # they are not listed separately.
        logger.warning(f"Error generating fingerprint for {mapped_path}: {e}")
        return None
def get_file_properties(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Dict]:
    """Read audio file properties with ``ffprobe``.

    Args:
        file_path: Path to the audio file, possibly as seen inside a
            container.
        docker_mount: Optional mount spec handed to ``map_docker_path`` to
            translate ``file_path`` before the file is accessed.

    Returns:
        A dict with the keys ``duration``, ``size``, ``bitrate``,
        ``sample_rate``, ``channels``, ``codec`` and ``bit_depth``, built
        from the container format and the first audio stream. Numeric fields
        that are absent or unparsable are reported as ``0`` instead of
        discarding the whole result. ``None`` is returned when the file does
        not exist, ffprobe fails, or no audio stream is found.
    """

    def _as_float(raw) -> float:
        """Coerce an ffprobe field to float, using 0.0 when unusable."""
        try:
            return float(raw)
        except (TypeError, ValueError):
            return 0.0

    def _as_int(raw) -> int:
        """Coerce an ffprobe field to int, using 0 when unusable."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            return 0

    mapped_path = map_docker_path(file_path, docker_mount)
    if not os.path.exists(mapped_path):
        return None

    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "quiet",
                "-print_format",
                "json",
                "-show_format",
                "-show_streams",
                mapped_path,
            ],
            capture_output=True,
            text=True,
            timeout=30,
            check=False,
        )
        if result.returncode != 0:
            return None

        data = json.loads(result.stdout)
        # ``or`` guards against keys that are present but null.
        streams = data.get("streams") or []
        format_info = data.get("format") or {}
        audio_stream = next(
            (s for s in streams if s.get("codec_type") == "audio"), None
        )
        if not audio_stream:
            return None

        return {
            "duration": _as_float(format_info.get("duration", 0)),
            "size": _as_int(format_info.get("size", 0)),
            "bitrate": _as_int(format_info.get("bit_rate", 0)),
            "sample_rate": _as_int(audio_stream.get("sample_rate", 0)),
            "channels": _as_int(audio_stream.get("channels", 0)),
            "codec": audio_stream.get("codec_name", ""),
            "bit_depth": _as_int(audio_stream.get("bits_per_raw_sample", 0)),
        }
    except Exception as e:
        # Best-effort boundary: timeouts, a missing ffprobe binary or
        # malformed JSON are logged and reported as "no properties".
        logger.debug(f"Could not get file properties for {mapped_path}: {e}")
        return None
def _format_context(log_context: Optional[str]) -> str:
"""Format log context message"""
return f" ({log_context})" if log_context else ""
def compare_fingerprints(
    fp1_data: Optional[Tuple[str, float]],
    fp2_data: Optional[Tuple[str, float]],
    log_context: Optional[str] = None,
    return_message: bool = False,
) -> Union[bool, Tuple[bool, str]]:
    """Decide whether two audio fingerprints describe the same recording.

    Args:
        fp1_data: ``(fingerprint, duration)`` of the first file, or ``None``.
        fp2_data: ``(fingerprint, duration)`` of the second file, or ``None``.
        log_context: Optional text appended to every debug log line.
        return_message: When true, return ``(match, message)`` instead of
            just ``match``.

    Returns:
        ``match`` (bool), or ``(match, message)`` when ``return_message`` is
        true. The comparison fails when either fingerprint is missing or the
        durations differ by more than 5 seconds. Identical fingerprints
        match outright; otherwise the ``SequenceMatcher`` ratio must reach a
        threshold that tightens as the duration difference grows
        (0.90 / 0.93 / 0.95 for differences up to 1 s / 3 s / 5 s).
    """

    def _outcome(match: bool, message: str) -> Union[bool, Tuple[bool, str]]:
        """Log the verdict and shape the return value as the caller asked."""
        logger.debug(f"{message}{_format_context(log_context)}")
        return (match, message) if return_message else match

    if not fp1_data or not fp2_data:
        return _outcome(False, "Fingerprint comparison failed: missing fingerprint")

    fp1, duration1 = fp1_data
    fp2, duration2 = fp2_data
    duration_diff = abs(duration1 - duration2)

    if duration_diff > 5:
        return _outcome(
            False,
            f"Fingerprint comparison: duration mismatch ({duration1}s vs {duration2}s, diff: {duration_diff}s)",
        )

    if fp1 == fp2:
        return _outcome(True, "Fingerprint comparison: exact match")

    try:
        # autojunk must be disabled: for sequences of 200+ items the default
        # heuristic discards every element that makes up more than ~1% of the
        # sequence. Fingerprint strings draw on a small alphabet, so almost
        # all characters would be dropped and near-identical fingerprints
        # would score close to zero.
        similarity = SequenceMatcher(None, fp1, fp2, autojunk=False).ratio()

        if duration_diff <= 1:
            threshold = 0.90
        elif duration_diff <= 3:
            threshold = 0.93
        else:
            threshold = 0.95

        match = similarity >= threshold
        return _outcome(
            match,
            f"Fingerprint comparison: similarity={similarity:.3f}, duration_diff={duration_diff}s, threshold={threshold:.2f}, match={match}",
        )
    except Exception as e:
        # Best-effort: an unexpected fingerprint type is reported as a
        # failed comparison rather than raised.
        return _outcome(
            False,
            f"Fingerprint comparison failed: exception {type(e).__name__}: {str(e)}",
        )
def check_mb_recording_id(single_track_info, album_track_info) -> Tuple[int, str]:
    """Score the tracks by their ``foreignRecordingId`` values.

    Returns a ``(score_delta, message)`` pair: +50 when both IDs are present
    and equal, -30 when both are present but differ, and 0 when either track
    info or either ID is missing.
    """
    unavailable = (0, "⚠ MusicBrainz Recording ID unavailable")

    if not single_track_info or not album_track_info:
        return unavailable

    single_id, album_id = (
        info.get("foreignRecordingId")
        for info in (single_track_info, album_track_info)
    )
    if not single_id or not album_id:
        return unavailable

    if single_id != album_id:
        return -30, "✗ Different MusicBrainz Recording IDs (-30)"
    return 50, "✓ MusicBrainz Recording ID match (+50)"
def check_quality_profile(
    single_file_info, album_file_info
) -> Tuple[int, Optional[str]]:
    """Score the files by the quality name found in their file info.

    The name is read from ``info["quality"]["quality"]["name"]``.

    Args:
        single_file_info: File info dict of the single track; may be
            ``None``.
        album_file_info: File info dict of the album track; may be ``None``.

    Returns:
        A ``(score_delta, message)`` pair: ``(10, ...)`` when both names are
        present and equal, ``(0, ...)`` with a warning message when they
        differ, and ``(0, None)`` when either name is unavailable.
    """

    def _quality_name(file_info) -> str:
        """Extract the nested quality name, tolerating missing/null levels."""
        outer = (file_info or {}).get("quality") or {}
        inner = outer.get("quality") or {}
        return inner.get("name") or ""

    single_quality = _quality_name(single_file_info)
    album_quality = _quality_name(album_file_info)

    if not (single_quality and album_quality):
        return 0, None
    if single_quality == album_quality:
        return 10, f"✓ Same quality ({single_quality}) (+10)"
    return 0, f"⚠ Different quality ({single_quality} vs {album_quality})"
def check_file_properties(single_props, album_props) -> List[Tuple[int, str]]:
    """Compare two file-property dicts and score each property.

    Both arguments are expected to carry the keys ``duration``, ``size``,
    ``bitrate``, ``sample_rate``, ``codec``, ``channels`` and ``bit_depth``.

    Args:
        single_props: Properties of the single track, or a falsy value.
        album_props: Properties of the album track, or a falsy value.

    Returns:
        A list of ``(score_delta, message)`` tuples, one per comparison that
        could be made; empty when either argument is missing. File size is
        skipped when neither file has a known (non-zero) size; bitrate and
        bit depth are skipped when either value is 0.
    """
    if not (single_props and album_props):
        return []

    results = []

    duration_diff = abs(single_props["duration"] - album_props["duration"])
    if duration_diff <= 1:
        results.append((15, f"✓ Duration match ({duration_diff:.1f}s diff) (+15)"))
    elif duration_diff <= 3:
        results.append((5, f"⚠ Close duration ({duration_diff:.1f}s diff) (+5)"))
    else:
        results.append((-10, f"✗ Duration mismatch ({duration_diff:.1f}s diff) (-10)"))

    # A size of 0 means "unknown"; when both are 0 the ratio is undefined, so
    # the size comparison is skipped instead of dividing by zero.
    larger_size = max(single_props["size"], album_props["size"])
    if larger_size > 0:
        size_ratio = min(single_props["size"], album_props["size"]) / larger_size
        if size_ratio >= 0.95:
            results.append((15, f"✓ File size match ({size_ratio:.2%}) (+15)"))
        elif size_ratio >= 0.85:
            results.append((5, f"⚠ Similar file size ({size_ratio:.2%}) (+5)"))
        else:
            results.append((0, f"⚠ Different file sizes ({size_ratio:.2%})"))

    if single_props["bitrate"] > 0 and album_props["bitrate"] > 0:
        bitrate_ratio = min(single_props["bitrate"], album_props["bitrate"]) / max(
            single_props["bitrate"], album_props["bitrate"]
        )
        if bitrate_ratio >= 0.90:
            results.append((10, f"✓ Bitrate match ({bitrate_ratio:.2%}) (+10)"))

    if single_props["sample_rate"] == album_props["sample_rate"]:
        results.append(
            (5, f"✓ Sample rate match ({single_props['sample_rate']}Hz) (+5)")
        )

    if single_props["codec"] and album_props["codec"]:
        if single_props["codec"] == album_props["codec"]:
            results.append((5, f"✓ Same codec ({single_props['codec']}) (+5)"))
        else:
            results.append(
                (
                    0,
                    f"⚠ Different codecs ({single_props['codec']} vs {album_props['codec']})",
                )
            )

    if single_props["channels"] == album_props["channels"]:
        results.append((5, f"✓ Same channels ({single_props['channels']}) (+5)"))
    else:
        results.append(
            (
                0,
                f"⚠ Different channels ({single_props['channels']} vs {album_props['channels']})",
            )
        )

    if single_props["bit_depth"] > 0 and album_props["bit_depth"] > 0:
        if single_props["bit_depth"] == album_props["bit_depth"]:
            results.append(
                (5, f"✓ Same bit depth ({single_props['bit_depth']}-bit) (+5)")
            )
        else:
            results.append(
                (
                    0,
                    f"⚠ Different bit depths ({single_props['bit_depth']}-bit vs {album_props['bit_depth']}-bit)",
                )
            )

    return results