"""Convenience wrappers around a lazily created, cached spotipy client.

Every public function returns plain dicts/lists built from the Spotify Web API
responses so callers do not have to deal with the raw payloads.
"""

import itertools
import os
import pathlib
from collections.abc import Iterator

from dotenv import load_dotenv
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from spotipy.cache_handler import CacheFileHandler

load_dotenv()

_SCOPES = " ".join([
    "playlist-read-private",
    "playlist-read-collaborative",
    "playlist-modify-public",
    "playlist-modify-private",
    "user-library-read",
])

# Cache relative to this file so Claude Code always finds it regardless of cwd
_CACHE_PATH = pathlib.Path(__file__).parent / ".cache"

# Page size requested from the saved-tracks and search endpoints.
_MAX_PAGE_SIZE = 50

# Number of track URIs sent per add-items request.
_ADD_BATCH_SIZE = 100

_client: spotipy.Spotify | None = None


def get_client() -> spotipy.Spotify:
    """Return the shared Spotify client, creating it on first use.

    Credentials are read from the environment (after ``load_dotenv()`` has run
    at import time). ``SPOTIPY_REDIRECT_URI`` is optional and falls back to
    ``http://localhost:8888/callback``.

    Raises:
        RuntimeError: if ``SPOTIPY_CLIENT_ID`` or ``SPOTIPY_CLIENT_SECRET`` is
            missing or empty.
    """
    global _client
    if _client is None:
        client_id = os.environ.get("SPOTIPY_CLIENT_ID")
        client_secret = os.environ.get("SPOTIPY_CLIENT_SECRET")
        redirect_uri = os.environ.get(
            "SPOTIPY_REDIRECT_URI", "http://localhost:8888/callback"
        )
        if not client_id or not client_secret:
            raise RuntimeError(
                "Missing SPOTIPY_CLIENT_ID or SPOTIPY_CLIENT_SECRET. "
                "Copy .env.example to .env and fill in your Spotify app credentials."
            )
        _client = spotipy.Spotify(
            auth_manager=SpotifyOAuth(
                client_id=client_id,
                client_secret=client_secret,
                redirect_uri=redirect_uri,
                scope=_SCOPES,
                cache_handler=CacheFileHandler(cache_path=str(_CACHE_PATH)),
            )
        )
    return _client


def _format_duration(ms: int | None) -> str:
    """Format a millisecond duration as ``M:SS``; ``None`` yields ``"unknown"``."""
    if ms is None:
        return "unknown"
    seconds = ms // 1000
    minutes, seconds = divmod(seconds, 60)
    return f"{minutes}:{seconds:02d}"


def _iter_items(sp: spotipy.Spotify, page: dict | None) -> Iterator[dict]:
    """Yield every entry of ``page["items"]``, following ``next`` links lazily.

    The next page is requested only when the consumer asks for more items than
    the current page holds, so wrapping this in ``itertools.islice`` never
    fetches more pages than needed.
    """
    while page:
        yield from page["items"]
        page = sp.next(page) if page["next"] else None


def _track_summary(track: dict) -> dict:
    """Reduce a Spotify track object to the fields this module exposes."""
    return {
        "name": track["name"],
        "artists": [artist["name"] for artist in track["artists"]],
        "album": track["album"]["name"],
        "duration_ms": track["duration_ms"],
        "duration": _format_duration(track["duration_ms"]),
        "popularity": track["popularity"],
        "uri": track["uri"],
    }


def list_playlists() -> list[dict]:
    """Return a summary dict for every playlist of the current user.

    Each dict has the keys ``id``, ``name``, ``description``, ``tracks_total``,
    ``public`` and ``uri``.
    """
    sp = get_client()
    first_page = sp.current_user_playlists(limit=50)
    return [
        {
            "id": item["id"],
            "name": item["name"],
            # The key may be present with a null value; always expose a str.
            "description": item.get("description") or "",
            "tracks_total": item["tracks"]["total"],
            "public": item["public"],
            "uri": item["uri"],
        }
        for item in _iter_items(sp, first_page)
    ]


def get_playlist_tracks(playlist_id: str) -> list[dict]:
    """Return all track entries of a playlist, in playlist order.

    Entries without a track object, or whose object is not of type ``"track"``,
    are skipped. Each result is a track summary plus the entry's ``added_at``.
    """
    sp = get_client()
    first_page = sp.playlist_items(playlist_id, additional_types=["track"])
    results = []
    for item in _iter_items(sp, first_page):
        track = item.get("track")
        if not track or track.get("type") != "track":
            continue
        results.append({**_track_summary(track), "added_at": item["added_at"]})
    return results


def list_saved_tracks(limit: int = 50) -> list[dict]:
    """Return up to ``limit`` of the current user's saved tracks.

    Pages are fetched only as needed to reach ``limit``. A non-positive
    ``limit`` returns an empty list without contacting Spotify.
    """
    if limit <= 0:
        # Nothing can be returned, so skip authentication and the request.
        return []
    sp = get_client()
    first_page = sp.current_user_saved_tracks(limit=min(limit, _MAX_PAGE_SIZE))
    return [
        {**_track_summary(item["track"]), "added_at": item["added_at"]}
        for item in itertools.islice(_iter_items(sp, first_page), limit)
    ]


def search_tracks(query: str, limit: int = 10) -> list[dict]:
    """Search Spotify for tracks matching ``query`` and return their summaries.

    Only one search request is issued, requesting at most 50 results, so a
    larger ``limit`` is capped at 50.
    """
    sp = get_client()
    response = sp.search(q=query, type="track", limit=min(limit, _MAX_PAGE_SIZE))
    return [_track_summary(track) for track in response["tracks"]["items"]]


def create_playlist(name: str, description: str = "", public: bool = False) -> dict:
    """Create a playlist for the current user.

    Returns:
        A dict with the new playlist's ``id``, ``name``, ``uri`` and ``public``.
    """
    sp = get_client()
    user_id = sp.current_user()["id"]
    playlist = sp.user_playlist_create(
        user=user_id,
        name=name,
        public=public,
        description=description,
    )
    return {
        "id": playlist["id"],
        "name": playlist["name"],
        "uri": playlist["uri"],
        "public": playlist["public"],
    }


def add_tracks_to_playlist(playlist_id: str, track_uris: list[str]) -> dict:
    """Append ``track_uris`` to a playlist, sending them in batches of 100.

    Returns:
        ``{"added": <number of URIs sent>}`` once every batch has succeeded.

    Raises:
        ValueError: if ``track_uris`` is empty (checked before any API call).
    """
    if not track_uris:
        raise ValueError("track_uris must not be empty")
    sp = get_client()
    for start in range(0, len(track_uris), _ADD_BATCH_SIZE):
        sp.playlist_add_items(playlist_id, track_uris[start : start + _ADD_BATCH_SIZE])
    # Only reached if all batches succeed — any SpotifyException propagates to the caller
    return {"added": len(track_uris)}