Spaces:
Sleeping
Sleeping
File size: 4,281 Bytes
9122a83 c8977ae 9122a83 6f23579 9122a83 02f2517 9122a83 e9e4739 9122a83 02f2517 9122a83 02f2517 9122a83 e9e4739 6269479 e9e4739 9122a83 6269479 9122a83 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import requests
import os
from scrape import extract_text
from llama_index.llms.gemini import Gemini
from dotenv import load_dotenv
import google.generativeai as genai
load_dotenv()
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import TextNode
from llama_index.embeddings.gemini import GeminiEmbedding
from llama_index.core import ServiceContext
from pinecone import Pinecone, ServerlessSpec
import pinecone
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.core import VectorStoreIndex
from llama_index.core import StorageContext
from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from starlette.middleware.cors import CORSMiddleware
# FastAPI initialization
app = FastAPI()

# Cross-origin access for browser clients.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended before exposing the API publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["DELETE", "GET", "POST", "PUT"],
    allow_headers=["*"],
)
# Load environment variables
# Credentials are read from the process environment (load_dotenv() is called
# earlier in the file); each value is None when its variable is unset.
google_api_key = os.environ.get("GOOGLE_API_KEY")
pinecone_api_key = os.environ.get("PINE_CONE_API_KEY")
openai_api_key = os.environ.get("OPENAI_API_KEY")

# In-memory state management
# Process-local flags shared by the request handlers; they start from these
# defaults every time the module is loaded.
state = dict(index_created=False, vector_store=None)
class URLInput(BaseModel):
    """Request body for ``POST /create_index/``."""

    # Address of the page whose text is fetched and indexed.
    url: str
    # Name of the Pinecone index that receives the page's vectors.
    index_name: str
class QueryInput(BaseModel):
    """Request body for ``POST /query/``."""

    # Question passed to the query engine.
    query: str
    # Name of the Pinecone index to search.
    index_name: str
class delete_index(BaseModel):
    """Request body for ``DELETE /delete_index/``.

    NOTE(review): the route handler defined further down reuses this exact
    name, so after that definition the module-level name refers to the
    function rather than this model.
    """

    # Name of the Pinecone index to remove.
    index_name: str
@app.get("/")
async def test():
    """Return a fixed payload so callers can check the app is responding."""
    payload = {"Message": "Application is working !!"}
    return payload
@app.post("/create_index/")
async def create_index(url_input: URLInput):
    """Fetch a web page, embed its text and store the vectors in Pinecone.

    Args:
        url_input: Request body carrying the page ``url`` and the target
            Pinecone ``index_name``.

    Returns:
        ``{"message": "Index created successfully"}`` once the page has been
        processed.

    Raises:
        HTTPException: 400 when the page cannot be fetched (invalid URL,
            network error, timeout or an HTTP error status).
    """
    # NOTE(review): the URL is caller-supplied and fetched server-side; if
    # this service can reach internal hosts, consider restricting targets.
    try:
        # The timeout keeps an unresponsive host from hanging the request,
        # and raise_for_status() stops HTTP error pages from being indexed.
        page = requests.get(url_input.url, timeout=30)
        page.raise_for_status()
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=400, detail=f"Could not fetch URL: {exc}"
        ) from exc

    extracted_text = extract_text(page.text)
    text_parser = SentenceSplitter(chunk_size=550, chunk_overlap=200)
    text_chunks = text_parser.split_text(extracted_text)
    nodes = [TextNode(text=text_chunk) for text_chunk in text_chunks]

    # Embed all chunks through the batch API instead of one call per chunk.
    embed_model = OpenAIEmbedding()
    embeddings = embed_model.get_text_embedding_batch(
        [node.get_content(metadata_mode="all") for node in nodes]
    )
    for node, embedding in zip(nodes, embeddings):
        node.embedding = embedding

    pc = Pinecone(api_key=pinecone_api_key)
    index_name = url_input.index_name
    create_index_custom(index_name, pc)
    pinecone_index = pc.Index(index_name)
    vector_store = PineconeVectorStore(pinecone_index=pinecone_index)

    # The upload is skipped only when the index already holds exactly as many
    # vectors as this page produced chunks.
    # NOTE(review): this looks like a re-upload guard, but it compares counts
    # only, not content -- confirm that is the intended behaviour.
    stored_count = pinecone_index.describe_index_stats()["total_vector_count"]
    if stored_count != len(text_chunks):
        vector_store.add(nodes)

    state["index_created"] = True
    state["vector_store"] = vector_store
    return {"message": "Index created successfully"}
@app.post("/query/")
async def query_index(query_input: QueryInput):
    """Answer a question from the vectors stored in a Pinecone index.

    Args:
        query_input: Request body carrying the ``query`` text and the
            ``index_name`` to search.

    Returns:
        ``{"response": <answer text>}`` when the index exists, otherwise a
        ``{"message": ...}`` payload saying the index is not present.
    """
    index_name = query_input.index_name
    pc = Pinecone(api_key=pinecone_api_key)

    # Pinecone is the source of truth for whether an index can be queried.
    # The earlier process-local "index_created" flag rejected indexes that
    # already existed whenever this process had not created one itself
    # (for example after a restart), and it was not tracked per index.
    if index_name not in pc.list_indexes().names():
        return {"message": "Index Not present ! Please try a different Index Name!"}

    vector_store = PineconeVectorStore(pinecone_index=pc.Index(index_name))
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    response = index.as_query_engine().query(query_input.query)
    return {"response": response.response}
@app.delete("/delete_index/")
async def delete_index(delete_input: delete_index):
    """Delete the named Pinecone index if it exists.

    The parameter annotation refers to the request model of the same name
    declared earlier in the file; this definition then takes over that
    module-level name.

    Args:
        delete_input: Request body carrying the ``index_name`` to remove.

    Returns:
        A ``{"message": ...}`` payload reporting either a successful
        deletion or that no such index was present.
    """
    target = delete_input.index_name
    client = Pinecone(api_key=pinecone_api_key)
    known_names = client.list_indexes().names()
    if target in known_names:
        client.delete_index(target)
        return {"message": "Index deleted successfully"}
    return {"message":"Index is not already present to delete it !"}
def create_index_custom(
    index_name: str,
    pc,
    dimension: int = 1536,
    metric: str = "cosine",
    cloud: str = "aws",
    region: str = "us-east-1",
) -> None:
    """Create a serverless Pinecone index unless the name is already taken.

    Args:
        index_name: Name of the index to create.
        pc: Pinecone client exposing ``list_indexes()`` and
            ``create_index()``.
        dimension: Vector width of the index. The default of 1536 presumably
            matches the embedding model used by the caller -- confirm when
            the model changes.
        metric: Similarity metric for the index.
        cloud: Cloud provider passed to ``ServerlessSpec``.
        region: Cloud region passed to ``ServerlessSpec``.
    """
    # Nothing to do when an index with this name is already listed.
    if index_name in pc.list_indexes().names():
        return
    pc.create_index(
        name=index_name,
        dimension=dimension,
        metric=metric,
        spec=ServerlessSpec(cloud=cloud, region=region),
    )