|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import tempfile |
|
|
import json |
|
|
import requests |
|
|
import asyncio |
|
|
import traceback |
|
|
from dotenv import load_dotenv |
|
|
from typing import List, Optional |
|
|
|
|
|
|
|
|
from fastapi import FastAPI, HTTPException, status |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pydantic import BaseModel, Field, HttpUrl |
|
|
|
|
|
|
|
|
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, UnstructuredEmailLoader |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_community.vectorstores import SupabaseVectorStore |
|
|
from langchain_together import ChatTogether |
|
|
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI |
|
|
from langchain.chains import RetrievalQA |
|
|
from langchain.prompts import PromptTemplate |
|
|
from google.generativeai.types import HarmCategory, HarmBlockThreshold |
|
|
|
|
|
|
|
|
from supabase.client import Client, create_client |
|
|
|
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") |
|
|
SUPABASE_URL = os.environ.get("SUPABASE_URL") |
|
|
SUPABASE_SERVICE_KEY = os.environ.get("SUPABASE_SERVICE_KEY") |
|
|
|
|
|
|
|
|
app = FastAPI( |
|
|
title="Intelligent Document Q&A API", |
|
|
description="An API to find answers within documents (PDF, DOCX, EML). It ingests documents from a URL, stores them in a persistent vector database, and allows you to ask multiple questions.", |
|
|
version="1.1.0" |
|
|
) |
|
|
|
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=["*"], |
|
|
allow_credentials=True, |
|
|
allow_methods=["*"], |
|
|
allow_headers=["*"], |
|
|
) |
|
|
|
|
|
|
|
|
llm: Optional[ChatTogether] = None |
|
|
gemini_embedder: Optional[GoogleGenerativeAIEmbeddings] = None |
|
|
supabase_client: Optional[Client] = None |
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup") |
|
|
async def startup_event(): |
|
|
"""On application startup, initialize components.""" |
|
|
print("Application startup: Initializing components...") |
|
|
initialize_components() |
|
|
|
|
|
def initialize_components(): |
|
|
"""Initializes models and clients if they haven't been already.""" |
|
|
global llm, gemini_embedder, supabase_client |
|
|
if llm is None: |
|
|
print("Initializing Gemini AI LLM...") |
|
|
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.2, convert_system_message_to_human=True) |
|
|
if gemini_embedder is None: |
|
|
print("Initializing Google Gemini Embedding model...") |
|
|
gemini_embedder = GoogleGenerativeAIEmbeddings( |
|
|
model="models/embedding-001", |
|
|
safety_settings={ |
|
|
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, |
|
|
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, |
|
|
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, |
|
|
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE, |
|
|
} |
|
|
) |
|
|
if supabase_client is None and SUPABASE_URL and SUPABASE_SERVICE_KEY: |
|
|
print("Initializing Supabase client...") |
|
|
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY) |
|
|
print("Models and clients are ready.") |
|
|
|
|
|
|
|
|
|
|
|
PROMPT_TEMPLATE = PromptTemplate( |
|
|
template=""" |
|
|
**Your Role:** You are a helpful AI assistant. Your task is to answer the user's question based *only* on the provided document context. |
|
|
**Strict Constraints:** |
|
|
- Your answer **MUST** be derived solely from the `Document Context`. Do not use any external knowledge. |
|
|
- If the answer is not in the context, state: "The answer to this question could not be found in the provided document." |
|
|
--- |
|
|
**Document Context:** {context} |
|
|
--- |
|
|
**User Query:** {question} |
|
|
--- |
|
|
**Helpful Answer:** |
|
|
""", |
|
|
input_variables=["question", "context"] |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class QueryRequest(BaseModel): |
|
|
document_url: HttpUrl = Field(..., description="A single public URL to a document (PDF, DOCX, EML).") |
|
|
questions: List[str] = Field(..., min_length=1, description="A non-empty list of questions to ask about the document.") |
|
|
|
|
|
|
|
|
class QueryResponse(BaseModel): |
|
|
answers: List[str] |
|
|
document_url: HttpUrl |
|
|
message: str |
|
|
|
|
|
|
|
|
|
|
|
@app.post('/query', response_model=QueryResponse, tags=["Document Q&A"]) |
|
|
async def query_document(payload: QueryRequest): |
|
|
""" |
|
|
This endpoint ingests a document if new, then finds answers to questions within it. |
|
|
- If a document has been processed before, it uses the cached version from the vector store. |
|
|
- Otherwise, it downloads, processes, and stores the document for future queries. |
|
|
""" |
|
|
print(f"\n--- New Request Received for /query ---") |
|
|
|
|
|
if not supabase_client: |
|
|
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database client is not available. Check Supabase credentials.") |
|
|
|
|
|
doc_url = str(payload.document_url) |
|
|
questions = payload.questions |
|
|
print(f"Processing document from URL: {doc_url}") |
|
|
print(f"Answering {len(questions)} questions.") |
|
|
|
|
|
vectorstore = None |
|
|
ingestion_message = "" |
|
|
|
|
|
try: |
|
|
|
|
|
print("Step 1: Checking for existing document vectors in Supabase...") |
|
|
response = supabase_client.from_("documents").select("id", count='exact').eq("metadata->>source", doc_url).limit(1).execute() |
|
|
|
|
|
if response.count > 0: |
|
|
print("DATABASE HIT: Found pre-processed vectors. Skipping ingestion.") |
|
|
ingestion_message = "Document already processed. Using existing vectors from database." |
|
|
vectorstore = SupabaseVectorStore( |
|
|
client=supabase_client, |
|
|
embedding=gemini_embedder, |
|
|
table_name="documents", |
|
|
query_name="match_documents" |
|
|
) |
|
|
else: |
|
|
print("DATABASE MISS: Processing and embedding new document.") |
|
|
ingestion_message = "New document processed and vectors stored in database." |
|
|
temp_file_path = None |
|
|
try: |
|
|
print("Step 1a: Downloading document...") |
|
|
http_response = requests.get(doc_url) |
|
|
http_response.raise_for_status() |
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".tmp") as temp_file: |
|
|
temp_file.write(http_response.content) |
|
|
temp_file_path = temp_file.name |
|
|
|
|
|
lower_doc_url = doc_url.lower() |
|
|
if lower_doc_url.endswith('.pdf'): loader = PyPDFLoader(temp_file_path) |
|
|
elif lower_doc_url.endswith('.docx'): loader = Docx2txtLoader(temp_file_path) |
|
|
elif lower_doc_url.endswith('.eml'): loader = UnstructuredEmailLoader(temp_file_path) |
|
|
else: loader = PyPDFLoader(temp_file_path) |
|
|
|
|
|
pages = loader.load() |
|
|
if not pages: raise ValueError("Could not load content from the document.") |
|
|
print(f"Step 2: Loaded {len(pages)} pages/sections.") |
|
|
|
|
|
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) |
|
|
docs = text_splitter.split_documents(pages) |
|
|
if not docs: raise ValueError("Document could not be split into processable chunks.") |
|
|
|
|
|
for doc in docs: |
|
|
doc.metadata = {"source": doc_url} |
|
|
|
|
|
print(f"Step 3: Split document into {len(docs)} chunks. Uploading to Supabase...") |
|
|
vectorstore = await SupabaseVectorStore.afrom_documents( |
|
|
documents=docs, |
|
|
embedding=gemini_embedder, |
|
|
client=supabase_client, |
|
|
table_name="documents", |
|
|
query_name="match_documents", |
|
|
chunk_size=50 |
|
|
) |
|
|
print("Step 4: Supabase vector store created successfully.") |
|
|
|
|
|
finally: |
|
|
if temp_file_path and os.path.exists(temp_file_path): |
|
|
os.remove(temp_file_path) |
|
|
print(f"Temporary file removed: {temp_file_path}") |
|
|
|
|
|
|
|
|
retriever = vectorstore.as_retriever(search_kwargs={"k": 15, "filter": {"source": doc_url}}) |
|
|
print("Step 5: Initialized retriever with source document filter.") |
|
|
|
|
|
qa_chain = RetrievalQA.from_chain_type( |
|
|
llm=llm, |
|
|
chain_type="stuff", |
|
|
retriever=retriever, |
|
|
return_source_documents=False, |
|
|
chain_type_kwargs={"prompt": PROMPT_TEMPLATE} |
|
|
) |
|
|
print("Step 6: RAG QA Chain created.") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"ERROR during document processing or vector store setup: {e}") |
|
|
traceback.print_exc() |
|
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to process document: {str(e)}") |
|
|
|
|
|
|
|
|
try: |
|
|
async def get_answer(chain, query): |
|
|
print(f"Processing query: '{query}'") |
|
|
try: |
|
|
result = await chain.ainvoke({"query": query}) |
|
|
answer = result.get('result', 'Error: Could not process this question.').strip() |
|
|
print(f"Successfully answered: '{query}'") |
|
|
return answer |
|
|
except Exception as e: |
|
|
error_message = f"Error for query '{query}': {str(e)}" |
|
|
print(f"ERROR invoking chain: {error_message}") |
|
|
traceback.print_exc() |
|
|
return error_message |
|
|
|
|
|
print(f"Step 7: Starting parallel processing for {len(questions)} questions...") |
|
|
tasks = [get_answer(qa_chain, q) for q in questions] |
|
|
answers = await asyncio.gather(*tasks) |
|
|
print("All questions processed successfully.") |
|
|
|
|
|
final_response = { |
|
|
"answers": answers, |
|
|
"document_url": doc_url, |
|
|
"message": ingestion_message |
|
|
} |
|
|
print(f"Final response prepared: {json.dumps(final_response, indent=2)}") |
|
|
return final_response |
|
|
|
|
|
except Exception as e: |
|
|
print(f"ERROR during question answering: {e}") |
|
|
traceback.print_exc() |
|
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"An error occurred during question answering: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", tags=["General"]) |
|
|
def read_root(): |
|
|
"""A simple health check endpoint to confirm the API is running.""" |
|
|
return {"status": "ok", "name": "Intelligent Document Q&A API"} |
|
|
|