# Source: competitive-programming-assistant / topcoder_mcp_client.py
# Uploaded by Millionaire-456 ("Upload 4 files"), commit 9d445ba (verified)
import asyncio
import aiohttp
import json
import uuid
import datetime
import logging
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
# Set up logging: configure the root logger at INFO level when this module is
# imported, and create the module-level logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class MCPResponse:
    """Structured response from MCP server.

    Attributes:
        success: True when the request completed without a transport or
            JSON-RPC error.
        data: Payload of a successful call (the JSON-RPC ``result`` member,
            or raw text for non-JSON responses); ``None`` when absent.
        error: Human-readable error description on failure; ``None`` on
            success.
    """

    success: bool
    data: Any = None
    error: Optional[str] = None
class TopcoderMCPClient:
    """
    Modern MCP Client for TopCoder API using Streamable HTTP transport
    Based on MCP specification 2025-03-26

    Every public coroutine returns an ``MCPResponse`` instead of raising:
    transport errors, HTTP errors and JSON-RPC errors are all reported through
    ``MCPResponse.success`` / ``MCPResponse.error``.
    """

    def __init__(self, base_url: str = "https://api.topcoder-dev.com/v6/mcp"):
        """Create a client.

        Args:
            base_url: Base URL of the MCP server; the endpoint name is
                appended to it for every request.
        """
        self.base_url = base_url
        # Locally generated identifier kept for callers that read it. It is
        # NOT the transport session: that one is assigned by the server.
        self.session_id = str(uuid.uuid4())
        # Session id handed out by the server through the ``Mcp-Session-Id``
        # response header; echoed back on every later request once known.
        self._server_session_id: Optional[str] = None
        self.request_id = 1
        self.initialized = False
        # Session configuration
        self.timeout = aiohttp.ClientTimeout(total=30)
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "TopCoder-MCP-Client/1.0"
        }

    def _get_next_id(self) -> int:
        """Generate next request ID (the counter is advanced before use)."""
        self.request_id += 1
        return self.request_id

    def _create_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Create a JSON-RPC 2.0 request carrying a fresh ``id``."""
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self._get_next_id()
        }
        if params is not None:
            request["params"] = params
        return request

    def _create_notification(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Create a JSON-RPC 2.0 notification (no ``id``, so no reply is expected)."""
        notification: Dict[str, Any] = {"jsonrpc": "2.0", "method": method}
        if params is not None:
            notification["params"] = params
        return notification

    @staticmethod
    def _to_response(message: Any, default_error: str) -> "MCPResponse":
        """Convert a decoded JSON-RPC message into an ``MCPResponse``.

        Args:
            message: Decoded JSON value received from the server.
            default_error: Text used when an error object has no ``message``.
        """
        if not isinstance(message, dict):
            return MCPResponse(
                success=False,
                error=f"Unexpected JSON-RPC payload: {message!r}"
            )
        if 'error' in message:
            failure = message['error']
            # The error member should be an object, but do not crash on a
            # server that sends a bare string or number.
            if isinstance(failure, dict):
                text = failure.get('message', default_error)
            else:
                text = str(failure)
            return MCPResponse(success=False, error=text)
        return MCPResponse(success=True, data=message.get('result'))

    @staticmethod
    def _extract_sse_messages(text: str) -> List[Any]:
        """Decode every JSON payload carried by the ``data`` fields of an SSE body.

        Consecutive ``data`` lines belong to one event and are joined with a
        newline before decoding; a blank line terminates the event. If the
        joined text is not valid JSON, each line is decoded on its own so
        servers that emit one JSON document per line keep working.
        """
        messages: List[Any] = []
        pending: List[str] = []

        def flush() -> None:
            if not pending:
                return
            chunks = list(pending)
            pending.clear()
            try:
                messages.append(json.loads("\n".join(chunks)))
                return
            except json.JSONDecodeError:
                if len(chunks) == 1:
                    logger.warning(f"Invalid JSON in SSE data: {chunks[0]}")
                    return
            for chunk in chunks:
                try:
                    messages.append(json.loads(chunk))
                except json.JSONDecodeError:
                    logger.warning(f"Invalid JSON in SSE data: {chunk}")

        # Normalise the three line endings allowed in an event stream without
        # splitting on other Unicode separators that may occur inside JSON.
        normalized = text.replace("\r\n", "\n").replace("\r", "\n")
        for raw_line in normalized.split("\n"):
            line = raw_line.strip()
            if not line:
                flush()
            elif line.startswith("data:"):
                # The space after the colon is optional in the SSE format.
                payload = line[5:].strip()
                if payload and payload != "[DONE]":
                    pending.append(payload)
        flush()
        return messages

    @staticmethod
    def _select_rpc_reply(messages: List[Any], request_id: Any = None) -> Any:
        """Pick the message that answers the request out of a decoded SSE stream.

        Preference order: the latest reply whose ``id`` equals ``request_id``,
        then the latest message carrying ``result`` or ``error``, then simply
        the last message. Returns ``None`` for an empty list.
        """
        replies = [
            message for message in messages
            if isinstance(message, dict) and ('result' in message or 'error' in message)
        ]
        if request_id is not None:
            for message in reversed(replies):
                if message.get('id') == request_id:
                    return message
        if replies:
            return replies[-1]
        return messages[-1] if messages else None

    async def _send_request(self, request: Dict[str, Any], endpoint: str = "mcp") -> "MCPResponse":
        """Send request using Streamable HTTP transport.

        Args:
            request: JSON-RPC request or notification to POST.
            endpoint: Path segment appended to ``base_url``.

        Returns:
            ``MCPResponse`` describing the outcome. HTTP 202 (the reply to a
            notification) is reported as success without data; any status
            other than 200/202 is reported as a failure.
        """
        try:
            url = f"{self.base_url}/{endpoint}"
            logger.info(f"Sending request to {url}: {request['method']}")
            logger.debug(f"Request payload: {json.dumps(request, indent=2)}")

            headers = dict(self.headers)
            if self._server_session_id:
                # Requests after initialization must carry the server's session id.
                headers["Mcp-Session-Id"] = self._server_session_id

            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.post(url, json=request, headers=headers) as response:
                    logger.info(f"Response status: {response.status}")
                    logger.debug(f"Response headers: {dict(response.headers)}")

                    if response.status == 202:
                        # Accepted: the server took a notification, no body follows.
                        return MCPResponse(success=True)

                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"HTTP Error {response.status}: {error_text}")
                        return MCPResponse(
                            success=False,
                            error=f"HTTP {response.status}: {error_text}"
                        )

                    assigned_session = response.headers.get("Mcp-Session-Id")
                    if assigned_session:
                        self._server_session_id = assigned_session

                    # Handle different content types
                    content_type = response.headers.get('content-type', '').lower()
                    if 'text/event-stream' in content_type:
                        return await self._parse_sse_response(response, request.get("id"))
                    if 'application/json' in content_type:
                        data = await response.json()
                        logger.debug(f"JSON response: {json.dumps(data, indent=2)}")
                        return self._to_response(data, 'Unknown MCP error')

                    # Handle plain text response
                    text = await response.text()
                    logger.debug(f"Text response: {text}")
                    return MCPResponse(success=True, data=text)
        except asyncio.TimeoutError:
            logger.error("Request timeout")
            return MCPResponse(success=False, error="Request timeout")
        except aiohttp.ClientError as e:
            logger.error(f"Client error: {e}")
            return MCPResponse(success=False, error=f"Client error: {e}")
        except Exception as e:
            # Boundary of the client: report instead of raising.
            logger.error(f"Unexpected error: {e}")
            return MCPResponse(success=False, error=f"Unexpected error: {e}")

    async def _parse_sse_response(self, response, request_id: Any = None) -> "MCPResponse":
        """Parse Server-Sent Events response.

        Args:
            response: HTTP response object whose awaited ``text()`` yields the
                complete event stream.
            request_id: ``id`` of the request being answered; used to pick the
                matching reply when the stream also carries other messages.
        """
        try:
            text = await response.text()
            logger.debug(f"SSE Response: {text}")
            messages = self._extract_sse_messages(text)
            reply = self._select_rpc_reply(messages, request_id)
            if reply is None:
                return MCPResponse(success=False, error="No valid data in SSE response")
            return self._to_response(reply, 'SSE error')
        except Exception as e:
            logger.error(f"Error parsing SSE response: {e}")
            return MCPResponse(success=False, error=f"SSE parsing error: {e}")

    async def _ensure_initialized(self) -> Optional["MCPResponse"]:
        """Initialize on first use; return the failed response, or ``None`` when ready."""
        if self.initialized:
            return None
        init_response = await self.initialize()
        return None if init_response.success else init_response

    async def initialize(self) -> "MCPResponse":
        """Initialize MCP session.

        Sends ``initialize`` and, on success, the ``notifications/initialized``
        notification. A failure of that notification is only logged.
        """
        if self.initialized:
            return MCPResponse(success=True, data="Already initialized")
        logger.info("Initializing MCP session...")
        params = {
            "protocolVersion": "2025-03-26",  # Use latest protocol version
            "capabilities": {
                "tools": {}
            },
            "clientInfo": {
                "name": "topcoder-mcp-client",
                "version": "1.0.0"
            }
        }
        request = self._create_request("initialize", params)
        response = await self._send_request(request)
        if response.success:
            self.initialized = True
            logger.info("MCP session initialized successfully")
            # Extract server capabilities if available
            if response.data and isinstance(response.data, dict):
                server_info = response.data.get('serverInfo', {})
                capabilities = response.data.get('capabilities', {})
                logger.info(f"Server: {server_info.get('name', 'Unknown')} v{server_info.get('version', 'Unknown')}")
                logger.info(f"Server capabilities: {list(capabilities.keys())}")
            # Complete the handshake; best-effort so a server that rejects the
            # notification does not invalidate an otherwise usable session.
            ack = await self._send_request(
                self._create_notification("notifications/initialized")
            )
            if not ack.success:
                logger.warning(f"Initialized notification was not accepted: {ack.error}")
        return response

    async def list_tools(self) -> "MCPResponse":
        """List available MCP tools"""
        failure = await self._ensure_initialized()
        if failure is not None:
            return failure
        logger.info("Listing available tools...")
        request = self._create_request("tools/list")
        return await self._send_request(request)

    async def list_resources(self) -> "MCPResponse":
        """List available MCP resources"""
        failure = await self._ensure_initialized()
        if failure is not None:
            return failure
        logger.info("Listing available resources...")
        request = self._create_request("resources/list")
        return await self._send_request(request)

    async def call_tool(self, tool_name: str, arguments: Optional[Dict[str, Any]] = None) -> "MCPResponse":
        """Call an MCP tool.

        Args:
            tool_name: Name of the tool to invoke.
            arguments: Tool arguments; omitted from the request when empty.
        """
        failure = await self._ensure_initialized()
        if failure is not None:
            return failure
        logger.info(f"Calling tool: {tool_name}")
        params: Dict[str, Any] = {
            "name": tool_name
        }
        if arguments:
            params["arguments"] = arguments
        request = self._create_request("tools/call", params)
        return await self._send_request(request)

    async def query_challenges(self, **kwargs) -> "MCPResponse":
        """Query TopCoder challenges using MCP.

        Recognised keywords ``status``, ``per_page``, ``page``, ``sort_by`` and
        ``sort_order`` are mapped to the tool's camelCase arguments with
        defaults; any other keyword whose value is not ``None`` is passed
        through under its own name.
        """
        # Common challenge query parameters
        params = {
            "status": kwargs.get("status", "Completed"),
            "perPage": kwargs.get("per_page", 10),
            "page": kwargs.get("page", 1),
            "sortBy": kwargs.get("sort_by", "startDate"),
            "sortOrder": kwargs.get("sort_order", "desc")
        }
        # Add additional parameters if provided
        reserved = {"status", "per_page", "page", "sort_by", "sort_order"}
        params.update({
            key: value
            for key, value in kwargs.items()
            if key not in reserved and value is not None
        })
        return await self.call_tool("query-tc-challenges", params)

    async def get_challenge_details(self, challenge_id: str) -> "MCPResponse":
        """Get details for a specific challenge"""
        return await self.call_tool("get-tc-challenge", {"id": challenge_id})

    async def search_members(self, query: str, limit: int = 10) -> "MCPResponse":
        """Search TopCoder members"""
        return await self.call_tool("search-tc-members", {"query": query, "limit": limit})

    async def ping(self) -> "MCPResponse":
        """Ping the MCP server"""
        request = self._create_request("ping")
        return await self._send_request(request)
# Utility functions for easy usage
async def create_mcp_client(base_url: Optional[str] = None) -> TopcoderMCPClient:
    """Create and initialize MCP client.

    Args:
        base_url: Base URL of the MCP server. When ``None`` the client's own
            default URL is used.

    Returns:
        A client whose ``initialize()`` call succeeded.

    Raises:
        RuntimeError: If the initialization request fails.
    """
    # Rely on the constructor's default instead of repeating the URL here.
    client = TopcoderMCPClient() if base_url is None else TopcoderMCPClient(base_url)
    init_response = await client.initialize()
    if not init_response.success:
        logger.error(f"Failed to initialize MCP client: {init_response.error}")
        raise RuntimeError(f"MCP initialization failed: {init_response.error}")
    return client
# Example usage and testing
async def test_mcp_connection():
    """Test MCP connection and basic functionality.

    Creates a client, then pings the server, lists tools and resources and
    queries a few completed challenges, logging the outcome of every step.

    Returns:
        The initialized client, so callers can keep using it.

    Raises:
        Exception: Any error raised during the run is logged and re-raised.
    """
    try:
        logger.info("🔗 Testing TopCoder MCP Connection...")
        # Create and initialize client
        client = await create_mcp_client()

        # Test ping
        logger.info("📡 Testing ping...")
        ping_response = await client.ping()
        if ping_response.success:
            logger.info("✅ Ping successful")
        else:
            logger.warning(f"⚠️ Ping failed: {ping_response.error}")

        # List available tools
        logger.info("🔧 Listing available tools...")
        tools_response = await client.list_tools()
        # The payload may be plain text for non-JSON replies, so only treat a
        # dict as a tool listing.
        if tools_response.success and isinstance(tools_response.data, dict):
            tools = tools_response.data.get("tools", [])
            logger.info(f"✅ Found {len(tools)} tools:")
            for tool in tools[:5]:  # Show first 5 tools
                logger.info(f" - {tool.get('name', 'Unknown')}: {tool.get('description', 'No description')}")
        else:
            logger.warning(f"⚠️ Failed to list tools: {tools_response.error}")

        # List available resources
        logger.info("📚 Listing available resources...")
        resources_response = await client.list_resources()
        if resources_response.success and isinstance(resources_response.data, dict):
            resources = resources_response.data.get("resources", [])
            logger.info(f"✅ Found {len(resources)} resources:")
            for resource in resources[:5]:  # Show first 5 resources
                logger.info(f" - {resource.get('uri', 'Unknown')}: {resource.get('description', 'No description')}")
        else:
            logger.warning(f"⚠️ Failed to list resources: {resources_response.error}")

        # Query challenges
        logger.info("🏆 Querying challenges...")
        challenges_response = await client.query_challenges(
            status="Completed",
            per_page=3,
            page=1
        )
        if challenges_response.success and challenges_response.data:
            challenges = challenges_response.data
            if isinstance(challenges, list):
                logger.info(f"✅ Retrieved {len(challenges)} challenges:")
                for challenge in challenges:
                    if isinstance(challenge, dict):
                        name = challenge.get('name', 'Unknown')
                        challenge_id = challenge.get('id', 'Unknown')
                        logger.info(f" - {name} (ID: {challenge_id})")
            else:
                logger.info(f"✅ Challenge query response: {challenges}")
        else:
            logger.warning(f"⚠️ Failed to query challenges: {challenges_response.error}")

        logger.info("🎉 MCP connection test completed!")
        return client
    except Exception as e:
        logger.error(f"❌ MCP connection test failed: {e}")
        raise
# Main execution
if __name__ == "__main__":
    async def main():
        """Run the connection test, then two extra example calls."""
        # Test the MCP client
        client = await test_mcp_connection()

        # Example: Get more specific challenge data
        print("\n" + "="*50)
        print("ADDITIONAL EXAMPLES:")
        print("="*50)

        # Search for algorithm challenges
        logger.info("🔍 Searching for algorithm challenges...")
        algo_response = await client.query_challenges(
            status="Completed",
            track="Algorithm",
            per_page=2
        )
        if algo_response.success:
            logger.info("✅ Algorithm challenges found")

        # Test error handling with invalid tool
        logger.info("🧪 Testing error handling...")
        invalid_response = await client.call_tool("invalid-tool-name")
        if not invalid_response.success:
            logger.info(f"✅ Error handling works: {invalid_response.error}")

    # Run the async main function
    asyncio.run(main())