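"""Streamlit chat app over a LightRAG index of BBG sermon transcripts.

Answers are produced with LightRAG's "mix" retrieval mode; retrieved chunks
are traced back to a timestamped chunk store so responses can cite the source
file and timestamp.
"""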
import streamlit as st
import asyncio
import os
import json
import re
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed

# The lightrag OpenAI helpers read OPENAI_API_KEY from the environment
# themselves; this lookup only surfaces the key for local inspection.
api_key = os.getenv("OPENAI_API_KEY")

# ---------------------- Sources: parsing + search ----------------------
def extract_dc_chunks(context_str):
    """Pull the Document Chunks (DC) JSON array out of a LightRAG context string."""
    match = re.search(r'-----Document Chunks\(DC\)-----\s+```json\n(.*?)```', context_str, re.DOTALL)
    if not match:
        return []
    dc_json_str = match.group(1)
    return json.loads(dc_json_str)

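
# One LightRAG instance per Streamlit process; st.cache_resource keeps it
# alive across reruns instead of rebuilding the storages on every interaction.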
@st.cache_resource
def get_rag_instance():
    return LightRAG(
        working_dir="src/sermons",
        embedding_func=openai_embed,
        llm_model_func=gpt_4o_mini_complete,
        enable_llm_cache=False,
    )

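
# Chunk store enriched with timestamps, keyed by chunk id; loaded once at
# import so the content lookups in find_matches stay in memory.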
with open("src/sermons/kv_store_text_chunks_with_timestamps.json", "r", encoding="utf-8") as f:
    FULL_CHUNKS_DICT = json.load(f)

def find_matches(dc_chunks, full_chunks_dict):
    """Trace retrieved DC chunks back to the timestamped store by exact-content match."""
    results = []
    for dc in dc_chunks:
        dc_content = dc.get("content", "").strip()
        for chunk_id, chunk_data in full_chunks_dict.items():
            if chunk_data.get("content", "").strip() == dc_content:
                results.append({
                    "timestamp": chunk_data.get("timestamp"),
                    "file_path": chunk_data.get("file_path"),
                    "content": dc_content,
                })
                break
    return results

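
# Two answer paths: if the user names a particular sermon, focus the answer on
# that file; otherwise answer the topic broadly and cite the matched chunks.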
async def handle_input(user_input, rag):
    # Every transcript file known to the chunk store.
    AVAILABLE_SOURCES = list(set(
        chunk["file_path"] for chunk in FULL_CHUNKS_DICT.values() if "file_path" in chunk
    ))

    async def is_query_about_specific_file(user_input: str) -> bool:
        prompt = (
            f'User question: "{user_input}"\n'
            "Does the user explicitly refer to a specific sermon, sermon series, or a named teaching by a "
            "preacher (e.g., by name like '2016 6 8 Bishops', 'Kingdom Exceptionalism', 'Joseph Prince's sermon on faith', "
            "'Thankfulness, A Daily Habit - Bill Johnson')? This means the user is asking about content within a "
            "*particular* named message, not just a general topic or a request for a list of sermons. "
            "Respond with only 'yes' or 'no'. Do not add anything else to your answer."
        )
        response = await gpt_4o_mini_complete(prompt)
        return "yes" in response.strip().lower()

    async def guess_relevant_file(user_input: str, file_list: list[str]) -> str | None:
        files_str = "\n".join(f"- {os.path.basename(f)}" for f in file_list)
        prompt = (
            f'User question: "{user_input}"\n'
            f"Choose the most relevant file from:\n{files_str}\n"
            "Return only filename or 'None'."
        )
        response = await gpt_4o_mini_complete(prompt)
        response = response.strip()
        for f in file_list:
            if os.path.basename(f) == response:
                return f
        return None
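
    # Ask the LLM to route the question: does it name a specific sermon, and
    # if so, which transcript file does it most likely refer to?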
    explicit_source = None
    is_specific = await is_query_about_specific_file(user_input)
    if is_specific:
        guessed_file = await guess_relevant_file(user_input, AVAILABLE_SOURCES)
        if guessed_file and guessed_file != "None":
            explicit_source = guessed_file

    matched_sources = []
    answer = ""
    if explicit_source:
        # Chunk ids that belong to the named sermon file.
        filtered_chunk_ids = {
            chunk_id for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk.get("file_path") == explicit_source
        }
        ans_param = QueryParam(mode="mix", top_k=1)
        answer = await rag.aquery(user_input, param=ans_param)
        st.markdown(f"📌 **Focused on:** **{os.path.basename(explicit_source)}**")
        matched_sources = [
            {
                "timestamp": chunk.get("timestamp"),
                "file_path": chunk.get("file_path"),
            }
            for chunk_id, chunk in FULL_CHUNKS_DICT.items()
            if chunk_id in filtered_chunk_ids and "timestamp" in chunk
        ][:3]
    else:
        # Topic-level query: fetch the context once to recover the matched
        # chunks, then query again for the actual answer.
        ctx_param = QueryParam(mode="mix", only_need_context=True, top_k=3)
        context_chunks = await rag.aquery(f"{user_input}\n<!--ctx-->", param=ctx_param)
        ans_param = QueryParam(mode="mix", top_k=3)
        answer = await rag.aquery(user_input, param=ans_param)
        dc_chunks = extract_dc_chunks(context_chunks)[:3]
        matched_sources = find_matches(dc_chunks, FULL_CHUNKS_DICT)

    # Drop the model's trailing "References" section from the displayed answer.
    short_answer = answer.split("References")[0].strip()
    st.markdown(short_answer)
    st.session_state.messages.append({"role": "assistant", "content": short_answer})

    # Sources are only rendered for topic-level queries; the focused path
    # already names its file above.
    if matched_sources and not explicit_source:
        sources_md = "#### 📚 Sources:\n" + "\n".join(
            f"- **Time:** {src['timestamp']} | **File:** {src['file_path']}"
            for src in matched_sources
        )
        st.markdown(sources_md)
        st.session_state.messages.append({"role": "assistant", "content": sources_md})

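
# ---------------------- Streamlit UI ----------------------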
def main():
    st.title("LightRAG: Sermons Video Chat Bot BBG")
    rag = get_rag_instance()
    if "initialized" not in st.session_state:
        asyncio.run(rag.initialize_storages())
        st.session_state.initialized = True
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the chat history on every Streamlit rerun.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    user_input = st.chat_input("What do you want to know about BBG sermons?")
    if user_input:
        st.session_state.messages.append({"role": "user", "content": user_input})
        with st.chat_message("user"):
            st.markdown(user_input)
        with st.chat_message("assistant"):
            asyncio.run(handle_input(user_input, rag))


if __name__ == "__main__":
    main()