Spaces:
Runtime error
Runtime error
| # app.py | |
| """PROBIN - Intelligent Document Analysis System""" | |
| import streamlit as st | |
| import os | |
| import sys | |
| import uuid | |
| from pathlib import Path | |
| from streamlit_pdf_viewer import pdf_viewer | |
| # νλ‘μ νΈ λ£¨νΈ κ²½λ‘ μΆκ° | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
| from core.pdf_loader import load_pdf | |
| from core.chunker import chunk_text | |
| from core.embedder import embed_chunks | |
| from core.vectordb import VectorDB | |
| from core.retriever import Retriever | |
| from core.generator import Generator | |
| from ui.styles import get_custom_css | |
| from ui.components import render_sources_with_relevance | |
| from utils.pdf_utils import get_text_coordinates | |
| from config.settings import ( | |
| CHUNK_SIZE, CHUNK_OVERLAP, TOP_K, | |
| APP_NAME, APP_SUBTITLE, APP_ICON, SHOW_STATS, PDF_HEIGHT | |
| ) | |
# 1. Page configuration (must run before any other Streamlit call)
st.set_page_config(
    page_title=f"{APP_NAME} - {APP_SUBTITLE}",
    page_icon=APP_ICON,
    layout="wide",
    initial_sidebar_state="collapsed",
)
# 2. Session-state initialization.
# `session_id` is handled separately because creating it has a side effect
# (a log line). The remaining keys are seeded from (key, factory) pairs;
# factories keep construction lazy, so Generator() is only instantiated
# when the key is actually missing — same behavior as the original if-chain.
if "session_id" not in st.session_state:
    st.session_state.session_id = str(uuid.uuid4())[:8]
    print(f"π μ μΈμ ID μμ±: {st.session_state.session_id}")

_SESSION_DEFAULTS = {
    "vectordb": lambda: None,
    "retriever": lambda: None,
    "generator": Generator,
    "pdf_processed": lambda: False,
    "messages": list,
    "current_page": lambda: 1,
    "pdf_path": lambda: None,
    "pdf_bytes": lambda: None,
    "annotations": list,
    "zoom_level": lambda: 500,
}
for _key, _factory in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _factory()

# 3. Inject the custom CSS theme.
st.markdown(get_custom_css(), unsafe_allow_html=True)
| # -------------------------------------------------------------------------- | |
| # ν¨μ μ μ | |
| # -------------------------------------------------------------------------- | |
def render_welcome_screen():
    """Render the hero/welcome banner, shown only before a PDF is processed."""
    if st.session_state.pdf_processed:
        return
    hero_html = f"""
        <div id="welcome" class="hero-container">
            <h1 class="hero-title">{APP_ICON} {APP_NAME}</h1>
            <p class="hero-subtitle">Experience Intelligent Document Analysis with AI</p>
        </div>
        """
    st.markdown(hero_html, unsafe_allow_html=True)
def move_to_page(page_num, text_content):
    """Jump the PDF viewer to *page_num* and highlight *text_content* there.

    Used as a callback from the sources panel; triggers an immediate rerun
    so the viewer reflects the new page and annotations right away.
    """
    st.session_state.current_page = page_num
    if st.session_state.pdf_path:
        st.session_state.annotations = get_text_coordinates(
            str(st.session_state.pdf_path),
            page_num,
            text_content,
        )
    # Rerun so the page move takes effect immediately.
    st.rerun()
def reset_app():
    """Wipe the vector collection and all session state, then restart the app."""
    print("\nπ μ± μ 체 μ΄κΈ°ν μμ...")

    # 1. Best-effort drop of the current vector collection, if one exists.
    db = st.session_state.vectordb
    if db is not None:
        try:
            print(f" ποΈ νμ¬ μ»¬λ μ μμ (μΈμ : {st.session_state.session_id})")
            db.delete_collection()
            print(" β 컬λ μ μμ μλ£")
        except Exception as e:
            # Deletion failure must not block the reset itself.
            print(f" β οΈ μ»¬λ μ μμ μ€λ₯: {e}")

    # 2. Mint a fresh session id (old one logged for traceability).
    old_session_id = st.session_state.session_id
    new_session_id = str(uuid.uuid4())[:8]
    print(f" π μΈμ ID λ³κ²½: {old_session_id} β {new_session_id}")

    # 3. Clear every session-state key, then seed the essentials back in.
    for key in list(st.session_state.keys()):
        del st.session_state[key]
    st.session_state.session_id = new_session_id
    st.session_state.pdf_processed = False
    st.session_state.pdf_path = None
    st.session_state.pdf_bytes = None
    print(" β μΈμ μ΄κΈ°ν μλ£")

    print(f"π μ΄κΈ°ν μλ£! μ μΈμ : {new_session_id}\n")
    st.success("β μ΄κΈ°ν μλ£!")
    st.info("π‘ **μ PDFλ₯Ό μ λ‘λν μ€λΉκ° λμμ΅λλ€!**")
    st.rerun()
def process_pdf(uploaded_file):
    """Run the full ingestion pipeline for an uploaded PDF.

    Steps: persist the file under ./data/uploads, load pages, chunk, embed,
    (re)build the per-session VectorDB, store chunks, wire up the Retriever,
    and reset the chat/viewer state. On success the app is rerun so the
    viewer/chat UI appears.

    Args:
        uploaded_file: Streamlit UploadedFile from st.file_uploader.
    """
    try:
        # Persist the upload so the PDF viewer and highlighter can reopen it.
        save_dir = Path("./data/uploads")
        save_dir.mkdir(parents=True, exist_ok=True)
        pdf_path = save_dir / uploaded_file.name
        with open(pdf_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        st.session_state.pdf_path = pdf_path
        st.session_state.pdf_bytes = uploaded_file.getvalue()

        with st.spinner("π λ¬Έμλ₯Ό λΆμνκ³ μμ΅λλ€..."):
            # 1. Load the PDF into per-page text.
            print(f"\nπ PDF λ‘λ μ€: {uploaded_file.name}")
            pdf_data = load_pdf(str(pdf_path))
            st.session_state.total_pages = pdf_data["total_pages"]
            print(f" β μ΄ {pdf_data['total_pages']} νμ΄μ§")

            # 2. Split pages into overlapping chunks.
            print(f"\nβοΈ μ²νΉ μ€...")
            chunks = chunk_text(pdf_data["pages"], CHUNK_SIZE, CHUNK_OVERLAP)
            st.session_state.total_chunks = len(chunks)
            print(f" β μ΄ {len(chunks)}κ° μ²ν¬ μμ±")

            # 3. Embed the chunks.
            print(f"\nπ’ μλ² λ© μμ± μ€...")
            embedded_chunks = embed_chunks(chunks)
            print(f" β μλ² λ© μλ£")

            # 4. Rebuild the per-session VectorDB (drop any stale collection
            # first so re-uploads don't mix documents).
            print(f"\nπΎ VectorDB μ΄κΈ°ν (μΈμ : {st.session_state.session_id})...")
            if st.session_state.vectordb is not None:
                print(" ποΈ κΈ°μ‘΄ 컬λ μ μμ ")
                try:
                    st.session_state.vectordb.delete_collection()
                except Exception as e:
                    # Stale-collection cleanup is best-effort only.
                    print(f" β οΈ μμ μ€λ₯: {e}")
            print(" π μ VectorDB μμ±")
            st.session_state.vectordb = VectorDB(
                session_id=st.session_state.session_id
            )
            initial_count = st.session_state.vectordb.count()
            print(f" π μ΄κΈ° μν: {initial_count}κ° μ²ν¬")

            # 5. Store the embedded chunks.
            print(f"\n πΎ μ²ν¬ μ μ₯: {len(embedded_chunks)}κ°")
            st.session_state.vectordb.add_chunks(embedded_chunks)

            # 6. Retriever bound to the fresh DB.
            st.session_state.retriever = Retriever(st.session_state.vectordb)

            # 7. Flip the processed flag so the main UI renders.
            st.session_state.pdf_processed = True

            # 8. Sanity-check the stored count.
            final_count = st.session_state.vectordb.count()
            print(f"\nβ μ΅μ’ : {final_count}κ° μ²ν¬ μ μ₯ μλ£")

            # 9. Reset chat/viewer state for the new document.
            st.session_state.messages = []
            st.session_state.annotations = []
            st.session_state.current_page = 1

        print(f"\nπ PDF μ²λ¦¬ μλ£! (μΈμ : {st.session_state.session_id})\n")
        st.success("β λ¬Έμ λΆμ μλ£!")
    except Exception as e:
        st.error(f"β μ€λ₯ λ°μ: {str(e)}")
        print(f"\nβ μ€λ₯:")
        import traceback
        print(traceback.format_exc())
        return

    # BUG FIX: st.rerun() works by raising Streamlit's RerunException.
    # Calling it inside the `try` above meant the broad `except Exception`
    # swallowed that control-flow exception and reported it as an error
    # instead of rerunning. Trigger the rerun only after the handler.
    st.rerun()
# --------------------------------------------------------------------------
# Main UI
# --------------------------------------------------------------------------
# Hero banner (no-op once a PDF has been processed).
render_welcome_screen()

# --------------------------------------------------------------------------
# Sidebar: upload + reset controls
# --------------------------------------------------------------------------
with st.sidebar:
    st.title(f"{APP_ICON} {APP_NAME}")

    # The uploader key includes the session id so a reset yields a fresh widget.
    uploaded_file = st.file_uploader(
        "PDF νμΌ μ λ‘λ",
        type=["pdf"],
        key=f"pdf_uploader_{st.session_state.session_id}",
    )
    if uploaded_file and not st.session_state.pdf_processed:
        process_pdf(uploaded_file)

    st.divider()
    if st.button("π μ΄κΈ°ν", use_container_width=True):
        reset_app()
# --------------------------------------------------------------------------
# PDF viewer + chat, shown only after a document has been processed
# --------------------------------------------------------------------------
if st.session_state.pdf_processed:
    viewer_col, chat_col = st.columns([5, 5], gap="medium")

    # ---- Left: PDF viewer with a small navigation/zoom toolbar -----------
    with viewer_col:
        prev_col, next_col, page_col, zoom_col = st.columns([1, 1, 2, 2])

        with prev_col:
            # Short-circuit `and` keeps the page-bound check after the click.
            if st.button("β", help="μ΄μ νμ΄μ§") and st.session_state.current_page > 1:
                st.session_state.current_page -= 1
                st.rerun()

        with next_col:
            if (st.button("βΆ", help="λ€μ νμ΄μ§")
                    and st.session_state.current_page < st.session_state.total_pages):
                st.session_state.current_page += 1
                st.rerun()

        with page_col:
            st.write(f"Page {st.session_state.current_page} / {st.session_state.total_pages}")

        with zoom_col:
            new_zoom = st.slider("Zoom", 500, 1200, st.session_state.zoom_level, label_visibility="collapsed")
            if new_zoom != st.session_state.zoom_level:
                st.session_state.zoom_level = new_zoom
                st.rerun()

        # Render only the current page, with any active highlights.
        pdf_viewer(
            input=st.session_state.pdf_bytes,
            width=st.session_state.zoom_level,
            annotations=st.session_state.annotations,
            pages_to_render=[st.session_state.current_page],
            render_text=True,
        )

    # ---- Right: chat panel ------------------------------------------------
    with chat_col:
        st.markdown("### π¬ PROBIN CHAT")

        # Fixed-height container makes the chat history scrollable.
        history = st.container(height=500)
        with history:
            if st.session_state.messages:
                # Replay the conversation, with source panels where available.
                for idx, msg in enumerate(st.session_state.messages):
                    with st.chat_message(msg["role"]):
                        st.markdown(msg["content"])
                        if msg.get("sources"):
                            render_sources_with_relevance(
                                sources=msg["sources"],
                                message_idx=idx,
                                move_to_page_callback=move_to_page,
                            )
            else:
                # Empty history: show the usage guide placeholder.
                st.markdown("""
                <div class="chat-placeholder">
                    <div class="placeholder-title">π λ°κ°μμ! μ΄λ κ² νμ©ν΄λ³΄μΈμ</div>
                    <ol class="placeholder-steps">
                        <li>AIκ° λ¬Έμ λ΄μ©μ λΆμνμ¬ <strong>λ΅λ³κ³Ό κ·Όκ±°</strong>λ₯Ό μ°Ύμμ€λλ€.</li>
                        <li>λ΅λ³μ <span class="highlight-box">λ Έλμ νμ΄λΌμ΄νΈ</span>λ₯Ό νμΈνμΈμ.</li>
                    </ol>
                </div>
                """, unsafe_allow_html=True)

        # Chat input: retrieve, answer, jump the viewer to the top source.
        if query := st.chat_input("μ§λ¬Έμ μ λ ₯νμΈμ..."):
            st.session_state.messages.append({
                "role": "user",
                "content": query,
            })

            with st.spinner("π PROBINμ΄ κ²μμ€μ λλ€..."):
                print(f"\nπ μ§λ¬Έ: {query}")
                retrieved_chunks = st.session_state.retriever.retrieve(query, TOP_K)
                result = st.session_state.generator.generate_answer(query, retrieved_chunks)

                st.session_state.messages.append({
                    "role": "assistant",
                    "content": result["answer"],
                    "sources": result["sources"],
                })

                # Move the viewer to the highest-ranked source, highlighted.
                if result["sources"]:
                    top_source = result["sources"][0]
                    st.session_state.annotations = get_text_coordinates(
                        str(st.session_state.pdf_path),
                        top_source["page_num"],
                        top_source["text"],
                    )
                    st.session_state.current_page = top_source["page_num"]
                print(f"β λ΅λ³ μλ£\n")
            st.rerun()