"""
Trend monitor: current occasions (AI) + niche news (GNews) for ad trending context.
"""

import asyncio
import zlib
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
from typing import Any, Dict, List

from gnews import GNews

from services.current_occasions import get_current_occasions

# Search terms per supported niche. The first three entries are used as news
# queries; the full list is used when scoring article relevance.
NICHE_KEYWORDS = {
    "home_insurance": [
        "home insurance",
        "homeowners insurance",
        "property insurance",
        "natural disaster",
        "insurance rates",
    ],
    "glp1": ["GLP-1", "Ozempic", "Wegovy", "weight loss", "Mounjaro", "Zepbound"],
    "auto_insurance": [
        "auto insurance",
        "car insurance",
        "vehicle insurance",
        "insurance premiums",
        "driver discounts",
    ],
}

# Process-wide cache: "trends_<niche>" -> (scored articles, time they were cached).
TREND_CACHE: Dict[str, tuple] = {}
CACHE_DURATION = timedelta(hours=1)

# Ordered (words, trigger) pairs; the first pair with a word found in the text wins.
TRIGGER_WORDS = [
    (["crisis", "warning", "danger", "risk", "threat", "disaster"], "Fear"),
    (["shortage", "limited", "running out", "exclusive", "sold out"], "FOMO"),
    (["breakthrough", "solution", "cure", "relief", "success"], "Hope"),
    (["save", "discount", "cheaper", "affordable", "deal"], "Greed"),
    (["new", "innovation", "discover", "reveal", "secret"], "Curiosity"),
    (["urgent", "now", "immediate", "breaking"], "Urgency"),
]

# Each of these words found in an article's text adds one point to its score.
_BUZZWORDS = ("crisis", "warning", "breakthrough", "new", "urgent", "breaking", "major")


class TrendMonitor:
    """Fetch, score and cache niche news, and combine it with current occasions."""

    def __init__(self, language: str = "en", country: str = "US"):
        """Create the GNews client, configured with period "7d" and max_results 10."""
        self.google_news = GNews(language=language, country=country)
        self.google_news.period = "7d"
        self.google_news.max_results = 10

    def _normalize_niche(self, niche: str) -> str:
        """Map a free-form niche name to its NICHE_KEYWORDS key when one exists.

        The name is lower-cased, stripped, and spaces become underscores. If the
        result is not a known key, the original string is returned unchanged.
        """
        key = niche.lower().strip().replace(" ", "_")
        return key if key in NICHE_KEYWORDS else niche

    async def fetch_trends(self, niche: str) -> List[Dict]:
        """Return scored articles for ``niche``, reusing a cached result when fresh.

        A cached entry is reused while it is younger than CACHE_DURATION;
        otherwise the news is fetched again and the cache entry is replaced.

        Raises:
            ValueError: if ``niche`` is not a key of NICHE_KEYWORDS (raised by
                ``_fetch_news`` on a cache miss).
        """
        cache_key = f"trends_{niche}"
        cached = TREND_CACHE.get(cache_key)
        if cached is not None:
            data, cached_at = cached
            if datetime.now() - cached_at < CACHE_DURATION:
                return data

        trends = await self._fetch_news(niche)
        TREND_CACHE[cache_key] = (trends, datetime.now())
        return trends

    async def _fetch_news(self, niche: str) -> List[Dict]:
        """Query news for the niche's first three keywords and return the top five.

        Each article is tagged with the ``keyword`` that found it and its
        ``niche``. A failure for one keyword is printed and skipped so the
        remaining keywords are still tried.

        Raises:
            ValueError: if ``niche`` is not a key of NICHE_KEYWORDS.
        """
        if niche not in NICHE_KEYWORDS:
            raise ValueError(
                f"Unsupported niche: {niche}. Use: {list(NICHE_KEYWORDS.keys())}"
            )

        # We are inside a coroutine, so a running loop is guaranteed to exist.
        loop = asyncio.get_running_loop()
        all_articles: List[Dict] = []
        seen = set()

        for kw in NICHE_KEYWORDS[niche][:3]:
            try:
                # get_news is a blocking call; run it in the default executor.
                articles = await loop.run_in_executor(
                    None, self.google_news.get_news, kw
                )
                for a in articles:
                    # Overlapping queries can return the same article; keep the
                    # first copy so duplicates do not crowd the top results.
                    identity = a.get("url") or a.get("title")
                    if identity:
                        if identity in seen:
                            continue
                        seen.add(identity)
                    a["keyword"] = kw
                    a["niche"] = niche
                    all_articles.append(a)
            except Exception as e:  # best-effort: third-party client, unknown errors
                print(f"⚠️ News fetch failed for '{kw}': {e}")

        if not all_articles:
            return []
        return self._score_articles(all_articles, niche)[:5]

    @staticmethod
    def _recency_bonus(published: Any) -> int:
        """Return the score bonus for an article's publication date.

        ``published`` may be a ``datetime`` or an RFC 2822 date string. The bonus
        is 5 for at most one day old, 3 for at most three days, otherwise 1.
        A missing or unparseable date earns 0.
        """
        if not published:
            return 0
        try:
            if isinstance(published, datetime):
                dt = published
            else:
                dt = parsedate_to_datetime(str(published))
            # Parsed dates carrying a zone (e.g. "GMT") are timezone-aware.
            # Subtracting them from a naive now() raises TypeError, so build
            # "now" in the same zone when the date is aware.
            now = datetime.now(dt.tzinfo) if dt.utcoffset() is not None else datetime.now()
            days = (now - dt).days
        except (TypeError, ValueError, IndexError, OverflowError):
            return 0

        if days <= 1:
            return 5
        if days <= 3:
            return 3
        return 1

    def _score_articles(self, articles: List[Dict], niche: str) -> List[Dict]:
        """Set ``relevance_score`` on each article and return them best-first.

        The score is 2 per niche keyword found in the title/description, plus
        the recency bonus, plus 1 per buzzword found. Articles are modified in
        place; the returned list is a new, sorted list.
        """
        keywords = [k.lower() for k in NICHE_KEYWORDS[niche]]
        for a in articles:
            # "or ''" keeps a None title/description from becoming the text "None".
            text = f"{a.get('title') or ''} {a.get('description') or ''}".lower()
            score = sum(2 for k in keywords if k in text)
            score += self._recency_bonus(a.get("published date"))
            score += sum(1 for word in _BUZZWORDS if word in text)
            a["relevance_score"] = score
        return sorted(articles, key=lambda x: x.get("relevance_score", 0), reverse=True)

    def _trigger(self, title: str, description: str) -> str:
        """Return the first TRIGGER_WORDS label matching the text, else "Emotion".

        Matching is a case-insensitive substring test on title + description.
        """
        text = f"{title} {description}".lower()
        for words, trigger in TRIGGER_WORDS:
            if any(w in text for w in words):
                return trigger
        return "Emotion"

    async def get_relevant_trends_for_niche(self, niche: str) -> Dict[str, Any]:
        """Return current occasions plus up to three news trends for ``niche``.

        Occasions come from ``get_current_occasions()``; each must provide
        "title" and "summary" and may provide "category". News is added only
        when the normalized niche is supported, and a news failure is printed
        and skipped rather than raised.

        Returns:
            ``{"relevant_trends": [...]}`` where each item has "title",
            "summary" and "category".
        """
        relevant = [
            {
                "title": occ["title"],
                "summary": occ["summary"],
                "category": occ.get("category", "Occasion"),
            }
            for occ in await get_current_occasions()
        ]

        niche_key = self._normalize_niche(niche)
        if niche_key in NICHE_KEYWORDS:
            try:
                for t in (await self.fetch_trends(niche_key))[:3]:
                    desc = t.get("description", "") or t.get("title", "")
                    relevant.append({
                        "title": t.get("title", ""),
                        "summary": desc,
                        "category": t.get("niche", "News").replace("_", " ").title(),
                    })
            except Exception as e:  # news is optional context; occasions still return
                print(f"⚠️ News trends skipped: {e}")

        return {"relevant_trends": relevant}

    async def get_trending_angles(self, niche: str) -> List[Dict]:
        """Turn the top three news trends for ``niche`` into ad-angle dicts.

        The niche is normalized first, so "Home Insurance" and "home_insurance"
        behave the same, as in ``get_relevant_trends_for_niche``.

        Raises:
            ValueError: if the niche is unsupported and not cached.
        """
        trends = await self.fetch_trends(self._normalize_niche(niche))
        angles = []
        for t in trends[:3]:
            title = t.get("title", "")
            # hash() is salted per process for strings, so it cannot give a
            # reproducible key; CRC32 yields the same key for the same title.
            key_id = zlib.crc32(title.encode("utf-8")) % 10000
            angles.append({
                "key": f"trend_{key_id}",
                "name": f"Trending: {title[:40]}...",
                "trigger": self._trigger(title, t.get("description", "")),
                "example": (t.get("description") or title)[:100],
                "category": "Trending",
                "source": "google_news",
                "url": t.get("url"),
                "expires": (datetime.now() + timedelta(days=7)).isoformat(),
                "relevance_score": t.get("relevance_score", 0),
            })
        return angles

    def clear_cache(self) -> None:
        """Drop every cached trend result.

        The dict is cleared in place rather than rebound, so references taken
        with ``from ... import TREND_CACHE`` see the cache emptied too.
        """
        TREND_CACHE.clear()


trend_monitor = TrendMonitor()