# sekpona kokou
# Rename app title to Data Analyst agent with MCP
# 138544a
import os
import re
import base64
from pathlib import Path
from dotenv import load_dotenv
import gradio as gr
from smolagents import ToolCallingAgent, AzureOpenAIModel
from smolagents.mcp_client import MCPClient
from tools import generate_chart, LAST_CHART, AGENT_STEPS
from prompt import AGENT_INSTRUCTIONS
# Config
BASE_DIR = Path(__file__).parent
# The .env file is looked up one directory above the one holding this module.
load_dotenv(BASE_DIR.parent.joinpath(".env"))

# Connection settings: each value is read from the environment, and the
# literal second argument (where given) is the fallback when it is unset.
MCP_SERVER_URL = os.environ.get(
    "MCP_SERVER_URL",
    "https://sitsope-mcp-server-test.hf.space/gradio_api/mcp/sse",
)
AZURE_ENDPOINT = os.environ.get(
    "AZURE_OPENAI_ENDPOINT",
    "https://collier-llm.openai.azure.com/",
)
AZURE_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY")  # None when unset
AZURE_API_VER = os.environ.get("AZURE_OPENAI_API_VERSION", "2024-12-01-preview")
AZURE_MODEL = os.environ.get("AZURE_OPENAI_MODEL", "gpt-4o")

# Labels of the suggestion buttons; clicking one copies its label into the
# question textbox.
SUGGESTIONS = [
    "How many unique questions?",
    "Sub-options per question",
    "Questions above average sub-options",
    "Distribution by question type",
    "Which question has the most sub-options?",
    "Percentage of rows per question",
]

# Keep CSS minimal to avoid HF Spaces iframe layout bugs
CSS = """
.gradio-container {
max-width: none !important;
width: 100% !important;
margin: 0 auto !important;
padding-left: 20px !important;
padding-right: 20px !important;
}
"""
# Agent setup
# Everything below runs at import time: the MCP client is created and its
# tool list fetched before the UI is built.
mcp_client = MCPClient(
    {"url": MCP_SERVER_URL, "transport": "sse"},
    structured_output=False,
)
tools = mcp_client.get_tools()

model = AzureOpenAIModel(
    api_version=AZURE_API_VER,
    api_key=AZURE_API_KEY,
    azure_endpoint=AZURE_ENDPOINT,
    model_id=AZURE_MODEL,
)

# The agent gets every tool returned by the MCP client plus the chart tool
# imported from the `tools` module.
agent = ToolCallingAgent(
    model=model,
    instructions=AGENT_INSTRUCTIONS,
    tools=list(tools) + [generate_chart],
)
# Agent runner
def _clip(value, limit: int) -> str:
    """Return ``str(value)`` cut to ``limit`` characters, marking any cut with "…"."""
    text = str(value)
    return text[:limit] + "…" if len(text) > limit else text


def _tool_calls_of(step):
    """Yield ``(tool_name, tool_input)`` pairs found on one agent memory step."""
    recorded_calls = getattr(step, "tool_calls", None)
    if recorded_calls:
        # smolagents action steps keep their calls in a ``tool_calls`` list
        # whose items carry ``name`` and ``arguments``.
        for call in recorded_calls:
            yield getattr(call, "name", None), getattr(call, "arguments", None) or ""
        return
    # Fallback for step objects that expose a single call through flat
    # attributes instead of a ``tool_calls`` list.
    yield (
        getattr(step, "tool_name", None) or getattr(step, "action", None),
        getattr(step, "tool_arguments", None) or getattr(step, "tool_input", ""),
    )


def run_agent(question: str):
    """Run the agent on ``question`` and return ``(answer_text, chart_path)``.

    Before the run, ``LAST_CHART["path"]`` is reset to ``None`` and
    ``AGENT_STEPS`` is emptied. After the run, ``AGENT_STEPS`` receives one
    ``(tool_name, input, observation)`` tuple per recorded tool call other
    than ``final_answer``; inputs are clipped to 300 characters and
    observations to 500.

    ``chart_path`` is ``LAST_CHART["path"]`` if it is no longer ``None`` after
    the run (presumably set by the chart tool -- confirm in ``tools``).
    Otherwise it is the first ``chart_<...>.png`` token found in the answer
    text, or ``None`` when there is no such token.
    """
    LAST_CHART["path"] = None
    AGENT_STEPS.clear()
    response = agent.run(question)

    memory = getattr(agent, "memory", None)
    for step in getattr(memory, "steps", None) or ():
        observation = getattr(step, "observations", None) or getattr(step, "observation", "")
        for tool_name, tool_input in _tool_calls_of(step):
            # Steps without a tool call yield a falsy name and are skipped.
            if tool_name and tool_name != "final_answer":
                AGENT_STEPS.append(
                    (str(tool_name), _clip(tool_input, 300), _clip(observation, 500))
                )

    chart_path = LAST_CHART["path"]
    if chart_path is None:
        # No path was recorded during the run: look for a chart file name
        # mentioned in the answer text instead.
        found = re.search(r"(chart_[^\s]+\.png)", str(response))
        if found:
            chart_path = found.group(1)
    return str(response), chart_path
# Event handlers
def img_to_base64(path: str) -> str:
    """Read the file at ``path`` in binary mode and return it base64-encoded as text."""
    with open(path, mode="rb") as image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()
def ask_agent(question: str, history: list):
    """Handle one submitted question and return the three UI outputs.

    Returns ``(history, welcome_update, textbox_value)``:

    * Blank or whitespace-only question: the history is returned as is (an
      empty list when it was falsy), the welcome text is visible only if the
      history is empty, and the textbox is cleared.
    * Otherwise: the agent is run, a user turn and an assistant turn are
      appended to a new history list, the welcome text is hidden, and the
      textbox is cleared. When the agent reports a chart path that exists on
      disk, the image is inlined as a base64 data URI ahead of the answer.
    """
    turns = history or []

    if question.strip() == "":
        return turns, gr.update(visible=not turns), ""

    answer, chart_file = run_agent(question)

    has_chart = bool(chart_file) and os.path.exists(chart_file)
    if has_chart:
        encoded = img_to_base64(chart_file)
        reply = f"![chart](data:image/png;base64,{encoded})\n\n{answer}"
    else:
        reply = answer

    new_turns = [
        {"role": "user", "content": question},
        {"role": "assistant", "content": reply},
    ]
    return turns + new_turns, gr.update(visible=False), ""
def new_chat():
    """Return the outputs for a fresh chat: empty history, visible welcome text, empty textbox."""
    empty_history = []
    show_welcome = gr.update(visible=True)
    return empty_history, show_welcome, ""
# Gradio app
with gr.Blocks(title="Data Analyst agent with MCP", fill_height=True) as demo:
    with gr.Row(equal_height=True):
        # First column (scale 1): title, "new chat" button, suggestion buttons.
        with gr.Column(scale=1, min_width=260):
            gr.Markdown("## Data Analyst agent with MCP")
            new_btn = gr.Button("+ New chat", variant="secondary")
            gr.Markdown("### Suggestions")
            suggestion_buttons = [gr.Button(label, size="sm") for label in SUGGESTIONS]

        # Second column (scale 4): welcome text, conversation, input row.
        with gr.Column(scale=4):
            welcome = gr.Markdown(
                "## How can I help you today?\nAsk anything about your survey data and I will generate stats and charts.",
                visible=True,
            )
            chatbot = gr.Chatbot(
                show_label=False,
                avatar_images=(None, "https://huggingface.co/front/assets/huggingface_logo-noborder.svg"),
                height=620,
            )
            with gr.Row():
                question_box = gr.Textbox(
                    placeholder="Ask anything about the Stack Overflow survey…",
                    show_label=False,
                    lines=1,
                    max_lines=6,
                    scale=8,
                )
                send_btn = gr.Button("Send", variant="primary", scale=1, min_width=90)
            gr.Markdown("Generated content may be inaccurate or false.")

    # The "Send" button and submitting the textbox run the same handler with
    # the same inputs and outputs.
    for register in (send_btn.click, question_box.submit):
        register(
            ask_agent,
            inputs=[question_box, chatbot],
            outputs=[chatbot, welcome, question_box],
        )

    new_btn.click(new_chat, outputs=[chatbot, welcome, question_box])

    # Each suggestion button writes its own label into the textbox; the label
    # is bound as a default argument so every lambda keeps its own value.
    for suggestion_btn, suggestion in zip(suggestion_buttons, SUGGESTIONS):
        suggestion_btn.click(lambda value=suggestion: value, outputs=question_box)
if __name__ == "__main__":
    # The port is taken from the PORT environment variable, falling back to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    demo.launch(
        theme=gr.themes.Soft(),
        css=CSS,
        server_name="0.0.0.0",
        server_port=listen_port,
        share=False,
    )