Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import asyncio | |
| import os | |
| import json | |
| import re | |
| from lightrag import LightRAG, QueryParam | |
| from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed | |
def get_rag_instance():
    """Build and return a LightRAG instance for the TGP sermon corpus.

    Configured with the OpenAI embedding function and the gpt-4o-mini
    completion model; the LLM response cache is disabled so every query
    hits the model fresh.
    """
    rag = LightRAG(
        working_dir="src/sermons_tgp",
        embedding_func=openai_embed,
        llm_model_func=gpt_4o_mini_complete,
        enable_llm_cache=False,
    )
    return rag
| FULL_CHUNKS_DICT = json.load(open("src/sermons_tgp/kv_store_text_chunks_with_timestamps.json", "r", encoding="utf-8")) | |
def extract_dc_chunks(context_str):
    """Pull the Document Chunks (DC) JSON array out of a LightRAG context blob.

    The context string embeds retrieved chunks under a
    '-----Document Chunks(DC)-----' heading as a fenced ```json block.
    Returns the parsed list, or [] when no such section is present.
    """
    pattern = r'-----Document Chunks\(DC\)-----\s+```json\n(.*?)```'
    found = re.search(pattern, context_str, re.DOTALL)
    if found is None:
        return []
    return json.loads(found.group(1))
def find_matches(dc_chunks, full_chunks_dict):
    """Resolve retrieved DC chunks back to their stored source chunks.

    For each retrieved chunk, find the first stored chunk whose stripped
    content matches exactly and record its timestamp and file path.
    Retrieved chunks with no exact match are silently skipped.

    Args:
        dc_chunks: list of dicts, each with a "content" key (retrieved chunks).
        full_chunks_dict: mapping of chunk_id -> stored chunk dict with
            "content", "timestamp", and "file_path" keys.

    Returns:
        list of {"timestamp", "file_path", "content"} dicts, one per match,
        in the order of dc_chunks.
    """
    # Fix: the original rescanned the entire store for every retrieved chunk
    # (O(n*m)). Build the content -> chunk lookup once; setdefault keeps the
    # FIRST occurrence, matching the original's break-on-first-match order.
    by_content = {}
    for chunk_data in full_chunks_dict.values():
        by_content.setdefault(chunk_data.get("content", "").strip(), chunk_data)

    results = []
    for dc in dc_chunks:
        dc_content = dc.get("content", "").strip()
        chunk_data = by_content.get(dc_content)
        if chunk_data is not None:
            results.append({
                "timestamp": chunk_data.get("timestamp"),
                "file_path": chunk_data.get("file_path"),
                "content": dc_content,
            })
    return results
async def handle_input(user_input, rag):
    """Answer one chat turn: query the RAG index, render the answer (and, for
    topic-level questions, matched source timestamps) into Streamlit, and
    append the assistant output to st.session_state.messages.

    Args:
        user_input: the raw question typed by the user.
        rag: an initialized LightRAG instance.
    """
    # Distinct file paths present in the loaded chunk store; these are the
    # candidates for the "which sermon file?" guess below.
    AVAILABLE_SOURCES = list(set(
        chunk["file_path"] for chunk in FULL_CHUNKS_DICT.values() if "file_path" in chunk
    ))

    async def is_query_about_specific_file(user_input: str) -> bool:
        """Ask the LLM whether the question names a particular sermon/teaching."""
        # NOTE(review): this literal contains a stray closing quote + \n"
        # sequence and an unbalanced quote before 'Respond' that are sent to
        # the model verbatim — harmless for an LLM prompt but probably
        # unintended; confirm before cleaning up. (Original indentation inside
        # the triple-quoted string was lost in formatting.)
        prompt = ( f"""User question: \"{user_input}\"\n"
        Does the user explicitly refer to a specific sermon, sermon series, or a named teaching by a
        preacher (e.g., by name like '2016 6 8 Bishops', 'Kingdom Exceptionalism', 'Joseph Prince's sermon on faith',
        'Thankfulness, A Daily Habit - Bill Johnson')? This means the user is asking about content within a *particular* named
        message, not just a general topic or a request for a list of sermons."
        Respond with only 'yes' or 'no'. Do not add anything else to your answer.""")
        response = await gpt_4o_mini_complete(prompt)
        # Substring check, so replies like "Yes." still count as a yes.
        return "yes" in response.strip().lower()

    async def guess_relevant_file(user_input: str, file_list: list[str]) -> str | None:
        """Ask the LLM to pick the most relevant file; return its full path,
        or None when the reply is 'None' or not an exact basename match."""
        files_str = "\n".join(f"- {os.path.basename(f)}" for f in file_list)
        prompt = (f"User question: \"{user_input}\"\n"
                  f"Choose the most relevant file from:\n{files_str}\n"
                  "Return only filename or 'None'.")
        response = await gpt_4o_mini_complete(prompt)
        response = response.strip()
        # Map the returned basename back to the full stored path.
        if response in [os.path.basename(f) for f in file_list]:
            for f in file_list:
                if os.path.basename(f) == response:
                    return f
        return None

    explicit_source = None
    is_specific = await is_query_about_specific_file(user_input)
    if is_specific:
        guessed_file = await guess_relevant_file(user_input, AVAILABLE_SOURCES)
        # guess_relevant_file returns None (not "None") on no match, so the
        # second check is redundant but harmless.
        if guessed_file and guessed_file != "None":
            explicit_source = guessed_file

    matched_sources = []
    answer = ""
    if explicit_source:
        # Chunk ids belonging to the guessed file.
        # NOTE(review): filtered_chunk_ids is computed but never passed to
        # rag.aquery, so the query below is NOT actually restricted to this
        # file — verify whether QueryParam supports such filtering.
        filtered_chunk_ids = [
            chunk_id for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk.get("file_path") == explicit_source
        ]
        ans_param = QueryParam(mode="mix", top_k=1)
        answer = await rag.aquery(user_input, param=ans_param)
        st.markdown(f"🔍 **Focused on:** **{os.path.basename(explicit_source)}**")
        # First three timestamped chunks of the focused file, in store order.
        matched_sources = [
            {
                "timestamp": chunk.get("timestamp"),
                "file_path": chunk.get("file_path"),
            }
            for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk_id in filtered_chunk_ids and "timestamp" in chunk
        ][:3]
    else:
        # Topic-level question: fetch the retrieval context in a separate call
        # so the retrieved chunks can be matched back to timestamps below.
        ctx_param = QueryParam(mode="mix", only_need_context=True, top_k=3)
        context_chunks = await rag.aquery(f"{user_input}\n<!--ctx-->", param=ctx_param)
        ans_param = QueryParam(mode="mix", top_k=3)
        answer = await rag.aquery(user_input, param=ans_param)
        dc_chunks = extract_dc_chunks(context_chunks)[:3]
        matched_sources = find_matches(dc_chunks, FULL_CHUNKS_DICT)

    # Drop the trailing "References" section from the rendered answer.
    short_answer = answer.split("References")[0].strip()
    st.markdown(short_answer)
    st.session_state.messages.append({"role": "assistant", "content": short_answer})

    # NOTE(review): sources render only for topic-level queries; the
    # focused-file branch builds matched_sources but this condition skips
    # displaying them — confirm that is intentional.
    if matched_sources and not explicit_source:
        sources_md = "#### 📚 Sources:\n" + "\n".join(
            f"- **Time:** {src['timestamp']} | **File:** {src['file_path']}"
            for src in matched_sources
        )
        st.markdown(sources_md)
        st.session_state.messages.append({"role": "assistant", "content": sources_md})
def main():
    """Streamlit entry point: set up state, replay history, handle new input."""
    st.title("LightRAG: Sermons Video Chat Bot TGP")
    rag = get_rag_instance()

    # One-time storage initialization, guarded by session state so that
    # Streamlit reruns skip it.
    if "initialized" not in st.session_state:
        asyncio.run(rag.initialize_storages())
        st.session_state.initialized = True
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    question = st.chat_input("What do you want to know about TGP sermons?")
    if question:
        st.session_state.messages.append({"role": "user", "content": question})
        with st.chat_message("user"):
            st.markdown(question)
        with st.chat_message("assistant"):
            asyncio.run(handle_input(question, rag))
# Script entry point (the app is normally launched via `streamlit run`).
if __name__ == "__main__":
    main()