Nanny7's picture
feat: Phase 5 complete - Full stack AI Todo application with Dapr and Kafka
5c244a3
"""
Chatbot Service - Phase IV with Gordon Agent Integration
FastAPI service whose Gordon agent parses todo commands with the Qwen API, falling back to Ollama and then to keyword rules
"""
from fastapi import FastAPI, WebSocket, HTTPException, Header, WebSocketDisconnect
from pydantic import BaseModel
import httpx
import os
import logging
import subprocess
import json
import re
from typing import Optional
from datetime import datetime
# Logging: INFO-level records with timestamp, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# ASGI application object that the route decorators below attach to.
app = FastAPI(version="2.0.0", title="Gordon Todo Chatbot Service")

# Runtime configuration, read once at import time.
BACKEND_API_URL = os.environ.get("BACKEND_API_URL", "http://backend-service:8000")
QWEN_API_KEY = os.environ.get("QWEN_API_KEY")

# A missing (or empty) key is not fatal: it is only reported here.
if not QWEN_API_KEY:
    logger.warning("QWEN_API_KEY not set. Qwen API will not work!")
class ChatRequest(BaseModel):
    """Request body accepted by the REST chat endpoint."""

    # Free-text instruction typed by the user.
    message: str
    # Optional token; when present it is forwarded to the backend as a
    # Bearer Authorization header.
    user_token: Optional[str] = None
class GordonAgent:
    """Hybrid intent parser: Qwen API first, then Ollama, then keyword rules.

    The class is used purely as a namespace; every member is static.
    """

    QWEN_API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    OLLAMA_API_URL = os.getenv("OLLAMA_API_URL", "http://todo-ollama:11434")
    OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:0.5b")

    @staticmethod
    def _build_prompt(message: str) -> str:
        """Return the instruction prompt that is sent to both LLM providers."""
        return f"""You are a todo task manager. Parse user messages and extract the action.
ONLY output valid JSON. No other text or explanations.
Actions:
1. CREATE: User wants to add/create/make a new todo
Keywords: "task", "todo", "add", "create", "remind", "urgent", "important"
Output: {{"action": "CREATE", "todos": [{{"title": "task description", "priority": "HIGH/MEDIUM/LOW"}}]}}
2. DELETE: User wants to remove/delete a todo
Keywords: "delete", "remove", "forget", "erase"
Output: {{"action": "DELETE", "title": "task name"}}
3. LIST: User wants to see/list their todos
Keywords: "show", "list", "get", "my tasks", "all todos"
Output: {{"action": "LIST"}}
4. UPDATE: User wants to mark a todo as complete or change it
Keywords: "mark done", "complete", "finish", "update"
Output: {{"action": "UPDATE", "title": "task name", "status": "completed"}}
Priority detection rules:
- "urgent", "important", "critical" → HIGH
- "soon", "later" → MEDIUM
- Default → LOW
Examples:
Input: "urgent task fix the bug"
Output: {{"action": "CREATE", "todos": [{{"title": "fix the bug", "priority": "HIGH"}}]}}
Input: "add task buy milk"
Output: {{"action": "CREATE", "todos": [{{"title": "buy milk", "priority": "LOW"}}]}}
Input: "delete task buy milk"
Output: {{"action": "DELETE", "title": "buy milk"}}
Input: "show my tasks"
Output: {{"action": "LIST"}}
Input: "mark done call mom"
Output: {{"action": "UPDATE", "title": "call mom", "status": "completed"}}
Remember: ONLY return valid JSON, no markdown, no code blocks, no explanations.
User message: {message}
Output:"""

    @staticmethod
    def _extract_json(output: str) -> Optional[dict]:
        """Return the first JSON object in *output* that has an "action" key.

        Every ``{`` is tried as a possible start of a JSON value, so objects
        with nested braces (CREATE carries a list of todo objects) are found
        even when the model wraps them in markdown fences or prose.
        Returns None when no such object exists; non-dict JSON is rejected
        because callers use the result as a dict.
        """
        decoder = json.JSONDecoder()
        start = output.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(output, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict) and "action" in candidate:
                return candidate
            start = output.find("{", start + 1)
        return None

    @staticmethod
    def _title_after(text: str, keywords) -> str:
        """Return the text following the first keyword present in *text*.

        Keywords are tried in the given order. A leading "task"/"todo" filler
        word is dropped so "delete task buy milk" targets "buy milk".
        Returns "" when no keyword is present or nothing follows it.
        """
        for keyword in keywords:
            _, found, rest = text.partition(keyword)
            if found:
                return re.sub(r"^(?:task|todo)\s+", "", rest.strip()).strip()
        return ""

    @staticmethod
    def _rule_based_parse(message: str) -> dict:
        """Keyword parser used when neither LLM yields a usable intent.

        Checks run in the order LIST, DELETE, UPDATE; anything else is
        treated as CREATE.
        """
        msg_lower = message.lower().strip()

        # LIST
        if any(word in msg_lower for word in ("show", "list", "get my", "all todos", "my tasks")):
            return {"action": "LIST"}

        # DELETE
        if any(word in msg_lower for word in ("delete", "remove")):
            title = GordonAgent._title_after(msg_lower, ("delete ", "remove "))
            return {"action": "DELETE", "title": title or message.split()[-1]}

        # UPDATE / MARK DONE
        if any(word in msg_lower for word in ("mark done", "complete", "finish")):
            title = GordonAgent._title_after(
                msg_lower, ("mark done ", "done ", "complete ", "finish ")
            )
            return {
                "action": "UPDATE",
                "title": title or message.split()[-1],
                "status": "completed",
            }

        # CREATE (default)
        priority = "LOW"
        if any(word in msg_lower for word in ("urgent", "important", "critical")):
            priority = "HIGH"
        elif any(word in msg_lower for word in ("soon", "later")):
            priority = "MEDIUM"

        # Strip command words only where they stand as whole words, so
        # "recreate db" is not mangled, and keep the user's original casing.
        title = message.strip()
        for keyword in ("task", "todo", "add", "create", "make", "urgent", "important"):
            title = re.sub(rf"\b{keyword}\s+", "", title, count=1, flags=re.IGNORECASE)
        title = title.strip() or message
        return {"action": "CREATE", "todos": [{"title": title, "priority": priority}]}

    @staticmethod
    async def _ask_qwen(prompt: str) -> Optional[str]:
        """Return Qwen's raw completion text, or None on a non-200 reply."""
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.post(
                f"{GordonAgent.QWEN_API_URL}/chat/completions",
                headers={
                    "Authorization": f"Bearer {QWEN_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": "qwen-plus",
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0,
                    "max_tokens": 512,
                },
            )
        if response.status_code != 200:
            logger.warning(f"Qwen API returned {response.status_code}, trying Ollama fallback")
            return None
        # Tolerate an empty "choices" list or a null "content".
        choices = response.json().get("choices") or [{}]
        return (choices[0].get("message", {}).get("content") or "").strip()

    @staticmethod
    async def _ask_ollama(prompt: str) -> Optional[str]:
        """Return Ollama's raw completion text, or None on a non-200 reply."""
        async with httpx.AsyncClient(timeout=90.0) as client:
            response = await client.post(
                f"{GordonAgent.OLLAMA_API_URL}/api/generate",
                json={
                    "model": GordonAgent.OLLAMA_MODEL,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"temperature": 0},
                },
            )
        if response.status_code != 200:
            logger.error(f"Ollama error: {response.status_code}")
            return None
        return (response.json().get("response") or "").strip()

    @staticmethod
    async def parse_message(message: str) -> dict:
        """Parse a user message into a structured todo intent.

        Qwen is tried first (only when QWEN_API_KEY is set), then Ollama.
        If neither produces a JSON object with an "action" key, including
        when a provider is unreachable or answers with an HTTP error, the
        keyword parser is used, so a dict with an "action" key is always
        returned.

        Returns: {"action": "CREATE/DELETE/LIST/UPDATE", ...}
        """
        prompt = GordonAgent._build_prompt(message)

        # Try Qwen API first
        if QWEN_API_KEY:
            try:
                logger.info(f"Trying Qwen API with message: {message[:100]}...")
                output = await GordonAgent._ask_qwen(prompt)
                if output is not None:
                    logger.info(f"Qwen raw output: {output[:500]}")
                    parsed = GordonAgent._extract_json(output)
                    if parsed is not None:
                        logger.info(f"Parsed intent from Qwen: {parsed}")
                        return parsed
            except Exception as e:
                logger.warning(f"Qwen API failed: {e}, trying Ollama fallback")

        # Fallback to Ollama
        try:
            logger.info(f"Using Ollama with message: {message[:100]}...")
            output = await GordonAgent._ask_ollama(prompt)
            if output is not None:
                logger.info(f"Ollama raw output: {output[:500]}")
                parsed = GordonAgent._extract_json(output)
                if parsed is not None:
                    logger.info(f"Parsed intent from Ollama: {parsed}")
                    return parsed
        except Exception as e:
            logger.warning(f"Ollama exception: {e}, using rule-based fallback")

        # Final fallback: rule-based parser
        logger.info("Using rule-based parser")
        return GordonAgent._rule_based_parse(message)
async def call_backend(intent: dict, user_token: Optional[str]):
    """Execute a parsed intent against the backend todo API.

    Args:
        intent: Parsed intent with an "action" key (CREATE, LIST, DELETE or
            UPDATE, case-insensitive) plus the fields that action needs.
        user_token: Optional token, sent as a Bearer Authorization header.

    Returns:
        CREATE -> {"created": [...], "count": n}
        LIST   -> whatever JSON the backend returns for the todo collection
        DELETE -> {"deleted": [...], "count": n}
        UPDATE -> {"updated": [...], "count": n}
        On any failure -> {"error": "<description>"}; nothing is raised.
    """
    action = str(intent.get("action") or "").upper()
    if action not in {"CREATE", "LIST", "DELETE", "UPDATE"}:
        return {"error": "Unknown action"}

    title = ""
    if action in {"DELETE", "UPDATE"}:
        title = str(intent.get("title") or "").strip().lower()
        # Matching below is a substring test, and "" is a substring of every
        # title: without this guard a missing title would hit ALL todos.
        if not title:
            return {"error": f"No todo title provided for {action}"}

    headers = {"Content-Type": "application/json"}
    if user_token:
        headers["Authorization"] = f"Bearer {user_token}"
    todos_url = f"{BACKEND_API_URL}/api/todos/"

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            # CREATE TODO
            if action == "CREATE":
                created = []
                for todo_data in intent.get("todos") or []:
                    logger.info(f"Creating todo: {todo_data}")
                    payload = {
                        "title": todo_data.get("title") or "Untitled",
                        "priority": str(todo_data.get("priority") or "MEDIUM").lower(),
                    }
                    response = await client.post(todos_url, json=payload, headers=headers)
                    if response.status_code >= 400:
                        logger.error(
                            f"Backend error: {response.status_code} - {response.text[:200]}"
                        )
                        return {"error": f"Backend error: {response.status_code}"}
                    created.append(response.json())
                return {"created": created, "count": len(created)}

            # LIST, DELETE and UPDATE all start from the current todo list.
            response = await client.get(todos_url, headers=headers)
            if response.status_code >= 400:
                return {"error": f"Backend error: {response.status_code}"}
            todos = response.json()

            # LIST TODOS
            if action == "LIST":
                return todos

            matches = [
                todo for todo in todos
                if title in str(todo.get("title") or "").lower()
            ]

            # DELETE TODO
            if action == "DELETE":
                deleted = []
                for todo in matches:
                    # NOTE(review): DELETE uses a trailing slash while UPDATE
                    # below does not; confirm which form the backend expects.
                    del_response = await client.delete(
                        f"{todos_url}{todo['id']}/", headers=headers
                    )
                    if del_response.status_code < 400:
                        deleted.append(todo)
                return {"deleted": deleted, "count": len(deleted)}

            # UPDATE TODO
            status = intent.get("status", "completed")
            updated = []
            for todo in matches:
                update_response = await client.put(
                    f"{todos_url}{todo['id']}",
                    json={"status": status},
                    headers=headers,
                )
                if update_response.status_code < 400:
                    updated.append(update_response.json())
            return {"updated": updated, "count": len(updated)}
    except Exception as e:
        logger.error(f"Backend {action} request failed: {e}")
        return {"error": str(e)}
@app.get("/")
async def root():
    """Describe the service and list its public endpoints."""
    routes = {
        "health": "/api/health",
        "chat": "/api/chat",
        "websocket": "/ws/chat/{token}",
    }
    info = {
        "service": "Hybrid Todo Chatbot",
        "version": "2.3.0",
        "agent": "Qwen API + Ollama fallback",
        "status": "running",
    }
    # Added last so the key order of the response is unchanged.
    info["endpoints"] = routes
    return info
@app.get("/api/health")
async def health_check():
    """Report service health and the state of both LLM providers.

    Qwen is reported as "ok" when an API key is configured (no request is
    made). Ollama is probed with GET /api/tags: "ok" on HTTP 200, "error" on
    any other status, "not_available" when the request itself fails. The
    top-level "status" is always "healthy".
    """
    qwen_status = "ok" if QWEN_API_KEY else "not_configured"

    # Check Ollama
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{GordonAgent.OLLAMA_API_URL}/api/tags")
        ollama_status = "ok" if response.status_code == 200 else "error"
    except Exception:
        # Exception rather than a bare except, so task cancellation
        # (a BaseException) still propagates.
        ollama_status = "not_available"

    return {
        "status": "healthy",
        "service": "hybrid-chatbot",
        "providers": {
            "qwen": {"status": qwen_status, "api_url": GordonAgent.QWEN_API_URL},
            "ollama": {"status": ollama_status, "api_url": GordonAgent.OLLAMA_API_URL},
        },
        "models": {
            "qwen": "qwen-plus",
            "ollama": GordonAgent.OLLAMA_MODEL,
        },
        # Naive datetime expressed in UTC (no offset suffix in the string).
        "timestamp": datetime.utcnow().isoformat(),
    }
def _build_ws_reply(action: str, intent: dict, result) -> dict:
    """Turn a successful backend result into the JSON payload for the client."""
    if action == "CREATE":
        count = result.get("count", 0)
        return {
            "type": "success",
            "text": f"✅ Created {count} todo{'s' if count != 1 else ''}!",
            "data": result.get("created"),
        }

    if action == "LIST":
        todos = result if isinstance(result, list) else []
        if not todos:
            return {"type": "message", "text": "📭 You have no todos yet!"}
        parts = [f"📋 **Your Todos ({len(todos)}):**\n\n"]
        for todo in todos:
            priority = str(todo.get("priority") or "medium").upper()
            status = todo.get("status", "pending")
            title = todo.get("title", "Untitled")
            emoji = "🔴" if priority == "HIGH" else "🟡" if priority == "MEDIUM" else "🟢"
            check = "✅" if status == "completed" else "⏳"
            parts.append(f"{check} {emoji} {title} [{priority}]\n")
        return {
            "type": "todos",
            "text": "".join(parts),
            "count": len(todos),
            "data": todos,
        }

    if action in ("DELETE", "UPDATE"):
        count = result.get("count", 0)
        if count == 0:
            return {
                "type": "warning",
                "text": f"⚠️ No todo found matching: '{intent.get('title')}'",
            }
        plural = "s" if count != 1 else ""
        if action == "DELETE":
            return {
                "type": "success",
                "text": f"🗑️ Deleted {count} todo{plural}!",
                "data": result.get("deleted"),
            }
        status_emoji = "✅" if intent.get("status") == "completed" else "⏳"
        return {
            "type": "success",
            "text": f"{status_emoji} Updated {count} todo{plural}!",
            "data": result.get("updated"),
        }

    return {
        "type": "message",
        "text": "🤔 I didn't understand that. Try:\n"
                "• 'task [description]'\n"
                "• 'delete [task name]'\n"
                "• 'show'\n"
                "• 'mark done [task name]'",
    }


@app.websocket("/ws/chat/{token}")
async def websocket_endpoint(websocket: WebSocket, token: str):
    """WebSocket endpoint for real-time chat.

    Each incoming text frame must be a JSON object with a "message" field.
    The message is parsed by GordonAgent, executed through call_backend with
    *token* as the user token, and answered with one JSON payload. Bad
    frames and failed actions produce an "error" payload and keep the
    connection open; only an unexpected exception closes the socket.
    """
    await websocket.accept()
    try:
        # Send greeting
        await websocket.send_json({
            "type": "message",
            "text": "👋 Hello! I'm Gordon, your AI Todo Agent.\n\n"
                    "Try:\n"
                    "• 'task buy milk' - create new todo\n"
                    "• 'urgent task fix bug' - create high priority\n"
                    "• 'show my tasks' - list all todos\n"
                    "• 'delete buy milk' - remove todo\n"
                    "• 'mark done fix bug' - complete todo"
        })

        while True:
            data = await websocket.receive_text()

            # A malformed frame gets an error reply, not a dropped connection.
            try:
                msg_data = json.loads(data)
            except json.JSONDecodeError:
                msg_data = None
            if not isinstance(msg_data, dict):
                await websocket.send_json({
                    "type": "error",
                    "text": "⚠️ Invalid message: expected a JSON object with a 'message' field"
                })
                continue

            user_message = str(msg_data.get("message") or "").strip()
            if not user_message:
                await websocket.send_json({
                    "type": "error",
                    "text": "⚠️ Empty message"
                })
                continue

            # Parse with Gordon Agent
            intent = await GordonAgent.parse_message(user_message)
            if "error" in intent:
                await websocket.send_json({
                    "type": "error",
                    "text": f"❌ {intent.get('error')}"
                })
                continue

            action = str(intent.get("action") or "").upper()

            # Execute action via backend
            result = await call_backend(intent, token)
            # LIST returns a list, so only dict results can carry an error.
            if isinstance(result, dict) and "error" in result:
                await websocket.send_json({
                    "type": "error",
                    "text": f"❌ Error: {result.get('error')}"
                })
                continue

            await websocket.send_json(_build_ws_reply(action, intent, result))
    except WebSocketDisconnect:
        # The token is a credential, so it is deliberately not logged.
        logger.info("WebSocket client disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        try:
            await websocket.close()
        except Exception:
            # Best effort: the socket may already be closed.
            pass
@app.post("/api/chat")
async def chat(request: ChatRequest):
    """REST endpoint for chat (non-WebSocket).

    Parses request.message with GordonAgent, executes the intent through
    call_backend using request.user_token, and returns a dict with a
    human-readable "response", the parsed "intent" and the backend "result".
    Parse errors and empty messages return early without a "result" key.

    Raises:
        HTTPException: status 500 on any unexpected exception.
    """
    try:
        logger.info(f"REST chat message: {request.message[:100]}...")

        # Reject blank input, as the WebSocket endpoint does; otherwise the
        # rule-based parser would create a todo with an empty title.
        if not request.message.strip():
            return {
                "response": "⚠️ Empty message",
                "intent": {"error": "Empty message"}
            }

        # Parse with Gordon
        intent = await GordonAgent.parse_message(request.message)
        if "error" in intent:
            return {
                "response": f"❌ {intent.get('error')}",
                "intent": intent
            }

        # Execute via backend
        result = await call_backend(intent, request.user_token)

        # Format response
        action = str(intent.get("action") or "").upper()
        # LIST returns a list, so only dict results can carry an error.
        if isinstance(result, dict) and "error" in result:
            response_text = f"❌ Error: {result.get('error')}"
        elif action == "CREATE":
            count = result.get("count", 0)
            response_text = f"✅ Created {count} todo{'s' if count != 1 else ''}!"
        elif action == "LIST":
            todos = result if isinstance(result, list) else []
            response_text = f"📋 You have {len(todos)} todo{'s' if len(todos) != 1 else ''}"
        elif action == "DELETE":
            count = result.get("count", 0)
            if count > 0:
                response_text = f"🗑️ Deleted {count} todo{'s' if count != 1 else ''}!"
            else:
                response_text = "⚠️ No matching todo found"
        elif action == "UPDATE":
            count = result.get("count", 0)
            if count > 0:
                response_text = f"✅ Updated {count} todo{'s' if count != 1 else ''}!"
            else:
                response_text = "⚠️ No matching todo found"
        else:
            response_text = "✅ Done"

        return {
            "response": response_text,
            "intent": intent,
            "result": result
        }
    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception(f"Chat exception: {e}")
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Run the app with uvicorn when this file is executed as a script:
    # all interfaces, port 8001.
    from uvicorn import run

    run(app, host="0.0.0.0", port=8001)