"""
RE:Play Trend Engine v3 โ€” ์ฃผ๊ฐ„ ๋ฐฐ์น˜ ์˜ค์ผ€์ŠคํŠธ๋ ˆ์ดํ„ฐ
์ˆœ์ฐจ ์‹คํ–‰ ํŒŒ์ดํ”„๋ผ์ธ:
1. ์นด์นด์˜ค๋งต ๊ทธ๋ฆฌ๋“œ ์Šค์บ” + ๋ฆฌ๋ทฐ ํŒŒ์‹ฑ (trend_spots ๋งˆ์Šคํ„ฐ ์ƒ์„ฑ)
2. SpotMatcher ์ดˆ๊ธฐํ™” (trend_spots + story_spots ์‚ฌ์ „ ๋กœ๋“œ)
3. ์œ ํŠœ๋ธŒ API (SpotMatcher ์—ฐ๋™)
4. ์ธ์Šคํƒ€๊ทธ๋žจ ์ธํ”Œ๋ฃจ์–ธ์„œ ๋ชจ๋‹ˆํ„ฐ๋ง v5.0 (SpotMatcher ์—ฐ๋™)
5. ๋„ค์ด๋ฒ„ ๋ธ”๋กœ๊ทธ ์ˆ˜์ง‘ (URL ํ™•๋ณด + ํฌ๋กค๋ง + DB ์ €์žฅ)
6. ๋ธ”๋กœ๊ทธ ๋ณธ๋ฌธ โ†’ ์žฅ์†Œ๋ช… ์ถ”์ถœ + mention_count ์ง‘๊ณ„
7. ์ข…ํ•ฉ ์Šค์ฝ”์–ด ๊ณ„์‚ฐ + ๋žญํ‚น ์ƒ์„ฑ
Usage:
python backend/scripts/run_trend_engine.py
"""
import asyncio
import json
import logging
import os
import re
import sys
import time
# Add the backend/ directory to the import path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# Load the .env file when running locally
try:
    from dotenv import load_dotenv

    # Load the .env file at the project root
    env_path = os.path.join(os.path.dirname(__file__), "..", "..", ".env")
    load_dotenv(env_path)
except ImportError:
    pass  # environments without dotenv, e.g. GitHub Actions

from supabase import create_client
from trend_engine.collectors.naver_blog import NaverBlogCollector
from trend_engine.collectors.kakaomap import KakaoMapCollector
from trend_engine.collectors.youtube import YouTubeCollector
from trend_engine.collectors.instagram import InstagramCollector
from trend_engine.collectors.naver_datalab import NaverDataLabCollector
from trend_engine.spot_matcher import SpotMatcher
from trend_engine.spot_registrar import TrendSpotRegistrar
from trend_engine.trend_scorer import generate_weekly_ranking
from trend_engine.place_extractor import PlaceNameExtractor
from trend_engine.utils import get_week_period, cleanup_old_trends, safe_upsert_spot_trend

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("trend_engine.orchestrator")

def _get_supabase_client():
    url = os.environ.get("SUPABASE_URL") or os.environ.get("VITE_SUPABASE_URL")
    key = os.environ.get("SUPABASE_SERVICE_ROLE_KEY")
    if not url or not key:
        raise ValueError("SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY must be set")
    return create_client(url, key)
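
# Note: the service-role key bypasses Supabase Row Level Security, which the
# unattended batch writes below (upserts and deletes on spot_trends) presumably
# depend on. Keep it server-side only - never in a client-exposed VITE_* var.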

def run_step(name: str, func, results: dict):
    """Run a single pipeline step, record its outcome, and return its result."""
    logger.info("━━━ [START] %s ━━━", name)
    start = time.time()
    try:
        result = func()
        elapsed = time.time() - start
        results[name] = {
            "status": "ok",
            "result": _summarize(result),
            "result_raw": result if isinstance(result, dict) else None,
            "elapsed_sec": round(elapsed, 1),
        }
        logger.info("✓ [DONE] %s - %.1fs", name, elapsed)
        return result
    except Exception as e:
        elapsed = time.time() - start
        results[name] = {"status": "error", "error": str(e), "elapsed_sec": round(elapsed, 1)}
        logger.error("✗ [FAIL] %s - %s (%.1fs)", name, e, elapsed)
        return None
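
# Usage sketch: every pipeline stage below goes through this wrapper so that a
# single failing collector cannot abort the whole batch, e.g.
#     run_step("6_naver_blog", blog.run, results)
# A failed step records {"status": "error", ...} in `results` and returns None.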

def run_async_step(name: str, coro, results: dict):
    """Variant of run_step that executes an asyncio coroutine."""
    def wrapper():
        return asyncio.run(coro)
    return run_step(name, wrapper, results)
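
# asyncio.run() creates (and tears down) a fresh event loop per call, so each
# async collector runs in isolation. Coroutines are passed in unawaited, e.g.
#     run_async_step("1_kakaomap", kakao.run(), results)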

def _summarize(result) -> str:
    """Convert a step result into a short summary string for logging."""
    if isinstance(result, dict):
        return json.dumps(result, ensure_ascii=False, default=str)[:200]
    return str(result)[:200]

def main() -> None:
    total_start = time.time()
    results: dict = {}
    sb = _get_supabase_client()

    # ── 1. KakaoMap grid scan (runs first → builds the trend_spots master) ──
    kakao = KakaoMapCollector(sb)
    run_async_step("1_kakaomap", kakao.run(), results)

    # ── 2. SpotMatcher + TrendSpotRegistrar initialization ──
    matcher = SpotMatcher(sb)
    registrar = TrendSpotRegistrar(sb)
    extractor = PlaceNameExtractor(sb)
    logger.info(
        "SpotMatcher ready - %d trend_spots, %d story_spots",
        len(matcher.trend_spots),
        len(matcher.story_spots),
    )

    # ── 3. YouTube API (SpotMatcher + Registrar integration) ──
    youtube = YouTubeCollector(sb, spot_matcher=matcher, spot_registrar=registrar)
    run_step("3_youtube", youtube.run, results)

    # ── 4. Instagram influencer monitoring v5.1 Multimodal (SpotMatcher + Registrar integration) ──
    logger.info("Instagram: influencer monitoring v5.1 (Multimodal AI)")
    instagram = InstagramCollector(sb, spot_matcher=matcher, spot_registrar=registrar)
    run_step("4_instagram_influencer", instagram.run, results)

    # ── 5. Naver Place - disabled (Place ID matching unavailable) ──
    logger.info("Naver Place: disabled (Place ID matching unavailable, 2026-02)")
    results["5_naver_place"] = {
        "status": "skipped",
        "reason": "Place ID matching unavailable",
        "elapsed_sec": 0,
    }

    # ── 5b. Naver DataLab search trends - disabled ──
    # Disabled in trend_scorer.py CHANNEL_WEIGHTS (data collection needs stabilizing).
    # Not used in scoring, so collection is skipped too → saves API quota.
    logger.info("Naver DataLab: disabled (unused in scoring; saves API quota)")
    results["5b_naver_datalab"] = {
        "status": "skipped",
        "reason": "Disabled in CHANNEL_WEIGHTS - re-enable in trend_scorer.py first",
        "elapsed_sec": 0,
    }

    # ── 6. Naver Blog collection (URLs + parallel crawling + persistence) ──
    blog = NaverBlogCollector(sb)
    run_step("6_naver_blog", blog.run, results)

    # ── 7. Blog body → place-name extraction + mention_count aggregation ──
    def extract_blog_places():
        """Extract place names from blog posts, aggregate mention_count, and save to spot_trends.

        S6: period standardized via get_week_period().
        S8: upsert prevents duplicates on re-runs.
        """
        period_start, period_end = get_week_period()

        # Fetch naver_blog + __pending__ records from spot_trends (paginated)
        records = []
        page_size = 1000
        offset = 0
        try:
            while True:
                batch = (
                    sb.table("spot_trends")
                    .select("id, raw_data")
                    .eq("source", "naver_blog")
                    .eq("spot_id", "__pending__")
                    .range(offset, offset + page_size - 1)
                    .execute()
                )
                rows = batch.data or []
                records.extend(rows)
                if len(rows) < page_size:
                    break
                offset += page_size
        except Exception as e:
            logger.warning("Failed to fetch pending blog records: %s", e)
            return {"error": str(e)}

        logger.info("Pending blog records: %d fetched", len(records))
        if not records:
            return {"pending_records": 0, "places_found": 0}

        # Aggregate mention counts per place
        place_mentions: dict[str, int] = {}
        for record in records:
            raw = record.get("raw_data", {})
            content = raw.get("content_preview", "")
            title = raw.get("title", "")
            text = f"{title} {content}"
            text = re.sub(r"<[^>]+>", "", text)  # strip HTML tags
            places = extractor.extract(text)
            for place in places:
                matched_id = matcher.match(place["name"])
                # No match in the existing DB → register a new place via the Registrar
                if not matched_id:
                    matched_id = registrar.register(place["name"], "naver_blog")
                if matched_id:
                    place_mentions[matched_id] = place_mentions.get(matched_id, 0) + 1

        # Persist the aggregates to spot_trends (safe_upsert for partial-index compatibility)
        saved = 0
        for spot_id, count in place_mentions.items():
            try:
                safe_upsert_spot_trend(sb, {
                    "spot_id": spot_id,
                    "source": "naver_blog",
                    "metric_type": "mention_count",
                    "metric_value": count,
                    "period_start": period_start.isoformat(),
                    "period_end": period_end.isoformat(),
                    "raw_data": {"aggregated_from": "blog_post_extraction"},
                })
                saved += 1
            except Exception as e:
                logger.warning("Failed to save mention_count (%s): %s", spot_id, e)

        return {
            "pending_records": len(records),
            "places_found": len(place_mentions),
            "mention_records_saved": saved,
        }
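
    # Assumption: safe_upsert_spot_trend upserts keyed on
    # (spot_id, source, metric_type, period) so that re-running the same week
    # overwrites rather than duplicates mention_count rows (the S8 note above);
    # see trend_engine/utils.py for the actual conflict handling.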
    run_step("7_blog_place_extraction", extract_blog_places, results)

    # ── 7a. Emit SpotMatcher + Registrar statistics ──
    match_stats = matcher.log_stats()
    registrar_stats = registrar.log_stats()
    results["7a_match_stats"] = {"status": "ok", "result": str(match_stats), "elapsed_sec": 0}
    results["7a_registrar_stats"] = {"status": "ok", "result": str(registrar_stats), "elapsed_sec": 0}

    # ── 7b. Clean up raw __pending__ blog records (issue 8) ──
    def cleanup_pending():
        # Safety guard: delete pending rows only after a successful extraction (prevents data loss)
        extraction_result = results.get("7_blog_place_extraction", {})
        extraction_ok = extraction_result.get("status") == "ok"
        raw = extraction_result.get("result_raw") or {}
        places_found = raw.get("places_found", 0) if isinstance(raw, dict) else 0
        pending_records = raw.get("pending_records", 0) if isinstance(raw, dict) else 0

        if not extraction_ok:
            logger.warning(
                "Skipping __pending__ cleanup: place-name extraction failed, keeping the raw records"
            )
            return {"skipped": True, "reason": "extraction_failed"}

        if pending_records > 0 and places_found == 0:
            logger.warning(
                "Skipping __pending__ cleanup: extracted 0 places from %d pending records - keeping the raw records",
                pending_records,
            )
            return {"skipped": True, "reason": "zero_extraction", "pending_records": pending_records}

        try:
            result = (
                sb.table("spot_trends")
                .delete()
                .eq("source", "naver_blog")
                .eq("metric_type", "blog_post")
                .eq("spot_id", "__pending__")
                .execute()
            )
            deleted = len(result.data) if result.data else 0
            logger.info("Deleted %d __pending__ blog records (%d places extracted)", deleted, places_found)
            return {"deleted_pending": deleted, "places_found": places_found}
        except Exception as e:
            logger.warning("__pending__ cleanup failed: %s", e)
            return {"error": str(e)}

    run_step("7b_cleanup_pending", cleanup_pending, results)
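
    # The delete above only targets metric_type="blog_post" rows carrying the
    # "__pending__" placeholder spot_id; the aggregated mention_count rows
    # written in step 7 use real spot_ids and therefore survive the cleanup.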

    # ── 8. Composite score calculation + ranking (requires at least 2 successful channels) ──
    # Count only the collection-channel steps (1, 3, 4, 6)
    collection_steps = ["1_kakaomap", "3_youtube", "4_instagram_influencer", "6_naver_blog"]
    successful_channels = [s for s in collection_steps if results.get(s, {}).get("status") == "ok"]

    def calc_scores():
        if len(successful_channels) < 2:
            logger.error(
                "Only %d collection channels succeeded (minimum 2 required). Skipping scoring: %s",
                len(successful_channels), successful_channels,
            )
            return {"skipped": True, "reason": f"Only {len(successful_channels)} channels succeeded"}
        logger.info("Running scoring: %d channels succeeded - %s", len(successful_channels), successful_channels)
        return generate_weekly_ranking(sb)

    run_step("8_score_calculation", calc_scores, results)
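
    # Design note: the two-channel floor keeps a week in which most collectors
    # failed from producing a ranking skewed toward a single source.
    # generate_weekly_ranking presumably scores this week's spot_trends rows
    # using CHANNEL_WEIGHTS from trend_scorer.py (assumption; see that module).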

    # ── 9. S6: clean up stale data (older than 12 weeks) ──
    def retention_cleanup():
        deleted = cleanup_old_trends(sb, retention_weeks=12)
        return {"deleted_records": deleted}

    run_step("9_retention_cleanup", retention_cleanup, results)

    # ── 9b. Weekly trend_spots → story_spots migration (spots trending for 7+ days) ──
    def migrate_trend_to_story():
        """Promote trend_spots older than one week into story_spots.

        Uses SpotMatcher to check for existing story_spots duplicates; creates a
        new entry only when nothing matches.
        """
        from datetime import datetime as dt, timedelta as td, timezone as tz

        cutoff = (dt.now(tz.utc) - td(days=7)).isoformat()
        try:
            resp = (
                sb.table("trend_spots")
                .select("id, name, category, lat, lng, address")
                .lt("trending_since", cutoff)
                .eq("is_course_eligible", False)
                .limit(100)
                .execute()
            )
        except Exception as e:
            logger.warning("Failed to query trend_spots for migration: %s", e)
            return {"error": str(e)}

        candidates = resp.data or []
        if not candidates:
            return {"candidates": 0, "promoted": 0}

        promoted = 0
        for ts in candidates:
            name = ts.get("name", "")
            # Check for an existing story_spots duplicate via SpotMatcher
            existing = matcher.match(name)
            if existing:
                # Already in story_spots, so just mark is_course_eligible
                try:
                    sb.table("trend_spots").update(
                        {"is_course_eligible": True}
                    ).eq("id", ts["id"]).execute()
                except Exception:
                    pass
                continue

            # Create a new story_spots entry
            story_id = f"promoted_{ts['id']}"
            try:
                sb.table("story_spots").upsert({
                    "id": story_id,
                    "name": name,
                    "category": ts.get("category", "etc"),
                    "lat": ts.get("lat"),
                    "lng": ts.get("lng"),
                    "address": ts.get("address", ""),
                    "status": "active",
                    "priority_score": 5,
                }, on_conflict="id").execute()
                sb.table("trend_spots").update(
                    {"is_course_eligible": True}
                ).eq("id", ts["id"]).execute()
                promoted += 1
            except Exception as e:
                logger.warning("story_spots promotion failed (%s): %s", name, e)

        logger.info("trend→story migration: %d candidates, %d promoted", len(candidates), promoted)
        return {"candidates": len(candidates), "promoted": promoted}

    run_step("9b_trend_migration", migrate_trend_to_story, results)
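
    # Idempotency note: promoted entries get the deterministic id
    # f"promoted_{trend_spot_id}" and are written with upsert(on_conflict="id"),
    # so re-running the migration updates rather than duplicates a row.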

    # ── Result summary ──
    total_elapsed = time.time() - total_start
    ok_count = sum(1 for r in results.values() if r.get("status") == "ok")
    err_count = sum(1 for r in results.values() if r.get("status") == "error")
    skip_count = sum(1 for r in results.values() if r.get("status") == "skipped")
    summary = {
        "total_steps": len(results),
        "succeeded": ok_count,
        "failed": err_count,
        "skipped": skip_count,
        "total_elapsed_sec": round(total_elapsed, 1),
        "steps": {
            k: {"status": v.get("status"), "elapsed_sec": v.get("elapsed_sec", 0)}
            for k, v in results.items()
        },
    }
    logger.info("━━━ TREND ENGINE COMPLETE ━━━")
    logger.info(
        "succeeded: %d / failed: %d / skipped: %d / total elapsed: %.1fs",
        ok_count, err_count, skip_count, total_elapsed,
    )

    # Print the JSON summary (for GitHub Actions logs)
    print(json.dumps(summary, ensure_ascii=False, indent=2))

    # Exit non-zero only when every step failed
    if ok_count == 0:
        logger.error("All steps failed.")
        sys.exit(1)

if __name__ == "__main__":
    main()