Files
lidarr-mb-gap/src-cleanup/plex_metadata.py
Danilo Reyes cc9521f7a4 Implement album deletion feature after metadata migration
- Added a new command-line argument `--delete` to allow users to delete single albums after successful metadata migration.
- Integrated the `unmonitor_and_delete_album` function to handle the deletion process for albums that meet the migration criteria.
- Enhanced the `migrate_plex_metadata` function to support exclusion of smart playlists during migration.
- Updated logging to provide detailed feedback on the deletion process and migration results.
2025-11-14 02:04:11 -06:00

287 lines
10 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Plex metadata migration functions"""
import logging
from typing import List, Optional, Tuple
# Module-level logger named after this module; the helpers in this file
# report progress, warnings and errors through it.
logger = logging.getLogger(__name__)
def get_plex_server(plex_url: str, plex_token: str):
    """Open a connection to a Plex server.

    Args:
        plex_url: Base URL of the Plex server.
        plex_token: Authentication token passed to ``PlexServer``.

    Returns:
        A ``plexapi.server.PlexServer`` instance, or ``None`` when the
        ``plexapi`` package cannot be imported or the connection attempt
        raises. Both failure cases are logged at error level.
    """
    server = None
    try:
        # Imported lazily so the module can be loaded without plexapi installed.
        from plexapi.server import PlexServer

        server = PlexServer(plex_url, plex_token)
    except ImportError:
        logger.error("python-plexapi not installed. Install with: pip install plexapi")
    except Exception as exc:
        logger.error(f"Failed to connect to Plex server: {exc}")
    return server
def find_plex_track_by_path(
    plex_server, file_path: str, docker_mount: Optional[str] = None
):
    """Find a Plex track by its file path.

    Every library section whose ``type`` is ``"artist"`` is scanned, and the
    first track whose ``locations`` contains either the mapped path or the
    original path is returned.

    Args:
        plex_server: Plex server object exposing ``library.sections()``.
        file_path: Path of the audio file as supplied by the caller.
        docker_mount: Optional mount mapping forwarded to ``map_docker_path``.

    Returns:
        The matching track object, or ``None`` when there is no music
        section, no track matches, or the lookup raises.
    """
    from audio_verification import map_docker_path

    try:
        mapped_path = map_docker_path(file_path, docker_mount)
        logger.debug(
            f"Searching for track: lidarr_path={file_path}, mapped_path={mapped_path}"
        )
        music_sections = [
            s for s in plex_server.library.sections() if s.type == "artist"
        ]
        if not music_sections:
            logger.warning("No music sections found in Plex")
            return None

        # Strategy: compare against track.locations (list of file paths).
        for section in music_sections:
            for track in section.searchTracks():
                # Guard against the attribute being absent *or* None so one odd
                # track cannot abort the whole scan with a TypeError.
                track_locations = getattr(track, "locations", None) or []
                if mapped_path in track_locations or file_path in track_locations:
                    logger.debug(
                        f"Found track by locations match: {track.title} - {track_locations[0] if track_locations else 'unknown'}"
                    )
                    return track

        logger.warning(
            f"Could not find Plex track. Paths tried: {file_path}, {mapped_path}"
        )
        return None
    except Exception as e:
        logger.error(f"Error finding Plex track for path {file_path}: {e}")
        import traceback

        # Full stack trace only at debug level to keep normal logs compact.
        logger.debug(traceback.format_exc())
        return None
def get_smart_playlist_ids(plex_server) -> set:
    """Collect the rating keys of smart audio playlists.

    Args:
        plex_server: Plex server object exposing ``playlists()``.

    Returns:
        A set with the ``ratingKey`` of every playlist whose ``playlistType``
        is ``"audio"`` and whose ``smart`` flag is truthy. An empty set is
        returned when the lookup raises (the error is logged at debug level).
    """
    try:
        smart_keys = []
        for candidate in plex_server.playlists():
            if candidate.playlistType != "audio":
                continue
            if candidate.smart:
                smart_keys.append(candidate.ratingKey)
        if smart_keys:
            logger.info(
                f"Found {len(smart_keys)} smart playlists (will exclude from migration)"
            )
        return set(smart_keys)
    except Exception as exc:
        logger.debug(f"Could not get smart playlists: {exc}")
        return set()
def get_plex_playlists_for_track(
    plex_server, track, exclude_smart_playlists: set = None
) -> List:
    """Return the audio playlists that contain ``track``.

    Args:
        plex_server: Plex server object exposing ``playlists()``.
        track: Track whose ``ratingKey`` is looked for in each playlist.
        exclude_smart_playlists: Rating keys of playlists to skip; ``None``
            means nothing is skipped.

    Returns:
        Playlists of type ``"audio"`` that are not excluded and hold an item
        with the same ``ratingKey`` as ``track``. An empty list is returned
        when the lookup raises (the error is logged at debug level).
    """
    skipped_keys = (
        exclude_smart_playlists if exclude_smart_playlists is not None else set()
    )
    try:
        containing = []
        for candidate in plex_server.playlists():
            if candidate.playlistType != "audio":
                continue
            if candidate.ratingKey in skipped_keys:
                continue
            for entry in candidate.items():
                if entry.ratingKey == track.ratingKey:
                    containing.append(candidate)
                    break
        return containing
    except Exception as exc:
        logger.debug(f"Could not get playlists: {exc}")
        return []
def migrate_rating(
    single_track, album_track, single_rating, original_album_rating
) -> Tuple[List[str], List[str], List[str]]:
    """Copy the single's rating onto the album track and verify it.

    Args:
        single_track: Source track (not read by this function).
        album_track: Destination track; must provide ``rate()`` and ``reload()``.
        single_rating: Rating to copy; a falsy value means nothing to migrate.
        original_album_rating: Rating the album track already had; when
            truthy it is kept and the migration is skipped.

    Returns:
        A ``(changes, already_has, failures)`` tuple of message lists.
    """
    changes: List[str] = []
    already_has: List[str] = []
    failures: List[str] = []

    if not single_rating:
        return changes, already_has, failures

    if original_album_rating:
        logger.info(f" Album already has rating: {original_album_rating}/10")
        already_has.append(f"rating ({original_album_rating}/10)")
        return changes, already_has, failures

    try:
        logger.info(f" Setting rating to {single_rating}/10...")
        album_track.rate(single_rating)
        # Re-read the track so the check reflects what the object reports
        # after the update rather than the value that was just sent.
        album_track.reload()
        observed = getattr(album_track, "userRating", None)
        if observed != single_rating:
            logger.warning(
                f" ⚠ Rating mismatch: expected {single_rating}, got {observed}"
            )
            failures.append(f"rating (set to {single_rating} but got {observed})")
        else:
            logger.info(f" ✓ Rating verified: {observed}/10")
            changes.append(f"rating ({single_rating}/10) ✓ verified")
    except Exception as exc:
        logger.error(f"Failed to migrate rating: {exc}")
        failures.append(f"rating (error: {exc})")

    return changes, already_has, failures
def migrate_play_count(
    album_track, single_plays, album_plays
) -> Tuple[List[str], List[str], List[str]]:
    """Add the single's play count to the album track and verify the total.

    The album track is marked as played once per play of the single, then
    reloaded so the resulting ``viewCount`` can be compared with the expected
    sum.

    Args:
        album_track: Destination track; must provide ``markPlayed()`` and
            ``reload()``.
        single_plays: Number of plays recorded on the single. ``None`` is
            treated as 0.
        album_plays: Number of plays already on the album track. ``None`` is
            treated as 0.

    Returns:
        A ``(changes, already_has, failures)`` tuple of message lists. All
        three are empty when the single has no plays to migrate.
    """
    # Normalise missing counts the same way callers do with ``or 0`` so a
    # None never reaches the comparison or the addition below.
    single_plays = single_plays or 0
    album_plays = album_plays or 0
    if single_plays <= 0:
        return [], [], []

    expected_count = album_plays + single_plays
    logger.info(
        f" Migrating play count: single={single_plays}, album={album_plays}, expected={expected_count}"
    )
    try:
        for played in range(1, single_plays + 1):
            album_track.markPlayed()
            # Progress is reported every tenth call to keep debug output short.
            if played % 10 == 0:
                logger.debug(f" Marked played {played}/{single_plays} times...")

        album_track.reload()
        new_count = getattr(album_track, "viewCount", 0) or 0
        if new_count != expected_count:
            logger.warning(
                f" ⚠ Play count mismatch: expected {expected_count}, got {new_count}"
            )
            return (
                [],
                [],
                [f"play count (expected {expected_count} but got {new_count})"],
            )
        logger.info(f" ✓ Play count verified: {new_count}")
        return (
            [f"play count ({album_plays} + {single_plays} = {new_count}) ✓ verified"],
            [],
            [],
        )
    except Exception as e:
        logger.error(f"Failed to migrate play count: {e}")
        return [], [], [f"play count (error: {e})"]
def migrate_playlist(playlist, album_track) -> Tuple[List[str], List[str], List[str]]:
    """Add the album track to one playlist and verify the addition.

    Args:
        playlist: Playlist object providing ``title``, ``items()``,
            ``addItems()`` and ``reload()``.
        album_track: Track to add; matched against items by ``ratingKey``.

    Returns:
        A ``(changes, already_has, failures)`` tuple of message lists.
    """
    title = playlist.title

    def album_is_listed() -> bool:
        # Membership is decided by rating key, not by object identity.
        return any(
            entry.ratingKey == album_track.ratingKey for entry in playlist.items()
        )

    try:
        if album_is_listed():
            logger.info(f" Album already in playlist: '{title}'")
            return [], [f"playlist '{title}'"], []

        logger.info(f" Adding to playlist: '{title}'...")
        playlist.addItems(album_track)
        playlist.reload()

        if album_is_listed():
            logger.info(f" ✓ Playlist '{title}' verified")
            return [f"added to playlist '{title}' ✓ verified"], [], []

        logger.warning(f" ⚠ Playlist '{title}' add failed verification")
        return [], [], [f"playlist '{title}' (add failed)"]
    except Exception as exc:
        logger.error(f"Failed to add to playlist '{title}': {exc}")
        return [], [], [f"playlist '{title}' (error: {exc})"]
def format_migration_message(
    changes: List[str], already_has: List[str], failures: List[str]
) -> str:
    """Build the one-line summary of a migration.

    Args:
        changes: Descriptions of metadata that was migrated.
        already_has: Descriptions of metadata the target already had.
        failures: Descriptions of metadata that could not be migrated.

    Returns:
        The non-empty categories, each rendered as a comma-separated list and
        joined with ``" | "``; ``"No metadata to migrate"`` when all three
        lists are empty.
    """
    sections = []
    if changes:
        sections.append(f"✅ Migrated: {', '.join(changes)}")
    if already_has:
        sections.append(f" Already has: {', '.join(already_has)}")
    if failures:
        sections.append(f"❌ Failed: {', '.join(failures)}")

    if not sections:
        return "No metadata to migrate"
    return " | ".join(sections)
def migrate_plex_metadata(
    plex_server,
    single_file_path: str,
    album_file_path: str,
    docker_mount: Optional[str] = None,
    exclude_smart_playlists: Optional[set] = None,
) -> Tuple[bool, str]:
    """Migrate Plex metadata from a single's track to the album track.

    Rating, play count and playlist membership are each migrated and
    verified, and the outcomes are combined into one summary message.

    Args:
        plex_server: Connected Plex server object; a falsy value aborts early.
        single_file_path: File path of the single's track (source).
        album_file_path: File path of the album track (destination).
        docker_mount: Optional mount mapping forwarded to the path lookup.
        exclude_smart_playlists: Rating keys of playlists to leave out of the
            playlist migration.

    Returns:
        ``(success, message)``. ``success`` is ``True`` only when no step
        reported a failure; ``message`` is the formatted summary or the reason
        the migration could not start.
    """
    if not plex_server:
        return False, "Plex server not connected"

    # Each lookup scans the music library, so do not search for the album
    # track when the single is already known to be missing.
    single_track = find_plex_track_by_path(plex_server, single_file_path, docker_mount)
    if not single_track:
        return False, "Could not find single track in Plex"
    album_track = find_plex_track_by_path(plex_server, album_file_path, docker_mount)
    if not album_track:
        return False, "Could not find album track in Plex"

    single_rating = getattr(single_track, "userRating", None)
    single_plays = getattr(single_track, "viewCount", 0) or 0
    single_playlists = get_plex_playlists_for_track(
        plex_server, single_track, exclude_smart_playlists
    )
    logger.info(
        f" Single track metadata: rating={single_rating or 'none'}, plays={single_plays}, playlists={len(single_playlists)}"
    )
    if single_playlists:
        logger.info(
            f" Single is in playlists: {', '.join(p.title for p in single_playlists)}"
        )

    # Snapshot the album's state before any step modifies the track.
    original_album_rating = getattr(album_track, "userRating", None)
    album_plays = getattr(album_track, "viewCount", 0) or 0

    # Every step returns (changes, already_has, failures); keep them in the
    # order rating, play count, playlists so the summary reads the same way.
    outcomes = [
        migrate_rating(single_track, album_track, single_rating, original_album_rating),
        migrate_play_count(album_track, single_plays, album_plays),
    ]
    outcomes.extend(
        migrate_playlist(playlist, album_track) for playlist in single_playlists
    )

    all_changes = [item for outcome in outcomes for item in outcome[0]]
    all_already = [item for outcome in outcomes for item in outcome[1]]
    all_failures = [item for outcome in outcomes for item in outcome[2]]

    message = format_migration_message(all_changes, all_already, all_failures)
    return len(all_failures) == 0, message