File size: 6,537 Bytes
01c71d2
0bcdbd1
01c71d2
 
1a8dfaf
01c71d2
14b1c48
3b4140e
 
 
 
 
 
 
 
 
 
 
 
 
 
ea92076
dc00ece
 
3b4140e
 
dc00ece
 
3b4140e
 
dc00ece
 
 
 
 
 
 
 
 
 
 
3b4140e
dc00ece
 
 
 
 
 
 
 
da61bf8
01c71d2
3b4140e
 
dc00ece
01c71d2
3b4140e
 
01c71d2
 
8927482
3b4140e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dc00ece
3b4140e
dc00ece
3b4140e
01c71d2
 
 
da61bf8
8927482
01c71d2
8927482
 
 
8e0e0db
14b1c48
8927482
8e0e0db
da61bf8
 
14b1c48
 
 
 
853482f
f66fbba
0bcdbd1
58438a6
0bcdbd1
58438a6
0bcdbd1
58438a6
8927482
0bcdbd1
58438a6
 
0bcdbd1
58438a6
 
 
0bcdbd1
 
f66fbba
58438a6
 
0bcdbd1
58438a6
 
0bcdbd1
58438a6
 
 
 
 
0bcdbd1
58438a6
 
 
0bcdbd1
 
 
f66fbba
8927482
14b1c48
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import re
import pandas as pd
import joblib
import json
from datetime import datetime
from typing import Dict, Any, Optional

# --- FINAL, STABLE IMPORTS ---
# Each third-party / project dependency is optional: on ImportError the name
# is bound to None so the functions below can feature-detect (e.g.
# `if NEWS_API_KEY and NewsApiClient:`) and degrade gracefully instead of
# crashing at import time.
try:
    from newsapi import NewsApiClient
except ImportError:
    NewsApiClient = None
try:
    from serpapi import GoogleSearch
except ImportError:
    GoogleSearch = None
try:
    from core.utils import get_supabase_client
except ImportError:
    get_supabase_client = None

# --- CONFIGURATION ---
# Trained sklearn pipeline serialized with joblib, stored one directory up
# from this module in ../models/.
MODEL_PATH = os.path.join(os.path.dirname(__file__), '..', 'models', 'thunderbird_market_predictor_v1.joblib')
# API keys come from the environment; None when unset, which disables the
# corresponding fetch in get_external_trends().
NEWS_API_KEY = os.environ.get("NEWS_API_KEY")
SERPAPI_KEY = os.environ.get("SERPAPI_KEY")

def get_platform_shifts() -> Optional[Dict[str, str]]:
    """Calculates REAL 7-day shift from Supabase. Returns None if it fails.

    Compares the mean daily usage_count of the last 7 days against the 7
    days before that (14 days of data total) for each tracked platform, and
    formats the percentage change as a string like '+12%'.
    """
    # Supabase client import may have failed at module load; bail quietly.
    if not get_supabase_client: return None
    print("   - Calculating REAL platform shifts from DB...")
    try:
        client = get_supabase_client()
        rows = client.rpc('get_platform_trend_data_last_14_days').execute()
        # Need at least two data points to form two comparison windows.
        if not rows.data or len(rows.data) < 2: return None

        frame = pd.DataFrame(rows.data)
        frame['date'] = pd.to_datetime(frame['date'])
        cutoff = datetime.now() - pd.Timedelta(days=7)
        last_week = frame[frame['date'] >= cutoff]
        week_before = frame[frame['date'] < cutoff]
        if week_before.empty or last_week.empty: return None

        mean_new = last_week.groupby('platform')['usage_count'].mean()
        mean_old = week_before.groupby('platform')['usage_count'].mean()

        shifts: Dict[str, str] = {}
        for platform in ('instagram', 'tiktok', 'youtube'):
            # Platform must appear in both windows with a non-zero baseline
            # to avoid division by zero.
            if platform in mean_new and platform in mean_old and mean_old[platform] > 0:
                pct = ((mean_new[platform] - mean_old[platform]) / mean_old[platform]) * 100
                sign = '+' if pct > 0 else ''
                shifts[f"{platform}_shift"] = f"{sign}{round(pct)}%"
        return shifts
    except Exception as e:
        print(f"   - ❌ DB Error calculating shifts: {e}")
        return None

def get_external_trends() -> dict:
    """Orchestrator function to fetch all real-world data.

    Aggregates: news headlines (NewsAPI), a breakout Google Trends keyword
    and a trending TikTok audio (both via SerpApi), and real platform shifts
    from the database. Each source fails independently without aborting the
    others.
    """
    print("πŸš€ [Thunderbird Engine] Fetching all external trends...")
    results = {
        "news_headlines": [],
        "breakout_keyword": None,
        "trending_audio": None,
        "platform_shifts": None,
    }

    # 1. NewsAPI — only when both the key and the client library are present.
    if NEWS_API_KEY and NewsApiClient:
        try:
            client = NewsApiClient(api_key=NEWS_API_KEY)
            payload = client.get_everything(q='("influencer marketing")', language='en', sort_by='relevancy', page_size=5)
            articles = payload.get('articles')
            if articles:
                results["news_headlines"] = [{"title": art['title'], "url": art['url']} for art in articles]
        except Exception as e: print(f"   - ⚠️ NewsAPI failed: {e}")

    # 2. SerpApi — Google Trends breakout keyword, then TikTok trending audio.
    if SERPAPI_KEY and GoogleSearch:
        try:
            trends_params = {"engine": "google_trends_trending_now", "frequency": "daily", "api_key": SERPAPI_KEY}
            trends_payload = GoogleSearch(trends_params).get_dict()
            trending = trends_payload.get("trending_searches")
            if trending:
                results["breakout_keyword"] = trending[0]["title"]

            tiktok_params = {"engine": "tiktok_trending", "api_key": SERPAPI_KEY}
            tiktok_payload = GoogleSearch(tiktok_params).get_dict()
            # First video carrying music metadata supplies the trending audio.
            for clip in tiktok_payload.get("trending_videos") or []:
                music = clip.get("music")
                if music:
                    results["trending_audio"] = {"name": music.get("title"), "coverArtUrl": music.get("cover_thumb")}
                    break
        except Exception as e: print(f"   - ❌ SerpApi failed: {e}")

    # 3. Platform shifts from the DB (returns None on any failure).
    results["platform_shifts"] = get_platform_shifts()

    return results

def predict_niche_trends() -> dict:
    """Uses the REAL trained ML pipeline to predict future interest.

    Loads the serialized sklearn pipeline, recovers the list of known niches
    from its one-hot encoder, and predicts a 12-month interest series per
    niche. On any failure returns an empty prediction map so the caller's
    chart simply renders empty.
    """
    print("\nπŸš€ [Thunderbird Engine] Using REAL ML pipeline for predictions...")
    try:
        model = joblib.load(MODEL_PATH)
        # Recover niche labels from the fitted one-hot encoder's feature names
        # (they come back as 'niche_<label>').
        cat_encoder = model.named_steps['preprocessor'].named_transformers_['cat']
        niches = [name.replace('niche_', '') for name in cat_encoder.get_feature_names_out(['niche'])]

        # Twelve month-end timestamps starting from now ('ME' = month end).
        horizon = pd.to_datetime(pd.date_range(start=datetime.now(), periods=12, freq='ME'))

        forecast = {}
        for niche in niches:
            # trend_score is held at a neutral 50 for every future month.
            features = pd.DataFrame({'month_of_year': horizon.month, 'niche': [niche] * 12, 'trend_score': 50})
            values = model.predict(features[['niche', 'trend_score', 'month_of_year']])
            forecast[niche] = [
                {"date": ts.strftime('%Y-%m'), "value": max(0, int(v))}
                for ts, v in zip(horizon, values)
            ]
        return {"trend_predictions": forecast}
    except Exception as e:
        print(f"   - ❌ REAL Prediction Failed: {e}. Chart will be empty.")
        return {"trend_predictions": {}}

def decode_market_trend(topic: str, llm_instance) -> Dict[str, str]:
    """
    Decodes a keyword into a strategy using a simplified prompt for small models.

    The LLM is asked for a plain three-sentence paragraph; Python then splits
    it into the summary / impact / strategy fields so the model never has to
    emit structured output.

    Args:
        topic: The trend keyword to analyze.
        llm_instance: llama.cpp-style callable:
            llm(prompt, max_tokens=..., temperature=..., echo=False)
            -> {'choices': [{'text': ...}]}. A falsy value returns the
            offline fallback.

    Returns:
        Dict with 'summary', 'impact', 'strategy' keys. Missing sentences get
        placeholder text; LLM errors return the offline fallback.
    """
    print(f"🧠 [Thunderbird] Decoding Trend with SIMPLE prompt: {topic}")

    offline_response = {"summary": "AI Analyst is offline.", "impact": "Please try again.", "strategy": "System is rebooting."}
    if not llm_instance: return offline_response

    # === THE FINAL, SIMPLEST PROMPT ===
    # We ask for a simple paragraph and let Python do the formatting.
    prompt = f"""[INST]
    You are an expert marketing strategist.
    Analyze this trend: "{topic}".
    Write a short, three-sentence briefing about it.
    [/INST]"""

    try:
        response = llm_instance(prompt, max_tokens=150, temperature=0.7, echo=False)
        full_text = response['choices'][0]['text'].strip()

        # Split on whitespace following any sentence terminator. Two fixes
        # over the old r'(?<=\.)\s+':
        #   1. '!' and '?' endings are now recognized, not just '.'.
        #   2. Empty fragments are filtered out — re.split('') returns ['']
        #      which previously made a blank completion yield summary=""
        #      instead of the placeholder below.
        sentences = [s for s in re.split(r'(?<=[.!?])\s+', full_text) if s]

        # Assign each sentence to a key, with fallbacks for short output.
        return {
            "summary": sentences[0] if len(sentences) > 0 else "Analysis in progress.",
            "impact": sentences[1] if len(sentences) > 1 else "Impact is being evaluated.",
            "strategy": sentences[2] if len(sentences) > 2 else "Awaiting actionable strategy.",
        }

    except Exception as e:
        print(f"   - ❌ LLM Error: {e}")
        return offline_response