"""
Trend monitor: current occasions (AI) + niche news (GNews) for ad trending context.
"""
from gnews import GNews
from typing import List, Dict, Any
from datetime import datetime, timedelta
import asyncio
from services.current_occasions import get_current_occasions
# Search keywords per supported niche; keys are snake_case niche identifiers.
# Only the first 3 keywords per niche are actually queried (see _fetch_news).
NICHE_KEYWORDS = {
"home_insurance": ["home insurance", "homeowners insurance", "property insurance", "natural disaster", "insurance rates"],
"glp1": ["GLP-1", "Ozempic", "Wegovy", "weight loss", "Mounjaro", "Zepbound"],
"auto_insurance": ["auto insurance", "car insurance", "vehicle insurance", "insurance premiums", "driver discounts"],
}
# Module-level cache shared by all TrendMonitor instances:
# cache_key -> (articles, datetime cached_at).
TREND_CACHE: Dict[str, tuple] = {}
CACHE_DURATION = timedelta(hours=1)  # cached trends are considered fresh for 1 hour
# Ordered (keyword list, emotional-trigger label) pairs; first match wins
# when classifying a headline in TrendMonitor._trigger().
TRIGGER_WORDS = [
(["crisis", "warning", "danger", "risk", "threat", "disaster"], "Fear"),
(["shortage", "limited", "running out", "exclusive", "sold out"], "FOMO"),
(["breakthrough", "solution", "cure", "relief", "success"], "Hope"),
(["save", "discount", "cheaper", "affordable", "deal"], "Greed"),
(["new", "innovation", "discover", "reveal", "secret"], "Curiosity"),
(["urgent", "now", "immediate", "breaking"], "Urgency"),
]
class TrendMonitor:
    """Fetch and rank niche news (GNews) plus current occasions for ad-trend context.

    Results are cached in the module-level TREND_CACHE for CACHE_DURATION so
    repeated calls for the same niche do not re-hit the news API.
    """

    def __init__(self, language: str = "en", country: str = "US"):
        # GNews client restricted to the last 7 days, max 10 results per query.
        self.google_news = GNews(language=language, country=country)
        self.google_news.period = "7d"
        self.google_news.max_results = 10

    def _normalize_niche(self, niche: str) -> str:
        """Map a human-readable niche ("Home Insurance") to a NICHE_KEYWORDS key.

        Returns the input unchanged when no known key matches, so downstream
        validation can report the original value.
        """
        key = niche.lower().strip().replace(" ", "_")
        return key if key in NICHE_KEYWORDS else niche

    async def fetch_trends(self, niche: str) -> List[Dict]:
        """Return the top scored articles for *niche*, using the 1-hour cache.

        Raises ValueError (from _fetch_news) for an unsupported niche.
        """
        # Normalize here so every entry point ("Home Insurance" vs
        # "home_insurance") shares one cache slot and one validation path.
        niche = self._normalize_niche(niche)
        cache_key = f"trends_{niche}"
        cached = TREND_CACHE.get(cache_key)
        if cached is not None:
            data, cached_at = cached
            if datetime.now() - cached_at < CACHE_DURATION:
                return data
        trends = await self._fetch_news(niche)
        TREND_CACHE[cache_key] = (trends, datetime.now())
        return trends

    async def _fetch_news(self, niche: str) -> List[Dict]:
        """Query GNews for the niche's top keywords and return the 5 best articles.

        Raises ValueError when *niche* is not a NICHE_KEYWORDS key.
        A failure on one keyword is logged and skipped (best-effort batch).
        """
        if niche not in NICHE_KEYWORDS:
            raise ValueError(f"Unsupported niche: {niche}. Use: {list(NICHE_KEYWORDS.keys())}")
        keywords = NICHE_KEYWORDS[niche][:3]  # cap outbound API queries per niche
        all_articles = []
        # get_event_loop() is deprecated inside coroutines; we are guaranteed
        # to be in a running loop here.
        loop = asyncio.get_running_loop()
        for kw in keywords:
            try:
                # GNews is blocking; run it in the default executor so the
                # event loop stays responsive.
                articles = await loop.run_in_executor(
                    None, lambda k=kw: self.google_news.get_news(k)
                )
                for a in articles:
                    a["keyword"] = kw
                    a["niche"] = niche
                    all_articles.append(a)
            except Exception as e:
                # Deliberate best-effort: one failing keyword must not kill
                # the whole batch.
                print(f"⚠️ News fetch failed for '{kw}': {e}")
        if not all_articles:
            return []
        return self._score_articles(all_articles, niche)[:5]

    def _score_articles(self, articles: List[Dict], niche: str) -> List[Dict]:
        """Attach 'relevance_score' to each article; return them sorted descending.

        Score = 2 per niche keyword in title/description
              + recency bonus (5 / 3 / 1 for <=1 / <=3 / older days)
              + 1 per generic "hot" word present.
        """
        keywords = NICHE_KEYWORDS[niche]
        for a in articles:
            text = f"{a.get('title', '')} {a.get('description', '')}".lower()
            score = sum(2 for k in keywords if k.lower() in text)
            pub = a.get("published date")
            if pub:
                try:
                    from email.utils import parsedate_to_datetime
                    dt = pub if isinstance(pub, datetime) else parsedate_to_datetime(str(pub))
                    # BUG FIX: parsedate_to_datetime usually yields an *aware*
                    # datetime; subtracting it from a naive datetime.now()
                    # raised TypeError, which the except below swallowed — the
                    # recency bonus was silently never applied. Using
                    # now(dt.tzinfo) matches dt's awareness either way.
                    days = (datetime.now(dt.tzinfo) - dt).days
                    score += 5 if days <= 1 else 3 if days <= 3 else 1
                except Exception:
                    pass  # unparseable date: no recency bonus
            for word in ["crisis", "warning", "breakthrough", "new", "urgent", "breaking", "major"]:
                if word in text:
                    score += 1
            a["relevance_score"] = score
        return sorted(articles, key=lambda x: x.get("relevance_score", 0), reverse=True)

    def _trigger(self, title: str, description: str) -> str:
        """Classify a headline's emotional trigger; first TRIGGER_WORDS match wins."""
        text = f"{title} {description}".lower()
        for words, trigger in TRIGGER_WORDS:
            if any(w in text for w in words):
                return trigger
        return "Emotion"  # generic fallback label

    async def get_relevant_trends_for_niche(self, niche: str) -> Dict[str, Any]:
        """Combine current occasions (always) with up to 3 news trends for *niche*.

        Returns {"relevant_trends": [{"title", "summary", "category"}, ...]}.
        News failures are logged and skipped so occasions still come through.
        """
        relevant = []
        for occ in await get_current_occasions():
            relevant.append({"title": occ["title"], "summary": occ["summary"], "category": occ.get("category", "Occasion")})
        niche_key = self._normalize_niche(niche)
        if niche_key in NICHE_KEYWORDS:
            try:
                for t in (await self.fetch_trends(niche_key))[:3]:
                    desc = t.get("description", "") or t.get("title", "")
                    relevant.append({
                        "title": t.get("title", ""),
                        "summary": desc,
                        "category": t.get("niche", "News").replace("_", " ").title(),
                    })
            except Exception as e:
                # Occasions alone are still useful; degrade gracefully.
                print(f"⚠️ News trends skipped: {e}")
        return {"relevant_trends": relevant}

    async def get_trending_angles(self, niche: str) -> List[Dict]:
        """Turn the top 3 trends for *niche* into ad-angle dicts.

        Raises ValueError (via fetch_trends) for an unsupported niche.
        """
        trends = await self.fetch_trends(niche)
        angles = []
        for t in trends[:3]:
            title = t.get("title", "")
            angles.append({
                # hash() is salted per process; key is stable within a run only.
                "key": f"trend_{abs(hash(title)) % 10000}",
                "name": f"Trending: {title[:40]}...",
                "trigger": self._trigger(title, t.get("description", "")),
                "example": (t.get("description") or title)[:100],
                "category": "Trending",
                "source": "google_news",
                "url": t.get("url"),
                "expires": (datetime.now() + timedelta(days=7)).isoformat(),
                "relevance_score": t.get("relevance_score", 0),
            })
        return angles

    def clear_cache(self) -> None:
        """Drop all cached trends; the next fetch for any niche hits the network."""
        # Clear in place rather than rebinding the global, so any other module
        # holding a reference to TREND_CACHE sees the same (emptied) dict.
        TREND_CACHE.clear()
# Module-level singleton shared by importers; instantiated at import time.
trend_monitor = TrendMonitor()