diff --git a/flake.nix b/flake.nix index c842f5c..8631652 100644 --- a/flake.nix +++ b/flake.nix @@ -17,10 +17,14 @@ in { devShells.default = pkgs.mkShell { - buildInputs = [ pythonEnv ]; + buildInputs = [ + pythonEnv + pkgs.black + ]; shellHook = '' echo "Python environment ready!" echo "Run: python main.py" + echo "Format code with: black main.py" ''; }; diff --git a/main.py b/main.py index 13aade1..dac37bc 100755 --- a/main.py +++ b/main.py @@ -2,33 +2,27 @@ """ Script to identify missing albums on MusicBrainz from Deezer releases for artists monitored in Lidarr, and generate submission links. - -This script: -1. Fetches artists from Lidarr with monitorNewItems set to "new" or "all" -2. Uses SAMBL to find albums missing on MusicBrainz from Deezer releases -3. Generates a-tisket/harmony links for submitting albums to MusicBrainz """ -import requests import json -import sys import os -from typing import List, Dict, Optional, Tuple +import sys +from typing import Dict, List, Optional, Tuple from urllib.parse import quote + +import requests from dotenv import load_dotenv -# Load environment variables from .env file load_dotenv() class LidarrClient: """Client for interacting with Lidarr API""" - + def __init__(self, base_url: str, api_key: str): - self.base_url = base_url.rstrip('/') - self.api_key = api_key - self.headers = {'X-Api-Key': api_key} - + self.base_url = base_url.rstrip("/") + self.headers = {"X-Api-Key": api_key} + def get_artists(self) -> List[Dict]: """Fetch all artists from Lidarr""" url = f"{self.base_url}/api/v1/artist" @@ -39,389 +33,313 @@ class LidarrClient: except requests.exceptions.RequestException as e: print(f"Error fetching artists from Lidarr: {e}", file=sys.stderr) return [] - - def get_monitored_artists(self, monitor_types: List[str] = None) -> List[Dict]: - """ - Get artists with monitorNewItems set to specified values - - Args: - monitor_types: List of monitorNewItems values to filter by (default: ['new', 'all']) - - Returns: - List of artist dictionaries matching the criteria - """ + + def get_monitored_artists( + self, monitor_types: Optional[List[str]] = None + ) -> List[Dict]: + """Get artists with monitorNewItems set to specified values""" if monitor_types is None: - monitor_types = ['new', 'all'] - - all_artists = self.get_artists() - filtered = [ - artist for artist in all_artists - if artist.get('monitorNewItems') in monitor_types - ] - return filtered + monitor_types = ["new", "all"] + return list( + filter( + lambda artist: artist.get("monitorNewItems") in monitor_types, + self.get_artists(), + ) + ) class SamblClient: - """ - Client for interacting with SAMBL API to find missing albums. - - SAMBL (Streaming Artist MusicBrainz Lookup) is available at: - - Website: https://sambl.lioncat6.com - - GitHub: https://github.com/Lioncat6/SAMBL-React - - API Root: https://sambl.lioncat6.com/api/ - """ - - def __init__(self, base_url: str = None): - # Default to the public SAMBL instance if no URL provided - self.base_url = (base_url or "https://sambl.lioncat6.com").rstrip('/') - + """Client for interacting with SAMBL API to find missing albums""" + + def __init__(self, base_url: Optional[str] = None): + self.base_url = (base_url or "https://sambl.lioncat6.com").rstrip("/") + def _search_deezer_artist(self, artist_name: str) -> Optional[str]: - """ - Search for an artist on Deezer and return their Deezer ID. - Uses Deezer API directly since SAMBL's searchArtists endpoint is unsupported. - - Args: - artist_name: Name of the artist to search for - - Returns: - Deezer artist ID as string, or None if not found - """ + """Search for an artist on Deezer and return their Deezer ID""" try: - # Use Deezer API to search for the artist - deezer_search_url = "https://api.deezer.com/search/artist" - params = {'q': artist_name, 'limit': 1} - response = requests.get(deezer_search_url, params=params, timeout=10) + response = requests.get( + "https://api.deezer.com/search/artist", + params={"q": artist_name, "limit": 1}, + timeout=10, + ) response.raise_for_status() - data = response.json() - if data.get('data') and len(data['data']) > 0: - artist_id = str(data['data'][0]['id']) - print(f" [Sambl] Found Deezer artist ID: {artist_id}") - return artist_id - else: - print(f" [Sambl] ⚠️ Artist '{artist_name}' not found on Deezer") - return None - except requests.exceptions.RequestException as e: - print(f" [Sambl] ⚠️ Error searching Deezer for artist: {e}", file=sys.stderr) + if data.get("data") and len(data["data"]) > 0: + return str(data["data"][0]["id"]) return None - - def find_missing_albums(self, artist_mbid: str, artist_name: str) -> Tuple[List[Dict], List[Dict]]: - """ - Find albums missing on MusicBrainz from Deezer releases for an artist. - - Uses SAMBL's /api/compareArtistAlbums endpoint which compares albums - from Deezer with MusicBrainz and identifies missing ones. - - Args: - artist_mbid: MusicBrainz ID of the artist - artist_name: Name of the artist - - Returns: - Tuple of (albums_to_add, albums_to_update) - - albums_to_add: Albums not in MusicBrainz (red status, no mbid) - - albums_to_update: Albums in MusicBrainz but need linking/updates (orange status) - Format: - [ - { - 'title': 'Album Title', - 'deezer_url': 'https://www.deezer.com/album/123456789', - 'deezer_id': '123456789', - 'release_date': '2024-01-01', - 'artist_name': artist_name, - 'mbid': 'musicbrainz-id' (only for albums_to_update), - 'album_issues': ['issue1', 'issue2'] (only for albums_to_update) - } - ] - """ - print(f" [Sambl] Checking for missing albums for {artist_name} (MBID: {artist_mbid})") - - # First, we need to find the Deezer artist ID + except requests.exceptions.RequestException: + return None + + def _extract_albums(self, data: Dict) -> List[Dict]: + """Extract albums list from SAMBL response""" + if isinstance(data, dict): + album_data = data.get("albumData") + if isinstance(album_data, list): + return album_data + if isinstance(album_data, dict): + return album_data.get("albums", album_data.get("data", [])) + if isinstance(data.get("albums"), list): + return data.get("albums", []) + if isinstance(data.get("data"), list): + return data.get("data", []) + if isinstance(data, list): + return data + return [] + + def _build_album_data(self, album: Dict, artist_name: str) -> Optional[Dict]: + """Build album data dictionary from SAMBL album response""" + deezer_id = str( + album.get("id") or album.get("deezer_id") or album.get("deezerId") or "" + ) + if not deezer_id or deezer_id == "None": + return None + + return { + "title": album.get("name") or album.get("title") or "Unknown", + "deezer_url": f"https://www.deezer.com/album/{deezer_id}", + "deezer_id": deezer_id, + "release_date": album.get("releaseDate") + or album.get("release_date") + or album.get("release") + or "", + "artist_name": artist_name, + "cover_url": album.get("imageUrl") + or album.get("cover") + or album.get("cover_medium") + or album.get("coverUrl") + or "", + } + + def _has_valid_deezer_id(self, album: Dict) -> bool: + """Check if album has a valid Deezer ID""" + deezer_id = str( + album.get("id") or album.get("deezer_id") or album.get("deezerId") or "" + ) + return bool(deezer_id and deezer_id != "None") + + def _get_album_status(self, album: Dict) -> str: + """Get album status from SAMBL response""" + return str(album.get("albumStatus", "")).lower() + + def _get_mbid(self, album: Dict) -> str: + """Extract MusicBrainz ID from album""" + return ( + album.get("mbid") + or album.get("musicbrainz_id") + or album.get("musicbrainzId") + or "" + ) + + def _is_album_to_add(self, album: Dict) -> bool: + """Check if album needs to be added to MusicBrainz""" + status = self._get_album_status(album) + mbid = self._get_mbid(album) + return status == "red" or not mbid + + def _is_album_to_update(self, album: Dict) -> bool: + """Check if album needs to be updated in MusicBrainz""" + return self._get_album_status(album) == "orange" + + def _enrich_update_album(self, album_data: Dict, album: Dict) -> Dict: + """Enrich album data with MusicBrainz information for updates""" + musicbrainz_id = self._get_mbid(album) + album_data["mbid"] = musicbrainz_id + album_data["mb_url"] = album.get( + "albumMBUrl", f"https://musicbrainz.org/release/{musicbrainz_id}" + ) + album_data["album_issues"] = album.get("albumIssues", []) + return album_data + + def find_missing_albums( + self, artist_mbid: str, artist_name: str + ) -> Tuple[List[Dict], List[Dict]]: + """Find albums missing on MusicBrainz from Deezer releases for an artist""" deezer_artist_id = self._search_deezer_artist(artist_name) if not deezer_artist_id: - return [] - - # Now use SAMBL's compareArtistAlbums endpoint - try: - api_url = f"{self.base_url}/api/compareArtistAlbums" - params = { - 'provider_id': deezer_artist_id, - 'provider': 'deezer', - 'mbid': artist_mbid, - 'full': 'true' # Get full information including missing albums - } - - response = requests.get(api_url, params=params, timeout=30) - response.raise_for_status() - - data = response.json() - - # Debug: Print the raw response structure - print(f" [Sambl] Raw API response structure:") - print(f" [Sambl] Response type: {type(data)}") - if isinstance(data, dict): - print(f" [Sambl] Top-level keys: {list(data.keys())}") - if 'albumData' in data: - album_data = data.get('albumData', []) - print(f" [Sambl] albumData count: {len(album_data)}") - if len(album_data) > 0: - print(f" [Sambl] First album keys: {list(album_data[0].keys()) if isinstance(album_data[0], dict) else 'Not a dict'}") - print(f" [Sambl] First album sample: {json.dumps(album_data[0], indent=2)[:500] if isinstance(album_data[0], dict) else str(album_data[0])[:500]}") - # Check status counts - if 'orange' in data: - print(f" [Sambl] Orange (missing) albums: {data.get('orange', 0)}") - if 'green' in data: - print(f" [Sambl] Green (linked) albums: {data.get('green', 0)}") - if 'red' in data: - print(f" [Sambl] Red albums: {data.get('red', 0)}") - elif isinstance(data, list): - print(f" [Sambl] Response is a list with {len(data)} items") - if len(data) > 0: - print(f" [Sambl] First item keys: {list(data[0].keys()) if isinstance(data[0], dict) else 'Not a dict'}") - print(f" [Sambl] First item sample: {json.dumps(data[0], indent=2)[:500] if isinstance(data[0], dict) else str(data[0])[:500]}") - - # Parse the response to extract albums - # SAMBL returns albums in 'albumData' with status indicators: - # - 'red': Not in MusicBrainz (need to add) - # - 'orange': In MusicBrainz but needs linking/updates (need to update) - # - 'green': Properly linked (skip) - albums_to_add = [] - albums_to_update = [] - - albums = [] - if isinstance(data, dict): - # SAMBL uses 'albumData' as the key for the albums array - album_data = data.get('albumData') - print(f" [Sambl] albumData type: {type(album_data)}, value: {album_data}") - - if isinstance(album_data, list): - albums = album_data - elif isinstance(album_data, dict): - # albumData might be a dict with nested structure - print(f" [Sambl] albumData is dict with keys: {list(album_data.keys()) if album_data else 'None'}") - albums = album_data.get('albums', album_data.get('data', [])) - - # Fallback to other possible keys - if not albums and isinstance(data.get('albums'), list): - albums = data.get('albums', []) - if not albums and isinstance(data.get('data'), list): - albums = data.get('data', []) - elif isinstance(data, list): - albums = data - - print(f" [Sambl] Processing {len(albums)} album(s) from response") - - # If we have status counts but no albums, something is wrong - if isinstance(data, dict) and len(albums) == 0: - print(f" [Sambl] ⚠️ Warning: Found status counts but no albums in albumData") - print(f" [Sambl] Full response keys: {list(data.keys())}") - print(f" [Sambl] Total albums reported: {data.get('total', 'N/A')}") - # Try to print a sample of the response structure - print(f" [Sambl] Response sample: {json.dumps(data, indent=2)[:1000]}") - - for idx, album in enumerate(albums): - # Get album status and MusicBrainz ID - album_status = str(album.get('albumStatus', '')).lower() - musicbrainz_id = album.get('mbid') or album.get('musicbrainz_id') or album.get('musicbrainzId') or '' - album_title = album.get('name') or album.get('title') or 'Unknown' - album_issues = album.get('albumIssues', []) - - # Debug: Print album details - print(f" [Sambl] Album {idx+1}: {album_title}") - print(f" Status: {album_status or 'N/A'}, MBID: {musicbrainz_id or 'None'}, Issues: {album_issues}") - - # Extract Deezer URL and album info - deezer_id = str(album.get('id') or album.get('deezer_id') or album.get('deezerId') or '') - if not deezer_id or deezer_id == 'None': - print(f" ⚠️ Skipping - no valid Deezer ID found") - continue - - deezer_url = f"https://www.deezer.com/album/{deezer_id}" - album_data = { - 'title': album_title, - 'deezer_url': deezer_url, - 'deezer_id': deezer_id, - 'release_date': album.get('releaseDate') or album.get('release_date') or album.get('release') or '', - 'artist_name': artist_name, - 'cover_url': album.get('imageUrl') or album.get('cover') or album.get('cover_medium') or album.get('coverUrl') or '' - } - - # Categorize albums based on status - if album_status == 'red' or not musicbrainz_id or musicbrainz_id == '': - # Red status or no MBID = needs to be added to MusicBrainz - albums_to_add.append(album_data) - print(f" ✓ Added to 'to add' list (not in MusicBrainz)") - elif album_status == 'orange': - # Orange status = in MusicBrainz but needs linking/updates - album_data['mbid'] = musicbrainz_id - album_data['mb_url'] = album.get('albumMBUrl', f'https://musicbrainz.org/release/{musicbrainz_id}') - album_data['album_issues'] = album_issues - albums_to_update.append(album_data) - print(f" ✓ Added to 'to update' list (needs linking/updates)") - else: - # Green status = properly linked, skip - print(f" ✓ Album is properly linked (MBID: {musicbrainz_id})") - - print(f" [Sambl] ✓ Found {len(albums_to_add)} album(s) to add, {len(albums_to_update)} album(s) to update") - - return albums_to_add, albums_to_update - - except requests.exceptions.RequestException as e: - print(f" [Sambl] ⚠️ Error calling SAMBL API: {e}", file=sys.stderr) return [], [] - except (KeyError, ValueError, TypeError) as e: - print(f" [Sambl] ⚠️ Error parsing SAMBL response: {e}", file=sys.stderr) - print(f" [Sambl] Response: {response.text[:200] if 'response' in locals() else 'N/A'}", file=sys.stderr) + + try: + params = { + "provider_id": deezer_artist_id, + "provider": "deezer", + "mbid": artist_mbid, + "full": "true", + } + response = requests.get( + f"{self.base_url}/api/compareArtistAlbums", params=params, timeout=30 + ) + response.raise_for_status() + albums = self._extract_albums(response.json()) + + valid_albums = list(filter(self._has_valid_deezer_id, albums)) + albums_to_add_raw = list(filter(self._is_album_to_add, valid_albums)) + albums_to_update_raw = list(filter(self._is_album_to_update, valid_albums)) + + build_album = lambda album: self._build_album_data(album, artist_name) + albums_to_add = list(map(build_album, albums_to_add_raw)) + + enrich_album = lambda album: self._enrich_update_album( + self._build_album_data(album, artist_name), album + ) + albums_to_update = list(map(enrich_album, albums_to_update_raw)) + + return albums_to_add, albums_to_update + + except requests.exceptions.RequestException: + return [], [] + except (KeyError, ValueError, TypeError): return [], [] class SubmissionLinkGenerator: """Generate submission links for MusicBrainz using a-tisket and Harmony""" - + @staticmethod def generate_atisket_link(deezer_url: str) -> str: """Generate an a-tisket submission link from a Deezer URL""" - encoded_url = quote(deezer_url, safe='') - return f"https://atisket.pulsewidth.org.uk/?url={encoded_url}" - + return f"https://atisket.pulsewidth.org.uk/?url={quote(deezer_url, safe='')}" + @staticmethod def generate_harmony_link(deezer_url: str) -> str: """Generate a Harmony submission link from a Deezer URL""" - encoded_url = quote(deezer_url, safe='') - return f"https://harmony.pulsewidth.org.uk/?url={encoded_url}" - + return f"https://harmony.pulsewidth.org.uk/?url={quote(deezer_url, safe='')}" + @staticmethod def generate_links(deezer_url: str) -> Dict[str, str]: """Generate both a-tisket and Harmony links""" return { - 'deezer_url': deezer_url, - 'atisket_link': SubmissionLinkGenerator.generate_atisket_link(deezer_url), - 'harmony_link': SubmissionLinkGenerator.generate_harmony_link(deezer_url) + "deezer_url": deezer_url, + "atisket_link": SubmissionLinkGenerator.generate_atisket_link(deezer_url), + "harmony_link": SubmissionLinkGenerator.generate_harmony_link(deezer_url), } +def _process_albums(albums: List[Dict], action: str) -> List[Dict]: + """Process albums and generate submission links""" + return list( + map( + lambda album: { + **album, + "submission_links": SubmissionLinkGenerator.generate_links( + album["deezer_url"] + ), + "action": action, + }, + filter(lambda album: album.get("deezer_url"), albums), + ) + ) + + +def _format_album_output(album: Dict) -> str: + """Format album information for console output""" + lines = [f" 📀 {album.get('title', 'Unknown Title')}"] + lines.append(f" Deezer: {album.get('deezer_url')}") + if album.get("mb_url"): + lines.append(f" MusicBrainz: {album['mb_url']}") + if album.get("album_issues"): + lines.append(f" Issues: {', '.join(album['album_issues'])}") + links = album.get("submission_links", {}) + lines.append(f" a-tisket: {links.get('atisket_link')}") + lines.append(f" Harmony: {links.get('harmony_link')}") + return "\n".join(lines) + + def main(): """Main execution function""" - # Configuration - loaded from .env file or environment variables LIDARR_URL = os.getenv("LIDARR_URL") LIDARR_API_KEY = os.getenv("LIDARR_API_KEY") - SAMBL_URL = os.getenv("SAMBL_URL") or None # Set if Sambl has a web API - MAX_ARTISTS = int(os.getenv("MAX_ARTISTS", "5")) # Limit number of artists to process (default: 5) - - # Validate required configuration + SAMBL_URL = os.getenv("SAMBL_URL") or None + MAX_ARTISTS = int(os.getenv("MAX_ARTISTS", "5")) + if not LIDARR_URL: - print("Error: LIDARR_URL not set. Please set it in .env file or environment variables.", file=sys.stderr) + print("Error: LIDARR_URL not set.", file=sys.stderr) sys.exit(1) - + if not LIDARR_API_KEY: - print("Error: LIDARR_API_KEY not set. Please set it in .env file or environment variables.", file=sys.stderr) + print("Error: LIDARR_API_KEY not set.", file=sys.stderr) sys.exit(1) - - # Initialize clients + lidarr = LidarrClient(LIDARR_URL, LIDARR_API_KEY) sambl = SamblClient(SAMBL_URL) - + print("Fetching monitored artists from Lidarr...") - artists = lidarr.get_monitored_artists(['new', 'all']) - + artists = lidarr.get_monitored_artists(["new", "all"]) + if not artists: print("No artists found with monitorNewItems set to 'new' or 'all'") return - + total_artists = len(artists) - - # Limit the number of artists for testing if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS: - print(f"Found {total_artists} monitored artists (limiting to {MAX_ARTISTS} for testing)") + print( + f"Found {total_artists} monitored artists (limiting to {MAX_ARTISTS} for testing)" + ) artists = artists[:MAX_ARTISTS] else: print(f"Found {total_artists} monitored artists") - print("\n" + "="*80) - + print("\n" + "=" * 80) + all_albums_to_add = [] all_albums_to_update = [] - + for artist in artists: - artist_name = artist.get('artistName', 'Unknown') - artist_mbid = artist.get('foreignArtistId') or artist.get('mbid') - + artist_name = artist.get("artistName", "Unknown") + artist_mbid = artist.get("foreignArtistId") or artist.get("mbid") + if not artist_mbid: print(f"\n⚠️ Skipping {artist_name} - no MusicBrainz ID found") continue - + print(f"\n🎵 Artist: {artist_name}") print(f" MusicBrainz ID: {artist_mbid}") - - # Find albums using Sambl - albums_to_add, albums_to_update = sambl.find_missing_albums(artist_mbid, artist_name) - - # Process albums to add + + albums_to_add, albums_to_update = sambl.find_missing_albums( + artist_mbid, artist_name + ) + if albums_to_add: print(f"\n 📥 Albums to ADD ({len(albums_to_add)}):") - for album in albums_to_add: - deezer_url = album.get('deezer_url') - if deezer_url: - links = SubmissionLinkGenerator.generate_links(deezer_url) - album['submission_links'] = links - album['action'] = 'add' - all_albums_to_add.append(album) - - print(f" 📀 {album.get('title', 'Unknown Title')}") - print(f" Deezer: {deezer_url}") - print(f" a-tisket: {links['atisket_link']}") - print(f" Harmony: {links['harmony_link']}") - - # Process albums to update + processed = _process_albums(albums_to_add, "add") + all_albums_to_add.extend(processed) + print("\n".join(map(_format_album_output, processed))) + if albums_to_update: print(f"\n 🔄 Albums to UPDATE ({len(albums_to_update)}):") - for album in albums_to_update: - deezer_url = album.get('deezer_url') - mb_url = album.get('mb_url', '') - issues = album.get('album_issues', []) - if deezer_url: - links = SubmissionLinkGenerator.generate_links(deezer_url) - album['submission_links'] = links - album['action'] = 'update' - all_albums_to_update.append(album) - - print(f" 📀 {album.get('title', 'Unknown Title')}") - print(f" Deezer: {deezer_url}") - if mb_url: - print(f" MusicBrainz: {mb_url}") - if issues: - print(f" Issues: {', '.join(issues)}") - print(f" a-tisket: {links['atisket_link']}") - print(f" Harmony: {links['harmony_link']}") - + processed = _process_albums(albums_to_update, "update") + all_albums_to_update.extend(processed) + print("\n".join(map(_format_album_output, processed))) + if not albums_to_add and not albums_to_update: print(f" ✓ All albums are properly linked!") - - # Generate summary report - print("\n" + "="*80) + + print("\n" + "=" * 80) print(f"\n📊 Summary:") - print(f" Artists processed: {len(artists)}" + (f" (of {total_artists} total)" if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS else "")) + artists_info = f" Artists processed: {len(artists)}" + if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS: + artists_info += f" (of {total_artists} total)" + print(artists_info) print(f" Albums to ADD: {len(all_albums_to_add)}") print(f" Albums to UPDATE: {len(all_albums_to_update)}") - - # Save results to JSON file + all_albums = all_albums_to_add + all_albums_to_update - if all_albums: - output_file = "missing_albums.json" - with open(output_file, 'w', encoding='utf-8') as f: - json.dump({ - 'albums_to_add': all_albums_to_add, - 'albums_to_update': all_albums_to_update, - 'summary': { - 'total_to_add': len(all_albums_to_add), - 'total_to_update': len(all_albums_to_update), - 'total': len(all_albums) - } - }, f, indent=2, ensure_ascii=False) - print(f"\n💾 Results saved to {output_file}") - - # Generate HTML report with clickable links - generate_html_report(all_albums_to_add, all_albums_to_update) - else: + if not all_albums: print("\n✨ All albums are already on MusicBrainz!") + return + + output_data = { + "albums_to_add": all_albums_to_add, + "albums_to_update": all_albums_to_update, + "summary": { + "total_to_add": len(all_albums_to_add), + "total_to_update": len(all_albums_to_update), + "total": len(all_albums), + }, + } + with open("missing_albums.json", "w", encoding="utf-8") as f: + json.dump(output_data, f, indent=2, ensure_ascii=False) + print(f"\n💾 Results saved to missing_albums.json") + + generate_html_report(all_albums_to_add, all_albums_to_update) def generate_html_report(albums_to_add: List[Dict], albums_to_update: List[Dict]): @@ -528,7 +446,7 @@ def generate_html_report(albums_to_add: List[Dict], albums_to_update: List[Dict] Albums to ADD: {add_count} | Albums to UPDATE: {update_count} """ - + album_html = """