# src/app.py (commit 3340ad6) β€” Streamlit UI for the multimodal PDF RAG system
import streamlit as st
import os
from pathlib import Path
from pdf_parser import PDFParser
from vector_store import VectorStore
from rag_system import VisualMultimodalRAG
from config import UPLOAD_FOLDER, MAX_PDF_SIZE_MB
# Page-level chrome: wide canvas with the sidebar open on first load.
st.set_page_config(
    page_title="πŸ“„ Multimodal RAG LLM System (PDF Parsing)",
    initial_sidebar_state="expanded",
    layout="wide",
)
# One-time defaults for every st.session_state slot the app relies on.
# A dict + loop keeps the full list in one place instead of eleven
# near-identical `if key not in st.session_state` statements.
_SESSION_DEFAULTS = {
    'api_key_set': False,
    'api_key': None,
    'visual_rag_system': None,
    'vector_store': None,
    'parser': None,
    'current_document': None,
    'current_text': None,
    'current_images': None,
    'current_tables': None,
    'processing_results': None,
    'answering_rag': None,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

st.title("πŸ“„ Multimodal RAG LLM System (PDF Parsing)")
with st.sidebar:
    st.header("βš™οΈ Configuration")

    # --- OpenAI API key -------------------------------------------------
    st.subheader("πŸ”‘ OpenAI API Key")
    api_key = st.text_input(
        "Enter your OpenAI API key:",
        type="password",
        key="api_key_input"
    )
    if api_key:
        # BUGFIX: the old code only initialized the backend systems when
        # `visual_rag_system is None`, so pasting a *different* key after the
        # first one was silently ignored. Re-initialize whenever the key
        # changes.
        key_changed = st.session_state.api_key != api_key
        st.session_state.api_key = api_key
        st.session_state.api_key_set = True
        if st.session_state.visual_rag_system is None or key_changed:
            try:
                st.session_state.visual_rag_system = VisualMultimodalRAG(api_key=api_key, debug=True)
                st.session_state.vector_store = VectorStore()
                st.session_state.parser = PDFParser(debug=True)
                st.success("βœ… API Key set & systems initialized")
            except Exception as e:
                st.error(f"Error initializing systems: {e}")
    else:
        st.session_state.api_key_set = False
        st.warning("⚠️ Please enter your API key to continue")

    st.divider()

    # --- Vector store status --------------------------------------------
    st.subheader("πŸ“Š Vector Store Status")
    if st.session_state.vector_store:
        try:
            info = st.session_state.vector_store.get_collection_info()
            st.metric("Items in Store", info['count'])
            st.metric("Status", info['status'])
            st.caption(f"Path: {info['persist_path']}")
        except Exception as e:
            st.error(f"Error getting store info: {e}")
    else:
        st.info("Set API key to initialize vector store")

    st.divider()

    # --- Document management --------------------------------------------
    st.subheader("πŸ“ Document Management")
    if st.button("πŸ”„ Clear Vector Store"):
        if st.session_state.vector_store:
            try:
                st.session_state.vector_store.clear_all()
                st.success("βœ… Vector store cleared")
            except Exception as e:
                st.error(f"Error clearing store: {e}")
st.header("πŸ“€ Upload PDF Document")
uploaded_file = st.file_uploader(
    "Choose a PDF file",
    type=['pdf'],
    help="PDF with text, images, and tables"
)
if uploaded_file is not None:
    # BUGFIX: MAX_PDF_SIZE_MB was imported from config but never enforced.
    # Reject oversized uploads before writing them to disk.
    size_mb = uploaded_file.size / (1024 * 1024)
    if size_mb > MAX_PDF_SIZE_MB:
        st.error(f"❌ File too large: {size_mb:.1f} MB (max {MAX_PDF_SIZE_MB} MB)")
    else:
        upload_path = Path(UPLOAD_FOLDER)
        # parents=True: UPLOAD_FOLDER may be a nested path (e.g. "data/uploads")
        upload_path.mkdir(parents=True, exist_ok=True)
        file_path = upload_path / uploaded_file.name
        with open(file_path, 'wb') as f:
            f.write(uploaded_file.getbuffer())
        st.success(f"βœ… File saved: {uploaded_file.name}")
if st.button("πŸ” Parse PDF"):
if not st.session_state.api_key_set:
st.error("❌ Please set OpenAI API key first")
else:
try:
with st.spinner("πŸ“„ Parsing PDF..."):
print(f"\n{'='*70}")
print(f"PARSING: {uploaded_file.name}")
print(f"{'='*70}")
# Parse PDF - returns text, images, tables
parser = st.session_state.parser
text, images, tables = parser.parse_pdf(str(file_path))
# Store in session state
st.session_state.current_document = uploaded_file.name
st.session_state.current_text = text
st.session_state.current_images = images
st.session_state.current_tables = tables
# Display results
col1, col2, col3 = st.columns(3)
with col1:
st.metric("πŸ“ Text", f"{len(text):,} chars")
with col2:
st.metric("πŸ–ΌοΈ Images", len(images))
with col3:
st.metric("πŸ“‹ Tables", len(tables))
#if images:
# st.subheader("πŸ–ΌοΈ Extracted Images")
# for idx, img in enumerate(images):
# ocr_text = img.get('ocr_text', '')
# ocr_len = len(ocr_text)
#
# if ocr_len > 0:
# st.success(f"βœ… Image {idx}: {ocr_len} characters (OCR)")
# else:
# st.warning(f"⚠️ Image {idx}: No OCR text (will use visual analysis)")
st.success("βœ… PDF parsing complete!")
except Exception as e:
st.error(f"❌ Error parsing PDF: {e}")
print(f"Error: {e}")
st.divider()
st.header("πŸ–ΌοΈ Analysis & Storage")
if st.button("πŸ–ΌοΈ Analyze & Store Components"):
    if not st.session_state.api_key_set:
        st.error("❌ Please set OpenAI API key first")
    elif st.session_state.current_text is None:
        st.error("❌ Please parse a PDF document first")
    else:
        try:
            with st.spinner("πŸ–ΌοΈ Analyzing..."):
                banner = '=' * 70
                print(f"\n{banner}")
                print("VISUAL IMAGE ANALYSIS")
                print(f"{banner}")

                # Summarize/analyze text chunks, images, and tables, then
                # persist everything into the vector store under one doc id.
                results = st.session_state.visual_rag_system.process_and_store_document(
                    text=st.session_state.current_text,
                    images=st.session_state.current_images,
                    tables=st.session_state.current_tables,
                    vector_store=st.session_state.vector_store,
                    doc_id=st.session_state.current_document or "current_doc"
                )
                st.session_state.processing_results = results

                st.success("βœ… Visual analysis complete & stored!")

                # Per-modality counts, then the grand total stored.
                counts = [
                    ("πŸ–ΌοΈ Images Analyzed", len(results['image_visual_analyses'])),
                    ("πŸ“ Text Chunks", len(results['text_summaries'])),
                    ("πŸ“‹ Tables Analyzed", len(results['table_summaries'])),
                ]
                for column, (label, value) in zip(st.columns(3), counts):
                    with column:
                        st.metric(label, value)
                st.metric("πŸ“Š Total Stored in Vector", results['total_stored'])

                print("\nβœ… Analysis processing complete!")
        except Exception as e:
            st.error(f"❌ Error during analysis: {e}")
            print(f"Error: {e}")
st.divider()
st.header("❓ Ask Questions About Document")

# NOTE: the redundant `if 'answering_rag' not in st.session_state` re-init
# that used to live here was removed β€” the slot is already created in the
# session-state setup at the top of the script.
if st.session_state.api_key_set and st.session_state.answering_rag is None:
    # Local import keeps answering-model construction lazy until a key exists.
    from rag_system import AnsweringRAG
    st.session_state.answering_rag = AnsweringRAG(api_key=st.session_state.api_key, debug=True)

question = st.text_area(
    "Enter your question:",
    height=100,
    placeholder="What does the document say about...?"
)
if st.button("πŸ” Search & Generate Answer"):
if not st.session_state.api_key_set:
st.error("❌ Please set OpenAI API key first")
elif st.session_state.current_text is None:
st.error("❌ Please parse a PDF document first")
elif not question:
st.error("❌ Please enter a question")
else:
try:
with st.spinner("πŸ”„ Searching document and analyzing..."):
print(f"\n{'='*70}")
print(f"QUESTION: {question}")
print(f"{'='*70}")
store = st.session_state.vector_store
doc_name = st.session_state.current_document or "current_doc"
doc_data = {
'text': st.session_state.current_text,
'images': [],
'tables': []
}
store.add_documents(doc_data, doc_name)
search_results = store.search(question, n_results=5)
print(f"\nπŸ“Š Search Results Found: {len(search_results)}")
answering_rag = st.session_state.answering_rag
result = answering_rag.analyze_and_answer(question, search_results)
st.success("βœ… Analysis complete!")
st.subheader("πŸ“ Answer")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Confidence", f"{result['confidence'].upper()}")
with col2:
st.metric("Sources Used", result['sources_used'])
with col3:
if result['sources_used'] > 0:
st.metric("Avg Relevance", f"{sum(1-r.get('distance',0) for r in search_results)/len(search_results):.0%}")
st.write(result['answer'])
if st.checkbox("πŸ“š Show Source Documents"):
st.subheader("Sources Used in Answer")
for idx, source in enumerate(result['formatted_sources'], 1):
relevance = source['relevance']
relevance_bar = "β–ˆ" * int(relevance * 10) + "β–‘" * (10 - int(relevance * 10))
with st.expander(
f"Source {idx} - {source['type'].upper()} "
f"[{relevance_bar}] {relevance:.0%}"
):
st.write(source['content'])
print(f"\nβœ… Answer generation complete!")
except Exception as e:
st.error(f"❌ Error processing question: {e}")
print(f"Error: {e}")
st.divider()