# Page header captured along with the file (not part of the program):
# Shafaq25's picture
# Update app.py
# 3137489 verified
import os
import sys
import logging
import gradio as gr
import requests
from pinecone import Pinecone, ServerlessSpec
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.writers import DocumentWriter
from haystack_integrations.document_stores.pinecone import PineconeDocumentStore
from haystack_integrations.components.retrievers.pinecone import PineconeEmbeddingRetriever
from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter
from haystack.utils import Secret
# --- Logging ---
# Route log records at INFO level and above to stdout.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)

# --- Environment Variables ---
# Both keys are mandatory: a missing or empty value aborts start-up right away.
api_key = os.environ.get("PINECONE_API_KEY")
if not api_key:
    raise ValueError("Please set the PINECONE_API_KEY as an environment variable.")

openai_api_key = os.environ.get("OPENAI_API_KEY")
if not openai_api_key:
    raise ValueError("Please set the OPENAI_API_KEY as an environment variable.")

# Write the validated key back into the process environment.
os.environ["OPENAI_API_KEY"] = openai_api_key
# --- Pinecone Setup ---
index_name = "quickstart"
# Vector width of the index.
# NOTE(review): presumably the output size of the OpenAI embedders used later
# in this file; confirm if the embedding model is ever changed.
dimension = 1536
pc = Pinecone(api_key=api_key)

# Create the serverless index only when no existing index carries this name.
if not any(idx["name"] == index_name for idx in pc.list_indexes()):
    pc.create_index(
        name=index_name,
        dimension=dimension,
        metric="euclidean",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
# --- Document Loading and Processing ---
# Fetch the sample essay once and keep it on disk; later runs reuse the file.
os.makedirs("data/paul_graham", exist_ok=True)
file_path = "data/paul_graham/paul_graham_essay.txt"
if not os.path.exists(file_path):
    url = "https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt"
    # Bound the wait so a stalled connection cannot hang start-up indefinitely.
    r = requests.get(url, timeout=30)
    # Fail loudly on 4xx/5xx: otherwise the error page would be saved as the
    # essay and, because the file then exists, never be downloaded again.
    r.raise_for_status()
    # Write UTF-8 explicitly instead of relying on the platform default encoding.
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(r.text)
# --- Haystack Pipeline for Indexing ---
# Document store bound to the Pinecone index named by `index_name`; the API key
# is resolved from the PINECONE_API_KEY environment variable.
document_store = PineconeDocumentStore(
    api_key=Secret.from_env_var("PINECONE_API_KEY"),
    index=index_name,
)

# Stages, in order: read the text file -> split into 100-word chunks ->
# embed each chunk -> write the embedded chunks to the document store.
indexing_pipeline = Pipeline()
for _stage_name, _stage in (
    ("converter", TextFileToDocument()),
    ("splitter", DocumentSplitter(split_by="word", split_length=100)),
    ("embedder", OpenAIDocumentEmbedder()),
    ("writer", DocumentWriter(document_store)),
):
    indexing_pipeline.add_component(_stage_name, _stage)

# Wire each stage's `documents` output into the next stage's `documents` input.
for _sender, _receiver in (
    ("converter.documents", "splitter.documents"),
    ("splitter.documents", "embedder.documents"),
    ("embedder.documents", "writer.documents"),
):
    indexing_pipeline.connect(_sender, _receiver)

# Index only when the store reports zero documents, so restarts do not
# embed and upload the essay again.
if not document_store.count_documents():
    logging.info("Indexing the document...")
    indexing_pipeline.run({"converter": {"sources": [file_path]}})
    logging.info("Indexing complete.")
# --- Haystack Query Pipeline ---
# Prompt template rendered by the prompt builder: it lists the content of every
# retrieved document, then appends the user's question.
template = """
Given the following context, answer the user's question.
If the context isn't sufficient, say that you don't have enough information.
Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
"""

# Stages, in order: embed the question -> retrieve matching documents from the
# document store -> build the prompt -> generate the answer.
query_pipeline = Pipeline()
for _stage_name, _stage in (
    ("embedder", OpenAITextEmbedder()),
    ("retriever", PineconeEmbeddingRetriever(document_store=document_store)),
    ("prompt_builder", PromptBuilder(template=template)),
    ("llm", OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))),
):
    query_pipeline.add_component(_stage_name, _stage)

# The text embedder emits a single `embedding`, which feeds the retriever's
# `query_embedding` input; the remaining links pass documents and the prompt on.
for _sender, _receiver in (
    ("embedder.embedding", "retriever.query_embedding"),
    ("retriever.documents", "prompt_builder.documents"),
    ("prompt_builder", "llm"),
):
    query_pipeline.connect(_sender, _receiver)
# --- Query Function ---
def ask_question(prompt):
    """Answer a question about the indexed essay using the query pipeline.

    Args:
        prompt: The question text supplied by the UI. ``None``, empty, or
            whitespace-only input is rejected without running the pipeline.

    Returns:
        The first reply of the pipeline's ``llm`` component as a string; a
        short hint when no question was supplied; or a message starting with
        "❌ Error:" when the pipeline raised an exception.
    """
    # Nothing to ask: skip the embedding and generation calls entirely.
    if not prompt or not prompt.strip():
        return "Please enter a question."

    try:
        # The same text goes to the embedder (for retrieval) and to the
        # prompt builder (as the question rendered into the template).
        results = query_pipeline.run(
            {"embedder": {"text": prompt}, "prompt_builder": {"query": prompt}}
        )
        return str(results["llm"]["replies"][0])
    except Exception as e:
        # UI boundary: keep the traceback in the logs, but hand the user a
        # readable message instead of letting the exception propagate.
        logging.exception("Failed to answer question")
        return f"❌ Error: {e}"
# --- Gradio UI ---
# Page-wide stylesheet, one CSS rule per source line. The adjacent literals
# concatenate into the same single-line string the app has always passed.
_CUSTOM_CSS = (
    "body { background-color: #f5f5dc; font-family: 'Georgia', 'Merriweather', serif;}"
    "h1, h2, h3 { color: #4e342e;}"
    ".gr-box, .gr-column, .gr-group { border-radius: 15px; padding: 20px; background-color: #fffaf0; box-shadow: 2px 4px 14px rgba(0, 0, 0, 0.1); margin-top: 10px;}"
    'textarea, input[type="text"] { background-color: #fffaf0; border: 1px solid #d2b48c; color: #4e342e; border-radius: 8px;}'
    "button { background-color: #a1887f; color: white; font-weight: bold; border-radius: 8px; transition: background-color 0.3s ease;}"
    "button:hover { background-color: #8d6e63;}"
    ".gr-button { border-radius: 8px !important;}"
)

# Centered title banner shown at the top of the page.
_HEADER_HTML = """
<div style='text-align: center;'>
<h1>🧠 Paul Graham Essay Q&A</h1>
<div style='font-size: 1.1em; color: #6d4c41; margin-bottom: 1em;'>
Explore insights from Paul Graham's essay using semantic search powered by <strong>Haystack</strong> + <strong>Pinecone</strong>.
</div>
</div>
"""

# Text of the collapsible explainer.
_PINECONE_INFO = (
    "**Pinecone** is a vector database that stores document embeddings "
    "(numeric representations of meaning). When you ask a question, it's "
    "converted into a vector and compared against stored vectors to find the "
    "most relevant answers — even if they don't match word-for-word."
)

# NOTE(review): the original indentation was lost, so the nesting below is
# reconstructed from the order of the context managers; confirm the layout.
with gr.Blocks(css=_CUSTOM_CSS) as demo:
    with gr.Column():
        gr.Markdown(_HEADER_HTML)

        with gr.Accordion("ℹ️ What is Pinecone Vector Indexing?", open=False):
            gr.Markdown(_PINECONE_INFO)

        gr.Markdown("### 📖 Ask your question below:")

        with gr.Group():
            with gr.Row():
                user_input = gr.Textbox(
                    placeholder="E.g., What does Paul Graham say about startups?",
                    label="Your Question",
                    lines=2,
                )
            with gr.Row():
                output = gr.Textbox(label="Answer", lines=6)
            with gr.Row():
                submit_btn = gr.Button("🔍 Search Essay")
                clear_btn = gr.Button("🧹 Clear")

        # Search sends the question through the query function and shows the
        # returned text in the answer box.
        submit_btn.click(fn=ask_question, inputs=user_input, outputs=output)
        # Clear resets both the question and the answer to empty strings.
        clear_btn.click(fn=lambda: ("", ""), inputs=None, outputs=[user_input, output])

demo.launch()