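"""Gradio chat UI for the Topcoder MCP agent.

Each user message is routed through an LLM tool/resource decision, the
selected tool or resource is executed, the result is filtered and redacted,
and the LLM summarizes it for the user.
"""
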
import gradio as gr

from src.infrastructure.providers.llm_provider import LLMClient
from src.domain.models.tool import ToolRequest
from src.application.services.tool_service import ToolExecutor
from src.application.services.prompt_service import PromptBuilder
from config.system_prompts import topcoder_system_prompt
from config.static_responses import REJECT_RESPONSE
from config.prompt_templates import PromptTemplates
from src.infrastructure.providers.response_filter import filter_tool_response
from src.infrastructure.providers.long_field_manager import long_field_manager
from src.mcp.client.manager import get_mcp_manager

llm_client = LLMClient()
tool_executor = ToolExecutor()
prompt_builder = PromptBuilder()

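# Guards the one-time MCP session warm-up; set after the first successful init.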
_mcp_triggered_once = False


async def _ensure_mcp_session_once() -> None:
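    """Initialise the MCP session once, on the first incoming message.

    Failures leave the flag unset, so the next message retries the warm-up.
    """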
    global _mcp_triggered_once
    if _mcp_triggered_once:
        return
    try:
        print("MCP warm init: starting new session for first user message...")
        mcp = get_mcp_manager()
        await mcp.ensure_initialized()
        try:
            print(f"MCP session initialised for query: {mcp.mcp_session_id}")
        except Exception:
            # The session id attribute may be unavailable on some manager builds.
            print("MCP session initialised for query")
        _mcp_triggered_once = True
    except Exception:
        print("MCP warm init: failed to establish session; will retry on next message")
        return


async def agent_response(user_message, history):
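    """Handle one chat turn: decide, execute, filter, and summarize.

    `history` is supplied by gr.ChatInterface but is not used here; each turn
    is answered from the current message alone.
    """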
    # Trigger MCP session once on first incoming message
    await _ensure_mcp_session_once()

    # STEP 1: Dynamic tool/resource decision using compact specs
    decision_prompt = prompt_builder.build_tool_decision_prompt(user_message)
    decision_json = await llm_client.decide_tool(decision_prompt)
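    # The decision payload may carry "tool" or "resource", an optional "params"
    # dict, and, for resources, top-level "endpoint" and "method" keys.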

    # Handle both tool and resource decisions
    tool = decision_json.get("tool")
    resource = decision_json.get("resource")
    params = decision_json.get("params", {})

    # Log LLM decision for debugging
    print(f"🔍 LLM Decision - Tool: {tool}, Resource: {resource}")
    print(f"🔍 Initial params from LLM decision: {params}")

    # For resources, also extract endpoint and method from the top level
    if resource:
        endpoint = decision_json.get("endpoint")
        method = decision_json.get("method")
        if endpoint:
            params["endpoint"] = endpoint
        if method:
            params["method"] = method

    # STEP 2: Clean up parameters (remove empty values)
    if params:
        params = {k: v for k, v in params.items() if v}
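    # NOTE: truthiness filtering also drops falsy values such as 0 and False,
    # not only empty strings.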

    # STEP 3: General chat
    if tool == "chat" or resource == "chat":
        return await llm_client.chat([
            {"role": "system", "content": topcoder_system_prompt},
            {"role": "user", "content": user_message},
        ])

    # STEP 4: Rejection
    if tool == "reject" or resource == "reject":
        return REJECT_RESPONSE

    # STEP 5: Determine what to execute
    available_tools = tool_executor.get_available_tools()
    available_resources = tool_executor.get_available_resources()
    print(f"🔍 Available Tools: {available_tools}")
    print(f"🔍 Available Resources: {available_resources}")
    if tool and tool in available_tools:
        # Execute tool
        target_name = tool
        target_type = "tool"
        print(f"🔍 Selected Tool: {target_name}")
    elif resource and resource in available_resources:
        # Execute resource
        target_name = resource
        target_type = "resource"
        print(f"🔍 Selected Resource: {target_name}")
    else:
        # Invalid selection
        invalid_name = tool or resource
        return (
            f"❌ '{invalid_name}' not available. "
            f"Available tools: {', '.join(available_tools)}, "
            f"Available resources: {', '.join(available_resources)}"
        )

    # STEP 6: Parameter extraction if needed
    if not params:
        if target_type == "tool":
            param_extraction_prompt = PromptTemplates.get_gradio_tool_param_extraction_prompt(
                target_name, user_message, tool_executor.get_tool_parameters(target_name)
            )
            param_response = await llm_client.complete_json(param_extraction_prompt)
            params = param_response.get("params", {})
            # Clean up extracted parameters too
            params = {k: v for k, v in params.items() if v}
        else:  # resource
            # For resources, extract both endpoint parameters AND API parameters
            param_extraction_prompt = PromptTemplates.get_gradio_resource_param_extraction_prompt(
                target_name, user_message
            )
            param_response = await llm_client.complete_json(param_extraction_prompt)
            extracted_params = param_response.get("params", {})
            # Clean up extracted parameters
            extracted_params = {k: v for k, v in extracted_params.items() if v}
            # Merge with existing params (endpoint and method from LLM decision)
            params.update(extracted_params)
            # Log extracted parameters for debugging
            print(f"🔍 Extracted params for {target_name}: {extracted_params}")

    # STEP 7: Execute tool or resource using compact specs
    print(f"🔍 Final params being sent to {target_name}: {params}")
    request = ToolRequest(tool=target_name, params=params)
    tool_result = await tool_executor.execute(request)
    if tool_result.status != "success":
        return f"❌ Error executing `{target_name}`: {tool_result.message}"

    # STEP 8: Filter response before summarizing
    filtered_data = filter_tool_response(target_name, tool_result.data)
    # Avoid hardcoding member-specific wrapping; rely on generic filter output

    # STEP 8b: Omit long fields by default unless the query explicitly asks for them.
    # Map target kind for config bucketing
    kind = "tools" if target_type == "tool" else "resources"
    filtered_or_redacted = long_field_manager.prepare_data_for_prompt(
        kind=kind,
        name=target_name,
        data=filtered_data,
        user_query=user_message,
    )

    # Build the summarization context based on the target type. This must be
    # set before the empty-data fallback below, which references `context`.
    if target_type == "tool":
        context = f"Tool used: {target_name}"
    else:
        context = f"Resource used: {target_name}"

    # Ensure we never return empty data due to redaction; keep a minimal summary fallback
    if not filtered_or_redacted:
        filtered_or_redacted = {
            "summary": "Data available with long fields omitted by default.",
            "context": context,
        }

    summarization_prompt = PromptTemplates.get_gradio_response_summarization_prompt(
        user_message, context, filtered_or_redacted
    )
    final_response = await llm_client.chat([
        {"role": "system", "content": "You are a helpful Topcoder assistant summarizing tool/resource outputs."},
        {"role": "user", "content": summarization_prompt.strip()},
    ])
    return final_response


def launch_ui():
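    """Launch the Gradio chat interface for the agent."""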
    demo = gr.ChatInterface(
        fn=agent_response,
        title="🧠 Topcoder MCP Agent",
        theme="soft",
        examples=["Active AI Challenges", "Help", "Tell me a joke"],
        type="messages",  # Fix deprecation warning
    )
    demo.launch()


if __name__ == "__main__":
    launch_ui()