""" Context Package Builder for Unified AI Course Generation (v2) Supabase story_spots 테이블 + walking_network.json을 기반으로 Gemini에 전달할 지역 컨텍스트 패키지를 조합합니다. Usage: from utils.context_builder import ContextBuilder builder = ContextBuilder() context = builder.build_zone_context("A", theme="history", max_spots=30) """ import json import os import logging import time from typing import Dict, List, Any, Optional, Tuple from pathlib import Path from utils.geo import haversine from utils.osrm_distance import get_walking_distances logger = logging.getLogger(__name__) # ============ Constants ============ WALKING_SPEED_KMH = 4.0 DISTANCE_MULTIPLIER = 1.3 # 스팟 수 제한 (토큰 예산 내) MAX_SPOTS_PER_ZONE = 50 MAX_SPOTS_TOTAL = 30 # AI에게 전달할 최대 후보 스팟 수 # 카테고리 한글 매핑 CATEGORY_KR = { "beach": "해변", "coastline": "해안", "harbor": "포구", "oreum": "오름", "forest": "숲", "village": "마을", "shrine": "신당", "fortress": "성곽", "beacon": "봉수대", "wetland": "습지", "traditional": "전통", "ruins": "유적", "cafe": "카페", "restaurant": "음식점", "market": "시장", "school": "학교", "community": "마을회관", "product": "특산물", } # 활동성 범위 매칭 (light 유저도 moderate 산책 코스를 볼 수 있어야 함) ACTIVITY_COMPATIBLE = { "light": {"light", "moderate"}, "moderate": {"light", "moderate", "active"}, "active": {"moderate", "active"}, } # 테마 교차 매칭 (healing ↔ nature, photo ↔ nature) THEME_COMPATIBLE = { "healing": {"healing", "nature"}, "nature": {"nature", "healing"}, "photo": {"photo", "nature"}, "history": {"history"}, "food": {"food"}, } # 최소 후보 스팟 수 (이 이하면 필터 단계적 완화) MIN_SPOTS_THRESHOLD = 3 # DB 캐시 TTL (초) SPOTS_CACHE_TTL_SECONDS = 300 # 5분 class ContextBuilder: """Gemini 컨텍스트 패키지 빌더""" def __init__(self, data_dir: Optional[str] = None): if data_dir is None: # 여러 경로 후보에서 data/ 디렉토리 탐색 # 실제 파일(walking_network.json) 존재 여부로 판단 # - HF Space: /app/data/ (deploy workflow가 복사) # - 로컬 개발: project_root/data/ # 주의: /data는 HF persistent storage root이므로 후순위 base = Path(__file__).parent.parent # backend/ 또는 /app/ candidates = [ base / "data", # /app/data/ (HF Space) base.parent / "data", # project_root/data/ (로컬 개발) ] for candidate in candidates: if (candidate / "walking_network.json").exists(): data_dir = str(candidate) break if data_dir is None: # 파일 없어도 디렉토리라도 있는 경로 사용 for candidate in candidates: if candidate.exists(): data_dir = str(candidate) break if data_dir is None: data_dir = str(candidates[0]) self._data_dir = data_dir self._spots: List[Dict] = [] self._spots_by_id: Dict[str, Dict] = {} self._network: Dict = {} self._network_loaded = False self._spots_loaded = False self._spots_loaded_at: float = 0.0 # TTL 캐시용 타임스탬프 # 스토리라인 캐시 self._storylines: List[Dict] = [] self._storylines_loaded = False self._storylines_loaded_at: float = 0.0 @staticmethod def _row_to_spot(row: Dict[str, Any]) -> Dict[str, Any]: """DB row → spot dict 변환 (기존 코드와 호환되는 포맷)""" spot: Dict[str, Any] = { "id": row["id"], "name": row["name"], "name_en": row.get("name_en"), "name_zh": row.get("name_zh"), "category": row["category"], "location": { "lat": float(row["lat"]), "lng": float(row["lng"]), "address": row.get("address", ""), }, "story": { "title": row.get("story_title", ""), "content": row.get("story_content", ""), "source": row.get("story_source", ""), "tips": row.get("tips", ""), }, "tags": { "tier1": row.get("tags_tier1") or {}, "tier2": row.get("tags_tier2") or [], }, "meta": row.get("meta") or {}, "media": { "main_image": row.get("main_image_url"), "thumbnail": row.get("thumbnail_url"), "generated_image": row.get("generated_image_url"), }, "priority_score": row.get("priority_score", 5), "status": row.get("status", "active"), } # zone / cluster_id: DB에 있으면 포함 if row.get("zone") is not None: spot["zone"] = row["zone"] if row.get("cluster_id") is not None: spot["cluster_id"] = row["cluster_id"] # 향토지 메타데이터 (마을, 출처, 시대) if row.get("village"): spot["village"] = row["village"] if row.get("source_book"): spot["source_book"] = row["source_book"] if row.get("historical_period"): spot["historical_period"] = row["historical_period"] return spot def _load_spots_from_db(self) -> bool: """Supabase story_spots 테이블에서 active 스팟 로드. 성공 시 True.""" try: from db import get_supabase supabase = get_supabase() result = supabase.table("story_spots") \ .select("*") \ .eq("status", "active") \ .execute() rows = result.data or [] self._spots = [self._row_to_spot(r) for r in rows] self._spots_by_id = {s["id"]: s for s in self._spots} self._spots_loaded = True self._spots_loaded_at = time.monotonic() logger.info(f"ContextBuilder loaded {len(self._spots)} spots from Supabase") return True except Exception as e: logger.error(f"Failed to load spots from Supabase: {e}") return False def _is_spots_cache_expired(self) -> bool: """스팟 캐시 TTL 만료 여부""" if not self._spots_loaded: return True elapsed = time.monotonic() - self._spots_loaded_at return elapsed >= SPOTS_CACHE_TTL_SECONDS def _is_storylines_cache_expired(self) -> bool: """스토리라인 캐시 TTL 만료 여부""" if not self._storylines_loaded: return True elapsed = time.monotonic() - self._storylines_loaded_at return elapsed >= SPOTS_CACHE_TTL_SECONDS def _load_storylines_from_db(self) -> bool: """Supabase storylines + storyline_spots 테이블에서 스토리라인 로드""" try: from db import get_supabase supabase = get_supabase() # 모든 active 스토리라인 로드 sl_result = supabase.table("storylines") \ .select("*") \ .eq("status", "active") \ .execute() sl_rows = sl_result.data or [] if not sl_rows: self._storylines = [] self._storylines_loaded = True self._storylines_loaded_at = time.monotonic() return True storylines_map = {sl["id"]: {**sl, "spots": []} for sl in sl_rows} # 모든 storyline_spots를 한 번에 로드 ss_result = supabase.table("storyline_spots") \ .select("*") \ .in_("storyline_id", list(storylines_map.keys())) \ .order("spot_order") \ .execute() for ss in (ss_result.data or []): sl_id = ss["storyline_id"] if sl_id in storylines_map: storylines_map[sl_id]["spots"].append(ss) self._storylines = list(storylines_map.values()) self._storylines_loaded = True self._storylines_loaded_at = time.monotonic() logger.info(f"ContextBuilder loaded {len(self._storylines)} storylines") return True except Exception as e: logger.error(f"Failed to load storylines from DB: {e}") return False def find_matching_storylines( self, lat: float, lng: float, theme: Optional[str] = None, duration_minutes: int = 60, max_results: int = 3, ) -> List[Dict]: """ 사용자 위치 + 취향에 맞는 스토리라인 매칭 스코어 계산: - 위치 근접도: 최대 50점 (반경 내 50, 반경 1.5x 내 25, 이상 0) - 테마 매칭: 최대 30점 (정확 30, 호환 15, 미지정 10) - 시간 적합도: 최대 20점 (20% 이내 20, 50% 이내 10) Returns: [{storyline: {...}, score: int, distance_km: float}] """ self._ensure_loaded() if not self._storylines: return [] scored = [] for sl in self._storylines: score = 0 # 1. 위치 근접도 (max 50) sl_lat = float(sl.get("center_lat") or 0) sl_lng = float(sl.get("center_lng") or 0) sl_radius = float(sl.get("radius_km") or 2.0) if not sl_lat or not sl_lng: continue dist = haversine(lat, lng, sl_lat, sl_lng) if dist > sl_radius * 2: continue # 반경 2배 초과 → 스킵 if dist <= sl_radius: score += 50 elif dist <= sl_radius * 1.5: score += 25 # 2. 테마 매칭 (max 30) sl_theme = sl.get("theme") if theme and sl_theme: if theme == sl_theme: score += 30 elif sl_theme in THEME_COMPATIBLE.get(theme, set()): score += 15 # 테마 불일치 = 0점 elif not theme: score += 10 # 테마 미지정 → 약간의 보너스 # 3. 시간 적합도 (max 20) sl_minutes = sl.get("estimated_minutes") or 60 diff_ratio = abs(duration_minutes - sl_minutes) / max(duration_minutes, 1) if diff_ratio <= 0.2: score += 20 elif diff_ratio <= 0.5: score += 10 # 4. 스팟 유효성 검증: 모든 스팟이 실제로 존재하는지 확인 sl_spots = sl.get("spots", []) all_valid = all( ss.get("spot_id") in self._spots_by_id for ss in sl_spots ) if not all_valid or not sl_spots: logger.warning(f"[storyline] Skipping {sl['id']}: missing spots") continue scored.append({ "storyline": sl, "score": score, "distance_km": round(dist, 3), }) scored.sort(key=lambda x: -x["score"]) return scored[:max_results] def _ensure_loaded(self): """데이터 lazy loading (스팟: DB + TTL 캐시, 네트워크: 파일, 스토리라인: DB + TTL 캐시)""" # 네트워크 데이터는 정적 파일에서 한 번만 로드 if not self._network_loaded: network_path = os.path.join(self._data_dir, "walking_network.json") try: with open(network_path, "r", encoding="utf-8") as f: self._network = json.load(f) logger.info(f"ContextBuilder loaded network: " f"{len(self._network.get('layer1_clusters', {}))} clusters") except FileNotFoundError as e: logger.error(f"Network file not found: {e}") except json.JSONDecodeError as e: logger.error(f"Network JSON parse error: {e}") self._network_loaded = True # 스팟 데이터: DB에서 로드 + TTL 캐시 (만료 시 재로드) if self._is_spots_cache_expired(): if not self._load_spots_from_db(): # DB 실패 시: 이전 캐시 데이터 유지, TTL 리셋 if self._spots: self._spots_loaded_at = time.monotonic() logger.warning(f"DB reload failed, keeping {len(self._spots)} cached spots") else: logger.error("DB load failed and no cached spots available") # 스토리라인: DB에서 로드 + TTL 캐시 (실패해도 기존 흐름에 영향 없음) if self._is_storylines_cache_expired(): if not self._load_storylines_from_db(): if self._storylines: self._storylines_loaded_at = time.monotonic() logger.warning(f"Storyline reload failed, keeping {len(self._storylines)} cached") else: self._storylines_loaded = True # 빈 상태로 마킹하여 반복 시도 방지 def get_zone_for_location(self, lat: float, lng: float) -> str: """좌표에 가장 가까운 존 반환""" self._ensure_loaded() zones = self._network.get("zones", {}) best_zone = "C" best_dist = float('inf') for zone_id, zone in zones.items(): lat_range = zone.get("lat_range", (0, 0)) lng_range = zone.get("lng_range", (0, 0)) center_lat = (lat_range[0] + lat_range[1]) / 2 center_lng = (lng_range[0] + lng_range[1]) / 2 dist = haversine(lat, lng, center_lat, center_lng) if dist < best_dist: best_dist = dist best_zone = zone_id return best_zone def get_nearby_zones(self, zone_id: str) -> List[str]: """인접 존 목록""" self._ensure_loaded() adj = self._network.get("zone_adjacency", {}) return adj.get(zone_id, []) def _get_radius_candidates( self, lat: float, lng: float, radius_km: float, ) -> List[Dict]: """반경 내 활성 스팟 + 거리 계산 (필터 전 단계)""" self._ensure_loaded() candidates = [] for spot in self._spots: if spot.get("status", "active") != "active": continue s_lat = spot["location"]["lat"] s_lng = spot["location"]["lng"] dist = haversine(lat, lng, s_lat, s_lng) if dist <= radius_km: candidates.append({**spot, "_distance_km": round(dist, 3)}) return candidates def _apply_filters( self, candidates: List[Dict], theme: Optional[str] = None, activity_level: Optional[str] = None, mood: Optional[List[str]] = None, use_theme_compat: bool = True, use_activity_range: bool = True, ) -> List[Dict]: """조건 필터 적용 (범위 매칭 지원)""" filtered = [] # 테마 호환 세트 if theme and use_theme_compat: accepted_themes = THEME_COMPATIBLE.get(theme, {theme}) elif theme: accepted_themes = {theme} else: accepted_themes = None # 활동성 호환 세트 if activity_level and use_activity_range: accepted_activities = ACTIVITY_COMPATIBLE.get(activity_level, {activity_level}) elif activity_level: accepted_activities = {activity_level} else: accepted_activities = None for spot in candidates: tags = spot.get("tags", {}).get("tier1", {}) # 테마 필터 if accepted_themes: spot_themes = set(tags.get("theme", [])) if not spot_themes & accepted_themes: # 음식 테마면 restaurant/cafe 카테고리 허용 if theme == "food" and spot["category"] in ("restaurant", "cafe"): pass else: continue # 활동성 필터 (범위 매칭) if accepted_activities: spot_activity = tags.get("activity_level") if spot_activity and spot_activity not in accepted_activities: continue # 분위기 필터 (하나라도 매칭) if mood: spot_moods = tags.get("mood", []) if spot_moods and not any(m in spot_moods for m in mood): continue filtered.append(spot) return filtered def filter_spots( self, lat: float, lng: float, radius_km: float = 3.0, theme: Optional[str] = None, activity_level: Optional[str] = None, mood: Optional[List[str]] = None, max_spots: int = MAX_SPOTS_TOTAL, ) -> List[Dict]: """ 조건에 맞는 스팟 필터링 + 거리순 정렬 최소 스팟 보장을 위해 단계적 필터 완화 적용 Returns: [{...spot_data, _distance_km: float}] """ # 반경 내 후보 (필터 전) candidates = self._get_radius_candidates(lat, lng, radius_km) # 1단계: 전체 필터 적용 (테마 호환 + 활동성 범위 매칭) result = self._apply_filters(candidates, theme, activity_level, mood) # 2단계: 스팟 부족 시 단계적 필터 완화 if len(result) < MIN_SPOTS_THRESHOLD: # 2-1: mood 필터 제거 result = self._apply_filters(candidates, theme, activity_level, mood=None) if len(result) >= MIN_SPOTS_THRESHOLD: logger.info(f"[filter] Relaxed mood filter: {len(result)} spots") if len(result) < MIN_SPOTS_THRESHOLD: # 2-2: activity_level + mood 필터 제거 (테마만 유지) result = self._apply_filters(candidates, theme, activity_level=None, mood=None) if len(result) >= MIN_SPOTS_THRESHOLD: logger.info(f"[filter] Relaxed activity filter: {len(result)} spots") if len(result) < MIN_SPOTS_THRESHOLD: # 2-3: 반경 2배 확장 + 테마만 expanded = self._get_radius_candidates(lat, lng, radius_km * 2) result = self._apply_filters(expanded, theme, activity_level=None, mood=None) if len(result) >= MIN_SPOTS_THRESHOLD: logger.info(f"[filter] Expanded radius to {radius_km * 2}km: {len(result)} spots") if len(result) < MIN_SPOTS_THRESHOLD: # 2-4: 최후 수단 - 테마도 해제, 반경 2배 내 모든 스팟 result = self._get_radius_candidates(lat, lng, radius_km * 2) logger.warning(f"[filter] All filters dropped, using {len(result)} spots in {radius_km * 2}km") # 관련성 스코어 계산 후 정렬 (테마/무드 매칭 → priority_score → 거리) for spot in result: score = spot.get("priority_score", 5) * 10 # 기본 0-100 tags = spot.get("tags", {}).get("tier1", {}) # 테마 정확 매칭 보너스 if theme: spot_themes = set(tags.get("theme", [])) if theme in spot_themes: score += 30 # 정확 매칭 elif spot_themes & THEME_COMPATIBLE.get(theme, set()): score += 15 # 호환 매칭 # 무드 매칭 보너스 if mood: spot_moods = set(tags.get("mood", [])) matched = len(spot_moods & set(mood)) score += matched * 10 # 무드당 10점 # 활동성 매칭 보너스 if activity_level: spot_activity = tags.get("activity_level") if spot_activity == activity_level: score += 10 spot["_relevance_score"] = score result.sort(key=lambda s: (-s["_relevance_score"], s["_distance_km"])) # 거리 다양성 보장: 가까운 스팟만 밀집되지 않도록 밴드별 분배 # (관련성 높은 스팟 우선 + 다양한 거리 대역에서 고르게 선택) if len(result) > max_spots and radius_km > 0: result = self._ensure_distance_diversity(result, max_spots, radius_km) else: result = result[:max_spots] return result def _ensure_distance_diversity( self, spots: List[Dict], max_spots: int, radius_km: float, ) -> List[Dict]: """ 거리 대역별 분배로 후보 스팟의 공간적 다양성 보장. 가까운 곳만 밀집되면 AI가 0.2km 코스를 만드는 문제 방지. 3개 밴드로 나누어 각 밴드에서 최소 비율을 확보: - 근거리 (0 ~ 33%): 후보의 50% - 중거리 (33% ~ 66%): 후보의 30% - 원거리 (66% ~ 100%): 후보의 20% """ band_boundaries = [radius_km * 0.33, radius_km * 0.66, radius_km] band_quotas = [ max(5, int(max_spots * 0.50)), # 근거리: 50% max(3, int(max_spots * 0.30)), # 중거리: 30% max(2, int(max_spots * 0.20)), # 원거리: 20% ] bands: List[List[Dict]] = [[], [], []] for spot in spots: dist = spot.get("_distance_km", 0) if dist <= band_boundaries[0]: bands[0].append(spot) elif dist <= band_boundaries[1]: bands[1].append(spot) else: bands[2].append(spot) selected: List[Dict] = [] remaining: List[Dict] = [] for i, (band, quota) in enumerate(zip(bands, band_quotas)): selected.extend(band[:quota]) remaining.extend(band[quota:]) # 쿼터 미달 밴드가 있으면 나머지에서 채움 (관련성순 유지) if len(selected) < max_spots: remaining.sort(key=lambda s: (-s.get("_relevance_score", 0), s["_distance_km"])) selected.extend(remaining[:max_spots - len(selected)]) # 최종 정렬: 관련성 → 거리 selected.sort(key=lambda s: (-s.get("_relevance_score", 0), s["_distance_km"])) band_counts = [min(len(b), q) for b, q in zip(bands, band_quotas)] logger.info(f"[filter] Distance diversity: bands={band_counts}, " f"total={len(selected)}/{len(spots)} spots") return selected[:max_spots] async def build_area_context( self, lat: float, lng: float, radius_km: float = 3.0, theme: Optional[str] = None, activity_level: Optional[str] = None, mood: Optional[List[str]] = None, duration_minutes: int = 60, ) -> Tuple[str, str, List[Dict]]: """ AI에 전달할 지역 컨텍스트 빌드 Returns: (area_context_text, distance_table_text, filtered_spots) """ self._ensure_loaded() # 1. 스팟 필터링 (단계적 필터 완화 내장) spots = self.filter_spots(lat, lng, radius_km, theme, activity_level, mood) if not spots: return ("후보 스팟이 없습니다.", "", []) # 2. 지역 컨텍스트 텍스트 조합 zone_id = self.get_zone_for_location(lat, lng) zone_info = self._network.get("zones", {}).get(zone_id, {}) lines = [] lines.append(f"# 지역: {zone_info.get('name', zone_id)} ({zone_info.get('description', '')})") lines.append(f"# 반경 {radius_km}km 내 후보 스팟 {len(spots)}개") lines.append("") # 스팟 목록 (압축 형식) lines.append("## 스팟 목록") lines.append("ID|이름|카테고리|좌표|우선순위|스토리요약") for s in spots: cat_kr = CATEGORY_KR.get(s["category"], s["category"]) story = s.get("story", {}) content = story.get("content", "") # 스토리 요약 (80자) if content and "카테고리에 속합니다" not in content: summary = content[:80].replace("\n", " ") else: summary = f"{s['name']} - {cat_kr}" line = ( f"{s['id']}|{s['name']}|{cat_kr}|" f"{s['location']['lat']:.4f},{s['location']['lng']:.4f}|" f"p{s.get('priority_score', 5)}|{summary}" ) lines.append(line) area_context = "\n".join(lines) # 3. 거리표 빌드 (OSRM 사용) distance_lines = await self._build_distance_table(spots, duration_minutes) return (area_context, distance_lines, spots) async def _build_distance_table(self, spots: List[Dict], duration_minutes: int) -> str: """스팟 간 도보 거리표 생성 (OSRM Table API, 폴백: Haversine × 1.3)""" if len(spots) <= 1: return "" max_walkable_km = (duration_minutes / 60) * WALKING_SPEED_KMH # OSRM Table API로 실제 도보 거리+시간 계산 spot_distances, _, spot_durations, _ = await get_walking_distances(spots) lines = [] max_dist = 0 pairs = [] for (id_a, id_b), dist in spot_distances.items(): walk_min = spot_durations.get((id_a, id_b), max(1, round(dist / WALKING_SPEED_KMH * 60))) if dist <= max_walkable_km * 1.5: # 도보 가능 범위 내만 pairs.append((id_a, id_b, dist, walk_min)) max_dist = max(max_dist, dist) # 거리순 정렬, 상위 100개만 pairs.sort(key=lambda x: x[2]) pairs = pairs[:100] lines.append("## 도보 거리표 (OSRM 실측, 제주 보정 적용)") for a, b, dist, walk_min in pairs: name_a = self._spots_by_id.get(a, {}).get("name", a) name_b = self._spots_by_id.get(b, {}).get("name", b) lines.append(f"{a}({name_a}) → {b}({name_b}): {dist:.2f}km, {walk_min}분") # 클러스터 메모 if max_dist > 0 and max_dist < max_walkable_km * 0.5: lines.append("") lines.append(f"⚠️ 모든 스팟이 {max_dist:.1f}km 이내에 밀집되어 있습니다.") lines.append("→ 이동 시간이 희망 시간보다 짧을 수 있으며, 이는 정상입니다.") return "\n".join(lines) def build_spot_details_for_stories(self, spot_ids: List[str]) -> str: """스토리 생성을 위한 스팟 상세 정보""" self._ensure_loaded() lines = [] for i, sid in enumerate(spot_ids): spot = self._spots_by_id.get(sid) if not spot: continue story = spot.get("story", {}) original = story.get("content", "") source = story.get("source", "") lines.append(f"### 스팟 {i+1}: {spot['name']} ({sid})") lines.append(f"- 카테고리: {CATEGORY_KR.get(spot['category'], spot['category'])}") lines.append(f"- 위치: {spot['location'].get('address', '')}") if original and "카테고리에 속합니다" not in original: lines.append(f"- 기존 스토리: {original}") if source and source != "kakao": lines.append(f"- 출처: {source}") lines.append("") return "\n".join(lines) def get_spots_count(self) -> int: """로드된 스팟 수""" self._ensure_loaded() return len(self._spots) # Singleton instance _builder: Optional[ContextBuilder] = None def get_context_builder() -> ContextBuilder: """싱글톤 ContextBuilder 인스턴스 반환""" global _builder if _builder is None: _builder = ContextBuilder() return _builder