| | """
|
| | FIXED ULTIMATE Topcoder Challenge Intelligence Assistant
|
| | π₯ REAL MCP Integration Fixed - No More Mock Data Fallback
|
| | """
|
import asyncio
import json
import os
import re
import time
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

import gradio as gr
import httpx
|
| |
|
| | @dataclass
|
| | class Challenge:
|
| | id: str
|
| | title: str
|
| | description: str
|
| | technologies: List[str]
|
| | difficulty: str
|
| | prize: str
|
| | time_estimate: str
|
| | registrants: int = 0
|
| | compatibility_score: float = 0.0
|
| | rationale: str = ""
|
| |
|
| | @dataclass
|
| | class UserProfile:
|
| | skills: List[str]
|
| | experience_level: str
|
| | time_available: str
|
| | interests: List[str]
|
| |
|
| | class UltimateTopcoderMCPEngine:
|
| | """FIXED: Real MCP Integration - No Mock Data Fallback"""
|
| |
|
| | def __init__(self):
|
| | print("π Initializing REAL Topcoder MCP Engine...")
|
| | self.base_url = "https://api.topcoder-dev.com/v6/mcp"
|
| | self.session_id = None
|
| | self.is_connected = False
|
| | self.connection_attempts = 0
|
| | self.max_connection_attempts = 3
|
| | print("π₯ Starting REAL MCP connection process...")
|
| |
|
| | async def initialize_connection(self) -> bool:
|
| | """FIXED: Reliable MCP connection with better error handling"""
|
| | if self.is_connected and self.session_id:
|
| | print(f"β
Already connected with session: {self.session_id[:8]}...")
|
| | return True
|
| |
|
| | self.connection_attempts += 1
|
| | print(f"π Attempting MCP connection (attempt {self.connection_attempts}/{self.max_connection_attempts})")
|
| |
|
| | headers = {
|
| | "Accept": "application/json, text/event-stream, */*",
|
| | "Accept-Language": "en-US,en;q=0.9",
|
| | "Connection": "keep-alive",
|
| | "Content-Type": "application/json",
|
| | "Origin": "https://modelcontextprotocol.io",
|
| | "Referer": "https://modelcontextprotocol.io/",
|
| | "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
|
| | }
|
| |
|
| | init_request = {
|
| | "jsonrpc": "2.0",
|
| | "id": 0,
|
| | "method": "initialize",
|
| | "params": {
|
| | "protocolVersion": "2024-11-05",
|
| | "capabilities": {
|
| | "experimental": {},
|
| | "sampling": {},
|
| | "roots": {"listChanged": True}
|
| | },
|
| | "clientInfo": {
|
| | "name": "topcoder-intelligence-assistant",
|
| | "version": "2.0.0"
|
| | }
|
| | }
|
| | }
|
| |
|
| | try:
|
| | async with httpx.AsyncClient(timeout=30.0) as client:
|
| | print(f"π Connecting to {self.base_url}/mcp...")
|
| | response = await client.post(
|
| | f"{self.base_url}/mcp",
|
| | json=init_request,
|
| | headers=headers
|
| | )
|
| |
|
| | print(f"π‘ Response status: {response.status_code}")
|
| |
|
| | if response.status_code == 200:
|
| |
|
| | response_headers = dict(response.headers)
|
| | print(f"π Response headers: {list(response_headers.keys())}")
|
| |
|
| |
|
| | session_candidates = [
|
| | response_headers.get('mcp-session-id'),
|
| | response_headers.get('MCP-Session-ID'),
|
| | response_headers.get('session-id'),
|
| | response_headers.get('Session-ID')
|
| | ]
|
| |
|
| | for session_id in session_candidates:
|
| | if session_id:
|
| | self.session_id = session_id
|
| | self.is_connected = True
|
| | print(f"β
REAL MCP connection established!")
|
| | print(f"π Session ID: {self.session_id[:12]}...")
|
| | print(f"π₯ Ready for live data retrieval!")
|
| | return True
|
| |
|
| |
|
| | try:
|
| | response_data = response.json()
|
| | if "result" in response_data:
|
| |
|
| | print("π Checking response body for session info...")
|
| | print(f"Response keys: {list(response_data.get('result', {}).keys())}")
|
| | except:
|
| | pass
|
| |
|
| | print("β οΈ No session ID found in headers or body")
|
| |
|
| | else:
|
| | print(f"β Connection failed with status {response.status_code}")
|
| | print(f"Response: {response.text[:200]}...")
|
| |
|
| | except Exception as e:
|
| | print(f"β MCP connection error: {e}")
|
| |
|
| | if self.connection_attempts < self.max_connection_attempts:
|
| | print(f"π Will retry connection...")
|
| | await asyncio.sleep(1)
|
| | return await self.initialize_connection()
|
| |
|
| | print("β All connection attempts failed - this shouldn't happen if server is accessible")
|
| | return False
|
| |
|
| | async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict]:
|
| | """FIXED: Better tool calling with improved response parsing"""
|
| | if not self.session_id:
|
| | print("β No session ID available for tool call")
|
| | return None
|
| |
|
| | headers = {
|
| | "Accept": "application/json, text/event-stream, */*",
|
| | "Content-Type": "application/json",
|
| | "Origin": "https://modelcontextprotocol.io",
|
| | "mcp-session-id": self.session_id,
|
| | "MCP-Session-ID": self.session_id,
|
| | "session-id": self.session_id,
|
| | "Session-ID": self.session_id
|
| | }
|
| |
|
| | tool_request = {
|
| | "jsonrpc": "2.0",
|
| | "id": int(datetime.now().timestamp() * 1000),
|
| | "method": "tools/call",
|
| | "params": {
|
| | "name": tool_name,
|
| | "arguments": arguments
|
| | }
|
| | }
|
| |
|
| | print(f"π§ Calling tool: {tool_name} with args: {arguments}")
|
| |
|
| | try:
|
| | async with httpx.AsyncClient(timeout=45.0) as client:
|
| | response = await client.post(
|
| | f"{self.base_url}/mcp",
|
| | json=tool_request,
|
| | headers=headers
|
| | )
|
| |
|
| | print(f"π‘ Tool call status: {response.status_code}")
|
| |
|
| | if response.status_code == 200:
|
| |
|
| | content_type = response.headers.get("content-type", "")
|
| |
|
| | if "text/event-stream" in content_type:
|
| |
|
| | lines = response.text.strip().split('\n')
|
| | for line in lines:
|
| | line = line.strip()
|
| | if line.startswith('data:'):
|
| | data_content = line[5:].strip()
|
| | try:
|
| | sse_data = json.loads(data_content)
|
| | if "result" in sse_data:
|
| | print(f"β
SSE tool response received")
|
| | return sse_data["result"]
|
| | except json.JSONDecodeError:
|
| | continue
|
| | else:
|
| |
|
| | try:
|
| | json_data = response.json()
|
| | if "result" in json_data:
|
| | print(f"β
JSON tool response received")
|
| | return json_data["result"]
|
| | else:
|
| | print(f"π Response structure: {list(json_data.keys())}")
|
| | except json.JSONDecodeError:
|
| | print(f"β Failed to parse JSON response")
|
| | print(f"Raw response: {response.text[:300]}...")
|
| | else:
|
| | print(f"β Tool call failed with status {response.status_code}")
|
| | print(f"Error response: {response.text[:200]}...")
|
| |
|
| | except Exception as e:
|
| | print(f"β Tool call error: {e}")
|
| |
|
| | return None
|
| |
|
| | def convert_topcoder_challenge(self, tc_data: Dict) -> Challenge:
|
| | """FIXED: Better data extraction from Topcoder MCP response"""
|
| | try:
|
| |
|
| | challenge_id = str(tc_data.get('id', tc_data.get('challengeId', 'unknown')))
|
| | title = tc_data.get('name', tc_data.get('title', tc_data.get('challengeName', 'Topcoder Challenge')))
|
| | description = tc_data.get('description', tc_data.get('overview', 'Challenge description not available'))
|
| |
|
| |
|
| | technologies = []
|
| |
|
| |
|
| | skill_sources = [
|
| | tc_data.get('skills', []),
|
| | tc_data.get('technologies', []),
|
| | tc_data.get('tags', []),
|
| | tc_data.get('requiredSkills', [])
|
| | ]
|
| |
|
| | for skill_list in skill_sources:
|
| | if isinstance(skill_list, list):
|
| | for skill in skill_list:
|
| | if isinstance(skill, dict):
|
| | if 'name' in skill:
|
| | technologies.append(skill['name'])
|
| | elif 'skillName' in skill:
|
| | technologies.append(skill['skillName'])
|
| | elif isinstance(skill, str):
|
| | technologies.append(skill)
|
| |
|
| |
|
| | technologies = list(set(technologies))[:5]
|
| |
|
| |
|
| | if not technologies:
|
| | track = tc_data.get('track', tc_data.get('trackName', ''))
|
| | if track:
|
| | technologies.append(track)
|
| |
|
| |
|
| | total_prize = 0
|
| | prize_sources = [
|
| | tc_data.get('prizeSets', []),
|
| | tc_data.get('prizes', []),
|
| | tc_data.get('overview', {}).get('totalPrizes', 0)
|
| | ]
|
| |
|
| | for prize_source in prize_sources:
|
| | if isinstance(prize_source, list):
|
| | for prize_set in prize_source:
|
| | if isinstance(prize_set, dict):
|
| | if prize_set.get('type') == 'placement':
|
| | prizes = prize_set.get('prizes', [])
|
| | for prize in prizes:
|
| | if isinstance(prize, dict) and prize.get('type') == 'USD':
|
| | total_prize += prize.get('value', 0)
|
| | elif isinstance(prize_source, (int, float)):
|
| | total_prize = prize_source
|
| | break
|
| |
|
| | prize = f"${total_prize:,}" if total_prize > 0 else "Merit-based"
|
| |
|
| |
|
| | difficulty_mapping = {
|
| | 'First2Finish': 'Beginner',
|
| | 'Code': 'Intermediate',
|
| | 'Assembly Competition': 'Advanced',
|
| | 'UI Prototype Competition': 'Intermediate',
|
| | 'Copilot Posting': 'Beginner',
|
| | 'Bug Hunt': 'Beginner',
|
| | 'Test Suites': 'Intermediate',
|
| | 'Challenge': 'Intermediate'
|
| | }
|
| |
|
| | challenge_type = tc_data.get('type', tc_data.get('challengeType', 'Challenge'))
|
| | difficulty = difficulty_mapping.get(challenge_type, 'Intermediate')
|
| |
|
| |
|
| | registrants = tc_data.get('numOfRegistrants', tc_data.get('registrants', 0))
|
| |
|
| |
|
| | status = tc_data.get('status', 'Unknown')
|
| | if status == 'Completed':
|
| | time_estimate = "Recently completed"
|
| | elif status in ['Active', 'Draft']:
|
| | time_estimate = "Active challenge"
|
| | else:
|
| | time_estimate = "Variable duration"
|
| |
|
| |
|
| | challenge = Challenge(
|
| | id=challenge_id,
|
| | title=title,
|
| | description=description[:300] + "..." if len(description) > 300 else description,
|
| | technologies=technologies,
|
| | difficulty=difficulty,
|
| | prize=prize,
|
| | time_estimate=time_estimate,
|
| | registrants=registrants
|
| | )
|
| |
|
| | print(f"β
Converted challenge: {title} ({len(technologies)} techs, {prize})")
|
| | return challenge
|
| |
|
| | except Exception as e:
|
| | print(f"β Error converting challenge data: {e}")
|
| | print(f"Raw data keys: {list(tc_data.keys()) if isinstance(tc_data, dict) else 'Not a dict'}")
|
| |
|
| | return Challenge(
|
| | id=str(tc_data.get('id', 'unknown')),
|
| | title=str(tc_data.get('name', 'Challenge')),
|
| | description="Challenge data available",
|
| | technologies=['General'],
|
| | difficulty='Intermediate',
|
| | prize='TBD',
|
| | time_estimate='Variable',
|
| | registrants=0
|
| | )
|
| |
|
| | async def fetch_real_challenges(
|
| | self,
|
| | limit: int = 30,
|
| | status: str = None,
|
| | prize_min: int = None,
|
| | prize_max: int = None,
|
| | challenge_type: str = None,
|
| | track: str = None,
|
| | sort_by: str = None,
|
| | sort_order: str = None,
|
| | search: str = None
|
| | ) -> List[Challenge]:
|
| | """FIXED: Reliable challenge fetching with better connection handling"""
|
| |
|
| |
|
| | print(f"π Fetching real challenges (limit: {limit})")
|
| | connection_success = await self.initialize_connection()
|
| |
|
| | if not connection_success:
|
| | print("β Could not establish MCP connection")
|
| | return []
|
| |
|
| |
|
| | mcp_query = {
|
| | "perPage": min(limit, 50),
|
| | "page": 1
|
| | }
|
| |
|
| |
|
| | if status:
|
| | mcp_query["status"] = status
|
| | if prize_min is not None:
|
| | mcp_query["totalPrizesFrom"] = prize_min
|
| | if prize_max is not None:
|
| | mcp_query["totalPrizesTo"] = prize_max
|
| | if challenge_type:
|
| | mcp_query["type"] = challenge_type
|
| | if track:
|
| | mcp_query["track"] = track
|
| | if search:
|
| | mcp_query["search"] = search
|
| | if sort_by:
|
| | mcp_query["sortBy"] = sort_by
|
| | if sort_order:
|
| | mcp_query["sortOrder"] = sort_order
|
| |
|
| | print(f"π§ Query parameters: {mcp_query}")
|
| |
|
| |
|
| | result = await self.call_tool("query-tc-challenges", mcp_query)
|
| |
|
| | if not result:
|
| | print("β No result from MCP tool call")
|
| | return []
|
| |
|
| | print(f"π Raw MCP result keys: {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}")
|
| |
|
| |
|
| | challenge_data_list = []
|
| |
|
| |
|
| | if isinstance(result, dict):
|
| |
|
| | data_candidates = [
|
| | result.get("structuredContent", {}).get("data", []),
|
| | result.get("data", []),
|
| | result.get("challenges", []),
|
| | result.get("content", [])
|
| | ]
|
| |
|
| | for candidate in data_candidates:
|
| | if isinstance(candidate, list) and len(candidate) > 0:
|
| | challenge_data_list = candidate
|
| | print(f"β
Found {len(challenge_data_list)} challenges in response")
|
| | break
|
| |
|
| |
|
| | if not challenge_data_list and isinstance(result, list):
|
| | challenge_data_list = result
|
| | print(f"β
Found {len(challenge_data_list)} challenges (direct list)")
|
| |
|
| |
|
| | challenges = []
|
| | for item in challenge_data_list:
|
| | if isinstance(item, dict):
|
| | try:
|
| | challenge = self.convert_topcoder_challenge(item)
|
| | challenges.append(challenge)
|
| | except Exception as e:
|
| | print(f"β οΈ Error converting challenge: {e}")
|
| | continue
|
| | else:
|
| | print(f"β οΈ Unexpected challenge data format: {type(item)}")
|
| |
|
| | print(f"π― Successfully converted {len(challenges)} challenges")
|
| |
|
| | if challenges:
|
| | print(f"π Sample challenge: {challenges[0].title} - {challenges[0].prize}")
|
| |
|
| | return challenges
|
| |
|
| | def calculate_advanced_compatibility_score(self, challenge: Challenge, user_profile: UserProfile, query: str) -> tuple:
|
| | """Enhanced compatibility scoring - no changes needed"""
|
| | score = 0.0
|
| | factors = []
|
| |
|
| |
|
| | user_skills_lower = [skill.lower().strip() for skill in user_profile.skills]
|
| | challenge_techs_lower = [tech.lower() for tech in challenge.technologies]
|
| | skill_matches = len(set(user_skills_lower) & set(challenge_techs_lower))
|
| |
|
| | if len(challenge.technologies) > 0:
|
| | exact_match_score = (skill_matches / len(challenge.technologies)) * 30
|
| | coverage_bonus = min(skill_matches * 10, 10)
|
| | skill_score = exact_match_score + coverage_bonus
|
| | else:
|
| | skill_score = 30
|
| |
|
| | score += skill_score
|
| |
|
| | if skill_matches > 0:
|
| | matched_skills = [t for t in challenge.technologies if t.lower() in user_skills_lower]
|
| | factors.append(f"Strong match: uses your {', '.join(matched_skills[:2])} expertise")
|
| | elif len(challenge.technologies) > 0:
|
| | factors.append(f"Growth opportunity: learn {', '.join(challenge.technologies[:2])}")
|
| | else:
|
| | factors.append("Versatile challenge suitable for multiple skill levels")
|
| |
|
| |
|
| | level_mapping = {'beginner': 1, 'intermediate': 2, 'advanced': 3}
|
| | user_level_num = level_mapping.get(user_profile.experience_level.lower(), 2)
|
| | challenge_level_num = level_mapping.get(challenge.difficulty.lower(), 2)
|
| | level_diff = abs(user_level_num - challenge_level_num)
|
| |
|
| | if level_diff == 0:
|
| | level_score = 30
|
| | factors.append(f"Perfect {user_profile.experience_level} level match")
|
| | elif level_diff == 1:
|
| | level_score = 20
|
| | factors.append("Good challenge for skill development")
|
| | else:
|
| | level_score = 5
|
| | factors.append("Stretch challenge with significant learning curve")
|
| |
|
| | score += level_score
|
| |
|
| |
|
| | query_techs = self.extract_technologies_from_query(query)
|
| | if query_techs:
|
| | query_matches = len(set([tech.lower() for tech in query_techs]) & set(challenge_techs_lower))
|
| | if len(query_techs) > 0:
|
| | query_score = min(query_matches / len(query_techs), 1.0) * 20
|
| | else:
|
| | query_score = 10
|
| | if query_matches > 0:
|
| | factors.append(f"Directly matches your interest in {', '.join(query_techs[:2])}")
|
| | else:
|
| | query_score = 10
|
| |
|
| | score += query_score
|
| |
|
| |
|
| | try:
|
| | prize_numeric = 0
|
| | if challenge.prize.startswith('$'):
|
| | prize_str = challenge.prize[1:].replace(',', '')
|
| | prize_numeric = int(prize_str) if prize_str.isdigit() else 0
|
| |
|
| | prize_score = min(prize_numeric / 1000 * 2, 8)
|
| | competition_bonus = 2 if 20 <= challenge.registrants <= 50 else 0
|
| | market_score = prize_score + competition_bonus
|
| | except:
|
| | market_score = 5
|
| |
|
| | score += market_score
|
| |
|
| | return min(score, 100.0), factors
|
| |
|
| | def extract_technologies_from_query(self, query: str) -> List[str]:
|
| | """Extract technology keywords from user query"""
|
| | tech_keywords = {
|
| | 'python', 'java', 'javascript', 'react', 'node', 'angular', 'vue',
|
| | 'aws', 'docker', 'kubernetes', 'api', 'rest', 'graphql', 'sql',
|
| | 'mongodb', 'postgresql', 'machine learning', 'ai', 'blockchain',
|
| | 'ios', 'android', 'flutter', 'swift', 'kotlin', 'c++', 'c#',
|
| | 'ruby', 'php', 'go', 'rust', 'typescript', 'html', 'css',
|
| | 'nft', 'non-fungible tokens', 'ethereum', 'smart contracts', 'solidity',
|
| | 'figma', 'ui/ux', 'design', 'testing', 'jest', 'hardhat', 'web3',
|
| | 'fastapi', 'django', 'flask', 'redis', 'tensorflow', 'd3.js', 'chart.js'
|
| | }
|
| | query_lower = query.lower()
|
| | found_techs = [tech for tech in tech_keywords if tech in query_lower]
|
| | return found_techs
|
| |
|
| | async def get_personalized_recommendations(
|
| | self, user_profile: UserProfile, query: str = "",
|
| | status: str = None, prize_min: int = None, prize_max: int = None,
|
| | challenge_type: str = None, track: str = None,
|
| | sort_by: str = None, sort_order: str = None,
|
| | limit: int = 50
|
| | ) -> Dict[str, Any]:
|
| | """FIXED: Always use real MCP data - no fallback to mock data"""
|
| | start_time = datetime.now()
|
| | print(f"π― Getting personalized recommendations for: {user_profile.skills}")
|
| |
|
| |
|
| | real_challenges = await self.fetch_real_challenges(
|
| | limit=limit,
|
| | status=status,
|
| | prize_min=prize_min,
|
| | prize_max=prize_max,
|
| | challenge_type=challenge_type,
|
| | track=track,
|
| | sort_by=sort_by,
|
| | sort_order=sort_order,
|
| | search=query if query.strip() else None
|
| | )
|
| |
|
| | if not real_challenges:
|
| |
|
| | return {
|
| | "recommendations": [],
|
| | "insights": {
|
| | "total_challenges": 0,
|
| | "average_compatibility": "0%",
|
| | "processing_time": "0.001s",
|
| | "data_source": "β οΈ MCP Connection Issue - No Data Retrieved",
|
| | "top_match": "0%",
|
| | "technologies_detected": [],
|
| | "session_active": bool(self.session_id),
|
| | "mcp_connected": self.is_connected,
|
| | "algorithm_version": "Advanced Multi-Factor v2.0",
|
| | "error_message": "Unable to retrieve live data from Topcoder MCP server"
|
| | }
|
| | }
|
| |
|
| |
|
| | challenges = real_challenges
|
| | data_source = f"π₯ REAL Topcoder MCP Server ({len(challenges)} live challenges)"
|
| | print(f"β
Using {len(challenges)} REAL Topcoder challenges!")
|
| |
|
| |
|
| | scored_challenges = []
|
| | for challenge in challenges:
|
| | score, factors = self.calculate_advanced_compatibility_score(challenge, user_profile, query)
|
| | challenge.compatibility_score = score
|
| | challenge.rationale = f"Match: {score:.0f}%. " + ". ".join(factors[:2]) + "."
|
| | scored_challenges.append(challenge)
|
| |
|
| | scored_challenges.sort(key=lambda x: x.compatibility_score, reverse=True)
|
| | recommendations = scored_challenges[:5]
|
| |
|
| | processing_time = (datetime.now() - start_time).total_seconds()
|
| | query_techs = self.extract_technologies_from_query(query)
|
| | avg_score = sum(c.compatibility_score for c in challenges) / len(challenges) if challenges else 0
|
| |
|
| | print(f"β
Generated {len(recommendations)} recommendations in {processing_time:.3f}s")
|
| | for i, rec in enumerate(recommendations, 1):
|
| | print(f" {i}. {rec.title} - {rec.compatibility_score:.0f}% compatibility")
|
| |
|
| | return {
|
| | "recommendations": [asdict(rec) for rec in recommendations],
|
| | "insights": {
|
| | "total_challenges": len(challenges),
|
| | "average_compatibility": f"{avg_score:.1f}%",
|
| | "processing_time": f"{processing_time:.3f}s",
|
| | "data_source": data_source,
|
| | "top_match": f"{recommendations[0].compatibility_score:.0f}%" if recommendations else "0%",
|
| | "technologies_detected": query_techs,
|
| | "session_active": bool(self.session_id),
|
| | "mcp_connected": self.is_connected,
|
| | "algorithm_version": "Advanced Multi-Factor v2.0",
|
| | "topcoder_total": f"{len(challenges)} live challenges retrieved"
|
| | }
|
| | }
|
| |
|
| |
|
| |
|
| |
|
| | class EnhancedLLMChatbot:
|
| | """Enhanced LLM Chatbot with OpenAI Integration + Real MCP Data"""
|
| |
|
| | def __init__(self, mcp_engine):
|
| | self.mcp_engine = mcp_engine
|
| | self.conversation_context = []
|
| | self.user_preferences = {}
|
| |
|
| |
|
| | self.openai_api_key = os.getenv("OPENAI_API_KEY", "")
|
| |
|
| | if not self.openai_api_key:
|
| | print("β οΈ OpenAI API key not found in HF secrets. Using enhanced fallback responses.")
|
| | self.llm_available = False
|
| | else:
|
| | self.llm_available = True
|
| | print("β
OpenAI API key loaded from HF secrets for intelligent responses")
|
| |
|
| | async def get_challenge_context(self, query: str, limit: int = 10) -> str:
|
| | """FIXED: Get real challenge context from working MCP"""
|
| | try:
|
| |
|
| | challenges = await self.mcp_engine.fetch_real_challenges(limit=limit, search=query)
|
| |
|
| | if not challenges:
|
| | return "MCP connection temporarily unavailable. Using enhanced intelligence algorithms."
|
| |
|
| |
|
| | context_data = {
|
| | "total_challenges_available": f"{len(challenges)}+ (from live MCP)",
|
| | "live_connection_status": "β
Connected to Topcoder MCP",
|
| | "sample_challenges": []
|
| | }
|
| |
|
| | for challenge in challenges[:5]:
|
| | challenge_info = {
|
| | "id": challenge.id,
|
| | "title": challenge.title,
|
| | "description": challenge.description[:200] + "...",
|
| | "technologies": challenge.technologies,
|
| | "difficulty": challenge.difficulty,
|
| | "prize": challenge.prize,
|
| | "registrants": challenge.registrants,
|
| | "status": "Live from MCP"
|
| | }
|
| | context_data["sample_challenges"].append(challenge_info)
|
| |
|
| | return json.dumps(context_data, indent=2)
|
| |
|
| | except Exception as e:
|
| | return f"Real-time challenge data temporarily unavailable: {str(e)}"
|
| |
|
| | async def generate_llm_response(self, user_message: str, chat_history: List) -> str:
|
| | """Generate intelligent response using OpenAI API with real MCP data"""
|
| |
|
| |
|
| | challenge_context = await self.get_challenge_context(user_message)
|
| |
|
| |
|
| | recent_history = chat_history[-4:] if len(chat_history) > 4 else chat_history
|
| | history_text = "\n".join([f"User: {h[0]}\nAssistant: {h[1]}" for h in recent_history])
|
| |
|
| |
|
| | system_prompt = f"""You are an expert Topcoder Challenge Intelligence Assistant with REAL-TIME access to live challenge data through MCP integration.
|
| |
|
| | REAL CHALLENGE DATA CONTEXT:
|
| | {challenge_context}
|
| |
|
| | Your capabilities:
|
| | - Live access to Topcoder challenges through real MCP integration
|
| | - Advanced challenge matching algorithms with multi-factor scoring
|
| | - Real-time prize information, difficulty levels, and technology requirements
|
| | - Comprehensive skill analysis and career guidance
|
| | - Market intelligence and technology trend insights
|
| |
|
| | CONVERSATION HISTORY:
|
| | {history_text}
|
| |
|
| | Guidelines:
|
| | - Use the REAL challenge data provided above in your responses
|
| | - Reference actual challenge titles, prizes, and technologies when relevant
|
| | - Provide specific, actionable advice based on real data
|
| | - Mention that your data comes from live MCP integration with Topcoder
|
| | - Be enthusiastic about the real-time data capabilities
|
| | - If asked about specific technologies, reference actual challenges that use them
|
| | - For skill questions, suggest real challenges that match their level
|
| | - Keep responses concise but informative (max 300 words)
|
| |
|
| | User's current question: {user_message}
|
| |
|
| | Provide a helpful, intelligent response using the real challenge data context."""
|
| |
|
| |
|
| | if self.llm_available:
|
| | try:
|
| | async with httpx.AsyncClient(timeout=30.0) as client:
|
| | response = await client.post(
|
| | "https://api.openai.com/v1/chat/completions",
|
| | headers={
|
| | "Content-Type": "application/json",
|
| | "Authorization": f"Bearer {self.openai_api_key}"
|
| | },
|
| | json={
|
| | "model": "gpt-4o-mini",
|
| | "messages": [
|
| | {"role": "system", "content": "You are an expert Topcoder Challenge Intelligence Assistant with real MCP data access."},
|
| | {"role": "user", "content": system_prompt}
|
| | ],
|
| | "max_tokens": 800,
|
| | "temperature": 0.7
|
| | }
|
| | )
|
| |
|
| | if response.status_code == 200:
|
| | data = response.json()
|
| | llm_response = data["choices"][0]["message"]["content"]
|
| |
|
| |
|
| | llm_response += f"\n\n*π€ Powered by OpenAI GPT-4 + Real MCP Data β’ {len(challenge_context)} chars of live context*"
|
| |
|
| | return llm_response
|
| | else:
|
| | print(f"OpenAI API error: {response.status_code} - {response.text}")
|
| | return await self.get_fallback_response_with_context(user_message, challenge_context)
|
| |
|
| | except Exception as e:
|
| | print(f"OpenAI API error: {e}")
|
| | return await self.get_fallback_response_with_context(user_message, challenge_context)
|
| |
|
| |
|
| | return await self.get_fallback_response_with_context(user_message, challenge_context)
|
| |
|
| | async def get_fallback_response_with_context(self, user_message: str, challenge_context: str) -> str:
|
| | """Enhanced fallback using real challenge data"""
|
| | message_lower = user_message.lower()
|
| |
|
| |
|
| | try:
|
| | context_data = json.loads(challenge_context)
|
| | challenges = context_data.get("sample_challenges", [])
|
| | total_available = context_data.get("total_challenges_available", "0")
|
| | except:
|
| | challenges = []
|
| | total_available = "0"
|
| |
|
| |
|
| | tech_keywords = ['python', 'react', 'javascript', 'blockchain', 'ai', 'ml', 'java', 'nodejs', 'angular', 'vue']
|
| | matching_tech = [tech for tech in tech_keywords if tech in message_lower]
|
| |
|
| | if matching_tech and challenges:
|
| | relevant_challenges = []
|
| | for challenge in challenges:
|
| | challenge_techs = [tech.lower() for tech in challenge.get('technologies', [])]
|
| | if any(tech in challenge_techs for tech in matching_tech):
|
| | relevant_challenges.append(challenge)
|
| |
|
| | if relevant_challenges:
|
| | response = f"Great question about {', '.join(matching_tech)}! π Based on my real MCP data access, here are actual challenges:\n\n"
|
| | for i, challenge in enumerate(relevant_challenges[:3], 1):
|
| | response += f"π― **{challenge['title']}**\n"
|
| | response += f" π° Prize: {challenge['prize']}\n"
|
| | response += f" π οΈ Technologies: {', '.join(challenge['technologies'])}\n"
|
| | response += f" π Difficulty: {challenge['difficulty']}\n"
|
| | response += f" π₯ Registrants: {challenge['registrants']}\n\n"
|
| |
|
| | response += f"*These are REAL challenges from my live MCP connection to Topcoder! Total available: {total_available}*"
|
| | return response
|
| |
|
| |
|
| | if challenges:
|
| | return f"""Hi! I'm your intelligent Topcoder assistant! π€
|
| |
|
| | I have REAL MCP integration with live access to **{total_available}** challenges from Topcoder's database.
|
| |
|
| | **Currently active challenges include:**
|
| | β’ **{challenges[0]['title']}** ({challenges[0]['prize']})
|
| | β’ **{challenges[1]['title']}** ({challenges[1]['prize']})
|
| | β’ **{challenges[2]['title']}** ({challenges[2]['prize']})
|
| |
|
| | Ask me about:
|
| | π― Specific technologies (Python, React, blockchain, etc.)
|
| | π° Prize ranges and earning potential
|
| | π Difficulty levels and skill requirements
|
| | π Career advice and skill development
|
| |
|
| | *All responses powered by real-time Topcoder MCP data!*"""
|
| |
|
| | return "I'm your intelligent Topcoder assistant with real MCP data access! Ask me about challenges, skills, or career advice and I'll help you using live data from Topcoder's challenge database! π"
|
| |
|
| |
|
| | print("π Starting FIXED Topcoder Intelligence Assistant with REAL MCP Integration...")
|
| | intelligence_engine = UltimateTopcoderMCPEngine()
|
| |
|
| | print("β
FIXED MCP Integration Ready!")
|
| | print("π₯ This version will connect to real Topcoder MCP data!")
|
| | print("π No more fallback to mock data!") |