import os import pathlib import sys from dotenv import load_dotenv import spotipy from spotipy.oauth2 import SpotifyOAuth from spotipy.cache_handler import CacheFileHandler def _dbg(msg: str) -> None: print(f"[spotify-mcp] {msg}", file=sys.stderr, flush=True) load_dotenv() _SCOPES = " ".join([ "playlist-read-private", "playlist-read-collaborative", "playlist-modify-public", "playlist-modify-private", "user-library-read", ]) # Cache relative to this file so Claude Code always finds it regardless of cwd _CACHE_PATH = pathlib.Path(__file__).parent / ".cache" _client: spotipy.Spotify | None = None def get_client() -> spotipy.Spotify: global _client if _client is None: client_id = os.environ.get("SPOTIPY_CLIENT_ID") client_secret = os.environ.get("SPOTIPY_CLIENT_SECRET") redirect_uri = os.environ.get("SPOTIPY_REDIRECT_URI", "http://localhost:8888/callback") if not client_id or not client_secret: raise RuntimeError( "Missing SPOTIPY_CLIENT_ID or SPOTIPY_CLIENT_SECRET. " "Copy .env.example to .env and fill in your Spotify app credentials." ) _client = spotipy.Spotify( auth_manager=SpotifyOAuth( client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, scope=_SCOPES, cache_handler=CacheFileHandler(cache_path=str(_CACHE_PATH)), ) ) return _client def _format_duration(ms: int) -> str: if ms is None: return "unknown" seconds = ms // 1000 minutes, seconds = divmod(seconds, 60) return f"{minutes}:{seconds:02d}" def list_playlists() -> list[dict]: sp = get_client() results = [] response = sp.current_user_playlists(limit=50) while response: for item in response["items"]: results.append({ "id": item["id"], "name": item["name"], "description": item.get("description", ""), "tracks_total": (item.get("tracks") or {}).get("total", 0), "public": item["public"], "uri": item["uri"], }) response = sp.next(response) if response["next"] else None return results def get_playlist_tracks(playlist_id: str) -> list[dict]: _dbg(f"get_playlist_tracks called with playlist_id={playlist_id!r}") sp = get_client() results = [] page = 0 response = sp.playlist_items(playlist_id, additional_types=["track"]) while response: items = response.get("items") or [] total = response.get("total") _dbg(f" page {page}: total={total}, items_in_page={len(items)}, next={bool(response.get('next'))}") for idx, item in enumerate(items): # Spotify API returns the track under "item" key (newer API) or "track" key (older) track = item.get("item") or item.get("track") if track is None: _dbg(f" item[{idx}]: track=None (local/null track), skipping") continue track_type = track.get("type") track_name = track.get("name", "") track_uri = track.get("uri", "") if track_type != "track": _dbg(f" item[{idx}]: SKIPPED type={track_type!r} name={track_name!r} uri={track_uri!r}") continue _dbg(f" item[{idx}]: OK type={track_type!r} name={track_name!r}") results.append({ "name": track["name"], "artists": [a["name"] for a in track["artists"]], "album": track["album"]["name"], "duration_ms": track["duration_ms"], "duration": _format_duration(track["duration_ms"]), "popularity": track.get("popularity"), "uri": track["uri"], "added_at": item["added_at"], }) page += 1 response = sp.next(response) if response["next"] else None _dbg(f"get_playlist_tracks returning {len(results)} tracks") return results def list_saved_tracks(limit: int = 50) -> list[dict]: sp = get_client() results = [] response = sp.current_user_saved_tracks(limit=min(limit, 50)) while response and len(results) < limit: for item in response["items"]: if len(results) >= limit: break track = item["track"] results.append({ "name": track["name"], "artists": [a["name"] for a in track["artists"]], "album": track["album"]["name"], "duration_ms": track["duration_ms"], "duration": _format_duration(track["duration_ms"]), "popularity": track["popularity"], "uri": track["uri"], "added_at": item["added_at"], }) if response["next"] and len(results) < limit: response = sp.next(response) else: break return results def search_tracks(query: str, limit: int = 10) -> list[dict]: sp = get_client() # Spotify's search endpoint returns at most 50 results per call; no pagination available response = sp.search(q=query, type="track", limit=min(limit, 50)) results = [] for track in response["tracks"]["items"]: results.append({ "name": track["name"], "artists": [a["name"] for a in track["artists"]], "album": track["album"]["name"], "duration_ms": track["duration_ms"], "duration": _format_duration(track["duration_ms"]), "popularity": track["popularity"], "uri": track["uri"], }) return results def create_playlist(name: str, description: str = "", public: bool = False) -> dict: sp = get_client() user_id = sp.current_user()["id"] playlist = sp.user_playlist_create( user=user_id, name=name, public=public, description=description, ) return { "id": playlist["id"], "name": playlist["name"], "uri": playlist["uri"], "public": playlist["public"], } def add_tracks_to_playlist(playlist_id: str, track_uris: list[str]) -> dict: if not track_uris: raise ValueError("track_uris must not be empty") sp = get_client() for i in range(0, len(track_uris), 100): sp.playlist_add_items(playlist_id, track_uris[i : i + 100]) # Only reached if all batches succeed — any SpotifyException propagates to the caller return {"added": len(track_uris)}