# NOTE: "Spaces: Running" banner below is Hugging Face page chrome captured by
# the scrape, not part of the source file.
| """ | |
| LexiBot FastAPI Backend | |
| Headless RAG API for Indian Legal Information | |
| Replaces the legacy Telegram bot (main.py) with a REST API. | |
| Designed for deployment on Hugging Face Spaces. | |
| """ | |
| import os | |
| from typing import Dict | |
| from contextlib import asynccontextmanager | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_pinecone import PineconeEmbeddings, PineconeVectorStore | |
| from langchain.chains import RetrievalQA | |
| from langchain.memory import ConversationBufferWindowMemory | |
| from langchain.prompts import PromptTemplate | |
load_dotenv()

# Configuration -- secrets and settings come from the environment
# (a local .env file in development, Space secrets in production).
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME")

# Global state -- populated once by the lifespan startup hook; None until then.
vector_store = None
llm = None
embeddings = None
# Per-session conversation memory, keyed by session_id.
# Ephemeral (RAM-only): a container restart clears every session.
session_memories: Dict[str, ConversationBufferWindowMemory] = {}
| # Pydantic Models | |
class ChatRequest(BaseModel):
    """Request body for the chat endpoint."""

    # The user's legal question.
    message: str
    # Client-chosen identifier used to key the per-session conversation memory.
    session_id: str
class ChatResponse(BaseModel):
    """Response body for the chat endpoint."""

    # The AI-generated answer text.
    response: str
    # Unique legal act / source names drawn from the retrieved documents.
    sources: list[str]
class HealthResponse(BaseModel):
    """Minimal status payload returned by the health-check endpoints."""

    # "ok" when the service is up.
    status: str
| # Legal RAG Prompt | |
# Legal RAG prompt -- constrains the LLM to answer only from retrieved
# context and to always recommend professional legal counsel.
LEGAL_PROMPT = PromptTemplate(
    template="""You are LexiBot, an AI legal assistant specializing in Indian law.
IMPORTANT GUIDELINES:
- Provide accurate information based ONLY on the context provided
- If the context doesn't contain relevant information, say "I don't have specific information about that in my legal database"
- Always recommend consulting a qualified lawyer for specific legal matters
- Be clear, concise, and use simple language
- When citing laws, mention the specific Act name and section number
CONTEXT FROM LEGAL DATABASE:
{context}
USER QUESTION: {question}
Provide a helpful, accurate response based on the legal context above:""",
    # Filled at query time: {context} with retrieved chunks, {question} with
    # the user's message.
    input_variables=["context", "question"]
)
@asynccontextmanager  # FastAPI's `lifespan=` hook must be an async context manager
async def lifespan(app: FastAPI):
    """Initialize shared RAG resources at startup, release them at shutdown.

    Populates the module-level ``embeddings``, ``llm`` and ``vector_store``
    globals exactly once so request handlers reuse the same clients.

    Args:
        app: The FastAPI application (unused, required by the hook signature).

    Raises:
        ValueError: If GOOGLE_API_KEY or PINECONE_API_KEY is not set.
    """
    global vector_store, llm, embeddings
    print("Initializing LexiBot API...")

    # Validate environment up front so misconfiguration fails at boot,
    # not on the first request.
    if not GOOGLE_API_KEY:
        raise ValueError("GOOGLE_API_KEY not set")
    if not PINECONE_API_KEY:
        raise ValueError("PINECONE_API_KEY not set")

    # Must be the same embedding model the ingestion script used, otherwise
    # query vectors won't live in the stored index's vector space.
    print("Loading Pinecone Embeddings (multilingual-e5-large)...")
    embeddings = PineconeEmbeddings(
        model="multilingual-e5-large",
        pinecone_api_key=PINECONE_API_KEY,
    )

    print("Loading Gemini LLM...")
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        google_api_key=GOOGLE_API_KEY,
        temperature=0.1,  # low temperature: factual legal answers, not creativity
        max_tokens=2048,
    )

    print("Connecting to Pinecone...")
    vector_store = PineconeVectorStore(
        index_name=PINECONE_INDEX_NAME,
        embedding=embeddings,
        pinecone_api_key=PINECONE_API_KEY,
    )

    print("LexiBot API Ready!")
    yield
    # Shutdown: no explicit cleanup needed; clients are garbage-collected.
    print("Shutting down LexiBot API...")
| # Initialize FastAPI | |
# Initialize the FastAPI application; `lifespan` wires the startup/shutdown
# hook that builds the shared RAG clients.
app = FastAPI(
    title="LexiBot API",
    description="Headless RAG API for Indian Legal Information",
    version="2.0.0",
    lifespan=lifespan
)
| # CORS Middleware - Allow all origins for frontend integration | |
# CORS middleware -- wide open so any frontend origin can call the API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # TODO: restrict to the real frontend origin in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def get_session_memory(session_id: str) -> ConversationBufferWindowMemory:
    """Return the conversation memory for *session_id*, creating it on first use.

    Memory lives only in process RAM; a container restart wipes every
    session. That trade-off is acceptable for the MVP per the PRD.
    """
    memory = session_memories.get(session_id)
    if memory is None:
        # Window memory retains only the 5 most recent exchanges per session.
        memory = ConversationBufferWindowMemory(
            k=5,
            memory_key="chat_history",
            return_messages=True,
        )
        session_memories[session_id] = memory
    return memory
def extract_sources(source_documents) -> list[str]:
    """Collect the unique source names cited by the retrieved documents.

    Prefers the ``act_name`` metadata field and falls back to ``source``;
    documents carrying neither key are skipped.

    Args:
        source_documents: Retrieved documents, each exposing a ``metadata``
            mapping (LangChain ``Document`` objects in practice).

    Returns:
        Sorted list of unique names. Sorting makes the API response
        deterministic -- ``list(set)`` order varies between processes.
    """
    sources = set()
    for doc in source_documents:
        metadata = doc.metadata
        if "act_name" in metadata:
            sources.add(metadata["act_name"])
        elif "source" in metadata:
            sources.add(metadata["source"])
    return sorted(sources)
@app.get("/", response_model=HealthResponse)  # route decorator restored -- handler was unreachable
async def root():
    """Root endpoint -- basic liveness check."""
    return HealthResponse(status="ok")
@app.get("/health", response_model=HealthResponse)  # route decorator restored -- handler was unreachable
async def health_check():
    """Health check endpoint for uptime monitoring."""
    return HealthResponse(status="ok")
@app.post("/chat", response_model=ChatResponse)  # route decorator restored -- handler was unreachable
async def chat(request: ChatRequest):
    """Answer a legal question via retrieval-augmented generation.

    Args:
        request: ``message`` is the user's legal question; ``session_id``
            keys the per-session conversation memory.

    Returns:
        ChatResponse with the generated answer and the legal acts it cited.

    Raises:
        HTTPException: 503 until startup completes, 400 for an empty
            message, 500 for any downstream failure.
    """
    if not vector_store or not llm:
        raise HTTPException(status_code=503, detail="Service not initialized")
    if not request.message.strip():
        raise HTTPException(status_code=400, detail="Message cannot be empty")

    try:
        memory = get_session_memory(request.session_id)

        # Building a fresh chain per request is cheap; the heavy clients
        # (llm, vector_store) are shared globals created at startup.
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vector_store.as_retriever(search_kwargs={"k": 5}),
            return_source_documents=True,
            chain_type_kwargs={"prompt": LEGAL_PROMPT},
        )

        result = qa_chain.invoke({"query": request.message})

        response_text = result.get("result", "I couldn't process your query.")
        sources = extract_sources(result.get("source_documents", []))

        # NOTE(review): history is recorded here but RetrievalQA never reads
        # it (LEGAL_PROMPT has no chat_history slot), so follow-up questions
        # are answered without conversational context. Kept so a future
        # memory-aware chain can pick it up.
        memory.save_context(
            {"input": request.message},
            {"output": response_text}
        )

        return ChatResponse(
            response=response_text,
            sources=sources
        )
    except Exception as e:
        # HTTP boundary: surface any downstream failure as a 500 with detail.
        print(f"Error processing chat: {e}")
        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}")
if __name__ == "__main__":
    # Local development entry point; port 7860 is the Hugging Face Spaces
    # convention. `reload=True` is for dev only.
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)