2026-05-22 22:23:16 +02:00
|
|
|
import os
|
|
|
|
|
import pathlib
|
|
|
|
|
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
import spotipy
|
|
|
|
|
from spotipy.oauth2 import SpotifyOAuth
|
|
|
|
|
from spotipy.cache_handler import CacheFileHandler
|
|
|
|
|
|
|
|
|
|
# Load a local .env file (if present) at import time so the SPOTIPY_* settings
# read from os.environ in get_client() can come from it.
load_dotenv()
|
|
|
|
|
|
|
|
|
|
# OAuth scopes requested from Spotify, passed to SpotifyOAuth as a single
# space-separated string.
_SCOPES = " ".join(
    (
        "playlist-read-private",
        "playlist-read-collaborative",
        "playlist-modify-public",
        "playlist-modify-private",
        "user-library-read",
    )
)
|
|
|
|
|
|
|
|
|
|
# Keep the token cache next to this module rather than in the current working
# directory, so Claude Code finds the same cache file wherever it is started.
_CACHE_PATH = pathlib.Path(__file__).parent.joinpath(".cache")
|
|
|
|
|
|
|
|
|
|
# Module-wide Spotify client; stays None until get_client() builds it on first use.
_client: spotipy.Spotify | None = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_client() -> spotipy.Spotify:
    """Return the shared Spotify client, creating it on the first call.

    The client ID and secret are read from the ``SPOTIPY_CLIENT_ID`` and
    ``SPOTIPY_CLIENT_SECRET`` environment variables.  ``SPOTIPY_REDIRECT_URI``
    is optional and defaults to ``http://localhost:8888/callback``.  The
    created client is stored in the module-level ``_client`` and returned
    as-is on every later call.

    Raises:
        RuntimeError: If the client ID or the client secret is unset or empty.
    """
    global _client

    # Fast path: the client was already built by an earlier call.
    if _client is not None:
        return _client

    client_id = os.environ.get("SPOTIPY_CLIENT_ID")
    client_secret = os.environ.get("SPOTIPY_CLIENT_SECRET")
    redirect_uri = os.environ.get(
        "SPOTIPY_REDIRECT_URI", "http://localhost:8888/callback"
    )

    if not (client_id and client_secret):
        raise RuntimeError(
            "Missing SPOTIPY_CLIENT_ID or SPOTIPY_CLIENT_SECRET. "
            "Copy .env.example to .env and fill in your Spotify app credentials."
        )

    # Token cache file lives at _CACHE_PATH (next to this module).
    token_cache = CacheFileHandler(cache_path=str(_CACHE_PATH))
    oauth = SpotifyOAuth(
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
        scope=_SCOPES,
        cache_handler=token_cache,
    )
    _client = spotipy.Spotify(auth_manager=oauth)
    return _client
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_duration(ms: int) -> str:
    """Format a duration given in milliseconds as ``M:SS``.

    Fractions of a second are dropped.  Minutes are not wrapped into hours,
    so one hour is rendered as ``60:00``.
    """
    whole_seconds = ms // 1000
    return f"{whole_seconds // 60}:{whole_seconds % 60:02d}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_playlists() -> list[dict]:
    """Return a summary of every playlist of the current user.

    Playlists are fetched 50 per request and every following page is read
    until the response no longer carries a ``next`` link.

    Returns:
        One dict per playlist with the keys ``id``, ``name``, ``description``,
        ``tracks_total``, ``public`` and ``uri``.
    """
    sp = get_client()
    playlists: list[dict] = []
    page = sp.current_user_playlists(limit=50)
    while page:
        playlists.extend(
            {
                "id": entry["id"],
                "name": entry["name"],
                "description": entry.get("description", ""),
                "tracks_total": entry["tracks"]["total"],
                "public": entry["public"],
                "uri": entry["uri"],
            }
            for entry in page["items"]
        )
        # Follow the pagination link, or end the loop on the last page.
        page = sp.next(page) if page["next"] else None
    return playlists
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_playlist_tracks(playlist_id: str) -> list[dict]:
    """Return every track of the playlist identified by *playlist_id*.

    All pages of the playlist are read.  Entries with no ``track`` object,
    or whose ``type`` is not ``"track"``, are skipped.

    Returns:
        One dict per track with the keys ``name``, ``artists`` (list of
        artist names), ``album``, ``duration_ms``, ``duration`` (``M:SS``),
        ``popularity``, ``uri`` and ``added_at``.
    """
    sp = get_client()
    tracks: list[dict] = []
    page = sp.playlist_items(playlist_id, additional_types=["track"])
    while page:
        for entry in page["items"]:
            track = entry.get("track")
            # Keep only entries that carry a real track object.
            if track and track.get("type") == "track":
                tracks.append(
                    {
                        "name": track["name"],
                        "artists": [artist["name"] for artist in track["artists"]],
                        "album": track["album"]["name"],
                        "duration_ms": track["duration_ms"],
                        "duration": _format_duration(track["duration_ms"]),
                        "popularity": track["popularity"],
                        "uri": track["uri"],
                        "added_at": entry["added_at"],
                    }
                )
        # Follow the pagination link, or end the loop on the last page.
        page = sp.next(page) if page["next"] else None
    return tracks
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_saved_tracks(limit: int = 50) -> list[dict]:
    """Return up to *limit* tracks from the current user's saved tracks.

    Tracks are requested in pages of at most 50 and further pages are
    fetched only while fewer than *limit* tracks have been collected.

    Args:
        limit: Maximum number of tracks to return.  A value of zero or less
            yields an empty list without contacting Spotify.

    Returns:
        One dict per track with the keys ``name``, ``artists`` (list of
        artist names), ``album``, ``duration_ms``, ``duration`` (``M:SS``),
        ``popularity``, ``uri`` and ``added_at``.
    """
    # Nothing was asked for: do not authenticate or send a request whose
    # page size would be zero or negative.
    if limit <= 0:
        return []

    sp = get_client()
    results: list[dict] = []
    response = sp.current_user_saved_tracks(limit=min(limit, 50))
    while response:
        for item in response["items"]:
            if len(results) >= limit:
                break
            track = item["track"]
            results.append({
                "name": track["name"],
                "artists": [a["name"] for a in track["artists"]],
                "album": track["album"]["name"],
                "duration_ms": track["duration_ms"],
                "duration": _format_duration(track["duration_ms"]),
                "popularity": track["popularity"],
                "uri": track["uri"],
                "added_at": item["added_at"],
            })
        # Stop once enough tracks were collected or there is no further page.
        if len(results) >= limit or not response["next"]:
            break
        response = sp.next(response)
    return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def search_tracks(query: str, limit: int = 10) -> list[dict]:
    """Search Spotify for tracks matching *query*.

    A single search request is sent with a page size of ``min(limit, 50)``;
    no further result pages are fetched, so at most one page of matches is
    returned.

    Returns:
        One dict per track with the keys ``name``, ``artists`` (list of
        artist names), ``album``, ``duration_ms``, ``duration`` (``M:SS``),
        ``popularity`` and ``uri``.
    """
    sp = get_client()
    # The per-request size is capped at 50 and only this one page is read.
    page_size = min(limit, 50)
    found = sp.search(q=query, type="track", limit=page_size)
    return [
        {
            "name": track["name"],
            "artists": [artist["name"] for artist in track["artists"]],
            "album": track["album"]["name"],
            "duration_ms": track["duration_ms"],
            "duration": _format_duration(track["duration_ms"]),
            "popularity": track["popularity"],
            "uri": track["uri"],
        }
        for track in found["tracks"]["items"]
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_playlist(name: str, description: str = "", public: bool = False) -> dict:
    """Create a playlist owned by the current user.

    Args:
        name: Name of the new playlist.
        description: Optional playlist description.
        public: Whether the playlist is created as public.

    Returns:
        A dict with the ``id``, ``name``, ``uri`` and ``public`` values of
        the playlist object returned by Spotify.
    """
    sp = get_client()
    # The create call needs the owner's user id, so look it up first.
    owner_id = sp.current_user()["id"]
    created = sp.user_playlist_create(
        user=owner_id,
        name=name,
        public=public,
        description=description,
    )
    return {key: created[key] for key in ("id", "name", "uri", "public")}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_tracks_to_playlist(playlist_id: str, track_uris: list[str]) -> dict:
    """Append *track_uris* to the playlist identified by *playlist_id*.

    The URIs are sent in consecutive batches of 100.  An exception raised by
    any batch propagates to the caller; batches sent before it are not
    undone here.

    Returns:
        ``{"added": <number of URIs passed in>}`` once every batch went
        through.
    """
    sp = get_client()
    batches = [
        track_uris[start : start + 100]
        for start in range(0, len(track_uris), 100)
    ]
    for batch in batches:
        sp.playlist_add_items(playlist_id, batch)
    # Reaching this line means no batch raised.
    return {"added": len(track_uris)}
|