# Source: Hugging Face Space "rag_dp_model" — app.py (commit ac45446, author jtdearmon)
# -*- coding: utf-8 -*-
"""
Created on Fri Feb 7 13:26:43 2025
@author: Jacob Dearmon
"""
# Standard library
import base64
import csv
import datetime
import io
import os
import time

# Third-party
import gradio as gr
import openai
from PIL import Image
from pinecone import Pinecone, ServerlessSpec
# ---------------------------------------------------
# 1. Convert local SERMONS logo (JFIF) to PIL Image
# ---------------------------------------------------
def to_base64(path_to_img):
    """Read the file at *path_to_img* and return its bytes as a Base64 string."""
    with open(path_to_img, "rb") as image_file:
        raw_bytes = image_file.read()
    return base64.b64encode(raw_bytes).decode("utf-8")
def base64_to_image(base64_string):
    """Decode a Base64 string into a PIL Image (JFIF is effectively JPEG, which Pillow reads)."""
    raw = base64.b64decode(base64_string)
    buffer = io.BytesIO(raw)
    return Image.open(buffer)
# Update the path to your JFIF logo file here.
# NOTE: these run at import time — if "DP_logo.jfif" is missing next to this
# script, the app fails to start with a FileNotFoundError.
SERMONS_LOGO_B64 = to_base64("DP_logo.jfif")
SERMONS_LOGO_IMG = base64_to_image(SERMONS_LOGO_B64)
# ---------------------------------------------------
# 2. Configuration
# ---------------------------------------------------
# API credentials are read from the environment (e.g. HF Space secrets).
openai.api_key = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# From your screenshot: "Cloud: AWS | Region: us-east-1 | Dimension: 1536"
PINECONE_ENV = "us-east-1"              # Pinecone region (AWS)
INDEX_NAME = "idx-sermons-1536"         # name from Pinecone console
EMBED_DIMENSION = 1536                  # must match the embedding model's output size
EMBED_MODEL = "text-embedding-ada-002"  # OpenAI embedding model (1536-dim output)
CHAT_MODEL = "gpt-4o"                   # chat model used to generate answers
TOP_K = 20                              # number of chunks retrieved per query
SIMILARITY_THRESHOLD = 0.4              # min top-match score required to use RAG context
# Local CSV files used for feedback and session logging.
NEGATIVE_FEEDBACK_CSV = "negative_feedback.csv"
NEUTRAL_FEEDBACK_CSV = "neutral_feedback.csv"
SESSION_HISTORY_CSV = "session_history.csv"
# ---------------------------------------------------
# 2.5. Automatically Initialize Pinecone Index
# ---------------------------------------------------
def init_pinecone_index(index_name=INDEX_NAME, dimension=EMBED_DIMENSION):
    """
    Create the Pinecone index if it does not exist, then return a handle to it.

    Parameters
    ----------
    index_name : str
        Index name (must match the Pinecone console).
    dimension : int
        Embedding dimension; must equal the embedding model's output size.

    Returns
    -------
    pinecone.Index
        Client bound to the (possibly newly created) index.
    """
    # The `Pinecone` class is the v3+ SDK; it does not take an `environment`
    # kwarg — the region is supplied via the serverless spec at creation time.
    pc = Pinecone(api_key=PINECONE_API_KEY)
    existing_indexes = pc.list_indexes().names()
    if index_name not in existing_indexes:
        print(f"[Info] Creating Pinecone index '{index_name}' in env '{PINECONE_ENV}'...")
        # v3 `create_index` requires a deployment spec; the console shows
        # "Cloud: AWS | Region: us-east-1", so create a serverless index there.
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric="cosine",  # Pinecone's default metric, stated explicitly
            spec=ServerlessSpec(cloud="aws", region=PINECONE_ENV),
        )
        # Poll until the index is actually ready instead of a fixed sleep.
        while not pc.describe_index(index_name).status["ready"]:
            time.sleep(1)
    else:
        print(f"[Info] Reusing existing Pinecone index '{index_name}' in env '{PINECONE_ENV}'.")
    return pc.Index(index_name)


# Initialize Pinecone Index at import time (module-level side effect).
pc_index = init_pinecone_index()
# ---------------------------------------------------
# 3. Session Memory
# ---------------------------------------------------
# Rolling conversation memory, seeded with the system persona.
# NOTE(review): this is module-level mutable state — all users of the app
# share one history, and it grows without bound; confirm this is intended.
session_history = [
    {
        "role": "system",
        "content": "You are a helpful AI assistant specialized in sermons and biblical questions. Answer in a compassionate and loving tone, while recognizing the emotive content of the question - if any."
    }
]
# ---------------------------------------------------
# 4. Helper Functions
# ---------------------------------------------------
def embed_text(text: str):
    """Embed *text* with OpenAI; return the embedding vector, or None if the call fails."""
    try:
        response = openai.Embedding.create(model=EMBED_MODEL, input=[text])
        embedding = response["data"][0]["embedding"]
    except Exception as exc:
        print(f"[Error] Embedding failed: {exc}")
        return None
    return embedding
def query_index(user_query: str, top_k=TOP_K):
    """Return the Pinecone matches most similar to *user_query* (empty list on any failure)."""
    query_vector = embed_text(user_query)
    if query_vector is None:
        return []
    try:
        result = pc_index.query(vector=query_vector, top_k=top_k, include_metadata=True)
        return result.matches
    except Exception as exc:
        print(f"[Error] Pinecone query failed: {exc}")
        return []
def build_rag_answer(user_query, matches):
    """
    Answer *user_query* with the chat model, grounded in the retrieved
    Pinecone *matches*, and record the reply in the session history.
    """
    # Stitch every retrieved chunk into one context blob, labelled by chunk id.
    context_parts = []
    for match in matches:
        chunk_text = match.metadata.get('text', '')
        context_parts.append(f"Chunk ID: {match.id}\n{chunk_text}")
    combined_context = "\n\n".join(context_parts)
    # Inject the retrieved context as an extra system message before the user turn.
    context_system_message = {
        "role": "system",
        "content": (
            "Relevant reference text from Pinecone:\n"
            f"CONTEXT:\n{combined_context}\n\n"
            "Answer the user's question using this context where helpful."
        )
    }
    conversation = session_history + [
        context_system_message,
        {"role": "user", "content": user_query}
    ]
    try:
        completion = openai.ChatCompletion.create(
            model=CHAT_MODEL,
            messages=conversation,
            temperature=0.2,
            max_tokens=1750
        )
        final_answer = completion["choices"][0]["message"]["content"].strip()
    except Exception as exc:
        print(f"[Error] ChatCompletion failed: {exc}")
        final_answer = "Error generating RAG answer."
    # Remember the assistant turn for future requests.
    session_history.append({"role": "assistant", "content": final_answer})
    return final_answer
def direct_llm_call(user_query):
    """
    Fallback path: answer from session history alone, without Pinecone
    context (used when retrieval returns nothing useful).
    """
    conversation = session_history + [{"role": "user", "content": user_query}]
    try:
        completion = openai.ChatCompletion.create(
            model=CHAT_MODEL,
            messages=conversation,
            temperature=0.2
        )
        final_answer = completion["choices"][0]["message"]["content"].strip()
    except Exception as exc:
        print(f"[Error] Direct LLM call failed: {exc}")
        final_answer = "Error generating direct LLM answer."
    session_history.append({"role": "assistant", "content": final_answer})
    return final_answer
def query_rag(user_query: str) -> str:
    """
    Top-level pipeline: remember the user turn, retrieve context from
    Pinecone, and answer via RAG when the best match clears the similarity
    threshold — otherwise fall back to a plain LLM call.
    """
    user_query = user_query.strip()
    if not user_query:
        return "Please enter a valid query."
    # Record the user turn in the shared session memory.
    session_history.append({"role": "user", "content": user_query})
    matches = query_index(user_query, top_k=TOP_K)
    # No hits at all -> nothing to ground the answer in.
    if not matches:
        return direct_llm_call(user_query)
    best_score = matches[0].score or 0.0
    if best_score < SIMILARITY_THRESHOLD:
        return direct_llm_call(user_query)
    return build_rag_answer(user_query, matches)
# ---------------------------------------------------
# 5. Feedback + Logging
# ---------------------------------------------------
def incorporate_feedback_into_pinecone(user_query, answer):
    """On a thumbs-up, embed the Q&A pair and upsert it as a new chunk in Pinecone."""
    text_chunk = f"User Query: {user_query}\nAI Answer: {answer}"
    vector = embed_text(text_chunk)
    if vector is None:
        return
    # Timestamp-based id keeps successive feedback records distinct.
    feedback_id = f"feedback_{int(time.time())}"
    record = {
        "id": feedback_id,
        "values": vector,
        "metadata": {"source": "feedback", "text": text_chunk},
    }
    try:
        pc_index.upsert([record])
        print("[Info] User feedback upserted to Pinecone.")
    except Exception as exc:
        print(f"[Error] Could not upsert feedback: {exc}")
def store_feedback_to_csv(user_query, answer, csv_path):
    """Append one (timestamp, query, answer) row to *csv_path*, writing a header on first use."""
    columns = ["timestamp", "query", "answer"]
    needs_header = not os.path.exists(csv_path)
    with open(csv_path, mode="a", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=columns)
        if needs_header:
            writer.writeheader()
        writer.writerow({
            "timestamp": datetime.datetime.now().isoformat(),
            "query": user_query,
            "answer": answer,
        })
    print(f"[Info] Feedback logged to {csv_path}.")
def store_session_history(user_query, answer, feedback):
    """Append one (timestamp, query, answer, feedback) row to SESSION_HISTORY_CSV."""
    columns = ["timestamp", "user_query", "ai_answer", "feedback"]
    needs_header = not os.path.exists(SESSION_HISTORY_CSV)
    with open(SESSION_HISTORY_CSV, mode="a", newline="", encoding="utf-8") as history_file:
        writer = csv.DictWriter(history_file, fieldnames=columns)
        if needs_header:
            writer.writeheader()
        writer.writerow({
            "timestamp": datetime.datetime.now().isoformat(),
            "user_query": user_query,
            "ai_answer": answer,
            "feedback": feedback,
        })
    print(f"[Info] Session Q&A stored in {SESSION_HISTORY_CSV}.")
def handle_feedback(user_query, answer, feedback_option):
    """
    Gradio callback for the feedback radio: route the Q&A pair to the
    store matching the selected rating and report what was done.
    """
    if not user_query.strip() or not answer.strip():
        return "No valid Q&A to provide feedback on."
    if feedback_option == "πŸ‘":
        # Positive: fold the pair back into the index as future context.
        incorporate_feedback_into_pinecone(user_query, answer)
        store_session_history(user_query, answer, "positive")
        return "πŸ‘ Your Q&A has been stored in Pinecone (and logged)."
    if feedback_option == "βš–οΈ":
        store_feedback_to_csv(user_query, answer, NEUTRAL_FEEDBACK_CSV)
        store_session_history(user_query, answer, "neutral")
        return "βš–οΈ Q&A logged to neutral_feedback.csv and session_history.csv."
    # Anything else is treated as a thumbs-down.
    store_feedback_to_csv(user_query, answer, NEGATIVE_FEEDBACK_CSV)
    store_session_history(user_query, answer, "negative")
    return "πŸ‘Ž Q&A logged to negative_feedback.csv and session_history.csv."
# ---------------------------------------------------
# 6. Gradio Interface
# ---------------------------------------------------
def run_query(user_query):
    """Gradio callback: route the textbox query through the RAG pipeline."""
    return query_rag(user_query)
# Assemble the Gradio UI: logo + title row, a query box with an answer
# button, and a feedback widget wired to the handlers above.
with gr.Blocks() as demo:
    # Row with two columns: (1) SERMONS jfif logo, (2) headings
    with gr.Row():
        with gr.Column(scale=1, min_width=100):
            gr.Image(
                value=SERMONS_LOGO_IMG,
                label=None,
                show_label=False,
                width=80,
                height=80
            )
        with gr.Column(scale=6):
            gr.Markdown("## Derek Prince RAG Demo")
            gr.Markdown("Ask questions about DP's sermons data, stored in Pinecone.\n"
                        "Now with session memory!")
    with gr.Column():
        user_query = gr.Textbox(
            label="Your Query",
            lines=1,
            placeholder="Ask about a sermon..."
        )
        get_answer_btn = gr.Button("Get Answer")
        answer_output = gr.Textbox(label="AI Answer", lines=4)
        # Feedback defaults to the neutral option.
        feedback_radio = gr.Radio(
            choices=["πŸ‘", "βš–οΈ", "πŸ‘Ž"],
            value="βš–οΈ",
            label="Feedback"
        )
        feedback_btn = gr.Button("Submit Feedback")
        feedback_result = gr.Label()
    # Wire the buttons to their callbacks.
    get_answer_btn.click(fn=run_query, inputs=[user_query], outputs=[answer_output])
    feedback_btn.click(
        fn=handle_feedback,
        inputs=[user_query, answer_output, feedback_radio],
        outputs=[feedback_result]
    )
if __name__ == "__main__":
    # Start the Gradio server (blocks until shutdown).
    demo.launch()