# mcp-spotify/server.py — 176 lines, 5.9 KiB, Python

import asyncio
import json
import sys
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import spotify_client
# MCP server instance named "spotify-mcp"; the handlers below register on it via decorators.
server = Server("spotify-mcp")
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Return the tool definitions this server advertises.

    Six tools are described, in this order: ``list_playlists``,
    ``get_playlist_tracks``, ``list_saved_tracks``, ``search_tracks``,
    ``create_playlist`` and ``add_tracks_to_playlist``. Each carries a JSON
    schema of type ``object`` describing its arguments.
    """

    def object_schema(properties: dict, required: list[str]) -> dict:
        # Every tool takes a JSON object; only the fields and required keys vary.
        return {"type": "object", "properties": properties, "required": required}

    def field(json_type: str, description: str) -> dict:
        # A scalar property: its JSON type plus a human-readable description.
        return {"type": json_type, "description": description}

    list_playlists = Tool(
        name="list_playlists",
        description="List the authenticated user's Spotify playlists.",
        inputSchema=object_schema({}, []),
    )

    get_playlist_tracks = Tool(
        name="get_playlist_tracks",
        description=(
            "Get all tracks in a Spotify playlist with full metadata: "
            "name, artists, album, duration_ms, duration (mm:ss), popularity, URI, added_at."
        ),
        inputSchema=object_schema(
            {"playlist_id": field("string", "Spotify playlist ID (from list_playlists)")},
            ["playlist_id"],
        ),
    )

    list_saved_tracks = Tool(
        name="list_saved_tracks",
        description="Get the user's liked/saved tracks from Spotify.",
        inputSchema=object_schema(
            {"limit": field("integer", "Maximum number of tracks to return (default 50)")},
            [],
        ),
    )

    search_tracks = Tool(
        name="search_tracks",
        description=(
            "Search Spotify for tracks. "
            "Returns track URIs that can be passed to add_tracks_to_playlist."
        ),
        inputSchema=object_schema(
            {
                "query": field("string", "Search query"),
                "limit": field("integer", "Maximum results to return (default 10, max 50)"),
            },
            ["query"],
        ),
    )

    create_playlist = Tool(
        name="create_playlist",
        description="Create a new Spotify playlist for the authenticated user.",
        inputSchema=object_schema(
            {
                "name": field("string", "Playlist name"),
                "description": field("string", "Playlist description (optional)"),
                "public": field("boolean", "Whether the playlist is public (default false)"),
            },
            ["name"],
        ),
    )

    add_tracks_to_playlist = Tool(
        name="add_tracks_to_playlist",
        description="Add tracks to a Spotify playlist by their URIs (e.g. spotify:track:...).",
        inputSchema=object_schema(
            {
                "playlist_id": field("string", "Spotify playlist ID"),
                # Array property: written out literally because it carries "items".
                "track_uris": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of Spotify track URIs",
                },
            },
            ["playlist_id", "track_uris"],
        ),
    )

    return [
        list_playlists,
        get_playlist_tracks,
        list_saved_tracks,
        search_tracks,
        create_playlist,
        add_tracks_to_playlist,
    ]
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict | None) -> list[TextContent]:
    """Dispatch a tool call to the matching ``spotify_client`` function.

    Args:
        name: Name of the tool to invoke (one of the names advertised by
            ``handle_list_tools``).
        arguments: Arguments for the tool; ``None`` is treated as an empty
            dict.

    Returns:
        A single-element list. On success the element's text is the result
        serialized as indented JSON (non-ASCII characters kept as-is). On
        failure (unknown tool, missing required argument, or any exception
        raised by the client call) the text is a message starting with
        ``"Error: "``.
    """
    arguments = arguments or {}

    # Required keys per tool, mirroring the "required" lists in the tool schemas.
    required_args = {
        "get_playlist_tracks": ("playlist_id",),
        "search_tracks": ("query",),
        "create_playlist": ("name",),
        "add_tracks_to_playlist": ("playlist_id", "track_uris"),
    }
    missing = [key for key in required_args.get(name, ()) if key not in arguments]
    if missing:
        # Checked up front: a bare KeyError would otherwise be reported as the
        # opaque text "Error: 'playlist_id'".
        return [
            TextContent(
                type="text",
                text=f"Error: Missing required argument(s) for {name}: {', '.join(missing)}",
            )
        ]

    try:
        # Each client function is run in a worker thread via asyncio.to_thread
        # so the event loop is not blocked while it executes.
        if name == "list_playlists":
            result = await asyncio.to_thread(spotify_client.list_playlists)
        elif name == "get_playlist_tracks":
            result = await asyncio.to_thread(
                spotify_client.get_playlist_tracks, arguments["playlist_id"]
            )
        elif name == "list_saved_tracks":
            result = await asyncio.to_thread(
                spotify_client.list_saved_tracks, arguments.get("limit", 50)
            )
        elif name == "search_tracks":
            result = await asyncio.to_thread(
                spotify_client.search_tracks,
                arguments["query"],
                arguments.get("limit", 10),
            )
        elif name == "create_playlist":
            result = await asyncio.to_thread(
                spotify_client.create_playlist,
                arguments["name"],
                arguments.get("description", ""),
                arguments.get("public", False),
            )
        elif name == "add_tracks_to_playlist":
            result = await asyncio.to_thread(
                spotify_client.add_tracks_to_playlist,
                arguments["playlist_id"],
                arguments["track_uris"],
            )
        else:
            raise ValueError(f"Unknown tool: {name}")
    except Exception as e:
        # Boundary handler: report the failure to the caller as text instead
        # of letting it propagate out of the tool handler.
        return [TextContent(type="text", text=f"Error: {e}")]

    return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
async def _run_server() -> None:
    """Run the MCP server over the stdio transport until ``server.run`` returns."""
    async with stdio_server() as streams:
        # The transport yields a (read, write) stream pair.
        reader, writer = streams
        init_options = server.create_initialization_options()
        await server.run(reader, writer, init_options)
if __name__ == "__main__":
    auth_only = "--auth" in sys.argv

    if auth_only:
        # One-shot mode: obtain a client (triggering the login flow), then exit
        # without starting the server.
        print("Authenticating with Spotify (a browser window will open)...")
        spotify_client.get_client()
        print("Authentication successful! Token cached.")
        sys.exit(0)

    # Normal startup: obtain the client before serving, so a RuntimeError is
    # reported on stderr and the process exits with status 1.
    try:
        spotify_client.get_client()
    except RuntimeError as startup_error:
        print(f"Startup error: {startup_error}", file=sys.stderr)
        sys.exit(1)

    asyncio.run(_run_server())