- Added `black` to the development environment in flake.nix for code formatting. - Updated shell hook to include instructions for using `black`. - Refactored `main.py` to improve code organization and readability, including reordering imports and simplifying list comprehensions. - Enhanced album processing functions for better clarity and efficiency. - Improved error handling and output formatting for better user experience.
521 lines
18 KiB
Python
Executable File
521 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Script to identify missing albums on MusicBrainz from Deezer releases
|
|
for artists monitored in Lidarr, and generate submission links.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from typing import Dict, List, Optional, Tuple
|
|
from urllib.parse import quote
|
|
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
|
|
# Load settings from a local .env file (python-dotenv) at import time, so the
# os.getenv() lookups in main() can see LIDARR_URL, LIDARR_API_KEY, etc.
load_dotenv()
|
|
|
|
|
|
class LidarrClient:
    """Client for interacting with the Lidarr API.

    Instances expose ``base_url`` (trailing slash removed), ``headers``
    (carrying the ``X-Api-Key`` value) and ``timeout`` (seconds per request).
    """

    def __init__(self, base_url: str, api_key: str, timeout: float = 30):
        """Store connection settings.

        Args:
            base_url: Root URL of the Lidarr instance; a trailing slash is
                stripped so paths can be appended safely.
            api_key: Value sent in the ``X-Api-Key`` request header.
            timeout: Seconds to wait for Lidarr before giving up. Without a
                timeout ``requests`` may block indefinitely.
        """
        self.base_url = base_url.rstrip("/")
        self.headers = {"X-Api-Key": api_key}
        self.timeout = timeout

    def get_artists(self) -> List[Dict]:
        """Fetch all artists from Lidarr.

        Returns:
            The decoded JSON list from ``/api/v1/artist``, or an empty list
            when the request fails or the payload is not a list. Failures are
            reported on stderr rather than raised.
        """
        url = f"{self.base_url}/api/v1/artist"
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding errors on requests versions
            # where they are not a RequestException subclass.
            print(f"Error fetching artists from Lidarr: {e}", file=sys.stderr)
            return []

        if not isinstance(payload, list):
            print(
                "Error fetching artists from Lidarr: unexpected response format",
                file=sys.stderr,
            )
            return []
        return payload

    def get_monitored_artists(
        self, monitor_types: Optional[List[str]] = None
    ) -> List[Dict]:
        """Get artists whose ``monitorNewItems`` is one of ``monitor_types``.

        Args:
            monitor_types: Accepted ``monitorNewItems`` values; defaults to
                ``["new", "all"]`` when ``None``.

        Returns:
            The matching artist dictionaries, in the order Lidarr returned
            them. Entries that are not dictionaries are ignored.
        """
        if monitor_types is None:
            monitor_types = ["new", "all"]
        return [
            artist
            for artist in self.get_artists()
            if isinstance(artist, dict)
            and artist.get("monitorNewItems") in monitor_types
        ]
|
|
|
|
|
|
class SamblClient:
    """Client for interacting with the SAMBL API to find missing albums."""

    def __init__(self, base_url: Optional[str] = None):
        """Store the SAMBL base URL.

        Args:
            base_url: Root URL of the SAMBL instance. Falls back to
                ``https://sambl.lioncat6.com`` when ``None`` or empty.
        """
        self.base_url = (base_url or "https://sambl.lioncat6.com").rstrip("/")

    def _search_deezer_artist(self, artist_name: str) -> Optional[str]:
        """Search for an artist on Deezer and return their Deezer ID.

        Only the first search hit is considered. Returns ``None`` when there
        is no hit or the lookup fails; failures are reported on stderr.
        """
        try:
            response = requests.get(
                "https://api.deezer.com/search/artist",
                params={"q": artist_name, "limit": 1},
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
            results = data.get("data") if isinstance(data, dict) else None
            if results:
                return str(results[0]["id"])
        except (
            requests.exceptions.RequestException,
            KeyError,
            IndexError,
            TypeError,
            ValueError,
        ) as e:
            print(
                f"Error searching Deezer for artist '{artist_name}': {e}",
                file=sys.stderr,
            )
        return None

    def _extract_albums(self, data: Dict) -> List[Dict]:
        """Extract the album list from a SAMBL response.

        Shapes are tried in this order: ``albumData`` as a list,
        ``albumData`` as a dict holding ``albums`` or ``data``, a top-level
        ``albums`` list, a top-level ``data`` list, or the payload itself
        being a list. Anything else yields an empty list.
        """
        if isinstance(data, list):
            return data
        if not isinstance(data, dict):
            return []

        album_data = data.get("albumData")
        if isinstance(album_data, list):
            return album_data
        if isinstance(album_data, dict):
            # A dict-shaped albumData is authoritative: do not fall through
            # to the top-level keys.
            for key in ("albums", "data"):
                if isinstance(album_data.get(key), list):
                    return album_data[key]
            return []

        for key in ("albums", "data"):
            if isinstance(data.get(key), list):
                return data[key]
        return []

    @staticmethod
    def _get_deezer_id(album: Dict) -> str:
        """Return the album's Deezer ID as a string, or ``""`` if absent."""
        raw_id = album.get("id") or album.get("deezer_id") or album.get("deezerId") or ""
        deezer_id = str(raw_id)
        # A literal "None" string is treated as a missing ID.
        return "" if deezer_id == "None" else deezer_id

    def _build_album_data(self, album: Dict, artist_name: str) -> Optional[Dict]:
        """Build the normalized album dictionary from a SAMBL album entry.

        Returns ``None`` when the entry has no usable Deezer ID.
        """
        deezer_id = self._get_deezer_id(album)
        if not deezer_id:
            return None

        return {
            "title": album.get("name") or album.get("title") or "Unknown",
            "deezer_url": f"https://www.deezer.com/album/{deezer_id}",
            "deezer_id": deezer_id,
            "release_date": album.get("releaseDate")
            or album.get("release_date")
            or album.get("release")
            or "",
            "artist_name": artist_name,
            "cover_url": album.get("imageUrl")
            or album.get("cover")
            or album.get("cover_medium")
            or album.get("coverUrl")
            or "",
        }

    def _has_valid_deezer_id(self, album: Dict) -> bool:
        """Check if ``album`` is a dict carrying a usable Deezer ID."""
        return isinstance(album, dict) and bool(self._get_deezer_id(album))

    def _get_album_status(self, album: Dict) -> str:
        """Return the lower-cased ``albumStatus`` value (``""`` if missing)."""
        return str(album.get("albumStatus", "")).lower()

    def _get_mbid(self, album: Dict) -> str:
        """Extract the MusicBrainz ID from an album entry (``""`` if missing)."""
        return (
            album.get("mbid")
            or album.get("musicbrainz_id")
            or album.get("musicbrainzId")
            or ""
        )

    def _is_album_to_add(self, album: Dict) -> bool:
        """Check if album needs to be added: status ``red`` or no MBID."""
        return self._get_album_status(album) == "red" or not self._get_mbid(album)

    def _is_album_to_update(self, album: Dict) -> bool:
        """Check if album needs to be updated: status ``orange``."""
        return self._get_album_status(album) == "orange"

    def _enrich_update_album(self, album_data: Dict, album: Dict) -> Dict:
        """Add MusicBrainz details to ``album_data`` (mutated and returned).

        Sets ``mbid``, ``mb_url`` and ``album_issues``. ``mb_url`` prefers the
        entry's ``albumMBUrl``; otherwise it is derived from the MBID, and is
        left empty when there is no MBID so no dead link is produced.
        """
        musicbrainz_id = self._get_mbid(album)
        fallback_url = (
            f"https://musicbrainz.org/release/{musicbrainz_id}"
            if musicbrainz_id
            else ""
        )
        album_data["mbid"] = musicbrainz_id
        album_data["mb_url"] = album.get("albumMBUrl") or fallback_url
        album_data["album_issues"] = album.get("albumIssues") or []
        return album_data

    def _classify_albums(
        self, albums: List[Dict], artist_name: str
    ) -> Tuple[List[Dict], List[Dict]]:
        """Split raw SAMBL album entries into (to_add, to_update) lists."""
        valid_albums = [album for album in albums if self._has_valid_deezer_id(album)]
        albums_to_add = [
            self._build_album_data(album, artist_name)
            for album in valid_albums
            if self._is_album_to_add(album)
        ]
        albums_to_update = [
            self._enrich_update_album(
                self._build_album_data(album, artist_name), album
            )
            for album in valid_albums
            if self._is_album_to_update(album)
        ]
        return albums_to_add, albums_to_update

    def find_missing_albums(
        self, artist_mbid: str, artist_name: str
    ) -> Tuple[List[Dict], List[Dict]]:
        """Find albums missing on MusicBrainz from Deezer releases for an artist.

        Args:
            artist_mbid: MusicBrainz ID of the artist.
            artist_name: Artist name, used for the Deezer search and copied
                into every returned album dictionary.

        Returns:
            ``(albums_to_add, albums_to_update)``. Both lists are empty when
            the artist is not found on Deezer or a request fails; failures
            are reported on stderr so they are not mistaken for "nothing
            missing".
        """
        deezer_artist_id = self._search_deezer_artist(artist_name)
        if not deezer_artist_id:
            return [], []

        params = {
            "provider_id": deezer_artist_id,
            "provider": "deezer",
            "mbid": artist_mbid,
            "full": "true",
        }
        try:
            response = requests.get(
                f"{self.base_url}/api/compareArtistAlbums", params=params, timeout=30
            )
            response.raise_for_status()
            albums = self._extract_albums(response.json())
        except (requests.exceptions.RequestException, ValueError) as e:
            print(
                f"Error fetching albums from SAMBL for '{artist_name}': {e}",
                file=sys.stderr,
            )
            return [], []

        try:
            return self._classify_albums(albums, artist_name)
        except (KeyError, ValueError, TypeError) as e:
            print(
                f"Error processing SAMBL response for '{artist_name}': {e}",
                file=sys.stderr,
            )
            return [], []
|
|
|
|
|
|
class SubmissionLinkGenerator:
    """Build MusicBrainz submission links (a-tisket and Harmony) for Deezer URLs."""

    @staticmethod
    def generate_atisket_link(deezer_url: str) -> str:
        """Return the a-tisket URL with ``deezer_url`` fully percent-encoded."""
        encoded_url = quote(deezer_url, safe="")
        return "https://atisket.pulsewidth.org.uk/?url=" + encoded_url

    @staticmethod
    def generate_harmony_link(deezer_url: str) -> str:
        """Return the Harmony URL with ``deezer_url`` fully percent-encoded."""
        encoded_url = quote(deezer_url, safe="")
        return "https://harmony.pulsewidth.org.uk/?url=" + encoded_url

    @staticmethod
    def generate_links(deezer_url: str) -> Dict[str, str]:
        """Return the original Deezer URL together with both submission links."""
        links = {"deezer_url": deezer_url}
        links["atisket_link"] = SubmissionLinkGenerator.generate_atisket_link(
            deezer_url
        )
        links["harmony_link"] = SubmissionLinkGenerator.generate_harmony_link(
            deezer_url
        )
        return links
|
|
|
|
|
|
def _process_albums(albums: List[Dict], action: str) -> List[Dict]:
|
|
"""Process albums and generate submission links"""
|
|
return list(
|
|
map(
|
|
lambda album: {
|
|
**album,
|
|
"submission_links": SubmissionLinkGenerator.generate_links(
|
|
album["deezer_url"]
|
|
),
|
|
"action": action,
|
|
},
|
|
filter(lambda album: album.get("deezer_url"), albums),
|
|
)
|
|
)
|
|
|
|
|
|
def _format_album_output(album: Dict) -> str:
|
|
"""Format album information for console output"""
|
|
lines = [f" 📀 {album.get('title', 'Unknown Title')}"]
|
|
lines.append(f" Deezer: {album.get('deezer_url')}")
|
|
if album.get("mb_url"):
|
|
lines.append(f" MusicBrainz: {album['mb_url']}")
|
|
if album.get("album_issues"):
|
|
lines.append(f" Issues: {', '.join(album['album_issues'])}")
|
|
links = album.get("submission_links", {})
|
|
lines.append(f" a-tisket: {links.get('atisket_link')}")
|
|
lines.append(f" Harmony: {links.get('harmony_link')}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def main():
|
|
"""Main execution function"""
|
|
LIDARR_URL = os.getenv("LIDARR_URL")
|
|
LIDARR_API_KEY = os.getenv("LIDARR_API_KEY")
|
|
SAMBL_URL = os.getenv("SAMBL_URL") or None
|
|
MAX_ARTISTS = int(os.getenv("MAX_ARTISTS", "5"))
|
|
|
|
if not LIDARR_URL:
|
|
print("Error: LIDARR_URL not set.", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if not LIDARR_API_KEY:
|
|
print("Error: LIDARR_API_KEY not set.", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
lidarr = LidarrClient(LIDARR_URL, LIDARR_API_KEY)
|
|
sambl = SamblClient(SAMBL_URL)
|
|
|
|
print("Fetching monitored artists from Lidarr...")
|
|
artists = lidarr.get_monitored_artists(["new", "all"])
|
|
|
|
if not artists:
|
|
print("No artists found with monitorNewItems set to 'new' or 'all'")
|
|
return
|
|
|
|
total_artists = len(artists)
|
|
if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS:
|
|
print(
|
|
f"Found {total_artists} monitored artists (limiting to {MAX_ARTISTS} for testing)"
|
|
)
|
|
artists = artists[:MAX_ARTISTS]
|
|
else:
|
|
print(f"Found {total_artists} monitored artists")
|
|
print("\n" + "=" * 80)
|
|
|
|
all_albums_to_add = []
|
|
all_albums_to_update = []
|
|
|
|
for artist in artists:
|
|
artist_name = artist.get("artistName", "Unknown")
|
|
artist_mbid = artist.get("foreignArtistId") or artist.get("mbid")
|
|
|
|
if not artist_mbid:
|
|
print(f"\n⚠️ Skipping {artist_name} - no MusicBrainz ID found")
|
|
continue
|
|
|
|
print(f"\n🎵 Artist: {artist_name}")
|
|
print(f" MusicBrainz ID: {artist_mbid}")
|
|
|
|
albums_to_add, albums_to_update = sambl.find_missing_albums(
|
|
artist_mbid, artist_name
|
|
)
|
|
|
|
if albums_to_add:
|
|
print(f"\n 📥 Albums to ADD ({len(albums_to_add)}):")
|
|
processed = _process_albums(albums_to_add, "add")
|
|
all_albums_to_add.extend(processed)
|
|
print("\n".join(map(_format_album_output, processed)))
|
|
|
|
if albums_to_update:
|
|
print(f"\n 🔄 Albums to UPDATE ({len(albums_to_update)}):")
|
|
processed = _process_albums(albums_to_update, "update")
|
|
all_albums_to_update.extend(processed)
|
|
print("\n".join(map(_format_album_output, processed)))
|
|
|
|
if not albums_to_add and not albums_to_update:
|
|
print(f" ✓ All albums are properly linked!")
|
|
|
|
print("\n" + "=" * 80)
|
|
print(f"\n📊 Summary:")
|
|
artists_info = f" Artists processed: {len(artists)}"
|
|
if MAX_ARTISTS > 0 and total_artists > MAX_ARTISTS:
|
|
artists_info += f" (of {total_artists} total)"
|
|
print(artists_info)
|
|
print(f" Albums to ADD: {len(all_albums_to_add)}")
|
|
print(f" Albums to UPDATE: {len(all_albums_to_update)}")
|
|
|
|
all_albums = all_albums_to_add + all_albums_to_update
|
|
if not all_albums:
|
|
print("\n✨ All albums are already on MusicBrainz!")
|
|
return
|
|
|
|
output_data = {
|
|
"albums_to_add": all_albums_to_add,
|
|
"albums_to_update": all_albums_to_update,
|
|
"summary": {
|
|
"total_to_add": len(all_albums_to_add),
|
|
"total_to_update": len(all_albums_to_update),
|
|
"total": len(all_albums),
|
|
},
|
|
}
|
|
with open("missing_albums.json", "w", encoding="utf-8") as f:
|
|
json.dump(output_data, f, indent=2, ensure_ascii=False)
|
|
print(f"\n💾 Results saved to missing_albums.json")
|
|
|
|
generate_html_report(all_albums_to_add, all_albums_to_update)
|
|
|
|
|
|
def generate_html_report(albums_to_add: List[Dict], albums_to_update: List[Dict]):
    """Generate an HTML report with clickable submission links.

    Writes ``missing_albums.html`` in the current working directory and
    prints a confirmation line.

    Args:
        albums_to_add: Processed albums that are missing from MusicBrainz.
        albums_to_update: Processed albums that exist but need updates; their
            ``mb_url`` and ``album_issues`` values are rendered as well.

    All album-derived text and URLs come from external APIs, so every value
    is HTML-escaped before being placed in the page.
    """
    # Imported locally so this function stays self-contained.
    from html import escape

    # Literal CSS braces are doubled because the template goes through
    # str.format().
    page_template = """<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MusicBrainz Albums - Add &amp; Update</title>
    <style>
        body {{
            font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
            max-width: 1200px;
            margin: 0 auto;
            padding: 20px;
            background-color: #f5f5f5;
        }}
        h1 {{
            color: #333;
            border-bottom: 3px solid #4CAF50;
            padding-bottom: 10px;
        }}
        h2 {{
            color: #2196F3;
            margin-top: 30px;
            border-bottom: 2px solid #2196F3;
            padding-bottom: 5px;
        }}
        .album {{
            background: white;
            border-radius: 8px;
            padding: 20px;
            margin: 20px 0;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }}
        .album-title {{
            font-size: 1.5em;
            font-weight: bold;
            color: #2196F3;
            margin-bottom: 10px;
        }}
        .artist-name {{
            color: #666;
            margin-bottom: 15px;
        }}
        .links {{
            display: flex;
            gap: 10px;
            flex-wrap: wrap;
        }}
        .link-button {{
            display: inline-block;
            padding: 10px 20px;
            background-color: #4CAF50;
            color: white;
            text-decoration: none;
            border-radius: 5px;
            transition: background-color 0.3s;
        }}
        .link-button:hover {{
            background-color: #45a049;
        }}
        .link-button.atisket {{
            background-color: #2196F3;
        }}
        .link-button.atisket:hover {{
            background-color: #0b7dda;
        }}
        .link-button.harmony {{
            background-color: #FF9800;
        }}
        .link-button.harmony:hover {{
            background-color: #e68900;
        }}
        .deezer-link {{
            color: #666;
            font-size: 0.9em;
            margin-top: 10px;
        }}
        .mb-link {{
            color: #666;
            font-size: 0.9em;
            margin-top: 5px;
        }}
        .issues {{
            color: #FF9800;
            font-size: 0.9em;
            margin-top: 5px;
            font-style: italic;
        }}
        .summary {{
            background: white;
            padding: 15px;
            border-radius: 8px;
            margin-bottom: 20px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }}
    </style>
</head>
<body>
    <h1>🎵 MusicBrainz Albums - Add &amp; Update</h1>
    <div class="summary">
        <strong>Albums to ADD: {add_count}</strong> | <strong>Albums to UPDATE: {update_count}</strong>
    </div>
"""

    album_template = """
    <div class="album">
        <div class="album-title">{title}</div>
        <div class="artist-name">by {artist}</div>
        {mb_info}
        {issues_info}
        <div class="links">
            <a href="{atisket_link}" target="_blank" rel="noopener noreferrer" class="link-button atisket">Submit via a-tisket</a>
            <a href="{harmony_link}" target="_blank" rel="noopener noreferrer" class="link-button harmony">Submit via Harmony</a>
        </div>
        <div class="deezer-link">
            <a href="{deezer_url}" target="_blank" rel="noopener noreferrer">View on Deezer</a>
        </div>
    </div>
"""

    def safe(value) -> str:
        """HTML-escape any value for use in text or a quoted attribute."""
        return escape(str(value), quote=True)

    def format_album(album: Dict, is_update: bool = False) -> str:
        """Render one album card; update cards add the MB link and issues."""
        submission_links = album.get("submission_links") or {}
        mb_info = ""
        issues_info = ""

        if is_update:
            mb_url = album.get("mb_url", "")
            if mb_url:
                mb_info = (
                    f'<div class="mb-link"><a href="{safe(mb_url)}" target="_blank" '
                    'rel="noopener noreferrer">View on MusicBrainz</a></div>'
                )
            issues = album.get("album_issues", [])
            if issues:
                # Issues may not all be strings; stringify before joining.
                joined_issues = ", ".join(str(issue) for issue in issues)
                issues_info = f'<div class="issues">Issues: {safe(joined_issues)}</div>'

        return album_template.format(
            title=safe(album.get("title", "Unknown Title")),
            artist=safe(album.get("artist_name", "Unknown Artist")),
            mb_info=mb_info,
            issues_info=issues_info,
            atisket_link=safe(submission_links.get("atisket_link", "#")),
            harmony_link=safe(submission_links.get("harmony_link", "#")),
            deezer_url=safe(submission_links.get("deezer_url", "#")),
        )

    sections = []
    if albums_to_add:
        sections.append("<h2>📥 Albums to ADD (Not in MusicBrainz)</h2>")
        sections.extend(format_album(album, False) for album in albums_to_add)

    if albums_to_update:
        sections.append("<h2>🔄 Albums to UPDATE (Need Linking/Updates)</h2>")
        sections.extend(format_album(album, True) for album in albums_to_update)

    html_header = page_template.format(
        add_count=len(albums_to_add), update_count=len(albums_to_update)
    )
    html_footer = "\n</body>\n</html>\n"
    html_content = html_header + "".join(sections) + html_footer

    with open("missing_albums.html", "w", encoding="utf-8") as f:
        f.write(html_content)
    print("📄 HTML report saved to missing_albums.html")
|
|
|
|
|
|
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|