# Creative_Breakthrough / services /trend_monitor.py
# sushilideaclan01's picture
# refactored the files
# d4a4da7
"""
Trend monitor: current occasions (AI) + niche news (GNews) for ad trending context.
"""
import asyncio
import zlib
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from typing import Any, Dict, List, Optional

from gnews import GNews

from services.current_occasions import get_current_occasions
# Search phrases for each supported niche, keyed by the normalized niche name.
# Order matters: only the first three phrases of a niche are sent to the news
# search, while the full list is used when scoring article relevance.
NICHE_KEYWORDS = {
    "home_insurance": [
        "home insurance",
        "homeowners insurance",
        "property insurance",
        "natural disaster",
        "insurance rates",
    ],
    "glp1": [
        "GLP-1",
        "Ozempic",
        "Wegovy",
        "weight loss",
        "Mounjaro",
        "Zepbound",
    ],
    "auto_insurance": [
        "auto insurance",
        "car insurance",
        "vehicle insurance",
        "insurance premiums",
        "driver discounts",
    ],
}
# How long a cached news result stays fresh before it is fetched again.
CACHE_DURATION = timedelta(hours=1)

# Process-wide cache: "trends_<niche>" -> (articles, time the entry was stored).
TREND_CACHE: Dict[str, tuple] = {}
# (words, trigger) pairs used to label a headline with an emotional trigger.
# The list is scanned top to bottom and the first pair with a matching word
# wins, so the order below is also the priority order.
TRIGGER_WORDS = [
    (
        ["crisis", "warning", "danger", "risk", "threat", "disaster"],
        "Fear",
    ),
    (
        ["shortage", "limited", "running out", "exclusive", "sold out"],
        "FOMO",
    ),
    (
        ["breakthrough", "solution", "cure", "relief", "success"],
        "Hope",
    ),
    (
        ["save", "discount", "cheaper", "affordable", "deal"],
        "Greed",
    ),
    (
        ["new", "innovation", "discover", "reveal", "secret"],
        "Curiosity",
    ),
    (
        ["urgent", "now", "immediate", "breaking"],
        "Urgency",
    ),
]
class TrendMonitor:
    """Collect trending context (occasions + niche news) for ad generation.

    Current occasions come from ``get_current_occasions``; niche news is
    fetched through GNews, scored for relevance, trimmed to the best five
    articles and cached in the module-level ``TREND_CACHE`` for
    ``CACHE_DURATION``.
    """

    # Headline words that each add one point to an article's relevance score.
    _SCORE_BOOST_WORDS = (
        "crisis", "warning", "breakthrough", "new", "urgent", "breaking", "major",
    )

    def __init__(self, language: str = "en", country: str = "US") -> None:
        """Create the GNews client, limited to the last 7 days and 10 results per query."""
        self.google_news = GNews(language=language, country=country)
        self.google_news.period = "7d"
        self.google_news.max_results = 10

    def _normalize_niche(self, niche: str) -> str:
        """Map a free-form niche name onto a ``NICHE_KEYWORDS`` key.

        For example ``"Home Insurance"`` becomes ``"home_insurance"``. When the
        normalized form is not a known niche, the input is returned unchanged.
        """
        key = niche.lower().strip().replace(" ", "_")
        return key if key in NICHE_KEYWORDS else niche

    async def fetch_trends(self, niche: str) -> List[Dict]:
        """Return up to five scored news articles for *niche*, using the cache.

        The niche is normalized first so that every entry point accepts the
        same spellings. A cached result younger than ``CACHE_DURATION`` is
        returned without hitting the network.

        Raises:
            ValueError: if the niche is not a key of ``NICHE_KEYWORDS``.
        """
        niche = self._normalize_niche(niche)
        cache_key = f"trends_{niche}"
        cached = TREND_CACHE.get(cache_key)
        if cached is not None:
            data, cached_at = cached
            if datetime.now() - cached_at < CACHE_DURATION:
                return data
        trends = await self._fetch_news(niche)
        TREND_CACHE[cache_key] = (trends, datetime.now())
        return trends

    async def _fetch_news(self, niche: str) -> List[Dict]:
        """Query GNews for the niche's first three keywords and rank the results.

        Each article dict is tagged with the ``keyword`` that found it and the
        ``niche``. An article returned for several keywords is kept only once
        (identified by its URL, or its title when there is no URL) so it cannot
        occupy more than one of the five returned slots. A failing keyword
        query is reported and skipped; the remaining keywords are still tried.

        Raises:
            ValueError: if *niche* is not a key of ``NICHE_KEYWORDS``.
        """
        if niche not in NICHE_KEYWORDS:
            raise ValueError(f"Unsupported niche: {niche}. Use: {list(NICHE_KEYWORDS.keys())}")
        keywords = NICHE_KEYWORDS[niche][:3]
        all_articles: List[Dict] = []
        seen = set()
        # We are inside a coroutine, so a loop is guaranteed to be running.
        loop = asyncio.get_running_loop()
        for kw in keywords:
            try:
                # get_news blocks on network I/O; run it off the event loop.
                articles = await loop.run_in_executor(None, self.google_news.get_news, kw)
                for article in articles or []:
                    identity = article.get("url") or article.get("title")
                    if identity is not None:
                        if identity in seen:
                            continue
                        seen.add(identity)
                    article["keyword"] = kw
                    article["niche"] = niche
                    all_articles.append(article)
            except Exception as e:  # best effort: one bad query must not sink the rest
                print(f"⚠️ News fetch failed for '{kw}': {e}")
        if not all_articles:
            return []
        scored = self._score_articles(all_articles, niche)
        return scored[:5]

    @staticmethod
    def _recency_bonus(published: Any, now: Optional[datetime] = None) -> int:
        """Return the freshness bonus for an article's publication date.

        *published* may be a ``datetime`` or an RFC 2822 date string. Naive
        datetimes are interpreted as local time. The bonus is 5 for articles at
        most one day old, 3 for at most three days, 1 for anything older, and
        0 when the date is missing or cannot be parsed. *now* overrides the
        reference time (defaults to the current time).
        """
        if not published:
            return 0
        try:
            if isinstance(published, datetime):
                when = published
            else:
                when = parsedate_to_datetime(str(published))
            # Dates such as "... GMT" parse to timezone-aware values; bring both
            # sides to aware datetimes so the subtraction cannot raise.
            if when.tzinfo is None:
                when = when.astimezone()
            reference = datetime.now(timezone.utc) if now is None else now
            if reference.tzinfo is None:
                reference = reference.astimezone()
            age_days = (reference - when).days
        except (TypeError, ValueError, OverflowError, OSError):
            return 0
        if age_days <= 1:
            return 5
        if age_days <= 3:
            return 3
        return 1

    def _score_articles(self, articles: List[Dict], niche: str) -> List[Dict]:
        """Attach ``relevance_score`` to every article and return them best-first.

        Score = 2 per niche keyword found in title/description, plus the
        recency bonus, plus 1 per boost word found. Matching is by lowercase
        substring. Articles are modified in place; ties keep their input order.
        """
        keywords = [k.lower() for k in NICHE_KEYWORDS[niche]]
        for article in articles:
            text = f"{article.get('title', '')} {article.get('description', '')}".lower()
            score = sum(2 for k in keywords if k in text)
            score += self._recency_bonus(article.get("published date"))
            score += sum(1 for word in self._SCORE_BOOST_WORDS if word in text)
            article["relevance_score"] = score
        return sorted(articles, key=lambda x: x.get("relevance_score", 0), reverse=True)

    def _trigger(self, title: str, description: str) -> str:
        """Return the first trigger in ``TRIGGER_WORDS`` whose words occur in the text.

        Matching is by lowercase substring (so "new" also matches "news").
        Falls back to ``"Emotion"`` when nothing matches.
        """
        text = f"{title} {description}".lower()
        for words, trigger in TRIGGER_WORDS:
            if any(w in text for w in words):
                return trigger
        return "Emotion"

    async def get_relevant_trends_for_niche(self, niche: str) -> Dict[str, Any]:
        """Return ``{"relevant_trends": [...]}`` for use as prompt context.

        The list holds every current occasion followed by up to three news
        items for the niche. News is added only for niches known to
        ``NICHE_KEYWORDS``; any failure while fetching news is reported and the
        occasions are returned on their own.
        """
        relevant = []
        for occ in await get_current_occasions():
            relevant.append({
                "title": occ["title"],
                "summary": occ["summary"],
                "category": occ.get("category", "Occasion"),
            })
        niche_key = self._normalize_niche(niche)
        if niche_key in NICHE_KEYWORDS:
            try:
                for t in (await self.fetch_trends(niche_key))[:3]:
                    desc = t.get("description", "") or t.get("title", "")
                    relevant.append({
                        "title": t.get("title", ""),
                        "summary": desc,
                        "category": t.get("niche", "News").replace("_", " ").title(),
                    })
            except Exception as e:  # news is optional context; never fail the caller
                print(f"⚠️ News trends skipped: {e}")
        return {"relevant_trends": relevant}

    @staticmethod
    def _angle_key(title: str) -> str:
        """Return a ``trend_<0-9999>`` key that is stable across processes.

        The built-in ``hash`` of a string is salted per interpreter run, so a
        CRC-32 of the UTF-8 title is used instead.
        """
        digest = zlib.crc32(title.encode("utf-8", "replace"))
        return f"trend_{digest % 10000}"

    async def get_trending_angles(self, niche: str) -> List[Dict]:
        """Turn the top three news trends for *niche* into ad-angle dicts.

        Raises:
            ValueError: if the niche is not a key of ``NICHE_KEYWORDS``.
        """
        trends = await self.fetch_trends(self._normalize_niche(niche))
        angles = []
        for t in trends[:3]:
            title = t.get("title") or ""
            # Only show an ellipsis when the title was actually cut.
            short_title = f"{title[:40]}..." if len(title) > 40 else title
            angles.append({
                "key": self._angle_key(title),
                "name": f"Trending: {short_title}",
                "trigger": self._trigger(title, t.get("description", "")),
                "example": (t.get("description") or title)[:100],
                "category": "Trending",
                "source": "google_news",
                "url": t.get("url"),
                "expires": (datetime.now() + timedelta(days=7)).isoformat(),
                "relevance_score": t.get("relevance_score", 0),
            })
        return angles

    def clear_cache(self) -> None:
        """Empty the shared trend cache.

        The dict is cleared in place so that modules holding their own
        reference to ``TREND_CACHE`` see the cache emptied as well.
        """
        TREND_CACHE.clear()
# Shared module-level instance, created at import time with the default
# language ("en") and country ("US").
trend_monitor = TrendMonitor()