# Spaces: Sleeping
# yuvrajsingh6
# Initial commit: Analytical Finance Chatbot with Next.js frontend and FastAPI backend
# c5b5cc8
import json
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from csv_engine import CSVEngine
from llm_client import LLMClient
# In-process memory storage for conversations, keyed by conversation id.
# Each value is the record dict built by `new_conversation`
# (conversation_id, messages, created_at, updated_at, model).
# Contents live only in this module-level dict, so they are lost when the
# process exits.
conversations = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup checks on the CSV engine, then yield control to the app.

    Passed to ``FastAPI(lifespan=...)``. The code before ``yield`` runs at
    startup; nothing is done after it on shutdown.

    Both checks only print a warning on failure, so the application still
    starts when no usable CSV data is available.

    Args:
        app: The application instance supplied by the framework (unused).
    """
    # Startup
    if not engine.load_all_csvs():
        print("Warning: No CSV files found.")
    if not engine.validate_schema():
        print("Warning: CSV files are invalid or empty.")
    yield
    # Shutdown: nothing to release.
# Application instance; `lifespan` performs the CSV startup checks.
app = FastAPI(title="Analytical Chatbot API", lifespan=lifespan)

# Enable CORS for the frontend.
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# This is kept as-is for compatibility with the existing UI; confirm the
# intended policy before deploying publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict this to your frontend URL
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ChatRequest(BaseModel):
    """Request body for the legacy chat handler (`chat_legacy`)."""

    # Natural-language question forwarded to the CSV engine and the LLM.
    question: str
class OldChatResponse(BaseModel):
    """Response body returned by the legacy chat handler (`chat_legacy`)."""

    # Answer produced by `llm.get_answer`.
    answer: str
# Directory handed to the CSV engine; overridable through the DATA_DIR
# environment variable, defaulting to "data".
DATA_DIR = os.getenv("DATA_DIR", "data")

# Module-level singletons shared by every handler in this file.
engine = CSVEngine(DATA_DIR)
llm = LLMClient()
# --- Compatibility Endpoints for DevoChat UI ---
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def auth_status():
    """Return a fixed "logged in" status with a hard-coded admin user.

    No real authentication is performed; the payload is constant.
    """
    return {
        "logged_in": True,
        "user": {"id": "user_123", "username": "admin", "role": "admin"},
    }
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def auth_user():
    """Return the hard-coded admin user record.

    No real authentication is performed; the payload is constant.
    """
    return {"id": "user_123", "username": "admin", "role": "admin"}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def chat_models():
    """Return the static catalogue of chat models.

    Exactly one model, "analytical", is advertised and marked as the default.
    The payload is constant.
    """
    analytical_model = {
        "model_name": "analytical",
        "model_alias": "Analytical Engine",
        "capabilities": {
            "stream": True,
            "inference": False,
            "search": False,
            "deep_research": False,
            "image": False,
            "mcp": False,
        },
        "controls": {
            "temperature": True,
            "reason": True,
            "verbosity": True,
            "system_message": True,
        },
        "billing": {"in_billing": 0, "out_billing": 0},
        "description": "Financial Analysis Engine",
        "endpoint": "/chat/conversation",
    }
    return {"default": "analytical", "models": [analytical_model]}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def image_models():
    """Return an empty image-model catalogue (no default, no models)."""
    return {"default": None, "models": []}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def realtime_models():
    """Return an empty realtime-model catalogue (no default, no models)."""
    return {"default": None, "models": []}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def get_notice():
    """Return the static welcome notice together with its "hash" value."""
    return {
        "message": "Welcome to the Analytical Finance Chatbot!",
        "hash": "v1",
    }
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def new_conversation():
    """Create an empty conversation, store it in memory, and return it.

    The record is keyed in ``conversations`` by a freshly generated UUID4
    string. ``created_at`` and ``updated_at`` start out as the same
    ISO-8601 timestamp taken from ``datetime.now()``.

    Returns:
        The stored conversation record (the same dict object that is kept in
        ``conversations``).
    """
    conversation_id = str(uuid.uuid4())
    now = datetime.now().isoformat()
    record = {
        "conversation_id": conversation_id,
        "messages": [],
        "created_at": now,
        "updated_at": now,
        "model": "analytical",
    }
    conversations[conversation_id] = record
    return record
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def get_all_conversations():
    """Return every stored conversation record as a list.

    The list is a new object; the records inside it are the same dicts held
    in ``conversations``.
    """
    return list(conversations.values())
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def get_conversation(conversation_id: str):
    """Return the stored conversation with the given id.

    Args:
        conversation_id: Key of the conversation in ``conversations``.

    Raises:
        HTTPException: 404 with detail "Conversation not found" when the id
            is unknown.
    """
    try:
        return conversations[conversation_id]
    except KeyError:
        # `from None` keeps the internal KeyError out of the traceback chain.
        raise HTTPException(status_code=404, detail="Conversation not found") from None
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def get_alias(request: dict):
    """Build a short display alias from the ``"text"`` field of the payload.

    Simple alias generator: texts longer than 20 characters are cut to their
    first 20 characters followed by "..."; shorter texts are returned
    unchanged.

    Args:
        request: Payload dict. A missing or ``None`` ``"text"`` falls back to
            "New Conversation"; any other non-string value is converted with
            ``str()`` instead of failing on ``len()``.

    Returns:
        ``{"alias": <alias string>}``.
    """
    text = request.get("text")
    if text is None:
        text = "New Conversation"
    elif not isinstance(text, str):
        text = str(text)

    if len(text) > 20:
        return {"alias": text[:20] + "..."}
    return {"alias": text}
# --- Core Analytical Chat Endpoint (Streaming version for DevoChat) ---
class DevoChatRequest(BaseModel):
    """Request body for the streaming chat handler (`chat_conversation`)."""

    # Id of the conversation whose history should receive this exchange.
    conversation_id: str
    # List of dicts with 'text' or 'image'; `chat_conversation` reads the
    # "type" and "text" keys of each item.
    user_message: list
async def stream_analytical_response(question: str, conversation_id: str, *, chunk_size: int = 20):
    """Answer ``question`` and yield the answer as server-sent-event frames.

    The full answer is obtained first and then sliced into pieces, because
    the current LLMClient does not support streaming from Groq yet; the
    streaming seen by the UI is simulated.

    Each content frame has the form ``data: {"content": "..."}\\n\\n`` and the
    stream ends with ``data: [DONE]\\n\\n``. If anything raises, a single
    ``data: {"error": "..."}\\n\\n`` frame carrying ``str(exception)`` is
    yielded instead and the stream stops without a ``[DONE]`` frame.

    Args:
        question: The user's question.
        conversation_id: Conversation whose history is updated. Unknown ids
            are ignored, so the answer is still streamed but not recorded.
        chunk_size: Number of characters per content frame. Expected to be a
            positive integer; defaults to 20.

    Yields:
        SSE-formatted strings as described above.
    """
    try:
        # Get facts and schema
        facts_text = engine.get_analytical_facts(question)
        sample = engine.get_schema_sample()

        # Get answer from LLM.
        # NOTE(review): this call is not awaited, so it presumably blocks
        # the event loop while the LLM responds; confirm whether it should
        # be moved off the loop.
        full_answer = llm.get_answer(sample, question, facts_text)

        # Update history before streaming so the stored conversation holds
        # the complete answer even if the client disconnects mid-stream.
        conversation = conversations.get(conversation_id)
        if conversation is not None:
            conversation["messages"].append({"role": "user", "content": question})
            conversation["messages"].append({"role": "assistant", "content": full_answer})
            conversation["updated_at"] = datetime.now().isoformat()

        # Stream chunks to the UI
        # DevoChat expects "data: {\"content\": \"...\"}\n\n"
        for start in range(0, len(full_answer), chunk_size):
            payload = json.dumps({"content": full_answer[start:start + chunk_size]})
            yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        # Top-level boundary of the stream: report the failure to the client
        # as an error frame rather than breaking the response mid-flight.
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def chat_conversation(request: DevoChatRequest):
    """Stream an analytical answer for the first text part of the message.

    The first item of ``request.user_message`` whose ``"type"`` is
    ``"text"`` supplies the question. Items that are not dicts are skipped
    instead of raising ``AttributeError``.

    Args:
        request: Conversation id plus the list of message parts.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream`` fed by
        ``stream_analytical_response``.

    Raises:
        HTTPException: 400 with detail "No text message found" when no text
            part exists, or when its text is empty or not a string.
    """
    # Extract the text message from the list (first text part wins).
    user_text = next(
        (
            item.get("text")
            for item in request.user_message
            if isinstance(item, dict) and item.get("type") == "text"
        ),
        "",
    )
    if not isinstance(user_text, str) or not user_text:
        raise HTTPException(status_code=400, detail="No text message found")

    return StreamingResponse(
        stream_analytical_response(user_text, request.conversation_id),
        media_type="text/event-stream",
    )
# --- Keep original endpoint for compatibility ---
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm how it is registered on `app`.
async def chat_legacy(request: ChatRequest):
    """Answer a single question in one non-streaming response.

    Gathers the analytical facts for the question and a schema sample from
    the CSV engine, passes both to the LLM client, and wraps the result.
    Conversation history is not touched.

    Args:
        request: Body carrying the ``question`` string.

    Returns:
        An ``OldChatResponse`` whose ``answer`` is the LLM client's result.
    """
    question = request.question
    facts_text = engine.get_analytical_facts(question)
    sample = engine.get_schema_sample()
    return OldChatResponse(answer=llm.get_answer(sample, question, facts_text))
if __name__ == "__main__":
    # Imported here so uvicorn is only required when this file is run
    # directly as a script.
    import uvicorn

    # Listen on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)