Learn-AI / app.py
fffffwl's picture
optimized prompt
d3512ba
import json
import os
import re
import time
import uuid
import sys
import logging
import asyncio
import traceback
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass
from typing import List, Optional, Tuple
import gradio as gr
import requests
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
MCP_SSE_URL = "https://api.topcoder-dev.com/v6/mcp/sse"
MCP_MESSAGES_URL = "https://api.topcoder-dev.com/v6/mcp/messages"
MCP_HTTP_BASE = "https://api.topcoder-dev.com/v6/mcp/mcp"
# Console logger for runtime diagnostics
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger("challenge_scout")
@dataclass
class Challenge:
id: str
title: str
prize: float
deadline: str
tags: List[str]
description: str = ""
FALLBACK_DATA: List[Challenge] = [
Challenge(id="123", title="Build a Minimal MCP Agent UI", prize=300.0, deadline="2025-08-20", tags=["mcp", "python", "gradio"]),
Challenge(id="456", title="Topcoder Data Parsing Toolkit", prize=500.0, deadline="2025-08-15", tags=["data", "api", "json"]),
Challenge(id="789", title="AI-Powered Web Application", prize=250.0, deadline="2025-08-18", tags=["ai", "web", "optimization"]),
]
def _open_mcp_session(
connect_timeout: float = 5.0,
read_window_seconds: float = 8.0,
debug: bool = False,
max_retries: int = 2,
) -> Tuple[Optional[str], str]:
"""Open SSE and extract a sessionId from the first data event.
We keep the stream open for up to `read_window_seconds` to wait for the
line like: "data: /v6/mcp/messages?sessionId=<uuid>".
"""
logs: List[str] = []
headers = {
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
backoff = 0.6
for attempt in range(1, max_retries + 2):
if debug:
logger.info(f"SSE dialing attempt={attempt} url={MCP_SSE_URL}")
try:
t0 = time.time()
resp = requests.get(
MCP_SSE_URL,
headers=headers,
timeout=(connect_timeout, read_window_seconds + 2.0),
stream=True,
)
elapsed = time.time() - t0
logs.append(f"SSE attempt {attempt}: status={resp.status_code}, elapsed={elapsed:.2f}s")
if debug:
logger.info(f"SSE attempt {attempt} status={resp.status_code} elapsed={elapsed:.2f}s")
resp.raise_for_status()
start = time.time()
pattern = re.compile(r"^data:\s*/v6/mcp/messages\?sessionId=([a-f0-9-]+)\s*$", re.IGNORECASE)
for line in resp.iter_lines(decode_unicode=True, chunk_size=1):
if line is None:
if time.time() - start > read_window_seconds:
logs.append("SSE: timeout waiting for data line")
break
continue
if not isinstance(line, str):
if time.time() - start > read_window_seconds:
logs.append("SSE: timeout (non-str line)")
break
continue
if debug:
logs.append(f"SSE line: {line[:160]}")
logger.info(f"SSE line: {line[:160]}")
m = pattern.match(line)
if m:
sid = m.group(1)
logs.append(f"SSE: obtained sessionId={sid}")
if debug:
logger.info(f"SSE obtained sessionId={sid}")
return sid, "\n".join(logs)
if time.time() - start > read_window_seconds:
logs.append("SSE: read window exceeded without sessionId")
if debug:
logger.info("SSE: read window exceeded without sessionId")
break
except Exception as e:
logs.append(f"SSE error on attempt {attempt}: {e}")
if debug:
logger.info(f"SSE error on attempt {attempt}: {e}")
time.sleep(backoff)
backoff *= 1.6
return None, "\n".join(logs)
def _mcp_list_challenges(
debug: bool = False,
retries: int = 2,
) -> Tuple[List[Challenge], Optional[str], str]:
"""Call MCP using the official SDK (JSON-RPC over streamable HTTP) and return compact challenges.
Returns (challenges, error, debug_logs)
"""
logs: List[str] = []
async def _do() -> Tuple[List[Challenge], Optional[str], str]:
try:
t0 = time.time()
async with sse_client(MCP_SSE_URL) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
tools = await session.list_tools()
names = [t.name for t in tools.tools]
logs.append(f"SDK tools: {names}")
# Prefer a known tool name pattern
tool_name = None
for cand in [
"query-tc-challenges",
"query-tc-challenges-public",
"query-tc-challenges-private",
]:
if cand in names:
tool_name = cand
break
if not tool_name and names:
tool_name = names[0]
if not tool_name:
return [], "No tools available", "\n".join(logs)
# Call with minimal args (tool often supports pagination; start small)
args = {"page": 1, "pageSize": 20}
logs.append(f"Calling tool {tool_name} with args: {args}")
try:
result = await session.call_tool(tool_name, args)
except Exception as e:
logs.append(f"call_tool error: {e}")
return [], str(e), "\n".join(logs)
items: List[Challenge] = []
payload = getattr(result, "structuredContent", None)
if not payload:
# Fallback to text content join and attempt JSON extraction of an object with data[] or array
combined_text = "\n".join(
getattr(c, "text", "") for c in getattr(result, "content", []) if hasattr(c, "text")
)
obj_match = re.search(r"\{[\s\S]*\}$", combined_text)
if obj_match:
try:
payload = json.loads(obj_match.group(0))
except Exception:
payload = None
# Normalize list of challenge-like dicts
raw_list = []
if isinstance(payload, dict) and isinstance(payload.get("data"), list):
raw_list = payload.get("data")
logs.append(f"Received {len(raw_list)} items from data[]")
elif isinstance(payload, list):
raw_list = payload
logs.append(f"Received {len(raw_list)} items from top-level list")
else:
logs.append(f"Unexpected payload shape: {type(payload)}")
def to_ch(item: dict) -> Challenge:
title = (
str(item.get("name") or item.get("title") or item.get("challengeTitle") or "")
)
# prize from prizeSets → sum values if present
prize = 0.0
prize_sets = item.get("prizeSets")
if isinstance(prize_sets, list):
for s in prize_sets:
prizes = s.get("prizes") if isinstance(s, dict) else None
if isinstance(prizes, list):
for p in prizes:
val = None
if isinstance(p, dict):
val = p.get("value") or p.get("amount")
try:
prize += float(val or 0)
except Exception:
pass
else:
prize_val = item.get("totalPrizes") or item.get("prize") or item.get("totalPrize") or 0
try:
prize = float(prize_val or 0)
except Exception:
prize = 0.0
# deadline candidates
deadline = (
str(
item.get("endDate")
or item.get("submissionEndDate")
or item.get("registrationEndDate")
or item.get("startDate")
or ""
)
)
# tags candidates
tg = item.get("tags")
if not isinstance(tg, list):
tg = []
# add some fallbacks/contextual labels
for extra in [item.get("track"), item.get("type"), item.get("status")]:
if extra:
tg.append(extra)
# skills may be list of dicts or strings
sk = item.get("skills")
if isinstance(sk, list):
for s in sk:
if isinstance(s, dict):
name = s.get("name") or s.get("id")
if name:
tg.append(str(name))
else:
tg.append(str(s))
tg = [str(x) for x in tg if x]
desc = str(item.get("description") or "")
return Challenge(
id=str(item.get("id", "")),
title=title,
prize=prize,
deadline=deadline,
tags=tg,
description=desc,
)
for it in raw_list:
if isinstance(it, dict):
items.append(to_ch(it))
logs.append(f"Normalized {len(items)} challenges")
return items, None, "\n".join(logs)
except Exception as e:
logs.append(f"SDK fatal: {e}")
logs.append(traceback.format_exc())
# Try to unwrap ExceptionGroup-like messages if any
try:
from exceptiongroup import ExceptionGroup # type: ignore
except Exception:
ExceptionGroup = None # type: ignore
if ExceptionGroup and isinstance(e, ExceptionGroup): # type: ignore[arg-type]
for idx, sub in enumerate(e.exceptions): # type: ignore[attr-defined]
logs.append(f" sub[{idx}]: {type(sub).__name__}: {sub}")
return [], str(e), "\n".join(logs)
# Run the async block
return asyncio.run(_do())
def shortlist(challenge_list: List[Challenge], keyword: str, min_prize: float) -> List[Challenge]:
keyword_lower = keyword.lower().strip()
results = []
for ch in challenge_list:
if ch.prize < min_prize:
continue
hay = f"{ch.title} {' '.join(ch.tags)}".lower()
if keyword_lower and keyword_lower not in hay:
continue
results.append(ch)
# lightweight ranking: by prize desc
results.sort(key=lambda c: c.prize, reverse=True)
return results
def _parse_deadline(dt_str: str) -> Optional[datetime]:
if not dt_str:
return None
try:
# Try ISO 8601 with Z
return datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
except Exception:
try:
return datetime.strptime(dt_str, "%Y-%m-%d")
except Exception:
return None
def _filter_by_days(items: List[Challenge], days_ahead: int) -> List[Challenge]:
if days_ahead <= 0:
return items
now = datetime.now(timezone.utc)
limit = now + timedelta(days=days_ahead)
kept: List[Challenge] = []
for ch in items:
dt = _parse_deadline(ch.deadline)
if dt is None:
kept.append(ch)
continue
if now <= dt <= limit:
kept.append(ch)
return kept
def _generate_plan(items: List[Challenge], keyword: str, min_prize: float, days_ahead: int, debug: bool = False) -> Tuple[str, str]:
# Keep context tiny: only top 8 rows of compact fields
top = items[:8]
compact = [
{
"title": c.title,
"prize": c.prize,
"deadline": c.deadline,
"tags": c.tags[:5],
}
for c in top
]
logs: List[str] = []
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
if not api_key:
logs.append("Missing OPENAI_API_KEY")
return "", "\n".join(logs)
try:
client = OpenAI(
api_key=api_key,
base_url=base_url
)
prompt = (
"You are a concise challenge scout. Given compact challenge metadata, output:\n"
"- Top 3 picks (title + brief reason)\n"
"- Quick plan of action (3 bullets)\n"
f"Constraints: keyword='{keyword}', min_prize>={min_prize}, within {days_ahead} days.\n"
f"Data: {json.dumps(compact)}"
)
if debug:
logs.append(f"PLAN OpenAI prompt: {prompt[:1500]}")
response = client.chat.completions.create(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": "You are a helpful, terse assistant."},
{"role": "user", "content": prompt},
],
temperature=0.3,
timeout=20
)
text = response.choices[0].message.content.strip()
if debug:
logs.append(f"PLAN OpenAI output: {text[:800]}")
logger.info("\n".join(logs))
return text, "\n".join(logs)
except Exception as e:
if debug:
logs.append(f"PLAN OpenAI error: {e}")
logger.info("\n".join(logs))
return "", "\n".join(logs)
def _generate_plan_fixed(
ranked: List[tuple[Challenge, float, str]],
keyword: str,
days_ahead: int,
debug: bool = False,
) -> Tuple[str, str]:
"""Generate a plan using the already-ranked top picks without changing order.
The LLM is only used to phrase reasons and the action plan. It must not re-rank.
"""
top = ranked[:3]
data = [
{
"title": c.title,
"prize": c.prize,
"deadline": c.deadline,
"tags": c.tags[:6],
"score": round(s, 4),
"reason": (r or "").strip()[:300],
}
for c, s, r in top
]
logs: List[str] = []
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
if not api_key:
logs.append("Missing OPENAI_API_KEY")
return "", "\n".join(logs)
try:
client = OpenAI(api_key=api_key, base_url=base_url)
prompt = (
"You are a concise challenge scout. You are given pre-ranked top picks (in order).\n"
"Do NOT change the order or add/remove items.\n"
"Output exactly:\n"
"- Top 3 picks (title + short reason).\n"
"- Quick plan of action (3 bullets).\n"
f"Constraints: query='{keyword}', within {days_ahead} days.\n"
f"Ranked data: {json.dumps(data)}"
)
if debug:
logs.append(f"PLAN(FIXED) prompt: {prompt[:1200]}")
resp = client.chat.completions.create(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": "Be terse. Do not re-rank or add items."},
{"role": "user", "content": prompt},
],
temperature=0.3,
timeout=20,
)
text = (resp.choices[0].message.content or "").strip()
if debug:
logs.append(f"PLAN(FIXED) output: {text[:800]}")
return text, "\n".join(logs)
except Exception as e:
if debug:
logs.append(f"PLAN(FIXED) error: {e}")
return "", "\n".join(logs)
def _score_items(items: List[Challenge], keyword: str, debug: bool = False) -> Tuple[List[tuple[Challenge, float, str]], str]:
"""Score challenges using OpenAI API and return (challenge, score, reason) tuples"""
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
results: List[tuple[Challenge, float, str]] = []
slog: List[str] = []
if not items:
return results, "\n".join(slog)
if not api_key:
if debug:
slog.append("Missing OPENAI_API_KEY")
return results, "\n".join(slog)
compact = [
{
"id": c.id,
"title": c.title,
"prize": c.prize,
"deadline": c.deadline,
"tags": c.tags[:6],
"description": (c.description or "").replace("\n", " ")[:400],
}
for c in items[:8]
]
scoring_prompt = (
"You are an expert Topcoder challenge analyst. Analyze items and rate match to the query.\n"
f"Query: {keyword}\n"
"Items: " + json.dumps(compact) + "\n\n"
"Instructions:\n"
"- Consider skills, tags, and brief description.\n"
"- Relevance matters most, but higher prize should be explicitly favored.\n"
"- When two items are similarly relevant, prioritize the one with the larger prize.\n"
"- Incorporate prize into the score (0-1), not just the reason.\n"
"- Return ONLY JSON array of objects: [{id, score, reason}] where 0<=score<=1.\n"
"- Do not include any extra text."
)
try:
client = OpenAI(
api_key=api_key,
base_url=base_url
)
if debug:
slog.append(f"OpenAI model: {os.getenv('OPENAI_MODEL', 'gpt-4o-mini')}")
slog.append(f"OpenAI prompt: {scoring_prompt[:1500]}")
response = client.chat.completions.create(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": "You are a helpful, terse assistant. Return JSON only."},
{"role": "user", "content": scoring_prompt},
],
temperature=0.2,
timeout=40
)
text = response.choices[0].message.content.strip()
if debug:
slog.append(f"OpenAI raw output: {text[:800]}")
# Extract JSON from response
m = re.search(r"\[\s*\{[\s\S]*\}\s*\]", text)
if not m:
if debug:
slog.append("No valid JSON array found in response")
return results, "\n".join(slog)
try:
arr = json.loads(m.group(0))
score_map: dict[str, tuple[float, str]] = {}
for obj in arr:
if isinstance(obj, dict):
cid = str(obj.get("id", ""))
score = float(obj.get("score", 0))
reason = str(obj.get("reason", ""))
score_map[cid] = (max(0.0, min(1.0, score)), reason)
# Build results with scores
out: List[tuple[Challenge, float, str]] = []
for c in items:
if c.id in score_map:
s, r = score_map[c.id]
out.append((c, s, r))
else:
# Fallback: light prize-based score for missing items
out.append((c, min(c.prize / 1000.0, 1.0) * 0.3, ""))
if debug:
slog.append(f"OpenAI parsed {len(score_map)} scores")
logger.info("\n".join(slog))
return out, "\n".join(slog)
except json.JSONDecodeError as e:
if debug:
slog.append(f"JSON decode error: {e}")
return results, "\n".join(slog)
except Exception as e:
if debug:
slog.append(f"OpenAI API error: {e}")
logger.info("\n".join(slog))
return results, "\n".join(slog)
def _require_llm_config() -> Tuple[bool, str]:
"""Check if OpenAI API is properly configured"""
if os.getenv("OPENAI_API_KEY"):
return True, ""
return False, "Set OPENAI_API_KEY for LLM scoring"
def _extract_keywords(requirements: str, debug: bool = False) -> Tuple[str, str]:
"""Use LLM to extract 1-3 broad, concise keywords from free-form requirements."""
logs: List[str] = []
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
if not api_key:
return "", "Missing OPENAI_API_KEY"
try:
client = OpenAI(api_key=api_key, base_url=base_url)
prompt = (
"You extract compact search keywords.\n"
"Given a user's requirements, output a comma-separated list of up to 3 broad keywords (1-3 words each).\n"
"Keep them minimal, generic, and deduplicated.\n"
"Examples:\n"
"- 'Need an LLM project with Python and simple UI' -> LLM, Python, UI\n"
"- 'Data visualization challenges about dashboards' -> data visualization, dashboard\n"
f"Requirements: {requirements.strip()}\n"
"Return only the keywords, comma-separated."
)
if debug:
logs.append(f"KW prompt: {prompt[:500]}")
resp = client.chat.completions.create(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": "You are a terse assistant. Return only keywords."},
{"role": "user", "content": prompt},
],
temperature=0.2,
timeout=20,
)
text = (resp.choices[0].message.content or "").strip()
# normalize to simple comma-separated
text = re.sub(r"\s*[,\n]\s*", ", ", text)
# keep at most 3
parts = [p.strip() for p in text.split(",") if p.strip()]
keywords = ", ".join(parts[:3])
if debug:
logs.append(f"KW extracted: {keywords}")
return keywords, "\n".join(logs)
except Exception as e:
if debug:
logs.append(f"KW error: {e}")
return "", "\n".join(logs)
def _filter_active(items: List[Challenge]) -> List[Challenge]:
"""Keep items that appear to be active/online based on tags or status hints."""
kept: List[Challenge] = []
for ch in items:
tags_lower = [t.lower() for t in ch.tags]
if any(t == "active" or t == "open" for t in tags_lower):
kept.append(ch)
else:
# If status is unknown, keep it (avoid over-filtering)
if not any(t in ("completed", "closed", "cancelled", "draft") for t in tags_lower):
kept.append(ch)
return kept
def _stars_for_score(score: float) -> str:
n = max(1, min(5, int(round(score * 5))))
return "★" * n + "☆" * (5 - n)
def run_query(requirements: str):
# Defaults for agent flow
min_prize = 0.0
days_ahead = 90
# Extract compact keywords from requirements
keyword, _ = _extract_keywords(requirements, debug=False)
# Always use MCP in production UI; fallback to local sample on error
items, err, _ = _mcp_list_challenges(debug=False)
if err:
items = FALLBACK_DATA
status = f"MCP fallback: {err}"
else:
status = "MCP OK"
# Apply time window first, then keyword/prize filtering
items = _filter_by_days(items, days_ahead)
items = _filter_active(items)
# Apply light shortlist by prize only; rely on AI scoring for relevance
filtered = shortlist(items, "", min_prize)
# Enforce LLM config for scoring
ok, cfg_msg = _require_llm_config()
if not ok:
status = f"{status} | LLM config required: {cfg_msg}"
else:
scored, _ = _score_items(filtered, keyword, debug=False)
if scored:
# Blend LLM relevance with normalized prize to unify ranking
# prize_norm in [0,1] across the current candidate set
prizes = [c.prize for c, _, _ in scored]
pmin = min(prizes) if prizes else 0.0
pmax = max(prizes) if prizes else 0.0
denom = (pmax - pmin) if (pmax - pmin) > 0 else 1.0
def prize_norm(v: float) -> float:
return max(0.0, min(1.0, (v - pmin) / denom))
alpha = 0.75 # relevance weight; 25% weight to prize
ranked = []
for c, s, r in scored:
cs = alpha * float(s) + (1 - alpha) * prize_norm(c.prize)
ranked.append((c, cs, r))
ranked.sort(key=lambda t: t[1], reverse=True)
filtered = [c for c, _, _ in ranked]
# If MCP returned items but filtering removed all (e.g., many items missing prize), show unfiltered
if not filtered and items:
filtered = items
status = f"{status} (no matches; showing unfiltered)"
plan_text, _ = _generate_plan_fixed(ranked, keyword, days_ahead, debug=False) if ok and 'ranked' in locals() and ranked else ("", "")
# Build a map from id to (score, reason) for star display
id_to_score_reason: dict[str, tuple[float, str]] = {}
if ok and 'ranked' in locals():
for c, s, r in ranked:
id_to_score_reason[c.id] = (s, r)
rows = []
for c in filtered:
s, r = id_to_score_reason.get(c.id, (0.0, ""))
stars = _stars_for_score(s)
rows.append([
c.title,
f"${c.prize:,.0f}",
c.deadline,
", ".join(c.tags),
stars,
(r[:160] + ("…" if len(r) > 160 else "")) if r else "",
c.id,
])
return rows, status, plan_text
with gr.Blocks(title="Topcoder Challenge Scout") as demo:
gr.Markdown("**Topcoder Challenge Scout** — agent picks tools, you provide requirements")
requirements = gr.Textbox(label="Requirements", placeholder="e.g. Looking for recent active LLM development challenges with web UI", lines=3)
gr.Markdown("Default filters: within last 90 days, active status. The agent extracts minimal keywords automatically.")
run_btn = gr.Button("Find challenges")
status = gr.Textbox(label="Status", interactive=False)
table = gr.Dataframe(headers=["Title", "Prize", "Deadline", "Tags", "Recommend", "AI Reason", "Id"], wrap=True)
plan_md = gr.Markdown("", label="Plan")
run_btn.click(
fn=run_query,
inputs=[requirements],
outputs=[table, status, plan_md],
)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))