#!/usr/bin/env python3
"""
Compare Plex and Jellyfin TV series libraries to find discrepancies.
Identifies series that exist in filesystem/Jellyfin but are missing from Plex.
"""

import json
import os
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

try:
    from plexapi.server import PlexServer
    from plexapi.exceptions import NotFound, Unauthorized
except ImportError:
    print("Error: plexapi not installed. Run: pip install -r requirements.txt")
    sys.exit(1)

import requests
from requests.exceptions import RequestException
from dotenv import load_dotenv


# Patterns are compiled once at import time; they are applied to every title
# and directory name that gets compared.
_YEAR_IN_PARENS_RE = re.compile(r'\s*\(\d{4}\)\s*')
_NON_ALNUM_RE = re.compile(r'[^a-z0-9\s]')
_WHITESPACE_RE = re.compile(r'\s+')
_ARTICLES_RE = re.compile(r'\b(the|a|an)\b')
_TRAILING_DASH_RE = re.compile(r'\s*-\s*$')
# Use * instead of + so that empty IDs such as "[imdbid-]" are removed as well.
_ID_TAG_RES = (
    re.compile(r'\s*\[imdbid-[^\]]*\]\s*'),   # [imdbid-...] or [imdbid-]
    re.compile(r'\s*\[tvdbid-[^\]]*\]\s*'),   # [tvdbid-...] or [tvdbid-]
    re.compile(r'\s*\{imdb-[^\}]*\}\s*'),     # {imdb-...} or {imdb-}
    re.compile(r'\s*\{tvdb-[^\}]*\}\s*'),     # {tvdb-...} or {tvdb-}
)


class SeriesLibraryComparator:
    """Compare TV series found on the filesystem, in Plex and in Jellyfin."""

    # File suffixes (lower-case) that are counted as episodes.
    VIDEO_EXTENSIONS = frozenset(
        {'.mkv', '.avi', '.mp4', '.m4v', '.mov', '.wmv', '.flv', '.webm'}
    )

    def __init__(self, plex_url: str, plex_token: str, jellyfin_url: str,
                 jellyfin_api_key: str, jellyfin_user_id: str,
                 series_paths: List[str]):
        """Store connection settings and the series root directories.

        Args:
            plex_url: Base URL of the Plex server.
            plex_token: Plex authentication token.
            jellyfin_url: Base URL of the Jellyfin server (a trailing "/" is
                stripped).
            jellyfin_api_key: Jellyfin API key, sent as ``X-Emby-Token``.
            jellyfin_user_id: Jellyfin user id used in the ``/Users/{id}/Items``
                endpoint.
            series_paths: One or more directories whose immediate
                subdirectories are treated as series. A single string is
                accepted and wrapped in a list.
        """
        self.plex_url = plex_url
        self.plex_token = plex_token
        self.jellyfin_url = jellyfin_url.rstrip('/')
        self.jellyfin_api_key = jellyfin_api_key
        self.jellyfin_user_id = jellyfin_user_id

        # Support multiple series paths
        if isinstance(series_paths, str):
            series_paths = [series_paths]
        self.series_paths = [Path(p) for p in series_paths]
        self.series_paths_normalized = [
            self.normalize_path(str(p)) for p in self.series_paths
        ]

    # ------------------------------------------------------------------
    # Normalization helpers
    # ------------------------------------------------------------------

    def normalize_path(self, path: str) -> str:
        """Normalize path for comparison (lower-case, forward slashes)."""
        if not path:
            return ""
        return str(Path(path)).lower().replace('\\', '/').strip()

    def normalize_title(self, title: str) -> str:
        """Normalize title for comparison (lowercase, normalize punctuation).

        Removes a "(YYYY)" year, every character that is not a-z, 0-9 or
        whitespace, and the articles "the", "a" and "an". Returns "" for an
        empty or ``None`` title.
        """
        if not title:
            return ""
        normalized = title.lower().strip()
        # Remove year in parentheses
        normalized = _YEAR_IN_PARENS_RE.sub(' ', normalized)
        # Remove all punctuation and special characters
        normalized = _NON_ALNUM_RE.sub(' ', normalized)
        normalized = _WHITESPACE_RE.sub(' ', normalized)
        # Remove common articles, then collapse the gaps they leave behind
        normalized = _ARTICLES_RE.sub(' ', normalized)
        normalized = _WHITESPACE_RE.sub(' ', normalized)
        return normalized.strip()

    def extract_base_title(self, dirname: str) -> str:
        """Extract base title from directory name, removing year, imdb tags, etc.

        Strips patterns like (2021), [imdbid-tt123456], [tvdbid-123456],
        {imdb-tt123456}, {tvdb-123456} and a trailing dash.
        """
        cleaned = _YEAR_IN_PARENS_RE.sub(' ', dirname)
        for tag_re in _ID_TAG_RES:
            cleaned = tag_re.sub(' ', cleaned)
        # Remove trailing dash/hyphen with spaces
        cleaned = _TRAILING_DASH_RE.sub('', cleaned)
        cleaned = _WHITESPACE_RE.sub(' ', cleaned)
        return cleaned.strip()

    @staticmethod
    def _contains_path(outer: str, inner: str) -> bool:
        """Return True if ``inner``'s components occur contiguously in ``outer``.

        Both arguments are normalized paths. Comparing whole components
        (instead of raw substrings) keeps "/tv/lost" from matching
        "/tv/lost girl", while still matching when the two sides differ only
        by a leading mount prefix (e.g. "/tv/show" vs "/data/tv/show").
        """
        outer_parts = [part for part in outer.split('/') if part]
        inner_parts = [part for part in inner.split('/') if part]
        span = len(inner_parts)
        if span == 0 or span > len(outer_parts):
            return False
        return any(
            outer_parts[start:start + span] == inner_parts
            for start in range(len(outer_parts) - span + 1)
        )

    def _paths_overlap(self, first: str, second: str) -> bool:
        """Return True if either normalized path contains the other."""
        return (self._contains_path(first, second)
                or self._contains_path(second, first))

    @staticmethod
    def _is_under(path_norm: str, base_norm: str) -> bool:
        """Return True if ``path_norm`` equals ``base_norm`` or lies below it."""
        if not base_norm:
            return False
        if path_norm == base_norm:
            return True
        return path_norm.startswith(base_norm.rstrip('/') + '/')

    @staticmethod
    def _unique_key(existing: Dict[str, Dict], title: str,
                    qualifier: Optional[object] = None) -> str:
        """Return a key for ``title`` that is not yet present in ``existing``.

        The plain title is used whenever it is free, so keys only change for
        entries that would otherwise overwrite an earlier one.
        """
        if title not in existing:
            return title
        candidate = f"{title} ({qualifier})" if qualifier else title
        counter = 2
        while candidate in existing:
            candidate = f"{title} [{counter}]"
            counter += 1
        return candidate

    # ------------------------------------------------------------------
    # Data collection
    # ------------------------------------------------------------------

    def get_filesystem_series(self) -> Dict[str, Dict]:
        """Scan filesystem for TV series directories.

        Returns:
            Mapping of "<dir name>||<root name>" to a dict with ``path``,
            ``full_path``, ``name``, ``episode_count`` and ``base_path``.
        """
        series: Dict[str, Dict] = {}

        for series_path in self.series_paths:
            print(f"Scanning filesystem at: {series_path}")
            if not series_path.exists():
                print(f"Warning: Series path does not exist: {series_path}")
                continue
            if not series_path.is_dir():
                # iterdir() would raise on a regular file
                print(f"Warning: Series path is not a directory: {series_path}")
                continue

            # Each subdirectory of the series path is assumed to be a TV show
            for series_dir in series_path.iterdir():
                if not series_dir.is_dir() or series_dir.name.startswith('.'):
                    continue

                # Count episodes: every video file anywhere below the show dir
                episode_count = sum(
                    1
                    for candidate in series_dir.rglob('*')
                    if candidate.suffix.lower() in self.VIDEO_EXTENSIONS
                    and candidate.is_file()
                )

                key = f"{series_dir.name}||{series_path.name}"
                existing = series.get(key)
                if existing is not None and existing['full_path'] != str(series_dir):
                    # Two different roots share the same leaf name; fall back
                    # to the full root path so neither entry is overwritten.
                    key = f"{series_dir.name}||{series_path}"

                series[key] = {
                    'path': series_dir,
                    'full_path': str(series_dir),
                    'name': series_dir.name,
                    'episode_count': episode_count,
                    'base_path': str(series_path),
                }

        print(f"Found {len(series)} TV series directories in filesystem")
        return series

    def get_plex_series(self) -> Dict[str, Dict]:
        """Query Plex API to get all TV series.

        Only the first library section of type ``show`` is read.

        Returns:
            Mapping of show title to a dict with ``title``, ``year``,
            ``episode_count``, ``path`` and ``added_at``. Returns ``{}`` when
            the connection fails or no TV library exists.
        """
        print("\nConnecting to Plex...")
        try:
            plex = PlexServer(self.plex_url, self.plex_token)

            # Find the TV Shows library section
            series_section = next(
                (section for section in plex.library.sections()
                 if section.type == 'show'),
                None,
            )
            if not series_section:
                print("Error: No TV Shows library found in Plex")
                return {}

            print(f"Found Plex TV Shows library: {series_section.title}")

            plex_series: Dict[str, Dict] = {}
            for show in series_section.all():
                # Best effort: a show without a readable count is kept with 0.
                try:
                    episode_count = show.leafCount  # Total episodes
                except Exception:
                    episode_count = 0

                # Try to get the directory path
                show_path = ''
                try:
                    if hasattr(show, 'locations') and show.locations:
                        show_path = show.locations[0]
                except Exception:
                    show_path = ''

                year = getattr(show, 'year', None)
                # Same-titled shows get a qualified key instead of
                # overwriting each other.
                key = self._unique_key(plex_series, show.title, year)
                plex_series[key] = {
                    'title': show.title,
                    'year': year,
                    'episode_count': episode_count,
                    'path': show_path,
                    'added_at': getattr(show, 'addedAt', None),
                }

            print(f"Found {len(plex_series)} TV series in Plex")
            return plex_series

        except Unauthorized:
            print("Error: Plex authentication failed. Check your PLEX_TOKEN")
            return {}
        except Exception as e:
            print(f"Error connecting to Plex: {e}")
            return {}

    def get_jellyfin_series(self) -> Dict[str, Dict]:
        """Query Jellyfin API to get all TV series.

        Series whose path is empty or not below one of the configured series
        paths are skipped.

        Returns:
            Mapping of series name to a dict with ``title``, ``year``,
            ``path``, ``id``, ``episode_count`` and ``date_created``. Returns
            ``{}`` when a request fails.
        """
        print("\nConnecting to Jellyfin...")

        headers = {
            'X-Emby-Token': self.jellyfin_api_key,
            'Content-Type': 'application/json',
        }

        # Get all TV series from the library
        items_url = f"{self.jellyfin_url}/Users/{self.jellyfin_user_id}/Items"
        params = {
            'Recursive': 'false',
            'IncludeItemTypes': 'Series',
            'Fields': 'Path,DateCreated',
            'Limit': 1000,
        }

        try:
            all_series: Dict[str, Dict] = {}
            start_index = 0

            while True:
                params['StartIndex'] = start_index
                response = requests.get(items_url, headers=headers,
                                        params=params, timeout=30)
                response.raise_for_status()
                data = response.json()

                items = data.get('Items', [])
                if not items:
                    break

                for item in items:
                    series_id = item.get('Id', '')
                    title = item.get('Name', '')
                    path = item.get('Path', '')

                    # Filter: only include series from the configured
                    # series directories
                    if not path:
                        continue
                    path_normalized = self.normalize_path(path)
                    if not any(self._is_under(path_normalized, base)
                               for base in self.series_paths_normalized):
                        continue

                    year = item.get('ProductionYear')
                    key = self._unique_key(all_series, title, year)
                    all_series[key] = {
                        'title': title,
                        'year': year,
                        'path': path,
                        'id': series_id,
                        'episode_count': self._jellyfin_episode_count(
                            items_url, headers, series_id),
                        'date_created': item.get('DateCreated'),
                    }

                # Check if there are more items
                total_records = data.get('TotalRecordCount', 0)
                if start_index + len(items) >= total_records:
                    break
                start_index += len(items)

            print(f"Found {len(all_series)} TV series in Jellyfin")
            return all_series

        except (RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f"Error connecting to Jellyfin: {e}")
            error_response = getattr(e, 'response', None)
            if error_response is not None:
                print(f"Response: {error_response.text}")
            return {}

    def _jellyfin_episode_count(self, items_url: str, headers: Dict[str, str],
                                series_id: str) -> int:
        """Return the episode total Jellyfin reports for a series, or 0 on failure."""
        episodes_params = {
            'ParentId': series_id,
            'Recursive': 'true',
            'IncludeItemTypes': 'Episode',
            'Limit': 1,  # only TotalRecordCount is needed
        }
        # Best effort: a failed count must not abort the whole listing.
        try:
            ep_response = requests.get(items_url, headers=headers,
                                       params=episodes_params, timeout=10)
            if ep_response.status_code == 200:
                return ep_response.json().get('TotalRecordCount', 0)
        except Exception:
            pass
        return 0

    # ------------------------------------------------------------------
    # Matching
    # ------------------------------------------------------------------

    def build_jellyfin_lookup(self, jellyfin_series: Dict) -> Dict[str, Dict]:
        """Build a lookup index for Jellyfin series by normalized names and paths.

        Each series is indexed under its normalized title, its normalized
        directory name, and its normalized directory name with year/ID tags
        removed. Later series overwrite earlier ones on the same key.
        """
        lookup: Dict[str, Dict] = {}

        for jf_data in jellyfin_series.values():
            # Index by normalized title
            lookup[self.normalize_title(jf_data['title'])] = jf_data

            # Index by normalized path directory name (with and without
            # year/tags)
            jf_path = jf_data.get('path', '')
            if jf_path:
                dirname = Path(jf_path).name
                lookup[self.normalize_title(dirname)] = jf_data
                base_title = self.extract_base_title(dirname)
                lookup[self.normalize_title(base_title)] = jf_data

        return lookup

    def find_jellyfin_match(self, fs_name: str, fs_path: str,
                            jellyfin_lookup: Dict) -> Tuple[bool, Optional[str], int]:
        """Find if a filesystem series exists in Jellyfin.

        Returns:
            ``(found, jellyfin_title, jellyfin_episode_count)``; when nothing
            matches the result is ``(False, None, 0)``.
        """
        fs_name_norm = self.normalize_title(fs_name)
        fs_base_norm = self.normalize_title(self.extract_base_title(fs_name))

        # Try matching by normalized name (full and base)
        for norm_key in (fs_name_norm, fs_base_norm):
            jf_data = jellyfin_lookup.get(norm_key)
            if jf_data is not None:
                return True, jf_data.get('title', ''), jf_data.get('episode_count', 0)

        # Try matching by path: the filesystem path must equal the Jellyfin
        # path or appear inside it as whole path components.
        fs_path_norm = self.normalize_path(fs_path)
        for jf_data in jellyfin_lookup.values():
            jf_path = jf_data.get('path', '')
            if jf_path and self._contains_path(self.normalize_path(jf_path),
                                               fs_path_norm):
                return True, jf_data.get('title', ''), jf_data.get('episode_count', 0)

        return False, None, 0

    def compare_libraries(self) -> Tuple[Dict, Dict, Dict]:
        """Compare all three sources and return discrepancies.

        Returns:
            ``(missing_from_plex, missing_from_plex_jellyfin, orphaned_in_plex)``:
            filesystem series with no Plex match, Jellyfin series with no
            Plex title match, and Plex series with no filesystem match.
        """
        fs_series = self.get_filesystem_series()
        plex_series = self.get_plex_series()
        jellyfin_series = self.get_jellyfin_series()

        # Build Jellyfin lookup index
        jellyfin_lookup = self.build_jellyfin_lookup(jellyfin_series)

        # Title sets are used for membership tests only; iteration always
        # runs over the source dicts so same-titled series are not dropped.
        fs_titles = {
            self.normalize_title(self.extract_base_title(fs_data['name']))
            for fs_data in fs_series.values()
        }
        plex_titles = {
            self.normalize_title(plex_data['title'])
            for plex_data in plex_series.values()
        }

        # Normalize every path once instead of inside the nested loops.
        plex_paths = [
            self.normalize_path(plex_data['path'])
            for plex_data in plex_series.values()
            if plex_data.get('path')
        ]
        fs_entries = [
            (fs_data,
             self.normalize_path(fs_data['full_path']),
             self.normalize_title(fs_data['name']))
            for fs_data in fs_series.values()
        ]

        # Find series in filesystem but not in Plex
        missing_from_plex = {}
        for fs_key, fs_data in fs_series.items():
            base_norm = self.normalize_title(
                self.extract_base_title(fs_data['name']))
            if base_norm in plex_titles:
                continue

            # Double-check by path - maybe the title doesn't match but the
            # path does
            fs_path = fs_data['full_path']
            fs_path_norm = self.normalize_path(fs_path)
            if any(self._paths_overlap(fs_path_norm, plex_path_norm)
                   for plex_path_norm in plex_paths):
                continue

            in_jellyfin, jf_title, jf_ep_count = self.find_jellyfin_match(
                fs_data['name'], fs_path, jellyfin_lookup)
            missing_from_plex[fs_key] = {
                'path': fs_path,
                'episode_count': fs_data['episode_count'],
                'in_jellyfin': in_jellyfin,
                'jellyfin_title': jf_title,
                'jellyfin_episode_count': jf_ep_count,
            }

        # Find series in Jellyfin but not in Plex
        missing_from_plex_jellyfin = {}
        for jf_key, jf_data in jellyfin_series.items():
            jf_title_norm = self.normalize_title(jf_data['title'])
            if jf_title_norm in plex_titles:
                continue

            jf_path = jf_data.get('path', '')
            in_filesystem = False
            fs_path = None
            fs_ep_count = 0

            if jf_path:
                jf_path_norm = self.normalize_path(jf_path)
                dirname_norm = self.normalize_title(Path(jf_path).name)
                for fs_data, fs_path_norm, fs_name_norm in fs_entries:
                    # Match by path, directory name, or title
                    if (jf_path_norm == fs_path_norm
                            or dirname_norm == fs_name_norm
                            or jf_title_norm == fs_name_norm):
                        in_filesystem = True
                        fs_path = fs_data['full_path']
                        fs_ep_count = fs_data['episode_count']
                        break

            missing_from_plex_jellyfin[jf_key] = {
                'title': jf_data['title'],
                'path': jf_path,
                'episode_count': jf_data['episode_count'],
                'in_filesystem': in_filesystem,
                'filesystem_path': fs_path,
                'filesystem_episode_count': fs_ep_count,
            }

        # Find series in Plex but not in filesystem (orphaned)
        orphaned_in_plex = {}
        for plex_key, plex_data in plex_series.items():
            if self.normalize_title(plex_data['title']) in fs_titles:
                continue

            # Double-check by looking at the actual path if available
            plex_path = plex_data.get('path', '')
            if plex_path:
                plex_path_norm = self.normalize_path(plex_path)
                if any(self._paths_overlap(plex_path_norm, fs_path_norm)
                       for _, fs_path_norm, _ in fs_entries):
                    continue

            orphaned_in_plex[plex_key] = plex_data

        return missing_from_plex, missing_from_plex_jellyfin, orphaned_in_plex

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def generate_report(self):
        """Generate and print a comprehensive comparison report.

        Also writes the three result dicts to ``series_comparison_report.json``
        in the current working directory.
        """
        print("\n" + "=" * 80)
        print("TV SERIES COMPARISON REPORT")
        print("=" * 80)

        missing_from_plex, missing_from_plex_jellyfin, orphaned_in_plex = \
            self.compare_libraries()

        print("\n📊 SUMMARY:")
        print(f" Series missing from Plex (found in filesystem): {len(missing_from_plex)}")
        print(f" Series missing from Plex (found in Jellyfin): {len(missing_from_plex_jellyfin)}")
        print(f" Series in Plex but not in filesystem: {len(orphaned_in_plex)}")

        if missing_from_plex:
            print(f"\n❌ TV SERIES IN FILESYSTEM BUT MISSING FROM PLEX ({len(missing_from_plex)}):")
            print("-" * 80)
            for i, (title, info) in enumerate(sorted(missing_from_plex.items()), 1):
                print(f"\n{i}. {title}")
                print(f" Path: {info['path']}")
                print(f" Episodes: {info['episode_count']}")
                print(f" In Jellyfin: {'✓ Yes' if info['in_jellyfin'] else '✗ No'}")
                if info['jellyfin_title']:
                    print(f" Jellyfin Title: {info['jellyfin_title']}")
                    print(f" Jellyfin Episodes: {info['jellyfin_episode_count']}")

        # NOTE(review): the Jellyfin section is skipped whenever both
        # "missing" lists have the same length - presumably to avoid printing
        # the same series twice; confirm that equal counts really imply the
        # same set.
        if (missing_from_plex_jellyfin
                and len(missing_from_plex_jellyfin) != len(missing_from_plex)):
            print(f"\n❌ TV SERIES IN JELLYFIN BUT MISSING FROM PLEX ({len(missing_from_plex_jellyfin)}):")
            print("-" * 80)
            for i, (_key, info) in enumerate(sorted(missing_from_plex_jellyfin.items()), 1):
                print(f"\n{i}. {info['title']}")
                print(f" Path: {info['path']}")
                print(f" Episodes: {info['episode_count']}")
                print(f" In Filesystem: {'✓ Yes' if info['in_filesystem'] else '✗ No'}")
                if info['filesystem_path']:
                    print(f" Filesystem Path: {info['filesystem_path']}")
                    print(f" Filesystem Episodes: {info['filesystem_episode_count']}")

        if orphaned_in_plex:
            print(f"\n⚠️ TV SERIES IN PLEX BUT NOT IN FILESYSTEM ({len(orphaned_in_plex)}):")
            print("-" * 80)
            for i, (title, info) in enumerate(sorted(orphaned_in_plex.items()), 1):
                print(f"\n{i}. {title}")
                if info.get('path'):
                    print(f" Path: {info['path']}")
                print(f" Episodes: {info.get('episode_count', 0)}")

        # Save detailed report to JSON
        report_data = {
            'missing_from_plex': missing_from_plex,
            'missing_from_plex_jellyfin': missing_from_plex_jellyfin,
            'orphaned_in_plex': orphaned_in_plex,
        }

        report_file = Path('series_comparison_report.json')
        # default=str stringifies values json cannot encode natively, such as
        # Path objects and datetimes.
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, default=str)

        print(f"\n💾 Detailed report saved to: {report_file}")
        print("=" * 80)


def main():
    """Read configuration from the environment / .env file and run the report."""
    load_dotenv()

    # Get configuration from environment variables
    plex_url = os.getenv('PLEX_URL', 'http://localhost:32400')
    plex_token = os.getenv('PLEX_TOKEN')
    jellyfin_url = os.getenv('JELLYFIN_URL', 'http://localhost:8096')
    jellyfin_api_key = os.getenv('JELLYFIN_API_KEY')
    jellyfin_user_id = os.getenv('JELLYFIN_USER_ID')

    # Support multiple series paths
    series_paths = []
    series_path_primary = os.getenv('SERIES_PATH', os.getenv('TV_PATH'))
    if series_path_primary:
        series_paths.append(series_path_primary)

    # Check for additional paths (SERIES_PATH_2 ... SERIES_PATH_9)
    for i in range(2, 10):
        extra_path = os.getenv(f'SERIES_PATH_{i}')
        if extra_path:
            series_paths.append(extra_path)

    if not series_paths:
        print("Error: No SERIES_PATH or TV_PATH set in environment or .env file")
        sys.exit(1)

    # Validate required configuration
    if not plex_token:
        print("Error: PLEX_TOKEN not set in environment or .env file")
        sys.exit(1)

    if not jellyfin_api_key:
        print("Error: JELLYFIN_API_KEY not set in environment or .env file")
        sys.exit(1)

    if not jellyfin_user_id:
        print("Error: JELLYFIN_USER_ID not set in environment or .env file")
        sys.exit(1)

    print(f"Configured series paths: {series_paths}")

    comparator = SeriesLibraryComparator(
        plex_url=plex_url,
        plex_token=plex_token,
        jellyfin_url=jellyfin_url,
        jellyfin_api_key=jellyfin_api_key,
        jellyfin_user_id=jellyfin_user_id,
        series_paths=series_paths,
    )

    comparator.generate_report()


if __name__ == '__main__':
    main()