# rag_system.py - RAG 시스템 모듈 import os import json import torch import pickle import numpy as np # FAISS GPU/CPU 자동 선택 try: # GPU가 있고 CUDA가 사용 가능하면 faiss-gpu 시도 if torch.cuda.is_available(): try: import subprocess subprocess.run(['pip', 'install', 'faiss-gpu'], check=True, capture_output=True) import faiss print("✅ faiss-gpu 설치 및 로드 성공") except: import faiss print("⚠️ faiss-gpu 설치 실패, faiss-cpu 사용") else: import faiss print("💻 CPU 환경: faiss-cpu 사용") except ImportError: print("❌ FAISS 설치 필요: pip install faiss-cpu") import sys sys.exit(1) from sentence_transformers import SentenceTransformer from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from law_fetcher import HFLawAPIFetcher from config import RAG_CONFIG, TAX_KEYWORDS, RELEVANCE_KEYWORDS class HFSpacesTaxRAG: """HF Spaces 최적화 취득세 RAG 시스템""" def __init__(self, custom_config=None): print("🚀 HF Spaces 취득세 RAG 시스템 초기화 중...") # 설정 로드 (커스텀 설정 지원) if custom_config: self.config = custom_config print("🔧 커스텀 설정 적용") else: self.config = RAG_CONFIG self.tax_keywords = TAX_KEYWORDS # 디바이스 설정 (허깅페이스 스페이스 환경 고려) if torch.cuda.is_available(): self.device = 'cuda' else: self.device = 'cpu' # 허깅페이스 스페이스 환경 감지 self.is_huggingface_space = os.getenv('SPACE_ID') is not None if self.is_huggingface_space: print(f"🚀 허깅페이스 스페이스 환경 감지 - 디바이스: {self.device}") else: print(f"💻 로컬 개발 환경 - 디바이스: {self.device}") # 컴포넌트 초기화 self.load_embedding_model() self.setup_vectorizers() self.setup_law_fetcher() # 데이터 초기화 self.initialize_system() print("✅ RAG 시스템 초기화 완료") def load_embedding_model(self): """임베딩 모델 로드 (환경별 최적화)""" print("임베딩 모델 로딩 중...") # 환경에 따른 모델 선택 전략 if self.is_huggingface_space and self.device == 'cuda': # 허깅페이스 GPU 환경: 성능 우선 model_priority = self.config['embedding_models'] else: # 로컬 CPU 환경: 가벼운 모델 우선 model_priority = [ 'paraphrase-multilingual-MiniLM-L12-v2', # 가장 가벼운 모델 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2', 'jhgan/ko-sroberta-multitask' ] for model_name in model_priority: try: print(f"시도 중: {model_name}") # 로컬 CPU 환경에서는 더 보수적인 설정 if not self.is_huggingface_space and self.device == 'cpu': self.embedding_model = SentenceTransformer( model_name, device=self.device, cache_folder='./model_cache' # 로컬 캐시 사용 ) else: self.embedding_model = SentenceTransformer( model_name, device=self.device ) print(f"임베딩 모델 로드 완료: {model_name}") return except Exception as e: print(f"{model_name} 로드 실패: {e}") continue raise Exception("모든 임베딩 모델 로드 실패") def setup_vectorizers(self): """벡터라이저 설정""" self.tfidf_vectorizer = TfidfVectorizer( max_features=1000, ngram_range=(1, 3), stop_words=None ) # 초기화 self.vector_db = None self.tfidf_matrix = None self.documents = [] self.metadata = [] def setup_law_fetcher(self): """법령 페처 설정""" self.law_fetcher = HFLawAPIFetcher() def initialize_system(self): """시스템 초기화 - 데이터 로드 또는 구축""" if self.load_prebuilt_data(): print("✅ 기존 벡터 DB 데이터 로드 완료") else: print("🔄 새로운 RAG 시스템 구축 시작...") self.build_system() def load_prebuilt_data(self): """미리 구축된 벡터 DB 데이터 로드""" try: # 벡터 DB 로드 if os.path.exists('vector_db.faiss'): self.vector_db = faiss.read_index('vector_db.faiss') print(f"✅ 벡터 DB 로드: {self.vector_db.ntotal}개 벡터") else: return False # 문서 로드 if os.path.exists('documents.pkl'): with open('documents.pkl', 'rb') as f: self.documents = pickle.load(f) print(f"✅ 문서 로드: {len(self.documents)}개") else: return False # 메타데이터 로드 if os.path.exists('metadata.json'): with open('metadata.json', 'r', encoding='utf-8') as f: self.metadata = json.load(f) print(f"✅ 메타데이터 로드: {len(self.metadata)}개") else: return False # TF-IDF 행렬 구축 if self.documents: self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.documents) print("✅ TF-IDF 행렬 구축 완료") return True except Exception as e: print(f"❌ 벡터 DB 데이터 로드 실패: {e}") return False def build_system(self): """전체 시스템 구축 (캐시 활용)""" print("🗠️ RAG 시스템 구축 시작...") # 1. 법령 데이터 수집 (캐시 활용) raw_law_data = self.law_fetcher.fetch_laws() if not raw_law_data: print("❌ 법령 데이터 수집 실패") return False # 2. 문서 전처리 documents = [] metadata = [] parsed_law_data = {} for law_name, raw_data in raw_law_data.items(): parsed_data = self.law_fetcher._parse_json_response(raw_data) if parsed_data: parsed_law_data[law_name] = parsed_data if not parsed_law_data: print("❌ 법령 데이터 파싱 실패") return False for law_name, law_info in parsed_law_data.items(): for article_key, article_content in law_info.get('조문목록', {}).items(): # 취득세 관련도 계산 relevance = self._calculate_relevance(article_content) if relevance > 0.1: documents.append(article_content) metadata.append({ 'law_name': law_name, 'article_key': article_key, 'relevance': relevance, 'length': len(article_content) }) if not documents: print("❌ 취득세 관련 문서가 없습니다") return False print(f"📄 전처리 완료: {len(documents)}개 문서") # 3. 임베딩 생성 try: embeddings = self.embedding_model.encode( documents, show_progress_bar=True, convert_to_numpy=True, batch_size=self.config['batch_size'] ) print(f"✅ 임베딩 생성 완료: {embeddings.shape}") except Exception as e: print(f"❌ 임베딩 생성 실패: {e}") return False # 4. 벡터 데이터베이스 구축 try: dimension = embeddings.shape[1] self.vector_db = faiss.IndexFlatIP(dimension) faiss.normalize_L2(embeddings) self.vector_db.add(embeddings.astype('float32')) self.documents = documents self.metadata = metadata print(f"✅ 벡터 DB 구축 완료: {self.vector_db.ntotal}개 벡터") except Exception as e: print(f"❌ 벡터 DB 구축 실패: {e}") return False # 5. TF-IDF 구축 try: self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(documents) print("✅ TF-IDF 구축 완료") except Exception as e: print(f"❌ TF-IDF 구축 실패: {e}") # 6. 구축된 데이터 저장 self._save_built_system() print("🎉 RAG 시스템 구축 완료!") return True def _save_built_system(self): """구축된 시스템 저장""" try: print("💾 구축된 RAG 시스템 저장 중...") if self.vector_db: faiss.write_index(self.vector_db, 'vector_db.faiss') print("✅ 벡터 DB 저장 완료") if self.documents: with open('documents.pkl', 'wb') as f: pickle.dump(self.documents, f) print("✅ 문서 저장 완료") if self.metadata: with open('metadata.json', 'w', encoding='utf-8') as f: json.dump(self.metadata, f, ensure_ascii=False, indent=2) print("✅ 메타데이터 저장 완료") except Exception as e: print(f"❌ 시스템 저장 실패: {e}") def _calculate_relevance(self, content): """취득세 관련도 계산""" content_lower = content.lower() score = 0.0 total_possible_score = sum(RELEVANCE_KEYWORDS.values()) for keyword, weight in RELEVANCE_KEYWORDS.items(): if keyword in content_lower: frequency = content_lower.count(keyword) frequency_multiplier = min(1 + (frequency - 1) * 0.15, 2.0) score += weight * frequency_multiplier max_realistic_score = total_possible_score * 0.3 return min(score / max_realistic_score, 1.0) def search(self, query, top_k=None): """하이브리드 검색""" if top_k is None: top_k = self.config['top_k'] if not self.vector_db or not self.documents: return [] print(f"🔍 검색 쿼리: '{query}'") try: # 벡터 검색 query_embedding = self.embedding_model.encode([query], convert_to_numpy=True) faiss.normalize_L2(query_embedding) vector_scores, vector_indices = self.vector_db.search( query_embedding.astype('float32'), min(top_k * 2, len(self.documents)) ) # TF-IDF 검색 tfidf_scores = [] if self.tfidf_matrix is not None: query_tfidf = self.tfidf_vectorizer.transform([query]) tfidf_similarities = cosine_similarity(query_tfidf, self.tfidf_matrix).flatten() tfidf_scores = tfidf_similarities # 결과 조합 results = [] for i, (v_score, v_idx) in enumerate(zip(vector_scores[0], vector_indices[0])): if v_score > self.config['similarity_threshold']: tfidf_score = tfidf_scores[v_idx] if len(tfidf_scores) > v_idx else 0.0 # 하이브리드 점수 hybrid_score = ( self.config['hybrid_weights']['vector'] * float(v_score) + self.config['hybrid_weights']['tfidf'] * float(tfidf_score) ) results.append({ 'content': self.documents[v_idx], # LLM 프로세서가 기대하는 키 이름 'document': self.documents[v_idx], # 호환성을 위해 유지 'metadata': self.metadata[v_idx], 'vector_score': float(v_score), 'tfidf_score': float(tfidf_score), 'hybrid_score': hybrid_score }) # 하이브리드 점수로 정렬 results.sort(key=lambda x: x['hybrid_score'], reverse=True) print(f"📊 검색 결과: {len(results[:top_k])}개 문서 발견") return results[:top_k] except Exception as e: print(f"❌ 검색 오류: {e}") return [] def generate_answer(self, query, search_results): """검색 결과 기반 답변 생성""" if not search_results: return "관련된 법령 조문을 찾을 수 없습니다." answer_parts = [f"**💬 '{query}'에 대한 답변입니다.**\n"] answer_parts.append("📋 **관련 법령:**\n") for i, result in enumerate(search_results, 1): metadata = result['metadata'] hybrid_score = result['hybrid_score'] answer_parts.append( f"**{i}. {metadata['law_name']} {metadata['article_key']}** " f"(관련도: {hybrid_score:.3f})\n" ) answer_parts.append(f"```\n{result['document'][:300]}...\n```\n") # 규칙 기반 추가 정보 self._add_rule_based_info(query, answer_parts) return '\n'.join(answer_parts) def _add_rule_based_info(self, query, answer_parts): """규칙 기반 추가 정보""" if any(keyword in query for keyword in ['세율', '세액', '얼마']): answer_parts.append("\n💡 **취득세 세율 정보:**") answer_parts.append("- **주택**: 1~3% (보유 주택 수에 따라)") answer_parts.append("- **일반 부동산**: 4%") answer_parts.append("- **농지**: 3%\n") if any(keyword in query for keyword in ['신고', '기한', '언제']): answer_parts.append("\n⏰ **신고 기한:**") answer_parts.append("- **일반 취득**: 취득일부터 60일 이내") answer_parts.append("- **상속**: 취득일이 속하는 달의 말일부터 6개월 이내\n") if any(keyword in query for keyword in ['감면', '특례', '혜택']): answer_parts.append("\n🎁 **주요 감면 혜택:**") answer_parts.append("- **1세대 1주택**: 6억원 이하 50% 감면") answer_parts.append("- **신혼부부**: 추가 감면 혜택") answer_parts.append("- **농지**: 직접 영농시 감면\n") def query(self, user_question): """전체 질의응답 처리""" print(f"\n❓ 질문: {user_question}") print("="*60) # 1. 관련 문서 검색 search_results = self.search(user_question, top_k=3) # 2. 답변 생성 answer = self.generate_answer(user_question, search_results) return { 'question': user_question, 'answer': answer, 'sources': search_results, 'source_count': len(search_results) } def get_cache_info(self): """캐시 정보 반환""" return { 'cache_dir': self.law_fetcher.cache_dir, 'cache_count': len(self.law_fetcher.cache_info), 'cache_info': self.law_fetcher.cache_info } # 싱글톤 인스턴스 (Flask 앱에서 사용) _rag_instance = None def get_rag_system(custom_config=None): """RAG 시스템 싱글톤 인스턴스 반환""" global _rag_instance if _rag_instance is None or custom_config is not None: _rag_instance = HFSpacesTaxRAG(custom_config) return _rag_instance # Flask 앱에서 사용할 수 있는 간단한 함수들 def search_tax_law(question): """취득세 법령 검색 + LLM 처리 (Flask 앱용)""" try: # LLM 처리기 import from llm_processor import get_llm_processor, is_llm_available print(f"🔍 처리 시작: {question}") # 1. RAG 검색 rag = get_rag_system() rag_result = rag.query(question) print(f"📚 RAG 검색 완료: {rag_result['source_count']}개 문서") # 2. LLM 처리 시도 if is_llm_available(): print("🤖 LLM 처리 시도 중...") llm = get_llm_processor() # RAG 검색 결과를 LLM에 전달 search_results = rag_result.get('sources', []) final_response = llm.process_with_rag(question, search_results) print("✅ LLM 처리 완료") return final_response else: print("⚠️ LLM 사용 불가, RAG 결과만 반환") return rag_result['answer'] except Exception as e: print(f"❌ 처리 오류: {e}") import traceback traceback.print_exc() return f"질문 처리 중 오류가 발생했습니다: {e}" def search_tax_law_rag_only(question): """RAG만 사용한 검색 (LLM 없이)""" try: rag = get_rag_system() result = rag.query(question) return result['answer'] except Exception as e: print(f"❌ RAG 검색 오류: {e}") return f"법령 검색 중 오류가 발생했습니다: {e}" def is_rag_available(): """RAG 시스템 사용 가능 여부 확인""" try: rag = get_rag_system() return rag.vector_db is not None and len(rag.documents) > 0 except Exception: return False def get_system_status(): """전체 시스템 상태 확인""" try: from llm_processor import is_llm_available status = { 'rag_available': is_rag_available(), 'llm_available': is_llm_available(), 'pipeline_complete': is_rag_available() and is_llm_available() } if status['rag_available']: rag = get_rag_system() status['rag_details'] = { 'documents': len(rag.documents), 'device': rag.device, 'is_hf_space': rag.is_huggingface_space } return status except Exception as e: return {'error': str(e)}