# Video_Rag_TGP — src/streamlit_app.py
# Streamlit chat app over a LightRAG-indexed collection of TGP sermon
# transcripts (commit 0a613a1, by SaviAnna).
import streamlit as st
import asyncio
import os
import json
import re
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed
@st.cache_resource
def get_rag_instance():
    """Build the shared LightRAG instance (once per server process).

    Streamlit's ``cache_resource`` memoizes the object, so the index under
    ``src/sermons_tgp`` is wired up a single time and reused across reruns.
    """
    rag = LightRAG(
        working_dir="src/sermons_tgp",
        embedding_func=openai_embed,
        llm_model_func=gpt_4o_mini_complete,
        enable_llm_cache=False,
    )
    return rag
# chunk_id -> {content, timestamp, file_path, ...} mapping produced at index
# time; used to attach timestamps and file paths to retrieved chunks.
# Fix: open the JSON store with a context manager so the file handle is
# closed deterministically (the original `json.load(open(...))` leaked it).
with open(
    "src/sermons_tgp/kv_store_text_chunks_with_timestamps.json",
    "r",
    encoding="utf-8",
) as _chunks_file:
    FULL_CHUNKS_DICT = json.load(_chunks_file)
def extract_dc_chunks(context_str):
    """Pull the Document Chunks (DC) JSON array out of a LightRAG context blob.

    The context string embeds a fenced section of the form::

        -----Document Chunks(DC)-----
        ```json
        [ ...chunk dicts... ]
        ```

    Returns the parsed list, or ``[]`` when no such section is present.
    Raises ``json.JSONDecodeError`` if the section exists but is malformed.
    """
    dc_section = re.search(
        r'-----Document Chunks\(DC\)-----\s+```json\n(.*?)```',
        context_str,
        re.DOTALL,
    )
    if dc_section is None:
        return []
    return json.loads(dc_section.group(1))
def find_matches(dc_chunks, full_chunks_dict):
    """Map retrieved DC chunks back to their indexed source metadata.

    Args:
        dc_chunks: list of dicts carrying a ``"content"`` key (as parsed from
            the LightRAG context output).
        full_chunks_dict: ``chunk_id -> chunk_data`` mapping; each chunk_data
            may carry ``"content"``, ``"timestamp"`` and ``"file_path"``.

    Returns:
        One ``{"timestamp", "file_path", "content"}`` dict per DC chunk whose
        stripped content exactly matches an indexed chunk's stripped content;
        unmatched DC chunks are silently skipped.
    """
    # Build the content -> chunk lookup once instead of rescanning the whole
    # store for every DC chunk (was O(len(dc_chunks) * len(store))).
    # setdefault keeps the FIRST chunk per content, matching the original
    # inner-loop `break` semantics when contents are duplicated.
    by_content = {}
    for chunk_data in full_chunks_dict.values():
        by_content.setdefault(chunk_data.get("content", "").strip(), chunk_data)

    results = []
    for dc in dc_chunks:
        dc_content = dc.get("content", "").strip()
        chunk_data = by_content.get(dc_content)
        if chunk_data is not None:
            results.append({
                "timestamp": chunk_data.get("timestamp"),
                "file_path": chunk_data.get("file_path"),
                "content": dc_content,
            })
    return results
async def handle_input(user_input, rag):
    """Process one chat turn end to end.

    Classifies the question (general topic vs. a specifically named sermon)
    with an LLM, queries LightRAG accordingly, renders the answer into the
    Streamlit chat, and appends the answer (and, for general questions, a
    sources list) to ``st.session_state.messages``.
    """
    # Distinct sermon file paths known to the chunk store.
    AVAILABLE_SOURCES = list(set(
        chunk["file_path"] for chunk in FULL_CHUNKS_DICT.values() if "file_path" in chunk
    ))

    async def is_query_about_specific_file(user_input: str) -> bool:
        # LLM yes/no gate: does the user name a particular sermon/series?
        # NOTE(review): the prompt below mixes escaped and literal quotes
        # oddly but is runtime text — left byte-identical.
        prompt = ( f"""User question: \"{user_input}\"\n"
Does the user explicitly refer to a specific sermon, sermon series, or a named teaching by a
preacher (e.g., by name like '2016 6 8 Bishops', 'Kingdom Exceptionalism', 'Joseph Prince's sermon on faith',
'Thankfulness, A Daily Habit - Bill Johnson')? This means the user is asking about content within a *particular* named
message, not just a general topic or a request for a list of sermons."
Respond with only 'yes' or 'no'. Do not add anything else to your answer.""")
        response = await gpt_4o_mini_complete(prompt)
        # Substring check: any reply containing "yes" counts as affirmative.
        return "yes" in response.strip().lower()

    async def guess_relevant_file(user_input: str, file_list: list[str]) -> str | None:
        # Ask the LLM to pick the best-matching filename (basenames only).
        files_str = "\n".join(f"- {os.path.basename(f)}" for f in file_list)
        prompt = (f"User question: \"{user_input}\"\n"
                  f"Choose the most relevant file from:\n{files_str}\n"
                  "Return only filename or 'None'.")
        response = await gpt_4o_mini_complete(prompt)
        response = response.strip()
        # Accept only an exact basename match; map it back to the full path.
        if response in [os.path.basename(f) for f in file_list]:
            for f in file_list:
                if os.path.basename(f) == response:
                    return f
        return None

    explicit_source = None
    is_specific = await is_query_about_specific_file(user_input)
    if is_specific:
        guessed_file = await guess_relevant_file(user_input, AVAILABLE_SOURCES)
        if guessed_file and guessed_file != "None":
            explicit_source = guessed_file

    matched_sources = []
    answer = ""
    if explicit_source:
        # NOTE(review): filtered_chunk_ids is computed but never passed to the
        # query below, so the answer is NOT actually restricted to the guessed
        # file — confirm whether rag.aquery supports such filtering.
        filtered_chunk_ids = [
            chunk_id for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk.get("file_path") == explicit_source
        ]
        ans_param = QueryParam(mode="mix", top_k=1)
        answer = await rag.aquery(user_input, param=ans_param)
        st.markdown(f"🔍 **Focused on:** **{os.path.basename(explicit_source)}**")
        # First 3 timestamped chunks from the guessed file.
        # NOTE(review): this list is never rendered — the sources block at the
        # bottom is guarded by `not explicit_source`. Confirm intent.
        matched_sources = [
            {
                "timestamp": chunk.get("timestamp"),
                "file_path": chunk.get("file_path"),
            }
            for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk_id in filtered_chunk_ids and "timestamp" in chunk
        ][:3]
    else:
        # General question: fetch the retrieval context separately so the DC
        # chunks can be matched back to timestamps/file paths, then ask for
        # the actual answer.
        ctx_param = QueryParam(mode="mix", only_need_context=True, top_k=3)
        context_chunks = await rag.aquery(f"{user_input}\n<!--ctx-->", param=ctx_param)
        ans_param = QueryParam(mode="mix", top_k=3)
        answer = await rag.aquery(user_input, param=ans_param)
        dc_chunks = extract_dc_chunks(context_chunks)[:3]
        matched_sources = find_matches(dc_chunks, FULL_CHUNKS_DICT)

    # Drop LightRAG's trailing "References" section from the rendered answer.
    short_answer = answer.split("References")[0].strip()
    st.markdown(short_answer)
    st.session_state.messages.append({"role": "assistant", "content": short_answer})

    # Sources are only shown for general (non file-focused) questions.
    if matched_sources and not explicit_source:
        sources_md = "#### 📚 Sources:\n" + "\n".join(
            f"- **Time:** {src['timestamp']} | **File:** {src['file_path']}"
            for src in matched_sources
        )
        st.markdown(sources_md)
        st.session_state.messages.append({"role": "assistant", "content": sources_md})
def main():
    """Streamlit entry point: render the chat UI over the sermons RAG index."""
    st.title("LightRAG: Sermons Video Chat Bot TGP")
    rag = get_rag_instance()

    # One-time async storage initialization per browser session.
    if "initialized" not in st.session_state:
        asyncio.run(rag.initialize_storages())
        st.session_state.initialized = True

    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far on every rerun.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    user_input = st.chat_input("What do you want to know about TGP sermons?")
    if not user_input:
        return

    st.session_state.messages.append({"role": "user", "content": user_input})
    with st.chat_message("user"):
        st.markdown(user_input)
    with st.chat_message("assistant"):
        asyncio.run(handle_input(user_input, rag))


if __name__ == "__main__":
    main()