""" RFx 문서 분석 AI 에이전트 (PDF Text Highlighting) PDF 텍스트에 직접 하이라이트 표시 """ import streamlit as st import fitz # PyMuPDF import chromadb from sentence_transformers import SentenceTransformer, util import requests import os import re import shutil from collections import Counter import numpy as np from typing import List, Dict, Tuple import base64 GROK_API_KEY = os.getenv("GROK_API_KEY") GROK_API_BASE = "https://api.x.ai/v1" CHROMA_DIR = "./chroma_db" EMBEDDING_MODEL = 'jhgan/ko-sroberta-multitask' st.set_page_config( page_title="RFx 문서 분석 AI", page_icon="📄", layout="wide", initial_sidebar_state="collapsed" ) st.markdown(""" """, unsafe_allow_html=True) def init_session(): if 'processed' not in st.session_state: st.session_state.processed = False if 'vector_db' not in st.session_state: st.session_state.vector_db = None if 'embedder' not in st.session_state: st.session_state.embedder = None if 'chat_history' not in st.session_state: st.session_state.chat_history = [] if 'doc_metadata' not in st.session_state: st.session_state.doc_metadata = {} if 'pdf_bytes' not in st.session_state: st.session_state.pdf_bytes = None if 'pdf_pages_text' not in st.session_state: st.session_state.pdf_pages_text = {} if 'current_highlights' not in st.session_state: st.session_state.current_highlights = [] def extract_text_from_pdf(pdf_file) -> Tuple[List[str], List[Dict], bytes, Dict]: pdf_bytes = pdf_file.read() doc = fitz.open(stream=pdf_bytes, filetype="pdf") chunks = [] metadata_list = [] pages_text = {} CHUNK_SIZE = 300 OVERLAP_SIZE = 60 for page_num in range(len(doc)): page = doc[page_num] text = page.get_text("text") pages_text[page_num + 1] = text if not text.strip(): continue lines = [line.strip() for line in text.split('\n') if line.strip()] cleaned_text = '\n'.join(lines) sentences = re.split(r'([.!?]\s+|\n{2,})', cleaned_text) sentences = [s for s in sentences if s.strip()] current_chunk = "" current_length = 0 for sentence in sentences: sentence_length = len(sentence) if current_length + sentence_length > CHUNK_SIZE and current_chunk: chunks.append(current_chunk.strip()) metadata_list.append({ "page": page_num + 1, "source": pdf_file.name, "chunk_type": "paragraph" }) overlap_text = current_chunk[-OVERLAP_SIZE:] if len(current_chunk) > OVERLAP_SIZE else current_chunk current_chunk = overlap_text + sentence current_length = len(current_chunk) else: current_chunk += sentence current_length += sentence_length if current_chunk.strip(): chunks.append(current_chunk.strip()) metadata_list.append({ "page": page_num + 1, "source": pdf_file.name, "chunk_type": "paragraph" }) doc.close() return chunks, metadata_list, pdf_bytes, pages_text @st.cache_resource def load_embedding_model(): return SentenceTransformer(EMBEDDING_MODEL) def create_vector_db(chunks: List[str], metadata_list: List[Dict]): embedder = load_embedding_model() if os.path.exists(CHROMA_DIR): try: shutil.rmtree(CHROMA_DIR) except Exception: pass client = chromadb.PersistentClient( path=CHROMA_DIR, settings=chromadb.Settings( anonymized_telemetry=False, allow_reset=True, is_persistent=True ) ) try: collection = client.get_or_create_collection( name="rfx_docs", metadata={"hnsw:space": "cosine"} ) except Exception: try: client.delete_collection("rfx_docs") except Exception: pass collection = client.create_collection( name="rfx_docs", metadata={"hnsw:space": "cosine"} ) batch_size = 32 all_embeddings = [] for i in range(0, len(chunks), batch_size): batch = chunks[i:i + batch_size] embeddings = embedder.encode(batch, show_progress_bar=False, convert_to_numpy=True) all_embeddings.extend(embeddings) ids = [f"doc_{i}" for i in range(len(chunks))] collection.add( embeddings=[emb.tolist() for emb in all_embeddings], documents=chunks, metadatas=metadata_list, ids=ids ) return collection, embedder def extract_keywords(text: str, top_n: int = 5) -> List[str]: words_with_numbers = re.findall(r'[가-힣]*\d+[가-힣]*', text) words = re.findall(r'[가-힣]{2,}', text) stopwords = { '것', '등', '및', '그', '이', '저', '수', '때', '중', '내', '년', '월', '일', '경우', '대한', '통해', '위해', '관련', '있는', '하는', '되는', '이런', '저런', '어떤', '무슨', '어느', '누구', '언제', '어디', '무엇', '어떻게', '왜', '알려', '설명', '말해', '대해', '관하여', '있나요', '인가요', '무엇인가요', '얼마', '입니까', '합니까' } important_keywords = { '금액', '가격', '비용', '예산', '설계', '사업', '과업', '계약', '공사', '용역', '제안', '입찰', '낙찰', '견적', '단가' } filtered_words = [w for w in words if w not in stopwords and len(w) >= 2] word_freq = Counter(filtered_words) for word in word_freq: if word in important_keywords: word_freq[word] += 5 result = [] result.extend([w for w in words_with_numbers if w]) for word, _ in word_freq.most_common(top_n * 2): if word not in result: result.append(word) if len(result) >= top_n: break return result[:top_n] def rewrite_query(query: str) -> Dict[str, any]: original = query.strip() cleaned = re.sub(r'[?!,.~]', '', original) keywords = extract_keywords(cleaned, top_n=7) variations = [] variations.append(original) if keywords: if len(keywords) >= 2: variations.append(' '.join(keywords[:2])) if len(keywords) >= 3: variations.append(' '.join(keywords[:3])) for kw in keywords[:3]: if kw not in variations: variations.append(kw) synonym_map = { '금액': ['가격', '비용', '예산'], '설계': ['디자인', '계획'], '사업': ['프로젝트', '과업'], } for keyword in keywords[:2]: if keyword in synonym_map: for syn in synonym_map[keyword]: combined = original.replace(keyword, syn) if combined not in variations: variations.append(combined) break seen = set() unique_variations = [] for v in variations: if v not in seen and v.strip(): seen.add(v) unique_variations.append(v) return { 'original': original, 'cleaned': cleaned, 'keywords': keywords, 'variations': unique_variations[:7] } def search_with_multiple_queries(queries: List[str], collection, embedder, top_k: int = 5) -> Dict: all_results = [] seen_ids = set() for query in queries: query_embedding = embedder.encode([query], convert_to_numpy=True)[0] results = collection.query( query_embeddings=[query_embedding.tolist()], n_results=min(top_k * 5, 30), include=["documents", "metadatas", "distances"] ) for i, doc_id in enumerate(results['ids'][0]): if doc_id not in seen_ids: seen_ids.add(doc_id) all_results.append({ 'id': doc_id, 'document': results['documents'][0][i], 'metadata': results['metadatas'][0][i], 'distance': results['distances'][0][i], 'query': query }) all_results.sort(key=lambda x: x['distance']) top_results = all_results[:top_k] return { 'documents': [[r['document'] for r in top_results]], 'metadatas': [[r['metadata'] for r in top_results]], 'distances': [[r['distance'] for r in top_results]], 'queries_used': queries, 'total_found': len(all_results) } def rerank_results(query: str, search_results: Dict, embedder, keywords: List[str]) -> Dict: docs = search_results['documents'][0] metas = search_results['metadatas'][0] distances = search_results['distances'][0] if not docs: return { 'documents': [[]], 'metadatas': [[]], 'distances': [[]], 'scores': [] } query_embedding = embedder.encode([query], convert_to_numpy=True)[0] doc_embeddings = embedder.encode(docs, convert_to_numpy=True) similarities = util.cos_sim(query_embedding, doc_embeddings)[0].cpu().numpy() keyword_scores = [] for doc in docs: doc_lower = doc.lower() score = sum(1 for kw in keywords if kw.lower() in doc_lower) keyword_scores.append(score) if max(keyword_scores) > 0: keyword_scores = [s / max(keyword_scores) for s in keyword_scores] numeric_query_terms = ['금액', '예산', '가격', '비용', '단가'] is_numeric_query = any(term in query for term in numeric_query_terms) if is_numeric_query: money_patterns = [ r'\d{1,3}(?:,\d{3})+원', r'\d+만원', r'\d+억원', r'\(일금\s*[^)]+\)' ] numeric_scores = [] for doc in docs: score = 0 for pattern in money_patterns: if re.search(pattern, doc): score = 1 break numeric_scores.append(score) if max(numeric_scores) > 0: numeric_scores = [s / max(numeric_scores) for s in numeric_scores] else: numeric_scores = [0.0 for _ in numeric_scores] final_scores = [ 0.6 * sim + 0.25 * kw + 0.15 * num for sim, kw, num in zip(similarities, keyword_scores, numeric_scores) ] else: final_scores = [0.7 * sim + 0.3 * kw for sim, kw in zip(similarities, keyword_scores)] ranked_indices = np.argsort(final_scores)[::-1] return { 'documents': [[docs[i] for i in ranked_indices]], 'metadatas': [[metas[i] for i in ranked_indices]], 'distances': [[distances[i] for i in ranked_indices]], 'scores': [final_scores[i] for i in ranked_indices] } def build_context(search_results: Dict, max_length: int = 3000) -> str: context_parts = [] current_length = 0 docs = search_results['documents'][0] metas = search_results['metadatas'][0] for i, (doc, meta) in enumerate(zip(docs, metas), 1): part = f"[문서 {i}] (페이지 {meta['page']})\n{doc}\n" part_length = len(part) if current_length + part_length > max_length: remaining = max_length - current_length if remaining > 200: part = f"[문서 {i}] (페이지 {meta['page']})\n{doc[:remaining-50]}...\n" context_parts.append(part) break context_parts.append(part) current_length += part_length return "\n".join(context_parts) def generate_answer(query: str, search_results: Dict, api_key: str) -> str: context = build_context(search_results, max_length=4000) system_prompt = """당신은 RFx 문서 전문 분석가입니다. **중요 원칙:** 1. 제공된 문서를 **매우 꼼꼼히** 읽고 정확한 정보를 찾으세요 2. 숫자, 금액, 날짜 등 구체적인 정보를 우선적으로 찾으세요 3. 문서에 정보가 있는데도 "없다"고 하지 마세요 4. 답변 시 반드시 [문서 N, 페이지 X] 형태로 출처 명시 5. 애매한 표현 대신 구체적인 수치를 제공하세요 **답변 형식:** - 핵심 답변을 먼저 명확하게 제시 - 출처 명시 (페이지 번호 포함) - 필요시 추가 관련 정보 제공""" user_prompt = f"""다음 문서들을 **매우 꼼꼼히** 읽고 질문에 답변하세요. <문서> {context} <질문> {query} **중요**: - 문서를 처음부터 끝까지 주의 깊게 읽으세요 - 숫자, 금액 등 구체적인 정보를 찾으세요 - 찾은 정보는 정확히 인용하세요 - 정말로 문서에 없는 경우에만 "찾을 수 없습니다"라고 하세요""" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } payload = { "model": "grok-3", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "temperature": 0.1, "max_tokens": 2000, "stream": False } try: response = requests.post( f"{GROK_API_BASE}/chat/completions", headers=headers, json=payload, timeout=30 ) if response.status_code != 200: error_detail = "" try: error_data = response.json() error_detail = error_data.get('error', {}).get('message', '') except Exception: error_detail = response.text return f"❌ API 오류 (코드: {response.status_code})\n\n{error_detail}" result = response.json() return result["choices"][0]["message"]["content"] except Exception as e: return f"❌ 오류: {str(e)}" def highlight_text_in_pdf(pdf_bytes: bytes, highlight_info: List[Dict]) -> bytes: doc = fitz.open(stream=pdf_bytes, filetype="pdf") for item in highlight_info: page_num = item['page'] - 1 search_text = item['text'] if page_num >= len(doc): continue page = doc[page_num] text_variations = [ search_text, search_text.replace(' ', ''), search_text.replace(',', ''), ] for text_var in text_variations: text_instances = page.search_for(text_var) for inst in text_instances: highlight = page.add_highlight_annot(inst) highlight.set_colors(stroke=[1, 1, 0]) highlight.update() output_bytes = doc.tobytes() doc.close() return output_bytes def extract_highlight_texts(documents: List[str], keywords: List[str]) -> List[str]: highlight_texts = [] for doc in documents: money_patterns = [ r'\d{1,3}(?:,\d{3})+원', r'\d+만원', r'\d+억원', r'\(일금\s*[^)]+\)', ] for pattern in money_patterns: matches = re.findall(pattern, doc) highlight_texts.extend(matches) date_patterns = [ r'\d{4}[년.]\d{1,2}[월.]\d{1,2}일?', r'\d{2}\.\d{2}\.\d{2}', ] for pattern in date_patterns: matches = re.findall(pattern, doc) highlight_texts.extend(matches) for keyword in keywords: if keyword in doc: sentences = re.split(r'[.!?]\s+', doc) for sent in sentences: if keyword in sent and len(sent) < 100: highlight_texts.append(sent.strip()) unique_texts = list(set(highlight_texts)) unique_texts.sort(key=len) return unique_texts[:10] def render_pdf_with_highlights(pdf_bytes: bytes, highlight_info: List[Dict]): highlighted_pdf = highlight_text_in_pdf(pdf_bytes, highlight_info) doc = fitz.open(stream=highlighted_pdf, filetype="pdf") highlighted_pages = set(h['page'] for h in highlight_info) pdf_html = '
' for page_num in range(len(doc)): page = doc[page_num] pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) img_data = pix.tobytes("png") img_base64 = base64.b64encode(img_data).decode() pdf_html += '
' pdf_html += f'
📄 페이지 {page_num + 1}
' if (page_num + 1) in highlighted_pages: page_highlights = [h for h in highlight_info if h['page'] == page_num + 1] highlight_texts = ', '.join([f'"{h["text"][:30]}..."' for h in page_highlights[:3]]) pdf_html += f'
⭐ 하이라이트: {highlight_texts}
' pdf_html += f'' pdf_html += '
' pdf_html += '
' doc.close() return pdf_html def main(): init_session() st.markdown('
📄 RFx 문서 분석 AI 에이전트
', unsafe_allow_html=True) with st.sidebar: st.header("⚙️ 설정") grok_key = st.text_input("Grok API Key", value=GROK_API_KEY or "", type="password") if grok_key: os.environ["GROK_API_KEY"] = grok_key st.session_state.grok_key = grok_key st.divider() if st.button("🔄 데이터베이스 초기화", help="ChromaDB 오류 발생 시 클릭"): if os.path.exists(CHROMA_DIR): try: shutil.rmtree(CHROMA_DIR) st.success("✅ 데이터베이스 초기화 완료!") st.session_state.processed = False st.session_state.vector_db = None st.rerun() except Exception as e: st.error(f"초기화 실패: {str(e)}") st.divider() st.subheader("📤 문서 업로드") uploaded_file = st.file_uploader("PDF 파일 선택", type=['pdf']) if uploaded_file: if st.button("📄 문서 처리", type="primary", disabled=st.session_state.get('processing', False)): if not grok_key: st.error("⚠️ Grok API 키를 입력하세요!") return st.session_state.processing = True with st.spinner("📄 문서 처리 중..."): try: chunks, metadata_list, pdf_bytes, pages_text = extract_text_from_pdf(uploaded_file) st.info(f"📑 {len(chunks)}개 청크 추출 완료") with st.expander("📝 추출된 텍스트 샘플", expanded=False): if chunks: st.text(f"첫 번째 청크 (총 {len(chunks[0])}자):") st.code(chunks[0][:500] + "..." if len(chunks[0]) > 500 else chunks[0]) with st.spinner("🔧 벡터 데이터베이스 생성 중..."): collection, embedder = create_vector_db(chunks, metadata_list) st.session_state.vector_db = collection st.session_state.embedder = embedder st.session_state.pdf_bytes = pdf_bytes st.session_state.pdf_pages_text = pages_text st.session_state.processed = True st.session_state.doc_metadata = { "filename": uploaded_file.name, "chunks": len(chunks), "pages": len(set(m['page'] for m in metadata_list)) } st.success("✅ 문서 처리 완료!") except Exception as e: st.error(f"오류: {str(e)}") finally: st.session_state.processing = False st.divider() if st.session_state.processed: st.subheader("📊 문서 정보") meta = st.session_state.doc_metadata st.write(f"**파일명:** {meta['filename']}") st.write(f"**페이지:** {meta['pages']}페이지") st.write(f"**청크:** {meta['chunks']}개") if st.button("🗑️ 채팅 초기화"): st.session_state.chat_history = [] st.session_state.current_highlights = [] st.rerun() if not st.session_state.processed: st.info("👈 왼쪽 사이드바에서 PDF 문서를 업로드하세요") col1, col2, col3 = st.columns(3) with col1: st.markdown("### 📄 PDF 뷰어\n원본 문서 확인") with col2: st.markdown("### 🎨 하이라이트\n핵심 내용 강조") with col3: st.markdown("### 💬 AI 챗봇\n정확한 답변") else: col1, col2 = st.columns([1, 1]) with col1: st.markdown("### 📄 문서 뷰어") if st.session_state.pdf_bytes: pdf_html = render_pdf_with_highlights( st.session_state.pdf_bytes, st.session_state.current_highlights ) st.markdown(pdf_html, unsafe_allow_html=True) with col2: st.markdown("### 💬 AI 챗봇") chat_container = st.container() with chat_container: for msg in st.session_state.chat_history: with st.chat_message(msg["role"]): st.markdown(msg["content"]) if msg["role"] == "assistant" and "sources" in msg: with st.expander("📚 참조 문서"): for i, (doc, meta) in enumerate(zip( msg["sources"]["docs"], msg["sources"]["metas"] ), 1): score = msg["sources"]["scores"][i-1] if "scores" in msg["sources"] else None score_text = f" (관련도: {score:.2%})" if score else "" st.markdown(f"""
페이지 {meta['page']} {score_text}
{doc[:300]}{'...' if len(doc) > 300 else ''}
""", unsafe_allow_html=True) if prompt := st.chat_input("질문을 입력하세요...", disabled=st.session_state.get('processing', False)): if not st.session_state.get('grok_key'): st.error("⚠️ Grok API 키를 입력해주세요!") return with st.chat_message("user"): st.markdown(prompt) st.session_state.chat_history.append({"role": "user", "content": prompt}) with st.chat_message("assistant"): with st.spinner("🔍 검색 및 분석 중..."): try: query_info = rewrite_query(prompt) with st.expander("🔍 검색 디버그 정보", expanded=False): st.write("**추출된 키워드:**", query_info['keywords']) st.write("**검색 쿼리 변형:**", query_info['variations']) search_results = search_with_multiple_queries( query_info['variations'], st.session_state.vector_db, st.session_state.embedder, top_k=7 ) with st.expander("📄 검색된 문서 내용", expanded=False): st.write(f"**총 {search_results.get('total_found', 0)}개 문서 발견**") for i, doc in enumerate(search_results['documents'][0][:3], 1): st.write(f"**문서 {i}:**") st.text(doc[:300] + "..." if len(doc) > 300 else doc) st.divider() if 'total_found' in search_results: st.success(f"✅ {search_results['total_found']}개 문서에서 상위 7개 선택") reranked_results = rerank_results( query_info['original'], search_results, st.session_state.embedder, query_info['keywords'] ) answer = generate_answer( query_info['original'], reranked_results, st.session_state.grok_key ) st.markdown(answer) highlight_texts = extract_highlight_texts( reranked_results['documents'][0], query_info['keywords'] ) highlights = [] for doc, meta in zip(reranked_results['documents'][0], reranked_results['metadatas'][0]): for text in highlight_texts: if text in doc: highlights.append({ 'page': meta['page'], 'text': text }) st.session_state.current_highlights = highlights st.session_state.chat_history.append({ "role": "assistant", "content": answer, "sources": { "docs": reranked_results['documents'][0], "metas": reranked_results['metadatas'][0], "scores": reranked_results.get('scores', []), "keywords": query_info['keywords'] } }) with st.expander("📚 참조 문서", expanded=True): for i, (doc, meta) in enumerate(zip( reranked_results['documents'][0], reranked_results['metadatas'][0] ), 1): score = reranked_results.get('scores', [None])[i-1] score_text = f" (관련도: {score:.2%})" if score else "" st.markdown(f"""
페이지 {meta['page']} {score_text}
{doc[:300]}{'...' if len(doc) > 300 else ''}
""", unsafe_allow_html=True) st.rerun() except Exception as e: st.error(f"오류: {str(e)}") import traceback st.code(traceback.format_exc()) if __name__ == "__main__": main()