# -*- coding: utf-8 -*-
"""
Created on Fri Feb 7 13:26:43 2025
@author: Jacob Dearmon
"""
import os
import time
import csv
import datetime
import base64
import gradio as gr
import openai
import io
from PIL import Image
from pinecone import Pinecone
# ---------------------------------------------------
# 1. Convert local SERMONS logo (JFIF) to PIL Image
# ---------------------------------------------------
def to_base64(path_to_img):
    """Convert an image file to Base64 string."""
    with open(path_to_img, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("utf-8")
    return encoded

def base64_to_image(base64_string):
    """Convert Base64 string back to PIL Image."""
    image_data = base64.b64decode(base64_string)
    # Pillow can handle JFIF as it's effectively a JPEG
    return Image.open(io.BytesIO(image_data))
# Update the path to your JFIF logo file here
SERMONS_LOGO_B64 = to_base64("DP_logo.jfif")
SERMONS_LOGO_IMG = base64_to_image(SERMONS_LOGO_B64)
# ---------------------------------------------------
# 2. Configuration
# ---------------------------------------------------
openai.api_key = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# From your screenshot: "Cloud: AWS | Region: us-east-1 | Dimension: 1536"
PINECONE_ENV = "us-east-1"
INDEX_NAME = "idx-sermons-1536" # name from Pinecone console
EMBED_DIMENSION = 1536 # matches your screenshot
EMBED_MODEL = "text-embedding-ada-002"
CHAT_MODEL = "gpt-4o"
TOP_K = 20
SIMILARITY_THRESHOLD = 0.4
NEGATIVE_FEEDBACK_CSV = "negative_feedback.csv"
NEUTRAL_FEEDBACK_CSV = "neutral_feedback.csv"
SESSION_HISTORY_CSV = "session_history.csv"
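# Optional sanity check (a small added guard, not required by the pipeline): fail
# fast with a clear message if either API key is missing, rather than erroring
# later inside an OpenAI or Pinecone call.
for _name, _value in [("OPENAI_API_KEY", openai.api_key), ("PINECONE_API_KEY", PINECONE_API_KEY)]:
    if not _value:
        raise RuntimeError(f"Missing required environment variable: {_name}")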
# ---------------------------------------------------
# 2.5. Automatically Initialize Pinecone Index
# ---------------------------------------------------
def init_pinecone_index(index_name=INDEX_NAME, dimension=EMBED_DIMENSION):
    """
    Creates (or reuses) the Pinecone index with the given name and dimension.
    Returns a Pinecone index object.
    """
    pc = Pinecone(api_key=PINECONE_API_KEY, environment=PINECONE_ENV)
    existing_indexes = pc.list_indexes().names()  # get list of index names
    if index_name not in existing_indexes:
        print(f"[Info] Creating Pinecone index '{index_name}' in env '{PINECONE_ENV}'...")
        pc.create_index(name=index_name, dimension=dimension)
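        # NOTE (assumption): newer pinecone-client releases (3.x+) also expect a
        # `spec` argument on create_index(); if creation fails here, a sketch like
        # the following may be needed (cloud/region from the console note above):
        #
        #     from pinecone import ServerlessSpec
        #     pc.create_index(
        #         name=index_name,
        #         dimension=dimension,
        #         metric="cosine",
        #         spec=ServerlessSpec(cloud="aws", region=PINECONE_ENV),
        #     )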
        time.sleep(5)  # short pause
    else:
        print(f"[Info] Reusing existing Pinecone index '{index_name}' in env '{PINECONE_ENV}'.")
    return pc.Index(index_name)
# Initialize Pinecone Index
pc_index = init_pinecone_index()
# ---------------------------------------------------
# 3. Session Memory
# ---------------------------------------------------
session_history = [
    {
        "role": "system",
        "content": "You are a helpful AI assistant specialized in sermons and biblical questions. Answer in a compassionate and loving tone, while recognizing the emotive content of the question - if any."
    }
]
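# NOTE (assumption): session_history grows without bound across queries. A small
# guard like the sketch below (names hypothetical) could cap it to the system
# prompt plus the most recent turns, keeping prompts within the model's context window:
#
#     MAX_TURNS = 20  # hypothetical cap on retained messages
#     def trim_history(history, max_turns=MAX_TURNS):
#         return [history[0]] + history[-max_turns:]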
# ---------------------------------------------------
# 4. Helper Functions
# ---------------------------------------------------
def embed_text(text: str):
    """Get embeddings from OpenAI."""
    try:
        resp = openai.Embedding.create(model=EMBED_MODEL, input=[text])
        return resp["data"][0]["embedding"]
    except Exception as e:
        print(f"[Error] Embedding failed: {e}")
        return None
def query_index(user_query: str, top_k=TOP_K):
    """Query Pinecone for relevant matches based on 'user_query' embeddings."""
    vector = embed_text(user_query)
    if vector is None:
        return []
    try:
        response = pc_index.query(vector=vector, top_k=top_k, include_metadata=True)
        return response.matches
    except Exception as e:
        print(f"[Error] Pinecone query failed: {e}")
        return []
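# Each match returned by query_index() exposes an id, a similarity score, and the
# metadata stored at upsert time. Illustrative shape (field names from this script,
# values hypothetical):
#     matches[0].id        -> "feedback_1707310003"
#     matches[0].score     -> 0.83
#     matches[0].metadata  -> {"source": "feedback", "text": "User Query: ..."}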
def build_rag_answer(user_query, matches):
    """
    Build a RAG-based answer using retrieved chunks as context for the LLM.
    """
    # Combine top matches into a context string
    combined_context = "\n\n".join(
        f"Chunk ID: {m.id}\n{m.metadata.get('text', '')}"
        for m in matches
    )
    # Create a system message with retrieved context
    context_system_message = {
        "role": "system",
        "content": (
            "Relevant reference text from Pinecone:\n"
            f"CONTEXT:\n{combined_context}\n\n"
            "Answer the user's question using this context where helpful."
        )
    }
    # Full conversation: existing history + new system context + user query
    conversation = session_history + [
        context_system_message,
        {"role": "user", "content": user_query}
    ]
    try:
        response = openai.ChatCompletion.create(
            model=CHAT_MODEL,
            messages=conversation,
            temperature=0.2,
            max_tokens=1750
        )
        final_answer = response["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[Error] ChatCompletion failed: {e}")
        final_answer = "Error generating RAG answer."
    # Append the new assistant message to session history
    session_history.append({"role": "assistant", "content": final_answer})
    return final_answer
def direct_llm_call(user_query):
    """
    If no relevant results or below threshold, do a direct LLM call with session history only.
    """
    conversation = session_history + [
        {"role": "user", "content": user_query}
    ]
    try:
        response = openai.ChatCompletion.create(
            model=CHAT_MODEL,
            messages=conversation,
            temperature=0.2
        )
        final_answer = response["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[Error] Direct LLM call failed: {e}")
        final_answer = "Error generating direct LLM answer."
    session_history.append({"role": "assistant", "content": final_answer})
    return final_answer
def query_rag(user_query: str) -> str:
    """
    Main pipeline:
      1) Add user query to session history
      2) Query Pinecone
      3) If top match above threshold -> build RAG answer,
         else do direct call
    """
    user_query = user_query.strip()
    if not user_query:
        return "Please enter a valid query."
    # Add user query to session memory
    session_history.append({"role": "user", "content": user_query})
    # Retrieve relevant context from Pinecone
    matches = query_index(user_query, top_k=TOP_K)
    if not matches:
        # If no matches, do direct LLM call
        return direct_llm_call(user_query)
    top_score = matches[0].score or 0.0
    if top_score >= SIMILARITY_THRESHOLD:
        return build_rag_answer(user_query, matches)
    else:
        return direct_llm_call(user_query)
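# Illustrative usage from a Python shell (hypothetical query and output):
#     >>> query_rag("What does Derek Prince teach about forgiveness?")
#     '...answer grounded in the retrieved sermon chunks...'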
# ---------------------------------------------------
# 5. Feedback + Logging
# ---------------------------------------------------
def incorporate_feedback_into_pinecone(user_query, answer):
    """
    If thumbs-up, store Q&A as a new chunk in Pinecone.
    """
    text_chunk = f"User Query: {user_query}\nAI Answer: {answer}"
    vector = embed_text(text_chunk)
    if vector is None:
        return
    feedback_id = f"feedback_{int(time.time())}"
    metadata = {"source": "feedback", "text": text_chunk}
    try:
        pc_index.upsert([
            {"id": feedback_id, "values": vector, "metadata": metadata}
        ])
        print("[Info] User feedback upserted to Pinecone.")
    except Exception as e:
        print(f"[Error] Could not upsert feedback: {e}")
def store_feedback_to_csv(user_query, answer, csv_path):
    """
    Log negative/neutral feedback in separate CSV.
    """
    file_exists = os.path.exists(csv_path)
    with open(csv_path, mode="a", newline="", encoding="utf-8") as f:
        fieldnames = ["timestamp", "query", "answer"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        writer.writerow({
            "timestamp": datetime.datetime.now().isoformat(),
            "query": user_query,
            "answer": answer
        })
    print(f"[Info] Feedback logged to {csv_path}.")
def store_session_history(user_query, answer, feedback):
    """
    Log (Q, A, feedback) to a single CSV: session_history.csv
    """
    file_exists = os.path.exists(SESSION_HISTORY_CSV)
    with open(SESSION_HISTORY_CSV, mode="a", newline="", encoding="utf-8") as f:
        fieldnames = ["timestamp", "user_query", "ai_answer", "feedback"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        writer.writerow({
            "timestamp": datetime.datetime.now().isoformat(),
            "user_query": user_query,
            "ai_answer": answer,
            "feedback": feedback
        })
    print(f"[Info] Session Q&A stored in {SESSION_HISTORY_CSV}.")
def handle_feedback(user_query, answer, feedback_option):
    """
    Called when user selects feedback in Gradio UI.
    """
    if not user_query.strip() or not answer.strip():
        return "No valid Q&A to provide feedback on."
    if feedback_option == "👍":
        incorporate_feedback_into_pinecone(user_query, answer)
        store_session_history(user_query, answer, "positive")
        return "👍 Your Q&A has been stored in Pinecone (and logged)."
    elif feedback_option == "✏️":
        store_feedback_to_csv(user_query, answer, NEUTRAL_FEEDBACK_CSV)
        store_session_history(user_query, answer, "neutral")
        return "✏️ Q&A logged to neutral_feedback.csv and session_history.csv."
    else:  # "👎"
        store_feedback_to_csv(user_query, answer, NEGATIVE_FEEDBACK_CSV)
        store_session_history(user_query, answer, "negative")
        return "👎 Q&A logged to negative_feedback.csv and session_history.csv."
# ---------------------------------------------------
# 6. Gradio Interface
# ---------------------------------------------------
def run_query(user_query):
    return query_rag(user_query)
with gr.Blocks() as demo:
    # Row with two columns: (1) SERMONS jfif logo, (2) headings
    with gr.Row():
        with gr.Column(scale=1, min_width=100):
            gr.Image(
                value=SERMONS_LOGO_IMG,
                label=None,
                show_label=False,
                width=80,
                height=80
            )
        with gr.Column(scale=6):
            gr.Markdown("## Derek Prince RAG Demo")
            gr.Markdown("Ask questions about DP's sermons data, stored in Pinecone.\n"
                        "Now with session memory!")
    with gr.Column():
        user_query = gr.Textbox(
            label="Your Query",
            lines=1,
            placeholder="Ask about a sermon..."
        )
        get_answer_btn = gr.Button("Get Answer")
        answer_output = gr.Textbox(label="AI Answer", lines=4)
        feedback_radio = gr.Radio(
            choices=["👍", "✏️", "👎"],
            value="✏️",
            label="Feedback"
        )
        feedback_btn = gr.Button("Submit Feedback")
        feedback_result = gr.Label()
    get_answer_btn.click(fn=run_query, inputs=[user_query], outputs=[answer_output])
    feedback_btn.click(
        fn=handle_feedback,
        inputs=[user_query, answer_output, feedback_radio],
        outputs=[feedback_result]
    )
if __name__ == "__main__":
    demo.launch()