| """ | |
| Trend monitor: current occasions (AI) + niche news (GNews) for ad trending context. | |
| """ | |
| from gnews import GNews | |
| from typing import List, Dict, Any | |
| from datetime import datetime, timedelta | |
| import asyncio | |
| from services.current_occasions import get_current_occasions | |
# Search keywords per supported niche. The dict keys are the canonical niche
# identifiers; TrendMonitor._fetch_news queries only the first three keywords
# of each list, while scoring checks every keyword in the list.
NICHE_KEYWORDS = {
    "home_insurance": [
        "home insurance",
        "homeowners insurance",
        "property insurance",
        "natural disaster",
        "insurance rates",
    ],
    "glp1": [
        "GLP-1",
        "Ozempic",
        "Wegovy",
        "weight loss",
        "Mounjaro",
        "Zepbound",
    ],
    "auto_insurance": [
        "auto insurance",
        "car insurance",
        "vehicle insurance",
        "insurance premiums",
        "driver discounts",
    ],
}

# Process-wide cache: cache key -> (articles, time the entry was stored).
TREND_CACHE: Dict[str, tuple] = {}

# Cached entries younger than this are served without refetching.
CACHE_DURATION = timedelta(hours=1)

# (words, trigger label) pairs, checked in order; the first group with a word
# contained in the article text decides the label.
TRIGGER_WORDS = [
    (
        ["crisis", "warning", "danger", "risk", "threat", "disaster"],
        "Fear",
    ),
    (
        ["shortage", "limited", "running out", "exclusive", "sold out"],
        "FOMO",
    ),
    (
        ["breakthrough", "solution", "cure", "relief", "success"],
        "Hope",
    ),
    (
        ["save", "discount", "cheaper", "affordable", "deal"],
        "Greed",
    ),
    (
        ["new", "innovation", "discover", "reveal", "secret"],
        "Curiosity",
    ),
    (
        ["urgent", "now", "immediate", "breaking"],
        "Urgency",
    ),
]
class TrendMonitor:
    """Collect trending context (occasions + niche news) for ad generation.

    Occasions come from ``get_current_occasions``; news articles are fetched
    through a ``GNews`` client for the niches listed in ``NICHE_KEYWORDS``.
    Scored news results are stored in the module-level ``TREND_CACHE`` and
    reused for ``CACHE_DURATION``.
    """

    def __init__(self, language: str = "en", country: str = "US"):
        """Create the GNews client used for all news lookups.

        Args:
            language: Language code passed to ``GNews``.
            country: Country code passed to ``GNews``.
        """
        self.google_news = GNews(language=language, country=country)
        self.google_news.period = "7d"
        self.google_news.max_results = 10

    def _normalize_niche(self, niche: str) -> str:
        """Map a free-form niche name onto a ``NICHE_KEYWORDS`` key.

        The name is lower-cased, stripped, and spaces become underscores. When
        the result is not a known key (or *niche* is not a string) the input
        is returned unchanged so callers can decide how to treat it.
        """
        if not isinstance(niche, str):
            return niche
        key = niche.lower().strip().replace(" ", "_")
        return key if key in NICHE_KEYWORDS else niche

    async def fetch_trends(self, niche: str) -> List[Dict]:
        """Return scored news articles for *niche*, served from cache if fresh.

        The niche is normalized first, so ``"Home Insurance"`` and
        ``"home_insurance"`` resolve to the same niche and cache entry.

        Args:
            niche: Niche name; must resolve to a key of ``NICHE_KEYWORDS``.

        Returns:
            Up to five article dicts, highest ``relevance_score`` first.

        Raises:
            ValueError: If the niche is not supported.
        """
        niche = self._normalize_niche(niche)
        cache_key = f"trends_{niche}"
        cached = TREND_CACHE.get(cache_key)
        if cached is not None:
            data, cached_at = cached
            if datetime.now() - cached_at < CACHE_DURATION:
                return data
        trends = await self._fetch_news(niche)
        TREND_CACHE[cache_key] = (trends, datetime.now())
        return trends

    async def _fetch_news(self, niche: str) -> List[Dict]:
        """Fetch, tag, de-duplicate and score articles for a canonical niche.

        Only the first three keywords of the niche are queried. Each query
        runs in the default executor because ``get_news`` is a blocking call.
        A failing keyword is reported and skipped rather than aborting the
        whole fetch.

        Raises:
            ValueError: If *niche* is not a key of ``NICHE_KEYWORDS``.
        """
        if niche not in NICHE_KEYWORDS:
            raise ValueError(f"Unsupported niche: {niche}. Use: {list(NICHE_KEYWORDS.keys())}")
        keywords = NICHE_KEYWORDS[niche][:3]
        all_articles: List[Dict] = []
        # Different keywords can return the same article; remember what has
        # been collected so the top results are not filled with duplicates.
        seen = set()
        # Inside a coroutine the running loop is always available.
        loop = asyncio.get_running_loop()
        for kw in keywords:
            try:
                articles = await loop.run_in_executor(
                    None, self.google_news.get_news, kw
                )
                for a in articles or []:
                    identity = a.get("url") or a.get("title")
                    if identity:
                        if identity in seen:
                            continue
                        seen.add(identity)
                    a["keyword"] = kw
                    a["niche"] = niche
                    all_articles.append(a)
            except Exception as e:
                # Best effort: one failing keyword must not lose the others.
                print(f"⚠️ News fetch failed for '{kw}': {e}")
        if not all_articles:
            return []
        scored = self._score_articles(all_articles, niche)
        return scored[:5]

    def _score_articles(self, articles: List[Dict], niche: str) -> List[Dict]:
        """Attach a ``relevance_score`` to each article and sort descending.

        Scoring, per article (title + description, lower-cased):
          * +2 for every niche keyword contained in the text;
          * +5 / +3 / +1 when the ``"published date"`` is at most 1 day,
            at most 3 days, or more than 3 days old;
          * +1 for every attention word contained in the text.

        The article dicts are modified in place; the returned list is a new,
        sorted list of the same dicts.
        """
        from email.utils import parsedate_to_datetime

        keywords = [k.lower() for k in NICHE_KEYWORDS[niche]]
        boost_words = ("crisis", "warning", "breakthrough", "new", "urgent", "breaking", "major")
        for a in articles:
            text = f"{a.get('title', '')} {a.get('description', '')}".lower()
            score = sum(2 for k in keywords if k in text)
            pub = a.get("published date")
            if pub:
                try:
                    dt = pub if isinstance(pub, datetime) else parsedate_to_datetime(str(pub))
                    # Parsed dates with a zone are timezone-aware; take "now"
                    # in the same tzinfo (None -> naive local time) so the
                    # subtraction never mixes naive and aware datetimes.
                    days = (datetime.now(dt.tzinfo) - dt).days
                    score += 5 if days <= 1 else 3 if days <= 3 else 1
                except (TypeError, ValueError, OverflowError):
                    # Unparseable date: the article simply gets no recency bonus.
                    pass
            score += sum(1 for word in boost_words if word in text)
            a["relevance_score"] = score
        return sorted(articles, key=lambda x: x.get("relevance_score", 0), reverse=True)

    def _trigger(self, title: str, description: str) -> str:
        """Return the emotional trigger label for an article.

        ``TRIGGER_WORDS`` groups are checked in order against the lower-cased
        title + description using substring containment; the first matching
        group wins. ``"Emotion"`` is returned when nothing matches.
        """
        text = f"{title} {description}".lower()
        for words, trigger in TRIGGER_WORDS:
            if any(w in text for w in words):
                return trigger
        return "Emotion"

    async def get_relevant_trends_for_niche(self, niche: str) -> Dict[str, Any]:
        """Build the combined trend context for *niche*.

        Current occasions are always included. Up to three news articles are
        appended when the niche is supported; any failure while fetching news
        is reported and the occasions are still returned.

        Returns:
            ``{"relevant_trends": [...]}`` where each entry has ``title``,
            ``summary`` and ``category`` keys.
        """
        relevant = []
        for occ in await get_current_occasions():
            relevant.append({
                "title": occ["title"],
                "summary": occ["summary"],
                "category": occ.get("category", "Occasion"),
            })
        niche_key = self._normalize_niche(niche)
        if niche_key in NICHE_KEYWORDS:
            try:
                for t in (await self.fetch_trends(niche_key))[:3]:
                    desc = t.get("description", "") or t.get("title", "")
                    relevant.append({
                        "title": t.get("title", ""),
                        "summary": desc,
                        "category": t.get("niche", "News").replace("_", " ").title(),
                    })
            except Exception as e:
                # News is optional context; never fail the whole request.
                print(f"⚠️ News trends skipped: {e}")
        return {"relevant_trends": relevant}

    async def get_trending_angles(self, niche: str) -> List[Dict]:
        """Turn the top three news articles for *niche* into ad angles.

        Returns:
            A list of angle dicts (``key``, ``name``, ``trigger``, ``example``,
            ``category``, ``source``, ``url``, ``expires``, ``relevance_score``).

        Raises:
            ValueError: If the niche is not supported (from ``fetch_trends``).
        """
        import zlib

        trends = await self.fetch_trends(niche)
        angles = []
        for t in trends[:3]:
            title = t.get("title") or ""
            # crc32 is stable across processes, unlike the built-in hash(),
            # which is salted per interpreter run for strings.
            stable_id = zlib.crc32(title.encode("utf-8")) % 10000
            angles.append({
                "key": f"trend_{stable_id}",
                "name": f"Trending: {title[:40]}...",
                "trigger": self._trigger(title, t.get("description", "")),
                "example": (t.get("description") or title)[:100],
                "category": "Trending",
                "source": "google_news",
                "url": t.get("url"),
                "expires": (datetime.now() + timedelta(days=7)).isoformat(),
                "relevance_score": t.get("relevance_score", 0),
            })
        return angles

    def clear_cache(self) -> None:
        """Drop every cached trend result.

        The dict is emptied in place so that other modules holding a reference
        to ``TREND_CACHE`` observe the cleared state as well.
        """
        TREND_CACHE.clear()
# Shared module-level instance, created at import time with the default
# language ("en") and country ("US").
trend_monitor = TrendMonitor()