from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import requests
import os
from scrape import extract_text
from llama_index.llms.gemini import Gemini
from dotenv import load_dotenv
import google.generativeai as genai

load_dotenv()

from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import TextNode
from llama_index.embeddings.gemini import GeminiEmbedding
from llama_index.core import ServiceContext
from pinecone import Pinecone, ServerlessSpec
import pinecone
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.core import VectorStoreIndex
from llama_index.core import StorageContext
from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from starlette.middleware.cors import CORSMiddleware

# FastAPI initialization
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["DELETE", "GET", "POST", "PUT"],
    allow_headers=["*"],
)

# Load environment variables
google_api_key = os.getenv("GOOGLE_API_KEY")
pinecone_api_key = os.getenv("PINE_CONE_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")

# Dimensionality of OpenAI's default text embeddings; must match the
# `dimension` used when creating Pinecone indexes in create_index_custom().
EMBEDDING_DIMENSION = 1536

# In-memory state management.
# NOTE(review): this dict does not survive process restarts, so no endpoint
# may treat it as the source of truth -- Pinecone itself is authoritative.
# Kept for backward compatibility with anything that inspects it.
state = {
    "index_created": False,
    "vector_store": None,
}


class URLInput(BaseModel):
    """Request body for /create_index/: page URL plus target index name."""
    url: str
    index_name: str


class QueryInput(BaseModel):
    """Request body for /query/: natural-language query plus index name."""
    query: str
    index_name: str


class DeleteIndexInput(BaseModel):
    """Request body for /delete_index/.

    Renamed from ``delete_index``: the old class name collided with the
    ``delete_index`` endpoint function defined later in this module, which
    rebound the name and made the model class unreachable at module level.
    """
    index_name: str


def _pinecone_client() -> Pinecone:
    """Build a Pinecone client from the PINE_CONE_API_KEY env variable."""
    return Pinecone(api_key=pinecone_api_key)


@app.get("/")
async def test():
    """Health-check endpoint."""
    return {"Message": "Application is working !!"}


@app.post("/create_index/")
async def create_index(url_input: URLInput):
    """Scrape a web page, embed its text, and upsert it into a Pinecone index.

    Creates the index if it does not already exist, then stores one embedded
    vector per text chunk. Returns a success message.
    """
    # Fetch the page. Fail fast instead of hanging the worker forever on a
    # dead host, and surface HTTP errors rather than embedding an error page.
    page = requests.get(url_input.url, timeout=30)
    page.raise_for_status()
    extracted_text = extract_text(page.text)

    # Split the page text into overlapping, sentence-aligned chunks.
    text_parser = SentenceSplitter(chunk_size=550, chunk_overlap=200)
    text_chunks = text_parser.split_text(extracted_text)
    nodes = [TextNode(text=chunk) for chunk in text_chunks]

    # Embed each chunk with OpenAI (EMBEDDING_DIMENSION-sized vectors).
    embed_model = OpenAIEmbedding()
    for node in nodes:
        node.embedding = embed_model.get_text_embedding(
            node.get_content(metadata_mode="all")
        )

    pc = _pinecone_client()
    index_name = url_input.index_name
    create_index_custom(index_name, pc)
    pinecone_index = pc.Index(index_name)
    vector_store = PineconeVectorStore(pinecone_index=pinecone_index)

    # Only upsert when the stored vector count differs from what we just
    # produced -- a crude idempotency guard against re-ingesting the same page.
    stats = pinecone_index.describe_index_stats()
    if stats["total_vector_count"] != len(text_chunks):
        vector_store.add(nodes)

    state["index_created"] = True
    state["vector_store"] = vector_store
    return {"message": "Index created successfully"}


@app.post("/query/")
async def query_index(query_input: QueryInput):
    """Answer a natural-language query against an existing Pinecone index."""
    # BUG FIX: the old in-memory ``state["index_created"]`` gate raised 400
    # after every process restart, even when the index existed in Pinecone.
    # Pinecone is the source of truth, so consult it directly instead.
    pc = _pinecone_client()
    if query_input.index_name not in pc.list_indexes().names():
        return {"message": "Index Not present ! Please try a different Index Name!"}

    pinecone_index = pc.Index(query_input.index_name)
    vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    query_engine = index.as_query_engine()
    response = query_engine.query(query_input.query)
    return {"response": response.response}


@app.delete("/delete_index/")
async def delete_index(delete_input: DeleteIndexInput):
    """Delete a Pinecone index by name, if it exists."""
    pc = _pinecone_client()
    if delete_input.index_name not in pc.list_indexes().names():
        return {"message": "Index is not already present to delete it !"}
    pc.delete_index(delete_input.index_name)
    return {"message": "Index deleted successfully"}


def create_index_custom(index_name: str, pc: Pinecone) -> None:
    """Create a serverless Pinecone index if it does not already exist.

    The dimension must match the embedding model used in create_index()
    (OpenAI, EMBEDDING_DIMENSION floats per vector).
    """
    if index_name not in pc.list_indexes().names():
        pc.create_index(
            name=index_name,
            dimension=EMBEDDING_DIMENSION,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )