# app.py
"""Gradio chat UI backed by a LangGraph ReAct agent.

The agent's tools are served by a separate ``server.py`` process that this
module spawns and talks to over stdin/stdout using newline-delimited
JSON-RPC 2.0 messages.
"""

import asyncio
import json
import os
import sys
from typing import Any, Optional

import gradio as gr
from dotenv import load_dotenv
from langchain_core.tools import Tool
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent

# Populate the environment from a .env file before GROQ_API_KEY is read.
load_dotenv()


# --- Compact MCP Client ---
class MCPClient:
    """Manages and communicates with a single tool server subprocess."""

    def __init__(self, command: str, args: list):
        # The subprocess is started lazily by get_tools().
        self.process: Optional[asyncio.subprocess.Process] = None
        # Serialises request/response exchanges on the shared pipes.
        self._lock = asyncio.Lock()
        self._cmd = [command] + args
        self._req_id = 0

    async def _send_request(self, method: str, params: Optional[dict] = None) -> Any:
        """Send one JSON-RPC request and return the matching ``result``.

        Args:
            method: JSON-RPC method name.
            params: Method parameters; an empty object is sent when omitted.

        Returns:
            The ``result`` member of the response whose ``id`` matches.

        Raises:
            ConnectionError: The server has not been started, or its stdout
                reached EOF before a matching response arrived.
            RuntimeError: The matching response carried an ``error`` member.
        """
        if self.process is None:
            raise ConnectionError("Server process has not been started.")

        async with self._lock:
            self._req_id += 1
            request_id = self._req_id
            request = {
                "jsonrpc": "2.0",
                "method": method,
                "params": params or {},
                "id": request_id,
            }
            self.process.stdin.write(json.dumps(request).encode() + b"\n")
            await self.process.stdin.drain()

            # readline() returns b"" at EOF, which ends the loop.
            while line := await self.process.stdout.readline():
                if not line.strip():
                    continue
                try:
                    response = json.loads(line)
                except ValueError:
                    # Anything the server prints to stdout that is not JSON
                    # (banners, library warnings) must not abort the request
                    # and strand the real response in the pipe.
                    print(
                        f"Ignoring non-JSON server output: {line!r}",
                        file=sys.stderr,
                    )
                    continue
                if not isinstance(response, dict):
                    continue
                if response.get("id") != request_id:
                    continue
                if "error" in response:
                    raise RuntimeError(f"Server error: {response['error']}")
                return response["result"]

            raise ConnectionError("Server process closed unexpectedly.")

    async def get_tools(self) -> list[Tool]:
        """Start the server if needed and wrap its tools as LangChain tools.

        The subprocess is only (re)spawned when none is running, so repeated
        calls do not leak extra server processes.
        """
        if self.process is None or self.process.returncode is not None:
            self.process = await asyncio.create_subprocess_exec(
                *self._cmd,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
            )

        tool_schemas = await self._send_request("discover")
        return [
            Tool(
                name=schema["name"],
                description=schema["description"],
                func=None,
                coroutine=self._create_tool_coro(schema["name"]),
                args_schema=schema["args_schema"],
            )
            for schema in tool_schemas
        ]

    def _create_tool_coro(self, tool_name: str):
        """Return a coroutine function that executes ``tool_name`` remotely."""

        async def _tool_coro(tool_input):
            return await self._send_request(
                "execute", {"tool_name": tool_name, "tool_args": tool_input}
            )

        return _tool_coro


# --- Global Agent Executor ---
_agent_executor = None


async def get_agent_executor():
    """Return the shared agent, building it on first use.

    Raises:
        ValueError: ``GROQ_API_KEY`` is not set in the environment.
    """
    global _agent_executor
    if _agent_executor is None:
        if not os.getenv("GROQ_API_KEY"):
            raise ValueError("GROQ_API_KEY secret not set.")

        client = MCPClient(command=sys.executable, args=["server.py"])
        tools = await client.get_tools()
        model = ChatGroq(model="openai/gpt-oss-20b")
        _agent_executor = create_react_agent(model, tools)
    return _agent_executor


# --- Gradio Chat Logic ---
def _history_to_messages(history) -> list:
    """Convert Gradio chat history into role/content message dicts.

    Accepts both history shapes: ``(user, assistant)`` pairs and dicts that
    already carry ``role`` and ``content``. Entries whose content is ``None``
    are dropped.
    """
    messages = []
    for entry in history or []:
        if isinstance(entry, dict):
            role = entry.get("role")
            content = entry.get("content")
            if role and content is not None:
                messages.append({"role": role, "content": content})
            continue

        human, ai = entry
        if human is not None:
            messages.append({"role": "user", "content": human})
        if ai is not None:
            messages.append({"role": "assistant", "content": ai})
    return messages


async def respond_to_chat(message: str, history: list):
    """Answer one chat turn by running the agent over the conversation.

    Args:
        message: The new user message.
        history: Prior turns as supplied by ``gr.ChatInterface``.

    Returns:
        The content of the agent's final message, or a generic apology if the
        agent run raises.
    """
    # Setup failures (missing API key, tool server not starting) are not
    # caught here and propagate to the caller.
    agent = await get_agent_executor()

    conversation = _history_to_messages(history)
    conversation.append({"role": "user", "content": message})

    try:
        response = await agent.ainvoke({"messages": conversation})
        return response["messages"][-1].content
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return "Sorry, an error occurred while processing your request."


# --- UI ---
demo = gr.ChatInterface(
    fn=respond_to_chat,
    title="Gold & Silver AI Forecast",
    description="Ask about live prices and future forecasts for gold and silver.",
    examples=[
        "What's the price of silver today?",
        "Give me a 5-day forecast for gold.",
    ],
)

if __name__ == "__main__":
    demo.launch()