Spaces:
Sleeping
Sleeping
File size: 5,699 Bytes
ce8355f 2da889a ce8355f 2da889a a305f94 2da889a a305f94 2da889a 87a6c5c ab29d82 a305f94 0a613a1 ac4b23b a305f94 4a1eeaf a305f94 87a6c5c 3f1c021 0df5b09 b56a332 87a6c5c a305f94 0df5b09 a305f94 3f1c021 87a6c5c 6e3312c 2da889a ec5bf47 a305f94 d3f1853 2da889a 0df5b09 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import streamlit as st
import asyncio
import os
import json
import re
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed
@st.cache_resource
def get_rag_instance():
    """Build the shared LightRAG instance for the sermon corpus.

    Cached by Streamlit so the (expensive) construction happens once per
    server process rather than on every rerun.
    """
    rag_settings = {
        "working_dir": "src/sermons_tgp",
        "embedding_func": openai_embed,
        "llm_model_func": gpt_4o_mini_complete,
        # Caching LLM responses is disabled so every query hits the model fresh.
        "enable_llm_cache": False,
    }
    return LightRAG(**rag_settings)
# Chunk store loaded once at import time: chunk_id -> chunk record
# (expected keys per usage below: "content", "timestamp", "file_path").
# Fix: the original called open() without ever closing the handle; a
# context manager closes it deterministically.
with open("src/sermons_tgp/kv_store_text_chunks_with_timestamps.json", "r", encoding="utf-8") as _chunks_fp:
    FULL_CHUNKS_DICT = json.load(_chunks_fp)
def extract_dc_chunks(context_str):
    """Extract the Document Chunks (DC) JSON array from a LightRAG context string.

    The context embeds the chunks as a fenced ```json block under a
    "-----Document Chunks(DC)-----" header. Returns the parsed list of
    chunk dicts, or an empty list when no DC section is present.
    """
    dc_section = re.search(
        r'-----Document Chunks\(DC\)-----\s+```json\n(.*?)```',
        context_str,
        re.DOTALL,
    )
    if dc_section is None:
        return []
    return json.loads(dc_section.group(1))
def find_matches(dc_chunks, full_chunks_dict):
    """Match retrieved DC chunks back to the full chunk store by content.

    Args:
        dc_chunks: list of dicts with a "content" key (as returned by
            extract_dc_chunks).
        full_chunks_dict: mapping chunk_id -> chunk record with "content",
            "timestamp" and "file_path" keys.

    Returns:
        A list (one entry per matched DC chunk, in dc_chunks order) of
        {"timestamp", "file_path", "content"} dicts. Unmatched chunks are
        skipped.

    Improvement over the original: the original rescanned every store entry
    for every DC chunk (O(n*m) string comparisons). We index the store by
    stripped content once; setdefault keeps the FIRST occurrence, preserving
    the original first-match-wins scan order.
    """
    content_index = {}
    for chunk_data in full_chunks_dict.values():
        content_index.setdefault(chunk_data.get("content", "").strip(), chunk_data)

    results = []
    for dc in dc_chunks:
        dc_content = dc.get("content", "").strip()
        chunk_data = content_index.get(dc_content)
        if chunk_data is not None:
            results.append({
                "timestamp": chunk_data.get("timestamp"),
                "file_path": chunk_data.get("file_path"),
                "content": dc_content,
            })
    return results
async def handle_input(user_input, rag):
    """Answer one user question and render the result into the Streamlit chat.

    Two paths:
      * If an LLM classifier decides the question names a specific sermon
        file, a focused answer is produced (top_k=1) and the focus file is
        announced.
      * Otherwise a general mix-mode query runs, and matching source chunks
        (with timestamps) are listed under the answer.
    Everything rendered is also appended to ``st.session_state.messages``.
    """
    # Distinct file paths present in the loaded chunk store.
    AVAILABLE_SOURCES = list(set(
        chunk["file_path"] for chunk in FULL_CHUNKS_DICT.values() if "file_path" in chunk
    ))

    async def is_query_about_specific_file(user_input: str) -> bool:
        # Yes/no LLM classification: does the question target one named sermon?
        # NOTE(review): the triple-quoted f-string below contains stray '"'
        # characters left over from an earlier concatenation style; they end
        # up verbatim in the prompt — confirm whether that is intentional.
        prompt = ( f"""User question: \"{user_input}\"\n"
            Does the user explicitly refer to a specific sermon, sermon series, or a named teaching by a
            preacher (e.g., by name like '2016 6 8 Bishops', 'Kingdom Exceptionalism', 'Joseph Prince's sermon on faith',
            'Thankfulness, A Daily Habit - Bill Johnson')? This means the user is asking about content within a *particular* named
            message, not just a general topic or a request for a list of sermons."
            Respond with only 'yes' or 'no'. Do not add anything else to your answer.""")
        response = await gpt_4o_mini_complete(prompt)
        # Substring check: any "yes" in the reply counts as affirmative.
        return "yes" in response.strip().lower()

    async def guess_relevant_file(user_input: str, file_list: list[str]) -> str | None:
        # Ask the LLM to pick one basename from the available files.
        files_str = "\n".join(f"- {os.path.basename(f)}" for f in file_list)
        prompt = (f"User question: \"{user_input}\"\n"
                  f"Choose the most relevant file from:\n{files_str}\n"
                  "Return only filename or 'None'.")
        response = await gpt_4o_mini_complete(prompt)
        response = response.strip()
        # Map the chosen basename back to its full path; anything else -> None.
        if response in [os.path.basename(f) for f in file_list]:
            for f in file_list:
                if os.path.basename(f) == response:
                    return f
        return None

    explicit_source = None
    is_specific = await is_query_about_specific_file(user_input)
    if is_specific:
        guessed_file = await guess_relevant_file(user_input, AVAILABLE_SOURCES)
        if guessed_file and guessed_file != "None":
            explicit_source = guessed_file

    matched_sources = []
    answer = ""
    if explicit_source:
        # Chunk ids belonging to the focused file.
        # NOTE(review): filtered_chunk_ids is computed but never passed to
        # rag.aquery, so the query below is NOT actually restricted to the
        # focused file — confirm whether filtering was intended here.
        filtered_chunk_ids = [
            chunk_id for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk.get("file_path") == explicit_source
        ]
        ans_param = QueryParam(mode="mix", top_k=1)
        answer = await rag.aquery(user_input, param=ans_param)
        st.markdown(f"🔍 **Focused on:** **{os.path.basename(explicit_source)}**")
        # First three timestamped chunks of the focused file.
        # NOTE(review): these matched_sources are never displayed — the
        # "Sources" section below is gated on `not explicit_source`.
        matched_sources = [
            {
                "timestamp": chunk.get("timestamp"),
                "file_path": chunk.get("file_path"),
            }
            for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk_id in filtered_chunk_ids and "timestamp" in chunk
        ][:3]
    else:
        # General path: fetch the raw retrieval context separately so the DC
        # chunks can be matched back to timestamps, then query for the answer.
        ctx_param = QueryParam(mode="mix", only_need_context=True, top_k=3)
        context_chunks = await rag.aquery(f"{user_input}\n<!--ctx-->", param=ctx_param)
        ans_param = QueryParam(mode="mix", top_k=3)
        answer = await rag.aquery(user_input, param=ans_param)
        dc_chunks = extract_dc_chunks(context_chunks)[:3]
        matched_sources = find_matches(dc_chunks, FULL_CHUNKS_DICT)

    # Drop the trailing "References" section the model appends, then render.
    short_answer = answer.split("References")[0].strip()
    st.markdown(short_answer)
    st.session_state.messages.append({"role": "assistant", "content": short_answer})

    # Sources are shown only on the general (non-focused) path.
    if matched_sources and not explicit_source:
        sources_md = "#### 📚 Sources:\n" + "\n".join(
            f"- **Time:** {src['timestamp']} | **File:** {src['file_path']}"
            for src in matched_sources
        )
        st.markdown(sources_md)
        st.session_state.messages.append({"role": "assistant", "content": sources_md})
def main():
    """Streamlit entry point: set up state, replay history, handle a new turn."""
    st.title("LightRAG: Sermons Video Chat Bot TGP")
    rag = get_rag_instance()

    # One-time async storage initialization, guarded via session state so it
    # survives Streamlit reruns.
    if "initialized" not in st.session_state:
        asyncio.run(rag.initialize_storages())
        st.session_state.initialized = True
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far.
    for past_msg in st.session_state.messages:
        with st.chat_message(past_msg["role"]):
            st.markdown(past_msg["content"])

    user_input = st.chat_input("What do you want to know about TGP sermons?")
    if not user_input:
        return

    # Record and echo the new user turn, then produce the assistant reply.
    st.session_state.messages.append({"role": "user", "content": user_input})
    with st.chat_message("user"):
        st.markdown(user_input)
    with st.chat_message("assistant"):
        asyncio.run(handle_input(user_input, rag))
# Run the Streamlit app when executed as a script.
if __name__ == "__main__":
    main()
|