Files
lidarr-mb-gap/main.py
Danilo Reyes 20b07450d9 Update Harmony link format in SubmissionLinkGenerator
- Modified the URL structure in the generate_harmony_link method to include 'release' in the path for better clarity and accuracy in generating submission links from Deezer URLs.
2025-11-11 10:25:48 -06:00

349 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Script to identify missing albums on MusicBrainz from Deezer releases
for artists monitored in Lidarr, and generate submission links.
"""
import json
import os
import sys
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote
import requests
from dotenv import load_dotenv
from html_report import generate_html_report
load_dotenv()
class LidarrClient:
"""Client for interacting with Lidarr API"""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url.rstrip("/")
self.headers = {"X-Api-Key": api_key}
def get_artists(self) -> List[Dict]:
"""Fetch all artists from Lidarr"""
url = f"{self.base_url}/api/v1/artist"
try:
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error fetching artists from Lidarr: {e}", file=sys.stderr)
return []
def get_monitored_artists(
self, monitor_types: Optional[List[str]] = None
) -> List[Dict]:
"""Get artists with monitorNewItems set to specified values"""
if monitor_types is None:
monitor_types = ["new", "all"]
return list(
filter(
lambda artist: artist.get("monitorNewItems") in monitor_types,
self.get_artists(),
)
)
class SamblClient:
"""Client for interacting with SAMBL API to find missing albums"""
def __init__(self, base_url: Optional[str] = None):
self.base_url = (base_url or "https://sambl.lioncat6.com").rstrip("/")
def _search_deezer_artist(self, artist_name: str) -> Optional[str]:
"""Search for an artist on Deezer and return their Deezer ID"""
try:
response = requests.get(
"https://api.deezer.com/search/artist",
params={"q": artist_name, "limit": 1},
timeout=10,
)
response.raise_for_status()
data = response.json()
if data.get("data") and len(data["data"]) > 0:
return str(data["data"][0]["id"])
return None
except requests.exceptions.RequestException:
return None
def _extract_albums(self, data: Dict) -> List[Dict]:
"""Extract albums list from SAMBL response"""
if isinstance(data, dict):
album_data = data.get("albumData")
if isinstance(album_data, list):
return album_data
if isinstance(album_data, dict):
return album_data.get("albums", album_data.get("data", []))
if isinstance(data.get("albums"), list):
return data.get("albums", [])
if isinstance(data.get("data"), list):
return data.get("data", [])
if isinstance(data, list):
return data
return []
def _build_album_data(self, album: Dict, artist_name: str) -> Optional[Dict]:
"""Build album data dictionary from SAMBL album response"""
deezer_id = str(
album.get("id") or album.get("deezer_id") or album.get("deezerId") or ""
)
if not deezer_id or deezer_id == "None":
return None
return {
"title": album.get("name") or album.get("title") or "Unknown",
"deezer_url": f"https://www.deezer.com/album/{deezer_id}",
"deezer_id": deezer_id,
"release_date": album.get("releaseDate")
or album.get("release_date")
or album.get("release")
or "",
"artist_name": artist_name,
"cover_url": album.get("imageUrl")
or album.get("cover")
or album.get("cover_medium")
or album.get("coverUrl")
or "",
}
def _has_valid_deezer_id(self, album: Dict) -> bool:
"""Check if album has a valid Deezer ID"""
deezer_id = str(
album.get("id") or album.get("deezer_id") or album.get("deezerId") or ""
)
return bool(deezer_id and deezer_id != "None")
def _get_album_status(self, album: Dict) -> str:
"""Get album status from SAMBL response"""
return str(album.get("albumStatus", "")).lower()
def _get_mbid(self, album: Dict) -> str:
"""Extract MusicBrainz ID from album"""
return (
album.get("mbid")
or album.get("musicbrainz_id")
or album.get("musicbrainzId")
or ""
)
def _is_album_to_add(self, album: Dict) -> bool:
"""Check if album needs to be added to MusicBrainz"""
status = self._get_album_status(album)
mbid = self._get_mbid(album)
return status == "red" or not mbid
def _is_album_to_update(self, album: Dict) -> bool:
"""Check if album needs to be updated in MusicBrainz"""
return self._get_album_status(album) == "orange"
def _enrich_update_album(self, album_data: Dict, album: Dict) -> Dict:
"""Enrich album data with MusicBrainz information for updates"""
musicbrainz_id = self._get_mbid(album)
album_data["mbid"] = musicbrainz_id
album_data["mb_url"] = album.get(
"albumMBUrl", f"https://musicbrainz.org/release/{musicbrainz_id}"
)
album_data["album_issues"] = album.get("albumIssues", [])
return album_data
def find_missing_albums(
self, artist_mbid: str, artist_name: str
) -> Tuple[List[Dict], List[Dict]]:
"""Find albums missing on MusicBrainz from Deezer releases for an artist"""
deezer_artist_id = self._search_deezer_artist(artist_name)
if not deezer_artist_id:
return [], []
try:
params = {
"provider_id": deezer_artist_id,
"provider": "deezer",
"mbid": artist_mbid,
"full": "true",
}
response = requests.get(
f"{self.base_url}/api/compareArtistAlbums", params=params, timeout=30
)
response.raise_for_status()
albums = self._extract_albums(response.json())
valid_albums = list(filter(self._has_valid_deezer_id, albums))
albums_to_add_raw = list(filter(self._is_album_to_add, valid_albums))
albums_to_update_raw = list(filter(self._is_album_to_update, valid_albums))
build_album = lambda album: self._build_album_data(album, artist_name)
albums_to_add = list(map(build_album, albums_to_add_raw))
enrich_album = lambda album: self._enrich_update_album(
self._build_album_data(album, artist_name), album
)
albums_to_update = list(map(enrich_album, albums_to_update_raw))
return albums_to_add, albums_to_update
except requests.exceptions.RequestException:
return [], []
except (KeyError, ValueError, TypeError):
return [], []
class SubmissionLinkGenerator:
"""Generate submission links for MusicBrainz using a-tisket and Harmony"""
@staticmethod
def generate_atisket_link(deezer_url: str) -> str:
"""Generate an a-tisket submission link from a Deezer URL"""
return f"https://atisket.pulsewidth.org.uk/?url={quote(deezer_url, safe='')}"
@staticmethod
def generate_harmony_link(deezer_url: str) -> str:
"""Generate a Harmony submission link from a Deezer URL"""
return f"https://harmony.pulsewidth.org.uk/release?url={quote(deezer_url, safe='')}"
@staticmethod
def generate_links(deezer_url: str) -> Dict[str, str]:
"""Generate both a-tisket and Harmony links"""
return {
"deezer_url": deezer_url,
"atisket_link": SubmissionLinkGenerator.generate_atisket_link(deezer_url),
"harmony_link": SubmissionLinkGenerator.generate_harmony_link(deezer_url),
}
def _process_albums(albums: List[Dict], action: str) -> List[Dict]:
"""Process albums and generate submission links"""
return list(
map(
lambda album: {
**album,
"submission_links": SubmissionLinkGenerator.generate_links(
album["deezer_url"]
),
"action": action,
},
filter(lambda album: album.get("deezer_url"), albums),
)
)
def _format_album_output(album: Dict) -> str:
"""Format album information for console output"""
lines = [f" 📀 {album.get('title', 'Unknown Title')}"]
lines.append(f" Deezer: {album.get('deezer_url')}")
if album.get("mb_url"):
lines.append(f" MusicBrainz: {album['mb_url']}")
if album.get("album_issues"):
lines.append(f" Issues: {', '.join(album['album_issues'])}")
links = album.get("submission_links", {})
lines.append(f" a-tisket: {links.get('atisket_link')}")
lines.append(f" Harmony: {links.get('harmony_link')}")
return "\n".join(lines)
def main():
"""Main execution function"""
LIDARR_URL = os.getenv("LIDARR_URL")
LIDARR_API_KEY = os.getenv("LIDARR_API_KEY")
SAMBL_URL = os.getenv("SAMBL_URL") or None
MAX_ARTISTS = int(os.getenv("MAX_ARTISTS", "5"))
if not LIDARR_URL:
print("Error: LIDARR_URL not set.", file=sys.stderr)
sys.exit(1)
if not LIDARR_API_KEY:
print("Error: LIDARR_API_KEY not set.", file=sys.stderr)
sys.exit(1)
lidarr = LidarrClient(LIDARR_URL, LIDARR_API_KEY)
sambl = SamblClient(SAMBL_URL)
print("Fetching monitored artists from Lidarr...")
artists = lidarr.get_monitored_artists(["new", "all"])
if not artists:
print("No artists found with monitorNewItems set to 'new' or 'all'")
return
total_artists = len(artists)
if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS:
print(
f"Found {total_artists} monitored artists (limiting to {MAX_ARTISTS} for testing)"
)
artists = artists[:MAX_ARTISTS]
else:
print(f"Found {total_artists} monitored artists")
print("\n" + "=" * 80)
all_albums_to_add = []
all_albums_to_update = []
for artist in artists:
artist_name = artist.get("artistName", "Unknown")
artist_mbid = artist.get("foreignArtistId") or artist.get("mbid")
if not artist_mbid:
print(f"\n⚠️ Skipping {artist_name} - no MusicBrainz ID found")
continue
print(f"\n🎵 Artist: {artist_name}")
print(f" MusicBrainz ID: {artist_mbid}")
albums_to_add, albums_to_update = sambl.find_missing_albums(
artist_mbid, artist_name
)
if albums_to_add:
print(f"\n 📥 Albums to ADD ({len(albums_to_add)}):")
processed = _process_albums(albums_to_add, "add")
all_albums_to_add.extend(processed)
print("\n".join(map(_format_album_output, processed)))
if albums_to_update:
print(f"\n 🔄 Albums to UPDATE ({len(albums_to_update)}):")
processed = _process_albums(albums_to_update, "update")
all_albums_to_update.extend(processed)
print("\n".join(map(_format_album_output, processed)))
if not albums_to_add and not albums_to_update:
print(f" ✓ All albums are properly linked!")
print("\n" + "=" * 80)
print(f"\n📊 Summary:")
artists_info = f" Artists processed: {len(artists)}"
if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS:
artists_info += f" (of {total_artists} total)"
print(artists_info)
print(f" Albums to ADD: {len(all_albums_to_add)}")
print(f" Albums to UPDATE: {len(all_albums_to_update)}")
all_albums = all_albums_to_add + all_albums_to_update
if not all_albums:
print("\n✨ All albums are already on MusicBrainz!")
return
output_data = {
"albums_to_add": all_albums_to_add,
"albums_to_update": all_albums_to_update,
"summary": {
"total_to_add": len(all_albums_to_add),
"total_to_update": len(all_albums_to_update),
"total": len(all_albums),
},
}
with open("missing_albums.json", "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"\n💾 Results saved to missing_albums.json")
generate_html_report(all_albums_to_add, all_albums_to_update)
if __name__ == "__main__":
main()