Spotify's playlist_items endpoint now nests the track object under "item"
instead of "track" in each playlist entry. The old key is absent, causing
item.get("track") to return None for every entry and silently drop all tracks.
Also adds stderr debug traces at each component boundary (tool dispatch,
API page summary, per-item accept/skip) to make future silent-empty issues
diagnosable, and switches popularity to .get() since it is absent in the
new response shape.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
187 lines
6.5 KiB
Python
187 lines
6.5 KiB
Python
import os
|
|
import pathlib
|
|
import sys
|
|
|
|
from dotenv import load_dotenv
|
|
import spotipy
|
|
from spotipy.oauth2 import SpotifyOAuth
|
|
from spotipy.cache_handler import CacheFileHandler
|
|
|
|
|
|
def _dbg(msg: str) -> None:
    """Emit one diagnostic line on stderr, tagged with the server prefix.

    The line is flushed immediately so traces show up in order even when
    stderr is being piped by a parent process.
    """
    # print() joins the two arguments with a single space, which yields the
    # same "[spotify-mcp] <msg>" line as formatting them into one string.
    print("[spotify-mcp]", msg, file=sys.stderr, flush=True)
|
|
|
|
# Runs at import time: load variables from a local .env file (python-dotenv)
# so the os.environ lookups in get_client() can see the Spotify credentials.
load_dotenv()
|
|
|
|
# OAuth scopes requested at login, as the single space-separated string that
# is handed to SpotifyOAuth(scope=...). One scope per source line; every line
# after the first starts with the separating space.
_SCOPES = (
    "playlist-read-private"
    " playlist-read-collaborative"
    " playlist-modify-public"
    " playlist-modify-private"
    " user-library-read"
)
|
|
|
|
# Cache relative to this file so Claude Code always finds it regardless of cwd.
# The path is handed to spotipy's CacheFileHandler in get_client().
_CACHE_PATH = pathlib.Path(__file__).parent / ".cache"

# Module-wide Spotify client. Stays None until get_client() builds it on the
# first call; later calls return this same instance.
_client: spotipy.Spotify | None = None
|
|
|
|
|
|
def get_client() -> spotipy.Spotify:
    """Return the module-wide Spotify client, building it on first use.

    Credentials are read from the environment: SPOTIPY_CLIENT_ID and
    SPOTIPY_CLIENT_SECRET are required, SPOTIPY_REDIRECT_URI is optional and
    defaults to "http://localhost:8888/callback". The OAuth token cache is
    stored at _CACHE_PATH.

    Raises:
        RuntimeError: if the client id or client secret is unset or empty.
    """
    global _client

    # Already built by an earlier call: reuse it.
    if _client is not None:
        return _client

    env = os.environ
    client_id = env.get("SPOTIPY_CLIENT_ID")
    client_secret = env.get("SPOTIPY_CLIENT_SECRET")
    redirect_uri = env.get("SPOTIPY_REDIRECT_URI", "http://localhost:8888/callback")

    if not (client_id and client_secret):
        raise RuntimeError(
            "Missing SPOTIPY_CLIENT_ID or SPOTIPY_CLIENT_SECRET. "
            "Copy .env.example to .env and fill in your Spotify app credentials."
        )

    token_cache = CacheFileHandler(cache_path=str(_CACHE_PATH))
    auth_manager = SpotifyOAuth(
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
        scope=_SCOPES,
        cache_handler=token_cache,
    )
    _client = spotipy.Spotify(auth_manager=auth_manager)
    return _client
|
|
|
|
|
|
def _format_duration(ms: int) -> str:
|
|
if ms is None:
|
|
return "unknown"
|
|
seconds = ms // 1000
|
|
minutes, seconds = divmod(seconds, 60)
|
|
return f"{minutes}:{seconds:02d}"
|
|
|
|
|
|
def list_playlists() -> list[dict]:
    """Return a summary of every playlist of the authenticated user.

    Follows the API's "next" links until all pages are consumed.

    Returns:
        One dict per playlist with the keys "id", "name", "description",
        "tracks_total", "public" and "uri". "description" is always a string
        and "tracks_total" falls back to 0 when no count is reported.
    """
    sp = get_client()
    results: list[dict] = []
    response = sp.current_user_playlists(limit=50)
    while response:
        for item in response["items"]:
            # The count has been nested under "tracks"; fall back to "items"
            # so a renamed key does not silently report 0 tracks.
            # NOTE(review): the "items" fallback mirrors the item/track rename
            # handled in get_playlist_tracks — confirm against the API reference.
            counts = item.get("tracks") or item.get("items")
            total = counts.get("total", 0) if isinstance(counts, dict) else 0
            results.append({
                "id": item["id"],
                "name": item["name"],
                # A null description (key present, value None) bypasses the
                # .get() default, so normalise it here.
                "description": item.get("description") or "",
                "tracks_total": total,
                "public": item["public"],
                "uri": item["uri"],
            })
        response = sp.next(response) if response["next"] else None
    return results
|
|
|
|
|
|
def get_playlist_tracks(playlist_id: str) -> list[dict]:
    """Collect every track of a playlist, following pagination to the end.

    Each playlist entry is inspected for its track object under "item" first
    and "track" second. Entries with no track object, or whose object is not
    of type "track", are skipped. Every decision is traced through _dbg.

    Args:
        playlist_id: Identifier passed straight to spotipy's playlist_items.

    Returns:
        One dict per accepted track with the keys "name", "artists", "album",
        "duration_ms", "duration", "popularity" (None when absent), "uri" and
        "added_at".
    """
    _dbg(f"get_playlist_tracks called with playlist_id={playlist_id!r}")
    client = get_client()
    collected = []
    page_no = 0
    payload = client.playlist_items(playlist_id, additional_types=["track"])

    while payload:
        entries = payload.get("items") or []
        _dbg(f" page {page_no}: total={payload.get('total')}, items_in_page={len(entries)}, next={bool(payload.get('next'))}")

        for pos, entry in enumerate(entries):
            # Spotify API returns the track under "item" key (newer API) or "track" key (older)
            track = entry.get("item") or entry.get("track")
            if track is None:
                _dbg(f" item[{pos}]: track=None (local/null track), skipping")
                continue

            kind = track.get("type")
            title = track.get("name", "<no-name>")
            if kind != "track":
                uri_for_log = track.get("uri", "<no-uri>")
                _dbg(f" item[{pos}]: SKIPPED type={kind!r} name={title!r} uri={uri_for_log!r}")
                continue

            _dbg(f" item[{pos}]: OK type={kind!r} name={title!r}")
            length_ms = track["duration_ms"]
            collected.append({
                "name": track["name"],
                "artists": [artist["name"] for artist in track["artists"]],
                "album": track["album"]["name"],
                "duration_ms": length_ms,
                "duration": _format_duration(length_ms),
                "popularity": track.get("popularity"),
                "uri": track["uri"],
                "added_at": entry["added_at"],
            })

        page_no += 1
        # No further page link: this was the last page.
        if not payload["next"]:
            break
        payload = client.next(payload)

    _dbg(f"get_playlist_tracks returning {len(collected)} tracks")
    return collected
|
|
|
|
|
|
def list_saved_tracks(limit: int = 50) -> list[dict]:
    """Return up to `limit` tracks from the user's saved-tracks library.

    Pages are requested with a page size of min(limit, 50) and followed via
    the "next" link until `limit` tracks are collected or no pages remain.

    Args:
        limit: Maximum number of tracks to return. Zero or a negative value
            yields an empty list without contacting the API.

    Returns:
        One dict per track with the keys "name", "artists", "album",
        "duration_ms", "duration", "popularity" (None when the API omits it),
        "uri" and "added_at".
    """
    # Nothing was asked for: avoid sending a non-positive page size to the API.
    if limit <= 0:
        return []

    sp = get_client()
    results: list[dict] = []
    response = sp.current_user_saved_tracks(limit=min(limit, 50))
    while response:
        for item in response["items"]:
            if len(results) >= limit:
                return results
            track = item["track"]
            results.append({
                "name": track["name"],
                "artists": [a["name"] for a in track["artists"]],
                "album": track["album"]["name"],
                "duration_ms": track["duration_ms"],
                "duration": _format_duration(track["duration_ms"]),
                # .get(): popularity may be missing from the response, as
                # already handled in get_playlist_tracks.
                "popularity": track.get("popularity"),
                "uri": track["uri"],
                "added_at": item["added_at"],
            })
        # Stop when the quota is met or there is no further page.
        if len(results) >= limit or not response["next"]:
            break
        response = sp.next(response)
    return results
|
|
|
|
|
|
def search_tracks(query: str, limit: int = 10) -> list[dict]:
    """Search Spotify for tracks matching `query`.

    Args:
        query: Search string passed as `q` to spotipy's search.
        limit: Maximum number of results; capped at 50. Zero or a negative
            value yields an empty list without contacting the API.

    Returns:
        One dict per track with the keys "name", "artists", "album",
        "duration_ms", "duration", "popularity" (None when the API omits it)
        and "uri".
    """
    # Nothing was asked for: avoid sending a non-positive limit to the API.
    if limit <= 0:
        return []

    sp = get_client()
    # A single request is issued and the page size is capped at 50; this
    # helper does not page through further results.
    response = sp.search(q=query, type="track", limit=min(limit, 50))
    return [
        {
            "name": track["name"],
            "artists": [a["name"] for a in track["artists"]],
            "album": track["album"]["name"],
            "duration_ms": track["duration_ms"],
            "duration": _format_duration(track["duration_ms"]),
            # .get(): popularity may be missing from the response, as
            # already handled in get_playlist_tracks.
            "popularity": track.get("popularity"),
            "uri": track["uri"],
        }
        for track in response["tracks"]["items"]
    ]
|
|
|
|
|
|
def create_playlist(name: str, description: str = "", public: bool = False) -> dict:
    """Create a playlist for the authenticated user and return its summary.

    Args:
        name: Playlist name.
        description: Playlist description; empty by default.
        public: Whether the playlist is public; private by default.

    Returns:
        A dict with the "id", "name", "uri" and "public" fields of the
        playlist object returned by the API.
    """
    client = get_client()
    # The playlist is created under the id of the currently logged-in user.
    owner = client.current_user()
    created = client.user_playlist_create(
        user=owner["id"],
        name=name,
        public=public,
        description=description,
    )
    return {field: created[field] for field in ("id", "name", "uri", "public")}
|
|
|
|
|
|
def add_tracks_to_playlist(playlist_id: str, track_uris: list[str]) -> dict:
    """Append tracks to a playlist, sending them in batches of 100 URIs.

    Args:
        playlist_id: Target playlist identifier.
        track_uris: URIs of the tracks to add; must not be empty.

    Returns:
        {"added": <number of URIs submitted>} once every batch has been sent.

    Raises:
        ValueError: if track_uris is empty. Any exception raised by the
            client while adding a batch propagates to the caller, in which
            case earlier batches have already been added.
    """
    if not track_uris:
        raise ValueError("track_uris must not be empty")

    client = get_client()
    total = len(track_uris)
    offset = 0
    while offset < total:
        batch = track_uris[offset : offset + 100]
        client.playlist_add_items(playlist_id, batch)
        offset += 100

    # Only reached if all batches succeed — any SpotifyException propagates to the caller
    return {"added": total}
|