Financial_bot / app.py
Pushkya's picture
Upload 30 files
8299003 verified
Raw
History Blame Contribute Delete
7.07 kB
"""
app.py
======
Streamlit chat UI for the Morningstar Financial Intelligence RAG Pipeline.
Deployed on Hugging Face Spaces (free CPU tier).
Vector store: Qdrant Cloud (free tier).
LLM: Google Gemini 1.5 Flash (free tier, 1M tokens/min).
Run locally:
streamlit run app.py
"""
import os
import streamlit as st
from dotenv import load_dotenv
# Load variables from a local .env file (if any) before the pipeline reads
# its configuration from the environment.
load_dotenv()

# ── Page config ────────────────────────────────────────────────────────────────
# The icon literal was mojibake (UTF-8 bytes of the bar-chart emoji decoded
# with a legacy code page); restored to the intended character.
st.set_page_config(
    page_title="Financial Intelligence Assistant",
    page_icon="📊",
    layout="wide",
)
# ── Load pipeline once (cached across sessions) ────────────────────────────────
@st.cache_resource(show_spinner="Loading pipeline...")
def init_pipeline():
    """Build the retriever, LLM and guardrails used by the chat UI.

    Wrapped in ``st.cache_resource`` so the objects are created once and
    reused on later script reruns instead of being rebuilt each time.

    Returns:
        A ``(retriever, llm, guardrails)`` tuple.
    """
    # Project imports are deferred to the first call of this function.
    from src.retriever import FinancialRetriever
    from src.rag_chain import get_llm
    from src.guardrails import RAGGuardrails

    return (
        FinancialRetriever(rerank=True),
        get_llm(),
        # Grounding (NLI) check is switched off to keep latency down.
        RAGGuardrails(run_grounding=False),
    )


retriever, llm, guardrails = init_pipeline()
# ── Sidebar ────────────────────────────────────────────────────────────────────
with st.sidebar:
    st.title("About")

    # Markdown only emits a hard line break when a line ends with two spaces
    # (or a backslash); a single trailing space is a soft break, which made
    # the Retrieval / LLM / Vector store lines run together in one paragraph.
    hard_break = "  \n"
    # NOTE(review): this text says "Gemini 2.5 Flash" while the page caption
    # and the module docstring say "1.5 Flash" — confirm against get_llm().
    about_text = (
        "This assistant answers questions about **Apple Inc.** using:\n"
        "- SEC filings: 10-K, 10-Q, 8-K\n"
        "- Morningstar equity research reports\n\n"
        f"**Retrieval:** Dense ANN + cross-encoder reranking{hard_break}"
        f"**LLM:** Google Gemini 2.5 Flash{hard_break}"
        "**Vector store:** Qdrant Cloud\n\n"
        "_For informational purposes only. Not investment advice._"
    )
    st.markdown(about_text)

    st.divider()
    st.caption("Sample questions:")
    examples = [
        "What was Apple's total net sales in FY2024?",
        "What were Apple's main risk factors in the 2024 10-K?",
        "How did Apple's Services segment perform in FY2024?",
        "What is Apple's gross margin trend over the last 3 years?",
        "What did Apple say about AI in their latest filings?",
    ]
    for ex in examples:
        # A click stores the question under "prefill"; the chat-input section
        # later pops that key and treats it like a typed query.
        if st.button(ex, use_container_width=True):
            st.session_state["prefill"] = ex
# ── Main UI ────────────────────────────────────────────────────────────────────
st.title("Financial Intelligence Assistant")

# One-line summary of the data sources and stack, shown under the title.
# NOTE(review): the sidebar text names "Gemini 2.5 Flash" while this caption
# says "1.5 Flash" — confirm which model get_llm() actually returns.
header_caption = " | ".join(
    (
        "Apple SEC Filings (10-K, 10-Q, 8-K) + Morningstar Research",
        "Powered by Google Gemini 1.5 Flash + Qdrant",
    )
)
st.caption(header_caption)
# ── Session state ──────────────────────────────────────────────────────────────
# Start with an empty conversation when no history has been stored yet.
if "messages" not in st.session_state:
    st.session_state["messages"] = []

# Replay the stored conversation: each entry is rendered in a chat bubble for
# its role, and entries that carry a non-empty "sources" list get a collapsed
# expander with one caption per source.
for entry in st.session_state.messages:
    with st.chat_message(entry["role"]):
        st.markdown(entry["content"])
        entry_sources = entry.get("sources")
        if not entry_sources:
            continue
        with st.expander("Sources", expanded=False):
            for source_label in entry_sources:
                st.caption(source_label)
# Handle prefilled example question from sidebar
prefill = st.session_state.pop("prefill", None)


def _format_source_label(rank, chunk):
    """Return the one-line attribution label for a retrieved chunk.

    Args:
        rank: 1-based position of the chunk in the displayed source list.
        chunk: Mapping for one retrieved chunk. Only the "metadata" and
            "score" keys are read here; the full schema is defined by the
            retriever, which is not visible in this file.

    Returns:
        A label of the form
        ``[<rank>] <doc_type> FY<fiscal_year> (filed <filing_date>) | <heading> | score: <score>``
        where the fiscal-year, filing-date and heading parts are omitted when
        the corresponding metadata value is empty.
    """
    # Tolerate a chunk without metadata instead of raising KeyError.
    meta = chunk.get("metadata") or {}
    doc_type = meta.get("doc_type", "")
    fiscal_year = meta.get("fiscal_year", "")
    filed = meta.get("filing_date", "")
    heading = meta.get("heading_path", "") or meta.get("section_title", "")
    # A missing or None score is shown as 0.000 rather than breaking the format.
    score = chunk.get("score") or 0

    label = f"[{rank}] {doc_type}"
    if fiscal_year:
        label += f" FY{fiscal_year}"
    if filed:
        label += f" (filed {filed})"
    if heading:
        label += f" | {heading[:60]}"  # keep long heading paths on one line
    return f"{label} | score: {score:.3f}"


def _reject(reason):
    """Show a guardrail refusal, record it in the chat history, and end this run."""
    st.warning(reason)
    st.session_state.messages.append({"role": "assistant", "content": reason})
    st.stop()


# ── Chat input ─────────────────────────────────────────────────────────────────
# A typed question takes precedence; otherwise fall back to a sidebar example.
query = st.chat_input("Ask about Apple financials...") or prefill

if query:
    # Show user message
    st.session_state.messages.append({"role": "user", "content": query})
    with st.chat_message("user"):
        st.markdown(query)

    with st.chat_message("assistant"):
        # Layer 1: input guardrail
        ok, reason = guardrails.gate_input(query)
        if not ok:
            _reject(reason)

        # Retrieve chunks
        with st.spinner("Searching documents..."):
            chunks = retriever.retrieve(query, n_results=8)

        # Layer 2: retrieval guardrail
        ok, reason = guardrails.gate_retrieval(chunks)
        if not ok:
            _reject(reason)

        # Build context
        context = retriever.build_context(chunks, max_chars=6000)

        # Build prompt (imports stay local so they are only loaded once a
        # question is actually asked)
        from src.rag_chain import RAG_PROMPT_TEMPLATE
        from langchain_core.prompts import PromptTemplate
        from langchain_core.output_parsers import StrOutputParser

        prompt_template = PromptTemplate.from_template(RAG_PROMPT_TEMPLATE)
        chain = prompt_template | llm | StrOutputParser()

        # Stream response token by token. The trailing block character is a
        # typing cursor; the original literal was mojibake for it.
        full_answer = ""
        placeholder = st.empty()
        for chunk_token in chain.stream({"query": query, "context": context}):
            full_answer += chunk_token
            placeholder.markdown(full_answer + "▌")
        placeholder.markdown(full_answer)

        # Layer 3: output guardrail (basic checks, no NLI for latency)
        try:
            final_answer, output_warnings = guardrails.gate_output(
                full_answer, chunks
            )
        except ValueError as e:
            # The streamed text is not saved to history on this path, so
            # remove it from the live view too instead of leaving it above
            # the error.
            placeholder.empty()
            st.error(str(e))
            st.session_state.messages.append({
                "role": "assistant", "content": str(e)
            })
            st.stop()

        # Show exactly what is stored in history: if the guardrail returned a
        # modified answer, the streamed draft must not stay on screen.
        if final_answer != full_answer:
            placeholder.markdown(final_answer)

        for w in output_warnings or []:
            st.caption(f"Note: {w}")

        # Build source attribution list (top five retrieved chunks)
        sources = [
            _format_source_label(i, c) for i, c in enumerate(chunks[:5], 1)
        ]
        if sources:
            with st.expander("Sources", expanded=False):
                for src in sources:
                    st.caption(src)

        # Save to session state
        st.session_state.messages.append({
            "role": "assistant",
            "content": final_answer,
            "sources": sources,
        })