| | """
|
| | Real MCP Integration - Replace Mock Data with Live Topcoder MCP
|
| | This replaces your SimpleIntelligenceEngine with real MCP integration
|
| | """
|
| | import asyncio
|
| | import httpx
|
| | import json
|
| | import logging
|
| | from typing import List, Dict, Any, Optional
|
| | from dataclasses import dataclass, asdict
|
| | from datetime import datetime, timedelta
|
| |
|
| |
|
| | logging.basicConfig(level=logging.INFO)
|
| | logger = logging.getLogger(__name__)
|
| |
|
| | @dataclass
|
| | class Challenge:
|
| | id: str
|
| | title: str
|
| | description: str
|
| | technologies: List[str]
|
| | difficulty: str
|
| | prize: str
|
| | time_estimate: str
|
| | compatibility_score: float = 0.0
|
| | rationale: str = ""
|
| |
|
| | @dataclass
|
| | class Skill:
|
| | name: str
|
| | category: str
|
| | description: str
|
| | relevance_score: float = 0.0
|
| |
|
| | @dataclass
|
| | class UserProfile:
|
| | skills: List[str]
|
| | experience_level: str
|
| | time_available: str
|
| | interests: List[str]
|
| |
|
| | class RealMCPIntelligenceEngine:
|
| | """Production MCP Integration - Real Topcoder Data"""
|
| |
|
| | def __init__(self):
|
| | self.mcp_url = "https://api.topcoder-dev.com/v6/mcp"
|
| | self.session_id = None
|
| | self.is_connected = False
|
| | self.challenges_cache = {}
|
| | self.skills_cache = {}
|
| | self.cache_expiry = None
|
| |
|
| |
|
| | asyncio.create_task(self.initialize_connection())
|
| |
|
| | async def initialize_connection(self):
|
| | """Initialize MCP connection and authenticate if needed"""
|
| | try:
|
| | async with httpx.AsyncClient(timeout=30.0) as client:
|
| |
|
| |
|
| | init_request = {
|
| | "jsonrpc": "2.0",
|
| | "id": 1,
|
| | "method": "initialize",
|
| | "params": {
|
| | "protocolVersion": "2024-11-05",
|
| | "capabilities": {
|
| | "roots": {"listChanged": True},
|
| | "sampling": {}
|
| | },
|
| | "clientInfo": {
|
| | "name": "topcoder-intelligence-assistant",
|
| | "version": "1.0.0"
|
| | }
|
| | }
|
| | }
|
| |
|
| | headers = {
|
| | "Content-Type": "application/json",
|
| | "Accept": "application/json, text/event-stream"
|
| | }
|
| |
|
| | response = await client.post(
|
| | f"{self.mcp_url}/mcp",
|
| | json=init_request,
|
| | headers=headers
|
| | )
|
| |
|
| | if response.status_code == 200:
|
| | result = response.json()
|
| | if "result" in result:
|
| | self.is_connected = True
|
| | logger.info("✅ MCP Connection established")
|
| |
|
| |
|
| | server_info = result["result"].get("serverInfo", {})
|
| | if "sessionId" in server_info:
|
| | self.session_id = server_info["sessionId"]
|
| | logger.info(f"🔑 Session ID obtained: {self.session_id[:10]}...")
|
| |
|
| | return True
|
| |
|
| | logger.warning(f"⚠️ MCP initialization failed: {response.status_code}")
|
| | return False
|
| |
|
| | except Exception as e:
|
| | logger.error(f"❌ MCP connection failed: {e}")
|
| | return False
|
| |
|
| | async def call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict]:
|
| | """Call an MCP tool with proper error handling"""
|
| |
|
| | if not self.is_connected:
|
| | await self.initialize_connection()
|
| |
|
| | try:
|
| | async with httpx.AsyncClient(timeout=60.0) as client:
|
| |
|
| | request_data = {
|
| | "jsonrpc": "2.0",
|
| | "id": datetime.now().timestamp(),
|
| | "method": "tools/call",
|
| | "params": {
|
| | "name": tool_name,
|
| | "arguments": arguments
|
| | }
|
| | }
|
| |
|
| | headers = {
|
| | "Content-Type": "application/json",
|
| | "Accept": "application/json"
|
| | }
|
| |
|
| |
|
| | if self.session_id:
|
| | headers["X-Session-ID"] = self.session_id
|
| |
|
| | response = await client.post(
|
| | f"{self.mcp_url}/mcp",
|
| | json=request_data,
|
| | headers=headers
|
| | )
|
| |
|
| | if response.status_code == 200:
|
| | result = response.json()
|
| | if "result" in result:
|
| | return result["result"]
|
| | elif "error" in result:
|
| | logger.error(f"MCP tool error: {result['error']}")
|
| | return None
|
| | else:
|
| | logger.error(f"MCP tool call failed: {response.status_code} - {response.text}")
|
| | return None
|
| |
|
| | except Exception as e:
|
| | logger.error(f"MCP tool call exception: {e}")
|
| | return None
|
| |
|
| | async def fetch_challenges(self, limit: int = 50, technologies: List[str] = None) -> List[Challenge]:
|
| | """Fetch real challenges from Topcoder MCP"""
|
| |
|
| |
|
| | cache_key = f"challenges_{limit}_{technologies}"
|
| | if (self.cache_expiry and datetime.now() < self.cache_expiry and
|
| | cache_key in self.challenges_cache):
|
| | return self.challenges_cache[cache_key]
|
| |
|
| | arguments = {"limit": limit}
|
| | if technologies:
|
| | arguments["technologies"] = technologies
|
| |
|
| | result = await self.call_mcp_tool("query-tc-challenges", arguments)
|
| |
|
| | if result and "content" in result:
|
| | challenges_data = result["content"]
|
| |
|
| | challenges = []
|
| | for item in challenges_data:
|
| | if isinstance(item, dict):
|
| | challenge = Challenge(
|
| | id=str(item.get("id", "")),
|
| | title=item.get("title", "Unknown Challenge"),
|
| | description=item.get("description", "")[:200] + "...",
|
| | technologies=item.get("technologies", []),
|
| | difficulty=item.get("difficulty", "Unknown"),
|
| | prize=f"${item.get('prize', 0):,}",
|
| | time_estimate=f"{item.get('duration', 0)} hours"
|
| | )
|
| | challenges.append(challenge)
|
| |
|
| |
|
| | self.challenges_cache[cache_key] = challenges
|
| | self.cache_expiry = datetime.now() + timedelta(hours=1)
|
| |
|
| | logger.info(f"✅ Fetched {len(challenges)} real challenges from MCP")
|
| | return challenges
|
| |
|
| | logger.warning("❌ Failed to fetch challenges, returning empty list")
|
| | return []
|
| |
|
| | async def fetch_skills(self, category: str = None) -> List[Skill]:
|
| | """Fetch real skills from Topcoder MCP"""
|
| |
|
| | cache_key = f"skills_{category}"
|
| | if (self.cache_expiry and datetime.now() < self.cache_expiry and
|
| | cache_key in self.skills_cache):
|
| | return self.skills_cache[cache_key]
|
| |
|
| | arguments = {}
|
| | if category:
|
| | arguments["category"] = category
|
| |
|
| | result = await self.call_mcp_tool("query-tc-skills", arguments)
|
| |
|
| | if result and "content" in result:
|
| | skills_data = result["content"]
|
| |
|
| | skills = []
|
| | for item in skills_data:
|
| | if isinstance(item, dict):
|
| | skill = Skill(
|
| | name=item.get("name", "Unknown Skill"),
|
| | category=item.get("category", "General"),
|
| | description=item.get("description", "")
|
| | )
|
| | skills.append(skill)
|
| |
|
| | self.skills_cache[cache_key] = skills
|
| | logger.info(f"✅ Fetched {len(skills)} real skills from MCP")
|
| | return skills
|
| |
|
| | logger.warning("❌ Failed to fetch skills, returning empty list")
|
| | return []
|
| |
|
| | def extract_technologies_from_query(self, query: str) -> List[str]:
|
| | """Extract technology keywords from user query"""
|
| | tech_keywords = {
|
| | 'python', 'java', 'javascript', 'react', 'node', 'angular', 'vue',
|
| | 'aws', 'docker', 'kubernetes', 'api', 'rest', 'graphql', 'sql',
|
| | 'mongodb', 'postgresql', 'machine learning', 'ai', 'blockchain',
|
| | 'ios', 'android', 'flutter', 'swift', 'kotlin', 'c++', 'c#',
|
| | 'ruby', 'php', 'go', 'rust', 'typescript', 'html', 'css'
|
| | }
|
| |
|
| | query_lower = query.lower()
|
| | found_techs = [tech for tech in tech_keywords if tech in query_lower]
|
| | return found_techs
|
| |
|
| | def calculate_compatibility_score(self, challenge: Challenge, user_profile: UserProfile, query: str) -> float:
|
| | """Calculate compatibility score using real challenge data"""
|
| |
|
| | score = 0.0
|
| | factors = []
|
| |
|
| |
|
| | user_skills_lower = [skill.lower() for skill in user_profile.skills]
|
| | challenge_techs_lower = [tech.lower() for tech in challenge.technologies]
|
| |
|
| | skill_matches = len(set(user_skills_lower) & set(challenge_techs_lower))
|
| | skill_score = min(skill_matches / max(len(challenge.technologies), 1), 1.0) * 0.4
|
| | score += skill_score
|
| | factors.append(f"Skill match: {skill_matches}/{len(challenge.technologies)} technologies")
|
| |
|
| |
|
| | experience_mapping = {
|
| | "beginner": {"Beginner": 1.0, "Intermediate": 0.7, "Advanced": 0.3},
|
| | "intermediate": {"Beginner": 0.5, "Intermediate": 1.0, "Advanced": 0.8},
|
| | "advanced": {"Beginner": 0.3, "Intermediate": 0.8, "Advanced": 1.0}
|
| | }
|
| |
|
| | exp_score = experience_mapping.get(user_profile.experience_level.lower(), {}).get(challenge.difficulty, 0.5) * 0.3
|
| | score += exp_score
|
| | factors.append(f"Experience match: {user_profile.experience_level} → {challenge.difficulty}")
|
| |
|
| |
|
| | query_techs = self.extract_technologies_from_query(query)
|
| | query_matches = len(set([tech.lower() for tech in query_techs]) & set(challenge_techs_lower))
|
| | query_score = min(query_matches / max(len(query_techs), 1), 1.0) * 0.2 if query_techs else 0.1
|
| | score += query_score
|
| | factors.append(f"Query relevance: {query_matches} matches")
|
| |
|
| |
|
| | time_mapping = {
|
| | "2-4 hours": {"1-2 hours": 1.0, "2-4 hours": 1.0, "4+ hours": 0.7},
|
| | "4-8 hours": {"2-4 hours": 0.8, "4+ hours": 1.0, "1-2 hours": 0.6},
|
| | "8+ hours": {"4+ hours": 1.0, "2-4 hours": 0.7, "1-2 hours": 0.4}
|
| | }
|
| |
|
| | time_score = 0.1
|
| | for user_time, challenge_map in time_mapping.items():
|
| | if user_time in user_profile.time_available:
|
| | time_score = challenge_map.get(challenge.time_estimate, 0.5) * 0.1
|
| | break
|
| |
|
| | score += time_score
|
| | factors.append(f"Time fit: {user_profile.time_available} vs {challenge.time_estimate}")
|
| |
|
| | return min(score, 1.0), factors
|
| |
|
| | async def get_personalized_recommendations(self, user_profile: UserProfile, query: str = "") -> Dict[str, Any]:
|
| | """Get personalized recommendations using real MCP data"""
|
| |
|
| | start_time = datetime.now()
|
| |
|
| |
|
| | query_techs = self.extract_technologies_from_query(query)
|
| | challenges = await self.fetch_challenges(limit=100, technologies=query_techs if query_techs else None)
|
| |
|
| | if not challenges:
|
| |
|
| | return {
|
| | "recommendations": [],
|
| | "insights": {
|
| | "total_challenges": 0,
|
| | "processing_time": f"{(datetime.now() - start_time).total_seconds():.3f}s",
|
| | "data_source": "MCP (No data available)",
|
| | "message": "Unable to fetch real challenge data. Please check MCP connection."
|
| | }
|
| | }
|
| |
|
| |
|
| | scored_challenges = []
|
| | for challenge in challenges:
|
| | score, factors = self.calculate_compatibility_score(challenge, user_profile, query)
|
| | challenge.compatibility_score = score
|
| | challenge.rationale = f"Score: {score:.1%}. " + "; ".join(factors[:2])
|
| | scored_challenges.append(challenge)
|
| |
|
| |
|
| | scored_challenges.sort(key=lambda x: x.compatibility_score, reverse=True)
|
| |
|
| |
|
| | recommendations = scored_challenges[:5]
|
| |
|
| |
|
| | skills = await self.fetch_skills()
|
| |
|
| |
|
| | processing_time = (datetime.now() - start_time).total_seconds()
|
| |
|
| | return {
|
| | "recommendations": [asdict(rec) for rec in recommendations],
|
| | "insights": {
|
| | "total_challenges": len(challenges),
|
| | "average_score": sum(c.compatibility_score for c in challenges) / len(challenges),
|
| | "processing_time": f"{processing_time:.3f}s",
|
| | "data_source": "Real Topcoder MCP",
|
| | "top_score": recommendations[0].compatibility_score if recommendations else 0,
|
| | "skills_available": len(skills),
|
| | "technologies_detected": query_techs,
|
| | "cache_status": "Fresh data" if not self.cache_expiry else "Cached data"
|
| | }
|
| | }
|
| |
|
| |
|
| | async def test_real_mcp_engine():
|
| | """Test the real MCP integration"""
|
| |
|
| | print("🚀 Testing Real MCP Integration")
|
| | print("=" * 50)
|
| |
|
| | engine = RealMCPIntelligenceEngine()
|
| |
|
| |
|
| | await asyncio.sleep(2)
|
| |
|
| | if not engine.is_connected:
|
| | print("❌ MCP connection failed - check authentication")
|
| | return
|
| |
|
| |
|
| | user_profile = UserProfile(
|
| | skills=["Python", "JavaScript", "API"],
|
| | experience_level="Intermediate",
|
| | time_available="4-8 hours",
|
| | interests=["web development", "API integration"]
|
| | )
|
| |
|
| |
|
| | print("\n🧠 Getting Real Recommendations...")
|
| | recommendations = await engine.get_personalized_recommendations(
|
| | user_profile,
|
| | "I want to work on Python API challenges"
|
| | )
|
| |
|
| | print(f"\n📊 Results:")
|
| | print(f" Challenges found: {recommendations['insights']['total_challenges']}")
|
| | print(f" Processing time: {recommendations['insights']['processing_time']}")
|
| | print(f" Data source: {recommendations['insights']['data_source']}")
|
| |
|
| | for i, rec in enumerate(recommendations['recommendations'][:3], 1):
|
| | print(f"\n {i}. {rec['title']}")
|
| | print(f" Score: {rec['compatibility_score']:.1%}")
|
| | print(f" Technologies: {', '.join(rec['technologies'][:3])}")
|
| |
|
| | if __name__ == "__main__":
|
| | asyncio.run(test_real_mcp_engine()) |