import os
import re
import json
from datetime import datetime
from typing import Dict, Optional

import joblib
import pandas as pd

# Optional integrations: each one degrades gracefully when its client
# library is not installed.
try:
    from newsapi import NewsApiClient
except ImportError:
    NewsApiClient = None

try:
    from serpapi import GoogleSearch
except ImportError:
    GoogleSearch = None

try:
    from core.utils import get_supabase_client
except ImportError:
    get_supabase_client = None

# Model artifact location and API keys. The keys are read from the
# environment so the module still imports, with reduced functionality,
# when they are unset.
MODEL_PATH = os.path.join(os.path.dirname(__file__), '..', 'models', 'thunderbird_market_predictor_v1.joblib')
NEWS_API_KEY = os.environ.get("NEWS_API_KEY")
SERPAPI_KEY = os.environ.get("SERPAPI_KEY")
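# Expected environment (placeholder values):
#   export NEWS_API_KEY="<your NewsAPI key>"
#   export SERPAPI_KEY="<your SerpApi key>"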


def get_platform_shifts() -> Optional[Dict[str, str]]:
    """Calculate the real 7-day platform-usage shift from Supabase.

    Returns None if the Supabase client is unavailable, the query fails,
    or there is not enough data to compare two weeks.
    """
    if not get_supabase_client:
        return None
    print(" - Calculating real platform shifts from DB...")
    try:
        supabase = get_supabase_client()
        response = supabase.rpc('get_platform_trend_data_last_14_days').execute()
        if not response.data or len(response.data) < 2:
            return None
        df = pd.DataFrame(response.data)
        df['date'] = pd.to_datetime(df['date'])
        # Split the 14-day window into the last 7 days and the 7 days before.
        seven_days_ago = datetime.now() - pd.Timedelta(days=7)
        recent = df[df['date'] >= seven_days_ago]
        prev = df[df['date'] < seven_days_ago]
        if prev.empty or recent.empty:
            return None
        avg_recent = recent.groupby('platform')['usage_count'].mean()
        avg_prev = prev.groupby('platform')['usage_count'].mean()
        shifts = {}
        for p in ['instagram', 'tiktok', 'youtube']:
            if p in avg_recent and p in avg_prev and avg_prev[p] > 0:
                change = ((avg_recent[p] - avg_prev[p]) / avg_prev[p]) * 100
                shifts[f"{p}_shift"] = f"{'+' if change > 0 else ''}{round(change)}%"
        return shifts
    except Exception as e:
        print(f" - ❌ DB error calculating shifts: {e}")
        return None
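
# Shape sketch (illustrative values): given 14 days of RPC rows shaped like
#   {"date": "2025-06-01", "platform": "tiktok", "usage_count": 120}
# the function returns week-over-week percentage strings, e.g.
#   {"instagram_shift": "+4%", "tiktok_shift": "-2%", "youtube_shift": "+11%"}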


def get_external_trends() -> dict:
    """Orchestrator function that fetches all real-world trend data."""
    print("[Thunderbird Engine] Fetching all external trends...")
    results = {
        "news_headlines": [],
        "breakout_keyword": None,
        "trending_audio": None,
        "platform_shifts": None,
    }

    # 1. Recent influencer-marketing headlines via NewsAPI.
    if NEWS_API_KEY and NewsApiClient:
        try:
            newsapi = NewsApiClient(api_key=NEWS_API_KEY)
            top_headlines = newsapi.get_everything(
                q='("influencer marketing")',
                language='en',
                sort_by='relevancy',
                page_size=5,
            )
            if top_headlines.get('articles'):
                results["news_headlines"] = [
                    {"title": a['title'], "url": a['url']}
                    for a in top_headlines['articles']
                ]
        except Exception as e:
            print(f" - ⚠️ NewsAPI failed: {e}")

    # 2. Breakout keyword and trending audio via SerpApi.
    if SERPAPI_KEY and GoogleSearch:
        try:
            # Today's top breakout keyword from Google Trends.
            params = {"engine": "google_trends_trending_now", "frequency": "daily", "api_key": SERPAPI_KEY}
            search = GoogleSearch(params)
            res = search.get_dict()
            if res.get("trending_searches"):
                results["breakout_keyword"] = res["trending_searches"][0]["title"]

            # First trending TikTok video that carries an audio track.
            params = {"engine": "tiktok_trending", "api_key": SERPAPI_KEY}
            search = GoogleSearch(params)
            res = search.get_dict()
            if res.get("trending_videos"):
                for video in res["trending_videos"]:
                    if video.get("music"):
                        results["trending_audio"] = {
                            "name": video["music"].get("title"),
                            "coverArtUrl": video["music"].get("cover_thumb"),
                        }
                        break
        except Exception as e:
            print(f" - ❌ SerpApi failed: {e}")

    # 3. Week-over-week platform shifts from our own database.
    results["platform_shifts"] = get_platform_shifts()

    return results
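
# Usage sketch: runs with any subset of NEWS_API_KEY / SERPAPI_KEY set;
# sources without a key simply keep their empty defaults.
#   trends = get_external_trends()
#   print(trends["breakout_keyword"], len(trends["news_headlines"]))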


def predict_niche_trends() -> dict:
    """Use the trained ML pipeline to predict future interest per niche."""
    print("\n[Thunderbird Engine] Using the real ML pipeline for predictions...")
    try:
        pipeline = joblib.load(MODEL_PATH)
        # Recover the niches the model was trained on from the fitted
        # one-hot encoder inside the preprocessing step.
        encoder = pipeline.named_steps['preprocessor'].named_transformers_['cat']
        all_niches = [cat.replace('niche_', '') for cat in encoder.get_feature_names_out(['niche'])]
        # Score the next 12 month-end dates for every niche.
        future_dates = pd.date_range(start=datetime.now(), periods=12, freq='ME')
        predictions = {}
        for niche in all_niches:
            future_df = pd.DataFrame({
                'month_of_year': future_dates.month,
                'niche': [niche] * 12,
                'trend_score': 50,  # neutral baseline input
            })
            predicted_values = pipeline.predict(future_df[['niche', 'trend_score', 'month_of_year']])
            predictions[niche] = [
                {"date": dt.strftime('%Y-%m'), "value": max(0, int(val))}
                for dt, val in zip(future_dates, predicted_values)
            ]
        return {"trend_predictions": predictions}
    except Exception as e:
        print(f" - ❌ Prediction failed: {e}. Chart will be empty.")
        return {"trend_predictions": {}}


def decode_market_trend(topic: str, llm_instance) -> Dict[str, str]:
    """Decode a keyword into a strategy using a prompt simple enough for
    small local models.
    """
    print(f"🧠 [Thunderbird] Decoding trend with simple prompt: {topic}")

    offline_response = {
        "summary": "AI Analyst is offline.",
        "impact": "Please try again.",
        "strategy": "System is rebooting.",
    }
    if not llm_instance:
        return offline_response

    # Small instruction-tuned models handle one plain request better than a
    # structured schema, so ask for three sentences and split them up.
    prompt = f"""[INST]
You are an expert marketing strategist.
Analyze this trend: "{topic}".
Write a short, three-sentence briefing about it.
[/INST]"""

    try:
        response = llm_instance(prompt, max_tokens=150, temperature=0.7, echo=False)
        full_text = response['choices'][0]['text'].strip()

        # Split on sentence boundaries and map the sentences onto the
        # summary / impact / strategy slots, with fallbacks for short output.
        sentences = re.split(r'(?<=\.)\s+', full_text)
        summary = sentences[0] if len(sentences) > 0 else "Analysis in progress."
        impact = sentences[1] if len(sentences) > 1 else "Impact is being evaluated."
        strategy = sentences[2] if len(sentences) > 2 else "Awaiting actionable strategy."

        return {
            "summary": summary,
            "impact": impact,
            "strategy": strategy,
        }
    except Exception as e:
        print(f" - ❌ LLM error: {e}")
        return offline_response
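
# Usage sketch (assumes a llama-cpp-python style callable; the model path
# is a placeholder):
#   llm = Llama(model_path="<path-to-gguf>")
#   briefing = decode_market_trend("AI avatars", llm)


if __name__ == "__main__":
    # Minimal smoke test of the non-LLM paths; passing llm_instance=None
    # exercises the offline fallback.
    print(json.dumps(get_external_trends(), indent=2, default=str))
    print(json.dumps(predict_niche_trends(), indent=2))
    print(decode_market_trend("AI avatars", llm_instance=None))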