Spaces:
Sleeping
Sleeping
File size: 4,763 Bytes
068aa4e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
"""
FastAPI REST API Service
Exposes the multi-agent knowledge system
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict
import time
from dotenv import load_dotenv
import os
from .rag_system import RAGSystem
from agent_orchestrator import AgentOrchestrator
load_dotenv()
# Initialize FastAPI
app = FastAPI(
title="Multi-Agent Knowledge System",
description="RAG system with query understanding, retrieval, synthesis, and validation",
version="1.0.0"
)
# Initialize RAG system
api_key = os.getenv("GROQ_API_KEY")
rag_system = RAGSystem(groq_api_key=api_key)
# Request/Response Models
class QueryRequest(BaseModel):
query: str
top_k: int = 5
class SourceDocument(BaseModel):
source: str
relevance: float
class ValidationInfo(BaseModel):
status: str
confidence: int
class QueryResponse(BaseModel):
query: str
reformulated_query: str
answer: str
validation: ValidationInfo
sources: List[SourceDocument]
processing_time: float
class HealthResponse(BaseModel):
status: str
model_loaded: bool
db_connected: bool
timestamp: str
class MetricsResponse(BaseModel):
total_queries: int
avg_latency: float
avg_confidence: float
# Global metrics
metrics = {
"total_queries": 0,
"latencies": [],
"confidences": []
}
# Health check endpoint
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Check system health"""
from datetime import datetime
return HealthResponse(
status="healthy",
model_loaded=True,
db_connected=True,
timestamp=datetime.now().isoformat()
)
# Main query endpoint
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
"""
Process a query through the multi-agent system
Args:
query: User query
top_k: Number of documents to retrieve
Returns:
QueryResponse with answer, sources, and validation
"""
try:
start_time = time.time()
# Store top_k in rag_system temporarily
original_top_k = 5
# Run orchestrator
result = rag_system.answer_question(request.query)
# Extract data
processing_time = time.time() - start_time
# Format sources
sources = []
for doc in result.get("retrieved_documents", []):
sources.append(SourceDocument(
source=doc["source"],
relevance=doc["score"]
))
# Format validation
validation_info = result.get("validation_result", {})
validation = ValidationInfo(
status="β
VALID" if validation_info.get("is_valid") else "β οΈ NEEDS REVIEW",
confidence=validation_info.get("confidence", 0)
)
# Update metrics
metrics["total_queries"] += 1
metrics["latencies"].append(processing_time)
metrics["confidences"].append(validation.confidence)
# Build response
response = QueryResponse(
query=result.get("original_query", ""),
reformulated_query=result.get("reformulated_query", ""),
answer=result.get("final_answer", ""),
validation=validation,
sources=sources,
processing_time=processing_time
)
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Metrics endpoint
@app.get("/metrics", response_model=MetricsResponse)
async def get_metrics():
"""Get system metrics"""
avg_latency = sum(metrics["latencies"]) / len(metrics["latencies"]) if metrics["latencies"] else 0
avg_confidence = sum(metrics["confidences"]) / len(metrics["confidences"]) if metrics["confidences"] else 0
return MetricsResponse(
total_queries=metrics["total_queries"],
avg_latency=avg_latency,
avg_confidence=avg_confidence
)
# Root endpoint
@app.get("/")
async def root():
"""Root endpoint"""
return {
"message": "Multi-Agent Knowledge System API",
"version": "1.0.0",
"endpoints": {
"health": "/health",
"query": "/query (POST)",
"metrics": "/metrics",
"docs": "/docs"
}
}
if __name__ == "__main__":
import uvicorn
print("\n" + "=" * 70)
print("π Starting Multi-Agent Knowledge System API")
print("=" * 70)
print("π API running at: http://localhost:8000")
print("π Documentation at: http://localhost:8000/docs")
print("=" * 70 + "\n")
uvicorn.run(app, host="0.0.0.0", port=8000)
|