import streamlit as st
import asyncio
import os
import json
import re

from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed

api_key = os.getenv("OPENAI_API_KEY")  # the OpenAI helpers pick the key up from the environment


# ---------------------- Sources: parsing + search ----------------------
def extract_dc_chunks(context_str):
    """Pull the JSON array of Document Chunks (DC) out of the raw context string."""
    match = re.search(r'-----Document Chunks\(DC\)-----\s+```json\n(.*?)```', context_str, re.DOTALL)
    if not match:
        return []
    dc_json_str = match.group(1)
    return json.loads(dc_json_str)


@st.cache_resource
def get_rag_instance():
    return LightRAG(
        working_dir="src/sermons",
        embedding_func=openai_embed,
        llm_model_func=gpt_4o_mini_complete,
        enable_llm_cache=False,
    )


# Full chunk store with timestamps and file paths, used to enrich retrieved chunks.
with open("src/sermons/kv_store_text_chunks_with_timestamps.json", "r", encoding="utf-8") as f:
    FULL_CHUNKS_DICT = json.load(f)


def find_matches(dc_chunks, full_chunks_dict):
    """Match retrieved DC chunks back to the stored chunks to recover timestamp and file path."""
    results = []
    for dc in dc_chunks:
        dc_content = dc.get("content", "").strip()
        for chunk_id, chunk_data in full_chunks_dict.items():
            if chunk_data.get("content", "").strip() == dc_content:
                results.append({
                    "timestamp": chunk_data.get("timestamp"),
                    "file_path": chunk_data.get("file_path"),
                    "content": dc_content,
                })
                break
    return results


async def handle_input(user_input, rag):
    AVAILABLE_SOURCES = list(set(
        chunk["file_path"] for chunk in FULL_CHUNKS_DICT.values() if "file_path" in chunk
    ))

    async def is_query_about_specific_file(user_input: str) -> bool:
        prompt = (
            f"User question: \"{user_input}\"\n"
            "Does the user explicitly refer to a specific sermon, sermon series, or a named "
            "teaching by a preacher (e.g., by name like '2016 6 8 Bishops', 'Kingdom Exceptionalism', "
            "'Joseph Prince's sermon on faith', 'Thankfulness, A Daily Habit - Bill Johnson')? "
            "This means the user is asking about content within a *particular* named message, "
            "not just a general topic or a request for a list of sermons. "
            "Respond with only 'yes' or 'no'. Do not add anything else to your answer."
        )
        response = await gpt_4o_mini_complete(prompt)
        return "yes" in response.strip().lower()

    async def guess_relevant_file(user_input: str, file_list: list[str]) -> str | None:
        files_str = "\n".join(f"- {os.path.basename(f)}" for f in file_list)
        prompt = (
            f"User question: \"{user_input}\"\n"
            f"Choose the most relevant file from:\n{files_str}\n"
            "Return only filename or 'None'."
        )
        response = await gpt_4o_mini_complete(prompt)
        response = response.strip()
        if response in [os.path.basename(f) for f in file_list]:
            for f in file_list:
                if os.path.basename(f) == response:
                    return f
        return None

    # Decide whether the user is asking about one particular sermon file.
    explicit_source = None
    is_specific = await is_query_about_specific_file(user_input)
    if is_specific:
        guessed_file = await guess_relevant_file(user_input, AVAILABLE_SOURCES)
        if guessed_file and guessed_file != "None":
            explicit_source = guessed_file

    matched_sources = []
    answer = ""

    if explicit_source:
        filtered_chunk_ids = [
            chunk_id for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk.get("file_path") == explicit_source
        ]
        ans_param = QueryParam(mode="mix", top_k=1)
        answer = await rag.aquery(user_input, param=ans_param)
        st.markdown(f"🔍 **Focused on:** **{os.path.basename(explicit_source)}**")
        matched_sources = [
            {
                "timestamp": chunk.get("timestamp"),
                "file_path": chunk.get("file_path"),
            }
            for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk_id in filtered_chunk_ids and "timestamp" in chunk
        ][:3]
    else:
        # Retrieve the context separately so the DC chunks can be matched back to timestamps.
        ctx_param = QueryParam(mode="mix", only_need_context=True, top_k=3)
        context_chunks = await rag.aquery(f"{user_input}\n", param=ctx_param)
        ans_param = QueryParam(mode="mix", top_k=3)
        answer = await rag.aquery(user_input, param=ans_param)
        dc_chunks = extract_dc_chunks(context_chunks)[:3]
        matched_sources = find_matches(dc_chunks, FULL_CHUNKS_DICT)

    # Drop the trailing "References" section before displaying the answer.
    short_answer = answer.split("References")[0].strip()
    st.markdown(short_answer)
    st.session_state.messages.append({"role": "assistant", "content": short_answer})

    if matched_sources and not explicit_source:
        sources_md = "#### 📚 Sources:\n" + "\n".join(
            f"- **Time:** {src['timestamp']} | **File:** {src['file_path']}"
            for src in matched_sources
        )
        st.markdown(sources_md)
        st.session_state.messages.append({"role": "assistant", "content": sources_md})


def main():
    st.title("LightRAG: Sermons Video Chat Bot BBG")
    rag = get_rag_instance()

    if "initialized" not in st.session_state:
        asyncio.run(rag.initialize_storages())
        st.session_state.initialized = True

    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the chat history on every rerun.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    user_input = st.chat_input("What do you want to know about BBG sermons?")
    if user_input:
        st.session_state.messages.append({"role": "user", "content": user_input})
        with st.chat_message("user"):
            st.markdown(user_input)
        with st.chat_message("assistant"):
            asyncio.run(handle_input(user_input, rag))


if __name__ == "__main__":
    main()
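
# A minimal way to launch the app locally (assuming this script is saved as app.py,
# which is a hypothetical filename, and OPENAI_API_KEY is exported in the shell):
#
#   export OPENAI_API_KEY=sk-...
#   streamlit run app.py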