"""Streamlit front-end for a multimodal (text / image / table) PDF RAG system.

Workflow:
    1. Enter an OpenAI API key in the sidebar (initializes parser, vector
       store, and the visual RAG model once per session).
    2. Upload and parse a PDF into text, images, and tables.
    3. Run visual analysis and store the results in the vector store.
    4. Ask questions; answers are generated from the top search results.
"""

import os  # kept for parity with the original module surface
from pathlib import Path

import streamlit as st

from config import MAX_PDF_SIZE_MB, UPLOAD_FOLDER
from pdf_parser import PDFParser
from rag_system import AnsweringRAG, VisualMultimodalRAG
from vector_store import VectorStore

# Must be the first Streamlit call in the script.
st.set_page_config(
    page_title="šŸ“„ Multimodal RAG LLM System (PDF Parsing)",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Default value for every session-state key this app reads.
_SESSION_DEFAULTS = {
    'api_key_set': False,
    'api_key': None,
    'visual_rag_system': None,
    'vector_store': None,
    'parser': None,
    'current_document': None,
    'current_text': None,
    'current_images': None,
    'current_tables': None,
    'processing_results': None,
    'answering_rag': None,
}


def _init_session_state() -> None:
    """Seed ``st.session_state`` with defaults for any missing keys."""
    for key, default in _SESSION_DEFAULTS.items():
        if key not in st.session_state:
            st.session_state[key] = default


def _render_sidebar() -> None:
    """Sidebar: API-key entry, vector-store status, and store management."""
    with st.sidebar:
        st.header("āš™ļø Configuration")

        st.subheader("šŸ”‘ OpenAI API Key")
        api_key = st.text_input(
            "Enter your OpenAI API key:",
            type="password",
            key="api_key_input",
        )
        if api_key:
            st.session_state.api_key = api_key
            st.session_state.api_key_set = True
            # Build the heavyweight objects only once per session.
            if st.session_state.visual_rag_system is None:
                try:
                    st.session_state.visual_rag_system = VisualMultimodalRAG(
                        api_key=api_key, debug=True
                    )
                    st.session_state.vector_store = VectorStore()
                    st.session_state.parser = PDFParser(debug=True)
                    st.success("āœ… API Key set & systems initialized")
                except Exception as e:
                    st.error(f"Error initializing systems: {e}")
        else:
            st.session_state.api_key_set = False
            st.warning("āš ļø Please enter your API key to continue")

        st.divider()
        st.subheader("šŸ“Š Vector Store Status")
        if st.session_state.vector_store:
            try:
                info = st.session_state.vector_store.get_collection_info()
                st.metric("Items in Store", info['count'])
                st.metric("Status", info['status'])
                st.caption(f"Path: {info['persist_path']}")
            except Exception as e:
                st.error(f"Error getting store info: {e}")
        else:
            st.info("Set API key to initialize vector store")

        st.divider()
        st.subheader("šŸ“ Document Management")
        if st.button("šŸ”„ Clear Vector Store"):
            if st.session_state.vector_store:
                try:
                    st.session_state.vector_store.clear_all()
                    st.success("āœ… Vector store cleared")
                except Exception as e:
                    st.error(f"Error clearing store: {e}")


def _render_upload_section() -> None:
    """Upload a PDF, persist it to disk, and parse it into text/images/tables."""
    st.header("šŸ“¤ Upload PDF Document")
    uploaded_file = st.file_uploader(
        "Choose a PDF file",
        type=['pdf'],
        help="PDF with text, images, and tables",
    )
    if uploaded_file is None:
        return

    # Enforce the configured size cap (MAX_PDF_SIZE_MB was imported but
    # never checked before).
    size_mb = uploaded_file.size / (1024 * 1024)
    if size_mb > MAX_PDF_SIZE_MB:
        st.error(f"āŒ File is {size_mb:.1f} MB; limit is {MAX_PDF_SIZE_MB} MB")
        return

    upload_path = Path(UPLOAD_FOLDER)
    # parents=True: tolerate a missing parent directory on first run.
    upload_path.mkdir(parents=True, exist_ok=True)
    file_path = upload_path / uploaded_file.name
    with open(file_path, 'wb') as f:
        f.write(uploaded_file.getbuffer())
    st.success(f"āœ… File saved: {uploaded_file.name}")

    if st.button("šŸ” Parse PDF"):
        if not st.session_state.api_key_set:
            st.error("āŒ Please set OpenAI API key first")
            return
        try:
            with st.spinner("šŸ“„ Parsing PDF..."):
                print(f"\n{'='*70}")
                print(f"PARSING: {uploaded_file.name}")
                print(f"{'='*70}")

                # Parse PDF - returns text, images, tables.
                text, images, tables = st.session_state.parser.parse_pdf(
                    str(file_path)
                )

                # Cache the parsed components for the later pipeline stages.
                st.session_state.current_document = uploaded_file.name
                st.session_state.current_text = text
                st.session_state.current_images = images
                st.session_state.current_tables = tables

                col1, col2, col3 = st.columns(3)
                with col1:
                    st.metric("šŸ“ Text", f"{len(text):,} chars")
                with col2:
                    st.metric("šŸ–¼ļø Images", len(images))
                with col3:
                    st.metric("šŸ“‹ Tables", len(tables))

                st.success("āœ… PDF parsing complete!")
        except Exception as e:
            st.error(f"āŒ Error parsing PDF: {e}")
            print(f"Error: {e}")


def _render_analysis_section() -> None:
    """Run visual analysis over the parsed document and store the results."""
    st.divider()
    st.header("šŸ–¼ļø Analysis & Storage")
    if not st.button("šŸ–¼ļø Analyze & Store Components"):
        return
    if not st.session_state.api_key_set:
        st.error("āŒ Please set OpenAI API key first")
    elif st.session_state.current_text is None:
        st.error("āŒ Please parse a PDF document first")
    else:
        try:
            with st.spinner("šŸ–¼ļø Analyzing..."):
                print(f"\n{'='*70}")
                print(f"VISUAL IMAGE ANALYSIS")
                print(f"{'='*70}")

                results = st.session_state.visual_rag_system.process_and_store_document(
                    text=st.session_state.current_text,
                    images=st.session_state.current_images,
                    tables=st.session_state.current_tables,
                    vector_store=st.session_state.vector_store,
                    doc_id=st.session_state.current_document or "current_doc",
                )
                st.session_state.processing_results = results

                st.success("āœ… Visual analysis complete & stored!")
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.metric(
                        "šŸ–¼ļø Images Analyzed",
                        len(results['image_visual_analyses']),
                    )
                with col2:
                    st.metric("šŸ“ Text Chunks", len(results['text_summaries']))
                with col3:
                    st.metric(
                        "šŸ“‹ Tables Analyzed", len(results['table_summaries'])
                    )
                st.metric("šŸ“Š Total Stored in Vector", results['total_stored'])

                print(f"\nāœ… Analysis processing complete!")
        except Exception as e:
            st.error(f"āŒ Error during analysis: {e}")
            print(f"Error: {e}")


def _render_question_section() -> None:
    """Ask a question, retrieve relevant chunks, and generate an answer."""
    st.divider()
    st.header("ā“ Ask Questions About Document")

    # Lazily build the answering model once the API key is available.
    if st.session_state.api_key_set and st.session_state.answering_rag is None:
        st.session_state.answering_rag = AnsweringRAG(
            api_key=st.session_state.api_key, debug=True
        )

    question = st.text_area(
        "Enter your question:",
        height=100,
        placeholder="What does the document say about...?",
    )

    if not st.button("šŸ” Search & Generate Answer"):
        return
    if not st.session_state.api_key_set:
        st.error("āŒ Please set OpenAI API key first")
    elif st.session_state.current_text is None:
        st.error("āŒ Please parse a PDF document first")
    elif not question:
        st.error("āŒ Please enter a question")
    else:
        try:
            with st.spinner("šŸ”„ Searching document and analyzing..."):
                print(f"\n{'='*70}")
                print(f"QUESTION: {question}")
                print(f"{'='*70}")

                store = st.session_state.vector_store
                doc_name = st.session_state.current_document or "current_doc"

                # Fallback indexing only: the previous version re-added the
                # raw text on EVERY question, duplicating entries in the
                # store. Add it only when the store is still empty (i.e. the
                # user skipped the "Analyze & Store" step).
                try:
                    store_is_empty = store.get_collection_info()['count'] == 0
                except Exception:
                    store_is_empty = True  # can't tell — index to be safe
                if store_is_empty:
                    store.add_documents(
                        {
                            'text': st.session_state.current_text,
                            'images': [],
                            'tables': [],
                        },
                        doc_name,
                    )

                search_results = store.search(question, n_results=5)
                print(f"\nšŸ“Š Search Results Found: {len(search_results)}")

                result = st.session_state.answering_rag.analyze_and_answer(
                    question, search_results
                )

                st.success("āœ… Analysis complete!")
                st.subheader("šŸ“ Answer")
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.metric("Confidence", f"{result['confidence'].upper()}")
                with col2:
                    st.metric("Sources Used", result['sources_used'])
                with col3:
                    # Guard len() to avoid ZeroDivisionError on empty results.
                    if result['sources_used'] > 0 and search_results:
                        avg_relevance = sum(
                            1 - r.get('distance', 0) for r in search_results
                        ) / len(search_results)
                        st.metric("Avg Relevance", f"{avg_relevance:.0%}")

                st.write(result['answer'])

                if st.checkbox("šŸ“š Show Source Documents"):
                    st.subheader("Sources Used in Answer")
                    for idx, source in enumerate(result['formatted_sources'], 1):
                        relevance = source['relevance']
                        filled = int(relevance * 10)
                        relevance_bar = "ā–ˆ" * filled + "ā–‘" * (10 - filled)
                        with st.expander(
                            f"Source {idx} - {source['type'].upper()} "
                            f"[{relevance_bar}] {relevance:.0%}"
                        ):
                            st.write(source['content'])

                print(f"\nāœ… Answer generation complete!")
        except Exception as e:
            st.error(f"āŒ Error processing question: {e}")
            print(f"Error: {e}")


# ---- Page flow (order matches the original top-to-bottom script) ----
_init_session_state()
st.title("šŸ“„ Multimodal RAG LLM System (PDF Parsing)")
_render_sidebar()
_render_upload_section()
_render_analysis_section()
_render_question_section()
st.divider()