| | |
| |
|
| | import gradio as gr |
| | import asyncio, json, os, sys |
| | from dotenv import load_dotenv |
| | from langchain_core.tools import Tool |
| | from langchain_groq import ChatGroq |
| | from langgraph.prebuilt import create_react_agent |
| |
|
| | load_dotenv() |
| |
|
| | |
| | class MCPClient: |
| | """Manages and communicates with a single tool server subprocess.""" |
| |
|
| | def __init__(self, command: str, args: list): |
| | self.process: asyncio.subprocess.Process = None |
| | self._lock = asyncio.Lock() |
| | self._cmd = [command] + args |
| | self._req_id = 0 |
| |
|
| | async def _send_request(self, method: str, params: dict = None) -> dict: |
| | async with self._lock: |
| | self._req_id += 1 |
| | request = { |
| | "jsonrpc": "2.0", |
| | "method": method, |
| | "params": params or {}, |
| | "id": self._req_id, |
| | } |
| | self.process.stdin.write(json.dumps(request).encode() + b"\n") |
| | await self.process.stdin.drain() |
| |
|
| | while line := await self.process.stdout.readline(): |
| | response = json.loads(line) |
| | if response.get("id") == self._req_id: |
| | if "error" in response: |
| | raise RuntimeError(f"Server error: {response['error']}") |
| | return response["result"] |
| | raise ConnectionError("Server process closed unexpectedly.") |
| |
|
| | async def get_tools(self) -> list[Tool]: |
| | self.process = await asyncio.create_subprocess_exec( |
| | *self._cmd, |
| | stdin=asyncio.subprocess.PIPE, |
| | stdout=asyncio.subprocess.PIPE |
| | ) |
| | tool_schemas = await self._send_request("discover") |
| | return [ |
| | Tool( |
| | name=s['name'], |
| | description=s['description'], |
| | func=None, |
| | coroutine=self._create_tool_coro(s['name']), |
| | args_schema=s['args_schema'] |
| | ) |
| | for s in tool_schemas |
| | ] |
| |
|
| | def _create_tool_coro(self, tool_name: str): |
| | async def _tool_coro(tool_input): |
| | return await self._send_request( |
| | "execute", {"tool_name": tool_name, "tool_args": tool_input} |
| | ) |
| | return _tool_coro |
| |
|
| |
|
| | |
| | _agent_executor = None |
| |
|
| | async def get_agent_executor(): |
| | global _agent_executor |
| | if _agent_executor is None: |
| | if not os.getenv("GROQ_API_KEY"): |
| | raise ValueError("GROQ_API_KEY secret not set.") |
| | |
| | client = MCPClient(command=sys.executable, args=["server.py"]) |
| | tools = await client.get_tools() |
| |
|
| | model = ChatGroq(model="openai/gpt-oss-20b") |
| | |
| | |
| | _agent_executor = create_react_agent(model, tools) |
| |
|
| | return _agent_executor |
| |
|
| |
|
| | |
| | async def respond_to_chat(message: str, history: list): |
| | agent = await get_agent_executor() |
| |
|
| | history_langchain_format = [] |
| | for human, ai in history: |
| | history_langchain_format.append({"role": "user", "content": human}) |
| | history_langchain_format.append({"role": "assistant", "content": ai}) |
| |
|
| | history_langchain_format.append({"role": "user", "content": message}) |
| |
|
| | try: |
| | response = await agent.ainvoke({"messages": history_langchain_format}) |
| | return response['messages'][-1].content |
| | except Exception as e: |
| | print(f"ERROR: {e}", file=sys.stderr) |
| | return "Sorry, an error occurred while processing your request." |
| |
|
| |
|
| | |
| | demo = gr.ChatInterface( |
| | fn=respond_to_chat, |
| | title="Gold & Silver AI Forecast", |
| | description="Ask about live prices and future forecasts for gold and silver.", |
| | examples=[ |
| | "What's the price of silver today?", |
| | "Give me a 5-day forecast for gold." |
| | ] |
| | ) |
| |
|
| | if __name__ == "__main__": |
| | demo.launch() |
| |
|