import io
import logging
import os
import tempfile
import uuid
from typing import List, Optional

import PyPDF2
from fastapi import APIRouter, Depends, HTTPException
from openai import OpenAI
from pydantic import BaseModel
from sqlalchemy.orm import Session

from core.database import get_db
from models import db_models
from services.rag_service import rag_service
from services.s3_service import s3_service
from api.auth import get_current_user
from core.config import settings
| |
|
# Router for the RAG document-management endpoints; every route declared on it
# is served under the /api/rag prefix and shares a single OpenAPI tag.
router = APIRouter(
    prefix="/api/rag",
    tags=["RAG Document Management"],
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
| |
|
| | |
class RAGIndexRequest(BaseModel):
    # Request body accepted by the POST /index endpoint.

    # Compared against Source.s3_key to find the caller's file, then used as
    # the S3 object key when the file is downloaded.
    file_key: str
| |
|
class RAGIndexResponse(BaseModel):
    # Response body returned by the POST /index endpoint.

    # Primary key of the RAGDocument row.
    id: int
    # Filename stored on the RAGDocument row (copied from the Source).
    filename: str
    # Identifier under which the chunks were handed to rag_service; generated
    # as a uuid4 string when the document is first indexed.
    azure_doc_id: str
    # Chunk count as reported by rag_service.index_document.
    chunk_count: int
    # Human-readable outcome, e.g. whether the document was already indexed.
    message: str
| |
|
class RAGDocumentResponse(BaseModel):
    # One entry in the list returned by the GET /documents endpoint.

    # Primary key of the RAGDocument row.
    id: int
    filename: str
    # Identifier of the document in the search index.
    azure_doc_id: str
    chunk_count: int
    # Id of the Source row the document was indexed from; may be None.
    source_id: Optional[int]
    # Creation timestamp; the list endpoint fills it with created_at.isoformat().
    created_at: str

    class Config:
        # pydantic setting that permits building the model from objects that
        # expose the fields as attributes (e.g. ORM rows).
        # NOTE(review): the list endpoint constructs instances explicitly, so
        # confirm whether anything else relies on this option.
        from_attributes = True
| |
|
def extract_text_from_pdf(file_content: bytes) -> str:
    """Extract the text of every page of a PDF.

    Args:
        file_content: Raw bytes of the PDF file.

    Returns:
        The text of all pages joined by newlines, with leading and trailing
        whitespace stripped.

    Raises:
        HTTPException: 400 when the PDF cannot be parsed or text extraction
            fails; the original exception is chained as the cause.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_content))
        # A page that yields no text contributes an empty string instead of
        # aborting extraction for the whole document; a single join also avoids
        # rebuilding the string once per page.
        page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
        return "\n".join(page_texts).strip()
    except Exception as e:
        logger.error(f"Error extracting PDF text: {e}")
        raise HTTPException(status_code=400, detail=f"Failed to extract text: {str(e)}") from e
| |
|
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
    """Split text into fixed-size chunks, each overlapping its predecessor.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive chunks; must be
            non-negative and smaller than ``chunk_size``.

    Returns:
        The chunks in document order. Empty text yields an empty list. The
        last chunk may be shorter than ``chunk_size``.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
            negative or not smaller than ``chunk_size``.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # overlap >= chunk_size would make the step zero or negative, so the loop
    # below would never terminate; a negative overlap would silently skip text.
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")

    step = chunk_size - overlap
    text_length = len(text)
    chunks = []
    start = 0
    while start < text_length:
        end = start + chunk_size
        chunks.append(text[start:end])
        # Once a chunk reaches the end of the text, any further chunk would lie
        # entirely inside this one, so stop instead of emitting a duplicate tail.
        if end >= text_length:
            break
        start += step
    return chunks
| |
|
@router.post("/index", response_model=RAGIndexResponse)
async def index_document(
    request: RAGIndexRequest,
    current_user: db_models.User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    Index a document for AI search (one-time operation).
    Downloads from S3, extracts text, generates embeddings, stores in Azure Search.
    """
    # Contract (kept in comments so the endpoint description built from the
    # docstring is unchanged):
    #   request.file_key -- S3 key of a Source row owned by the current user.
    #   Returns a RAGIndexResponse for the new RAGDocument row, or for the
    #   existing row when this Source has already been indexed.
    #   Raises HTTPException 404 when the user owns no Source with that key,
    #   400 for unsupported file types, undecodable text or (near-)empty text,
    #   and 500 for any other failure.
    # NOTE(review): the S3 download, text extraction and rag_service calls are
    # invoked synchronously inside this async endpoint.
    try:
        # Restrict the lookup to the caller's own files.
        source = db.query(db_models.Source).filter(
            db_models.Source.s3_key == request.file_key,
            db_models.Source.user_id == current_user.id,
        ).first()

        if not source:
            raise HTTPException(status_code=404, detail="File not found")

        # Indexing is one-time per source: hand back the existing record.
        existing = db.query(db_models.RAGDocument).filter(
            db_models.RAGDocument.source_id == source.id,
            db_models.RAGDocument.user_id == current_user.id,
        ).first()

        if existing:
            return RAGIndexResponse(
                id=existing.id,
                filename=existing.filename,
                azure_doc_id=existing.azure_doc_id,
                chunk_count=existing.chunk_count,
                message="Document already indexed"
            )

        logger.info(f"Downloading {request.file_key}...")

        # Reserve a temp path carrying the original extension. delete=False
        # keeps the file after the handle closes so it can be written to by
        # the download call; it is removed in the finally block below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(source.filename)[1]) as tmp:
            temp_file = tmp.name

        try:
            # The download is inside the try so that a failed download still
            # reaches the cleanup in `finally` and does not leak the temp file.
            s3_service.s3_client.download_file(
                settings.AWS_S3_BUCKET,
                request.file_key,
                temp_file
            )

            with open(temp_file, "rb") as f:
                file_content = f.read()

            lowered_name = source.filename.lower()
            if lowered_name.endswith('.pdf'):
                text = extract_text_from_pdf(file_content)
            elif lowered_name.endswith('.txt'):
                try:
                    text = file_content.decode('utf-8')
                except UnicodeDecodeError as e:
                    # An undecodable upload is a client-side problem, not a
                    # server failure, so report it as 400 rather than 500.
                    raise HTTPException(status_code=400, detail="TXT file is not valid UTF-8 text") from e
            else:
                raise HTTPException(status_code=400, detail="Only PDF and TXT supported")

            if len(text) < 10:
                raise HTTPException(status_code=400, detail="No readable text content found in file")

            chunks = chunk_text(text)
            logger.info(f"Created {len(chunks)} chunks")

            # Fresh identifier under which the chunks are handed to rag_service.
            doc_id = str(uuid.uuid4())
            chunk_count = rag_service.index_document(
                chunks=chunks,
                filename=source.filename,
                user_id=current_user.id,
                doc_id=doc_id
            )

            # NOTE(review): if the commit below fails, whatever rag_service
            # stored for doc_id is left without a matching database row.
            rag_doc = db_models.RAGDocument(
                filename=source.filename,
                azure_doc_id=doc_id,
                chunk_count=chunk_count,
                user_id=current_user.id,
                source_id=source.id
            )
            db.add(rag_doc)
            db.commit()
            db.refresh(rag_doc)

            logger.info(f"Successfully indexed {source.filename}")

            return RAGIndexResponse(
                id=rag_doc.id,
                filename=rag_doc.filename,
                azure_doc_id=rag_doc.azure_doc_id,
                chunk_count=rag_doc.chunk_count,
                message="Document indexed successfully for AI conversation"
            )
        finally:
            if os.path.exists(temp_file):
                os.remove(temp_file)

    except HTTPException:
        # Deliberate 4xx responses pass through untouched.
        raise
    except Exception as e:
        # Discard any pending changes so the session is left clean.
        db.rollback()
        logger.error(f"Error indexing document: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Indexing failed: {str(e)}") from e
| |
|
@router.get("/documents", response_model=List[RAGDocumentResponse])
async def list_indexed_documents(
    current_user: db_models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List all documents that have been processed and are ready for chatting."""
    # Only the caller's own documents, most recently created first.
    owned_by_caller = db_models.RAGDocument.user_id == current_user.id
    newest_first = db_models.RAGDocument.created_at.desc()
    rows = (
        db.query(db_models.RAGDocument)
        .filter(owned_by_caller)
        .order_by(newest_first)
        .all()
    )

    # Build each response explicitly; created_at is sent as an ISO-format string.
    payload = []
    for row in rows:
        entry = RAGDocumentResponse(
            id=row.id,
            filename=row.filename,
            azure_doc_id=row.azure_doc_id,
            chunk_count=row.chunk_count,
            source_id=row.source_id,
            created_at=row.created_at.isoformat(),
        )
        payload.append(entry)
    return payload
| |
|
@router.delete("/documents/{doc_id}")
async def delete_indexed_document(
    doc_id: str,
    current_user: db_models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Remove a document from the AI search index."""
    # Contract (kept in comments so the endpoint description built from the
    # docstring is unchanged):
    #   doc_id -- the azure_doc_id of one of the caller's RAGDocument rows.
    #   Returns a confirmation message on success.
    #   Raises HTTPException 404 when the caller owns no such row, and 500
    #   when removing the index entry or the database row fails.
    rag_doc = db.query(db_models.RAGDocument).filter(
        db_models.RAGDocument.azure_doc_id == doc_id,
        db_models.RAGDocument.user_id == current_user.id
    ).first()

    if not rag_doc:
        raise HTTPException(status_code=404, detail="Document index entry not found")

    try:
        # The search-index entry is removed first; the database row is only
        # deleted when that call did not raise.
        rag_service.delete_document(doc_id)

        db.delete(rag_doc)
        db.commit()
    except Exception as e:
        # Discard the pending delete so the session is left clean, and log the
        # traceback the same way index_document does.
        db.rollback()
        logger.error(f"Error deleting document index: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Deletion failed: {str(e)}") from e

    return {"message": "AI index for document deleted successfully"}
| |
|
class RAGSummaryRequest(BaseModel):
    # Request body accepted by the POST /summary endpoint.

    # Primary key of the RAGDocument row to summarise (matched against
    # RAGDocument.id, not against azure_doc_id).
    rag_doc_id: int
| |
|
@router.post("/summary")
async def generate_document_summary(
    request: RAGSummaryRequest,
    current_user: db_models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Generate an on-the-fly summary for an indexed document.
    No data is stored in the database.
    """
    # Contract (kept in comments so the endpoint description built from the
    # docstring is unchanged):
    #   request.rag_doc_id -- id of one of the caller's RAGDocument rows.
    #   Returns {"summary": <text>}; a fixed placeholder text is returned when
    #   the search yields no content.
    #   Raises HTTPException 404 when the caller owns no such document, and
    #   500 when the search or the completion call fails.
    try:
        rag_doc = db.query(db_models.RAGDocument).filter(
            db_models.RAGDocument.id == request.rag_doc_id,
            db_models.RAGDocument.user_id == current_user.id
        ).first()

        if not rag_doc:
            raise HTTPException(status_code=404, detail="Document not found")

        # Pull the chunks that best match a generic "overview" query and use
        # them as the context for the summary.
        results = rag_service.search_document(
            query="Give me a general overview and executive summary of this document.",
            doc_id=rag_doc.azure_doc_id,
            user_id=current_user.id,
            top_k=8
        )

        if not results:
            return {"summary": "No content found to summarize."}

        context = "\n\n".join(r["content"] for r in results)

        openai_client = OpenAI(api_key=settings.OPENAI_API_KEY)
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are a professional document analyst. Provide a concise, high-level summary (3-5 sentences) of the document based on the provided context."
                },
                {"role": "user", "content": f"Context from '{rag_doc.filename}':\n\n{context}"}
            ],
            temperature=0.5
        )

        return {"summary": response.choices[0].message.content}

    except HTTPException:
        # Without this clause the 404 raised above would be caught by the
        # generic handler below and reported to the client as a 500.
        raise
    except Exception as e:
        logger.error(f"Summary generation failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to generate summary: {str(e)}") from e
| |
|