"""Gradio application for the Music Store Multi-Agent System."""
import uuid
import time
import logging
import gradio as gr
from langchain_core.messages import HumanMessage, AIMessage
from src.config import settings
from src.db.database import verify_database
from src.agents.graph import build_graph
from src.ui.styles import CUSTOM_CSS
# Logger named after this module.
logger = logging.getLogger(__name__)

# Module-level state: rebound by initialize(); all three are None until then.
_graph = _checkpointer = _store = None
def initialize():
    """Check the database and build the agent graph into module state.

    Calls ``verify_database()`` and logs the outcome; an unhealthy report is
    logged as an error but does not stop start-up. Then calls ``build_graph``
    with the model settings and stores the returned graph, checkpointer and
    store in the module-level ``_graph``, ``_checkpointer`` and ``_store``.

    Raises:
        Exception: whatever ``build_graph`` raises, re-raised after logging.
    """
    global _graph, _checkpointer, _store

    logger.info("Initializing Music Store Agent...")

    db_health = verify_database()
    if db_health.get("status") != "healthy":
        logger.error("Database unhealthy: %s", db_health)
    else:
        # .get() so a healthy report without a "tables" entry cannot turn
        # this log line into a KeyError during start-up.
        logger.info("Database healthy: %s", db_health.get("tables"))

    try:
        _graph, _checkpointer, _store = build_graph(
            model_name=settings.model_name,
            temperature=settings.temperature,
            # Falsy settings (e.g. an empty string) are passed on as None.
            openai_api_key=settings.openai_api_key or None,
            openai_api_base=settings.openai_api_base or None,
        )
    except Exception as e:
        logger.error("Failed to build agent graph: %s", e)
        raise
    else:
        logger.info("Agent graph built successfully.")
def _status_html(status: str, message: str, tools_used: list = None) -> str:
colors = {
"success": "#10b981",
"error": "#ef4444",
"warning": "#f59e0b",
"waiting": "#6366f1",
"idle": "#6b7280",
}
icons = {
"success": "✓",
"error": "✗",
"warning": "⚠",
"waiting": "⏳",
"idle": "●",
}
color = colors.get(status, "#6b7280")
icon = icons.get(status, "●")
tools_text = ""
if tools_used:
tools_text = f" | Data sources: {', '.join(set(tools_used))}"
return (
f'
'
f'{icon}'
f'{message}{tools_text}'
f'
'
)
def reset_conversation() -> tuple:
    """Start a fresh conversation under a newly generated thread id.

    Returns:
        A 3-tuple of (empty chat history, new thread id, idle status HTML).
    """
    fresh_thread_id = str(uuid.uuid4())
    logger.info(f"Conversation reset. New thread_id={fresh_thread_id}")
    idle_banner = _status_html("idle", "New conversation started")
    return [], fresh_thread_id, idle_banner
def show_user_message(message, history, tid):
    """Echo the user's message into the chat and clear the input box.

    Args:
        message: Raw text from the input box.
        history: Current chat history as a list of role/content dicts.
        tid: Current thread id; a new one is generated when it is empty.

    Returns:
        A 4-tuple of (history, new input-box value, thread id, status HTML).
        For a blank message the history and thread id come back unchanged
        with a "Ready" status; otherwise the message is appended to a new
        history list and the status reads "Processing...".
    """
    if message.strip():
        active_tid = tid or str(uuid.uuid4())
        user_turn = {"role": "user", "content": message}
        # Concatenate instead of appending so the incoming list is untouched.
        return (
            history + [user_turn],
            "",
            active_tid,
            _status_html("waiting", "Processing..."),
        )

    return history, "", tid, _status_html("idle", "Ready")
def _last_ai_content(messages):
    """Return the content of the last AIMessage with non-empty content, or None."""
    for msg in reversed(list(messages)):
        if isinstance(msg, AIMessage) and msg.content:
            return msg.content
    return None


def _run_graph(user_message, config):
    """Stream one user turn through the graph; return (reply or None, tools used)."""
    reply = None
    tools_used = []
    input_state = {"messages": [HumanMessage(content=user_message)]}
    for event in _graph.stream(input_state, config=config, stream_mode="updates"):
        for node_name, node_output in event.items():
            logger.info("Graph event: node=%s", node_name)
            if node_name == "music_tool_node":
                tools_used.append("music_catalog")
            elif node_name == "invoice_information_subagent":
                tools_used.append("invoice_lookup")
            if isinstance(node_output, dict) and "messages" in node_output:
                # Later updates win: keep the newest non-empty AI reply.
                reply = _last_ai_content(node_output["messages"]) or reply
    return reply, tools_used


def generate_response(history, tid):
    """Run the agent on the pending user turn and append its reply.

    Args:
        history: Chat history as a list of role/content dicts; appended to
            in place when a reply (or an error notice) is produced.
        tid: Thread id placed in the graph config under ``thread_id``.

    Returns:
        A 3-tuple of (history, thread id, status HTML).

    Only the most recent history entry is considered, and only when it is a
    non-empty user turn. The UI chains this handler after
    ``show_user_message`` even when that handler rejected a blank message;
    searching further back would re-send an already answered message to the
    graph and append a duplicate reply.
    """
    if not history:
        return history, tid, _status_html("idle", "Ready")

    last_turn = history[-1]
    if last_turn.get("role") != "user" or not last_turn.get("content"):
        # Nothing pending: the newest entry is not an unanswered user turn.
        return history, tid, _status_html("idle", "Ready")
    user_message = last_turn["content"]

    if not _graph:
        history.append({"role": "assistant", "content": "System not initialized. Please refresh the page."})
        return history, tid, _status_html("error", "System not initialized")

    config = {"configurable": {"thread_id": tid}}
    try:
        # perf_counter is monotonic, so the elapsed time cannot be skewed by
        # wall-clock adjustments.
        started = time.perf_counter()
        final_response, tools_used = _run_graph(user_message, config)
        elapsed = time.perf_counter() - started

        if final_response:
            history.append({"role": "assistant", "content": final_response})
            status_out = _status_html(
                "success",
                f"Responded in {elapsed:.1f}s",
                tools_used=tools_used,
            )
            return history, tid, status_out

        # No reply was streamed. If the saved state still lists nodes to run,
        # fall back to the newest AI message stored in that state.
        # NOTE(review): presumably this is the graph paused for user input --
        # confirm against the graph definition.
        snapshot = _graph.get_state(config)
        if snapshot and getattr(snapshot, "next", None):
            pending = _last_ai_content(snapshot.values.get("messages", []))
            already_shown = any(
                turn.get("role") == "assistant" and turn.get("content") == pending
                for turn in history
            )
            if pending and not already_shown:
                history.append({"role": "assistant", "content": pending})
            return history, tid, _status_html("waiting", "Waiting for your input")

        history.append({
            "role": "assistant",
            "content": "I'm sorry, I wasn't able to generate a response. Please try rephrasing your question.",
        })
        return history, tid, _status_html("warning", "No response generated")
    except Exception as e:
        # UI boundary: log with traceback and show a generic reply instead of
        # letting the handler raise.
        logger.exception("Error processing message: %s", e)
        history.append({"role": "assistant", "content": "I encountered an error. Please try again."})
        return history, tid, _status_html("error", str(e)[:100])
def create_app() -> gr.Blocks:
    """Initialise the backend and assemble the Gradio chat interface.

    Calls ``initialize()`` first, then builds the layout (chat window,
    status bar, input row, reset button) and wires the event handlers.

    Returns:
        The constructed ``gr.Blocks`` application.
    """
    initialize()

    soft_theme = gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="slate",
        neutral_hue="slate",
        font=gr.themes.GoogleFont("Inter"),
    )

    with gr.Blocks(title=settings.app_title, css=CUSTOM_CSS, theme=soft_theme) as app:
        # Per-session thread id; empty until the first message is sent.
        thread_id = gr.State(value="")

        # NOTE(review): the header markup is just a newline here -- confirm
        # whether a title banner was intended.
        gr.HTML("\n")

        welcome_text = (
            "**Welcome!** Type your message below to get started.\n\n"
            "Try: *\"My customer ID is 5\"* or *\"What rock albums do you have?\"*"
        )
        chatbot = gr.Chatbot(
            value=[],
            type="messages",
            height=480,
            show_label=False,
            avatar_images=(None, "https://api.dicebear.com/7.x/bottts/svg?seed=music"),
            elem_classes=["chatbot-container"],
            placeholder=welcome_text,
        )

        status = gr.HTML(
            value=_status_html("idle", "Ready — type a message to begin"),
            elem_classes=["status-bar"],
        )

        with gr.Row():
            msg_input = gr.Textbox(
                placeholder="Type your message here...",
                show_label=False,
                scale=6,
                container=False,
                autofocus=True,
            )
            send_btn = gr.Button("Send", variant="primary", scale=1, min_width=80)

        with gr.Row():
            reset_btn = gr.Button("New Conversation", size="sm", variant="secondary")
            # NOTE(review): assumed to sit inside this row; the source
            # indentation was lost -- confirm the intended placement.
            gr.HTML("")

        # The Send button and pressing Enter share one two-step chain: echo
        # the user message, then generate the reply.
        for trigger in (send_btn.click, msg_input.submit):
            trigger(
                fn=show_user_message,
                inputs=[msg_input, chatbot, thread_id],
                outputs=[chatbot, msg_input, thread_id, status],
            ).then(
                fn=generate_response,
                inputs=[chatbot, thread_id],
                outputs=[chatbot, thread_id, status],
            )

        reset_btn.click(
            fn=reset_conversation,
            inputs=[],
            outputs=[chatbot, thread_id, status],
        )

    return app