File size: 5,926 Bytes
c5771b6
d4a4da7
c5771b6
 
 
d4a4da7
c5771b6
 
 
d4a4da7
 
c5771b6
d4a4da7
 
 
c5771b6
 
d4a4da7
 
 
 
 
 
 
 
 
 
 
c5771b6
 
 
 
 
d4a4da7
c5771b6
d4a4da7
 
 
 
 
c5771b6
 
 
d4a4da7
 
 
 
c5771b6
 
d4a4da7
 
c5771b6
d4a4da7
 
c5771b6
d4a4da7
 
c5771b6
 
d4a4da7
c5771b6
d4a4da7
 
 
 
c5771b6
d4a4da7
c5771b6
 
d4a4da7
 
 
 
c5771b6
d4a4da7
 
 
 
 
c5771b6
d4a4da7
 
 
 
 
c5771b6
d4a4da7
c5771b6
 
d4a4da7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c5771b6
 
 
d4a4da7
 
 
c5771b6
 
d4a4da7
 
c5771b6
 
d4a4da7
c5771b6
d4a4da7
 
c5771b6
d4a4da7
 
c5771b6
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Trend monitor: current occasions (AI) + niche news (GNews) for ad trending context.
"""

from gnews import GNews
from typing import List, Dict, Any
from datetime import datetime, timedelta
import asyncio

from services.current_occasions import get_current_occasions

NICHE_KEYWORDS = {
    "home_insurance": ["home insurance", "homeowners insurance", "property insurance", "natural disaster", "insurance rates"],
    "glp1": ["GLP-1", "Ozempic", "Wegovy", "weight loss", "Mounjaro", "Zepbound"],
    "auto_insurance": ["auto insurance", "car insurance", "vehicle insurance", "insurance premiums", "driver discounts"],
}

TREND_CACHE: Dict[str, tuple] = {}
CACHE_DURATION = timedelta(hours=1)

TRIGGER_WORDS = [
    (["crisis", "warning", "danger", "risk", "threat", "disaster"], "Fear"),
    (["shortage", "limited", "running out", "exclusive", "sold out"], "FOMO"),
    (["breakthrough", "solution", "cure", "relief", "success"], "Hope"),
    (["save", "discount", "cheaper", "affordable", "deal"], "Greed"),
    (["new", "innovation", "discover", "reveal", "secret"], "Curiosity"),
    (["urgent", "now", "immediate", "breaking"], "Urgency"),
]


class TrendMonitor:
    def __init__(self, language: str = "en", country: str = "US"):
        self.google_news = GNews(language=language, country=country)
        self.google_news.period = "7d"
        self.google_news.max_results = 10

    def _normalize_niche(self, niche: str) -> str:
        key = niche.lower().strip().replace(" ", "_")
        return key if key in NICHE_KEYWORDS else niche

    async def fetch_trends(self, niche: str) -> List[Dict]:
        cache_key = f"trends_{niche}"
        if cache_key in TREND_CACHE:
            data, cached_at = TREND_CACHE[cache_key]
            if datetime.now() - cached_at < CACHE_DURATION:
                return data
        trends = await self._fetch_news(niche)
        TREND_CACHE[cache_key] = (trends, datetime.now())
        return trends

    async def _fetch_news(self, niche: str) -> List[Dict]:
        if niche not in NICHE_KEYWORDS:
            raise ValueError(f"Unsupported niche: {niche}. Use: {list(NICHE_KEYWORDS.keys())}")
        keywords = NICHE_KEYWORDS[niche][:3]
        all_articles = []
        loop = asyncio.get_event_loop()
        for kw in keywords:
            try:
                articles = await loop.run_in_executor(
                    None, lambda k=kw: self.google_news.get_news(k)
                )
                for a in articles:
                    a["keyword"] = kw
                    a["niche"] = niche
                    all_articles.append(a)
            except Exception as e:
                print(f"⚠️ News fetch failed for '{kw}': {e}")
        if not all_articles:
            return []
        scored = self._score_articles(all_articles, niche)
        return scored[:5]

    def _score_articles(self, articles: List[Dict], niche: str) -> List[Dict]:
        keywords = NICHE_KEYWORDS[niche]
        for a in articles:
            text = f"{a.get('title', '')} {a.get('description', '')}".lower()
            score = sum(2 for k in keywords if k.lower() in text)
            pub = a.get("published date")
            if pub:
                try:
                    from email.utils import parsedate_to_datetime
                    dt = parsedate_to_datetime(str(pub)) if not isinstance(pub, datetime) else pub
                    days = (datetime.now() - dt).days
                    score += 5 if days <= 1 else 3 if days <= 3 else 1
                except Exception:
                    pass
            for word in ["crisis", "warning", "breakthrough", "new", "urgent", "breaking", "major"]:
                if word in text:
                    score += 1
            a["relevance_score"] = score
        return sorted(articles, key=lambda x: x.get("relevance_score", 0), reverse=True)

    def _trigger(self, title: str, description: str) -> str:
        text = f"{title} {description}".lower()
        for words, trigger in TRIGGER_WORDS:
            if any(w in text for w in words):
                return trigger
        return "Emotion"

    async def get_relevant_trends_for_niche(self, niche: str) -> Dict[str, Any]:
        relevant = []
        for occ in await get_current_occasions():
            relevant.append({"title": occ["title"], "summary": occ["summary"], "category": occ.get("category", "Occasion")})
        niche_key = self._normalize_niche(niche)
        if niche_key in NICHE_KEYWORDS:
            try:
                for t in (await self.fetch_trends(niche_key))[:3]:
                    desc = t.get("description", "") or t.get("title", "")
                    relevant.append({
                        "title": t.get("title", ""),
                        "summary": desc,
                        "category": t.get("niche", "News").replace("_", " ").title(),
                    })
            except Exception as e:
                print(f"⚠️ News trends skipped: {e}")
        return {"relevant_trends": relevant}

    async def get_trending_angles(self, niche: str) -> List[Dict]:
        trends = await self.fetch_trends(niche)
        angles = []
        for t in trends[:3]:
            title = t.get("title", "")
            angles.append({
                "key": f"trend_{abs(hash(title)) % 10000}",
                "name": f"Trending: {title[:40]}...",
                "trigger": self._trigger(title, t.get("description", "")),
                "example": (t.get("description") or title)[:100],
                "category": "Trending",
                "source": "google_news",
                "url": t.get("url"),
                "expires": (datetime.now() + timedelta(days=7)).isoformat(),
                "relevance_score": t.get("relevance_score", 0),
            })
        return angles

    def clear_cache(self) -> None:
        global TREND_CACHE
        TREND_CACHE = {}


trend_monitor = TrendMonitor()