# NOTE: file-viewer metadata (file size 6,537 bytes, per-line commit hashes and the line-number gutter) was captured here by the page export; it is not part of the module.
import os
import re
import pandas as pd
import joblib
import json
from datetime import datetime
from typing import Dict, Any, Optional
# --- FINAL, STABLE IMPORTS ---
# Optional dependencies: each name below is bound to None when its package
# cannot be imported, so the functions in this module can check the name for
# truthiness and skip the corresponding data source instead of crashing.
try:
    from newsapi import NewsApiClient
except ImportError:
    NewsApiClient = None
try:
    from serpapi import GoogleSearch
except ImportError:
    GoogleSearch = None
try:
    from core.utils import get_supabase_client
except ImportError:
    get_supabase_client = None
# --- CONFIGURATION ---
# Location of the serialized prediction pipeline, resolved relative to this
# file's directory (../models/).
MODEL_PATH = os.path.join(os.path.dirname(__file__), '..', 'models', 'thunderbird_market_predictor_v1.joblib')
# API credentials come from the environment; each is None when the variable is
# unset, which disables the matching fetch in get_external_trends().
NEWS_API_KEY = os.environ.get("NEWS_API_KEY")
SERPAPI_KEY = os.environ.get("SERPAPI_KEY")
def get_platform_shifts() -> Optional[Dict[str, str]]:
    """Calculate the REAL 7-day usage shift per platform from Supabase.

    Rows returned by the ``get_platform_trend_data_last_14_days`` RPC are split
    at "seven days ago"; for each tracked platform the mean ``usage_count`` of
    the recent window is compared with the mean of the previous window.

    Returns:
        A dict such as ``{"instagram_shift": "+12%", "tiktok_shift": "-3%"}``
        containing only the platforms for which both windows have a usable
        average (the dict may therefore be empty), or ``None`` when the
        Supabase client is unavailable, there is too little data, or any
        error occurs.
    """
    if not get_supabase_client:
        return None
    print(" - Calculating REAL platform shifts from DB...")
    try:
        supabase = get_supabase_client()
        response = supabase.rpc('get_platform_trend_data_last_14_days').execute()
        rows = response.data
        if not rows or len(rows) < 2:
            return None

        df = pd.DataFrame(rows)
        df['date'] = pd.to_datetime(df['date'])

        # Comparing tz-aware timestamps with a naive "now" raises TypeError,
        # so build the cutoff with the same awareness as the parsed column.
        tz = df['date'].dt.tz
        now = datetime.now() if tz is None else pd.Timestamp.now(tz=tz)
        seven_days_ago = now - pd.Timedelta(days=7)

        recent = df[df['date'] >= seven_days_ago]
        prev = df[df['date'] < seven_days_ago]
        if prev.empty or recent.empty:
            return None

        avg_recent = recent.groupby('platform')['usage_count'].mean()
        avg_prev = prev.groupby('platform')['usage_count'].mean()

        shifts = {}
        for p in ['instagram', 'tiktok', 'youtube']:
            if p not in avg_recent or p not in avg_prev:
                continue
            baseline = avg_prev[p]
            latest = avg_recent[p]
            # Skip (rather than abort every platform) when an average is NaN
            # or the baseline is not positive; "not > 0" also rejects NaN.
            if pd.isna(latest) or not baseline > 0:
                continue
            # Round first so the sign reflects the displayed value: a +0.3%
            # change is reported as "0%", not "+0%".
            change = round(((latest - baseline) / baseline) * 100)
            shifts[f"{p}_shift"] = f"{'+' if change > 0 else ''}{change}%"
        return shifts
    except Exception as e:
        # Best-effort data source: report the problem and let callers fall back.
        print(f" - β DB Error calculating shifts: {e}")
        return None
def _fetch_news_headlines() -> list:
    """Return NewsAPI articles about influencer marketing as title/url dicts."""
    newsapi = NewsApiClient(api_key=NEWS_API_KEY)
    top_headlines = newsapi.get_everything(
        q='("influencer marketing")',
        language='en',
        sort_by='relevancy',
        page_size=5,
    )
    articles = top_headlines.get('articles')
    if not articles:
        return []
    return [{"title": a['title'], "url": a['url']} for a in articles]


def _fetch_breakout_keyword() -> Optional[str]:
    """Return the title of the first Google "trending now" search, if any."""
    params = {"engine": "google_trends_trending_now", "frequency": "daily", "api_key": SERPAPI_KEY}
    res = GoogleSearch(params).get_dict()
    trending = res.get("trending_searches")
    return trending[0]["title"] if trending else None


def _fetch_trending_audio() -> Optional[Dict[str, Any]]:
    """Return name/cover art of the first trending TikTok video that has music."""
    params = {"engine": "tiktok_trending", "api_key": SERPAPI_KEY}
    res = GoogleSearch(params).get_dict()
    for video in res.get("trending_videos") or []:
        music = video.get("music")
        if music:
            return {"name": music.get("title"), "coverArtUrl": music.get("cover_thumb")}
    return None


def get_external_trends() -> dict:
    """Orchestrator function to fetch all real-world data.

    Every source is best-effort: a source whose API key or client library is
    missing is skipped, and a source that raises is logged and left at its
    default value. The SerpApi lookups are isolated from each other so that a
    Google Trends failure does not prevent the TikTok lookup from running.

    Returns:
        A dict with the keys ``news_headlines`` (list of title/url dicts,
        empty by default), ``breakout_keyword`` (str or None),
        ``trending_audio`` (dict with ``name`` and ``coverArtUrl``, or None)
        and ``platform_shifts`` (whatever ``get_platform_shifts()`` returns).
    """
    print("π [Thunderbird Engine] Fetching all external trends...")
    results = {"news_headlines": [], "breakout_keyword": None, "trending_audio": None, "platform_shifts": None}

    # 1. NewsAPI
    if NEWS_API_KEY and NewsApiClient:
        try:
            results["news_headlines"] = _fetch_news_headlines()
        except Exception as e:
            print(f" - β οΈ NewsAPI failed: {e}")

    # 2. SerpApi for Google Trends & TikTok (each lookup fails independently)
    if SERPAPI_KEY and GoogleSearch:
        serpapi_sources = (
            ("breakout_keyword", _fetch_breakout_keyword),
            ("trending_audio", _fetch_trending_audio),
        )
        for key, fetch in serpapi_sources:
            try:
                results[key] = fetch()
            except Exception as e:
                print(f" - β SerpApi failed: {e}")

    # 3. Platform Shifts from DB
    results["platform_shifts"] = get_platform_shifts()
    return results
def predict_niche_trends() -> dict:
    """Use the REAL trained ML pipeline to predict future interest per niche.

    The pipeline stored at ``MODEL_PATH`` is loaded, the niche names are read
    back from its ``'cat'`` transformer, and a prediction is made for each of
    the next 12 month-ends with a fixed ``trend_score`` of 50.

    Returns:
        ``{"trend_predictions": {niche: [{"date": "YYYY-MM", "value": int}, ...]}}``
        where each value is truncated to an int and clamped at 0. On any
        failure (model missing, unexpected pipeline layout, prediction error)
        the mapping is empty: ``{"trend_predictions": {}}``.
    """
    print("\nπ [Thunderbird Engine] Using REAL ML pipeline for predictions...")
    try:
        # NOTE(review): joblib.load unpickles arbitrary objects; MODEL_PATH
        # must only ever point at a trusted, locally shipped model file.
        pipeline = joblib.load(MODEL_PATH)
        encoder = pipeline.named_steps['preprocessor'].named_transformers_['cat']

        # Strip only the leading "niche_" prefix added by the encoder; a plain
        # str.replace would also delete the substring inside a niche's name.
        prefix = 'niche_'
        all_niches = [
            name[len(prefix):] if name.startswith(prefix) else name
            for name in encoder.get_feature_names_out(['niche'])
        ]

        start = datetime.now()
        try:
            future_dates = pd.date_range(start=start, periods=12, freq='ME')
        except ValueError:
            # pandas < 2.2 does not know the 'ME' alias; 'M' is its equivalent.
            future_dates = pd.date_range(start=start, periods=12, freq='M')
        month_labels = [dt.strftime('%Y-%m') for dt in future_dates]

        predictions = {}
        for niche in all_niches:
            future_df = pd.DataFrame({
                'month_of_year': future_dates.month,
                'niche': [niche] * 12,
                'trend_score': 50,
            })
            predicted_values = pipeline.predict(future_df[['niche', 'trend_score', 'month_of_year']])
            predictions[niche] = [
                {"date": label, "value": max(0, int(val))}
                for label, val in zip(month_labels, predicted_values)
            ]
        return {"trend_predictions": predictions}
    except Exception as e:
        # Best-effort: the caller renders an empty chart instead of failing.
        print(f" - β REAL Prediction Failed: {e}. Chart will be empty.")
        return {"trend_predictions": {}}
def decode_market_trend(topic: str, llm_instance) -> Dict[str, str]:
    """Decode a keyword into a strategy using a simplified prompt for small models.

    The model is asked for a short three-sentence briefing; the reply is split
    into sentences here and mapped, in order, onto the result keys.

    Args:
        topic: The trend keyword to analyse; it is interpolated into the prompt.
        llm_instance: A callable invoked as
            ``llm_instance(prompt, max_tokens=150, temperature=0.7, echo=False)``
            whose result exposes the text at ``['choices'][0]['text']``. A
            falsy value means no model is available.

    Returns:
        A dict with the keys ``summary``, ``impact`` and ``strategy``. Any
        sentence the model did not supply is replaced by a placeholder, and
        sentences beyond the third are ignored. When no model is available or
        the call fails, a fixed "offline" response is returned instead.
    """
    print(f"π§ [Thunderbird] Decoding Trend with SIMPLE prompt: {topic}")
    offline_response = {"summary": "AI Analyst is offline.", "impact": "Please try again.", "strategy": "System is rebooting."}
    if not llm_instance:
        return offline_response

    # We ask for a simple paragraph and let Python do the formatting.
    prompt = (
        "[INST]\n"
        "You are an expert marketing strategist.\n"
        f'Analyze this trend: "{topic}".\n'
        "Write a short, three-sentence briefing about it.\n"
        "[/INST]"
    )
    try:
        response = llm_instance(prompt, max_tokens=150, temperature=0.7, echo=False)
        full_text = response['choices'][0]['text'].strip()

        # Split after any sentence terminator (., ! or ?) and drop empty
        # pieces, so a blank completion falls through to the placeholders
        # instead of producing an empty summary.
        sentences = [s for s in re.split(r'(?<=[.!?])\s+', full_text) if s]

        # Assign each sentence to a key, with fallbacks
        fallbacks = (
            ("summary", "Analysis in progress."),
            ("impact", "Impact is being evaluated."),
            ("strategy", "Awaiting actionable strategy."),
        )
        return {
            key: sentences[i] if i < len(sentences) else placeholder
            for i, (key, placeholder) in enumerate(fallbacks)
        }
    except Exception as e:
        print(f" - β LLM Error: {e}")
        return offline_response