# _Bot / app.py — Hugging Face Space (author: akashraut, commit 0792e6b)
# app.py
import asyncio
import json
import os
import sys
from typing import Optional

import gradio as gr
from dotenv import load_dotenv
from langchain_core.tools import Tool
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent  # Keep this (works fine)

load_dotenv()
# --- Compact MCP Client ---
class MCPClient:
    """Manages and communicates with a single tool server subprocess.

    Speaks a line-delimited JSON-RPC 2.0 protocol over the child's
    stdin/stdout: one JSON object per line, with responses matched to
    requests by their "id" field.
    """

    def __init__(self, command: str, args: list):
        # The subprocess is started lazily in get_tools(), not here.
        self.process: Optional[asyncio.subprocess.Process] = None
        # stdout is a single shared stream, so only one request/response
        # cycle may be in flight at a time.
        self._lock = asyncio.Lock()
        self._cmd = [command] + args
        self._req_id = 0

    async def _send_request(self, method: str, params: Optional[dict] = None) -> dict:
        """Send one JSON-RPC request and return its "result" payload.

        Raises:
            ConnectionError: if the server was never started, or exits
                before sending a matching response.
            RuntimeError: if the server replies with a JSON-RPC error.
        """
        if self.process is None:
            # Fail with a clear message instead of an opaque AttributeError
            # on self.process.stdin when get_tools() was never awaited.
            raise ConnectionError("Server process not started; call get_tools() first.")
        async with self._lock:
            self._req_id += 1
            request = {
                "jsonrpc": "2.0",
                "method": method,
                "params": params or {},
                "id": self._req_id,
            }
            self.process.stdin.write(json.dumps(request).encode() + b"\n")
            await self.process.stdin.drain()
            # Drain stdout until the response carrying our id appears;
            # readline() returns b"" at EOF, which ends the loop.
            while line := await self.process.stdout.readline():
                response = json.loads(line)
                if response.get("id") == self._req_id:
                    if "error" in response:
                        raise RuntimeError(f"Server error: {response['error']}")
                    return response["result"]
            raise ConnectionError("Server process closed unexpectedly.")

    async def get_tools(self) -> list[Tool]:
        """Start the server subprocess and discover its tools.

        Returns one async-only LangChain Tool per schema advertised by
        the server's "discover" method.
        """
        self.process = await asyncio.create_subprocess_exec(
            *self._cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE
        )
        tool_schemas = await self._send_request("discover")
        return [
            Tool(
                name=s['name'],
                description=s['description'],
                func=None,  # async-only: calls are routed through the coroutine
                coroutine=self._create_tool_coro(s['name']),
                args_schema=s['args_schema']
            )
            for s in tool_schemas
        ]

    def _create_tool_coro(self, tool_name: str):
        """Build a coroutine forwarding one tool's calls to the server.

        A factory function so each closure binds its own tool_name
        (avoids the late-binding pitfall of closures created in a loop).
        """
        async def _tool_coro(tool_input):
            return await self._send_request(
                "execute", {"tool_name": tool_name, "tool_args": tool_input}
            )
        return _tool_coro
# --- Global Agent Executor ---
_agent_executor = None

async def get_agent_executor():
    """Return the process-wide agent executor, building it on first use.

    Spawns the tool-server subprocess and constructs the ReAct agent
    exactly once; later calls return the cached instance.
    """
    global _agent_executor
    if _agent_executor is not None:
        return _agent_executor
    if not os.getenv("GROQ_API_KEY"):
        raise ValueError("GROQ_API_KEY secret not set.")
    mcp_client = MCPClient(command=sys.executable, args=["server.py"])
    tool_list = await mcp_client.get_tools()
    llm = ChatGroq(model="openai/gpt-oss-20b")
    # create_react_agent retained deliberately (compatible with this environment).
    _agent_executor = create_react_agent(llm, tool_list)
    return _agent_executor
# --- Gradio Chat Logic ---
async def respond_to_chat(message: str, history: list):
agent = await get_agent_executor()
history_langchain_format = []
for human, ai in history:
history_langchain_format.append({"role": "user", "content": human})
history_langchain_format.append({"role": "assistant", "content": ai})
history_langchain_format.append({"role": "user", "content": message})
try:
response = await agent.ainvoke({"messages": history_langchain_format})
return response['messages'][-1].content
except Exception as e:
print(f"ERROR: {e}", file=sys.stderr)
return "Sorry, an error occurred while processing your request."
# --- UI ---
# Thin chat front-end; all reasoning happens inside respond_to_chat.
chat_examples = [
    "What's the price of silver today?",
    "Give me a 5-day forecast for gold."
]
demo = gr.ChatInterface(
    fn=respond_to_chat,
    title="Gold & Silver AI Forecast",
    description="Ask about live prices and future forecasts for gold and silver.",
    examples=chat_examples
)

if __name__ == "__main__":
    demo.launch()