import os
import logging
import tempfile
from typing import List

import gradio as gr

from pinecone import Pinecone, ServerlessSpec
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext, Settings
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
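
# Configuration comes from environment variables so the same code runs locally
# and on a Hugging Face Space (set the two API keys as Space secrets).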
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "dds-demo-index")
PINECONE_REGION = os.getenv("PINECONE_REGION", "us-east-1")
PINECONE_CLOUD = os.getenv("PINECONE_CLOUD", "aws")
EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-3-small")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")

if not PINECONE_API_KEY:
    raise RuntimeError("Missing PINECONE_API_KEY. Add it in your Space settings (Secrets).")
if not OPENAI_API_KEY:
    raise RuntimeError("Missing OPENAI_API_KEY. Add it in your Space settings (Secrets).")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("dds-space")

pc = Pinecone(api_key=PINECONE_API_KEY)
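

# Make sure the serverless index exists before LlamaIndex writes to it. The
# default dimension of 1536 matches OpenAI's text-embedding-3-small vectors.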
def _ensure_index(index_name: str, dimension: int = 1536):
    existing = [idx["name"] for idx in pc.list_indexes()]
    if index_name not in existing:
        logger.info(f"Creating Pinecone index '{index_name}' (dim={dimension})...")
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(cloud=PINECONE_CLOUD, region=PINECONE_REGION),
        )
    return pc.Index(index_name)


pinecone_index = _ensure_index(PINECONE_INDEX_NAME, dimension=1536)
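
# LlamaIndex global defaults: every index and query engine built below uses
# these embedding and LLM models unless explicitly overridden.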
Settings.embed_model = OpenAIEmbedding(model=EMBED_MODEL, api_key=OPENAI_API_KEY)
Settings.llm = OpenAI(model=LLM_MODEL, api_key=OPENAI_API_KEY)
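
# A single PineconeVectorStore wraps the index for both ingestion
# (build_or_update_index) and querying (answer).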
vector_store = PineconeVectorStore(pinecone_index=pinecone_index)


def build_or_update_index(files: List[str]) -> str:
    """
    Load the uploaded files, chunk them with LlamaIndex, and upsert into Pinecone.
    """
    if not files:
        return "Please upload at least one file."
    with tempfile.TemporaryDirectory() as tmpdir:
        paths = []
        for f in files:
            # gr.File(type="filepath") yields plain path strings; fall back to
            # .name in case an older Gradio version hands back a file object.
            src_path = f if isinstance(f, str) else f.name
            dst = os.path.join(tmpdir, os.path.basename(src_path))
            with open(src_path, "rb") as src, open(dst, "wb") as out:
                out.write(src.read())
            paths.append(dst)

        docs = SimpleDirectoryReader(input_files=paths).load_data()
        storage_context = StorageContext.from_defaults(vector_store=vector_store)

        # Embeds the chunks and upserts them into Pinecone via the storage context.
        _ = VectorStoreIndex.from_documents(
            docs,
            storage_context=storage_context,
            show_progress=True,
        )

    return f"Indexed {len(files)} file(s) into Pinecone index: {PINECONE_INDEX_NAME}."
def answer(query: str, top_k: int = 4) -> str:
    if not query or not query.strip():
        return "Ask a question about your uploaded knowledge."

    index = VectorStoreIndex.from_vector_store(vector_store)
    # Cast defensively: slider values may arrive as floats.
    qe = index.as_query_engine(similarity_top_k=int(top_k))
    resp = qe.query(query)
    return str(resp)


INTRO = (
    "Upload PDFs/TXT/Docs to build a Pinecone vector index (1536-d). "
    "Then ask questions to retrieve & summarize with LlamaIndex + OpenAI."
)
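

# Gradio UI: document upload and indexing on the left, question answering on the right.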
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        "<h1 style='text-align:center;'>RAG with LlamaIndex + Pinecone</h1>"
        "<p style='text-align:center;'>Omantel/DDS demo Space - minimal, production-friendly layout</p>"
    )

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1) Upload & Index")
            file_uploader = gr.File(label="Upload documents", file_count="multiple", type="filepath")
            index_btn = gr.Button("Build / Update Index")
            index_status = gr.Markdown()

        with gr.Column(scale=1):
            gr.Markdown("### 2) Ask a Question")
            query = gr.Textbox(label="Your question", placeholder="e.g., What is the refund policy?")
            topk = gr.Slider(1, 10, value=4, step=1, label="Top-K")
            ask_btn = gr.Button("Ask")
            answer_box = gr.Markdown()

    gr.Markdown(f"**How it works:** {INTRO}")
    index_btn.click(build_or_update_index, inputs=[file_uploader], outputs=[index_status])
    ask_btn.click(answer, inputs=[query, topk], outputs=[answer_box])


if __name__ == "__main__":
    demo.launch()