import os
import tempfile
import shutil
from typing import List, Dict, Any
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles  # Used by the (commented-out) frontend mount at the bottom of this file
from pydantic import BaseModel
from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader, PDFLoader
from aimakerspace.openai_utils.prompts import (
    UserRolePrompt,
    SystemRolePrompt,
    AssistantRolePrompt,
)
from aimakerspace.openai_utils.embedding import EmbeddingModel
from aimakerspace.vectordatabase import VectorDatabase
from aimakerspace.openai_utils.chatmodel import ChatOpenAI

app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with your frontend URL
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
text_splitter = CharacterTextSplitter()
vector_db = None  # Built on upload; the ask endpoint returns 400 until a file is indexed
chat_openai = ChatOpenAI(model_name="gpt-3.5-turbo")

system_template = """\
Use the following context to answer a user's question. If you cannot find the answer in the context, say you don't know the answer."""

system_role_prompt = SystemRolePrompt(system_template)

user_prompt_template = """\
Context:
{context}

Question:
{question}
"""

user_role_prompt = UserRolePrompt(user_prompt_template)

class QuestionRequest(BaseModel):
    question: str


class QuestionResponse(BaseModel):
    response: str
    context: List[tuple]  # (chunk_text, similarity_score) pairs from retrieval

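# Illustrative request/response bodies for the ask endpoint defined below
# (values are made up; the tuples serialize to JSON arrays):
#   request:  {"question": "What is the document about?"}
#   response: {"response": "...", "context": [["chunk text", 0.87], ...]}
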
def process_file(file_path: str, file_name: str):
    print(f"Processing file: {file_name}")

    # Create the appropriate loader for the file type
    if file_name.lower().endswith('.pdf'):
        loader = PDFLoader(file_path)
    else:
        loader = TextFileLoader(file_path)

    # Load the documents and split them into chunks
    documents = loader.load_documents()
    texts = text_splitter.split_texts(documents)
    return texts

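# Hypothetical usage sketch (illustrative path and output, not from the original):
#   texts = process_file("/tmp/tmpab12cd.pdf", "report.pdf")
#   # -> ["chunk 1 text...", "chunk 2 text...", ...]
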
class RetrievalAugmentedQAPipeline:
    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase) -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever

    async def run_pipeline(self, user_query: str):
        # Retrieve the top-k most relevant chunks for the query
        context_list = self.vector_db_retriever.search_by_text(user_query, k=4)

        # Concatenate the retrieved chunk texts into a single context string
        context_prompt = ""
        for context in context_list:
            context_prompt += context[0] + "\n"

        formatted_system_prompt = system_role_prompt.create_message()
        formatted_user_prompt = user_role_prompt.create_message(question=user_query, context=context_prompt)

        response = await self.llm.run([formatted_system_prompt, formatted_user_prompt])

        return {"response": response, "context": context_list}

# NOTE: the route decorator was missing from this file; "/api/upload" is an assumed path
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
    global vector_db

    # Validate file type
    if not file.filename.lower().endswith(('.txt', '.pdf')):
        raise HTTPException(status_code=400, detail="Only .txt and .pdf files are allowed")

    # Write the upload to a temporary file so the loaders can read it from disk
    suffix = f".{file.filename.split('.')[-1]}"
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
        content = await file.read()
        print(f"File read complete. Size: {len(content)} bytes")
        temp_file.write(content)

    try:
        # Process the file into text chunks
        texts = process_file(temp_file.name, file.filename)
        print(f"Processing {len(texts)} text chunks")

        # Build the vector store from the chunks
        vector_db = VectorDatabase()
        vector_db = await vector_db.abuild_from_list(texts)

        print("Document processing complete")
        return {"message": f"File {file.filename} processed successfully", "chunks": len(texts)}
    finally:
        # Clean up the temporary file
        try:
            os.unlink(temp_file.name)
        except Exception as e:
            print(f"Error cleaning up temporary file: {e}")

# NOTE: the route decorator was missing from this file; "/api/ask" is an assumed path
@app.post("/api/ask")
async def ask_question(request: QuestionRequest):
    global vector_db

    if vector_db is None:
        raise HTTPException(status_code=400, detail="Please upload a file first")

    # Create the RAG pipeline
    retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=vector_db,
        llm=chat_openai
    )

    # Run the pipeline
    result = await retrieval_augmented_qa_pipeline.run_pipeline(request.question)

    return QuestionResponse(
        response=result["response"],
        context=result["context"]
    )

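# Example question call (assuming the "/api/ask" path used above):
#   curl -X POST http://localhost:8000/api/ask \
#        -H "Content-Type: application/json" \
#        -d '{"question": "What is this document about?"}'
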
# NOTE: the route decorator was missing from this file; "/api/health" is an assumed path
@app.get("/api/health")
async def health_check():
    return {"status": "healthy"}


# Serve the frontend build as static files (uncomment once the build exists)
# app.mount("/", StaticFiles(directory="../frontend/build", html=True), name="frontend")
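
# Minimal local-run entrypoint -- an addition for convenience, not part of the
# original file. Assumes uvicorn is installed and OPENAI_API_KEY is exported.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)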