diff --git a/spotify_client.py b/spotify_client.py new file mode 100644 index 0000000..11f1e38 --- /dev/null +++ b/spotify_client.py @@ -0,0 +1,159 @@ +import os +import pathlib + +from dotenv import load_dotenv +import spotipy +from spotipy.oauth2 import SpotifyOAuth +from spotipy.cache_handler import CacheFileHandler + +load_dotenv() + +_SCOPES = " ".join([ + "playlist-read-private", + "playlist-read-collaborative", + "playlist-modify-public", + "playlist-modify-private", + "user-library-read", +]) + +# Cache relative to this file so Claude Code always finds it regardless of cwd +_CACHE_PATH = pathlib.Path(__file__).parent / ".cache" + +_client: spotipy.Spotify | None = None + + +def get_client() -> spotipy.Spotify: + global _client + if _client is None: + client_id = os.environ.get("SPOTIPY_CLIENT_ID") + client_secret = os.environ.get("SPOTIPY_CLIENT_SECRET") + redirect_uri = os.environ.get("SPOTIPY_REDIRECT_URI", "http://localhost:8888/callback") + if not client_id or not client_secret: + raise RuntimeError( + "Missing SPOTIPY_CLIENT_ID or SPOTIPY_CLIENT_SECRET. " + "Copy .env.example to .env and fill in your Spotify app credentials." + ) + _client = spotipy.Spotify( + auth_manager=SpotifyOAuth( + client_id=client_id, + client_secret=client_secret, + redirect_uri=redirect_uri, + scope=_SCOPES, + cache_handler=CacheFileHandler(cache_path=str(_CACHE_PATH)), + ) + ) + return _client + + +def _format_duration(ms: int) -> str: + seconds = ms // 1000 + minutes, seconds = divmod(seconds, 60) + return f"{minutes}:{seconds:02d}" + + +def list_playlists() -> list[dict]: + sp = get_client() + results = [] + response = sp.current_user_playlists(limit=50) + while response: + for item in response["items"]: + results.append({ + "id": item["id"], + "name": item["name"], + "description": item.get("description", ""), + "tracks_total": item["tracks"]["total"], + "public": item["public"], + "uri": item["uri"], + }) + response = sp.next(response) if response["next"] else None + return results + + +def get_playlist_tracks(playlist_id: str) -> list[dict]: + sp = get_client() + results = [] + response = sp.playlist_items(playlist_id, additional_types=["track"]) + while response: + for item in response["items"]: + track = item.get("track") + if not track or track.get("type") != "track": + continue + results.append({ + "name": track["name"], + "artists": [a["name"] for a in track["artists"]], + "album": track["album"]["name"], + "duration_ms": track["duration_ms"], + "duration": _format_duration(track["duration_ms"]), + "popularity": track["popularity"], + "uri": track["uri"], + "added_at": item["added_at"], + }) + response = sp.next(response) if response["next"] else None + return results + + +def list_saved_tracks(limit: int = 50) -> list[dict]: + sp = get_client() + results = [] + response = sp.current_user_saved_tracks(limit=min(limit, 50)) + while response and len(results) < limit: + for item in response["items"]: + if len(results) >= limit: + break + track = item["track"] + results.append({ + "name": track["name"], + "artists": [a["name"] for a in track["artists"]], + "album": track["album"]["name"], + "duration_ms": track["duration_ms"], + "duration": _format_duration(track["duration_ms"]), + "popularity": track["popularity"], + "uri": track["uri"], + "added_at": item["added_at"], + }) + if response["next"] and len(results) < limit: + response = sp.next(response) + else: + break + return results + + +def search_tracks(query: str, limit: int = 10) -> list[dict]: + sp = get_client() + response = sp.search(q=query, type="track", limit=min(limit, 50)) + results = [] + for track in response["tracks"]["items"]: + results.append({ + "name": track["name"], + "artists": [a["name"] for a in track["artists"]], + "album": track["album"]["name"], + "duration_ms": track["duration_ms"], + "duration": _format_duration(track["duration_ms"]), + "popularity": track["popularity"], + "uri": track["uri"], + }) + return results + + +def create_playlist(name: str, description: str = "", public: bool = False) -> dict: + sp = get_client() + user_id = sp.current_user()["id"] + playlist = sp.user_playlist_create( + user=user_id, + name=name, + public=public, + description=description, + ) + return { + "id": playlist["id"], + "name": playlist["name"], + "uri": playlist["uri"], + "public": playlist["public"], + } + + +def add_tracks_to_playlist(playlist_id: str, track_uris: list[str]) -> dict: + sp = get_client() + for i in range(0, len(track_uris), 100): + sp.playlist_add_items(playlist_id, track_uris[i : i + 100]) + return {"added": len(track_uris)}