Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
| Created on Fri Feb 7 13:26:43 2025 | |
| @author: Jacob Dearmon | |
| """ | |
| import os | |
| import time | |
| import csv | |
| import datetime | |
| import base64 | |
| import gradio as gr | |
| import openai | |
| import io | |
| from PIL import Image | |
| from pinecone import Pinecone | |
| # --------------------------------------------------- | |
| # 1. Convert local SERMONS logo (JFIF) to PIL Image | |
| # --------------------------------------------------- | |
def to_base64(path_to_img):
    """Return the Base64 encoding (as a UTF-8 str) of the file at *path_to_img*."""
    with open(path_to_img, "rb") as img_file:
        raw_bytes = img_file.read()
    return base64.b64encode(raw_bytes).decode("utf-8")
def base64_to_image(base64_string):
    """Decode *base64_string* and return the result as a PIL Image.

    Pillow handles JFIF input transparently since it is effectively JPEG.
    """
    raw_bytes = base64.b64decode(base64_string)
    buffer = io.BytesIO(raw_bytes)
    return Image.open(buffer)
# Update the path to your JFIF logo file here.
# NOTE(review): this runs at import time and will raise FileNotFoundError if
# "DP_logo.jfif" is not in the working directory — confirm deployment layout.
SERMONS_LOGO_B64 = to_base64("DP_logo.jfif")          # Base64 string of the logo
SERMONS_LOGO_IMG = base64_to_image(SERMONS_LOGO_B64)  # PIL Image rendered in the UI
# ---------------------------------------------------
# 2. Configuration
# ---------------------------------------------------
# API keys come from the environment; both must be set before launching.
openai.api_key = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# From your screenshot: "Cloud: AWS | Region: us-east-1 | Dimension: 1536"
PINECONE_ENV = "us-east-1"               # Pinecone region
INDEX_NAME = "idx-sermons-1536"          # name from Pinecone console
EMBED_DIMENSION = 1536                   # matches your screenshot; must equal the index dimension
EMBED_MODEL = "text-embedding-ada-002"   # 1536-dimensional embedding model
CHAT_MODEL = "gpt-4o"                    # chat model used to generate answers
TOP_K = 20                               # number of Pinecone matches to retrieve per query
SIMILARITY_THRESHOLD = 0.4               # below this top score, fall back to a direct LLM call
# CSV files used for feedback and session logging.
NEGATIVE_FEEDBACK_CSV = "negative_feedback.csv"
NEUTRAL_FEEDBACK_CSV = "neutral_feedback.csv"
SESSION_HISTORY_CSV = "session_history.csv"
| # --------------------------------------------------- | |
| # 2.5. Automatically Initialize Pinecone Index | |
| # --------------------------------------------------- | |
def init_pinecone_index(index_name=INDEX_NAME, dimension=EMBED_DIMENSION):
    """
    Create (or reuse) the Pinecone index with the given name and dimension.

    Parameters
    ----------
    index_name : str
        Name of the Pinecone index (default: INDEX_NAME).
    dimension : int
        Embedding dimension the index must accept (default: EMBED_DIMENSION).

    Returns
    -------
    A Pinecone Index handle ready for query()/upsert().
    """
    # Local import so the module still loads against older client versions;
    # the `from pinecone import Pinecone` style used above is the v3+ SDK.
    from pinecone import ServerlessSpec

    pc = Pinecone(api_key=PINECONE_API_KEY, environment=PINECONE_ENV)
    existing_indexes = pc.list_indexes().names()  # get list of index names
    if index_name not in existing_indexes:
        print(f"[Info] Creating Pinecone index '{index_name}' in env '{PINECONE_ENV}'...")
        # BUG FIX: the v3+ client requires a `spec`; create_index() without it
        # raises. Cloud/region match the console ("Cloud: AWS | Region: us-east-1").
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region=PINECONE_ENV),
        )
        time.sleep(5)  # short pause so the index is reachable before first use
    else:
        print(f"[Info] Reusing existing Pinecone index '{index_name}' in env '{PINECONE_ENV}'.")
    return pc.Index(index_name)
# Initialize Pinecone Index (runs at import time; requires PINECONE_API_KEY).
pc_index = init_pinecone_index()
# ---------------------------------------------------
# 3. Session Memory
# ---------------------------------------------------
# Module-level conversation memory, seeded with the system persona.
# NOTE(review): this is a process-wide global, so concurrent Gradio users
# share one history and it grows without bound — confirm that is intended.
session_history = [
    {
        "role": "system",
        "content": "You are a helpful AI assistant specialized in sermons and biblical questions. Answer in a compassionate and loving tone, while recognizing the emotive content of the question - if any."
    }
]
| # --------------------------------------------------- | |
| # 4. Helper Functions | |
| # --------------------------------------------------- | |
def embed_text(text: str):
    """Return the OpenAI embedding vector for *text*, or None if the call fails."""
    try:
        response = openai.Embedding.create(model=EMBED_MODEL, input=[text])
        return response["data"][0]["embedding"]
    except Exception as exc:
        print(f"[Error] Embedding failed: {exc}")
        return None
def query_index(user_query: str, top_k=TOP_K):
    """Return the top_k Pinecone matches for *user_query* (empty list on any failure)."""
    query_vector = embed_text(user_query)
    if query_vector is None:
        return []
    try:
        result = pc_index.query(vector=query_vector, top_k=top_k, include_metadata=True)
        return result.matches
    except Exception as exc:
        print(f"[Error] Pinecone query failed: {exc}")
        return []
def build_rag_answer(user_query, matches):
    """
    Build a RAG-based answer using retrieved chunks as context for the LLM.

    The retrieved Pinecone chunks are injected as an extra system message just
    before the user's question, the chat model is called, and the assistant
    reply is appended to the module-level session_history.

    Parameters
    ----------
    user_query : str
        The user's question.
    matches : list
        Pinecone match objects exposing .id and .metadata (with a "text" key).

    Returns
    -------
    str
        The model's answer, or an error placeholder if the call failed.
    """
    # Stitch the retrieved chunks into one context string for the prompt.
    combined_context = "\n\n".join(
        f"Chunk ID: {m.id}\n{m.metadata.get('text', '')}"
        for m in matches
    )
    context_system_message = {
        "role": "system",
        "content": (
            "Relevant reference text from Pinecone:\n"
            f"CONTEXT:\n{combined_context}\n\n"
            "Answer the user's question using this context where helpful."
        )
    }
    # BUG FIX: query_rag() has already appended the user turn to
    # session_history, and the original code appended it again here — so the
    # model received the question twice. Only add the user turn if it is not
    # already the trailing message.
    conversation = list(session_history)
    if not conversation or conversation[-1] != {"role": "user", "content": user_query}:
        conversation.append({"role": "user", "content": user_query})
    # Slot the retrieved context in just before the final user message.
    conversation.insert(len(conversation) - 1, context_system_message)
    try:
        response = openai.ChatCompletion.create(
            model=CHAT_MODEL,
            messages=conversation,
            temperature=0.2,
            max_tokens=1750
        )
        final_answer = response["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[Error] ChatCompletion failed: {e}")
        final_answer = "Error generating RAG answer."
    # Remember the assistant turn for subsequent queries.
    session_history.append({"role": "assistant", "content": final_answer})
    return final_answer
def direct_llm_call(user_query):
    """
    Fallback when retrieval found nothing relevant (or below threshold):
    ask the chat model directly using only the running session history.

    Appends the assistant reply to the module-level session_history and
    returns it (or an error placeholder on failure).
    """
    # BUG FIX: query_rag() has already appended the user turn to
    # session_history, and the original code appended it again here — sending
    # the question to the model twice. Only add it if it is not already the
    # trailing message (e.g. when this function is invoked on its own).
    conversation = list(session_history)
    if not conversation or conversation[-1] != {"role": "user", "content": user_query}:
        conversation.append({"role": "user", "content": user_query})
    try:
        response = openai.ChatCompletion.create(
            model=CHAT_MODEL,
            messages=conversation,
            temperature=0.2
        )
        final_answer = response["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[Error] Direct LLM call failed: {e}")
        final_answer = "Error generating direct LLM answer."
    session_history.append({"role": "assistant", "content": final_answer})
    return final_answer
def query_rag(user_query: str) -> str:
    """
    Main pipeline:
      1) record the user query in session memory,
      2) retrieve candidate chunks from Pinecone,
      3) answer with RAG context when the best match clears
         SIMILARITY_THRESHOLD, otherwise fall back to a direct LLM call.
    """
    user_query = user_query.strip()
    if not user_query:
        return "Please enter a valid query."

    # Add user query to session memory.
    session_history.append({"role": "user", "content": user_query})

    # Retrieve relevant context from Pinecone.
    matches = query_index(user_query, top_k=TOP_K)
    if not matches:
        return direct_llm_call(user_query)

    best_score = matches[0].score or 0.0
    if best_score < SIMILARITY_THRESHOLD:
        return direct_llm_call(user_query)
    return build_rag_answer(user_query, matches)
| # --------------------------------------------------- | |
| # 5. Feedback + Logging | |
| # --------------------------------------------------- | |
def incorporate_feedback_into_pinecone(user_query, answer):
    """On thumbs-up feedback, embed the Q&A pair and upsert it into Pinecone
    as a new retrievable chunk (id keyed by the current Unix time)."""
    text_chunk = f"User Query: {user_query}\nAI Answer: {answer}"
    vector = embed_text(text_chunk)
    if vector is None:
        return
    record = {
        "id": f"feedback_{int(time.time())}",
        "values": vector,
        "metadata": {"source": "feedback", "text": text_chunk},
    }
    try:
        pc_index.upsert([record])
    except Exception as exc:
        print(f"[Error] Could not upsert feedback: {exc}")
    else:
        print("[Info] User feedback upserted to Pinecone.")
def store_feedback_to_csv(user_query, answer, csv_path):
    """Append one (timestamp, query, answer) row to *csv_path*,
    writing the header row the first time the file is created."""
    columns = ["timestamp", "query", "answer"]
    needs_header = not os.path.exists(csv_path)
    with open(csv_path, mode="a", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=columns)
        if needs_header:
            writer.writeheader()
        writer.writerow({
            "timestamp": datetime.datetime.now().isoformat(),
            "query": user_query,
            "answer": answer,
        })
    print(f"[Info] Feedback logged to {csv_path}.")
def store_session_history(user_query, answer, feedback):
    """Append one (timestamp, user_query, ai_answer, feedback) row to the
    single session log SESSION_HISTORY_CSV, writing a header on first use."""
    columns = ["timestamp", "user_query", "ai_answer", "feedback"]
    needs_header = not os.path.exists(SESSION_HISTORY_CSV)
    with open(SESSION_HISTORY_CSV, mode="a", newline="", encoding="utf-8") as log_file:
        writer = csv.DictWriter(log_file, fieldnames=columns)
        if needs_header:
            writer.writeheader()
        writer.writerow({
            "timestamp": datetime.datetime.now().isoformat(),
            "user_query": user_query,
            "ai_answer": answer,
            "feedback": feedback,
        })
    print(f"[Info] Session Q&A stored in {SESSION_HISTORY_CSV}.")
def handle_feedback(user_query, answer, feedback_option):
    """
    Gradio callback for the feedback radio + submit button.

    Positive feedback is folded back into Pinecone as a new chunk; neutral
    and negative feedback go to their respective CSV logs. Every outcome is
    also appended to session_history.csv. Returns a short status string
    displayed in the UI.

    NOTE(review): the option strings below appear to be mis-encoded emoji
    (thumbs-up / neutral / thumbs-down) — they must match the gr.Radio choices.
    """
    # Ignore clicks when there is no real Q&A pair on screen.
    if not user_query.strip() or not answer.strip():
        return "No valid Q&A to provide feedback on."
    if feedback_option == "π":
        # Positive: store the Q&A as a new retrievable chunk.
        incorporate_feedback_into_pinecone(user_query, answer)
        store_session_history(user_query, answer, "positive")
        return "π Your Q&A has been stored in Pinecone (and logged)."
    if feedback_option == "βοΈ":
        # Neutral: log only.
        store_feedback_to_csv(user_query, answer, NEUTRAL_FEEDBACK_CSV)
        store_session_history(user_query, answer, "neutral")
        return "βοΈ Q&A logged to neutral_feedback.csv and session_history.csv."
    # Anything else is treated as negative ("π").
    store_feedback_to_csv(user_query, answer, NEGATIVE_FEEDBACK_CSV)
    store_session_history(user_query, answer, "negative")
    return "π Q&A logged to negative_feedback.csv and session_history.csv."
| # --------------------------------------------------- | |
| # 6. Gradio Interface | |
| # --------------------------------------------------- | |
def run_query(user_query):
    """Gradio click handler: forward the textbox contents to the RAG pipeline."""
    answer = query_rag(user_query)
    return answer
# Build the Gradio UI: logo + headings on top, then the query/answer widgets
# and the feedback controls, all wired to the handlers above.
with gr.Blocks() as demo:
    # Row with two columns: (1) SERMONS jfif logo, (2) headings
    with gr.Row():
        with gr.Column(scale=1, min_width=100):
            gr.Image(
                value=SERMONS_LOGO_IMG,
                label=None,
                show_label=False,
                width=80,
                height=80
            )
        with gr.Column(scale=6):
            gr.Markdown("## Derek Prince RAG Demo")
            gr.Markdown("Ask questions about DP's sermons data, stored in Pinecone.\n"
                        "Now with session memory!")
    with gr.Column():
        user_query = gr.Textbox(
            label="Your Query",
            lines=1,
            placeholder="Ask about a sermon..."
        )
        get_answer_btn = gr.Button("Get Answer")
        answer_output = gr.Textbox(label="AI Answer", lines=4)
        # NOTE(review): these choices look like mis-encoded emoji (thumbs-up /
        # neutral / thumbs-down); they must stay in sync with the string
        # comparisons inside handle_feedback().
        feedback_radio = gr.Radio(
            choices=["π", "βοΈ", "π"],
            value="βοΈ",
            label="Feedback"
        )
        feedback_btn = gr.Button("Submit Feedback")
        feedback_result = gr.Label()
    # Wire the buttons: query -> answer, (query, answer, feedback) -> status.
    get_answer_btn.click(fn=run_query, inputs=[user_query], outputs=[answer_output])
    feedback_btn.click(
        fn=handle_feedback,
        inputs=[user_query, answer_output, feedback_radio],
        outputs=[feedback_result]
    )

if __name__ == "__main__":
    demo.launch()