| | """ |
| | Chatbot Service - Phase IV with Gordon Agent Integration |
| | FastAPI service using cagent (Gordon) for NLP instead of Qwen |
| | """ |
| | from fastapi import FastAPI, WebSocket, HTTPException, Header, WebSocketDisconnect |
| | from pydantic import BaseModel |
| | import httpx |
| | import os |
| | import logging |
| | import subprocess |
| | import json |
| | import re |
| | from typing import Optional |
| | from datetime import datetime |
| |
|
| | |
| | logging.basicConfig( |
| | level=logging.INFO, |
| | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| | ) |
| | logger = logging.getLogger(__name__) |
| |
|
| | app = FastAPI(title="Gordon Todo Chatbot Service", version="2.0.0") |
| |
|
| | |
| | BACKEND_API_URL = os.getenv("BACKEND_API_URL", "http://backend-service:8000") |
| | QWEN_API_KEY = os.getenv("QWEN_API_KEY") |
| |
|
| | if not QWEN_API_KEY: |
| | logger.warning("QWEN_API_KEY not set. Qwen API will not work!") |
| |
|
| |
|
| | class ChatRequest(BaseModel): |
| | message: str |
| | user_token: Optional[str] = None |
| |
|
| |
|
| | class GordonAgent: |
| | """Hybrid Agent: Qwen API with Ollama fallback""" |
| |
|
| | QWEN_API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1" |
| | OLLAMA_API_URL = os.getenv("OLLAMA_API_URL", "http://todo-ollama:11434") |
| | OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:0.5b") |
| |
|
| | @staticmethod |
| | async def parse_message(message: str) -> dict: |
| | """ |
| | Parse user message into structured todo action |
| | Tries Qwen API first, falls back to Ollama if unavailable |
| | Returns: {"action": "CREATE/DELETE/LIST/UPDATE", "data": {...}} |
| | """ |
| | prompt = f"""You are a todo task manager. Parse user messages and extract the action. |
| | |
| | ONLY output valid JSON. No other text or explanations. |
| | |
| | Actions: |
| | 1. CREATE: User wants to add/create/make a new todo |
| | Keywords: "task", "todo", "add", "create", "remind", "urgent", "important" |
| | Output: {{"action": "CREATE", "todos": [{{"title": "task description", "priority": "HIGH/MEDIUM/LOW"}}]}} |
| | |
| | 2. DELETE: User wants to remove/delete a todo |
| | Keywords: "delete", "remove", "forget", "erase" |
| | Output: {{"action": "DELETE", "title": "task name"}} |
| | |
| | 3. LIST: User wants to see/list their todos |
| | Keywords: "show", "list", "get", "my tasks", "all todos" |
| | Output: {{"action": "LIST"}} |
| | |
| | 4. UPDATE: User wants to mark a todo as complete or change it |
| | Keywords: "mark done", "complete", "finish", "update" |
| | Output: {{"action": "UPDATE", "title": "task name", "status": "completed"}} |
| | |
| | Priority detection rules: |
| | - "urgent", "important", "critical" β HIGH |
| | - "soon", "later" β MEDIUM |
| | - Default β LOW |
| | |
| | Examples: |
| | |
| | Input: "urgent task fix the bug" |
| | Output: {{"action": "CREATE", "todos": [{{"title": "fix the bug", "priority": "HIGH"}}]}} |
| | |
| | Input: "add task buy milk" |
| | Output: {{"action": "CREATE", "todos": [{{"title": "buy milk", "priority": "LOW"}}]}} |
| | |
| | Input: "delete task buy milk" |
| | Output: {{"action": "DELETE", "title": "buy milk"}} |
| | |
| | Input: "show my tasks" |
| | Output: {{"action": "LIST"}} |
| | |
| | Input: "mark done call mom" |
| | Output: {{"action": "UPDATE", "title": "call mom", "status": "completed"}} |
| | |
| | Remember: ONLY return valid JSON, no markdown, no code blocks, no explanations. |
| | |
| | User message: {message} |
| | |
| | Output:""" |
| |
|
| | def extract_json(output: str) -> dict: |
| | """Extract JSON from LLM output""" |
| | |
| | json_match = re.search(r'\{[^{}]*"action"[^{}]*\}', output, re.DOTALL) |
| | if json_match: |
| | try: |
| | return json.loads(json_match.group()) |
| | except: |
| | pass |
| | |
| | try: |
| | return json.loads(output) |
| | except json.JSONDecodeError: |
| | |
| | if 'ζδ½ζε' in output or '"code"' in output: |
| | |
| | |
| | return None |
| | return None |
| |
|
| | def rule_based_parse(message: str) -> dict: |
| | """Simple rule-based parser as final fallback""" |
| | msg_lower = message.lower().strip() |
| |
|
| | |
| | if any(word in msg_lower for word in ['show', 'list', 'get my', 'all todos', 'my tasks']): |
| | return {"action": "LIST"} |
| |
|
| | |
| | if any(word in msg_lower for word in ['delete', 'remove']): |
| | |
| | for keyword in ['delete ', 'remove ']: |
| | if keyword in msg_lower: |
| | title = msg_lower.split(keyword)[1].strip() |
| | return {"action": "DELETE", "title": title} |
| | return {"action": "DELETE", "title": message.split()[-1]} |
| |
|
| | |
| | if any(word in msg_lower for word in ['mark done', 'complete', 'finish']): |
| | |
| | for keyword in ['mark done ', 'done ', 'complete ', 'finish ']: |
| | if keyword in msg_lower: |
| | title = msg_lower.split(keyword)[1].strip() |
| | return {"action": "UPDATE", "title": title, "status": "completed"} |
| | return {"action": "UPDATE", "title": message.split()[-1], "status": "completed"} |
| |
|
| | |
| | |
| | priority = "LOW" |
| | if any(word in msg_lower for word in ['urgent', 'important', 'critical']): |
| | priority = "HIGH" |
| | elif any(word in msg_lower for word in ['soon', 'later']): |
| | priority = "MEDIUM" |
| |
|
| | |
| | title = msg_lower |
| | for keyword in ['task ', 'todo ', 'add ', 'create ', 'make ', 'urgent ', 'important ']: |
| | if keyword in title: |
| | title = title.replace(keyword, '', 1) |
| | title = title.strip() or message |
| |
|
| | return {"action": "CREATE", "todos": [{"title": title, "priority": priority}]} |
| |
|
| | |
| | if QWEN_API_KEY: |
| | try: |
| | logger.info(f"Trying Qwen API with message: {message[:100]}...") |
| | async with httpx.AsyncClient(timeout=15.0) as client: |
| | response = await client.post( |
| | f"{GordonAgent.QWEN_API_URL}/chat/completions", |
| | headers={ |
| | "Authorization": f"Bearer {QWEN_API_KEY}", |
| | "Content-Type": "application/json" |
| | }, |
| | json={ |
| | "model": "qwen-plus", |
| | "messages": [{"role": "user", "content": prompt}], |
| | "temperature": 0, |
| | "max_tokens": 512 |
| | } |
| | ) |
| |
|
| | if response.status_code == 200: |
| | result = response.json() |
| | output = result.get("choices", [{}])[0].get("message", {}).get("content", "").strip() |
| | logger.info(f"Qwen raw output: {output[:500]}") |
| | parsed = extract_json(output) |
| | if parsed: |
| | logger.info(f"Parsed intent from Qwen: {parsed}") |
| | return parsed |
| | else: |
| | logger.warning(f"Qwen API returned {response.status_code}, trying Ollama fallback") |
| | except Exception as e: |
| | logger.warning(f"Qwen API failed: {e}, trying Ollama fallback") |
| |
|
| | |
| | try: |
| | logger.info(f"Using Ollama with message: {message[:100]}...") |
| | async with httpx.AsyncClient(timeout=90.0) as client: |
| | response = await client.post( |
| | f"{GordonAgent.OLLAMA_API_URL}/api/generate", |
| | json={ |
| | "model": GordonAgent.OLLAMA_MODEL, |
| | "prompt": prompt, |
| | "stream": False, |
| | "options": {"temperature": 0} |
| | } |
| | ) |
| |
|
| | if response.status_code == 200: |
| | output = response.json().get("response", "").strip() |
| | logger.info(f"Ollama raw output: {output[:500]}") |
| | parsed = extract_json(output) |
| | if parsed: |
| | logger.info(f"Parsed intent from Ollama: {parsed}") |
| | return parsed |
| | else: |
| | logger.error(f"Ollama error: {response.status_code}") |
| | return {"error": f"Ollama error: {response.status_code}"} |
| | except Exception as e: |
| | logger.warning(f"Ollama exception: {e}, using rule-based fallback") |
| |
|
| | |
| | logger.info("Using rule-based parser") |
| | return rule_based_parse(message) |
| |
|
| |
|
| | async def call_backend(intent: dict, user_token: str): |
| | """Call backend API based on parsed intent""" |
| | headers = {"Content-Type": "application/json"} |
| | if user_token: |
| | headers["Authorization"] = f"Bearer {user_token}" |
| |
|
| | async with httpx.AsyncClient(timeout=60.0) as client: |
| | action = intent.get("action", "").upper() |
| | |
| | |
| | if action == "CREATE": |
| | todos = intent.get("todos", []) |
| | created = [] |
| | |
| | for todo_data in todos: |
| | logger.info(f"Creating todo: {todo_data}") |
| | payload = { |
| | "title": todo_data.get("title", "Untitled"), |
| | "priority": todo_data.get("priority", "MEDIUM").lower() |
| | } |
| | |
| | try: |
| | response = await client.post( |
| | f"{BACKEND_API_URL}/api/todos/", |
| | json=payload, |
| | headers=headers |
| | ) |
| | |
| | if response.status_code >= 400: |
| | logger.error(f"Backend error: {response.status_code} - {response.text[:200]}") |
| | return {"error": f"Backend error: {response.status_code}"} |
| | |
| | created.append(response.json()) |
| | except Exception as e: |
| | logger.error(f"Create todo exception: {e}") |
| | return {"error": str(e)} |
| | |
| | return {"created": created, "count": len(created)} |
| | |
| | |
| | elif action == "LIST": |
| | try: |
| | response = await client.get( |
| | f"{BACKEND_API_URL}/api/todos/", |
| | headers=headers |
| | ) |
| | |
| | if response.status_code >= 400: |
| | return {"error": f"Backend error: {response.status_code}"} |
| | |
| | return response.json() |
| | except Exception as e: |
| | return {"error": str(e)} |
| | |
| | |
| | elif action == "DELETE": |
| | title = intent.get("title", "").lower() |
| | |
| | |
| | try: |
| | response = await client.get( |
| | f"{BACKEND_API_URL}/api/todos/", |
| | headers=headers |
| | ) |
| | |
| | if response.status_code >= 400: |
| | return {"error": f"Backend error: {response.status_code}"} |
| | |
| | todos = response.json() |
| | |
| | |
| | deleted = [] |
| | for todo in todos: |
| | if title in todo.get("title", "").lower(): |
| | del_response = await client.delete( |
| | f"{BACKEND_API_URL}/api/todos/{todo['id']}/", |
| | headers=headers |
| | ) |
| | |
| | if del_response.status_code < 400: |
| | deleted.append(todo) |
| | |
| | return {"deleted": deleted, "count": len(deleted)} |
| | |
| | except Exception as e: |
| | return {"error": str(e)} |
| | |
| | |
| | elif action == "UPDATE": |
| | title = intent.get("title", "").lower() |
| | status = intent.get("status", "completed") |
| | |
| | |
| | try: |
| | response = await client.get( |
| | f"{BACKEND_API_URL}/api/todos/", |
| | headers=headers |
| | ) |
| | |
| | if response.status_code >= 400: |
| | return {"error": f"Backend error: {response.status_code}"} |
| | |
| | todos = response.json() |
| | |
| | |
| | updated = [] |
| | for todo in todos: |
| | if title in todo.get("title", "").lower(): |
| | update_response = await client.put( |
| | f"{BACKEND_API_URL}/api/todos/{todo['id']}", |
| | json={"status": status}, |
| | headers=headers |
| | ) |
| | |
| | if update_response.status_code < 400: |
| | updated.append(update_response.json()) |
| | |
| | return {"updated": updated, "count": len(updated)} |
| | |
| | except Exception as e: |
| | return {"error": str(e)} |
| | |
| | else: |
| | return {"error": "Unknown action"} |
| |
|
| |
|
| | @app.get("/") |
| | async def root(): |
| | """Root endpoint""" |
| | return { |
| | "service": "Hybrid Todo Chatbot", |
| | "version": "2.3.0", |
| | "agent": "Qwen API + Ollama fallback", |
| | "status": "running", |
| | "endpoints": { |
| | "health": "/api/health", |
| | "chat": "/api/chat", |
| | "websocket": "/ws/chat/{token}" |
| | } |
| | } |
| |
|
| |
|
| | @app.get("/api/health") |
| | async def health_check(): |
| | """Health check endpoint""" |
| |
|
| | qwen_status = "ok" if QWEN_API_KEY else "not_configured" |
| |
|
| | |
| | ollama_status = "unknown" |
| | try: |
| | async with httpx.AsyncClient(timeout=5.0) as client: |
| | response = await client.get(f"{GordonAgent.OLLAMA_API_URL}/api/tags") |
| | ollama_status = "ok" if response.status_code == 200 else "error" |
| | except: |
| | ollama_status = "not_available" |
| |
|
| | return { |
| | "status": "healthy", |
| | "service": "hybrid-chatbot", |
| | "providers": { |
| | "qwen": {"status": qwen_status, "api_url": GordonAgent.QWEN_API_URL}, |
| | "ollama": {"status": ollama_status, "api_url": GordonAgent.OLLAMA_API_URL} |
| | }, |
| | "models": { |
| | "qwen": "qwen-plus", |
| | "ollama": GordonAgent.OLLAMA_MODEL |
| | }, |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| |
|
| |
|
| | @app.websocket("/ws/chat/{token}") |
| | async def websocket_endpoint(websocket: WebSocket, token: str): |
| | """WebSocket endpoint for real-time chat""" |
| | await websocket.accept() |
| | |
| | try: |
| | |
| | await websocket.send_json({ |
| | "type": "message", |
| | "text": "π Hello! I'm Gordon, your AI Todo Agent.\n\n" |
| | "Try:\n" |
| | "β’ 'task buy milk' - create new todo\n" |
| | "β’ 'urgent task fix bug' - create high priority\n" |
| | "β’ 'show my tasks' - list all todos\n" |
| | "β’ 'delete buy milk' - remove todo\n" |
| | "β’ 'mark done fix bug' - complete todo" |
| | }) |
| | |
| | while True: |
| | |
| | data = await websocket.receive_text() |
| | msg_data = json.loads(data) |
| | user_message = msg_data.get("message", "").strip() |
| | |
| | if not user_message: |
| | await websocket.send_json({ |
| | "type": "error", |
| | "text": "β οΈ Empty message" |
| | }) |
| | continue |
| | |
| | |
| | intent = await GordonAgent.parse_message(user_message) |
| | |
| | if "error" in intent: |
| | await websocket.send_json({ |
| | "type": "error", |
| | "text": f"β {intent.get('error')}" |
| | }) |
| | continue |
| | |
| | action = intent.get("action", "").upper() |
| | |
| | |
| | result = await call_backend(intent, token) |
| | |
| | if "error" in result: |
| | await websocket.send_json({ |
| | "type": "error", |
| | "text": f"β Error: {result.get('error')}" |
| | }) |
| | continue |
| | |
| | |
| | if action == "CREATE": |
| | count = result.get("count", 0) |
| | await websocket.send_json({ |
| | "type": "success", |
| | "text": f"β
Created {count} todo{'s' if count > 1 else ''}!", |
| | "data": result.get("created") |
| | }) |
| | |
| | elif action == "LIST": |
| | todos = result if isinstance(result, list) else [] |
| | |
| | if not todos: |
| | await websocket.send_json({ |
| | "type": "message", |
| | "text": "π You have no todos yet!" |
| | }) |
| | else: |
| | |
| | text = f"π **Your Todos ({len(todos)}):**\n\n" |
| | |
| | for todo in todos: |
| | priority = todo.get("priority", "medium").upper() |
| | status = todo.get("status", "pending") |
| | title = todo.get("title", "Untitled") |
| | |
| | emoji = "π΄" if priority == "HIGH" else "π‘" if priority == "MEDIUM" else "π’" |
| | check = "β
" if status == "completed" else "β³" |
| | |
| | text += f"{check} {emoji} {title} [{priority}]\n" |
| | |
| | await websocket.send_json({ |
| | "type": "todos", |
| | "text": text, |
| | "count": len(todos), |
| | "data": todos |
| | }) |
| | |
| | elif action == "DELETE": |
| | count = result.get("count", 0) |
| | |
| | if count == 0: |
| | await websocket.send_json({ |
| | "type": "warning", |
| | "text": f"β οΈ No todo found matching: '{intent.get('title')}'" |
| | }) |
| | else: |
| | await websocket.send_json({ |
| | "type": "success", |
| | "text": f"ποΈ Deleted {count} todo{'s' if count > 1 else ''}!", |
| | "data": result.get("deleted") |
| | }) |
| | |
| | elif action == "UPDATE": |
| | count = result.get("count", 0) |
| | |
| | if count == 0: |
| | await websocket.send_json({ |
| | "type": "warning", |
| | "text": f"β οΈ No todo found matching: '{intent.get('title')}'" |
| | }) |
| | else: |
| | status_emoji = "β
" if intent.get("status") == "completed" else "β³" |
| | await websocket.send_json({ |
| | "type": "success", |
| | "text": f"{status_emoji} Updated {count} todo{'s' if count > 1 else ''}!", |
| | "data": result.get("updated") |
| | }) |
| | |
| | else: |
| | await websocket.send_json({ |
| | "type": "message", |
| | "text": "π€ I didn't understand that. Try:\n" |
| | "β’ 'task [description]'\n" |
| | "β’ 'delete [task name]'\n" |
| | "β’ 'show'\n" |
| | "β’ 'mark done [task name]'" |
| | }) |
| | |
| | except WebSocketDisconnect: |
| | logger.info(f"WebSocket disconnected for token: {token}") |
| | except Exception as e: |
| | logger.error(f"WebSocket error: {e}") |
| | try: |
| | await websocket.close() |
| | except: |
| | pass |
| |
|
| |
|
| | @app.post("/api/chat") |
| | async def chat(request: ChatRequest): |
| | """REST endpoint for chat (non-WebSocket)""" |
| | try: |
| | logger.info(f"REST chat message: {request.message[:100]}...") |
| | |
| | |
| | intent = await GordonAgent.parse_message(request.message) |
| | |
| | if "error" in intent: |
| | return { |
| | "response": f"β {intent.get('error')}", |
| | "intent": intent |
| | } |
| | |
| | |
| | result = await call_backend(intent, request.user_token) |
| | |
| | |
| | action = intent.get("action", "").upper() |
| | |
| | if "error" in result: |
| | response_text = f"β Error: {result.get('error')}" |
| | elif action == "CREATE": |
| | count = result.get("count", 0) |
| | response_text = f"β
Created {count} todo{'s' if count > 1 else ''}!" |
| | elif action == "LIST": |
| | todos = result if isinstance(result, list) else [] |
| | response_text = f"π You have {len(todos)} todo{'s' if len(todos) != 1 else ''}" |
| | elif action == "DELETE": |
| | count = result.get("count", 0) |
| | response_text = f"ποΈ Deleted {count} todo{'s' if count > 1 else ''}!" if count > 0 else "β οΈ No matching todo found" |
| | elif action == "UPDATE": |
| | count = result.get("count", 0) |
| | response_text = f"β
Updated {count} todo{'s' if count > 1 else ''}!" if count > 0 else "β οΈ No matching todo found" |
| | else: |
| | response_text = "β
Done" |
| | |
| | return { |
| | "response": response_text, |
| | "intent": intent, |
| | "result": result |
| | } |
| | |
| | except Exception as e: |
| | logger.error(f"Chat exception: {e}") |
| | raise HTTPException(status_code=500, detail=str(e)) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | import uvicorn |
| | uvicorn.run(app, host="0.0.0.0", port=8001) |
| |
|