- Create .editorconfig for consistent coding styles. - Add .envrc for direnv integration. - Include .gitignore to exclude environment and build files. - Implement compare_movies.py and analyze_movies.py for movie library comparison and analysis. - Implement compare_series.py and analyze_series.py for TV series library comparison and analysis. - Add configuration example in config.example.txt. - Create README.md with project overview, setup instructions, and usage examples. - Add LICENSE file for MIT License. - Include flake.nix and flake.lock for Nix-based development environment. - Add USAGE.md for quick start guide and common commands.
543 lines
24 KiB
Python
Executable File
543 lines
24 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Compare Plex and Jellyfin movie libraries to find discrepancies.
|
|
Identifies movies that exist in filesystem/Jellyfin but are missing from Plex.
|
|
"""
|
|
|
|
import json
import os
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
|
|
|
|
try:
|
|
from plexapi.server import PlexServer
|
|
from plexapi.exceptions import NotFound, Unauthorized
|
|
except ImportError:
|
|
print("Error: plexapi not installed. Run: pip install -r requirements.txt")
|
|
sys.exit(1)
|
|
|
|
import requests
|
|
from requests.exceptions import RequestException
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
class MovieLibraryComparator:
    """Compare a movie directory on disk with the Plex and Jellyfin libraries.

    Movies are matched primarily by the stem (filename without extension) of
    their video file, compared case-insensitively.  The comparison reports
    files that Plex does not know about, Jellyfin items that Plex does not
    know about, and Plex entries with no corresponding file on disk.
    """

    # File suffixes (lower-case, with leading dot) that count as movie files.
    VIDEO_EXTENSIONS = frozenset({
        '.mkv', '.avi', '.mp4', '.m4v', '.mov', '.wmv', '.flv', '.webm',
    })

    def __init__(self, plex_url: str, plex_token: str,
                 jellyfin_url: str, jellyfin_api_key: str, jellyfin_user_id: str,
                 movies_path: str):
        """Store connection settings and the root directory to scan.

        Args:
            plex_url: Base URL of the Plex server.
            plex_token: Plex authentication token.
            jellyfin_url: Base URL of the Jellyfin server (a trailing slash
                is stripped).
            jellyfin_api_key: Jellyfin API key, sent as ``X-Emby-Token``.
            jellyfin_user_id: Jellyfin user whose items are listed.
            movies_path: Root directory containing the movie files.
        """
        self.plex_url = plex_url
        self.plex_token = plex_token
        self.jellyfin_url = jellyfin_url.rstrip('/')
        self.jellyfin_api_key = jellyfin_api_key
        self.jellyfin_user_id = jellyfin_user_id
        self.movies_path = Path(movies_path)
        # Normalized once; used to filter Jellyfin items by location.
        self.movies_path_normalized = self.normalize_path(str(self.movies_path))

    def get_filesystem_movies(self) -> Dict[str, Dict]:
        """Scan ``movies_path`` recursively for video files.

        Returns:
            Mapping of file stem -> ``{'path', 'full_path', 'filename',
            'stem'}``.  Empty if the directory does not exist.  Files that
            share a stem overwrite each other; the last one visited wins.
        """
        movies: Dict[str, Dict] = {}

        print(f"Scanning filesystem at: {self.movies_path}")

        if not self.movies_path.exists():
            print(f"Warning: Movies path does not exist: {self.movies_path}")
            return movies

        for video_file in self.movies_path.rglob('*'):
            # Check the suffix first so non-video entries need no stat call.
            if video_file.suffix.lower() not in self.VIDEO_EXTENSIONS:
                continue
            if not video_file.is_file():
                continue
            movies[video_file.stem] = {
                'path': video_file,
                'full_path': str(video_file),
                'filename': video_file.name,
                'stem': video_file.stem,
            }

        print(f"Found {len(movies)} video files in filesystem")
        return movies

    def get_plex_movies(self) -> Dict[str, Dict]:
        """Query Plex for every movie in the first library of type ``movie``.

        Each video file attached to a movie (alternate versions, multi-part
        movies) gets its own entry keyed by the file stem, so additional
        files of a movie Plex already knows are not reported as missing.
        Movies without any file are keyed by their title.

        Returns:
            Mapping of key -> ``{'title', 'year', 'file', 'added_at'}``.
            Empty on authentication or connection failure, or when no movie
            library exists.
        """
        print("\nConnecting to Plex...")
        try:
            plex = PlexServer(self.plex_url, self.plex_token)

            movies_section = None
            for section in plex.library.sections():
                if section.type == 'movie':
                    movies_section = section
                    break

            if not movies_section:
                print("Error: No Movies library found in Plex")
                return {}

            print(f"Found Plex Movies library: {movies_section.title}")
            # NOTE: no movies_section.refresh() here.  Per the plexapi
            # documentation that call forces a (slow) re-download of metadata
            # for the whole section; all() below already queries the server
            # for its current contents, and a read-only comparison should not
            # trigger server-side work.

            plex_movies: Dict[str, Dict] = {}
            movie_count = 0
            for movie in movies_section.all():
                movie_count += 1
                details = {
                    'title': movie.title,
                    'year': getattr(movie, 'year', None),
                    'added_at': getattr(movie, 'addedAt', None),
                }

                # Collect every file path; skip parts whose file is unset so
                # a single odd entry cannot abort the whole scan.
                files = [
                    part.file
                    for media in (getattr(movie, 'media', None) or [])
                    for part in (getattr(media, 'parts', None) or [])
                    if getattr(part, 'file', None)
                ]

                if not files:
                    plex_movies[movie.title] = dict(details, file='')
                    continue

                for file_path in files:
                    plex_movies[Path(file_path).stem] = dict(details, file=file_path)

            print(f"Found {movie_count} movies in Plex")
            return plex_movies

        except Unauthorized:
            print("Error: Plex authentication failed. Check your PLEX_TOKEN")
            return {}
        except Exception as e:
            # Top-level boundary: report any Plex failure and carry on with
            # an empty result so the other sources are still compared.
            print(f"Error connecting to Plex: {e}")
            return {}

    @staticmethod
    def _first_file_path(media_sources) -> str:
        """Return the first media-source ``Path`` that has a file suffix, or ''."""
        for media_source in media_sources or []:
            media_path = media_source.get('Path', '')
            if media_path and Path(media_path).suffix:
                return media_path
        return ''

    def _resolve_jellyfin_path(self, item: Dict, headers: Dict) -> str:
        """Find the best available file path for one Jellyfin item.

        Tries the item's own ``Path``, then its ``MediaSources``, then the
        item details endpoint.  May return a directory-like path (no suffix)
        or '' when nothing better is available.
        """
        path = item.get('Path', '') or ''

        # Path empty or directory-like: look at the bundled media sources.
        if not path or not Path(path).suffix:
            path = self._first_file_path(item.get('MediaSources')) or path

        # Still nothing usable: ask the details endpoint (best effort).
        if not path or not Path(path).suffix:
            item_url = (
                f"{self.jellyfin_url}/Users/{self.jellyfin_user_id}"
                f"/Items/{item.get('Id', '')}"
            )
            try:
                item_response = requests.get(
                    item_url, headers=headers,
                    params={'Fields': 'MediaSources'}, timeout=10,
                )
                if item_response.status_code == 200:
                    sources = item_response.json().get('MediaSources')
                    path = self._first_file_path(sources) or path
            except (RequestException, ValueError):
                # Network error or invalid JSON: keep what we already have.
                pass

        return path

    def _is_under_movies_path(self, path: str) -> bool:
        """Return True if *path* is the movies directory or lies inside it.

        The comparison respects directory boundaries, so a root of
        ``/media/movies`` does not accept ``/media/movies2/...``.
        """
        candidate = self.normalize_path(path)
        root = self.movies_path_normalized.rstrip('/')
        return candidate == root or candidate.startswith(root + '/')

    def get_jellyfin_movies(self) -> Dict[str, Dict]:
        """Query Jellyfin for all movies located under ``movies_path``.

        Results are fetched page by page.  Items without a path, or whose
        path lies outside the configured movies directory, are skipped.

        Returns:
            Mapping of file stem (or title, when only a directory-like path
            is known) -> ``{'title', 'year', 'path', 'id', 'date_created'}``.
            Empty if the request fails.
        """
        print("\nConnecting to Jellyfin...")

        headers = {
            'X-Emby-Token': self.jellyfin_api_key,
            'Content-Type': 'application/json'
        }

        url = f"{self.jellyfin_url}/Users/{self.jellyfin_user_id}/Items"
        params = {
            'Recursive': 'true',
            'IncludeItemTypes': 'Movie',
            'Fields': 'Path,MediaSources,DateCreated',
            'Limit': 1000  # Page size; larger libraries are paged below
        }

        all_movies: Dict[str, Dict] = {}
        movies_without_path = 0
        start_index = 0

        try:
            while True:
                params['StartIndex'] = start_index
                response = requests.get(url, headers=headers, params=params, timeout=30)
                response.raise_for_status()

                data = response.json()
                items = data.get('Items', [])
                if not items:
                    break

                for item in items:
                    path = self._resolve_jellyfin_path(item, headers)

                    # Only keep movies stored in the configured directory;
                    # without a path the location cannot be determined.
                    if not path or not self._is_under_movies_path(path):
                        continue

                    title = item.get('Name', '')
                    if Path(path).suffix:
                        key = Path(path).stem
                    else:
                        # Directory-like path: fall back to the title.
                        key = title
                        movies_without_path += 1

                    all_movies[key] = {
                        'title': title,
                        'year': item.get('ProductionYear'),
                        'path': path,
                        'id': item.get('Id', ''),
                        'date_created': item.get('DateCreated'),
                    }

                # Stop once the server reports no further records.
                start_index += len(items)
                if start_index >= data.get('TotalRecordCount', 0):
                    break

        except (RequestException, ValueError) as e:
            # ValueError covers an invalid JSON body on older requests versions.
            print(f"Error connecting to Jellyfin: {e}")
            error_response = getattr(e, 'response', None)
            if error_response is not None:
                print(f"Response: {error_response.text}")
            return {}

        print(f"Found {len(all_movies)} movies in Jellyfin")
        if movies_without_path > 0:
            print(f"Warning: {movies_without_path} movies without file paths (using title as key)")
        return all_movies

    def normalize_title(self, title: str) -> str:
        """Normalize a title or filename for comparison.

        Lower-cases the text and strips surrounding whitespace; punctuation
        and other characters are left untouched.
        """
        return title.lower().strip()

    def normalize_path(self, path: str) -> str:
        """Normalize a path for comparison.

        Returns '' for an empty path; otherwise the path lower-cased, with
        backslashes turned into forward slashes and whitespace stripped.
        """
        if not path:
            return ""
        return str(Path(path)).lower().replace('\\', '/').strip()

    def build_jellyfin_lookup(self, jellyfin_movies: Dict) -> Dict[str, Dict]:
        """Index Jellyfin movies under every key a filesystem file may match.

        Each movie is indexed by its normalized path, by the normalized stem
        and filename of its file (for a directory path: of every video file
        found in that directory, if it is reachable locally), by its
        dictionary key and by its title.  When two movies produce the same
        key, the later one wins.

        Returns:
            Mapping of normalized key -> Jellyfin movie info dict.
        """
        lookup: Dict[str, Dict] = {}

        for jf_key, jf_data in jellyfin_movies.items():
            jf_path = jf_data.get('path', '')
            if jf_path:
                path_obj = Path(jf_path)
                lookup[self.normalize_path(jf_path)] = jf_data

                if path_obj.suffix:
                    # A file path: index by stem and by full filename.
                    lookup[self.normalize_title(path_obj.stem)] = jf_data
                    lookup[self.normalize_title(path_obj.name)] = jf_data
                else:
                    # A directory path: index the video files inside it, if
                    # the directory is accessible from this machine.
                    try:
                        if path_obj.exists() and path_obj.is_dir():
                            for video_file in path_obj.glob('*'):
                                if (video_file.is_file()
                                        and video_file.suffix.lower() in self.VIDEO_EXTENSIONS):
                                    lookup[self.normalize_title(video_file.stem)] = jf_data
                                    lookup[self.normalize_title(video_file.name)] = jf_data
                                    lookup[self.normalize_path(str(video_file))] = jf_data
                    except OSError:
                        # Unreadable directory: the other keys still apply.
                        pass

            if jf_key:
                lookup[self.normalize_title(jf_key)] = jf_data

            title = jf_data.get('title', '')
            if title:
                lookup[self.normalize_title(title)] = jf_data

        return lookup

    def find_jellyfin_match(self, fs_path: str, fs_stem: str,
                            jellyfin_lookup: Dict) -> Tuple[bool, Optional[str]]:
        """Check whether a filesystem movie is present in Jellyfin.

        Args:
            fs_path: Full path of the file on disk.
            fs_stem: Filename without extension.
            jellyfin_lookup: Index produced by ``build_jellyfin_lookup``.

        Returns:
            ``(True, jellyfin_title)`` on a match, ``(False, None)`` otherwise.
        """
        fs_path_norm = self.normalize_path(fs_path)
        fs_stem_norm = self.normalize_title(fs_stem)
        path_obj = Path(fs_path)

        # Exact lookups, most specific first.
        exact_keys = (
            fs_path_norm,                            # Full normalized path
            fs_stem_norm,                            # Stem supplied by the caller
            self.normalize_title(path_obj.name),     # Filename with extension
            self.normalize_title(path_obj.stem),     # Stem derived from the path
        )
        for norm_key in exact_keys:
            if norm_key and norm_key in jellyfin_lookup:
                return True, jellyfin_lookup[norm_key].get('title', '')

        # Fallback: substring overlap between the file's path/stem and any
        # lookup key.  Both sides must be longer than 5 characters to avoid
        # matching on very short fragments.
        # NOTE(review): this heuristic is deliberately loose and can report a
        # false match (e.g. a Jellyfin title contained in a longer filename).
        if not fs_path_norm or len(fs_stem_norm) <= 5:
            return False, None

        for jf_norm_key, jf_data in jellyfin_lookup.items():
            if len(jf_norm_key) <= 5:
                continue
            if (fs_path_norm in jf_norm_key or jf_norm_key in fs_path_norm or
                    fs_stem_norm in jf_norm_key or jf_norm_key in fs_stem_norm):
                return True, jf_data.get('title', '')

        return False, None

    def compare_libraries(self) -> Tuple[Dict, Dict, Dict]:
        """Load all three sources and compute their discrepancies.

        Returns:
            A tuple ``(missing_from_plex, missing_from_plex_jellyfin,
            orphaned_in_plex)``:

            * files on disk that Plex lacks, with a flag (and title) telling
              whether Jellyfin has them;
            * Jellyfin movies that Plex lacks, with a flag (and path) telling
              whether a matching file exists on disk;
            * Plex entries whose key has no matching file on disk.
        """
        fs_movies = self.get_filesystem_movies()
        plex_movies = self.get_plex_movies()
        jellyfin_movies = self.get_jellyfin_movies()

        jellyfin_lookup = self.build_jellyfin_lookup(jellyfin_movies)

        # Case-insensitive views of the filesystem and Plex keys.
        fs_normalized = {self.normalize_title(k): (k, v) for k, v in fs_movies.items()}
        plex_normalized = {self.normalize_title(k): (k, v) for k, v in plex_movies.items()}

        # --- Files on disk that Plex does not have ---
        missing_from_plex = {}
        debug_samples = []  # First few files that Jellyfin lacks as well

        for norm_key, (orig_key, fs_data) in fs_normalized.items():
            if norm_key in plex_normalized:
                continue

            fs_path = fs_data['full_path']
            fs_stem = fs_data['stem']
            in_jellyfin, jf_title = self.find_jellyfin_match(fs_path, fs_stem, jellyfin_lookup)

            if not in_jellyfin and len(debug_samples) < 3:
                debug_samples.append({
                    'fs_path': fs_path,
                    'fs_stem': fs_stem,
                    'fs_norm_path': self.normalize_path(fs_path),
                    'fs_norm_stem': self.normalize_title(fs_stem),
                    'jellyfin_keys_sample': list(jellyfin_lookup.keys())[:5] if jellyfin_lookup else []
                })

            missing_from_plex[orig_key] = {
                'path': fs_path,
                'in_jellyfin': in_jellyfin,
                'jellyfin_title': jf_title
            }

        if debug_samples:
            print("\n🔍 DEBUG: Sample of unmatched files (first 3):")
            for i, sample in enumerate(debug_samples, 1):
                print(f"\n  Sample {i}:")
                print(f"    FS Path: {sample['fs_path']}")
                print(f"    FS Stem: {sample['fs_stem']}")
                print(f"    Normalized Path: {sample['fs_norm_path']}")
                print(f"    Normalized Stem: {sample['fs_norm_stem']}")
                print(f"    Sample Jellyfin keys: {sample['jellyfin_keys_sample']}")

        # --- Jellyfin movies that Plex does not have ---
        # Normalize every filesystem entry once instead of once per movie.
        fs_index = [
            (
                fs_data['full_path'],
                self.normalize_path(fs_data['full_path']),
                self.normalize_title(fs_data['stem']),
                self.normalize_title(fs_data['filename']),
            )
            for fs_data in fs_movies.values()
        ]

        missing_from_plex_jellyfin = {}
        for orig_key, data in jellyfin_movies.items():
            jf_path = data.get('path', '')
            jf_stem = Path(jf_path).stem if jf_path else orig_key
            jf_stem_norm = self.normalize_title(jf_stem)

            if jf_stem_norm in plex_normalized:
                continue

            fs_path = None
            if jf_path:
                jf_path_norm = self.normalize_path(jf_path)
                jf_filename_norm = self.normalize_title(Path(jf_path).name)

                # Match by path, stem, filename, or one path containing the other.
                for full_path, fs_path_norm, fs_stem_norm, fs_filename_norm in fs_index:
                    if (jf_path_norm == fs_path_norm or
                            jf_stem_norm == fs_stem_norm or
                            jf_filename_norm == fs_filename_norm or
                            jf_path_norm in fs_path_norm or
                            fs_path_norm in jf_path_norm):
                        fs_path = full_path
                        break

            missing_from_plex_jellyfin[orig_key] = {
                'title': data['title'],
                'path': jf_path,
                'in_filesystem': fs_path is not None,
                'filesystem_path': fs_path
            }

        # --- Plex entries without a file on disk (orphaned) ---
        orphaned_in_plex = {
            orig_key: data
            for norm_key, (orig_key, data) in plex_normalized.items()
            if norm_key not in fs_normalized
        }

        return missing_from_plex, missing_from_plex_jellyfin, orphaned_in_plex

    def generate_report(self) -> None:
        """Print the comparison report and save it as JSON.

        The detailed data is written to ``movies_comparison_report.json`` in
        the current working directory; values that are not JSON-serializable
        are stored as strings.
        """
        print("\n" + "="*80)
        print("LIBRARY COMPARISON REPORT")
        print("="*80)

        missing_from_plex, missing_from_plex_jellyfin, orphaned_in_plex = self.compare_libraries()

        print("\n📊 SUMMARY:")
        print(f"  Movies missing from Plex (found in filesystem): {len(missing_from_plex)}")
        print(f"  Movies missing from Plex (found in Jellyfin): {len(missing_from_plex_jellyfin)}")
        print(f"  Movies in Plex but not in filesystem: {len(orphaned_in_plex)}")

        if missing_from_plex:
            print(f"\n❌ MOVIES IN FILESYSTEM BUT MISSING FROM PLEX ({len(missing_from_plex)}):")
            print("-" * 80)
            for i, (title, info) in enumerate(sorted(missing_from_plex.items()), 1):
                print(f"\n{i}. {title}")
                print(f"   Path: {info['path']}")
                print(f"   In Jellyfin: {'✓ Yes' if info['in_jellyfin'] else '✗ No'}")
                if info['jellyfin_title']:
                    print(f"   Jellyfin Title: {info['jellyfin_title']}")

        # The Jellyfin section is redundant only when it lists exactly the
        # same keys as the filesystem section; comparing the counts alone
        # would hide it whenever the two sizes merely happen to coincide.
        if missing_from_plex_jellyfin and set(missing_from_plex_jellyfin) != set(missing_from_plex):
            print(f"\n❌ MOVIES IN JELLYFIN BUT MISSING FROM PLEX ({len(missing_from_plex_jellyfin)}):")
            print("-" * 80)
            for i, (key, info) in enumerate(sorted(missing_from_plex_jellyfin.items()), 1):
                print(f"\n{i}. {info['title']}")
                print(f"   Path: {info['path']}")
                print(f"   In Filesystem: {'✓ Yes' if info['in_filesystem'] else '✗ No'}")
                if info['filesystem_path']:
                    print(f"   Filesystem Path: {info['filesystem_path']}")

        if orphaned_in_plex:
            print(f"\n⚠️  MOVIES IN PLEX BUT NOT IN FILESYSTEM ({len(orphaned_in_plex)}):")
            print("-" * 80)
            for i, (title, info) in enumerate(sorted(orphaned_in_plex.items()), 1):
                print(f"\n{i}. {title}")
                if info.get('file'):
                    print(f"   File: {info['file']}")

        report_data = {
            'missing_from_plex': missing_from_plex,
            'missing_from_plex_jellyfin': missing_from_plex_jellyfin,
            'orphaned_in_plex': orphaned_in_plex
        }

        report_file = Path('movies_comparison_report.json')
        with open(report_file, 'w', encoding='utf-8') as f:
            # default=str turns non-JSON values (e.g. Plex timestamps) into text.
            json.dump(report_data, f, indent=2, default=str)

        print(f"\n💾 Detailed report saved to: {report_file}")
        print("="*80)
|
|
|
|
|
|
def main():
    """Read settings from the environment (and .env file) and run the report.

    The server URLs fall back to the default local addresses.  The Plex token,
    Jellyfin API key, Jellyfin user id and movies path are mandatory: the
    first one found missing is reported and the process exits with status 1.
    """
    load_dotenv()

    # Optional settings with local defaults.
    plex_url = os.getenv('PLEX_URL', 'http://localhost:32400')
    jellyfin_url = os.getenv('JELLYFIN_URL', 'http://localhost:8096')

    # Mandatory settings.
    plex_token = os.getenv('PLEX_TOKEN')
    jellyfin_api_key = os.getenv('JELLYFIN_API_KEY')
    jellyfin_user_id = os.getenv('JELLYFIN_USER_ID')
    movies_path = os.getenv('MOVIES_PATH')

    # Checked in this order; the first missing value aborts the run.
    required_settings = (
        (plex_token, "Error: PLEX_TOKEN not set in environment or .env file"),
        (jellyfin_api_key, "Error: JELLYFIN_API_KEY not set in environment or .env file"),
        (jellyfin_user_id, "Error: JELLYFIN_USER_ID not set in environment or .env file"),
        (movies_path, "Error: MOVIES_PATH not set in environment or .env file"),
    )
    for value, error_message in required_settings:
        if not value:
            print(error_message)
            sys.exit(1)

    comparator = MovieLibraryComparator(
        plex_url=plex_url,
        plex_token=plex_token,
        jellyfin_url=jellyfin_url,
        jellyfin_api_key=jellyfin_api_key,
        jellyfin_user_id=jellyfin_user_id,
        movies_path=movies_path,
    )
    comparator.generate_report()


if __name__ == '__main__':
    main()
|
|
|