# -*- coding: utf-8 -*-
"""
Sermons RAG demo: a Gradio chat UI that answers sermon/biblical questions
using retrieval-augmented generation over a Pinecone index of sermon chunks.

Pipeline per query:
    1. Embed the query with OpenAI embeddings.
    2. Retrieve top-K chunks from Pinecone.
    3. If the best match clears SIMILARITY_THRESHOLD, answer with the
       retrieved context injected as a system message; otherwise fall back
       to a plain LLM call over the running session history.
User feedback is either upserted back into Pinecone (thumbs-up) or logged
to CSV files (neutral / thumbs-down), and every Q&A pair is appended to a
session-history CSV.

Created on Fri Feb 7 13:26:43 2025
@author: Jacob Dearmon
"""
import base64
import csv
import datetime
import io
import os
import time

import gradio as gr
import openai
from PIL import Image
from pinecone import Pinecone

# ---------------------------------------------------
# 1. Convert local SERMONS logo (JFIF) to PIL Image
# ---------------------------------------------------


def to_base64(path_to_img):
    """Read an image file from disk and return its Base64-encoded string."""
    with open(path_to_img, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("utf-8")
    return encoded


def base64_to_image(base64_string):
    """Decode a Base64 string back into a PIL Image.

    Pillow can handle JFIF as it's effectively a JPEG.
    """
    image_data = base64.b64decode(base64_string)
    return Image.open(io.BytesIO(image_data))


# Update the path to your JFIF logo file here
SERMONS_LOGO_B64 = to_base64("DP_logo.jfif")
SERMONS_LOGO_IMG = base64_to_image(SERMONS_LOGO_B64)

# ---------------------------------------------------
# 2. Configuration
# ---------------------------------------------------
openai.api_key = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")

# From your screenshot: "Cloud: AWS | Region: us-east-1 | Dimension: 1536"
PINECONE_ENV = "us-east-1"
INDEX_NAME = "idx-sermons-1536"  # name from Pinecone console
EMBED_DIMENSION = 1536  # matches your screenshot
EMBED_MODEL = "text-embedding-ada-002"
CHAT_MODEL = "gpt-4o"
TOP_K = 20
# Cosine-similarity floor below which retrieval results are ignored and we
# fall back to a plain LLM call.
SIMILARITY_THRESHOLD = 0.4

NEGATIVE_FEEDBACK_CSV = "negative_feedback.csv"
NEUTRAL_FEEDBACK_CSV = "neutral_feedback.csv"
SESSION_HISTORY_CSV = "session_history.csv"

# ---------------------------------------------------
# 2.5. Automatically Initialize Pinecone Index
# ---------------------------------------------------


def init_pinecone_index(index_name=INDEX_NAME, dimension=EMBED_DIMENSION):
    """Create (or reuse) the Pinecone index with the given name and dimension.

    Returns a Pinecone index object.
    """
    pc = Pinecone(api_key=PINECONE_API_KEY, environment=PINECONE_ENV)
    existing_indexes = pc.list_indexes().names()  # get list of index names
    if index_name not in existing_indexes:
        print(f"[Info] Creating Pinecone index '{index_name}' in env '{PINECONE_ENV}'...")
        pc.create_index(name=index_name, dimension=dimension)
        time.sleep(5)  # short pause so the new index is ready before first use
    else:
        print(f"[Info] Reusing existing Pinecone index '{index_name}' in env '{PINECONE_ENV}'.")
    return pc.Index(index_name)


# Initialize Pinecone Index
pc_index = init_pinecone_index()

# ---------------------------------------------------
# 3. Session Memory
# ---------------------------------------------------
# Running conversation shared by all callers of this process. NOTE(review):
# this is global, unbounded, and shared across all Gradio users/sessions —
# consider per-session state and truncation for production use.
session_history = [
    {
        "role": "system",
        "content": "You are a helpful AI assistant specialized in sermons and biblical questions. Answer in a compassionate and loving tone, while recognizing the emotive content of the question - if any."
    }
]

# ---------------------------------------------------
# 4. Helper Functions
# ---------------------------------------------------


def embed_text(text: str):
    """Get an embedding vector for `text` from OpenAI, or None on failure."""
    try:
        resp = openai.Embedding.create(model=EMBED_MODEL, input=[text])
        return resp["data"][0]["embedding"]
    except Exception as e:
        print(f"[Error] Embedding failed: {e}")
        return None


def query_index(user_query: str, top_k=TOP_K):
    """Query Pinecone for relevant matches based on `user_query` embeddings.

    Returns a (possibly empty) list of match objects; empty on any failure.
    """
    vector = embed_text(user_query)
    if vector is None:
        return []
    try:
        response = pc_index.query(vector=vector, top_k=top_k, include_metadata=True)
        return response.matches
    except Exception as e:
        print(f"[Error] Pinecone query failed: {e}")
        return []


def build_rag_answer(user_query, matches):
    """Build a RAG-based answer using retrieved chunks as context for the LLM.

    Assumes `query_rag` has already appended the user turn to
    `session_history`; the retrieval context is inserted as a system message
    just before that final user turn. Appends the assistant reply to
    `session_history` and returns it.
    """
    # Combine top matches into a context string
    combined_context = "\n\n".join(
        f"Chunk ID: {m.id}\n{m.metadata.get('text', '')}" for m in matches
    )

    # Create a system message with retrieved context
    context_system_message = {
        "role": "system",
        "content": (
            "Relevant reference text from Pinecone:\n"
            f"CONTEXT:\n{combined_context}\n\n"
            "Answer the user's question using this context where helpful."
        )
    }

    # BUGFIX: the user query is already the last entry of session_history
    # (appended by query_rag), so do NOT append it a second time. Slot the
    # retrieval context in just before that final user turn.
    conversation = session_history[:-1] + [context_system_message, session_history[-1]]

    try:
        response = openai.ChatCompletion.create(
            model=CHAT_MODEL,
            messages=conversation,
            temperature=0.2,
            max_tokens=1750
        )
        final_answer = response["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[Error] ChatCompletion failed: {e}")
        final_answer = "Error generating RAG answer."

    # Append the new assistant message to session history
    session_history.append({"role": "assistant", "content": final_answer})
    return final_answer


def direct_llm_call(user_query):
    """Answer with the LLM and session history only (no retrieved context).

    Used when retrieval returns nothing or the top score is below threshold.
    Assumes `query_rag` already appended the user turn to `session_history`.
    """
    # BUGFIX: do not re-append the user message — query_rag already put it
    # at the end of session_history, and duplicating it sent the user turn
    # twice per prompt.
    conversation = list(session_history)
    try:
        response = openai.ChatCompletion.create(
            model=CHAT_MODEL,
            messages=conversation,
            temperature=0.2
        )
        final_answer = response["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[Error] Direct LLM call failed: {e}")
        final_answer = "Error generating direct LLM answer."

    session_history.append({"role": "assistant", "content": final_answer})
    return final_answer


def query_rag(user_query: str) -> str:
    """Main pipeline.

    1) Add user query to session history
    2) Query Pinecone
    3) If top match above threshold -> build RAG answer, else do direct call
    """
    user_query = user_query.strip()
    if not user_query:
        return "Please enter a valid query."

    # Add user query to session memory (downstream helpers rely on this
    # being the last entry and do not append it again)
    session_history.append({"role": "user", "content": user_query})

    # Retrieve relevant context from Pinecone
    matches = query_index(user_query, top_k=TOP_K)
    if not matches:
        # If no matches, do direct LLM call
        return direct_llm_call(user_query)

    top_score = matches[0].score or 0.0
    if top_score >= SIMILARITY_THRESHOLD:
        return build_rag_answer(user_query, matches)
    else:
        return direct_llm_call(user_query)


# ---------------------------------------------------
# 5. Feedback + Logging
# ---------------------------------------------------


def incorporate_feedback_into_pinecone(user_query, answer):
    """On thumbs-up, store the Q&A pair as a new chunk in Pinecone.

    NOTE(review): the second-resolution timestamp ID can collide if two
    feedback events land in the same second — consider a UUID suffix.
    """
    text_chunk = f"User Query: {user_query}\nAI Answer: {answer}"
    vector = embed_text(text_chunk)
    if vector is None:
        return
    feedback_id = f"feedback_{int(time.time())}"
    metadata = {"source": "feedback", "text": text_chunk}
    try:
        pc_index.upsert([
            {"id": feedback_id, "values": vector, "metadata": metadata}
        ])
        print("[Info] User feedback upserted to Pinecone.")
    except Exception as e:
        print(f"[Error] Could not upsert feedback: {e}")


def store_feedback_to_csv(user_query, answer, csv_path):
    """Append negative/neutral feedback to `csv_path`, writing a header on first use."""
    file_exists = os.path.exists(csv_path)
    with open(csv_path, mode="a", newline="", encoding="utf-8") as f:
        fieldnames = ["timestamp", "query", "answer"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        writer.writerow({
            "timestamp": datetime.datetime.now().isoformat(),
            "query": user_query,
            "answer": answer
        })
    print(f"[Info] Feedback logged to {csv_path}.")


def store_session_history(user_query, answer, feedback):
    """Log (Q, A, feedback) to a single CSV: session_history.csv."""
    file_exists = os.path.exists(SESSION_HISTORY_CSV)
    with open(SESSION_HISTORY_CSV, mode="a", newline="", encoding="utf-8") as f:
        fieldnames = ["timestamp", "user_query", "ai_answer", "feedback"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        writer.writerow({
            "timestamp": datetime.datetime.now().isoformat(),
            "user_query": user_query,
            "ai_answer": answer,
            "feedback": feedback
        })
    print(f"[Info] Session Q&A stored in {SESSION_HISTORY_CSV}.")


def handle_feedback(user_query, answer, feedback_option):
    """Dispatch on the feedback choice from the Gradio UI.

    Thumbs-up feeds the Q&A back into Pinecone; neutral/negative go to
    their respective CSV logs. Everything is also recorded in the session
    history CSV. Returns a status string for the UI.
    """
    if not user_query.strip() or not answer.strip():
        return "No valid Q&A to provide feedback on."

    if feedback_option == "\U0001F44D":
        incorporate_feedback_into_pinecone(user_query, answer)
        store_session_history(user_query, answer, "positive")
        return "\U0001F44D Your Q&A has been stored in Pinecone (and logged)."
    elif feedback_option == "\u2696\uFE0F":
        store_feedback_to_csv(user_query, answer, NEUTRAL_FEEDBACK_CSV)
        store_session_history(user_query, answer, "neutral")
        return "\u2696\uFE0F Q&A logged to neutral_feedback.csv and session_history.csv."
    else:  # thumbs-down
        store_feedback_to_csv(user_query, answer, NEGATIVE_FEEDBACK_CSV)
        store_session_history(user_query, answer, "negative")
        return "\U0001F44E Q&A logged to negative_feedback.csv and session_history.csv."


# ---------------------------------------------------
# 6. Gradio Interface
# ---------------------------------------------------


def run_query(user_query):
    """Thin Gradio callback wrapper around query_rag."""
    return query_rag(user_query)


with gr.Blocks() as demo:
    # Row with two columns: (1) SERMONS jfif logo, (2) headings
    with gr.Row():
        with gr.Column(scale=1, min_width=100):
            gr.Image(
                value=SERMONS_LOGO_IMG,
                label=None,
                show_label=False,
                width=80,
                height=80
            )
        with gr.Column(scale=6):
            gr.Markdown("## Derek Prince RAG Demo")
            gr.Markdown("Ask questions about DP's sermons data, stored in Pinecone.\n"
                        "Now with session memory!")

    with gr.Column():
        user_query = gr.Textbox(
            label="Your Query",
            lines=1,
            placeholder="Ask about a sermon..."
        )
        get_answer_btn = gr.Button("Get Answer")
        answer_output = gr.Textbox(label="AI Answer", lines=4)

        feedback_radio = gr.Radio(
            choices=["\U0001F44D", "\u2696\uFE0F", "\U0001F44E"],
            value="\u2696\uFE0F",
            label="Feedback"
        )
        feedback_btn = gr.Button("Submit Feedback")
        feedback_result = gr.Label()

    get_answer_btn.click(fn=run_query, inputs=[user_query], outputs=[answer_output])
    feedback_btn.click(
        fn=handle_feedback,
        inputs=[user_query, answer_output, feedback_radio],
        outputs=[feedback_result]
    )

if __name__ == "__main__":
    demo.launch()