# =============================================================
# File: backend/api/routes/agent.py
# =============================================================
import asyncio
import json
import os
import random
import re
import sys
from pathlib import Path
from typing import AsyncGenerator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# Add backend to path for imports
backend_dir = Path(__file__).parent.parent.parent
sys.path.insert(0, str(backend_dir))

from api.services.agent_orchestrator import AgentOrchestrator
from api.models.agent import AgentRequest, AgentResponse

router = APIRouter()

orchestrator = AgentOrchestrator(
    rag_mcp_url=os.getenv("RAG_MCP_URL", "http://localhost:8001"),
    web_mcp_url=os.getenv("WEB_MCP_URL", "http://localhost:8002"),
    admin_mcp_url=os.getenv("ADMIN_MCP_URL", "http://localhost:8003"),
    llm_backend=os.getenv("LLM_BACKEND", "ollama"),
)
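
# Illustrative environment configuration (the variable names come from the
# os.getenv calls above; the values shown are the defaults):
#
#   export RAG_MCP_URL=http://localhost:8001
#   export WEB_MCP_URL=http://localhost:8002
#   export ADMIN_MCP_URL=http://localhost:8003
#   export LLM_BACKEND=ollama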

class ChatRequest(BaseModel):
    tenant_id: str
    user_id: str | None = None
    message: str
    conversation_history: list[dict] = []
    temperature: float = 0.0
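
# Example request body for the endpoints below (only tenant_id and message
# are required; the history item shape is an assumption - ChatRequest only
# constrains it to a list of dicts):
#
#   {
#     "tenant_id": "acme",
#     "user_id": "u-123",
#     "message": "What is our refund policy?",
#     "conversation_history": [{"role": "user", "content": "Hi"}],
#     "temperature": 0.0
#   }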

# NOTE: the route paths on the decorators below are assumptions; the source
# did not preserve decorator lines, but each handler is clearly meant to be
# registered on `router`.
@router.post("/agent/chat")
async def agent_chat(req: ChatRequest):
    """Run the agent once and return the complete response."""
    agent_req = AgentRequest(
        tenant_id=req.tenant_id,
        user_id=req.user_id,
        message=req.message,
        conversation_history=req.conversation_history,
        temperature=req.temperature,
    )
    return await orchestrator.handle(agent_req)

@router.post("/agent/chat/stream")  # assumed path; see note above
async def agent_chat_stream(req: ChatRequest):
    """Stream the agent response incrementally using Server-Sent Events."""
    agent_req = AgentRequest(
        tenant_id=req.tenant_id,
        user_id=req.user_id,
        message=req.message,
        conversation_history=req.conversation_history,
        temperature=req.temperature,
    )
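
    # Every frame below uses the SSE wire format: "data: <json>\n\n".
    # Status frames carry {'status', 'message'}, token frames carry
    # {'token', 'done'}, and error frames carry {'error', 'done'}.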
    async def generate_stream() -> AsyncGenerator[str, None]:
        """Generate the streaming response."""
        try:
            # FIRST: check admin rules - if any rule matches, respond according to the rule
            yield f"data: {json.dumps({'status': 'processing', 'message': 'Checking rules...'})}\n\n"
            matches = await orchestrator.redflag.check(agent_req.tenant_id, agent_req.message)
            if matches:
                # Categorize rules: brief-response rules vs. blocking rules
                brief_response_rules = []
                blocking_rules = []
                for match in matches:
                    rule_text = (match.description or match.pattern or "").lower()
                    # The wildcard patterns are regexes, so they must go through
                    # re.search; a plain `in` substring check would never match them.
                    is_brief_rule = match.severity == "low" and (
                        "greeting" in rule_text
                        or "brief" in rule_text
                        or "simple response" in rule_text
                        or bool(re.search(r"keep.*response.*brief", rule_text))
                        or bool(re.search(r"do not.*verbose", rule_text))
                        or bool(re.search(r"respond.*briefly", rule_text))
                    )
                    if is_brief_rule:
                        brief_response_rules.append(match)
                    else:
                        blocking_rules.append(match)
                # Brief-response rules (greetings, etc.): reply immediately
                if brief_response_rules and not blocking_rules:
                    brief_responses = [
                        "Hello! How can I help you today?",
                        "Hi there! What can I assist you with?",
                        "Hello! I'm here to help. What do you need?",
                        "Hi! How can I assist you?",
                    ]
                    brief_response = random.choice(brief_responses)
                    # Stream the brief response word by word
                    yield f"data: {json.dumps({'status': 'streaming', 'message': ''})}\n\n"
                    for word in brief_response.split():
                        yield f"data: {json.dumps({'token': word + ' ', 'done': False})}\n\n"
                    yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
                    return
                # Blocking rules (security, compliance, etc.): generate a
                # streaming refusal via the LLM
                if blocking_rules:
                    violations_details = []
                    for i, m in enumerate(blocking_rules, 1):
                        rule_name = m.description or m.pattern or "Policy violation"
                        detail = f"{i}. **{rule_name}** (Severity: {m.severity})"
                        if m.matched_text:
                            detail += f"\n   - Detected phrase: \"{m.matched_text}\""
                        violations_details.append(detail)
                    llm_prompt = f"""A user made the following request: "{agent_req.message}"

However, this request violates company policies. The following policy violations were detected:

{chr(10).join(violations_details)}

Your task: Write a clear, professional, and empathetic response to inform the user that:
1. Their request cannot be processed due to policy violations
2. Which specific policy was violated (mention it naturally)
3. The incident has been logged for security review
4. They should contact an administrator if they need assistance or believe this is an error

Write a natural, conversational response (2-4 sentences) that feels helpful rather than robotic. Be professional but understanding.

Response:"""
                    async for token in orchestrator.llm.stream_call(llm_prompt, agent_req.temperature):
                        yield f"data: {json.dumps({'token': token, 'done': False})}\n\n"
                    yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
                    return

            # STEP 2: no rule matched - delegate to orchestrator.handle() for
            # proper tool routing (news queries use web search, admin queries
            # use RAG, etc.)
            yield f"data: {json.dumps({'status': 'processing', 'message': 'Processing your request...'})}\n\n"
            response = await orchestrator.handle(agent_req)

            # Stream the response character by character for a smoother experience
            yield f"data: {json.dumps({'status': 'streaming', 'message': ''})}\n\n"
            for char in response.text:
                yield f"data: {json.dumps({'token': char, 'done': False})}\n\n"
                # Small delay for readability (adjust as needed)
                await asyncio.sleep(0.01)
            yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e), 'done': True})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # stop nginx from buffering the stream
        },
    )
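
# A minimal sketch of consuming the stream, assuming the /agent/chat/stream
# path registered above (host and port are illustrative):
#
#   curl -N -X POST http://localhost:8000/agent/chat/stream \
#     -H "Content-Type: application/json" \
#     -d '{"tenant_id": "acme", "message": "hello"}'
#
# Each line arrives as `data: {...}`; clients should stop reading once a
# frame with "done": true appears.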

@router.post("/agent/debug")  # assumed path; see note above
async def agent_debug(req: ChatRequest):
    """
    Return detailed debugging information about agent reasoning.

    Includes intent classification, tool selection, reasoning trace, and tool traces.
    """
    agent_req = AgentRequest(
        tenant_id=req.tenant_id,
        user_id=req.user_id,
        message=req.message,
        conversation_history=req.conversation_history,
        temperature=req.temperature,
    )
    response = await orchestrator.handle(agent_req)
    return {
        "request": {
            "tenant_id": req.tenant_id,
            "user_id": req.user_id,
            "message": req.message[:200],
            "temperature": req.temperature,
        },
        "response": {
            "text": response.text[:500] + "..." if len(response.text) > 500 else response.text,
            "decision": response.decision.dict() if response.decision else None,
            "tool_traces": response.tool_traces,
            "reasoning_trace": response.reasoning_trace,
        },
        "debug_info": {
            "intent": response.reasoning_trace[1].get("intent") if len(response.reasoning_trace) > 1 else None,
            "tool_selection": next((t for t in response.reasoning_trace if t.get("step") == "tool_selection"), None),
            "tool_scores": next((t for t in response.reasoning_trace if t.get("step") == "tool_scoring"), None),
            "redflag_check": next((t for t in response.reasoning_trace if t.get("step") == "redflag_check"), None),
            "total_steps": len(response.reasoning_trace),
        },
    }

@router.post("/agent/plan")  # assumed path; see note above
async def agent_plan(req: ChatRequest):
    """
    Return only the agent's planning output (tool selection decision).

    Useful for understanding which tools the agent would use without executing them.
    """
    from ..services.intent_classifier import IntentClassifier
    from ..services.tool_selector import ToolSelector
    from ..services.tool_scoring import ToolScoringService

    # Reuse the orchestrator's LLM client; build minimal components for planning only
    llm = orchestrator.llm
    intent_classifier = IntentClassifier(llm_client=llm)
    tool_selector = ToolSelector(llm_client=llm)
    tool_scorer = ToolScoringService()

    # Classify intent
    intent = await intent_classifier.classify(req.message)

    # Pre-fetch RAG results for context (optional; planning works without them)
    rag_results = []
    try:
        rag_prefetch = await orchestrator.mcp.call_rag(req.tenant_id, req.message)
        if isinstance(rag_prefetch, dict):
            rag_results = rag_prefetch.get("results") or rag_prefetch.get("hits") or []
    except Exception:
        pass

    # Score tools
    tool_scores = tool_scorer.score(req.message, intent, rag_results)

    # Select tools
    ctx = {
        "tenant_id": req.tenant_id,
        "rag_results": rag_results,
        "tool_scores": tool_scores,
    }
    decision = await tool_selector.select(intent, req.message, ctx)

    return {
        "tenant_id": req.tenant_id,
        "message": req.message,
        "intent": intent,
        "tool_scores": tool_scores,
        "plan": decision.dict(),
        "steps": decision.tool_input.get("steps", []) if decision.tool_input else [],
        "reason": decision.reason,
    }
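
# A quick way to inspect planning without execution, assuming the /agent/plan
# path above: POST the same ChatRequest body and read "intent", "tool_scores",
# and "plan" from the JSON response. "steps" is only populated when the
# selected tool's input carries a multi-step plan.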