Files
mcp-spotify/spotify_client.py

166 lines
5.5 KiB
Python
Raw Normal View History

2026-05-22 22:23:16 +02:00
import os
import pathlib
from dotenv import load_dotenv
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from spotipy.cache_handler import CacheFileHandler
load_dotenv()
_SCOPES = " ".join([
"playlist-read-private",
"playlist-read-collaborative",
"playlist-modify-public",
"playlist-modify-private",
"user-library-read",
])
# Cache relative to this file so Claude Code always finds it regardless of cwd
_CACHE_PATH = pathlib.Path(__file__).parent / ".cache"
_client: spotipy.Spotify | None = None
def get_client() -> spotipy.Spotify:
global _client
if _client is None:
client_id = os.environ.get("SPOTIPY_CLIENT_ID")
client_secret = os.environ.get("SPOTIPY_CLIENT_SECRET")
redirect_uri = os.environ.get("SPOTIPY_REDIRECT_URI", "http://localhost:8888/callback")
if not client_id or not client_secret:
raise RuntimeError(
"Missing SPOTIPY_CLIENT_ID or SPOTIPY_CLIENT_SECRET. "
"Copy .env.example to .env and fill in your Spotify app credentials."
)
_client = spotipy.Spotify(
auth_manager=SpotifyOAuth(
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri,
scope=_SCOPES,
cache_handler=CacheFileHandler(cache_path=str(_CACHE_PATH)),
)
)
return _client
def _format_duration(ms: int) -> str:
if ms is None:
return "unknown"
2026-05-22 22:23:16 +02:00
seconds = ms // 1000
minutes, seconds = divmod(seconds, 60)
return f"{minutes}:{seconds:02d}"
def list_playlists() -> list[dict]:
sp = get_client()
results = []
response = sp.current_user_playlists(limit=50)
while response:
for item in response["items"]:
results.append({
"id": item["id"],
"name": item["name"],
"description": item.get("description", ""),
"tracks_total": (item.get("tracks") or {}).get("total", 0),
2026-05-22 22:23:16 +02:00
"public": item["public"],
"uri": item["uri"],
})
response = sp.next(response) if response["next"] else None
return results
def get_playlist_tracks(playlist_id: str) -> list[dict]:
sp = get_client()
results = []
response = sp.playlist_items(playlist_id, additional_types=["track"])
while response:
for item in response["items"]:
track = item.get("track")
if not track or track.get("type") != "track":
continue
results.append({
"name": track["name"],
"artists": [a["name"] for a in track["artists"]],
"album": track["album"]["name"],
"duration_ms": track["duration_ms"],
"duration": _format_duration(track["duration_ms"]),
"popularity": track["popularity"],
"uri": track["uri"],
"added_at": item["added_at"],
})
response = sp.next(response) if response["next"] else None
return results
def list_saved_tracks(limit: int = 50) -> list[dict]:
sp = get_client()
results = []
response = sp.current_user_saved_tracks(limit=min(limit, 50))
while response and len(results) < limit:
for item in response["items"]:
if len(results) >= limit:
break
track = item["track"]
results.append({
"name": track["name"],
"artists": [a["name"] for a in track["artists"]],
"album": track["album"]["name"],
"duration_ms": track["duration_ms"],
"duration": _format_duration(track["duration_ms"]),
"popularity": track["popularity"],
"uri": track["uri"],
"added_at": item["added_at"],
})
if response["next"] and len(results) < limit:
response = sp.next(response)
else:
break
return results
def search_tracks(query: str, limit: int = 10) -> list[dict]:
sp = get_client()
# Spotify's search endpoint returns at most 50 results per call; no pagination available
2026-05-22 22:23:16 +02:00
response = sp.search(q=query, type="track", limit=min(limit, 50))
results = []
for track in response["tracks"]["items"]:
results.append({
"name": track["name"],
"artists": [a["name"] for a in track["artists"]],
"album": track["album"]["name"],
"duration_ms": track["duration_ms"],
"duration": _format_duration(track["duration_ms"]),
"popularity": track["popularity"],
"uri": track["uri"],
})
return results
def create_playlist(name: str, description: str = "", public: bool = False) -> dict:
sp = get_client()
user_id = sp.current_user()["id"]
playlist = sp.user_playlist_create(
user=user_id,
name=name,
public=public,
description=description,
)
return {
"id": playlist["id"],
"name": playlist["name"],
"uri": playlist["uri"],
"public": playlist["public"],
}
def add_tracks_to_playlist(playlist_id: str, track_uris: list[str]) -> dict:
if not track_uris:
raise ValueError("track_uris must not be empty")
2026-05-22 22:23:16 +02:00
sp = get_client()
for i in range(0, len(track_uris), 100):
sp.playlist_add_items(playlist_id, track_uris[i : i + 100])
# Only reached if all batches succeed — any SpotifyException propagates to the caller
2026-05-22 22:23:16 +02:00
return {"added": len(track_uris)}