# Talker β€” app.py
# Author: BolyosCsaba; commit 2ae4955 ("plural for Manifests")
"""
Talker - OFP Chat Agent with AI Integration
Architecture (mirrors OFPBadWord pattern):
1. Create a FastAPI app.
2. Register /manifest and /ofp directly on it β€” no routing hacks needed.
3. Define the Gradio UI.
4. Mount Gradio inside FastAPI with gr.mount_gradio_app(app, demo, path="/").
5. Serve with uvicorn.
No ZeroGPU: Qwen3-0.6B with thinking disabled and max_tokens=300 is fast
enough on CPU (~10-20 s). ZeroGPU's GPU-acquisition latency often exceeds the
inference time for a tiny model.
"""
import gradio as gr
import os
import logging
import yaml
import json
import uuid
import uvicorn
from datetime import datetime, timezone
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from src.chat_agent import ChatAgent
from src.llm_client import LLMClient
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# One-time root-logger configuration at import time; every module logger
# created below inherits this level and format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)  # module-scoped logger, stdlib convention
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
CONFIG_FILE = 'config/config.yaml'

# Fallback configuration so the app still boots with sane defaults when the
# YAML file is missing, empty, or unparseable (e.g. on a fresh deployment).
_DEFAULT_CONFIG = {
    'agent': {
        'speaker_uri': 'tag:talker.service,2025:agent-01',
        'service_url': 'https://bladeszasza-talker.hf.space/ofp',
        'convener_uri': 'tag:convener.service,2025:default',
        'convener_url': 'https://convener-service.com/ofp'
    },
    'llm': {
        'model': 'Qwen/Qwen3-0.6B',
        'max_tokens': 300,
        'temperature': 0.7,
        'system_prompt': (
            'You are Talker, a friendly and witty conversational AI in an Open Floor Protocol '
            'multi-agent conversation. Keep replies short and natural β€” 1 to 3 sentences. '
            'Be warm, direct, and engaging. Do not explain your reasoning; just respond like '
            'a person in a real conversation.'
        )
    },
    'conversation': {
        'auto_respond': True,
        'max_history': 20
    },
    'ui': {
        'title': 'πŸ’¬ Talker - OFP Chat Agent',
        'theme': 'soft',
        'show_debug_panel': True,
        'show_settings': True
    }
}

try:
    # Explicit encoding: the config contains non-ASCII (emoji) and the
    # platform default encoding is not guaranteed to be UTF-8.
    with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
        # An empty YAML file parses to None; fall back to defaults then too.
        config = yaml.safe_load(f) or _DEFAULT_CONFIG
    logger.info("Configuration loaded successfully")
except FileNotFoundError:
    logger.warning("Config file not found, using defaults")
    config = _DEFAULT_CONFIG
except yaml.YAMLError as e:
    # FIX: a malformed YAML file previously crashed the app at import time;
    # fall back to defaults and log the parse error instead.
    logger.warning("Config file could not be parsed (%s), using defaults", e)
    config = _DEFAULT_CONFIG
# ---------------------------------------------------------------------------
# LLM + Agent
# ---------------------------------------------------------------------------
# Shared LLM client; the system prompt from config defines the persona.
llm_client = LLMClient(
    model=config['llm'].get('model', 'Qwen/Qwen3-0.6B'),
    system_prompt=config['llm'].get('system_prompt')
)
# Single module-level agent instance shared by both the OFP endpoints and
# the Gradio UI β€” both code paths read and mutate agent.conversation_history.
agent = ChatAgent(
    speaker_uri=config['agent']['speaker_uri'],
    service_url=config['agent']['service_url'],
    llm_client=llm_client,
    convener_uri=config['agent'].get('convener_uri'),
    convener_url=config['agent'].get('convener_url')
)
# ---------------------------------------------------------------------------
# FastAPI app β€” OFP endpoints registered here, Gradio mounted later
# ---------------------------------------------------------------------------
# Root ASGI app; the Gradio UI is mounted onto it at "/" in the entry point,
# leaving /manifest and /ofp as plain API routes.
app = FastAPI()
def _build_manifest() -> dict:
    """Assemble the OFP manifest this agent advertises at /manifest.

    Identity fields are pulled from the loaded configuration; the capability
    metadata is static for this agent.
    """
    agent_cfg = config['agent']
    identification = {
        "speakerUri": agent_cfg['speaker_uri'],
        "serviceUrl": agent_cfg['service_url'],
        "conversationalName": "Talker AI Assistant",
        "role": "Conversational Agent",
        "synopsis": "AI-powered conversational assistant using Qwen3 for Open Floor Protocol"
    }
    capability = {
        "keyphrases": ["chat", "conversation", "AI assistant", "questions", "help", "talk"],
        "supportedLayers": ["text"],
        "descriptions": ["Engages in natural conversations and answers questions using Qwen3"]
    }
    return {
        "identification": identification,
        "capabilities": [capability]
    }
@app.get("/manifest")
async def get_manifest():
    """Serve this agent's OFP manifest as JSON (also used as a health check)."""
    return JSONResponse(content=_build_manifest())
def _ofp_sender() -> dict:
    """Sender block identifying this agent in outgoing OFP envelopes."""
    return {
        "speakerUri": config['agent']['speaker_uri'],
        "serviceUrl": config['agent']['service_url']
    }


def _ofp_envelope(conversation: dict, events: list) -> JSONResponse:
    """Wrap *events* in a complete openFloor envelope from this agent."""
    return JSONResponse(content={
        "openFloor": {
            "schema": {"version": "1.0.0"},
            "conversation": conversation,
            "sender": _ofp_sender(),
            "events": events
        }
    })


def _utterance_event(to: dict, text: str) -> dict:
    """Build an OFP 'utterance' event carrying *text* as a single token."""
    return {
        "eventType": "utterance",
        "to": to,
        "parameters": {
            "dialogEvent": {
                "id": f"de:{uuid.uuid4()}",
                "speakerUri": config['agent']['speaker_uri'],
                "span": {
                    # RFC 3339 timestamp with an explicit 'Z' UTC suffix.
                    "startTime": datetime.now(timezone.utc)
                    .isoformat().replace('+00:00', 'Z')
                },
                "features": {
                    "text": {
                        "mimeType": "text/plain",
                        "tokens": [{"value": text, "confidence": 1.0}]
                    }
                }
            }
        }
    }


@app.post("/ofp")
async def receive_ofp_envelope(envelope: dict):
    """Handle an inbound Open Floor Protocol envelope.

    Supported event types:
    - ``getManifests``: replies immediately with a ``publishManifests`` event.
    - ``utterance``: runs the text through the LLM and, when auto_respond is
      enabled, replies with an ``utterance`` event of its own.

    Other events are acknowledged with an empty event list. A missing
    ``openFloor`` key yields a 400 response; unexpected failures yield a 500.
    """
    try:
        logger.info(f"Received OFP envelope: {json.dumps(envelope, indent=2)}")
        if "openFloor" not in envelope:
            return JSONResponse(
                content={"status": "error", "message": "Invalid OFP envelope β€” missing 'openFloor' key"},
                status_code=400
            )
        ofp_data = envelope["openFloor"]
        events = ofp_data.get("events", [])
        conversation = ofp_data.get("conversation", {})
        for event in events:
            event_type = event.get("eventType")
            logger.info(f"Processing event: {event_type}")
            # ── getManifests ──────────────────────────────────────────────
            if event_type == "getManifests":
                return _ofp_envelope(conversation, [{
                    "eventType": "publishManifests",
                    "to": event.get("to", {}),
                    "parameters": {"servicingManifests": [_build_manifest()]}
                }])
            # ── utterance ─────────────────────────────────────────────────
            elif event_type == "utterance":
                dialog_event = event.get("parameters", {}).get("dialogEvent", {})
                tokens = dialog_event.get("features", {}).get("text", {}).get("tokens", [])
                text = ' '.join(t.get('value', '') for t in tokens).strip()
                speaker_uri = dialog_event.get("speakerUri", "unknown")
                logger.info(f"Utterance from {speaker_uri}: {text}")
                # Skip our own echoes and empty utterances.
                if speaker_uri == config['agent']['speaker_uri']:
                    logger.info("Ignoring own message")
                    continue
                if not text:
                    continue
                messages = [{"role": "system", "content": config['llm'].get('system_prompt', '')}]
                messages.extend(agent.conversation_history)
                messages.append({"role": "user", "content": text})
                response_text = llm_client.generate_response_from_messages(
                    messages=messages,
                    max_tokens=config['llm'].get('max_tokens', 300),
                    temperature=config['llm'].get('temperature', 0.7)
                )
                # LLMClient signals failure in-band with this prefix; skip the
                # event rather than polluting history with an error message.
                if response_text.startswith("Sorry, I encountered an error:"):
                    logger.error(f"LLM error: {response_text}")
                    continue
                agent.conversation_history.append({"role": "user", "content": text})
                agent.conversation_history.append({"role": "assistant", "content": response_text})
                # FIX: trim history on the OFP path too β€” previously only the
                # Gradio chat path trimmed, so OFP traffic grew without bound.
                max_hist = config['conversation'].get('max_history', 20) * 2
                if len(agent.conversation_history) > max_hist:
                    agent.conversation_history = agent.conversation_history[-max_hist:]
                agent.messages_processed += 1
                agent.responses_sent += 1
                if config['conversation'].get('auto_respond', True):
                    return _ofp_envelope(conversation, [
                        _utterance_event(event.get("to", {}), response_text)
                    ])
        # Acknowledge unhandled events
        return _ofp_envelope(conversation, [])
    except Exception as e:
        logger.error(f"Error processing OFP envelope: {e}", exc_info=True)
        return JSONResponse(
            content={"status": "error", "message": str(e)},
            status_code=500
        )
# ---------------------------------------------------------------------------
# Gradio helpers
# ---------------------------------------------------------------------------
def chat_response(message: str, history: list) -> str:
    """Gradio chat callback: answer *message* given the UI's chat history.

    NOTE(review): unpacking assumes the (user, assistant) tuple history
    format β€” confirm against the installed Gradio version's ChatInterface.
    The exchange is also mirrored into the agent's shared conversation
    history, trimmed to the configured maximum number of turns.
    """
    llm_cfg = config['llm']
    prompt = [{"role": "system", "content": llm_cfg.get('system_prompt', '')}]
    for past_user, past_bot in history:
        prompt.append({"role": "user", "content": past_user})
        prompt.append({"role": "assistant", "content": past_bot})
    prompt.append({"role": "user", "content": message})
    reply = llm_client.generate_response_from_messages(
        messages=prompt,
        max_tokens=llm_cfg.get('max_tokens', 300),
        temperature=llm_cfg.get('temperature', 0.7)
    )
    agent.conversation_history.extend([
        {"role": "user", "content": message},
        {"role": "assistant", "content": reply},
    ])
    # Two history entries per turn, hence the factor of two.
    limit = config['conversation'].get('max_history', 20) * 2
    if len(agent.conversation_history) > limit:
        agent.conversation_history = agent.conversation_history[-limit:]
    return reply
def get_status_info() -> str:
    """Render the agent's current status dict as a Markdown bullet list."""
    status = agent.get_status()
    lines = [
        "**Agent Status:**",
        f"- Speaker URI: `{status['speaker_uri']}`",
        f"- Messages Processed: {status['messages_processed']}",
        f"- Responses Sent: {status['responses_sent']}",
        f"- History Length: {status['history_length']}",
        f"- Conversation ID: `{status.get('conversation_id', 'N/A')}`",
    ]
    return "\n".join(lines)
def get_recent_logs() -> str:
    """Join the agent's recent log lines, or a placeholder when there are none."""
    recent = agent.get_status().get('recent_logs', [])
    if not recent:
        return "No recent activity"
    return "\n".join(recent)
def clear_conversation() -> str:
    """Drop every stored dialogue turn and return a confirmation for the UI."""
    agent.clear_history()
    return "Conversation history cleared!"
def get_ofp_info() -> str:
    """Return a Markdown cheat-sheet for this agent's OFP endpoints.

    Includes ready-to-paste curl commands for the manifest health check and
    a sample utterance envelope.
    """
    # Derive the service root by stripping the /ofp suffix from the
    # configured endpoint URL.
    base = config['agent']['service_url'].replace('/ofp', '')
    parts = [
        "### OFP Endpoints\n\n",
        f"**Manifest URL:** `{base}/manifest`\n\n",
        f"**OFP Endpoint:** `{base}/ofp`\n\n",
        "**Test with curl:**\n",
        "```bash\n",
        "# Health check\n",
        f"curl {base}/manifest\n\n",
        "# Send utterance\n",
        f"curl -X POST {base}/ofp \\\n",
        " -H 'Content-Type: application/json' \\\n",
        """ -d '{"openFloor":{"schema":{"version":"1.0.0"},"conversation":{"id":"conv:test"},"sender":{"speakerUri":"tag:test,2025:user"},"events":[{"eventType":"utterance","parameters":{"dialogEvent":{"id":"de:test","speakerUri":"tag:test,2025:user","span":{"startTime":"2025-01-01T00:00:00Z"},"features":{"text":{"mimeType":"text/plain","tokens":[{"value":"Hello!"}]}}}}}]}}'\n""",
        "```",
    ]
    return "".join(parts)
# ---------------------------------------------------------------------------
# CSS
# ---------------------------------------------------------------------------
custom_css = """
.gradio-container {
font-family: 'Inter', system-ui, -apple-system, sans-serif !important;
max-width: 1400px !important; margin: 0 auto !important;
}
.hero-header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 3rem 2rem; border-radius: 16px; margin-bottom: 2rem;
box-shadow: 0 10px 40px rgba(102,126,234,0.3); text-align: center;
}
.hero-header h1 { color: white !important; font-size: 2.75rem !important; font-weight: 700 !important; }
.hero-subtitle { color: rgba(255,255,255,0.95) !important; font-size: 1.15rem !important; }
.hero-stats { color: rgba(255,255,255,0.9) !important; font-weight: 600 !important; margin-top: 1rem !important; }
.status-card {
background: linear-gradient(135deg, #f7fafc 0%, #edf2f7 100%);
border-radius: 12px; padding: 1.5rem; text-align: center;
border: 2px solid #e2e8f0; transition: all 0.3s ease;
}
.status-card:hover { transform: translateY(-2px); box-shadow: 0 4px 12px rgba(0,0,0,0.1); border-color: #667eea; }
button { border-radius: 8px !important; font-weight: 600 !important; }
"""
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
with gr.Blocks(
    title=config['ui'].get('title', 'Talker'),
    # NOTE(review): the theme is hard-coded here; the config['ui']['theme']
    # value ('soft') is never read β€” confirm whether that is intentional.
    theme=gr.themes.Soft(primary_hue="purple", secondary_hue="blue", neutral_hue="slate"),
    css=custom_css
) as demo:
    # Hero banner summarising model, protocol version, and inference target.
    gr.HTML(f"""
    <div class="hero-header">
        <h1>πŸ’¬ Talker AI</h1>
        <p class="hero-subtitle">Intelligent conversational agent powered by Qwen3 for Open Floor Protocol</p>
        <p class="hero-stats">πŸ€– {config['llm']['model']} | 🌐 OFP v1.0.0 | πŸ–₯️ CPU inference</p>
    </div>
    """)
    # Main chat tab β€” chat_response performs the LLM round-trip.
    with gr.Tab("πŸ’¬ Chat"):
        gr.ChatInterface(
            fn=chat_response,
            examples=[
                "Hello! How are you?",
                "What is the Open Floor Protocol?",
                "Explain quantum computing in simple terms",
                "Tell me a joke!",
            ],
            cache_examples=False
        )
    # Optional debug tab: static status cards, refreshable metrics/logs,
    # and a history-reset button.
    if config['ui'].get('show_debug_panel', True):
        with gr.Tab("πŸ” Status & Debug"):
            gr.Markdown("### πŸ“Š Real-time Agent Status")
            with gr.Row(equal_height=True):
                gr.HTML('<div class="status-card"><div style="font-size:2rem">βœ…</div><div style="font-size:0.875rem;color:#718096;margin:0.5rem 0">Status</div><div style="font-size:1.5rem;font-weight:700;color:#2d3748">Active</div></div>')
                gr.HTML(f'<div class="status-card"><div style="font-size:2rem">πŸ€–</div><div style="font-size:0.875rem;color:#718096;margin:0.5rem 0">Model</div><div style="font-size:1.25rem;font-weight:700;color:#2d3748">{config["llm"]["model"]}</div></div>')
                gr.HTML('<div class="status-card"><div style="font-size:2rem">🌐</div><div style="font-size:0.875rem;color:#718096;margin:0.5rem 0">Protocol</div><div style="font-size:1.25rem;font-weight:700;color:#2d3748">OFP v1.0.0</div></div>')
            gr.Markdown("---")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("#### πŸ“ˆ Agent Metrics")
                    status_display = gr.Markdown(get_status_info())
                    gr.Button("πŸ”„ Refresh Status", variant="secondary").click(fn=get_status_info, outputs=status_display)
                with gr.Column():
                    gr.Markdown("#### πŸ“ Activity Logs")
                    logs_display = gr.Textbox(value=get_recent_logs(), lines=10, max_lines=20, show_label=False)
                    gr.Button("πŸ”„ Refresh Logs", variant="secondary").click(fn=get_recent_logs, outputs=logs_display)
            gr.Markdown("---")
            clear_result = gr.Textbox(interactive=False, show_label=False)
            gr.Button("πŸ—‘οΈ Clear Conversation History", variant="stop").click(fn=clear_conversation, outputs=clear_result)
    # Optional configuration tab with endpoint URLs and curl examples.
    if config['ui'].get('show_settings', True):
        with gr.Tab("βš™οΈ Configuration"):
            gr.Markdown(get_ofp_info())
    gr.Markdown(f"---\n**OFP v1.0.0** | Built with [Gradio](https://gradio.app) | Powered by {config['llm']['model']}")
# ---------------------------------------------------------------------------
# Entry point β€” mount Gradio inside FastAPI, serve with uvicorn
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Startup banner summarising the configuration and exposed routes.
    print("\n" + "=" * 60)
    print("πŸ’¬ Talker - OFP Chat Agent")
    print("=" * 60)
    print(f"Model: {config['llm']['model']}")
    print(f"Speaker URI: {config['agent']['speaker_uri']}")
    print("\nπŸ“‘ OFP endpoints:")
    print(" GET /manifest")
    print(" POST /ofp")
    print("=" * 60 + "\n")
    # Mount the Gradio UI at "/" so the chat interface is the root page while
    # /manifest and /ofp stay plain FastAPI routes on the same server.
    app_with_gradio = gr.mount_gradio_app(app, demo, path="/")
    # Generalization: honour the PORT environment variable when set;
    # default to 7860, the Hugging Face Spaces convention.
    port = int(os.environ.get("PORT", "7860"))
    uvicorn.run(app_with_gradio, host="0.0.0.0", port=port, log_level="info")