- Introduced a new `html_report.py` file to generate an HTML report for albums to add and update. - Implemented a `generate_html_report` function that creates a styled HTML document with clickable submission links. - Integrated the new report generation function into `main.py` to streamline the process of reporting missing albums. - Enhanced user experience with filtering options for album types and artists in the generated report.
349 lines
12 KiB
Python
Executable File
349 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Script to identify missing albums on MusicBrainz from Deezer releases
|
|
for artists monitored in Lidarr, and generate submission links.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from typing import Dict, List, Optional, Tuple
|
|
from urllib.parse import quote
|
|
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
|
|
from html_report import generate_html_report
|
|
|
|
load_dotenv()
|
|
|
|
|
|
class LidarrClient:
|
|
"""Client for interacting with Lidarr API"""
|
|
|
|
def __init__(self, base_url: str, api_key: str):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.headers = {"X-Api-Key": api_key}
|
|
|
|
def get_artists(self) -> List[Dict]:
|
|
"""Fetch all artists from Lidarr"""
|
|
url = f"{self.base_url}/api/v1/artist"
|
|
try:
|
|
response = requests.get(url, headers=self.headers)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Error fetching artists from Lidarr: {e}", file=sys.stderr)
|
|
return []
|
|
|
|
def get_monitored_artists(
|
|
self, monitor_types: Optional[List[str]] = None
|
|
) -> List[Dict]:
|
|
"""Get artists with monitorNewItems set to specified values"""
|
|
if monitor_types is None:
|
|
monitor_types = ["new", "all"]
|
|
return list(
|
|
filter(
|
|
lambda artist: artist.get("monitorNewItems") in monitor_types,
|
|
self.get_artists(),
|
|
)
|
|
)
|
|
|
|
|
|
class SamblClient:
|
|
"""Client for interacting with SAMBL API to find missing albums"""
|
|
|
|
def __init__(self, base_url: Optional[str] = None):
|
|
self.base_url = (base_url or "https://sambl.lioncat6.com").rstrip("/")
|
|
|
|
def _search_deezer_artist(self, artist_name: str) -> Optional[str]:
|
|
"""Search for an artist on Deezer and return their Deezer ID"""
|
|
try:
|
|
response = requests.get(
|
|
"https://api.deezer.com/search/artist",
|
|
params={"q": artist_name, "limit": 1},
|
|
timeout=10,
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
if data.get("data") and len(data["data"]) > 0:
|
|
return str(data["data"][0]["id"])
|
|
return None
|
|
except requests.exceptions.RequestException:
|
|
return None
|
|
|
|
def _extract_albums(self, data: Dict) -> List[Dict]:
|
|
"""Extract albums list from SAMBL response"""
|
|
if isinstance(data, dict):
|
|
album_data = data.get("albumData")
|
|
if isinstance(album_data, list):
|
|
return album_data
|
|
if isinstance(album_data, dict):
|
|
return album_data.get("albums", album_data.get("data", []))
|
|
if isinstance(data.get("albums"), list):
|
|
return data.get("albums", [])
|
|
if isinstance(data.get("data"), list):
|
|
return data.get("data", [])
|
|
if isinstance(data, list):
|
|
return data
|
|
return []
|
|
|
|
def _build_album_data(self, album: Dict, artist_name: str) -> Optional[Dict]:
|
|
"""Build album data dictionary from SAMBL album response"""
|
|
deezer_id = str(
|
|
album.get("id") or album.get("deezer_id") or album.get("deezerId") or ""
|
|
)
|
|
if not deezer_id or deezer_id == "None":
|
|
return None
|
|
|
|
return {
|
|
"title": album.get("name") or album.get("title") or "Unknown",
|
|
"deezer_url": f"https://www.deezer.com/album/{deezer_id}",
|
|
"deezer_id": deezer_id,
|
|
"release_date": album.get("releaseDate")
|
|
or album.get("release_date")
|
|
or album.get("release")
|
|
or "",
|
|
"artist_name": artist_name,
|
|
"cover_url": album.get("imageUrl")
|
|
or album.get("cover")
|
|
or album.get("cover_medium")
|
|
or album.get("coverUrl")
|
|
or "",
|
|
}
|
|
|
|
def _has_valid_deezer_id(self, album: Dict) -> bool:
|
|
"""Check if album has a valid Deezer ID"""
|
|
deezer_id = str(
|
|
album.get("id") or album.get("deezer_id") or album.get("deezerId") or ""
|
|
)
|
|
return bool(deezer_id and deezer_id != "None")
|
|
|
|
def _get_album_status(self, album: Dict) -> str:
|
|
"""Get album status from SAMBL response"""
|
|
return str(album.get("albumStatus", "")).lower()
|
|
|
|
def _get_mbid(self, album: Dict) -> str:
|
|
"""Extract MusicBrainz ID from album"""
|
|
return (
|
|
album.get("mbid")
|
|
or album.get("musicbrainz_id")
|
|
or album.get("musicbrainzId")
|
|
or ""
|
|
)
|
|
|
|
def _is_album_to_add(self, album: Dict) -> bool:
|
|
"""Check if album needs to be added to MusicBrainz"""
|
|
status = self._get_album_status(album)
|
|
mbid = self._get_mbid(album)
|
|
return status == "red" or not mbid
|
|
|
|
def _is_album_to_update(self, album: Dict) -> bool:
|
|
"""Check if album needs to be updated in MusicBrainz"""
|
|
return self._get_album_status(album) == "orange"
|
|
|
|
def _enrich_update_album(self, album_data: Dict, album: Dict) -> Dict:
|
|
"""Enrich album data with MusicBrainz information for updates"""
|
|
musicbrainz_id = self._get_mbid(album)
|
|
album_data["mbid"] = musicbrainz_id
|
|
album_data["mb_url"] = album.get(
|
|
"albumMBUrl", f"https://musicbrainz.org/release/{musicbrainz_id}"
|
|
)
|
|
album_data["album_issues"] = album.get("albumIssues", [])
|
|
return album_data
|
|
|
|
def find_missing_albums(
|
|
self, artist_mbid: str, artist_name: str
|
|
) -> Tuple[List[Dict], List[Dict]]:
|
|
"""Find albums missing on MusicBrainz from Deezer releases for an artist"""
|
|
deezer_artist_id = self._search_deezer_artist(artist_name)
|
|
if not deezer_artist_id:
|
|
return [], []
|
|
|
|
try:
|
|
params = {
|
|
"provider_id": deezer_artist_id,
|
|
"provider": "deezer",
|
|
"mbid": artist_mbid,
|
|
"full": "true",
|
|
}
|
|
response = requests.get(
|
|
f"{self.base_url}/api/compareArtistAlbums", params=params, timeout=30
|
|
)
|
|
response.raise_for_status()
|
|
albums = self._extract_albums(response.json())
|
|
|
|
valid_albums = list(filter(self._has_valid_deezer_id, albums))
|
|
albums_to_add_raw = list(filter(self._is_album_to_add, valid_albums))
|
|
albums_to_update_raw = list(filter(self._is_album_to_update, valid_albums))
|
|
|
|
build_album = lambda album: self._build_album_data(album, artist_name)
|
|
albums_to_add = list(map(build_album, albums_to_add_raw))
|
|
|
|
enrich_album = lambda album: self._enrich_update_album(
|
|
self._build_album_data(album, artist_name), album
|
|
)
|
|
albums_to_update = list(map(enrich_album, albums_to_update_raw))
|
|
|
|
return albums_to_add, albums_to_update
|
|
|
|
except requests.exceptions.RequestException:
|
|
return [], []
|
|
except (KeyError, ValueError, TypeError):
|
|
return [], []
|
|
|
|
|
|
class SubmissionLinkGenerator:
|
|
"""Generate submission links for MusicBrainz using a-tisket and Harmony"""
|
|
|
|
@staticmethod
|
|
def generate_atisket_link(deezer_url: str) -> str:
|
|
"""Generate an a-tisket submission link from a Deezer URL"""
|
|
return f"https://atisket.pulsewidth.org.uk/?url={quote(deezer_url, safe='')}"
|
|
|
|
@staticmethod
|
|
def generate_harmony_link(deezer_url: str) -> str:
|
|
"""Generate a Harmony submission link from a Deezer URL"""
|
|
return f"https://harmony.pulsewidth.org.uk/?url={quote(deezer_url, safe='')}"
|
|
|
|
@staticmethod
|
|
def generate_links(deezer_url: str) -> Dict[str, str]:
|
|
"""Generate both a-tisket and Harmony links"""
|
|
return {
|
|
"deezer_url": deezer_url,
|
|
"atisket_link": SubmissionLinkGenerator.generate_atisket_link(deezer_url),
|
|
"harmony_link": SubmissionLinkGenerator.generate_harmony_link(deezer_url),
|
|
}
|
|
|
|
|
|
def _process_albums(albums: List[Dict], action: str) -> List[Dict]:
|
|
"""Process albums and generate submission links"""
|
|
return list(
|
|
map(
|
|
lambda album: {
|
|
**album,
|
|
"submission_links": SubmissionLinkGenerator.generate_links(
|
|
album["deezer_url"]
|
|
),
|
|
"action": action,
|
|
},
|
|
filter(lambda album: album.get("deezer_url"), albums),
|
|
)
|
|
)
|
|
|
|
|
|
def _format_album_output(album: Dict) -> str:
|
|
"""Format album information for console output"""
|
|
lines = [f" 📀 {album.get('title', 'Unknown Title')}"]
|
|
lines.append(f" Deezer: {album.get('deezer_url')}")
|
|
if album.get("mb_url"):
|
|
lines.append(f" MusicBrainz: {album['mb_url']}")
|
|
if album.get("album_issues"):
|
|
lines.append(f" Issues: {', '.join(album['album_issues'])}")
|
|
links = album.get("submission_links", {})
|
|
lines.append(f" a-tisket: {links.get('atisket_link')}")
|
|
lines.append(f" Harmony: {links.get('harmony_link')}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def main():
|
|
"""Main execution function"""
|
|
LIDARR_URL = os.getenv("LIDARR_URL")
|
|
LIDARR_API_KEY = os.getenv("LIDARR_API_KEY")
|
|
SAMBL_URL = os.getenv("SAMBL_URL") or None
|
|
MAX_ARTISTS = int(os.getenv("MAX_ARTISTS", "5"))
|
|
|
|
if not LIDARR_URL:
|
|
print("Error: LIDARR_URL not set.", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if not LIDARR_API_KEY:
|
|
print("Error: LIDARR_API_KEY not set.", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
lidarr = LidarrClient(LIDARR_URL, LIDARR_API_KEY)
|
|
sambl = SamblClient(SAMBL_URL)
|
|
|
|
print("Fetching monitored artists from Lidarr...")
|
|
artists = lidarr.get_monitored_artists(["new", "all"])
|
|
|
|
if not artists:
|
|
print("No artists found with monitorNewItems set to 'new' or 'all'")
|
|
return
|
|
|
|
total_artists = len(artists)
|
|
if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS:
|
|
print(
|
|
f"Found {total_artists} monitored artists (limiting to {MAX_ARTISTS} for testing)"
|
|
)
|
|
artists = artists[:MAX_ARTISTS]
|
|
else:
|
|
print(f"Found {total_artists} monitored artists")
|
|
print("\n" + "=" * 80)
|
|
|
|
all_albums_to_add = []
|
|
all_albums_to_update = []
|
|
|
|
for artist in artists:
|
|
artist_name = artist.get("artistName", "Unknown")
|
|
artist_mbid = artist.get("foreignArtistId") or artist.get("mbid")
|
|
|
|
if not artist_mbid:
|
|
print(f"\n⚠️ Skipping {artist_name} - no MusicBrainz ID found")
|
|
continue
|
|
|
|
print(f"\n🎵 Artist: {artist_name}")
|
|
print(f" MusicBrainz ID: {artist_mbid}")
|
|
|
|
albums_to_add, albums_to_update = sambl.find_missing_albums(
|
|
artist_mbid, artist_name
|
|
)
|
|
|
|
if albums_to_add:
|
|
print(f"\n 📥 Albums to ADD ({len(albums_to_add)}):")
|
|
processed = _process_albums(albums_to_add, "add")
|
|
all_albums_to_add.extend(processed)
|
|
print("\n".join(map(_format_album_output, processed)))
|
|
|
|
if albums_to_update:
|
|
print(f"\n 🔄 Albums to UPDATE ({len(albums_to_update)}):")
|
|
processed = _process_albums(albums_to_update, "update")
|
|
all_albums_to_update.extend(processed)
|
|
print("\n".join(map(_format_album_output, processed)))
|
|
|
|
if not albums_to_add and not albums_to_update:
|
|
print(f" ✓ All albums are properly linked!")
|
|
|
|
print("\n" + "=" * 80)
|
|
print(f"\n📊 Summary:")
|
|
artists_info = f" Artists processed: {len(artists)}"
|
|
if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS:
|
|
artists_info += f" (of {total_artists} total)"
|
|
print(artists_info)
|
|
print(f" Albums to ADD: {len(all_albums_to_add)}")
|
|
print(f" Albums to UPDATE: {len(all_albums_to_update)}")
|
|
|
|
all_albums = all_albums_to_add + all_albums_to_update
|
|
if not all_albums:
|
|
print("\n✨ All albums are already on MusicBrainz!")
|
|
return
|
|
|
|
output_data = {
|
|
"albums_to_add": all_albums_to_add,
|
|
"albums_to_update": all_albums_to_update,
|
|
"summary": {
|
|
"total_to_add": len(all_albums_to_add),
|
|
"total_to_update": len(all_albums_to_update),
|
|
"total": len(all_albums),
|
|
},
|
|
}
|
|
with open("missing_albums.json", "w", encoding="utf-8") as f:
|
|
json.dump(output_data, f, indent=2, ensure_ascii=False)
|
|
print(f"\n💾 Results saved to missing_albums.json")
|
|
|
|
generate_html_report(all_albums_to_add, all_albums_to_update)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|