import asyncio import aiohttp import json import uuid import datetime import logging from typing import Dict, Any, Optional, List from dataclasses import dataclass # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class MCPResponse: """Structured response from MCP server""" success: bool data: Any = None error: str = None class TopcoderMCPClient: """ Modern MCP Client for TopCoder API using Streamable HTTP transport Based on MCP specification 2025-03-26 """ def __init__(self, base_url: str = "https://api.topcoder-dev.com/v6/mcp"): self.base_url = base_url self.session_id = str(uuid.uuid4()) self.request_id = 1 self.initialized = False # Session configuration self.timeout = aiohttp.ClientTimeout(total=30) self.headers = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", "User-Agent": "TopCoder-MCP-Client/1.0" } def _get_next_id(self) -> int: """Generate next request ID""" self.request_id += 1 return self.request_id def _create_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Create a JSON-RPC 2.0 request""" request = { "jsonrpc": "2.0", "method": method, "id": self._get_next_id() } if params is not None: request["params"] = params return request async def _send_request(self, request: Dict[str, Any], endpoint: str = "mcp") -> MCPResponse: """Send request using Streamable HTTP transport""" try: url = f"{self.base_url}/{endpoint}" logger.info(f"Sending request to {url}: {request['method']}") logger.debug(f"Request payload: {json.dumps(request, indent=2)}") async with aiohttp.ClientSession(timeout=self.timeout) as session: async with session.post( url, json=request, headers=self.headers ) as response: logger.info(f"Response status: {response.status}") logger.debug(f"Response headers: {dict(response.headers)}") if response.status != 200: error_text = await response.text() logger.error(f"HTTP Error {response.status}: {error_text}") return MCPResponse( success=False, error=f"HTTP {response.status}: {error_text}" ) # Handle different content types content_type = response.headers.get('content-type', '').lower() if 'text/event-stream' in content_type: # Handle SSE response return await self._parse_sse_response(response) elif 'application/json' in content_type: # Handle JSON response data = await response.json() logger.debug(f"JSON response: {json.dumps(data, indent=2)}") if 'error' in data: return MCPResponse( success=False, error=data['error'].get('message', 'Unknown MCP error') ) return MCPResponse(success=True, data=data.get('result')) else: # Handle plain text response text = await response.text() logger.debug(f"Text response: {text}") return MCPResponse(success=True, data=text) except asyncio.TimeoutError: logger.error("Request timeout") return MCPResponse(success=False, error="Request timeout") except aiohttp.ClientError as e: logger.error(f"Client error: {e}") return MCPResponse(success=False, error=f"Client error: {e}") except Exception as e: logger.error(f"Unexpected error: {e}") return MCPResponse(success=False, error=f"Unexpected error: {e}") async def _parse_sse_response(self, response) -> MCPResponse: """Parse Server-Sent Events response""" try: text = await response.text() logger.debug(f"SSE Response: {text}") # Parse SSE format lines = text.strip().split('\n') data_lines = [] for line in lines: line = line.strip() if line.startswith('data: '): data_content = line[6:] # Remove 'data: ' prefix if data_content and data_content != '[DONE]': try: data = json.loads(data_content) data_lines.append(data) except json.JSONDecodeError: logger.warning(f"Invalid JSON in SSE data: {data_content}") if data_lines: # Return the last complete response last_response = data_lines[-1] if 'error' in last_response: return MCPResponse( success=False, error=last_response['error'].get('message', 'SSE error') ) return MCPResponse(success=True, data=last_response.get('result')) return MCPResponse(success=False, error="No valid data in SSE response") except Exception as e: logger.error(f"Error parsing SSE response: {e}") return MCPResponse(success=False, error=f"SSE parsing error: {e}") async def initialize(self) -> MCPResponse: """Initialize MCP session""" if self.initialized: return MCPResponse(success=True, data="Already initialized") logger.info("Initializing MCP session...") params = { "protocolVersion": "2025-03-26", # Use latest protocol version "capabilities": { "tools": {} }, "clientInfo": { "name": "topcoder-mcp-client", "version": "1.0.0" } } request = self._create_request("initialize", params) response = await self._send_request(request) if response.success: self.initialized = True logger.info("MCP session initialized successfully") # Extract server capabilities if available if response.data and isinstance(response.data, dict): server_info = response.data.get('serverInfo', {}) capabilities = response.data.get('capabilities', {}) logger.info(f"Server: {server_info.get('name', 'Unknown')} v{server_info.get('version', 'Unknown')}") logger.info(f"Server capabilities: {list(capabilities.keys())}") return response async def list_tools(self) -> MCPResponse: """List available MCP tools""" if not self.initialized: init_response = await self.initialize() if not init_response.success: return init_response logger.info("Listing available tools...") request = self._create_request("tools/list") return await self._send_request(request) async def list_resources(self) -> MCPResponse: """List available MCP resources""" if not self.initialized: init_response = await self.initialize() if not init_response.success: return init_response logger.info("Listing available resources...") request = self._create_request("resources/list") return await self._send_request(request) async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> MCPResponse: """Call an MCP tool""" if not self.initialized: init_response = await self.initialize() if not init_response.success: return init_response logger.info(f"Calling tool: {tool_name}") params = { "name": tool_name } if arguments: params["arguments"] = arguments request = self._create_request("tools/call", params) return await self._send_request(request) async def query_challenges(self, **kwargs) -> MCPResponse: """Query TopCoder challenges using MCP""" # Common challenge query parameters params = { "status": kwargs.get("status", "Completed"), "perPage": kwargs.get("per_page", 10), "page": kwargs.get("page", 1), "sortBy": kwargs.get("sort_by", "startDate"), "sortOrder": kwargs.get("sort_order", "desc") } # Add additional parameters if provided for key, value in kwargs.items(): if key not in ["status", "per_page", "page", "sort_by", "sort_order"] and value is not None: params[key] = value return await self.call_tool("query-tc-challenges", params) async def get_challenge_details(self, challenge_id: str) -> MCPResponse: """Get details for a specific challenge""" return await self.call_tool("get-tc-challenge", {"id": challenge_id}) async def search_members(self, query: str, limit: int = 10) -> MCPResponse: """Search TopCoder members""" return await self.call_tool("search-tc-members", {"query": query, "limit": limit}) async def ping(self) -> MCPResponse: """Ping the MCP server""" request = self._create_request("ping") return await self._send_request(request) # Utility functions for easy usage async def create_mcp_client(base_url: str = None) -> TopcoderMCPClient: """Create and initialize MCP client""" if base_url is None: base_url = "https://api.topcoder-dev.com/v6/mcp" client = TopcoderMCPClient(base_url) init_response = await client.initialize() if not init_response.success: logger.error(f"Failed to initialize MCP client: {init_response.error}") raise RuntimeError(f"MCP initialization failed: {init_response.error}") return client # Example usage and testing async def test_mcp_connection(): """Test MCP connection and basic functionality""" try: logger.info("๐Ÿ”— Testing TopCoder MCP Connection...") # Create and initialize client client = await create_mcp_client() # Test ping logger.info("๐Ÿ“ก Testing ping...") ping_response = await client.ping() if ping_response.success: logger.info("โœ… Ping successful") else: logger.warning(f"โš ๏ธ Ping failed: {ping_response.error}") # List available tools logger.info("๐Ÿ”ง Listing available tools...") tools_response = await client.list_tools() if tools_response.success and tools_response.data: tools = tools_response.data.get("tools", []) logger.info(f"โœ… Found {len(tools)} tools:") for tool in tools[:5]: # Show first 5 tools logger.info(f" - {tool.get('name', 'Unknown')}: {tool.get('description', 'No description')}") else: logger.warning(f"โš ๏ธ Failed to list tools: {tools_response.error}") # List available resources logger.info("๐Ÿ“š Listing available resources...") resources_response = await client.list_resources() if resources_response.success and resources_response.data: resources = resources_response.data.get("resources", []) logger.info(f"โœ… Found {len(resources)} resources:") for resource in resources[:5]: # Show first 5 resources logger.info(f" - {resource.get('uri', 'Unknown')}: {resource.get('description', 'No description')}") else: logger.warning(f"โš ๏ธ Failed to list resources: {resources_response.error}") # Query challenges logger.info("๐Ÿ† Querying challenges...") challenges_response = await client.query_challenges( status="Completed", per_page=3, page=1 ) if challenges_response.success and challenges_response.data: challenges = challenges_response.data if isinstance(challenges, list): logger.info(f"โœ… Retrieved {len(challenges)} challenges:") for challenge in challenges: if isinstance(challenge, dict): name = challenge.get('name', 'Unknown') challenge_id = challenge.get('id', 'Unknown') logger.info(f" - {name} (ID: {challenge_id})") else: logger.info(f"โœ… Challenge query response: {challenges}") else: logger.warning(f"โš ๏ธ Failed to query challenges: {challenges_response.error}") logger.info("๐ŸŽ‰ MCP connection test completed!") return client except Exception as e: logger.error(f"โŒ MCP connection test failed: {e}") raise # Main execution if __name__ == "__main__": async def main(): # Test the MCP client client = await test_mcp_connection() # Example: Get more specific challenge data print("\n" + "="*50) print("ADDITIONAL EXAMPLES:") print("="*50) # Search for algorithm challenges logger.info("๐Ÿ” Searching for algorithm challenges...") algo_response = await client.query_challenges( status="Completed", track="Algorithm", per_page=2 ) if algo_response.success: logger.info("โœ… Algorithm challenges found") # Test error handling with invalid tool logger.info("๐Ÿงช Testing error handling...") invalid_response = await client.call_tool("invalid-tool-name") if not invalid_response.success: logger.info(f"โœ… Error handling works: {invalid_response.error}") # Run the async main function asyncio.run(main())