""" RE:Play Trend Engine v3 — 주간 배치 오케스트레이터 순차 실행 파이프라인: 1. 카카오맵 그리드 스캔 + 리뷰 파싱 (trend_spots 마스터 생성) 2. SpotMatcher 초기화 (trend_spots + story_spots 사전 로드) 3. 유튜브 API (SpotMatcher 연동) 4. 인스타그램 인플루언서 모니터링 v5.0 (SpotMatcher 연동) 5. 네이버 블로그 수집 (URL 확보 + 크롤링 + DB 저장) 6. 블로그 본문 → 장소명 추출 + mention_count 집계 7. 종합 스코어 계산 + 랭킹 생성 Usage: python backend/scripts/run_trend_engine.py """ import asyncio import json import logging import os import re import sys import time from datetime import date, timedelta # backend/ 디렉토리를 import path에 추가 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) # 로컬 실행 시 .env 파일 로드 try: from dotenv import load_dotenv # 프로젝트 루트의 .env 파일 로드 env_path = os.path.join(os.path.dirname(__file__), "..", "..", ".env") load_dotenv(env_path) except ImportError: pass # GitHub Actions 등 dotenv 없는 환경 from supabase import create_client from trend_engine.collectors.naver_blog import NaverBlogCollector from trend_engine.collectors.kakaomap import KakaoMapCollector from trend_engine.collectors.youtube import YouTubeCollector from trend_engine.collectors.instagram import InstagramCollector from trend_engine.collectors.naver_datalab import NaverDataLabCollector from trend_engine.spot_matcher import SpotMatcher from trend_engine.spot_registrar import TrendSpotRegistrar from trend_engine.trend_scorer import generate_weekly_ranking from trend_engine.place_extractor import PlaceNameExtractor from trend_engine.utils import get_week_period, cleanup_old_trends, safe_upsert_spot_trend logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger("trend_engine.orchestrator") def _get_supabase_client(): url = os.environ.get("SUPABASE_URL") or os.environ.get("VITE_SUPABASE_URL") key = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") if not url or not key: raise ValueError("SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY must be set") return create_client(url, key) def run_step(name: str, func, results: dict): """단일 파이프라인 단계를 실행하고 결과를 기록한다. 반환값을 돌려준다.""" logger.info("━━━ [START] %s ━━━", name) start = time.time() try: result = func() elapsed = time.time() - start results[name] = { "status": "ok", "result": _summarize(result), "result_raw": result if isinstance(result, dict) else None, "elapsed_sec": round(elapsed, 1), } logger.info("✓ [DONE] %s — %.1f초", name, elapsed) return result except Exception as e: elapsed = time.time() - start results[name] = {"status": "error", "error": str(e), "elapsed_sec": round(elapsed, 1)} logger.error("✗ [FAIL] %s — %s (%.1f초)", name, e, elapsed) return None def run_async_step(name: str, coro, results: dict): """asyncio 코루틴을 실행하는 run_step 변형.""" def wrapper(): return asyncio.run(coro) return run_step(name, wrapper, results) def _summarize(result) -> str: """결과를 로그용 요약 문자열로 변환.""" if isinstance(result, dict): return json.dumps(result, ensure_ascii=False, default=str)[:200] return str(result)[:200] def main() -> None: total_start = time.time() results: dict = {} sb = _get_supabase_client() # ── 1. 카카오맵 그리드 스캔 (먼저 실행 → trend_spots 마스터 생성) ── kakao = KakaoMapCollector(sb) run_async_step("1_kakaomap", kakao.run(), results) # ── 2. SpotMatcher + TrendSpotRegistrar 초기화 ── matcher = SpotMatcher(sb) registrar = TrendSpotRegistrar(sb) extractor = PlaceNameExtractor(sb) logger.info( "SpotMatcher 준비 완료 — trend_spots %d건, story_spots %d건", len(matcher.trend_spots), len(matcher.story_spots), ) # ── 3. 유튜브 API (SpotMatcher + Registrar 연동) ── youtube = YouTubeCollector(sb, spot_matcher=matcher, spot_registrar=registrar) run_step("3_youtube", youtube.run, results) # ── 4. 인스타그램 인플루언서 모니터링 v5.1 Multimodal (SpotMatcher + Registrar 연동) ── logger.info("인스타그램: 인플루언서 모니터링 v5.1 (Multimodal AI)") instagram = InstagramCollector(sb, spot_matcher=matcher, spot_registrar=registrar) run_step("4_instagram_influencer", instagram.run, results) # ── 5. 네이버 플레이스 — 비활성 (Place ID 매칭 불가) ── logger.info("네이버 플레이스: 비활성 (Place ID 매칭 불가, 2026-02)") results["5_naver_place"] = { "status": "skipped", "reason": "Place ID matching unavailable", "elapsed_sec": 0, } # ── 5b. 네이버 DataLab 검색 트렌드 — 비활성 ── # trend_scorer.py CHANNEL_WEIGHTS에서 비활성화됨 (데이터 수집 안정화 필요) # 스코어링에 미사용되므로 수집도 스킵 → API 쿼터 절약 logger.info("네이버 DataLab: 비활성 (스코어링 미사용, API 쿼터 절약)") results["5b_naver_datalab"] = { "status": "skipped", "reason": "Disabled in CHANNEL_WEIGHTS — re-enable in trend_scorer.py first", "elapsed_sec": 0, } # ── 6. 네이버 블로그 수집 (URL + 병렬 크롤링 + 저장) ── blog = NaverBlogCollector(sb) run_step("6_naver_blog", blog.run, results) # ── 7. 블로그 본문 → 장소명 추출 + mention_count 집계 ── def extract_blog_places(): """블로그 포스트에서 장소명 추출 → mention_count 집계 → spot_trends 저장. S6: get_week_period()로 period 표준화. S8: upsert로 재실행 시 중복 방지. """ period_start, period_end = get_week_period() # spot_trends에서 naver_blog + __pending__ 레코드 조회 (페이지네이션) records = [] page_size = 1000 offset = 0 try: while True: batch = ( sb.table("spot_trends") .select("id, raw_data") .eq("source", "naver_blog") .eq("spot_id", "__pending__") .range(offset, offset + page_size - 1) .execute() ) rows = batch.data or [] records.extend(rows) if len(rows) < page_size: break offset += page_size except Exception as e: logger.warning("블로그 pending 레코드 조회 실패: %s", e) return {"error": str(e)} logger.info("블로그 pending 레코드: %d건 조회", len(records)) if not records: return {"pending_records": 0, "places_found": 0} # 장소별 언급 횟수 집계 place_mentions: dict[str, int] = {} for record in records: raw = record.get("raw_data", {}) content = raw.get("content_preview", "") title = raw.get("title", "") text = f"{title} {content}" text = re.sub(r"<[^>]+>", "", text) # HTML 태그 제거 places = extractor.extract(text) for place in places: matched_id = matcher.match(place["name"]) # 기존 DB 매칭 실패 → Registrar로 새 장소 등록 if not matched_id: matched_id = registrar.register(place["name"], "naver_blog") if matched_id: place_mentions[matched_id] = place_mentions.get(matched_id, 0) + 1 # 집계 결과를 spot_trends에 저장 (safe_upsert로 partial index 호환) saved = 0 for spot_id, count in place_mentions.items(): try: safe_upsert_spot_trend(sb, { "spot_id": spot_id, "source": "naver_blog", "metric_type": "mention_count", "metric_value": count, "period_start": period_start.isoformat(), "period_end": period_end.isoformat(), "raw_data": {"aggregated_from": "blog_post_extraction"}, }) saved += 1 except Exception as e: logger.warning("mention_count 저장 실패 (%s): %s", spot_id, e) return { "pending_records": len(records), "places_found": len(place_mentions), "mention_records_saved": saved, } run_step("7_blog_place_extraction", extract_blog_places, results) # ── 7a. SpotMatcher + Registrar 통계 출력 ── match_stats = matcher.log_stats() registrar_stats = registrar.log_stats() results["7a_match_stats"] = {"status": "ok", "result": str(match_stats), "elapsed_sec": 0} results["7a_registrar_stats"] = {"status": "ok", "result": str(registrar_stats), "elapsed_sec": 0} # ── 7b. __pending__ 블로그 원본 레코드 정리 (이슈 8) ── def cleanup_pending(): # 안전장치: 추출 성공 시에만 pending 삭제 (데이터 손실 방지) extraction_result = results.get("7_blog_place_extraction", {}) extraction_ok = extraction_result.get("status") == "ok" raw = extraction_result.get("result_raw") or {} places_found = raw.get("places_found", 0) if isinstance(raw, dict) else 0 pending_records = raw.get("pending_records", 0) if isinstance(raw, dict) else 0 if not extraction_ok: logger.warning( "__pending__ 정리 스킵: 장소명 추출 단계가 실패했으므로 원본 보존" ) return {"skipped": True, "reason": "extraction_failed"} if pending_records > 0 and places_found == 0: logger.warning( "__pending__ 정리 스킵: %d건의 pending 레코드에서 장소를 0건 추출 — 원본 보존", pending_records, ) return {"skipped": True, "reason": "zero_extraction", "pending_records": pending_records} try: result = ( sb.table("spot_trends") .delete() .eq("source", "naver_blog") .eq("metric_type", "blog_post") .eq("spot_id", "__pending__") .execute() ) deleted = len(result.data) if result.data else 0 logger.info("__pending__ 블로그 레코드 %d건 삭제 (추출 %d건 성공)", deleted, places_found) return {"deleted_pending": deleted, "places_found": places_found} except Exception as e: logger.warning("__pending__ 정리 실패: %s", e) return {"error": str(e)} run_step("7b_cleanup_pending", cleanup_pending, results) # ── 8. 종합 스코어 계산 + 랭킹 생성 (최소 2채널 성공 시) ── # 수집 채널 단계만 카운트 (1, 3, 4, 6) collection_steps = ["1_kakaomap", "3_youtube", "4_instagram_influencer", "6_naver_blog"] successful_channels = [s for s in collection_steps if results.get(s, {}).get("status") == "ok"] def calc_scores(): if len(successful_channels) < 2: logger.error( "수집 성공 채널 %d개 (최소 2개 필요). 스코어링 스킵: %s", len(successful_channels), successful_channels, ) return {"skipped": True, "reason": f"Only {len(successful_channels)} channels succeeded"} logger.info("스코어링 실행: %d개 채널 성공 — %s", len(successful_channels), successful_channels) return generate_weekly_ranking(sb) run_step("8_score_calculation", calc_scores, results) # ── 9. S6: 오래된 데이터 정리 (12주 이상) ── def retention_cleanup(): deleted = cleanup_old_trends(sb, retention_weeks=12) return {"deleted_records": deleted} run_step("9_retention_cleanup", retention_cleanup, results) # ── 9b. trend_spots → story_spots 주간 마이그레이션 (7일 이상 된 스팟) ── def migrate_trend_to_story(): """1주 이상 된 trend_spots를 story_spots로 이관한다. SpotMatcher로 기존 story_spots 중복 확인 후, 미중복 시 신규 생성. """ from datetime import datetime as dt, timedelta as td, timezone as tz cutoff = (dt.now(tz.utc) - td(days=7)).isoformat() try: resp = ( sb.table("trend_spots") .select("id, name, category, lat, lng, address") .lt("trending_since", cutoff) .eq("is_course_eligible", False) .limit(100) .execute() ) except Exception as e: logger.warning("trend_spots 마이그레이션 조회 실패: %s", e) return {"error": str(e)} candidates = resp.data or [] if not candidates: return {"candidates": 0, "promoted": 0} promoted = 0 for ts in candidates: name = ts.get("name", "") # SpotMatcher로 기존 story_spots 중복 확인 existing = matcher.match(name) if existing: # 이미 story_spots에 있으므로 is_course_eligible만 마킹 try: sb.table("trend_spots").update( {"is_course_eligible": True} ).eq("id", ts["id"]).execute() except Exception: pass continue # 신규 story_spots 엔트리 생성 story_id = f"promoted_{ts['id']}" try: sb.table("story_spots").upsert({ "id": story_id, "name": name, "category": ts.get("category", "etc"), "lat": ts.get("lat"), "lng": ts.get("lng"), "address": ts.get("address", ""), "status": "active", "priority_score": 5, }, on_conflict="id").execute() sb.table("trend_spots").update( {"is_course_eligible": True} ).eq("id", ts["id"]).execute() promoted += 1 except Exception as e: logger.warning("story_spots 프로모션 실패 (%s): %s", name, e) logger.info("trend→story 마이그레이션: %d건 후보, %d건 프로모션", len(candidates), promoted) return {"candidates": len(candidates), "promoted": promoted} run_step("9b_trend_migration", migrate_trend_to_story, results) # ── 결과 요약 ── total_elapsed = time.time() - total_start ok_count = sum(1 for r in results.values() if r.get("status") == "ok") err_count = sum(1 for r in results.values() if r.get("status") == "error") skip_count = sum(1 for r in results.values() if r.get("status") == "skipped") summary = { "total_steps": len(results), "succeeded": ok_count, "failed": err_count, "skipped": skip_count, "total_elapsed_sec": round(total_elapsed, 1), "steps": { k: {"status": v.get("status"), "elapsed_sec": v.get("elapsed_sec", 0)} for k, v in results.items() }, } logger.info("━━━ TREND ENGINE COMPLETE ━━━") logger.info( "성공: %d / 실패: %d / 스킵: %d / 총 소요: %.1f초", ok_count, err_count, skip_count, total_elapsed, ) # JSON 요약 출력 (GitHub Actions 로그용) print(json.dumps(summary, ensure_ascii=False, indent=2)) # 전체 실패 시에만 비정상 종료 if ok_count == 0: logger.error("모든 단계가 실패했습니다.") sys.exit(1) if __name__ == "__main__": main()