""" Supabase 동기화 스크립트 (리팩토링 버전) ======================================== YAML 데이터를 청크로 변환하여 Supabase에 저장. 핸들러 기반 아키텍처로 새 YAML 구조 지원이 쉽습니다. 사용법: python scripts/sync_to_supabase.py python scripts/sync_to_supabase.py --chain MARRIOTT python scripts/sync_to_supabase.py --dry-run """ import os import sys import yaml import hashlib import click from pathlib import Path from typing import List, Dict, Any, Optional # 프로젝트 루트를 Python 경로에 추가 sys.path.insert(0, str(Path(__file__).parent.parent)) from dotenv import load_dotenv load_dotenv() # 청크 핸들러 모듈 임포트 from chunk_handlers import ( CHUNK_HANDLERS, NESTED_HANDLERS, IGNORED_KEYS, get_handler, get_nested_handler, is_ignored, get_all_handler_keys, ) # =========================================================================== # 청크 ID 생성 # =========================================================================== def generate_chunk_id(doc_id: str, chunk_index: int) -> str: """청크 ID 생성""" return f"{doc_id}_chunk_{chunk_index:04d}" # =========================================================================== # 청크 생성 (핸들러 기반) # =========================================================================== def create_chunks_from_knowledge( doc_id: str, chain: str, extracted_knowledge: Dict[str, Any], verbose: bool = False ) -> List[Dict[str, Any]]: """ extracted_knowledge에서 검색 가능한 청크 생성. 핸들러 기반으로 각 키를 처리합니다. """ chunks = [] chunk_index = 0 unhandled_keys = [] # 호텔 정보 미리 추출 (context용) hotel_name = "Unknown Hotel" hotel_name_ko = None hotel_id_map = {} # hotel_id → 호텔명 매핑 hotel_properties = extracted_knowledge.get("hotel_properties", []) if hotel_properties and isinstance(hotel_properties, list): for hotel in hotel_properties: if isinstance(hotel, dict): h_id = hotel.get("hotel_id") h_name = hotel.get("name", "Unknown") h_name_localized = hotel.get("name_localized", {}) h_name_ko = h_name_localized.get("ko") if isinstance(h_name_localized, dict) else None # hotel_id 매핑 추가 if h_id: hotel_id_map[h_id] = { "name": h_name, "name_ko": h_name_ko, "country": hotel.get("location", {}).get("country", "") if isinstance(hotel.get("location"), dict) else "", "city": hotel.get("location", {}).get("city", "") if isinstance(hotel.get("location"), dict) else "" } # 첫 번째 호텔을 기본 호텔로 사용 first_hotel = hotel_properties[0] if isinstance(first_hotel, dict): hotel_name = first_hotel.get("name", "Unknown Hotel") name_localized = first_hotel.get("name_localized", {}) hotel_name_ko = name_localized.get("ko") if isinstance(name_localized, dict) else None # identity 섹션에서 추가 정보 추출 (프롬프트 출력 호환성) identity = extracted_knowledge.get("identity", {}) if identity and isinstance(identity, dict): # identity에서 정보 보강 if identity.get("title") and hotel_name == "Unknown Hotel": hotel_name = identity.get("title") if identity.get("chain"): chain = identity.get("chain", chain) # source 섹션에서 출처 정보 추출 source = extracted_knowledge.get("source", {}) source_type = source.get("source_type") if isinstance(source, dict) else None source_url = source.get("canonical_url") if isinstance(source, dict) else None retrieved_at = source.get("retrieved_at") if isinstance(source, dict) else None # version 섹션에서 유효 기간 정보 추출 version = extracted_knowledge.get("version", {}) effective_date = version.get("effective_date") if isinstance(version, dict) else None last_updated = version.get("last_updated") if isinstance(version, dict) else None # 컨텍스트 생성 (확장: source/version 정보 포함) context = { "chain": chain, "hotel_name": hotel_name, "hotel_name_ko": hotel_name_ko, "hotel_id_map": hotel_id_map, # hotel_id → 호텔 정보 매핑 추가 "doc_id": doc_id, # identity 추가 정보 "document_category": identity.get("category") if identity else None, "document_type": identity.get("doc_type", identity.get("document_type")) if identity else None, # source 정보 (검색 정렬/필터에 활용) "source_type": source_type, # OFFICIAL, USER_GENERATED, NEWS 등 "source_url": source_url, "retrieved_at": retrieved_at, # version 정보 (시간 기반 정렬에 활용) "effective_date": effective_date, "last_updated": last_updated, } # 공통 메타데이터 (모든 청크에 자동 추가) common_metadata = {} if source_type: common_metadata["source_type"] = source_type if effective_date: common_metadata["effective_date"] = effective_date if last_updated: common_metadata["last_updated"] = last_updated def add_chunk(content: str, metadata: Dict[str, Any]): """청크 추가 헬퍼 - 공통 메타데이터 자동 병합""" nonlocal chunk_index if content and content.strip() and len(content) > 50: # 핸들러 메타데이터 + 공통 메타데이터 병합 merged_metadata = {**common_metadata, **metadata} chunks.append({ "chunk_id": generate_chunk_id(doc_id, chunk_index), "doc_id": doc_id, "chain": chain, "content": content.strip()[:5000], "metadata": merged_metadata }) chunk_index += 1 # 1. 최상위 키 처리 for key, value in extracted_knowledge.items(): if is_ignored(key): continue handler = get_handler(key) if handler: try: result_chunks = handler(value, context) for rc in result_chunks: add_chunk(rc["content"], rc["metadata"]) except Exception as e: if verbose: print(f" ⚠️ 핸들러 오류 ({key}): {e}") else: unhandled_keys.append(key) # 2. 중첩 키 처리 (예: facts.pricing_analysis) for nested_key, handler in NESTED_HANDLERS.items(): if handler is None: continue parts = nested_key.split(".") if len(parts) == 2: parent_key, child_key = parts parent_data = extracted_knowledge.get(parent_key) if isinstance(parent_data, dict) and child_key in parent_data: try: result_chunks = handler(parent_data[child_key], context) for rc in result_chunks: add_chunk(rc["content"], rc["metadata"]) except Exception as e: if verbose: print(f" ⚠️ 중첩 핸들러 오류 ({nested_key}): {e}") # 3. 미처리 키 경고 if unhandled_keys and verbose: print(f" ⚠️ 미처리 키: {', '.join(unhandled_keys)}") return chunks # =========================================================================== # 문서 ID 생성 # =========================================================================== def generate_doc_id(file_path: str) -> str: """파일 경로에서 고유 문서 ID 생성""" return hashlib.md5(file_path.encode()).hexdigest()[:12] # =========================================================================== # YAML 파일 로드 # =========================================================================== def load_yaml_from_md(file_path: Path) -> Optional[Dict[str, Any]]: """마크다운 파일에서 YAML 프론트매터 추출""" try: content = file_path.read_text(encoding='utf-8') lines = content.split('\n') # YAML 블록의 시작과 끝을 줄 단위로 찾기 yaml_start = None yaml_end = None for i, line in enumerate(lines): stripped = line.strip() if stripped == '---': if yaml_start is None: yaml_start = i + 1 # --- 다음 줄부터 else: yaml_end = i # --- 이전 줄까지 break if yaml_start is None or yaml_end is None: return None # YAML 블록 추출 yaml_lines = lines[yaml_start:yaml_end] yaml_part = '\n'.join(yaml_lines) data = yaml.safe_load(yaml_part) return data if data else None except Exception as e: return None def detect_chain(file_path: Path) -> str: """파일 경로에서 체인/도메인 감지 (확장: 호텔 + 항공 + 카드 + 뉴스)""" path_str = str(file_path).upper() # --- 호텔 체인 --- if "MARRIOTT" in path_str: return "MARRIOTT" elif "HILTON" in path_str: return "HILTON" elif "IHG" in path_str: return "IHG" elif "ACCOR" in path_str or "ACCO" in path_str: return "ACCOR" elif "HYATT" in path_str: return "HYATT" # 롯데호텔 (호텔 경로에서만 감지, 카드사 롯데와 구분) elif "LOTTE" in path_str and "/HOTEL/" in path_str.upper(): return "LOTTE" # Jumeirah Hotels & Resorts (두바이 럭셔리 체인) elif "JUMEIRAH" in path_str and "/HOTEL/" in path_str.upper(): return "JUMEIRAH" # --- 항공사 (Phase 1 확장) --- elif "KOREAN_AIR" in path_str or "KOREANAIR" in path_str: return "KOREAN_AIR" elif "ASIANA" in path_str: return "ASIANA" elif "DELTA" in path_str: return "DELTA" elif "UNITED" in path_str: return "UNITED" elif "ALLIANCE" in path_str or "ONEWORLD" in path_str or "STAR_ALLIANCE" in path_str or "SKYTEAM" in path_str: return "ALLIANCE" elif "/AIRLINE/" in path_str.upper(): return "AIRLINE" # --- 카드사 (Phase 1 확장) --- elif "AMEX" in path_str or "FHR" in path_str or "THC" in path_str: return "AMEX" elif "SHINHAN" in path_str: return "SHINHAN" elif "HYUNDAI" in path_str and "CARD" in path_str: return "HYUNDAI" elif "HANA" in path_str and ("CARD" in path_str or "/CREDITCARD/" in path_str): return "HANA" elif "LOTTE" in path_str and ("CARD" in path_str or "/CREDITCARD/" in path_str): return "LOTTE" elif "WOORI" in path_str and ("CARD" in path_str or "/CREDITCARD/" in path_str): return "WOORI" elif "KB" in path_str and "CARD" in path_str: return "KB" elif "SAMSUNG" in path_str and "CARD" in path_str: return "SAMSUNG" elif "/CREDITCARD/" in path_str.upper(): return "CARD" # --- 뉴스/딜 (Phase 1 확장) --- elif "/NEWS/" in path_str.upper() or "/DEAL" in path_str.upper(): return "NEWS" # --- 기타 --- elif "BENEFIT_RATE" in path_str: return "BENEFIT" else: return "OTHER" # =========================================================================== # 메인 동기화 함수 # =========================================================================== @click.command() @click.option('--chain', '-c', type=str, default=None, help='특정 체인만 동기화') @click.option('--domain', '-d', type=click.Choice(['hotel', 'airline', 'card', 'news', 'all']), default='all', help='동기화할 도메인 (hotel/airline/card/news/all)') @click.option('--dry-run', is_flag=True, help='실제 저장하지 않고 확인만') @click.option('--skip-embeddings', is_flag=True, help='임베딩 생성 건너뛰기') @click.option('--verbose', '-v', is_flag=True, help='자세한 출력') @click.option('--file', '-f', type=str, default=None, help='특정 파일만 처리') def main(chain: Optional[str], domain: str, dry_run: bool, skip_embeddings: bool, verbose: bool, file: Optional[str]): """YAML 데이터를 Supabase로 동기화 (확장: 호텔 + 항공 + 카드 + 뉴스)""" print("🚀 Supabase 동기화 (여행 플랫폼 통합 버전)") print("=" * 60) # 도메인별 디렉토리 매핑 domain_dirs = { 'hotel': Path("data/raw/Hotel"), 'airline': Path("data/raw/Airline"), 'card': Path("data/raw/CreditCard"), 'news': Path("data/raw/News"), } # 스캔할 디렉토리 결정 if domain == 'all': data_dirs = [d for d in domain_dirs.values() if d.exists()] else: data_dirs = [domain_dirs[domain]] if domain_dirs[domain].exists() else [] if not data_dirs: print(f"❌ 데이터 디렉토리를 찾을 수 없습니다") return print(f"📂 스캔 디렉토리: {', '.join(str(d) for d in data_dirs)}") # 파일 목록 if file: md_files = [Path(file)] else: md_files = [] for data_dir in data_dirs: md_files.extend(list(data_dir.rglob("*.md"))) print(f"📁 총 {len(md_files)}개 파일 발견") # 체인 필터 if chain: chain = chain.upper() print(f"🔍 필터: {chain}") # 통계 stats = { "total": 0, "success": 0, "skipped": 0, "error": 0, "chunks": 0, } # Quarantine 리포트 (문제 파일 추적) quarantine = { "no_yaml": [], # YAML 프론트매터 없음 "no_knowledge": [], # extracted_knowledge 없음 "no_chunks": [], # 청크 생성 실패 "missing_fields": [], # 필수 필드 누락 경고 } all_chunks = [] all_docs = [] for md_file in md_files: stats["total"] += 1 # 체인 감지 file_chain = detect_chain(md_file) if chain and file_chain != chain: stats["skipped"] += 1 continue # YAML 로드 data = load_yaml_from_md(md_file) if not data or not isinstance(data, dict): if verbose: print(f" ⚠️ {md_file.name} (YAML 없음)") quarantine["no_yaml"].append(str(md_file.name)) stats["skipped"] += 1 continue # extracted_knowledge 추출 (없으면 최상위 data 사용) extracted_knowledge = data.get("extracted_knowledge") if not extracted_knowledge or not isinstance(extracted_knowledge, dict): # extracted_knowledge가 없으면 data 자체가 knowledge일 수 있음 # 다양한 도메인의 핵심 키들을 체크 core_keys = { # 호텔 "hotel_properties", "loyalty_programs", "loyalty_program", "membership_tiers", "tier_implementations", "hotel_brands", "best_rate_guarantee", "channel_benefit_packages", # 항공 "airline_programs", "airline_program", "airline_tiers", "award_charts", "airline_earning_rules", # 카드 "credit_cards", # 프로모션/뉴스 "deal_alerts", "news_updates", "promotions", # 기타 "points_systems", "member_rates", "dining_programs", } if any(key in data for key in core_keys): extracted_knowledge = data else: if verbose: print(f" ⚠️ {md_file.name} (extracted_knowledge 없음)") quarantine["no_knowledge"].append(str(md_file.name)) stats["skipped"] += 1 continue # 문서 ID 생성 # --file 옵션 사용 시 data_dir가 없을 수 있으므로 파일 경로에서 직접 계산 try: # data/raw 기준으로 상대 경로 계산 data_raw = Path("data/raw") if md_file.is_relative_to(data_raw): rel_path = str(md_file.relative_to(data_raw.parent)) elif "data/raw" in str(md_file): # 절대 경로인 경우 data/raw 이후 부분 추출 path_str = str(md_file) idx = path_str.find("data/raw") rel_path = path_str[idx:] if idx >= 0 else str(md_file.name) else: rel_path = str(md_file) except Exception: rel_path = str(md_file) doc_id = generate_doc_id(rel_path) # 청크 생성 chunks = create_chunks_from_knowledge( doc_id=doc_id, chain=file_chain, extracted_knowledge=extracted_knowledge, verbose=verbose ) if chunks: print(f" ✅ {md_file.name} ({len(chunks)}개 청크)") stats["success"] += 1 stats["chunks"] += len(chunks) # 문서 정보 all_docs.append({ "doc_id": doc_id, "source_file": rel_path, "chain": file_chain, "chunk_count": len(chunks), }) all_chunks.extend(chunks) else: if verbose: print(f" ⚠️ {md_file.name} (청크 없음)") quarantine["no_chunks"].append(str(md_file.name)) stats["skipped"] += 1 print() print("=" * 60) print(f"📊 결과: {stats['success']}개 성공, {stats['skipped']}개 건너뜀") print(f" 총 청크: {stats['chunks']}개") # Quarantine 리포트 출력 (문제 파일이 있는 경우) total_quarantined = sum(len(v) for v in quarantine.values()) if total_quarantined > 0: print(f"\n⚠️ Quarantine 리포트 ({total_quarantined}개 파일):") if quarantine["no_yaml"]: print(f" 📄 YAML 없음 ({len(quarantine['no_yaml'])}개): {', '.join(quarantine['no_yaml'][:5])}") if quarantine["no_knowledge"]: print(f" 📄 extracted_knowledge 없음 ({len(quarantine['no_knowledge'])}개): {', '.join(quarantine['no_knowledge'][:5])}") if quarantine["no_chunks"]: print(f" 📄 청크 생성 실패 ({len(quarantine['no_chunks'])}개): {', '.join(quarantine['no_chunks'][:5])}") print(" → 위 파일들은 수동 검토가 필요합니다.") if dry_run: print("\n🔍 Dry-run 모드 - 저장하지 않음") return # Supabase 저장 print("\n💾 Supabase에 저장 중...") try: from src.db import SupabaseAdapter adapter = SupabaseAdapter() # 문서 저장 (필수 필드만) for doc in all_docs: try: doc_data = { "doc_id": doc["doc_id"], "source_file": doc["source_file"], "chain": doc["chain"], "extracted_knowledge": {}, # 빈 dict (NOT NULL 필드) } adapter.client.table("kb_documents").upsert(doc_data, on_conflict='doc_id').execute() except Exception as e: if verbose: print(f" ⚠️ 문서 저장 오류: {e}") # 청크 저장 (임베딩 포함) saved = adapter.upsert_chunks( chunks=all_chunks, generate_embeddings=not skip_embeddings ) print(f"\n✅ Supabase 동기화 완료!") print(f" 문서: {len(all_docs)}개") print(f" 청크: {saved}개") except Exception as e: print(f"\n❌ Supabase 저장 오류: {e}") import traceback if verbose: traceback.print_exc() if __name__ == "__main__": main()