- Added a new command-line argument `--delete` to allow users to delete single albums after successful metadata migration. - Integrated the `unmonitor_and_delete_album` function to handle the deletion process for albums that meet the migration criteria. - Enhanced the `migrate_plex_metadata` function to support exclusion of smart playlists during migration. - Updated logging to provide detailed feedback on the deletion process and migration results.
287 lines
10 KiB
Python
287 lines
10 KiB
Python
"""Plex metadata migration functions"""
|
||
|
||
import logging
|
||
from typing import List, Optional, Tuple
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def get_plex_server(plex_url: str, plex_token: str):
    """Open a connection to the Plex server at ``plex_url``.

    Returns the ``PlexServer`` instance on success. Returns ``None`` after
    logging an error when ``plexapi`` cannot be imported or when creating
    the connection raises any other exception.
    """
    try:
        # Imported here so that a missing plexapi package is reported by the
        # ImportError handler below.
        from plexapi.server import PlexServer

        server = PlexServer(plex_url, plex_token)
    except ImportError:
        logger.error("python-plexapi not installed. Install with: pip install plexapi")
        server = None
    except Exception as exc:
        logger.error(f"Failed to connect to Plex server: {exc}")
        server = None
    return server
|
||
|
||
|
||
def find_plex_track_by_path(
    plex_server, file_path: str, docker_mount: Optional[str] = None
):
    """Find the Plex track whose file location matches ``file_path``.

    Both ``file_path`` as given and the result of
    ``map_docker_path(file_path, docker_mount)`` are compared against each
    track's ``locations`` in every library section of type ``"artist"``.

    Args:
        plex_server: Connected Plex server object (must expose
            ``library.sections()``).
        file_path: Path of the audio file as known to the caller.
        docker_mount: Passed through to ``map_docker_path``.
            NOTE(review): presumably a host/container path prefix - confirm
            against ``audio_verification.map_docker_path``.

    Returns:
        The first matching track, or ``None`` when there is no music
        section, no track matches, or an error occurs during the search
        (errors are logged, not raised).
    """
    from audio_verification import map_docker_path

    try:
        mapped_path = map_docker_path(file_path, docker_mount)
        logger.debug(
            f"Searching for track: lidarr_path={file_path}, mapped_path={mapped_path}"
        )

        music_sections = [
            s for s in plex_server.library.sections() if s.type == "artist"
        ]
        if not music_sections:
            logger.warning("No music sections found in Plex")
            return None

        # Strategy: check track.locations (list of file paths) against both
        # spellings of the path.
        candidate_paths = (mapped_path, file_path)
        for section in music_sections:
            for track in section.searchTracks():
                # ``or []`` keeps a track with ``locations = None`` from
                # raising TypeError and aborting the whole search.
                track_locations = getattr(track, "locations", None) or []
                if any(path in track_locations for path in candidate_paths):
                    # A match implies track_locations is non-empty.
                    logger.debug(
                        f"Found track by locations match: {track.title} - {track_locations[0]}"
                    )
                    return track

        logger.warning(
            f"Could not find Plex track. Paths tried: {file_path}, {mapped_path}"
        )
        return None
    except Exception as e:
        logger.error(f"Error finding Plex track for path {file_path}: {e}")
        import traceback

        logger.debug(traceback.format_exc())
        return None
|
||
|
||
|
||
def get_smart_playlist_ids(plex_server) -> set:
    """Collect the ``ratingKey`` of every smart audio playlist.

    A playlist counts when its ``playlistType`` is ``"audio"`` and its
    ``smart`` attribute is truthy. The resulting set is meant to be passed
    on so those playlists are excluded from migration.

    Returns an empty set when the playlists cannot be read (the error is
    logged at debug level).
    """
    try:
        smart_keys = []
        for playlist in plex_server.playlists():
            if playlist.playlistType == "audio" and playlist.smart:
                smart_keys.append(playlist.ratingKey)

        if smart_keys:
            logger.info(
                f"Found {len(smart_keys)} smart playlists (will exclude from migration)"
            )
        return set(smart_keys)
    except Exception as exc:
        logger.debug(f"Could not get smart playlists: {exc}")
        return set()
|
||
|
||
|
||
def get_plex_playlists_for_track(
    plex_server, track, exclude_smart_playlists: Optional[set] = None
) -> List:
    """Get all audio playlists containing ``track``, minus excluded ones.

    Args:
        plex_server: Connected Plex server object exposing ``playlists()``.
        track: Track to look for; compared by ``ratingKey``.
        exclude_smart_playlists: ``ratingKey`` values of playlists to skip
            (typically the smart playlists). ``None`` means skip nothing.

    Returns:
        The playlists whose ``playlistType`` is ``"audio"``, whose
        ``ratingKey`` is not excluded, and whose items include ``track``.
        Returns ``[]`` when the playlist list itself cannot be fetched.
        A playlist that fails while being inspected is skipped with a
        warning instead of discarding every other playlist.
    """
    excluded = set() if exclude_smart_playlists is None else exclude_smart_playlists

    try:
        playlists = plex_server.playlists()
        track_key = track.ratingKey
    except Exception as e:
        logger.debug(f"Could not get playlists: {e}")
        return []

    matching = []
    for playlist in playlists:
        # Handle errors per playlist: one unreadable playlist must not make
        # the track look as if it were in no playlist at all.
        try:
            if playlist.playlistType != "audio":
                continue
            if playlist.ratingKey in excluded:
                continue
            if any(item.ratingKey == track_key for item in playlist.items()):
                matching.append(playlist)
        except Exception as e:
            logger.warning(
                f"Could not inspect playlist '{getattr(playlist, 'title', '?')}': {e}"
            )
    return matching
|
||
|
||
|
||
def migrate_rating(
    single_track, album_track, single_rating, original_album_rating
) -> Tuple[List[str], List[str], List[str]]:
    """Copy the single's rating onto the album track.

    Nothing happens when ``single_rating`` is falsy. An existing (truthy)
    ``original_album_rating`` is never overwritten; it is reported under
    "already has". Otherwise the rating is applied with
    ``album_track.rate`` and verified by reloading the track and reading
    back ``userRating``.

    ``single_track`` is accepted but not used here.

    Returns:
        ``(changes, already_has, failures)`` - three lists of
        human-readable strings.
    """
    changes: List[str] = []
    already_has: List[str] = []
    failures: List[str] = []

    if not single_rating:
        return changes, already_has, failures

    if original_album_rating:
        logger.info(f" Album already has rating: {original_album_rating}/10")
        already_has.append(f"rating ({original_album_rating}/10)")
        return changes, already_has, failures

    try:
        logger.info(f" Setting rating to {single_rating}/10...")
        album_track.rate(single_rating)
        # Reload so the value read back is the current one, not stale.
        album_track.reload()
        new_rating = getattr(album_track, "userRating", None)

        if new_rating != single_rating:
            logger.warning(
                f" ⚠ Rating mismatch: expected {single_rating}, got {new_rating}"
            )
            failures.append(f"rating (set to {single_rating} but got {new_rating})")
        else:
            logger.info(f" ✓ Rating verified: {new_rating}/10")
            changes.append(f"rating ({single_rating}/10) ✓ verified")
    except Exception as exc:
        logger.error(f"Failed to migrate rating: {exc}")
        failures.append(f"rating (error: {exc})")

    return changes, already_has, failures
|
||
|
||
|
||
def migrate_play_count(
    album_track, single_plays, album_plays
) -> Tuple[List[str], List[str], List[str]]:
    """Add the single's play count to the album track.

    ``album_track.markPlayed()`` is called once per play of the single.
    The track is then reloaded and its ``viewCount`` is compared with
    ``album_plays + single_plays``.

    Args:
        album_track: Track object receiving the plays.
        single_plays: Number of plays recorded on the single; ``None`` or a
            non-positive value means there is nothing to migrate.
        album_plays: Number of plays the album track had before migration;
            ``None`` is treated as 0.

    Returns:
        ``(changes, already_has, failures)`` - three lists of
        human-readable strings. ``already_has`` is always empty here.
    """
    # Plex may report a missing count as None; treat that as zero plays
    # instead of failing on the comparison / addition below.
    single_plays = single_plays or 0
    album_plays = album_plays or 0

    if single_plays <= 0:
        return [], [], []

    expected_count = album_plays + single_plays
    logger.info(
        f" Migrating play count: single={single_plays}, album={album_plays}, expected={expected_count}"
    )

    try:
        for played in range(1, single_plays + 1):
            album_track.markPlayed()
            # Progress output every 10 plays only, to keep the log short.
            if played % 10 == 0:
                logger.debug(f" Marked played {played}/{single_plays} times...")

        album_track.reload()
        new_count = getattr(album_track, "viewCount", 0) or 0

        if new_count != expected_count:
            logger.warning(
                f" ⚠ Play count mismatch: expected {expected_count}, got {new_count}"
            )
            return (
                [],
                [],
                [f"play count (expected {expected_count} but got {new_count})"],
            )

        logger.info(f" ✓ Play count verified: {new_count}")
        return (
            [f"play count ({album_plays} + {single_plays} = {new_count}) ✓ verified"],
            [],
            [],
        )
    except Exception as e:
        logger.error(f"Failed to migrate play count: {e}")
        return [], [], [f"play count (error: {e})"]
|
||
|
||
|
||
def migrate_playlist(playlist, album_track) -> Tuple[List[str], List[str], List[str]]:
    """Add ``album_track`` to ``playlist`` unless it is already a member.

    Membership is decided by comparing ``ratingKey`` values. After adding,
    the playlist is reloaded and membership is checked again to verify
    that the add took effect.

    Returns:
        ``(changes, already_has, failures)`` - three lists of
        human-readable strings; exactly one of them holds one entry.
    """
    playlist_name = playlist.title

    def _contains_album() -> bool:
        # Fetches the playlist items on every call so the check after
        # reload() sees the updated contents.
        return any(
            entry.ratingKey == album_track.ratingKey for entry in playlist.items()
        )

    try:
        if _contains_album():
            logger.info(f" Album already in playlist: '{playlist_name}'")
            return [], [f"playlist '{playlist_name}'"], []

        logger.info(f" Adding to playlist: '{playlist_name}'...")
        playlist.addItems(album_track)
        playlist.reload()

        if _contains_album():
            logger.info(f" ✓ Playlist '{playlist_name}' verified")
            return [f"added to playlist '{playlist_name}' ✓ verified"], [], []

        logger.warning(f" ⚠ Playlist '{playlist_name}' add failed verification")
        return [], [], [f"playlist '{playlist_name}' (add failed)"]
    except Exception as exc:
        logger.error(f"Failed to add to playlist '{playlist_name}': {exc}")
        return [], [], [f"playlist '{playlist_name}' (error: {exc})"]
|
||
|
||
|
||
def format_migration_message(
    changes: List[str], already_has: List[str], failures: List[str]
) -> str:
    """Build the one-line summary of a migration.

    Each non-empty list contributes one section (migrated, already
    present, failed - in that order) with its entries joined by ``", "``.
    Sections are joined by ``" | "``. When all three lists are empty the
    result is ``"No metadata to migrate"``.
    """
    sections = []
    if changes:
        sections.append(f"✅ Migrated: {', '.join(changes)}")
    if already_has:
        sections.append(f"ℹ️ Already has: {', '.join(already_has)}")
    if failures:
        sections.append(f"❌ Failed: {', '.join(failures)}")

    if not sections:
        return "No metadata to migrate"
    return " | ".join(sections)
|
||
|
||
|
||
def migrate_plex_metadata(
    plex_server,
    single_file_path: str,
    album_file_path: str,
    docker_mount: Optional[str] = None,
    exclude_smart_playlists: Optional[set] = None,
) -> Tuple[bool, str]:
    """Migrate Plex metadata from a single's track to the album track.

    The rating, play count and playlist memberships of the track at
    ``single_file_path`` are applied to the track at ``album_file_path``.

    Args:
        plex_server: Connected Plex server object; a falsy value aborts.
        single_file_path: File path of the single's track.
        album_file_path: File path of the album track.
        docker_mount: Passed through to ``find_plex_track_by_path``.
        exclude_smart_playlists: ``ratingKey`` values of playlists that
            must not be migrated (see ``get_smart_playlist_ids``).

    Returns:
        ``(success, message)``. ``success`` is ``True`` only when no step
        reported a failure; ``message`` is the summary produced by
        ``format_migration_message`` or the reason for an early abort.
    """
    if not plex_server:
        return False, "Plex server not connected"

    single_track = find_plex_track_by_path(plex_server, single_file_path, docker_mount)
    # Each lookup walks every track of the music sections, so do not start
    # the second one when the first has already failed.
    if not single_track:
        return False, "Could not find single track in Plex"

    album_track = find_plex_track_by_path(plex_server, album_file_path, docker_mount)
    if not album_track:
        return False, "Could not find album track in Plex"

    single_rating = getattr(single_track, "userRating", None)
    single_plays = getattr(single_track, "viewCount", 0) or 0
    single_playlists = get_plex_playlists_for_track(
        plex_server, single_track, exclude_smart_playlists
    )

    logger.info(
        f" Single track metadata: rating={single_rating or 'none'}, plays={single_plays}, playlists={len(single_playlists)}"
    )
    if single_playlists:
        logger.info(
            f" Single is in playlists: {', '.join(p.title for p in single_playlists)}"
        )

    # Read the album's current values before any step modifies the track.
    original_album_rating = getattr(album_track, "userRating", None)
    album_plays = getattr(album_track, "viewCount", 0) or 0

    # Every step returns (changes, already_has, failures); the order below
    # is the order in which the entries appear in the summary.
    results = [
        migrate_rating(single_track, album_track, single_rating, original_album_rating),
        migrate_play_count(album_track, single_plays, album_plays),
    ]
    results.extend(
        migrate_playlist(playlist, album_track) for playlist in single_playlists
    )

    all_changes = [entry for changes, _, _ in results for entry in changes]
    all_already = [entry for _, already, _ in results for entry in already]
    all_failures = [entry for _, _, failures in results for entry in failures]

    message = format_migration_message(all_changes, all_already, all_failures)
    return len(all_failures) == 0, message
|