import json import os import re import time import uuid import sys import logging import asyncio import traceback from datetime import datetime, timezone, timedelta from dataclasses import dataclass from typing import List, Optional, Tuple import gradio as gr import requests from mcp import ClientSession from mcp.client.sse import sse_client from openai import OpenAI from dotenv import load_dotenv load_dotenv() MCP_SSE_URL = "https://api.topcoder-dev.com/v6/mcp/sse" MCP_MESSAGES_URL = "https://api.topcoder-dev.com/v6/mcp/messages" MCP_HTTP_BASE = "https://api.topcoder-dev.com/v6/mcp/mcp" # Console logger for runtime diagnostics logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", stream=sys.stdout, ) logger = logging.getLogger("challenge_scout") @dataclass class Challenge: id: str title: str prize: float deadline: str tags: List[str] description: str = "" FALLBACK_DATA: List[Challenge] = [ Challenge(id="123", title="Build a Minimal MCP Agent UI", prize=300.0, deadline="2025-08-20", tags=["mcp", "python", "gradio"]), Challenge(id="456", title="Topcoder Data Parsing Toolkit", prize=500.0, deadline="2025-08-15", tags=["data", "api", "json"]), Challenge(id="789", title="AI-Powered Web Application", prize=250.0, deadline="2025-08-18", tags=["ai", "web", "optimization"]), ] def _open_mcp_session( connect_timeout: float = 5.0, read_window_seconds: float = 8.0, debug: bool = False, max_retries: int = 2, ) -> Tuple[Optional[str], str]: """Open SSE and extract a sessionId from the first data event. We keep the stream open for up to `read_window_seconds` to wait for the line like: "data: /v6/mcp/messages?sessionId=". """ logs: List[str] = [] headers = { "Accept": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", } backoff = 0.6 for attempt in range(1, max_retries + 2): if debug: logger.info(f"SSE dialing attempt={attempt} url={MCP_SSE_URL}") try: t0 = time.time() resp = requests.get( MCP_SSE_URL, headers=headers, timeout=(connect_timeout, read_window_seconds + 2.0), stream=True, ) elapsed = time.time() - t0 logs.append(f"SSE attempt {attempt}: status={resp.status_code}, elapsed={elapsed:.2f}s") if debug: logger.info(f"SSE attempt {attempt} status={resp.status_code} elapsed={elapsed:.2f}s") resp.raise_for_status() start = time.time() pattern = re.compile(r"^data:\s*/v6/mcp/messages\?sessionId=([a-f0-9-]+)\s*$", re.IGNORECASE) for line in resp.iter_lines(decode_unicode=True, chunk_size=1): if line is None: if time.time() - start > read_window_seconds: logs.append("SSE: timeout waiting for data line") break continue if not isinstance(line, str): if time.time() - start > read_window_seconds: logs.append("SSE: timeout (non-str line)") break continue if debug: logs.append(f"SSE line: {line[:160]}") logger.info(f"SSE line: {line[:160]}") m = pattern.match(line) if m: sid = m.group(1) logs.append(f"SSE: obtained sessionId={sid}") if debug: logger.info(f"SSE obtained sessionId={sid}") return sid, "\n".join(logs) if time.time() - start > read_window_seconds: logs.append("SSE: read window exceeded without sessionId") if debug: logger.info("SSE: read window exceeded without sessionId") break except Exception as e: logs.append(f"SSE error on attempt {attempt}: {e}") if debug: logger.info(f"SSE error on attempt {attempt}: {e}") time.sleep(backoff) backoff *= 1.6 return None, "\n".join(logs) def _mcp_list_challenges( debug: bool = False, retries: int = 2, ) -> Tuple[List[Challenge], Optional[str], str]: """Call MCP using the official SDK (JSON-RPC over streamable HTTP) and return compact challenges. Returns (challenges, error, debug_logs) """ logs: List[str] = [] async def _do() -> Tuple[List[Challenge], Optional[str], str]: try: t0 = time.time() async with sse_client(MCP_SSE_URL) as (read, write): async with ClientSession(read, write) as session: await session.initialize() tools = await session.list_tools() names = [t.name for t in tools.tools] logs.append(f"SDK tools: {names}") # Prefer a known tool name pattern tool_name = None for cand in [ "query-tc-challenges", "query-tc-challenges-public", "query-tc-challenges-private", ]: if cand in names: tool_name = cand break if not tool_name and names: tool_name = names[0] if not tool_name: return [], "No tools available", "\n".join(logs) # Call with minimal args (tool often supports pagination; start small) args = {"page": 1, "pageSize": 20} logs.append(f"Calling tool {tool_name} with args: {args}") try: result = await session.call_tool(tool_name, args) except Exception as e: logs.append(f"call_tool error: {e}") return [], str(e), "\n".join(logs) items: List[Challenge] = [] payload = getattr(result, "structuredContent", None) if not payload: # Fallback to text content join and attempt JSON extraction of an object with data[] or array combined_text = "\n".join( getattr(c, "text", "") for c in getattr(result, "content", []) if hasattr(c, "text") ) obj_match = re.search(r"\{[\s\S]*\}$", combined_text) if obj_match: try: payload = json.loads(obj_match.group(0)) except Exception: payload = None # Normalize list of challenge-like dicts raw_list = [] if isinstance(payload, dict) and isinstance(payload.get("data"), list): raw_list = payload.get("data") logs.append(f"Received {len(raw_list)} items from data[]") elif isinstance(payload, list): raw_list = payload logs.append(f"Received {len(raw_list)} items from top-level list") else: logs.append(f"Unexpected payload shape: {type(payload)}") def to_ch(item: dict) -> Challenge: title = ( str(item.get("name") or item.get("title") or item.get("challengeTitle") or "") ) # prize from prizeSets → sum values if present prize = 0.0 prize_sets = item.get("prizeSets") if isinstance(prize_sets, list): for s in prize_sets: prizes = s.get("prizes") if isinstance(s, dict) else None if isinstance(prizes, list): for p in prizes: val = None if isinstance(p, dict): val = p.get("value") or p.get("amount") try: prize += float(val or 0) except Exception: pass else: prize_val = item.get("totalPrizes") or item.get("prize") or item.get("totalPrize") or 0 try: prize = float(prize_val or 0) except Exception: prize = 0.0 # deadline candidates deadline = ( str( item.get("endDate") or item.get("submissionEndDate") or item.get("registrationEndDate") or item.get("startDate") or "" ) ) # tags candidates tg = item.get("tags") if not isinstance(tg, list): tg = [] # add some fallbacks/contextual labels for extra in [item.get("track"), item.get("type"), item.get("status")]: if extra: tg.append(extra) # skills may be list of dicts or strings sk = item.get("skills") if isinstance(sk, list): for s in sk: if isinstance(s, dict): name = s.get("name") or s.get("id") if name: tg.append(str(name)) else: tg.append(str(s)) tg = [str(x) for x in tg if x] desc = str(item.get("description") or "") return Challenge( id=str(item.get("id", "")), title=title, prize=prize, deadline=deadline, tags=tg, description=desc, ) for it in raw_list: if isinstance(it, dict): items.append(to_ch(it)) logs.append(f"Normalized {len(items)} challenges") return items, None, "\n".join(logs) except Exception as e: logs.append(f"SDK fatal: {e}") logs.append(traceback.format_exc()) # Try to unwrap ExceptionGroup-like messages if any try: from exceptiongroup import ExceptionGroup # type: ignore except Exception: ExceptionGroup = None # type: ignore if ExceptionGroup and isinstance(e, ExceptionGroup): # type: ignore[arg-type] for idx, sub in enumerate(e.exceptions): # type: ignore[attr-defined] logs.append(f" sub[{idx}]: {type(sub).__name__}: {sub}") return [], str(e), "\n".join(logs) # Run the async block return asyncio.run(_do()) def shortlist(challenge_list: List[Challenge], keyword: str, min_prize: float) -> List[Challenge]: keyword_lower = keyword.lower().strip() results = [] for ch in challenge_list: if ch.prize < min_prize: continue hay = f"{ch.title} {' '.join(ch.tags)}".lower() if keyword_lower and keyword_lower not in hay: continue results.append(ch) # lightweight ranking: by prize desc results.sort(key=lambda c: c.prize, reverse=True) return results def _parse_deadline(dt_str: str) -> Optional[datetime]: if not dt_str: return None try: # Try ISO 8601 with Z return datetime.fromisoformat(dt_str.replace("Z", "+00:00")) except Exception: try: return datetime.strptime(dt_str, "%Y-%m-%d") except Exception: return None def _filter_by_days(items: List[Challenge], days_ahead: int) -> List[Challenge]: if days_ahead <= 0: return items now = datetime.now(timezone.utc) limit = now + timedelta(days=days_ahead) kept: List[Challenge] = [] for ch in items: dt = _parse_deadline(ch.deadline) if dt is None: kept.append(ch) continue if now <= dt <= limit: kept.append(ch) return kept def _generate_plan(items: List[Challenge], keyword: str, min_prize: float, days_ahead: int, debug: bool = False) -> Tuple[str, str]: # Keep context tiny: only top 8 rows of compact fields top = items[:8] compact = [ { "title": c.title, "prize": c.prize, "deadline": c.deadline, "tags": c.tags[:5], } for c in top ] logs: List[str] = [] api_key = os.getenv("OPENAI_API_KEY") base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") if not api_key: logs.append("Missing OPENAI_API_KEY") return "", "\n".join(logs) try: client = OpenAI( api_key=api_key, base_url=base_url ) prompt = ( "You are a concise challenge scout. Given compact challenge metadata, output:\n" "- Top 3 picks (title + brief reason)\n" "- Quick plan of action (3 bullets)\n" f"Constraints: keyword='{keyword}', min_prize>={min_prize}, within {days_ahead} days.\n" f"Data: {json.dumps(compact)}" ) if debug: logs.append(f"PLAN OpenAI prompt: {prompt[:1500]}") response = client.chat.completions.create( model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"), messages=[ {"role": "system", "content": "You are a helpful, terse assistant."}, {"role": "user", "content": prompt}, ], temperature=0.3, timeout=20 ) text = response.choices[0].message.content.strip() if debug: logs.append(f"PLAN OpenAI output: {text[:800]}") logger.info("\n".join(logs)) return text, "\n".join(logs) except Exception as e: if debug: logs.append(f"PLAN OpenAI error: {e}") logger.info("\n".join(logs)) return "", "\n".join(logs) def _generate_plan_fixed( ranked: List[tuple[Challenge, float, str]], keyword: str, days_ahead: int, debug: bool = False, ) -> Tuple[str, str]: """Generate a plan using the already-ranked top picks without changing order. The LLM is only used to phrase reasons and the action plan. It must not re-rank. """ top = ranked[:3] data = [ { "title": c.title, "prize": c.prize, "deadline": c.deadline, "tags": c.tags[:6], "score": round(s, 4), "reason": (r or "").strip()[:300], } for c, s, r in top ] logs: List[str] = [] api_key = os.getenv("OPENAI_API_KEY") base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") if not api_key: logs.append("Missing OPENAI_API_KEY") return "", "\n".join(logs) try: client = OpenAI(api_key=api_key, base_url=base_url) prompt = ( "You are a concise challenge scout. You are given pre-ranked top picks (in order).\n" "Do NOT change the order or add/remove items.\n" "Output exactly:\n" "- Top 3 picks (title + short reason).\n" "- Quick plan of action (3 bullets).\n" f"Constraints: query='{keyword}', within {days_ahead} days.\n" f"Ranked data: {json.dumps(data)}" ) if debug: logs.append(f"PLAN(FIXED) prompt: {prompt[:1200]}") resp = client.chat.completions.create( model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"), messages=[ {"role": "system", "content": "Be terse. Do not re-rank or add items."}, {"role": "user", "content": prompt}, ], temperature=0.3, timeout=20, ) text = (resp.choices[0].message.content or "").strip() if debug: logs.append(f"PLAN(FIXED) output: {text[:800]}") return text, "\n".join(logs) except Exception as e: if debug: logs.append(f"PLAN(FIXED) error: {e}") return "", "\n".join(logs) def _score_items(items: List[Challenge], keyword: str, debug: bool = False) -> Tuple[List[tuple[Challenge, float, str]], str]: """Score challenges using OpenAI API and return (challenge, score, reason) tuples""" api_key = os.getenv("OPENAI_API_KEY") base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") results: List[tuple[Challenge, float, str]] = [] slog: List[str] = [] if not items: return results, "\n".join(slog) if not api_key: if debug: slog.append("Missing OPENAI_API_KEY") return results, "\n".join(slog) compact = [ { "id": c.id, "title": c.title, "prize": c.prize, "deadline": c.deadline, "tags": c.tags[:6], "description": (c.description or "").replace("\n", " ")[:400], } for c in items[:8] ] scoring_prompt = ( "You are an expert Topcoder challenge analyst. Analyze items and rate match to the query.\n" f"Query: {keyword}\n" "Items: " + json.dumps(compact) + "\n\n" "Instructions:\n" "- Consider skills, tags, and brief description.\n" "- Relevance matters most, but higher prize should be explicitly favored.\n" "- When two items are similarly relevant, prioritize the one with the larger prize.\n" "- Incorporate prize into the score (0-1), not just the reason.\n" "- Return ONLY JSON array of objects: [{id, score, reason}] where 0<=score<=1.\n" "- Do not include any extra text." ) try: client = OpenAI( api_key=api_key, base_url=base_url ) if debug: slog.append(f"OpenAI model: {os.getenv('OPENAI_MODEL', 'gpt-4o-mini')}") slog.append(f"OpenAI prompt: {scoring_prompt[:1500]}") response = client.chat.completions.create( model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"), messages=[ {"role": "system", "content": "You are a helpful, terse assistant. Return JSON only."}, {"role": "user", "content": scoring_prompt}, ], temperature=0.2, timeout=40 ) text = response.choices[0].message.content.strip() if debug: slog.append(f"OpenAI raw output: {text[:800]}") # Extract JSON from response m = re.search(r"\[\s*\{[\s\S]*\}\s*\]", text) if not m: if debug: slog.append("No valid JSON array found in response") return results, "\n".join(slog) try: arr = json.loads(m.group(0)) score_map: dict[str, tuple[float, str]] = {} for obj in arr: if isinstance(obj, dict): cid = str(obj.get("id", "")) score = float(obj.get("score", 0)) reason = str(obj.get("reason", "")) score_map[cid] = (max(0.0, min(1.0, score)), reason) # Build results with scores out: List[tuple[Challenge, float, str]] = [] for c in items: if c.id in score_map: s, r = score_map[c.id] out.append((c, s, r)) else: # Fallback: light prize-based score for missing items out.append((c, min(c.prize / 1000.0, 1.0) * 0.3, "")) if debug: slog.append(f"OpenAI parsed {len(score_map)} scores") logger.info("\n".join(slog)) return out, "\n".join(slog) except json.JSONDecodeError as e: if debug: slog.append(f"JSON decode error: {e}") return results, "\n".join(slog) except Exception as e: if debug: slog.append(f"OpenAI API error: {e}") logger.info("\n".join(slog)) return results, "\n".join(slog) def _require_llm_config() -> Tuple[bool, str]: """Check if OpenAI API is properly configured""" if os.getenv("OPENAI_API_KEY"): return True, "" return False, "Set OPENAI_API_KEY for LLM scoring" def _extract_keywords(requirements: str, debug: bool = False) -> Tuple[str, str]: """Use LLM to extract 1-3 broad, concise keywords from free-form requirements.""" logs: List[str] = [] api_key = os.getenv("OPENAI_API_KEY") base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") if not api_key: return "", "Missing OPENAI_API_KEY" try: client = OpenAI(api_key=api_key, base_url=base_url) prompt = ( "You extract compact search keywords.\n" "Given a user's requirements, output a comma-separated list of up to 3 broad keywords (1-3 words each).\n" "Keep them minimal, generic, and deduplicated.\n" "Examples:\n" "- 'Need an LLM project with Python and simple UI' -> LLM, Python, UI\n" "- 'Data visualization challenges about dashboards' -> data visualization, dashboard\n" f"Requirements: {requirements.strip()}\n" "Return only the keywords, comma-separated." ) if debug: logs.append(f"KW prompt: {prompt[:500]}") resp = client.chat.completions.create( model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"), messages=[ {"role": "system", "content": "You are a terse assistant. Return only keywords."}, {"role": "user", "content": prompt}, ], temperature=0.2, timeout=20, ) text = (resp.choices[0].message.content or "").strip() # normalize to simple comma-separated text = re.sub(r"\s*[,\n]\s*", ", ", text) # keep at most 3 parts = [p.strip() for p in text.split(",") if p.strip()] keywords = ", ".join(parts[:3]) if debug: logs.append(f"KW extracted: {keywords}") return keywords, "\n".join(logs) except Exception as e: if debug: logs.append(f"KW error: {e}") return "", "\n".join(logs) def _filter_active(items: List[Challenge]) -> List[Challenge]: """Keep items that appear to be active/online based on tags or status hints.""" kept: List[Challenge] = [] for ch in items: tags_lower = [t.lower() for t in ch.tags] if any(t == "active" or t == "open" for t in tags_lower): kept.append(ch) else: # If status is unknown, keep it (avoid over-filtering) if not any(t in ("completed", "closed", "cancelled", "draft") for t in tags_lower): kept.append(ch) return kept def _stars_for_score(score: float) -> str: n = max(1, min(5, int(round(score * 5)))) return "★" * n + "☆" * (5 - n) def run_query(requirements: str): # Defaults for agent flow min_prize = 0.0 days_ahead = 90 # Extract compact keywords from requirements keyword, _ = _extract_keywords(requirements, debug=False) # Always use MCP in production UI; fallback to local sample on error items, err, _ = _mcp_list_challenges(debug=False) if err: items = FALLBACK_DATA status = f"MCP fallback: {err}" else: status = "MCP OK" # Apply time window first, then keyword/prize filtering items = _filter_by_days(items, days_ahead) items = _filter_active(items) # Apply light shortlist by prize only; rely on AI scoring for relevance filtered = shortlist(items, "", min_prize) # Enforce LLM config for scoring ok, cfg_msg = _require_llm_config() if not ok: status = f"{status} | LLM config required: {cfg_msg}" else: scored, _ = _score_items(filtered, keyword, debug=False) if scored: # Blend LLM relevance with normalized prize to unify ranking # prize_norm in [0,1] across the current candidate set prizes = [c.prize for c, _, _ in scored] pmin = min(prizes) if prizes else 0.0 pmax = max(prizes) if prizes else 0.0 denom = (pmax - pmin) if (pmax - pmin) > 0 else 1.0 def prize_norm(v: float) -> float: return max(0.0, min(1.0, (v - pmin) / denom)) alpha = 0.75 # relevance weight; 25% weight to prize ranked = [] for c, s, r in scored: cs = alpha * float(s) + (1 - alpha) * prize_norm(c.prize) ranked.append((c, cs, r)) ranked.sort(key=lambda t: t[1], reverse=True) filtered = [c for c, _, _ in ranked] # If MCP returned items but filtering removed all (e.g., many items missing prize), show unfiltered if not filtered and items: filtered = items status = f"{status} (no matches; showing unfiltered)" plan_text, _ = _generate_plan_fixed(ranked, keyword, days_ahead, debug=False) if ok and 'ranked' in locals() and ranked else ("", "") # Build a map from id to (score, reason) for star display id_to_score_reason: dict[str, tuple[float, str]] = {} if ok and 'ranked' in locals(): for c, s, r in ranked: id_to_score_reason[c.id] = (s, r) rows = [] for c in filtered: s, r = id_to_score_reason.get(c.id, (0.0, "")) stars = _stars_for_score(s) rows.append([ c.title, f"${c.prize:,.0f}", c.deadline, ", ".join(c.tags), stars, (r[:160] + ("…" if len(r) > 160 else "")) if r else "", c.id, ]) return rows, status, plan_text with gr.Blocks(title="Topcoder Challenge Scout") as demo: gr.Markdown("**Topcoder Challenge Scout** — agent picks tools, you provide requirements") requirements = gr.Textbox(label="Requirements", placeholder="e.g. Looking for recent active LLM development challenges with web UI", lines=3) gr.Markdown("Default filters: within last 90 days, active status. The agent extracts minimal keywords automatically.") run_btn = gr.Button("Find challenges") status = gr.Textbox(label="Status", interactive=False) table = gr.Dataframe(headers=["Title", "Prize", "Deadline", "Tags", "Recommend", "AI Reason", "Id"], wrap=True) plan_md = gr.Markdown("", label="Plan") run_btn.click( fn=run_query, inputs=[requirements], outputs=[table, status, plan_md], ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))