Spaces:
Sleeping
Sleeping
| import time | |
| import threading | |
| from pinecone import Pinecone, ServerlessSpec | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| from langchain_pinecone import PineconeVectorStore | |
| from langchain_core.documents import Document | |
| from config import PINECONE_API_KEY | |
| from chat_utils import get_chain | |
| from fastapi import HTTPException | |
| from pinecone.openapi_support.exceptions import NotFoundException | |
| # Initialize Pinecone client | |
| pc = Pinecone(api_key=PINECONE_API_KEY) | |
| # Initialize embedding model | |
| embedding_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001") | |
| # Create vector store using session_id | |
| def create_vector_store(session_id, texts): | |
| index_name = session_id | |
| # Create index if not exists | |
| existing_indexes = [index["name"] for index in pc.list_indexes()] | |
| if index_name not in existing_indexes: | |
| pc.create_index( | |
| name=index_name, | |
| dimension=768, # Adjust this if your model outputs a different dimension | |
| metric="cosine", | |
| spec=ServerlessSpec(cloud="aws", region="us-east-1") | |
| ) | |
| while not pc.describe_index(index_name).status["ready"]: | |
| time.sleep(2) | |
| # Get index | |
| index = pc.Index(index_name) | |
| # Convert texts into Document format | |
| documents = [Document(page_content=text) for text in texts] | |
| # Create vector store and add documents | |
| vectorstore = PineconeVectorStore(index=index, embedding=embedding_model) | |
| vectorstore.add_documents(documents=documents) | |
| # Query vector store | |
| def query_vector_store(session_id, question): | |
| index_name = session_id | |
| try: | |
| index = pc.Index(index_name) | |
| vectorstore = PineconeVectorStore(index=index, embedding=embedding_model) | |
| retriever = vectorstore.as_retriever( | |
| search_type="similarity_score_threshold", | |
| search_kwargs={"k": 3, "score_threshold": 0.5}, | |
| ) | |
| docs = retriever.invoke(question) | |
| chain = get_chain() | |
| result = chain({"input_documents": docs, "question": question}, return_only_outputs=True) | |
| return result["output_text"] | |
| except NotFoundException as e: | |
| raise HTTPException( | |
| status_code=404, | |
| detail=f"Session '{session_id}' has expired or does not exist." | |
| ) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"An unexpected error occurred: {str(e)}" | |
| ) | |
| # Delete vector store with optional delay | |
| def delete_vector_store(index_name, delay=0): | |
| def delayed_delete(): | |
| if delay: | |
| time.sleep(delay) | |
| try: | |
| pc.delete_index(index_name) | |
| print(f"Deleted index {index_name}") | |
| except Exception as e: | |
| print(f"Error deleting index {index_name}: {e}") | |
| threading.Thread(target=delayed_delete).start() | |