Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, UploadFile, File | |
| from pydantic import BaseModel, Json | |
| from uuid import uuid4, UUID | |
| from typing import Optional | |
| import pymupdf | |
| from pinecone import Pinecone, ServerlessSpec | |
| import os | |
| from dotenv import load_dotenv | |
| from rag import * | |
| from fastapi.responses import StreamingResponse | |
| import json | |
| from prompts import * | |
# Load environment configuration before any of the settings below are read.
load_dotenv()

# API key for the Pinecone client, and the namespace that is later passed to
# retrieval alongside the per-enterprise namespace.
pinecone_api_key = os.getenv("PINECONE_API_KEY")
common_namespace = os.getenv("COMMON_NAMESPACE")

# Module-wide Pinecone client.
pc = Pinecone(api_key=pinecone_api_key)
| import time | |
# Name of the Pinecone index backing this service (taken from the environment).
index_name = os.getenv("INDEX_NAME")  # change if desired

# Names of every index currently visible to this client.
existing_indexes = [info["name"] for info in pc.list_indexes()]

# Create the index on first run, then poll once per second until Pinecone
# reports it as ready.
if index_name not in existing_indexes:
    pc.create_index(
        name=index_name,
        dimension=3072,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
    while True:
        if pc.describe_index(index_name).status["ready"]:
            break
        time.sleep(1)

# Handle used by the functions below for list/delete/query operations.
index = pc.Index(index_name)

app = FastAPI()
# Optional writing preferences attached to a generation request.
# Both values are forwarded as the `style` / `tonality` arguments of the
# prompt-formatting and generation helpers.
class StyleWriter(BaseModel):
    # Writing style to apply to the generated answer.
    style: str
    # Tone to apply to the generated answer.
    tonality: str
# Request payload consumed by `generate_answer`.
class UserInput(BaseModel):
    # The user's question / instruction.
    prompt: str
    # Identifier used as the retrieval namespace for this enterprise.
    enterprise_id: str
    # When truthy, the answer is returned as a streaming response.
    stream: Optional[bool] = False
    # Prior conversation messages, passed through to the generation helper.
    messages: Optional[list[dict]] = []
    # Optional style/tonality preferences; None means "no preferences given".
    style_tonality: Optional[StyleWriter] = None
# Metadata sent alongside an uploaded file.
class EnterpriseData(BaseModel):
    # Human-readable enterprise name; sanitised before use by `upload_file`.
    name: str
    # Existing enterprise id; when None, `upload_file` generates a new one.
    id: Optional[str] = None
    # Overrides the uploaded file's own name when provided.
    filename: Optional[str] = None
# Module-level list; not referenced anywhere in the visible part of this file.
tasks: list = []
def greet_json():
    """Return the fixed greeting payload ``{"Hello": "World!"}``."""
    # NOTE(review): no route decorator is visible on this function in this view.
    greeting = {"Hello": "World!"}
    return greeting
async def upload_file(file: UploadFile, enterprise_data: Json[EnterpriseData]):
    """Extract the text of an uploaded PDF and store it as vectors.

    Args:
        file: The uploaded PDF.
        enterprise_data: JSON-encoded metadata. ``name`` is sanitised and used
            as the enterprise name; ``id`` (generated when missing) is used as
            the vector-store namespace; ``filename`` overrides the upload's
            own file name when set.

    Returns:
        A dict with ``file_name``, ``enterprise_id``, ``number_of_chunks``,
        ``filename_id`` (taken from the vector-store result) and
        ``enterprise_name``.

    Raises:
        HTTPException: 500 when the vector store could not be created or any
            other step fails.
    """
    try:
        # Read the uploaded file
        contents = await file.read()

        # Replace separators that are unsafe in an id fragment in one pass.
        unsafe_to_underscore = str.maketrans({ch: "_" for ch in " -./\\"})
        enterprise_name = enterprise_data.name.translate(unsafe_to_underscore).strip()

        filename = (
            enterprise_data.filename
            if enterprise_data.filename is not None
            else file.filename
        )

        # Assign a new UUID-based id if none was provided
        if enterprise_data.id is None:
            clean_name = remove_non_standard_ascii(enterprise_name)
            enterprise_data.id = f"{clean_name}_{uuid4()}"

        # Open the PDF from memory; the context manager guarantees the
        # document is closed even if text extraction fails.
        with pymupdf.open(stream=contents, filetype="pdf") as pdf_document:
            text = "".join(page.get_text() for page in pdf_document)

        # Split the text into chunks
        text_chunks = get_text_chunks(text)

        # Create a vector store in the enterprise's namespace
        vector_store = get_vectorstore(
            text_chunks,
            filename=filename,
            file_type="pdf",
            namespace=enterprise_data.id,
            index=index,
            enterprise_name=enterprise_name,
        )
        if not vector_store:
            raise HTTPException(status_code=500, detail="Could not create vector store")

        return {
            "file_name": filename,
            "enterprise_id": enterprise_data.id,
            "number_of_chunks": len(text_chunks),
            "filename_id": vector_store["filename_id"],
            "enterprise_name": enterprise_name,
        }
    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e
    finally:
        await file.close()
def get_documents(enterprise_id: str):
    """List the distinct document names stored in an enterprise namespace.

    Each vector id has its last ``_``-separated segment removed (presumably a
    chunk suffix; confirm against the id scheme used at indexing time); the
    remaining prefixes are returned de-duplicated in first-seen order.

    Args:
        enterprise_id: Namespace to list.

    Returns:
        A list of document-name strings.

    Raises:
        HTTPException: 500 when listing the index fails.
    """
    try:
        # dict keys preserve insertion order and give O(1) de-duplication.
        docs_names = {}
        for page_ids in index.list(namespace=enterprise_id):
            for vector_id in page_ids:
                # rpartition()[0] drops the last "_" segment and yields ""
                # when the id contains no underscore, same as the original
                # split/join form.
                docs_names.setdefault(vector_id.rpartition("_")[0])
        return list(docs_names)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e
def delete_document(enterprise_id: str, filename_id: str):
    """Delete every vector whose id starts with ``"<filename_id>_"``.

    Args:
        enterprise_id: Namespace that holds the document.
        filename_id: Id prefix identifying the document's vectors.

    Returns:
        A dict with a confirmation ``message`` and ``chunks_deleted``, the list
        of all vector ids that were deleted (empty when nothing matched).

    Raises:
        HTTPException: 500 when listing or deleting fails.
    """
    try:
        # Accumulate ids across every listing page so the response reports all
        # deleted chunks, and is well-defined when no id matches the prefix.
        deleted_ids = []
        for page_ids in index.list(prefix=f"{filename_id}_", namespace=enterprise_id):
            if not page_ids:
                continue
            index.delete(ids=page_ids, namespace=enterprise_id)
            deleted_ids.extend(page_ids)
        return {"message": "Document deleted", "chunks_deleted": deleted_ids}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e
def delete_all_documents(enterprise_id: str):
    """Remove every vector stored in the given enterprise namespace.

    Args:
        enterprise_id: Namespace to clear.

    Returns:
        A dict carrying a confirmation ``message``.

    Raises:
        HTTPException: 500 when the delete call fails.
    """
    try:
        index.delete(delete_all=True, namespace=enterprise_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(exc)}")
    return {"message": "All documents deleted"}
| import async_timeout | |
| import asyncio | |
# Upper bound, in seconds, applied to a streamed generation by `stream_generator`.
GENERATION_TIMEOUT_SEC = 60
async def stream_generator(response, prompt):
    """Re-emit a streamed model response as JSON-encoded chunks.

    Args:
        response: Async iterable yielding ``str`` or ``bytes`` chunks.
        prompt: The formatted prompt, echoed in every emitted chunk.

    Yields:
        A JSON string ``{"prompt": ..., "content": ...}`` per chunk.

    Raises:
        HTTPException: 504 when the stream exceeds ``GENERATION_TIMEOUT_SEC``.
    """
    # The timeout context raises asyncio.TimeoutError when the `async with`
    # block exits, so the handler must wrap the whole context manager; a
    # try/except placed inside the block never sees the timeout.
    try:
        async with async_timeout.timeout(GENERATION_TIMEOUT_SEC):
            async for chunk in response:
                if isinstance(chunk, bytes):
                    chunk = chunk.decode("utf-8")  # Convert bytes to str if needed
                yield json.dumps({"prompt": prompt, "content": chunk})
    except asyncio.TimeoutError as exc:
        # NOTE(review): if part of the body has already been streamed, the
        # client has presumably received the status line already — confirm how
        # this error surfaces to callers.
        raise HTTPException(status_code=504, detail="Stream timed out") from exc
def generate_answer(user_input: UserInput):
    """Answer a prompt using context retrieved for the enterprise.

    Retrieves context for ``user_input.prompt`` from the enterprise namespace
    (and the common namespace), formats the prompt with ``base_template`` and
    asks the ``gpt-4o`` model for an answer. Style and tonality are forwarded
    to the helpers only when ``user_input.style_tonality`` is set.

    Args:
        user_input: The request payload.

    Returns:
        A ``StreamingResponse`` when ``user_input.stream`` is truthy, otherwise
        a dict with ``prompt`` (the formatted prompt), ``answer`` and
        ``context``.

    Raises:
        HTTPException: 500 when any step fails; an ``HTTPException`` raised by
            a helper is propagated unchanged.
    """
    try:
        prompt = user_input.prompt
        template_prompt = base_template

        # Fall back to an empty context when retrieval returns nothing.
        context = get_retreive_answer(
            user_input.enterprise_id, prompt, index, common_namespace
        ) or ""

        # Only pass style/tonality when the caller supplied them, so the
        # helpers' own defaults apply otherwise.
        style_kwargs = {}
        if user_input.style_tonality is not None:
            style_kwargs = {
                "style": user_input.style_tonality.style,
                "tonality": user_input.style_tonality.tonality,
            }

        prompt_formated = prompt_reformatting(
            template_prompt, context, prompt, **style_kwargs
        )
        answer = generate_response_via_langchain(
            prompt,
            model="gpt-4o",
            stream=user_input.stream,
            context=context,
            messages=user_input.messages,
            template=template_prompt,
            **style_kwargs,
        )

        if user_input.stream:
            return StreamingResponse(
                stream_generator(answer, prompt_formated),
                media_type="application/json",
            )
        return {
            "prompt": prompt_formated,
            "answer": answer,
            "context": context,
        }
    except HTTPException:
        # Keep the status/detail chosen by whoever raised it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e