#!/usr/bin/env python """ YAML 커버리지 분석 도구 ======================= 각 마크다운 파일의 YAML에서: 1. 어떤 키가 존재하는지 수집 2. 어떤 키가 청크로 변환되었는지 비교 3. 누락된 (미처리) 키 리포트 사용법: python scripts/analyze_yaml_coverage.py """ import yaml from pathlib import Path from collections import defaultdict from typing import Dict, Set, Any, List import click # 현재 sync_to_supabase.py에서 처리하는 키 목록 PROCESSED_KEYS = { # extracted_knowledge 직접 하위 "loyalty_programs", "membership_tiers", "points_systems", "credit_cards", "benefits", "hotel_properties", "tier_implementations", "subscription_programs", "promotions", "pricing", "pricing_analysis", # 메타데이터 (청크 불필요) "extraction_timestamp", "extractor_model", # facts 하위 "facts", # facts.pricing_analysis 처리됨 } # 청크 생성 불필요 (메타데이터 등) IGNORED_KEYS = { "evidence", "extra_attributes", "tips", "warnings", "reviewer_quotes", "pros_cons", "recommended_for", "not_recommended_for", "verification_needed", "user_tips", "source_info", "version_info", } def flatten_keys(obj: Any, prefix: str = "", depth: int = 0, max_depth: int = 3) -> Set[str]: """객체에서 모든 키 경로 추출""" keys = set() if depth > max_depth: return keys if isinstance(obj, dict): for key, value in obj.items(): full_key = f"{prefix}.{key}" if prefix else key keys.add(full_key) # 배열이 아닌 경우만 재귀 if not isinstance(value, list): keys.update(flatten_keys(value, full_key, depth + 1, max_depth)) return keys def get_top_level_keys(obj: Dict) -> Set[str]: """최상위 키만 추출""" if not isinstance(obj, dict): return set() return set(obj.keys()) def analyze_file(file_path: Path) -> Dict[str, Any]: """단일 파일 분석""" try: content = file_path.read_text(encoding='utf-8') if '---' not in content: return {"error": "YAML 없음"} yaml_part = content.split('---')[1] data = yaml.safe_load(yaml_part) if not data: return {"error": "빈 YAML"} # extracted_knowledge 추출 ek = data.get('extracted_knowledge', {}) if not ek: return {"error": "extracted_knowledge 없음"} # 최상위 키 추출 top_level = get_top_level_keys(ek) # 처리된 키 processed = top_level & PROCESSED_KEYS # 미처리 키 (무시할 키 제외) unprocessed = top_level - PROCESSED_KEYS - IGNORED_KEYS # facts 하위 분석 facts = ek.get('facts', {}) facts_keys = get_top_level_keys(facts) if isinstance(facts, dict) else set() return { "total_keys": len(top_level), "processed": list(processed), "unprocessed": list(unprocessed), "facts_keys": list(facts_keys) if facts_keys else [], "ignored": list(top_level & IGNORED_KEYS), } except Exception as e: return {"error": str(e)} def analyze_all_files(data_dir: Path) -> Dict[str, Any]: """모든 파일 분석""" results = {} all_keys = defaultdict(int) unprocessed_summary = defaultdict(list) for md_file in data_dir.rglob("*.md"): rel_path = md_file.relative_to(data_dir) result = analyze_file(md_file) results[str(rel_path)] = result if "error" not in result: for key in result.get("unprocessed", []): all_keys[key] += 1 unprocessed_summary[key].append(str(rel_path.stem)[:30]) return { "files": results, "summary": { "total_files": len(results), "success": sum(1 for r in results.values() if "error" not in r), "unprocessed_keys": dict(all_keys), "unprocessed_files": {k: v[:3] for k, v in unprocessed_summary.items()} } } @click.command() @click.option('--verbose', '-v', is_flag=True, help='자세한 출력') @click.option('--file', '-f', type=str, default=None, help='특정 파일만 분석') def main(verbose: bool, file: str): """YAML 커버리지 분석""" print("🔍 YAML 커버리지 분석") print("=" * 60) data_dir = Path("data/raw/Hotel") if file: # 특정 파일 분석 file_path = Path(file) if not file_path.exists(): file_path = data_dir / file if not file_path.exists(): print(f"❌ 파일을 찾을 수 없습니다: {file}") return result = analyze_file(file_path) print(f"\n📄 {file_path.name}") if "error" in result: print(f" ❌ {result['error']}") else: print(f" 총 키: {result['total_keys']}개") print(f" ✅ 처리됨: {result['processed']}") print(f" ❌ 미처리: {result['unprocessed']}") if result['facts_keys']: print(f" 📁 facts 하위: {result['facts_keys']}") return # 전체 분석 analysis = analyze_all_files(data_dir) summary = analysis["summary"] print(f"\n📊 요약") print(f" 총 파일: {summary['total_files']}개") print(f" 성공: {summary['success']}개") unprocessed = summary.get("unprocessed_keys", {}) if unprocessed: print(f"\n⚠️ 미처리 키 (청크로 변환되지 않은 키)") print("-" * 60) for key, count in sorted(unprocessed.items(), key=lambda x: -x[1]): files = summary["unprocessed_files"].get(key, []) file_str = ", ".join(files) print(f" {key:30} : {count:3}개 파일 ({file_str}...)") else: print("\n✅ 모든 키가 처리되고 있습니다!") if verbose: print("\n📄 파일별 상세") print("-" * 60) for path, result in analysis["files"].items(): if "error" in result: print(f" ❌ {path}: {result['error']}") elif result.get("unprocessed"): print(f" ⚠️ {path}") print(f" 미처리: {result['unprocessed']}") if __name__ == "__main__": main()