kwmin_probin / app.py
cksleigen's picture
Upload 8 files
36696b3 verified
# app.py
"""PROBIN - Intelligent Document Analysis System"""
import streamlit as st
import os
import sys
import uuid
from pathlib import Path
from streamlit_pdf_viewer import pdf_viewer
# Add the project root (this file's directory) to the front of sys.path so the local packages below can be imported
sys.path.insert(0, str(Path(__file__).parent))
from core.pdf_loader import load_pdf
from core.chunker import chunk_text
from core.embedder import embed_chunks
from core.vectordb import VectorDB
from core.retriever import Retriever
from core.generator import Generator
from ui.styles import get_custom_css
from ui.components import render_sources_with_relevance
from utils.pdf_utils import get_text_coordinates
from config.settings import (
CHUNK_SIZE, CHUNK_OVERLAP, TOP_K,
APP_NAME, APP_SUBTITLE, APP_ICON, SHOW_STATS, PDF_HEIGHT
)
# 1. Page configuration: wide layout with the sidebar collapsed on first load.
_page_title = f"{APP_NAME} - {APP_SUBTITLE}"
st.set_page_config(
    page_title=_page_title,
    page_icon=APP_ICON,
    layout="wide",
    initial_sidebar_state="collapsed",
)
# 2. Session-state initialisation.
# The session id is created first (and logged once) so the short id exists
# before any other default is filled in.
if "session_id" not in st.session_state:
    st.session_state.session_id = str(uuid.uuid4())[:8]
    print(f"πŸ†” μƒˆ μ„Έμ…˜ ID 생성: {st.session_state.session_id}")

# Each remaining key maps to a zero-argument factory. Factories are only
# called when the key is missing, so Generator() is not rebuilt on every
# rerun and every session gets its own fresh list objects.
_SESSION_DEFAULT_FACTORIES = {
    "vectordb": lambda: None,
    "retriever": lambda: None,
    "generator": Generator,
    "pdf_processed": lambda: False,
    "messages": list,
    "current_page": lambda: 1,
    "pdf_path": lambda: None,
    "pdf_bytes": lambda: None,
    "annotations": list,
    "zoom_level": lambda: 500,
}
for _state_key, _make_default in _SESSION_DEFAULT_FACTORIES.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# 3. Apply the custom CSS: the markup returned by get_custom_css() is passed to
#    st.markdown with unsafe_allow_html=True so it is rendered rather than escaped.
st.markdown(get_custom_css(), unsafe_allow_html=True)
# --------------------------------------------------------------------------
# Function definitions
# --------------------------------------------------------------------------
def render_welcome_screen():
    """Render the hero banner; draws nothing once a PDF has been processed."""
    if st.session_state.pdf_processed:
        return

    # The HTML is kept flush-left inside the literal, exactly as emitted today.
    hero_html = f"""
<div id="welcome" class="hero-container">
<h1 class="hero-title">{APP_ICON} {APP_NAME}</h1>
<p class="hero-subtitle">Experience Intelligent Document Analysis with AI</p>
</div>
"""
    st.markdown(hero_html, unsafe_allow_html=True)
def move_to_page(page_num, text_content):
    """Jump the viewer to ``page_num`` and highlight ``text_content`` on it.

    Stores the target page in session state, recomputes the highlight
    annotations when a PDF path is available, then triggers a rerun so the
    viewer reflects the change immediately.
    """
    state = st.session_state
    state.current_page = page_num

    source_path = state.pdf_path
    if source_path:
        state.annotations = get_text_coordinates(
            str(source_path), page_num, text_content
        )

    # Rerun right away so the new page and highlights are shown.
    st.rerun()
def reset_app():
    """Reset the whole app: drop the vector collection and wipe session state.

    A new short session id is generated, every session-state key is removed,
    and only the id plus the PDF flags are re-seeded before rerunning; the
    remaining defaults are restored by the module-level initialisation on
    the next run.
    """
    state = st.session_state
    print("\nπŸ”„ μ•± 전체 μ΄ˆκΈ°ν™” μ‹œμž‘...")

    # 1. Delete the current collection (best effort: failures are only logged).
    active_db = state.vectordb
    if active_db is not None:
        try:
            print(f" πŸ—‘οΈ ν˜„μž¬ μ»¬λ ‰μ…˜ μ‚­μ œ (μ„Έμ…˜: {state.session_id})")
            active_db.delete_collection()
            print(" βœ… μ»¬λ ‰μ…˜ μ‚­μ œ μ™„λ£Œ")
        except Exception as err:
            print(f" ⚠️ μ»¬λ ‰μ…˜ μ‚­μ œ 였λ₯˜: {err}")

    # 2. Generate the replacement session id.
    old_session_id = state.session_id
    new_session_id = str(uuid.uuid4())[:8]
    print(f" πŸ†” μ„Έμ…˜ ID λ³€κ²½: {old_session_id} β†’ {new_session_id}")

    # 3. Clear everything; snapshot the keys first because we delete while looping.
    for stale_key in tuple(state.keys()):
        del state[stale_key]

    # Re-seed the minimal state under the new session id.
    fresh_values = {
        "session_id": new_session_id,
        "pdf_processed": False,
        "pdf_path": None,
        "pdf_bytes": None,
    }
    for name, value in fresh_values.items():
        state[name] = value

    print(" βœ… μ„Έμ…˜ μ΄ˆκΈ°ν™” μ™„λ£Œ")
    print(f"πŸŽ‰ μ΄ˆκΈ°ν™” μ™„λ£Œ! μƒˆ μ„Έμ…˜: {new_session_id}\n")
    st.success("βœ… μ΄ˆκΈ°ν™” μ™„λ£Œ!")
    st.info("πŸ’‘ **μƒˆ PDFλ₯Ό μ—…λ‘œλ“œν•  μ€€λΉ„κ°€ λ˜μ—ˆμŠ΅λ‹ˆλ‹€!**")
    st.rerun()
def _save_upload(uploaded_file):
    """Write the uploaded PDF under ./data/uploads and return ``(path, raw_bytes)``."""
    save_dir = Path("./data/uploads")
    save_dir.mkdir(parents=True, exist_ok=True)

    # The file name comes from the client. Keep only its final component so a
    # name containing path separators cannot escape the upload directory, and
    # prefix the session id so two sessions uploading the same file name do not
    # overwrite each other's copy (the saved file is re-read later for highlights).
    client_name = uploaded_file.name.replace("\\", "/")
    safe_name = Path(client_name).name or "upload.pdf"
    pdf_path = save_dir / f"{st.session_state.session_id}_{safe_name}"

    pdf_bytes = uploaded_file.getvalue()
    pdf_path.write_bytes(pdf_bytes)
    return pdf_path, pdf_bytes


def process_pdf(uploaded_file):
    """Run the PDF pipeline: save, load, chunk, embed, index, then rerun.

    Args:
        uploaded_file: the object returned by ``st.file_uploader``; its
            ``name`` and ``getvalue()`` are used.

    Side effects:
        Writes the PDF to ./data/uploads, replaces the session's VectorDB and
        Retriever, resets chat messages / annotations / current page, sets
        ``pdf_processed`` and triggers ``st.rerun()`` on success. Any
        ``Exception`` is shown with ``st.error`` and its traceback is printed;
        nothing is re-raised.
    """
    try:
        # Save the file and remember both its path and its raw bytes.
        pdf_path, pdf_bytes = _save_upload(uploaded_file)
        st.session_state.pdf_path = pdf_path
        st.session_state.pdf_bytes = pdf_bytes

        with st.spinner("πŸ”„ λ¬Έμ„œλ₯Ό λΆ„μ„ν•˜κ³  μžˆμŠ΅λ‹ˆλ‹€..."):
            # 1. Load the PDF
            print(f"\nπŸ“„ PDF λ‘œλ“œ 쀑: {uploaded_file.name}")
            pdf_data = load_pdf(str(pdf_path))
            st.session_state.total_pages = pdf_data["total_pages"]
            print(f" βœ… 총 {pdf_data['total_pages']} νŽ˜μ΄μ§€")

            # 2. Chunk the page text
            print(f"\nβœ‚οΈ μ²­ν‚Ή 쀑...")
            chunks = chunk_text(pdf_data["pages"], CHUNK_SIZE, CHUNK_OVERLAP)
            st.session_state.total_chunks = len(chunks)
            print(f" βœ… 총 {len(chunks)}개 청크 생성")

            # 3. Embed the chunks
            print(f"\nπŸ”’ μž„λ² λ”© 생성 쀑...")
            embedded_chunks = embed_chunks(chunks)
            print(f" βœ… μž„λ² λ”© μ™„λ£Œ")

            # 4. Create the VectorDB, dropping any previous collection first
            #    (best effort: a failed delete is only logged).
            print(f"\nπŸ’Ύ VectorDB μ΄ˆκΈ°ν™” (μ„Έμ…˜: {st.session_state.session_id})...")
            if st.session_state.vectordb is not None:
                print(" πŸ—‘οΈ κΈ°μ‘΄ μ»¬λ ‰μ…˜ μ‚­μ œ")
                try:
                    st.session_state.vectordb.delete_collection()
                except Exception as e:
                    print(f" ⚠️ μ‚­μ œ 였λ₯˜: {e}")
            print(" πŸ†• μƒˆ VectorDB 생성")
            st.session_state.vectordb = VectorDB(
                session_id=st.session_state.session_id
            )
            initial_count = st.session_state.vectordb.count()
            print(f" πŸ“Š 초기 μƒνƒœ: {initial_count}개 청크")

            # 5. Store the embedded chunks
            print(f"\n πŸ’Ύ 청크 μ €μž₯: {len(embedded_chunks)}개")
            st.session_state.vectordb.add_chunks(embedded_chunks)

            # 6. Build the retriever on top of the new collection
            st.session_state.retriever = Retriever(st.session_state.vectordb)

            # 7. Mark the document as processed
            st.session_state.pdf_processed = True

            # 8. Final count check (logged only)
            final_count = st.session_state.vectordb.count()
            print(f"\nβœ… μ΅œμ’…: {final_count}개 청크 μ €μž₯ μ™„λ£Œ")

            # 9. Reset the per-document UI state
            st.session_state.messages = []
            st.session_state.annotations = []
            st.session_state.current_page = 1
            print(f"\nπŸŽ‰ PDF 처리 μ™„λ£Œ! (μ„Έμ…˜: {st.session_state.session_id})\n")

        st.success("βœ… λ¬Έμ„œ 뢄석 μ™„λ£Œ!")
        st.rerun()
    except Exception as e:
        st.error(f"❌ 였λ₯˜ λ°œμƒ: {str(e)}")
        print(f"\n❌ 였λ₯˜:")
        import traceback
        print(traceback.format_exc())
# --------------------------------------------------------------------------
# Main UI
# --------------------------------------------------------------------------
# Render the welcome screen (the function draws nothing once a PDF has been processed)
render_welcome_screen()
# --------------------------------------------------------------------------
# Sidebar
# --------------------------------------------------------------------------
with st.sidebar:
    st.title(f"{APP_ICON} {APP_NAME}")

    # The widget key embeds the session id, so a reset (which issues a new id)
    # yields a fresh, empty uploader.
    uploader_key = f"pdf_uploader_{st.session_state.session_id}"
    uploaded_file = st.file_uploader(
        "PDF 파일 μ—…λ‘œλ“œ",
        type=["pdf"],
        key=uploader_key,
    )

    # Only run the pipeline for a file that has not been processed yet.
    needs_processing = bool(uploaded_file) and not st.session_state.pdf_processed
    if needs_processing:
        process_pdf(uploaded_file)

    st.divider()

    reset_clicked = st.button("πŸ”„ μ΄ˆκΈ°ν™”", use_container_width=True)
    if reset_clicked:
        reset_app()
# --------------------------------------------------------------------------
# PDF + Chat UI
# --------------------------------------------------------------------------
def _render_pdf_panel():
    """Draw the page toolbar (prev / next / page label / zoom) and the PDF viewer."""
    state = st.session_state

    # Toolbar
    prev_col, next_col, label_col, zoom_col = st.columns([1, 1, 2, 2])
    with prev_col:
        prev_clicked = st.button("β—€", help="이전 νŽ˜μ΄μ§€")
        if prev_clicked and state.current_page > 1:
            state.current_page -= 1
            st.rerun()
    with next_col:
        next_clicked = st.button("β–Ά", help="λ‹€μŒ νŽ˜μ΄μ§€")
        if next_clicked and state.current_page < state.total_pages:
            state.current_page += 1
            st.rerun()
    with label_col:
        st.write(f"Page {state.current_page} / {state.total_pages}")
    with zoom_col:
        new_zoom = st.slider("Zoom", 500, 1200, state.zoom_level, label_visibility="collapsed")
        if new_zoom != state.zoom_level:
            state.zoom_level = new_zoom
            st.rerun()

    # PDF viewer: only the current page is rendered, at the chosen width.
    pdf_viewer(
        input=state.pdf_bytes,
        width=state.zoom_level,
        annotations=state.annotations,
        pages_to_render=[state.current_page],
        render_text=True,
    )


def _render_chat_panel():
    """Draw the chat history and handle a newly submitted question."""
    state = st.session_state
    st.markdown("### πŸ’¬ PROBIN CHAT")

    # Scrollable chat container with a fixed height.
    chat_container = st.container(height=500)
    with chat_container:
        if state.messages:
            # Replay the stored conversation, including source links.
            for idx, msg in enumerate(state.messages):
                with st.chat_message(msg["role"]):
                    st.markdown(msg["content"])
                    if msg.get("sources"):
                        render_sources_with_relevance(
                            sources=msg["sources"],
                            message_idx=idx,
                            move_to_page_callback=move_to_page,
                        )
        else:
            # No history yet: show the usage guide instead.
            st.markdown("""
<div class="chat-placeholder">
<div class="placeholder-title">πŸ‘‹ λ°˜κ°€μ›Œμš”! μ΄λ ‡κ²Œ ν™œμš©ν•΄λ³΄μ„Έμš”</div>
<ol class="placeholder-steps">
<li>AIκ°€ λ¬Έμ„œ λ‚΄μš©μ„ λΆ„μ„ν•˜μ—¬ <strong>λ‹΅λ³€κ³Ό κ·Όκ±°</strong>λ₯Ό μ°Ύμ•„μ€λ‹ˆλ‹€.</li>
<li>λ‹΅λ³€μ˜ <span class="highlight-box">λ…Έλž€μƒ‰ ν•˜μ΄λΌμ΄νŠΈ</span>λ₯Ό ν™•μΈν•˜μ„Έμš”.</li>
</ol>
</div>
""", unsafe_allow_html=True)

    # Chat input
    query = st.chat_input("μ§ˆλ¬Έμ„ μž…λ ₯ν•˜μ„Έμš”...")
    if not query:
        return

    # Record the user's message.
    state.messages.append({
        "role": "user",
        "content": query,
    })

    # Retrieve supporting chunks and generate the answer.
    with st.spinner("πŸ” PROBIN이 κ²€μƒ‰μ€‘μž…λ‹ˆλ‹€..."):
        print(f"\nπŸ” 질문: {query}")
        retrieved_chunks = state.retriever.retrieve(query, TOP_K)
        result = state.generator.generate_answer(query, retrieved_chunks)

        # Record the assistant's answer together with its sources.
        state.messages.append({
            "role": "assistant",
            "content": result["answer"],
            "sources": result["sources"],
        })

        # Jump to the first source and highlight its text.
        if result["sources"]:
            top_source = result["sources"][0]
            state.annotations = get_text_coordinates(
                str(state.pdf_path),
                top_source["page_num"],
                top_source["text"],
            )
            state.current_page = top_source["page_num"]
        print(f"βœ… λ‹΅λ³€ μ™„λ£Œ\n")

    st.rerun()


if st.session_state.pdf_processed:
    col1, col2 = st.columns([5, 5], gap="medium")
    # Left: PDF viewer
    with col1:
        _render_pdf_panel()
    # Right: chat
    with col2:
        _render_chat_panel()