from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from csv_engine import CSVEngine
from llm_client import LLMClient
import os
import json
import uuid
from datetime import datetime
from contextlib import asynccontextmanager
# In-memory conversation store keyed by conversation_id (UUID string).
# NOTE(review): not persisted and not shared across worker processes —
# restarting the server or scaling out loses/forks the history.
conversations = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: load and validate the CSV data before serving.

    Failures are reported as warnings rather than aborting startup, so the
    API still comes up (with empty data) when no CSVs are present.
    """
    # Startup phase.
    loaded = engine.load_all_csvs()
    if not loaded:
        print("Warning: No CSV files found.")
    if not engine.validate_schema():
        print("Warning: CSV files are invalid or empty.")
    yield
    # Shutdown phase: nothing to clean up.
# FastAPI application; `lifespan` loads the CSV data on startup.
app = FastAPI(title="Analytical Chatbot API", lifespan=lifespan)
# Enable CORS for the frontend
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers for credentialed requests — confirm the frontend does
# not rely on cookies, or pin the origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict this to your frontend URL
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ChatRequest(BaseModel):
    """Request body for the legacy non-streaming /chat endpoint."""
    # Natural-language question about the loaded CSV data.
    question: str
class OldChatResponse(BaseModel):
    """Response body for the legacy non-streaming /chat endpoint."""
    # Full answer text produced by the LLM.
    answer: str
# Directory holding the CSV files to analyze (override with the DATA_DIR env var).
DATA_DIR = os.getenv("DATA_DIR", "data")
engine = CSVEngine(DATA_DIR)  # CSV loader/query engine; populated by `lifespan` at startup.
llm = LLMClient()  # Client used to turn facts + schema into a natural-language answer.
# --- Compatibility Endpoints for DevoChat UI ---
@app.get("/auth/status")
async def auth_status():
    """Stub auth check: always report a logged-in admin user."""
    stub_user = {"id": "user_123", "username": "admin", "role": "admin"}
    return {"logged_in": True, "user": stub_user}
@app.get("/auth/user")
async def auth_user():
    """Return the stub admin user record expected by the DevoChat UI."""
    return {
        "id": "user_123",
        "username": "admin",
        "role": "admin",
    }
@app.get("/chat_models")
async def chat_models():
    """Advertise the single 'analytical' chat model to the UI."""
    analytical_model = {
        "model_name": "analytical",
        "model_alias": "Analytical Engine",
        "capabilities": {
            "stream": True,
            "inference": False,
            "search": False,
            "deep_research": False,
            "image": False,
            "mcp": False,
        },
        "controls": {
            "temperature": True,
            "reason": True,
            "verbosity": True,
            "system_message": True,
        },
        "billing": {"in_billing": 0, "out_billing": 0},
        "description": "Financial Analysis Engine",
        "endpoint": "/chat/conversation",
    }
    return {"default": "analytical", "models": [analytical_model]}
@app.get("/image_models")
async def image_models():
    """No image models are available in this deployment."""
    empty_catalog = {"default": None, "models": []}
    return empty_catalog
@app.get("/realtime_models")
async def realtime_models():
    """No realtime models are available in this deployment."""
    empty_catalog = {"default": None, "models": []}
    return empty_catalog
@app.get("/notice")
async def get_notice():
    """Return the static welcome notice shown by the UI."""
    return {
        "message": "Welcome to the Analytical Finance Chatbot!",
        "hash": "v1",
    }
@app.post("/chat/new_conversation")
async def new_conversation():
    """Create an empty conversation record, store it, and return it."""
    cid = str(uuid.uuid4())
    timestamp = datetime.now().isoformat()
    record = {
        "conversation_id": cid,
        "messages": [],
        "created_at": timestamp,
        "updated_at": timestamp,
        "model": "analytical",
    }
    conversations[cid] = record
    return record
@app.get("/conversation/all")
async def get_all_conversations():
    """List every stored conversation record."""
    all_records = conversations.values()
    return list(all_records)
@app.get("/conversation/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Fetch one conversation by id; 404 when the id is unknown."""
    try:
        return conversations[conversation_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Conversation not found")
@app.post("/chat/get_alias")
async def get_alias(request: dict):
    """Derive a short conversation alias from the supplied message text."""
    text = request.get("text", "New Conversation")
    # Truncate long titles to 20 characters with a trailing ellipsis.
    if len(text) > 20:
        return {"alias": text[:20] + "..."}
    return {"alias": text}
# --- Core Analytical Chat Endpoint (Streaming version for DevoChat) ---
class DevoChatRequest(BaseModel):
    """Request body for the streaming /chat/conversation endpoint."""
    # Id of an existing conversation (see /chat/new_conversation).
    conversation_id: str
    user_message: list # List of dicts with 'text' or 'image'
async def stream_analytical_response(question: str, conversation_id: str):
    """Yield the LLM's answer as server-sent events (SSE).

    The LLM client returns the whole answer at once, so streaming is
    simulated for the UI by slicing the text into fixed-size chunks.
    """
    try:
        # Gather analytical facts and a schema sample, then ask the LLM.
        facts_text = engine.get_analytical_facts(question)
        schema_sample = engine.get_schema_sample()
        answer = llm.get_answer(schema_sample, question, facts_text)
        # Record the exchange in the conversation history before streaming.
        record = conversations.get(conversation_id)
        if record is not None:
            record["messages"].append({"role": "user", "content": question})
            record["messages"].append({"role": "assistant", "content": answer})
            record["updated_at"] = datetime.now().isoformat()
        # DevoChat expects frames shaped "data: {\"content\": \"...\"}\n\n".
        step = 20
        for start in range(0, len(answer), step):
            piece = answer[start:start + step]
            yield f"data: {json.dumps({'content': piece})}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        # NOTE(review): no [DONE] sentinel follows an error frame — confirm
        # the UI closes the stream when it receives an 'error' payload.
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
@app.post("/chat/conversation")
async def chat_conversation(request: DevoChatRequest):
    """Stream an analytical answer for the first text item in user_message.

    Raises HTTP 400 when the message list contains no usable text entry.
    """
    # Pick the first entry typed "text" out of the message list.
    user_text = ""
    text_items = (m for m in request.user_message if m.get("type") == "text")
    first_text = next(text_items, None)
    if first_text is not None:
        user_text = first_text.get("text")
    if not user_text:
        raise HTTPException(status_code=400, detail="No text message found")
    stream = stream_analytical_response(user_text, request.conversation_id)
    return StreamingResponse(stream, media_type="text/event-stream")
# --- Keep original endpoint for compatibility ---
@app.post("/chat", response_model=OldChatResponse)
async def chat_legacy(request: ChatRequest):
    """Non-streaming question/answer endpoint kept for older clients."""
    question = request.question
    facts_text = engine.get_analytical_facts(question)
    schema_sample = engine.get_schema_sample()
    answer_text = llm.get_answer(schema_sample, question, facts_text)
    return OldChatResponse(answer=answer_text)
if __name__ == "__main__":
    # Local development entry point; in production run under an ASGI server.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)