Compare commits
13 commits, a6d2e7f7df...main:

cc9521f7a4
af5a2bf825
03e8eb6f4e
026c7fe0d8
0b86143646
578b9d316a
73ae1787d1
0403647a1c
b67e154777
ed5984e32b
f3f154d1b0
35e6c7e330
a3b7bfa0df
flake.lock (generated), 34 lines changed:

@@ -1,23 +1,5 @@
 {
   "nodes": {
-    "flake-utils": {
-      "inputs": {
-        "systems": "systems"
-      },
-      "locked": {
-        "lastModified": 1731533236,
-        "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
-        "owner": "numtide",
-        "repo": "flake-utils",
-        "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
-        "type": "github"
-      },
-      "original": {
-        "owner": "numtide",
-        "repo": "flake-utils",
-        "type": "github"
-      }
-    },
     "nixpkgs": {
       "locked": {
         "lastModified": 1762596750,
@@ -36,24 +18,8 @@
     },
     "root": {
       "inputs": {
-        "flake-utils": "flake-utils",
         "nixpkgs": "nixpkgs"
       }
-    },
-    "systems": {
-      "locked": {
-        "lastModified": 1681028828,
-        "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
-        "owner": "nix-systems",
-        "repo": "default",
-        "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
-        "type": "github"
-      },
-      "original": {
-        "owner": "nix-systems",
-        "repo": "default",
-        "type": "github"
-      }
-    }
     }
   },
   "root": "root",
flake.nix, 75 lines changed:

@@ -1,58 +1,67 @@
 {
   description = "Lidarr to MusicBrainz Missing Albums Finder";

-  inputs = {
-    nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
-    flake-utils.url = "github:numtide/flake-utils";
-  };
+  inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";

-  outputs = { self, nixpkgs, flake-utils }:
-    let
-      # NixOS module output (not system-specific)
-      # The module accepts a package option, which can be set from the flake's packages
-      nixosModules = {
-        lidarr-mb-gap = import ./nixos/lidarr-mb-gap.nix;
-      };
-    in
-    {
-      # Export NixOS modules
-      nixosModules = nixosModules;
-    } // flake-utils.lib.eachDefaultSystem (system:
+  outputs =
+    { nixpkgs, self }:
     let
+      system = "x86_64-linux";
       pkgs = import nixpkgs { inherit system; };
-      lib = pkgs.lib;
+      inherit (pkgs) lib;
       lidarr-mb-gap = import ./nix/package.nix {
-        inherit pkgs lib;
+        inherit pkgs;
         src = lib.cleanSource ./src;
       };
+      lidarr-cleanup-singles = import ./nix/package-cleanup.nix {
+        inherit pkgs;
+        src = lib.cleanSource ./src-cleanup;
+      };
     in
     {
-      devShells.default = pkgs.mkShell {
+      nixosModules.lidarr-mb-gap = import ./nixos/lidarr-mb-gap.nix;
+
+      packages.${system} = {
+        default = lidarr-mb-gap;
+        inherit lidarr-mb-gap lidarr-cleanup-singles;
+      };
+
+      apps.${system} = {
+        default = {
+          type = "app";
+          program = "${lidarr-mb-gap}/bin/lidarr-mb-gap";
+        };
+        lidarr-mb-gap = {
+          type = "app";
+          program = "${lidarr-mb-gap}/bin/lidarr-mb-gap";
+        };
+        lidarr-cleanup-singles = {
+          type = "app";
+          program = "${lidarr-cleanup-singles}/bin/lidarr-cleanup-singles";
+        };
+      };
+
+      devShells.${system} = {
+        default = pkgs.mkShell {
           buildInputs = [
-            (pkgs.python3.withPackages (ps: with ps; [
+            (pkgs.python3.withPackages (
+              ps: with ps; [
                 requests
                 python-dotenv
-            ]))
+                plexapi
+              ]
+            ))
             pkgs.black
+            pkgs.chromaprint
+            pkgs.ffmpeg
           ];
           shellHook = ''
             echo "Python environment ready!"
             echo "Run: python src/main.py"
             echo "Format code with: black src/"
+            echo "Audio verification tools: ffprobe (ffmpeg), fpcalc (chromaprint)"
           '';
         };
-
-      packages.default = lidarr-mb-gap;
-      packages.lidarr-mb-gap = lidarr-mb-gap;
-      apps.default = {
-        type = "app";
-        program = "${lidarr-mb-gap}/bin/lidarr-mb-gap";
-      };
-      apps.lidarr-mb-gap = {
-        type = "app";
-        program = "${lidarr-mb-gap}/bin/lidarr-mb-gap";
-      };
-      }
-    );
+      };
+    };
 }
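With these outputs in place, both tools can be launched as flake apps, e.g. `nix run .#lidarr-cleanup-singles` from a local checkout (a usage sketch; the exact invocation depends on where the flake is consumed). Note that dropping flake-utils pins the flake to a single system, x86_64-linux, instead of generating outputs for every default system.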
nix/package-cleanup.nix (new file, 32 lines):

{ pkgs, src }:

pkgs.python3Packages.buildPythonApplication {
  pname = "lidarr-cleanup-singles";
  version = "1.0.0";
  inherit src;
  format = "pyproject";
  nativeBuildInputs = with pkgs.python3Packages; [
    setuptools
  ];
  propagatedBuildInputs = with pkgs.python3Packages; [
    requests
    python-dotenv
    plexapi
  ];

  # Runtime dependencies for audio verification
  buildInputs = [
    pkgs.chromaprint
    pkgs.ffmpeg
  ];

  makeWrapperArgs = [
    "--prefix PATH : ${pkgs.lib.makeBinPath [ pkgs.ffmpeg pkgs.chromaprint ]}"
  ];

  meta = {
    mainProgram = "lidarr-cleanup-singles";
    description = "Identify duplicate single tracks in Lidarr";
  };
}
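The makeWrapperArgs entry is what lets the installed script find its external tools at runtime: buildPythonApplication wraps the entry point, and the wrapper prefixes PATH with the Nix store paths of ffmpeg and chromaprint, so ffprobe and fpcalc resolve without needing to be on the host's global PATH.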
nix/package.nix (filename inferred from context):

@@ -1,4 +1,4 @@
-{ pkgs, lib, src }:
+{ pkgs, src }:

 pkgs.python3Packages.buildPythonApplication {
   pname = "lidarr-mb-gap";
@@ -12,5 +12,8 @@ pkgs.python3Packages.buildPythonApplication {
     requests
     python-dotenv
   ];
+  meta = {
+    mainProgram = "lidarr-mb-gap";
+    description = "Lidarr to MusicBrainz Missing Albums Finder";
+  };
 }
nixos/lidarr-mb-gap.nix (filename inferred from context):

@@ -1,24 +1,35 @@
-{ config, lib, pkgs, ... }:
+{
+  config,
+  lib,
+  pkgs,
+  ...
+}:

 let
-  reportDir = "/var/lib/lidarr-mb-gap/reports";
-  envFile = "/var/lib/lidarr-mb-gap/.env";
+  defaultHome = "/var/lib/lidarr-mb-gap";

   # Determine which package to use
-  lidarrMbGapPackage = if config.services.lidarr-mb-gap.package != null
-    then config.services.lidarr-mb-gap.package
-    else if config.services.lidarr-mb-gap.src != null
-    then import ../nix/package.nix {
-      inherit pkgs;
-      lib = pkgs.lib;
-      src = config.services.lidarr-mb-gap.src;
-    }
-    else throw "services.lidarr-mb-gap: Either 'package' or 'src' must be set.";
+  lidarrMbGapPackage =
+    if config.services.lidarr-mb-gap.package != null then
+      config.services.lidarr-mb-gap.package
+    else if config.services.lidarr-mb-gap.src != null then
+      import ../nix/package.nix {
+        inherit pkgs;
+        inherit (config.services.lidarr-mb-gap) src;
+      }
+    else
+      throw "services.lidarr-mb-gap: Either 'package' or 'src' must be set.";
 in
 {
   options.services.lidarr-mb-gap = {
     enable = lib.mkEnableOption "Lidarr MusicBrainz Gap Reporter";

+    home = lib.mkOption {
+      type = lib.types.str;
+      default = defaultHome;
+      description = "Home directory for the lidarr-mb-gap user";
+    };
+
     package = lib.mkOption {
       type = lib.types.nullOr lib.types.package;
       default = null;
@@ -33,13 +44,15 @@ in

     reportDir = lib.mkOption {
       type = lib.types.str;
-      default = reportDir;
+      default = "${config.services.lidarr-mb-gap.home}/reports";
+      defaultText = lib.literalExpression ''"''${home}/reports"'';
       description = "Directory where reports will be generated";
     };

     envFile = lib.mkOption {
       type = lib.types.str;
-      default = envFile;
+      default = "${config.services.lidarr-mb-gap.home}/.env";
+      defaultText = lib.literalExpression ''"''${home}/.env"'';
       description = "Path to .env file with LIDARR_URL and LIDARR_API_KEY";
     };

@@ -67,14 +80,22 @@ in
       description = "Path on VPS where reports should be synced";
     };

+    vpsPort = lib.mkOption {
+      type = lib.types.port;
+      default = 22;
+      description = "SSH port for VPS connection";
+    };
+
     sshKeyFile = lib.mkOption {
-      type = lib.types.nullOr lib.types.str;
-      default = null;
-      description = "Path to SSH private key file for rsync. If null, uses default SSH key location.";
+      type = lib.types.str;
+      default = "${config.services.lidarr-mb-gap.home}/.ssh/id_ed25519";
+      defaultText = lib.literalExpression ''"''${home}/.ssh/id_ed25519"'';
+      description = "Path to SSH private key file for rsync";
     };

     sshKnownHosts = lib.mkOption {
-      type = lib.types.attrsOf (lib.types.submodule {
+      type = lib.types.attrsOf (
+        lib.types.submodule {
           options = {
             hostNames = lib.mkOption {
               type = lib.types.listOf lib.types.str;
@@ -85,8 +106,9 @@ in
               description = "SSH public key for the host";
             };
           };
-      });
-      default = {};
+        }
+      );
+      default = { };
       description = "SSH known hosts configuration for the VPS (same format as programs.ssh.knownHosts)";
     };
   };
@@ -94,17 +116,17 @@ in
   config = lib.mkIf config.services.lidarr-mb-gap.enable {
     systemd.tmpfiles.rules = [
       "d ${config.services.lidarr-mb-gap.reportDir} 0755 lidarr-mb-gap lidarr-mb-gap -"
-      "d /var/lib/lidarr-mb-gap/.ssh 0700 lidarr-mb-gap lidarr-mb-gap -"
+      "d ${config.services.lidarr-mb-gap.home}/.ssh 0700 lidarr-mb-gap lidarr-mb-gap -"
     ];

     users.users.lidarr-mb-gap = {
       isSystemUser = true;
       group = "lidarr-mb-gap";
-      home = "/var/lib/lidarr-mb-gap";
+      home = config.services.lidarr-mb-gap.home;
       createHome = true;
     };

-    users.groups.lidarr-mb-gap = {};
+    users.groups.lidarr-mb-gap = { };

     programs.ssh.knownHosts = config.services.lidarr-mb-gap.sshKnownHosts;

@@ -132,19 +154,20 @@ in
         }

         # Sync to VPS if enabled
-        ${lib.optionalString (config.services.lidarr-mb-gap.syncToVPS && config.services.lidarr-mb-gap.vpsHost != null) ''
-          # Set up SSH options
-          SSH_OPTS=""
-          ${lib.optionalString (config.services.lidarr-mb-gap.sshKeyFile != null) ''
-            SSH_OPTS="-i ${config.services.lidarr-mb-gap.sshKeyFile}"
-          ''}
-
-          # Use SSH options with rsync
-          ${pkgs.rsync}/bin/rsync -avz --delete \
-            -e "ssh $SSH_OPTS -o StrictHostKeyChecking=yes" \
-            ${config.services.lidarr-mb-gap.reportDir}/ \
-            ${config.services.lidarr-mb-gap.vpsHost}:${config.services.lidarr-mb-gap.vpsPath}/
-        ''}
+        ${lib.optionalString
+          (config.services.lidarr-mb-gap.syncToVPS && config.services.lidarr-mb-gap.vpsHost != null)
+          ''
+            # Set up SSH options
+            SSH_OPTS="-i ${config.services.lidarr-mb-gap.sshKeyFile}"
+
+            # Use SSH options with rsync (use full path to ssh)
+            ${pkgs.rsync}/bin/rsync -avz --delete \
+              -e "${pkgs.openssh}/bin/ssh $SSH_OPTS -p ${toString config.services.lidarr-mb-gap.vpsPort} -o \
+              StrictHostKeyChecking=yes" \
+              ${config.services.lidarr-mb-gap.reportDir}/ \
+              ${config.services.lidarr-mb-gap.vpsHost}:${config.services.lidarr-mb-gap.vpsPath}/
+          ''
+        }
       '';
       StandardOutput = "journal";
       StandardError = "journal";
@@ -161,4 +184,3 @@ in
     };
   };
 }
-
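Two behavioral changes in this module stand out: sshKeyFile is now a plain string defaulting to "${home}/.ssh/id_ed25519" rather than a nullable option, so configurations that previously relied on null (default SSH key lookup) must now point at an explicit key; and the rsync step invokes ${pkgs.openssh}/bin/ssh with the new configurable vpsPort instead of relying on whatever ssh happens to be on the service's PATH.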
src-cleanup/__init__.py (new file, 1 line):

"""Lidarr Cleanup Singles - Remove duplicate single tracks"""
src-cleanup/audio_verification.py (new file, 281 lines):

"""Audio verification using multiple methods"""

import json
import logging
import os
import subprocess
from difflib import SequenceMatcher
from typing import Dict, List, Optional, Tuple, Union

logger = logging.getLogger(__name__)


def map_docker_path(file_path: str, docker_mount: Optional[str] = None) -> str:
    """Map Docker container path to host path"""
    if not docker_mount:
        return file_path

    container_path, host_path = docker_mount.split(":", 1)
    if not file_path.startswith(container_path):
        return file_path

    return file_path.replace(container_path, host_path, 1)


def get_audio_fingerprint(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Tuple[str, int]]:
    """Generate audio fingerprint using fpcalc. Returns (fingerprint, duration)"""
    mapped_path = map_docker_path(file_path, docker_mount)
    logger.debug(f"Generating fingerprint for: {mapped_path}")

    if not os.path.exists(mapped_path):
        logger.warning(f"File not found: {mapped_path}")
        return None

    try:
        result = subprocess.run(
            ["fpcalc", "-json", "-length", "180", mapped_path],
            capture_output=True,
            text=True,
            timeout=60,
            check=False,
        )
        if result.returncode != 0:
            logger.warning(f"fpcalc failed for {mapped_path}: {result.stderr}")
            return None

        data = json.loads(result.stdout)
        fingerprint = data.get("fingerprint")
        duration = data.get("duration")

        if not fingerprint or duration is None:
            logger.warning(
                f"fpcalc output missing fingerprint or duration for {mapped_path}"
            )
            return None

        logger.debug(f"Successfully generated fingerprint (duration: {duration}s)")
        return fingerprint, duration
    except (
        subprocess.TimeoutExpired,
        FileNotFoundError,
        json.JSONDecodeError,
        Exception,
    ) as e:
        logger.warning(f"Error generating fingerprint for {mapped_path}: {e}")
        return None


def get_file_properties(
    file_path: str, docker_mount: Optional[str] = None
) -> Optional[Dict]:
    """Get audio file properties using ffprobe"""
    mapped_path = map_docker_path(file_path, docker_mount)
    if not os.path.exists(mapped_path):
        return None

    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "quiet",
                "-print_format",
                "json",
                "-show_format",
                "-show_streams",
                mapped_path,
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode != 0:
            return None

        data = json.loads(result.stdout)
        audio_stream = next(
            (s for s in data.get("streams", []) if s.get("codec_type") == "audio"), None
        )
        format_info = data.get("format", {})

        if not audio_stream:
            return None

        return {
            "duration": float(format_info.get("duration", 0)),
            "size": int(format_info.get("size", 0)),
            "bitrate": int(format_info.get("bit_rate", 0)),
            "sample_rate": int(audio_stream.get("sample_rate", 0)),
            "channels": int(audio_stream.get("channels", 0)),
            "codec": audio_stream.get("codec_name", ""),
            "bit_depth": int(audio_stream.get("bits_per_raw_sample", 0)),
        }
    except Exception as e:
        logger.debug(f"Could not get file properties for {mapped_path}: {e}")
        return None


def _format_context(log_context: Optional[str]) -> str:
    """Format log context message"""
    return f" ({log_context})" if log_context else ""


def compare_fingerprints(
    fp1_data: Optional[Tuple[str, int]],
    fp2_data: Optional[Tuple[str, int]],
    log_context: Optional[str] = None,
    return_message: bool = False,
) -> Union[bool, Tuple[bool, str]]:
    """Compare audio fingerprints. Returns match or (match, message) if return_message=True"""
    if not fp1_data or not fp2_data:
        message = "Fingerprint comparison failed: missing fingerprint"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False

    fp1, duration1 = fp1_data
    fp2, duration2 = fp2_data

    duration_diff = abs(duration1 - duration2)
    if duration_diff > 5:
        message = f"Fingerprint comparison: duration mismatch ({duration1}s vs {duration2}s, diff: {duration_diff}s)"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False

    if fp1 == fp2:
        message = "Fingerprint comparison: exact match"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (True, message) if return_message else True

    try:
        similarity = SequenceMatcher(None, fp1, fp2).ratio()

        if duration_diff <= 1:
            threshold = 0.90
        elif duration_diff <= 3:
            threshold = 0.93
        else:
            threshold = 0.95

        match = similarity >= threshold
        message = f"Fingerprint comparison: similarity={similarity:.3f}, duration_diff={duration_diff}s, threshold={threshold:.2f}, match={match}"
        logger.debug(f"{message}{_format_context(log_context)}")
        return (match, message) if return_message else match
    except Exception as e:
        message = (
            f"Fingerprint comparison failed: exception {type(e).__name__}: {str(e)}"
        )
        logger.debug(f"{message}{_format_context(log_context)}")
        return (False, message) if return_message else False


def check_mb_recording_id(single_track_info, album_track_info) -> Tuple[int, str]:
    """Check MusicBrainz Recording ID match. Returns (score_delta, message)"""
    if not (single_track_info and album_track_info):
        return 0, "⚠ MusicBrainz Recording ID unavailable"

    single_mb_id = single_track_info.get("foreignRecordingId")
    album_mb_id = album_track_info.get("foreignRecordingId")

    if not (single_mb_id and album_mb_id):
        return 0, "⚠ MusicBrainz Recording ID unavailable"

    if single_mb_id == album_mb_id:
        return 50, "✓ MusicBrainz Recording ID match (+50)"

    return -30, "✗ Different MusicBrainz Recording IDs (-30)"


def check_quality_profile(
    single_file_info, album_file_info
) -> Tuple[int, Optional[str]]:
    """Check Lidarr quality profile match. Returns (score_delta, message)"""
    single_quality = (
        single_file_info.get("quality", {}).get("quality", {}).get("name", "")
    )
    album_quality = (
        album_file_info.get("quality", {}).get("quality", {}).get("name", "")
    )

    if not (single_quality and album_quality):
        return 0, None

    if single_quality == album_quality:
        return 10, f"✓ Same quality ({single_quality}) (+10)"

    return 0, f"⚠ Different quality ({single_quality} vs {album_quality})"


def check_file_properties(single_props, album_props) -> List[Tuple[int, str]]:
    """Check file properties. Returns list of (score_delta, message) tuples"""
    if not (single_props and album_props):
        return []

    results = []

    duration_diff = abs(single_props["duration"] - album_props["duration"])
    if duration_diff <= 1:
        results.append((15, f"✓ Duration match ({duration_diff:.1f}s diff) (+15)"))
    elif duration_diff <= 3:
        results.append((5, f"⚠ Close duration ({duration_diff:.1f}s diff) (+5)"))
    else:
        results.append((-10, f"✗ Duration mismatch ({duration_diff:.1f}s diff) (-10)"))

    size_ratio = min(single_props["size"], album_props["size"]) / max(
        single_props["size"], album_props["size"]
    )
    if size_ratio >= 0.95:
        results.append((15, f"✓ File size match ({size_ratio:.2%}) (+15)"))
    elif size_ratio >= 0.85:
        results.append((5, f"⚠ Similar file size ({size_ratio:.2%}) (+5)"))
    else:
        results.append((0, f"⚠ Different file sizes ({size_ratio:.2%})"))

    if single_props["bitrate"] > 0 and album_props["bitrate"] > 0:
        bitrate_ratio = min(single_props["bitrate"], album_props["bitrate"]) / max(
            single_props["bitrate"], album_props["bitrate"]
        )
        if bitrate_ratio >= 0.90:
            results.append((10, f"✓ Bitrate match ({bitrate_ratio:.2%}) (+10)"))

    if single_props["sample_rate"] == album_props["sample_rate"]:
        results.append(
            (5, f"✓ Sample rate match ({single_props['sample_rate']}Hz) (+5)")
        )

    if single_props["codec"] and album_props["codec"]:
        if single_props["codec"] == album_props["codec"]:
            results.append((5, f"✓ Same codec ({single_props['codec']}) (+5)"))
        else:
            results.append(
                (
                    0,
                    f"⚠ Different codecs ({single_props['codec']} vs {album_props['codec']})",
                )
            )

    if single_props["channels"] == album_props["channels"]:
        results.append((5, f"✓ Same channels ({single_props['channels']}) (+5)"))
    else:
        results.append(
            (
                0,
                f"⚠ Different channels ({single_props['channels']} vs {album_props['channels']})",
            )
        )

    if single_props["bit_depth"] > 0 and album_props["bit_depth"] > 0:
        if single_props["bit_depth"] == album_props["bit_depth"]:
            results.append(
                (5, f"✓ Same bit depth ({single_props['bit_depth']}-bit) (+5)")
            )
        else:
            results.append(
                (
                    0,
                    f"⚠ Different bit depths ({single_props['bit_depth']}-bit vs {album_props['bit_depth']}-bit)",
                )
            )

    return results
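A minimal sketch of how these helpers compose outside the main pipeline; the file paths and mount mapping below are hypothetical placeholders, and fpcalc must be on PATH:

from audio_verification import compare_fingerprints, get_audio_fingerprint

# Hypothetical paths: a single and an album copy of the same recording,
# with Lidarr's container path /music mapped to /mnt/media on the host.
mount = "/music:/mnt/media"
single_fp = get_audio_fingerprint("/music/Artist/Single/track.flac", mount)
album_fp = get_audio_fingerprint("/music/Artist/Album/track.flac", mount)

# return_message=True also yields the similarity/threshold diagnostics;
# tracks whose durations differ by more than 5 s are rejected outright.
match, message = compare_fingerprints(single_fp, album_fp, return_message=True)
print(match, message)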
src-cleanup/duplicate_finder.py (new file, 267 lines):

"""Functions to find duplicate singles in Lidarr"""

import logging
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

from lidarr_client import fetch_tracks_for_album, get_trackfile_info
from track_verification import verify_audio_match

logger = logging.getLogger(__name__)


def normalize_title(title: str) -> str:
    """Normalize a track title for comparison"""
    return " ".join(title.lower().split())


def build_album_track_map(
    base_url: str, headers: Dict[str, str], albums: List[Dict]
) -> Dict[Tuple[int, str], List[Dict]]:
    """Create a mapping of tracks present on full albums"""
    album_track_map: Dict[Tuple[int, str], List[Dict]] = defaultdict(list)

    def process_album_for_map(album):
        """Process single album and add tracks to map"""
        album_id = album.get("id")
        artist_id = album.get("artistId")
        album_title = album.get("title", "Unknown")

        if not (album_id and artist_id):
            return

        tracks = fetch_tracks_for_album(base_url, headers, album_id)
        if not tracks:
            logger.debug(
                f"Skipping album '{album_title}' (albumId: {album_id}) - could not fetch tracks"
            )
            return

        def add_track_to_map(track):
            """Add track to album_track_map"""
            title = track.get("title")
            track_id = track.get("id")
            track_file_id = track.get("trackFileId")

            if not (title and track_file_id and track_id):
                return

            key = (artist_id, normalize_title(title))
            album_track_map[key].append(
                {
                    "album_id": album_id,
                    "album_title": album_title,
                    "track_id": track_id,
                    "track_file_id": track_file_id,
                }
            )

        tracks_with_files = filter(lambda track: track.get("hasFile"), tracks)
        list(map(add_track_to_map, tracks_with_files))

    album_albums = filter(
        lambda album: album.get("albumType", "").lower() == "album", albums
    )
    list(map(process_album_for_map, album_albums))

    return album_track_map


def create_unverified_duplicate(
    artist_id, album_id, album_title, title, track_file_id, duplicate_albums
) -> Dict:
    """Create duplicate entry for unverified tracks"""
    return {
        "artist_id": artist_id,
        "single_album_id": album_id,
        "single_album_title": album_title,
        "track_title": title,
        "single_track_file_id": track_file_id,
        "duplicate_albums": duplicate_albums,
        "verified_albums": duplicate_albums,
        "verification_results": ["Audio verification disabled"],
        "confidence_scores": [0],
    }


def verify_and_mark_album_track(
    base_url,
    headers,
    track_id,
    track_file_id,
    album_track,
    docker_mount,
    single_file_path,
) -> Tuple[bool, Optional[Dict], str, int]:
    """Verify album track and mark for migration if perfect match"""
    album_track_id = album_track["track_id"]
    album_track_file_id = album_track["track_file_id"]

    album_track_file_info = get_trackfile_info(base_url, album_track_file_id, headers)
    album_file_path = (
        album_track_file_info.get("path") if album_track_file_info else None
    )

    match, result_message, confidence = verify_audio_match(
        base_url,
        headers,
        track_id,
        track_file_id,
        album_track_id,
        album_track_file_id,
        docker_mount,
    )

    if not match:
        logger.debug(
            f"Audio mismatch: single trackFileId {track_file_id} does not match album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)"
        )
        return False, None, result_message, confidence

    album_track["confidence"] = confidence
    album_track["migration_status"] = (
        "eligible"
        if confidence >= 95 and single_file_path and album_file_path
        else "not_eligible"
    )
    if album_track["migration_status"] == "eligible":
        album_track["single_file_path"] = single_file_path
        album_track["album_file_path"] = album_file_path

    logger.debug(
        f"Audio match confirmed: single trackFileId {track_file_id} matches album '{album_track['album_title']}' trackFileId {album_track_file_id} (confidence: {confidence}/100)"
    )
    return True, album_track, result_message, confidence


def process_single_track(
    base_url,
    headers,
    album_id,
    artist_id,
    album_title,
    track,
    album_track_map,
    verify_audio,
    docker_mount,
) -> Optional[Dict]:
    """Process a single track and return duplicate info or None"""
    title = track.get("title")
    track_id = track.get("id")
    track_file_id = track.get("trackFileId")

    if not (title and track_file_id and track_id):
        return None

    key = (artist_id, normalize_title(title))
    if key not in album_track_map:
        return None

    duplicate_albums = album_track_map[key]
    if not duplicate_albums:
        return None

    if not verify_audio:
        return create_unverified_duplicate(
            artist_id, album_id, album_title, title, track_file_id, duplicate_albums
        )

    logger.debug(
        f"Verifying audio for '{title}' from single '{album_title}' against {len(duplicate_albums)} album track(s)..."
    )

    single_track_file_info = get_trackfile_info(base_url, track_file_id, headers)
    single_file_path = (
        single_track_file_info.get("path") if single_track_file_info else None
    )

    verification_data = list(
        map(
            lambda album_track: verify_and_mark_album_track(
                base_url,
                headers,
                track_id,
                track_file_id,
                album_track,
                docker_mount,
                single_file_path,
            ),
            duplicate_albums,
        )
    )

    verified_albums = [
        track for match, track, _, _ in verification_data if match and track
    ]
    verification_results = [result for _, _, result, _ in verification_data]
    confidence_scores = [conf for _, _, _, conf in verification_data]

    return {
        "artist_id": artist_id,
        "single_album_id": album_id,
        "single_album_title": album_title,
        "track_title": title,
        "single_track_file_id": track_file_id,
        "duplicate_albums": duplicate_albums,
        "verified_albums": verified_albums,
        "verification_results": verification_results,
        "confidence_scores": confidence_scores,
    }


def process_single_album(
    base_url, headers, album, album_track_map, verify_audio, docker_mount
) -> List[Dict]:
    """Process a single album and return list of duplicates found"""
    album_id = album.get("id")
    artist_id = album.get("artistId")
    album_title = album.get("title", "")

    if not (album_id and artist_id):
        return []

    tracks = fetch_tracks_for_album(base_url, headers, album_id)
    if not tracks:
        logger.debug(
            f"Skipping single album '{album_title}' (albumId: {album_id}) - could not fetch tracks"
        )
        return []

    tracks_with_files = filter(lambda track: track.get("hasFile"), tracks)
    process_track = lambda track: process_single_track(
        base_url,
        headers,
        album_id,
        artist_id,
        album_title,
        track,
        album_track_map,
        verify_audio,
        docker_mount,
    )
    duplicate_infos = map(process_track, tracks_with_files)

    return list(filter(lambda x: x is not None, duplicate_infos))


def find_duplicate_singles(
    base_url: str,
    headers: Dict[str, str],
    albums: List[Dict],
    album_track_map: Dict[Tuple[int, str], List[Dict]],
    verify_audio: bool = True,
    docker_mount: Optional[str] = None,
) -> List[Dict]:
    """Identify single tracks that duplicate album tracks"""
    single_albums = filter(
        lambda album: album.get("albumType", "").lower() == "single", albums
    )

    album_duplicates = map(
        lambda album: process_single_album(
            base_url, headers, album, album_track_map, verify_audio, docker_mount
        ),
        single_albums,
    )

    return [dup for album_dups in album_duplicates for dup in album_dups]
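Matching is keyed on (artistId, normalized title), so the normalization step is what makes cosmetically different titles collide; a quick illustration:

from duplicate_finder import normalize_title

# Case-folds and collapses every run of whitespace to a single space,
# so trailing spaces, tabs, and casing differences all match.
assert normalize_title("  Hello   World ") == "hello world"
assert normalize_title("Hello World") == normalize_title("HELLO\tWORLD")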
src-cleanup/lidarr_client.py (new file, 89 lines):

"""Lidarr API client functions"""

import logging
from typing import Dict, List, Optional

import requests

logger = logging.getLogger(__name__)


def get_json(
    url: str,
    headers: Dict[str, str],
    params: Optional[Dict[str, object]] = None,
    raise_on_error: bool = True,
) -> List[Dict]:
    """Fetch JSON from URL with error handling"""
    try:
        resp = requests.get(url, headers=headers, params=params, timeout=60)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.warning(f"Error fetching {url}: {e}")
        if raise_on_error:
            raise
        return []


def get_trackfile_info(
    base_url: str, track_file_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get trackfile information including file path and quality"""
    try:
        resp = requests.get(
            f"{base_url.rstrip('/')}/api/v1/trackfile/{track_file_id}",
            headers=headers,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.warning(f"Could not fetch trackfile {track_file_id}: {e}")
        return None


def get_track_info(
    base_url: str, track_id: int, headers: Dict[str, str]
) -> Optional[Dict]:
    """Get track information including MusicBrainz recording ID"""
    try:
        resp = requests.get(
            f"{base_url.rstrip('/')}/api/v1/track/{track_id}",
            headers=headers,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        logger.warning(f"Could not fetch track {track_id}: {e}")
        return None


def fetch_all_artists(base_url: str, headers: Dict[str, str]) -> List[Dict]:
    """Fetch all artists from Lidarr"""
    return get_json(f"{base_url}/api/v1/artist", headers)


def fetch_albums_for_artist(
    base_url: str, headers: Dict[str, str], artist_id: int
) -> List[Dict]:
    """Fetch all albums for an artist"""
    return get_json(
        f"{base_url}/api/v1/album",
        headers,
        params={"artistId": artist_id},
        raise_on_error=False,
    )


def fetch_tracks_for_album(
    base_url: str, headers: Dict[str, str], album_id: int
) -> List[Dict]:
    """Fetch all tracks for an album"""
    return get_json(
        f"{base_url.rstrip('/')}/api/v1/track",
        headers,
        params={"albumId": album_id},
        raise_on_error=False,
    )
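Note the asymmetric error handling: fetch_all_artists propagates request failures (via the default raise_on_error=True), while the per-artist and per-album fetches degrade to an empty list. A sketch, with a placeholder URL and key:

from lidarr_client import fetch_all_artists, fetch_albums_for_artist

base_url = "http://localhost:8686"   # placeholder Lidarr instance
headers = {"X-Api-Key": "REDACTED"}  # placeholder API key

artists = fetch_all_artists(base_url, headers)  # raises if Lidarr is unreachable
for artist in artists:
    # Returns [] (and logs a warning) on a per-artist failure instead of raising.
    albums = fetch_albums_for_artist(base_url, headers, artist["id"])
    print(artist.get("artistName"), len(albums))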
src-cleanup/lidarr_delete.py (new file, 49 lines):

"""Lidarr album deletion functions"""

import logging
from typing import Dict

import requests

logger = logging.getLogger(__name__)


def unmonitor_and_delete_album(
    base_url: str, headers: Dict[str, str], album_id: int
) -> bool:
    """Unmonitor and delete an album from Lidarr. Returns success status."""
    try:
        # First unmonitor the album
        logger.debug(f"Unmonitoring album {album_id}...")
        album_resp = requests.get(
            f"{base_url.rstrip('/')}/api/v1/album/{album_id}",
            headers=headers,
            timeout=30,
        )
        album_resp.raise_for_status()
        album_data = album_resp.json()

        album_data["monitored"] = False

        update_resp = requests.put(
            f"{base_url.rstrip('/')}/api/v1/album/{album_id}",
            headers=headers,
            json=album_data,
            timeout=30,
        )
        update_resp.raise_for_status()

        # Then delete the album (deleteFiles=true to remove files, addImportListExclusion=false)
        logger.debug(f"Deleting album {album_id}...")
        delete_resp = requests.delete(
            f"{base_url.rstrip('/')}/api/v1/album/{album_id}",
            headers=headers,
            params={"deleteFiles": "true", "addImportListExclusion": "false"},
            timeout=30,
        )
        delete_resp.raise_for_status()

        return True
    except Exception as e:
        logger.error(f"Failed to delete album {album_id}: {e}")
        return False
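Because the delete call passes deleteFiles=true, a successful unmonitor_and_delete_album removes the audio files from disk as well as the Lidarr entry; addImportListExclusion=false means the release can still be re-added by import lists later.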
src-cleanup/main.py (new file, 291 lines):

#!/usr/bin/env python3
"""
lidarr_cleanup_singles

Identifies single-track releases that can safely be removed because
the same track already exists on a full album in Lidarr.
"""

import argparse
import logging
import os
import sys

from dotenv import load_dotenv

from duplicate_finder import build_album_track_map, find_duplicate_singles
from lidarr_client import fetch_all_artists, fetch_albums_for_artist
from lidarr_delete import unmonitor_and_delete_album
from plex_metadata import get_plex_server, get_smart_playlist_ids, migrate_plex_metadata

load_dotenv()

logger = logging.getLogger(__name__)


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Identify single tracks that are duplicates of album tracks in Lidarr."
    )
    parser.add_argument(
        "--base-url",
        default=os.getenv("LIDARR_URL"),
        help="Base URL of the Lidarr instance. Can also be set via LIDARR_URL env var.",
    )
    parser.add_argument(
        "--api-key",
        default=os.getenv("LIDARR_API_KEY"),
        help="API key for Lidarr. Can also be set via LIDARR_API_KEY env var.",
    )
    parser.add_argument(
        "--no-audio-verify",
        action="store_true",
        help="Skip audio fingerprint verification (faster but less accurate)",
    )
    parser.add_argument(
        "--docker-mount",
        default=os.getenv("DOCKER_MOUNT"),
        help="Docker mount mapping in format 'container_path:host_path'. Can also be set via DOCKER_MOUNT env var.",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging",
    )
    parser.add_argument(
        "--migrate-metadata",
        action="store_true",
        help="Migrate metadata (ratings, play counts) from singles to album tracks. Only applies to perfect matches (confidence >= 95).",
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="Delete single albums after successful metadata migration. Only deletes perfect matches (confidence >= 95) with successful migration.",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="[%(levelname)s] %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    if not args.base_url:
        logger.error(
            "LIDARR_URL not set. Provide --base-url or set LIDARR_URL environment variable."
        )
        sys.exit(1)

    if not args.api_key:
        logger.error(
            "LIDARR_API_KEY not set. Provide --api-key or set LIDARR_API_KEY environment variable."
        )
        sys.exit(1)

    base_url = args.base_url.rstrip("/")
    headers = {"X-Api-Key": args.api_key}

    logger.info("Fetching artists...")
    artists = fetch_all_artists(base_url, headers)
    if not artists:
        logger.warning("No artists found. Exiting.")
        return

    artist_map = {
        artist.get("id"): artist.get("artistName", "Unknown")
        for artist in artists
        if artist.get("id")
    }

    logger.info("Fetching albums for each artist...")
    albums = [
        album
        for artist in artists
        if artist.get("id")
        for album in fetch_albums_for_artist(base_url, headers, artist["id"])
    ]

    if not albums:
        logger.warning("No albums found in the library.")
        return

    logger.info("Building album track map...")
    album_track_map = build_album_track_map(base_url, headers, albums)

    verify_audio = not args.no_audio_verify
    docker_mount = args.docker_mount if args.docker_mount else None

    if not verify_audio:
        logger.info(
            "Scanning for duplicate singles (audio verification disabled - using title matching only)..."
        )
    else:
        mount_msg = f" (Docker mount: {docker_mount})" if docker_mount else ""
        logger.info(
            f"Scanning for duplicate singles with audio verification{mount_msg}..."
        )
        logger.info(
            "NOTE: Audio verification requires 'fpcalc' (chromaprint) to be installed"
        )

    duplicates = find_duplicate_singles(
        base_url,
        headers,
        albums,
        album_track_map,
        verify_audio=verify_audio,
        docker_mount=docker_mount,
    )

    if not duplicates:
        logger.info("No duplicate singles found. The library appears clean.")
        return

    if args.migrate_metadata:
        plex_url = os.getenv("PLEX_URL")
        plex_token = os.getenv("PLEX_TOKEN")

        if not (plex_url and plex_token):
            logger.error(
                "PLEX_URL and PLEX_TOKEN environment variables required for metadata migration"
            )
            logger.error("Set them in your .env file or environment")
            return

        logger.info(f"Connecting to Plex server at {plex_url}...")
        plex_server = get_plex_server(plex_url, plex_token)

        if not plex_server:
            logger.error(
                "Failed to connect to Plex server. Skipping metadata migration."
            )
            return

        smart_playlist_ids = get_smart_playlist_ids(plex_server)

        logger.info("Migrating Plex metadata for perfect matches (confidence >= 95)...")
        migration_count = 0
        albums_to_delete = []

        for dup in duplicates:
            for album_track in dup.get("verified_albums", []):
                if album_track.get("migration_status") != "eligible":
                    continue

                single_file_path = album_track.get("single_file_path")
                album_file_path = album_track.get("album_file_path")

                logger.info(
                    f"Migrating Plex metadata for '{dup['track_title']}' to album '{album_track['album_title']}'..."
                )
                success, message = migrate_plex_metadata(
                    plex_server,
                    single_file_path,
                    album_file_path,
                    docker_mount,
                    smart_playlist_ids,
                )

                album_track["migration_message"] = message
                album_track["migration_success"] = success

                if success:
                    migration_count += 1
                    logger.info(f" ✓ {message}")
                    if args.delete and dup["single_album_id"] not in [
                        a["album_id"] for a in albums_to_delete
                    ]:
                        albums_to_delete.append(
                            {
                                "album_id": dup["single_album_id"],
                                "album_title": dup["single_album_title"],
                                "artist_name": artist_map.get(
                                    dup["artist_id"], "Unknown"
                                ),
                            }
                        )
                else:
                    logger.warning(f" ✗ {message}")

        logger.info(f"Completed Plex metadata migration for {migration_count} track(s)")
        logger.info("")

        if args.delete and albums_to_delete:
            logger.info(
                f"Deleting {len(albums_to_delete)} single album(s) from Lidarr..."
            )
            deleted_count = 0

            for album_info in albums_to_delete:
                album_id = album_info["album_id"]
                album_title = album_info["album_title"]
                artist_name = album_info["artist_name"]

                logger.info(
                    f"Deleting album: {artist_name} - {album_title} (albumId: {album_id})..."
                )
                if unmonitor_and_delete_album(base_url, headers, album_id):
                    deleted_count += 1
                    logger.info(f" ✓ Successfully deleted album {album_id}")
                else:
                    logger.error(f" ✗ Failed to delete album {album_id}")

            logger.info(f"Deleted {deleted_count}/{len(albums_to_delete)} album(s)")
            logger.info("")

    verified_count = sum(1 for dup in duplicates if dup.get("verified_albums"))
    logger.info(
        f"Found {len(duplicates)} single track(s) that are duplicates of album tracks ({verified_count} verified by audio fingerprint):"
    )

    for dup in duplicates:
        artist_id = dup["artist_id"]
        artist_name = artist_map.get(artist_id, f"Unknown (ID: {artist_id})")
        duplicate_albums = dup["duplicate_albums"]
        verified_albums = dup.get("verified_albums", duplicate_albums)
        verification_results = dup.get("verification_results", [])
        confidence_scores = dup.get("confidence_scores", [])
        album_names = [album["album_title"] for album in duplicate_albums]

        logger.info(f"Artist: {artist_name}")
        logger.info(f" Single: '{dup['single_album_title']}'")
        logger.info(
            f" Track: '{dup['track_title']}' (trackFileId: {dup['single_track_file_id']})"
        )

        for i, result in enumerate(verification_results):
            confidence = confidence_scores[i] if i < len(confidence_scores) else 0
            logger.info(f" {result}")

        logger.info(
            f" Already present on {len(duplicate_albums)} album(s): {', '.join(album_names)}"
        )

        if verify_audio and not verified_albums:
            logger.info(" ⚠ NOT safe to delete (audio verification failed)")
        elif verify_audio:
            verified_names = [album["album_title"] for album in verified_albums]
            max_confidence = max(confidence_scores) if confidence_scores else 0

            logger.info(
                f" ✓ LIKELY safe to delete (audio verified on {len(verified_albums)} album(s): {', '.join(verified_names)})"
            )
            logger.info(f" Max confidence: {max_confidence}/100")

            perfect_matches = [
                a for a in verified_albums if a.get("confidence", 0) >= 95
            ]
            for album_track in perfect_matches:
                migration_msg = album_track.get("migration_message", "")
                if migration_msg:
                    logger.info(f" Metadata: {migration_msg}")

            logger.info(
                " ⚠ CAUTION: Always check for different versions (remaster, radio edit, live, etc)"
            )

        logger.info("")


if __name__ == "__main__":
    main()
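Taken together, a full run that verifies fingerprints, copies ratings/play counts/playlists onto the album tracks, and then removes the redundant singles would look something like `python src-cleanup/main.py --migrate-metadata --delete --docker-mount /music:/mnt/media` (the mount mapping is a placeholder), with LIDARR_URL, LIDARR_API_KEY, PLEX_URL, and PLEX_TOKEN supplied via the .env file that load_dotenv() reads.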
286
src-cleanup/plex_metadata.py
Normal file
286
src-cleanup/plex_metadata.py
Normal file
@@ -0,0 +1,286 @@
|
|||||||
|
"""Plex metadata migration functions"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def get_plex_server(plex_url: str, plex_token: str):
|
||||||
|
"""Connect to Plex server"""
|
||||||
|
try:
|
||||||
|
from plexapi.server import PlexServer
|
||||||
|
|
||||||
|
return PlexServer(plex_url, plex_token)
|
||||||
|
except ImportError:
|
||||||
|
logger.error("python-plexapi not installed. Install with: pip install plexapi")
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to connect to Plex server: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def find_plex_track_by_path(
|
||||||
|
plex_server, file_path: str, docker_mount: Optional[str] = None
|
||||||
|
):
|
||||||
|
"""Find a Plex track by its file path"""
|
||||||
|
from audio_verification import map_docker_path
|
||||||
|
import os
|
||||||
|
|
||||||
|
try:
|
||||||
|
mapped_path = map_docker_path(file_path, docker_mount)
|
||||||
|
logger.debug(
|
||||||
|
f"Searching for track: lidarr_path={file_path}, mapped_path={mapped_path}"
|
||||||
|
)
|
||||||
|
|
||||||
|
music_sections = [
|
||||||
|
s for s in plex_server.library.sections() if s.type == "artist"
|
||||||
|
]
|
||||||
|
if not music_sections:
|
||||||
|
logger.warning("No music sections found in Plex")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Strategy: Check track.locations (list of file paths)
|
||||||
|
for section in music_sections:
|
||||||
|
all_tracks = section.searchTracks()
|
||||||
|
for track in all_tracks:
|
||||||
|
track_locations = getattr(track, "locations", [])
|
||||||
|
if mapped_path in track_locations or file_path in track_locations:
|
||||||
|
logger.debug(
|
||||||
|
f"Found track by locations match: {track.title} - {track_locations[0] if track_locations else 'unknown'}"
|
||||||
|
)
|
||||||
|
return track
|
||||||
|
|
||||||
|
logger.warning(
|
||||||
|
f"Could not find Plex track. Paths tried: {file_path}, {mapped_path}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error finding Plex track for path {file_path}: {e}")
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_smart_playlist_ids(plex_server) -> set:
|
||||||
|
"""Get set of smart playlist IDs to exclude from migration"""
|
||||||
|
try:
|
||||||
|
smart_playlists = [
|
||||||
|
p.ratingKey
|
||||||
|
for p in plex_server.playlists()
|
||||||
|
if p.playlistType == "audio" and p.smart
|
||||||
|
]
|
||||||
|
if smart_playlists:
|
||||||
|
logger.info(
|
||||||
|
f"Found {len(smart_playlists)} smart playlists (will exclude from migration)"
|
||||||
|
)
|
||||||
|
return set(smart_playlists)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Could not get smart playlists: {e}")
|
||||||
|
return set()
|
||||||
|
|
||||||
|
|
||||||
|
def get_plex_playlists_for_track(
|
||||||
|
plex_server, track, exclude_smart_playlists: set = None
|
||||||
|
) -> List:
|
||||||
|
"""Get all manual playlists containing this track (excludes smart playlists)"""
|
||||||
|
if exclude_smart_playlists is None:
|
||||||
|
exclude_smart_playlists = set()
|
||||||
|
|
||||||
|
try:
|
||||||
|
return [
|
||||||
|
playlist
|
||||||
|
for playlist in plex_server.playlists()
|
||||||
|
if playlist.playlistType == "audio"
|
||||||
|
and playlist.ratingKey not in exclude_smart_playlists
|
||||||
|
and any(item.ratingKey == track.ratingKey for item in playlist.items())
|
||||||
|
]
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Could not get playlists: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_rating(
    single_track, album_track, single_rating, original_album_rating
) -> Tuple[List[str], List[str], List[str]]:
    """Migrate rating. Returns (changes, already_has, failures)"""
    if not single_rating:
        return [], [], []

    if original_album_rating:
        logger.info(f" Album already has rating: {original_album_rating}/10")
        return [], [f"rating ({original_album_rating}/10)"], []

    try:
        logger.info(f" Setting rating to {single_rating}/10...")
        album_track.rate(single_rating)
        album_track.reload()
        new_rating = getattr(album_track, "userRating", None)

        if new_rating != single_rating:
            logger.warning(
                f" ⚠ Rating mismatch: expected {single_rating}, got {new_rating}"
            )
            return [], [], [f"rating (set to {single_rating} but got {new_rating})"]

        logger.info(f" ✓ Rating verified: {new_rating}/10")
        return [f"rating ({single_rating}/10) ✓ verified"], [], []
    except Exception as e:
        logger.error(f"Failed to migrate rating: {e}")
        return [], [], [f"rating (error: {e})"]

def migrate_play_count(
    album_track, single_plays, album_plays
) -> Tuple[List[str], List[str], List[str]]:
    """Migrate play count. Returns (changes, already_has, failures)"""
    if single_plays <= 0:
        return [], [], []

    expected_count = album_plays + single_plays
    logger.info(
        f" Migrating play count: single={single_plays}, album={album_plays}, expected={expected_count}"
    )

    try:
        # markPlayed() bumps the view count by one, so replay it once per
        # play recorded on the single, logging progress every 10 calls
        for i in range(single_plays):
            album_track.markPlayed()
            if (i + 1) % 10 == 0:
                logger.debug(f" Marked played {i + 1}/{single_plays} times...")

        album_track.reload()
        new_count = getattr(album_track, "viewCount", 0) or 0

        if new_count != expected_count:
            logger.warning(
                f" ⚠ Play count mismatch: expected {expected_count}, got {new_count}"
            )
            return (
                [],
                [],
                [f"play count (expected {expected_count} but got {new_count})"],
            )

        logger.info(f" ✓ Play count verified: {new_count}")
        return (
            [f"play count ({album_plays} + {single_plays} = {new_count}) ✓ verified"],
            [],
            [],
        )
    except Exception as e:
        logger.error(f"Failed to migrate play count: {e}")
        return [], [], [f"play count (error: {e})"]

def migrate_playlist(playlist, album_track) -> Tuple[List[str], List[str], List[str]]:
    """Migrate single playlist. Returns (changes, already_has, failures)"""
    playlist_name = playlist.title

    try:
        if any(item.ratingKey == album_track.ratingKey for item in playlist.items()):
            logger.info(f" Album already in playlist: '{playlist_name}'")
            return [], [f"playlist '{playlist_name}'"], []

        logger.info(f" Adding to playlist: '{playlist_name}'...")
        playlist.addItems(album_track)
        playlist.reload()

        if not any(
            item.ratingKey == album_track.ratingKey for item in playlist.items()
        ):
            logger.warning(f" ⚠ Playlist '{playlist_name}' add failed verification")
            return [], [], [f"playlist '{playlist_name}' (add failed)"]

        logger.info(f" ✓ Playlist '{playlist_name}' verified")
        return [f"added to playlist '{playlist_name}' ✓ verified"], [], []
    except Exception as e:
        logger.error(f"Failed to add to playlist '{playlist_name}': {e}")
        return [], [], [f"playlist '{playlist_name}' (error: {e})"]

def format_migration_message(
    changes: List[str], already_has: List[str], failures: List[str]
) -> str:
    """Format migration result message"""
    parts = list(
        filter(
            None,
            [
                f"✅ Migrated: {', '.join(changes)}" if changes else None,
                f"ℹ️ Already has: {', '.join(already_has)}" if already_has else None,
                f"❌ Failed: {', '.join(failures)}" if failures else None,
            ],
        )
    )
    return " | ".join(parts) if parts else "No metadata to migrate"

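For illustration, a call with hypothetical values renders like this (doctest style):

>>> format_migration_message(
...     ["rating (8/10) ✓ verified"], ["playlist 'Favorites'"], []
... )
"✅ Migrated: rating (8/10) ✓ verified | ℹ️ Already has: playlist 'Favorites'"
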
def migrate_plex_metadata(
    plex_server,
    single_file_path: str,
    album_file_path: str,
    docker_mount: Optional[str] = None,
    exclude_smart_playlists: Optional[set] = None,
) -> Tuple[bool, str]:
    """Migrate Plex metadata from single to album track. Returns (success, message)"""
    if not plex_server:
        return False, "Plex server not connected"

    single_track = find_plex_track_by_path(plex_server, single_file_path, docker_mount)
    album_track = find_plex_track_by_path(plex_server, album_file_path, docker_mount)

    if not single_track:
        return False, "Could not find single track in Plex"
    if not album_track:
        return False, "Could not find album track in Plex"

    single_rating = getattr(single_track, "userRating", None)
    single_plays = getattr(single_track, "viewCount", 0) or 0
    single_playlists = get_plex_playlists_for_track(
        plex_server, single_track, exclude_smart_playlists
    )

    logger.info(
        f" Single track metadata: rating={single_rating or 'none'}, plays={single_plays}, playlists={len(single_playlists)}"
    )
    if single_playlists:
        logger.info(
            f" Single is in playlists: {', '.join(p.title for p in single_playlists)}"
        )

    original_album_rating = getattr(album_track, "userRating", None)
    album_plays = getattr(album_track, "viewCount", 0) or 0

    rating_changes, rating_already, rating_failures = migrate_rating(
        single_track, album_track, single_rating, original_album_rating
    )

    plays_changes, plays_already, plays_failures = migrate_play_count(
        album_track, single_plays, album_plays
    )

    playlist_results = [migrate_playlist(p, album_track) for p in single_playlists]
    playlist_changes = [c for result in playlist_results for c in result[0]]
    playlist_already = [a for result in playlist_results for a in result[1]]
    playlist_failures = [f for result in playlist_results for f in result[2]]

    all_changes = rating_changes + plays_changes + playlist_changes
    all_already = rating_already + plays_already + playlist_already
    all_failures = rating_failures + plays_failures + playlist_failures

    message = format_migration_message(all_changes, all_already, all_failures)
    return len(all_failures) == 0, message

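A hedged end-to-end sketch of how a caller might drive this module (the URL, token, and paths below are placeholders; the project's real entry point lives in main.py):

from plexapi.server import PlexServer

# Illustrative wiring only; real configuration comes from the caller.
plex = PlexServer("http://localhost:32400", "PLEX_TOKEN")
smart_ids = get_smart_playlist_ids(plex)
ok, message = migrate_plex_metadata(
    plex,
    single_file_path="/music/Artist/Single/Track.flac",     # placeholder path
    album_file_path="/music/Artist/Album/01 - Track.flac",  # placeholder path
    docker_mount=None,
    exclude_smart_playlists=smart_ids,
)
print("OK" if ok else "FAILED", "-", message)
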
21
src-cleanup/pyproject.toml
Normal file
@@ -0,0 +1,21 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
py-modules = ["main", "lidarr_client", "audio_verification", "track_verification", "plex_metadata", "duplicate_finder", "lidarr_delete"]

[project]
name = "lidarr-cleanup-singles"
version = "1.0.0"
description = "Identify and optionally delete duplicate single tracks in Lidarr"
requires-python = ">=3.8"
dependencies = [
    "requests",
    "python-dotenv",
    "plexapi",
]

[project.scripts]
lidarr-cleanup-singles = "main:main"

85
src-cleanup/track_verification.py
Normal file
@@ -0,0 +1,85 @@
"""Track verification using multiple methods"""

import logging
from typing import Dict, Optional, Tuple

from audio_verification import (
    check_file_properties,
    check_mb_recording_id,
    check_quality_profile,
    compare_fingerprints,
    get_audio_fingerprint,
    get_file_properties,
)
from lidarr_client import get_track_info, get_trackfile_info

logger = logging.getLogger(__name__)


def verify_audio_match(
    base_url: str,
    headers: Dict[str, str],
    single_track_id: int,
    single_track_file_id: int,
    album_track_id: int,
    album_track_file_id: int,
    docker_mount: Optional[str] = None,
) -> Tuple[bool, Optional[str], int]:
    """Verify tracks using multiple methods. Returns (match, message, confidence_score)"""
    logger.debug(
        f"Verifying audio match: single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"
    )

    single_file_info = get_trackfile_info(base_url, single_track_file_id, headers)
    album_file_info = get_trackfile_info(base_url, album_track_file_id, headers)

    if not (single_file_info and album_file_info):
        return False, "Could not fetch track file info", 0

    single_path = single_file_info.get("path")
    album_path = album_file_info.get("path")
    if not (single_path and album_path):
        return False, "Missing file paths", 0

    single_track_info = get_track_info(base_url, single_track_id, headers)
    album_track_info = get_track_info(base_url, album_track_id, headers)

    mb_score, mb_msg = check_mb_recording_id(single_track_info, album_track_info)
    quality_score, quality_msg = check_quality_profile(
        single_file_info, album_file_info
    )

    single_props = get_file_properties(single_path, docker_mount)
    album_props = get_file_properties(album_path, docker_mount)
    prop_checks = check_file_properties(single_props, album_props)

    single_fp = get_audio_fingerprint(single_path, docker_mount)
    album_fp = get_audio_fingerprint(album_path, docker_mount)
    log_context = f"single trackFileId {single_track_file_id} vs album trackFileId {album_track_file_id}"

    if single_fp and album_fp:
        fp_match, fp_message = compare_fingerprints(
            single_fp, album_fp, log_context, return_message=True
        )
        fp_score = 20 if fp_match else 0
        fp_msg = "✓ Audio fingerprint match (+20)" if fp_match else f"⚠ {fp_message}"
    else:
        fp_score, fp_msg = 0, "⚠ Audio fingerprint unavailable"

    all_checks = [
        (mb_score, mb_msg),
        (quality_score, quality_msg) if quality_msg else None,
        *prop_checks,
        (fp_score, fp_msg),
    ]

    valid_checks = [check for check in all_checks if check is not None]
    confidence_score = sum(score for score, _ in valid_checks)
    verification_results = [msg for _, msg in valid_checks]

    match = confidence_score >= 70
    result_message = f"Confidence: {confidence_score}/100 | " + " | ".join(
        verification_results
    )

    return match, result_message, confidence_score

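The 70-point threshold means no single check can confirm a match alone. A hypothetical worked example (only the fingerprint's +20 is visible in this diff; the other weights live in audio_verification.py and are assumed here):

# Assumed weights for illustration; actual values are in audio_verification.py.
mb_score = 40        # assumed: MusicBrainz recording IDs match
quality_score = 15   # assumed: same quality/codec profile
prop_score = 15      # assumed: file properties (duration etc.) agree
fp_score = 20        # fingerprint match (+20, per the code above)
confidence = mb_score + quality_score + prop_score + fp_score  # = 90
assert confidence >= 70  # clears the match threshold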