# NOTE: "Spaces: Sleeping" banner below is Hugging Face Spaces page residue
# captured by the scrape — it is not part of the application source.
| from fastapi import FastAPI, HTTPException, Request | |
| from pydantic import BaseModel | |
| import requests | |
| import os | |
| from scrape import extract_text | |
| from llama_index.llms.gemini import Gemini | |
| from dotenv import load_dotenv | |
| import google.generativeai as genai | |
| load_dotenv() | |
| from llama_index.core.node_parser import SentenceSplitter | |
| from llama_index.core.schema import TextNode | |
| from llama_index.embeddings.gemini import GeminiEmbedding | |
| from llama_index.core import ServiceContext | |
| from pinecone import Pinecone, ServerlessSpec | |
| import pinecone | |
| from llama_index.vector_stores.pinecone import PineconeVectorStore | |
| from llama_index.core import VectorStoreIndex | |
| from llama_index.core import StorageContext | |
| from llama_index.core import Settings | |
| from llama_index.embeddings.openai import OpenAIEmbedding | |
| from starlette.middleware.cors import CORSMiddleware | |
# ---------------------------------------------------------------------------
# FastAPI initialization: application object, CORS policy, credentials, and
# a process-local cache for the most recently created vector store.
# ---------------------------------------------------------------------------
app = FastAPI()
# Permit browser clients from any origin; only the four listed HTTP methods
# are allowed. NOTE(review): allow_origins=["*"] together with
# allow_credentials=True is a very permissive CORS policy — confirm this is
# intended for production.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["DELETE", "GET", "POST", "PUT"],
allow_headers=["*"],
)
# Load environment variables (populated by load_dotenv() at import time).
# google_api_key and openai_api_key are read here but not referenced in this
# chunk — presumably consumed implicitly by the Gemini/OpenAI clients via the
# environment; verify against the rest of the file.
google_api_key = os.getenv("GOOGLE_API_KEY")
pinecone_api_key = os.getenv("PINE_CONE_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")
# In-memory state management: whether an index was created by THIS process
# and the last PineconeVectorStore handle. NOTE(review): this flag does not
# survive restarts and is not shared between workers.
state = {
"index_created": False,
"vector_store": None,
}
class URLInput(BaseModel):
    """Request body for index creation: the page URL to scrape and the
    name of the Pinecone index to create/populate with its content."""
    url: str
    index_name: str
class QueryInput(BaseModel):
    """Request body for querying: the natural-language question and the
    name of the Pinecone index to run it against."""
    query: str
    index_name: str
class delete_index(BaseModel):
    """Request body for index deletion: the Pinecone index name.

    NOTE(review): this class name is later shadowed by the ``delete_index``
    coroutine defined below. The coroutine's annotation still resolves to
    this model (annotations are evaluated before ``def`` rebinds the name),
    but renaming one of the two would avoid the confusion.
    """
    index_name:str
async def test():
    """Health-check endpoint: reports that the application is running.

    NOTE(review): no ``@app`` route decorator is visible in this scraped
    chunk — confirm the route registration exists in the original source.
    """
    status_payload = {"Message": "Application is working !!"}
    return status_payload
async def create_index(url_input: URLInput):
    """Scrape a web page, chunk and embed its text, and upsert the chunks
    into a Pinecone index named by the caller.

    Args:
        url_input: the page URL to fetch and the target Pinecone index name.

    Returns:
        A confirmation message dict.

    Raises:
        HTTPException(400): if the page cannot be fetched (network error,
        timeout, or non-2xx status) — previously such failures were silently
        scraped/indexed or surfaced as an unhandled 500.
    """
    # Bug fix: the original requests.get() had no timeout (a hung server
    # would hang the request forever) and never checked the HTTP status, so
    # error pages were indexed as if they were content.
    try:
        page = requests.get(url_input.url, timeout=30)
        page.raise_for_status()
    except requests.RequestException as exc:
        raise HTTPException(status_code=400, detail=f"Failed to fetch URL: {exc}")

    extracted_text = extract_text(page.text)

    # Chunk the page text into overlapping sentence-based windows.
    text_parser = SentenceSplitter(chunk_size=550, chunk_overlap=200)
    text_chunks = text_parser.split_text(extracted_text)
    nodes = [TextNode(text=chunk) for chunk in text_chunks]

    # Embed each chunk (OpenAIEmbedding reads its key from the environment).
    embed_model = OpenAIEmbedding()
    for node in nodes:
        node.embedding = embed_model.get_text_embedding(
            node.get_content(metadata_mode="all")
        )

    # Ensure the target index exists, then upsert the embedded nodes.
    pc = Pinecone(api_key=pinecone_api_key)
    index_name = url_input.index_name
    create_index_custom(index_name, pc)
    pinecone_index = pc.Index(index_name)
    vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
    # Skip the upsert only when the index already holds exactly this many
    # vectors — a coarse idempotency check inherited from the original code.
    if pinecone_index.describe_index_stats()['total_vector_count'] != len(text_chunks):
        vector_store.add(nodes)

    # Cache the handle for this process (see `state` above).
    state["index_created"] = True
    state["vector_store"] = vector_store
    return {"message": "Index created successfully"}
async def query_index(query_input: QueryInput):
    """Answer a natural-language query against an existing Pinecone index.

    Args:
        query_input: the question text and the Pinecone index name.

    Returns:
        {"response": ...} with the query engine's answer, or a message dict
        when the named index does not exist.

    Bug fix: the original gated on the process-local ``state["index_created"]``
    flag and raised 400 when it was unset. That flag is reset on every
    restart and never set by other workers, so queries against perfectly
    valid pre-existing Pinecone indexes were rejected even though the very
    next lines verify existence against Pinecone directly. The redundant
    stale-flag gate is removed; the authoritative Pinecone check remains.
    """
    pc = Pinecone(api_key=pinecone_api_key)
    index_name = query_input.index_name
    if index_name not in pc.list_indexes().names():
        # Kept as a 200-with-message response for backward compatibility
        # with existing clients (matches delete_index's style).
        return {"message": "Index Not present ! Please try a different Index Name!"}

    # Rebuild an index view over the existing Pinecone vectors and query it.
    pinecone_index = pc.Index(index_name)
    vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    query_engine = index.as_query_engine()
    response = query_engine.query(query_input.query)
    return {"response": response.response}
async def delete_index(delete_input: delete_index):
    """Delete a Pinecone index by name and clear the process-local cache.

    Args:
        delete_input: request body carrying the index name. (The annotation
        refers to the ``delete_index`` Pydantic model; this coroutine rebinds
        the module-level name only after the annotation is evaluated, so the
        reference resolves correctly — though the shadowing is confusing.)

    Returns:
        A message dict reporting success or that the index was absent.
    """
    index_name = delete_input.index_name
    pc = Pinecone(api_key=pinecone_api_key)
    if index_name not in pc.list_indexes().names():
        return {"message": "Index is not already present to delete it !"}
    pc.delete_index(index_name)
    # Bug fix: drop the cached vector-store handle so subsequent requests
    # cannot operate on an index that no longer exists.
    state["index_created"] = False
    state["vector_store"] = None
    return {"message": "Index deleted successfully"}
def create_index_custom(index_name: str, pc):
    """Create a serverless Pinecone index with the given name if absent.

    Args:
        index_name: name of the index to ensure exists.
        pc: an initialized Pinecone client.
    """
    existing_names = pc.list_indexes().names()
    if index_name in existing_names:
        # Index already present — nothing to create.
        return
    pc.create_index(
        name=index_name,
        # 1536 presumably matches the OpenAI embedding model's output size —
        # confirm against the embedding model in use.
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )