"""Thunderbird market-intelligence engine.

Fetches real-world trend signals (NewsAPI headlines, SerpApi Google/TikTok
trends, Supabase platform-usage shifts), runs the trained ML pipeline for
per-niche interest predictions, and decodes a keyword into a three-part
strategy briefing via a local LLM instance.
"""

import os
import re
import json
from datetime import datetime
from typing import Dict, Any, Optional

import pandas as pd

# --- FINAL, STABLE IMPORTS ---
# Every external integration is optional: when a client library is missing,
# its flag is None and the corresponding feature silently degrades.
try:
    from newsapi import NewsApiClient
except ImportError:
    NewsApiClient = None
try:
    from serpapi import GoogleSearch
except ImportError:
    GoogleSearch = None
try:
    # Guarded like the other optional deps: predict_niche_trends() already
    # degrades gracefully on any load failure, so a missing joblib should
    # not crash module import.
    import joblib
except ImportError:
    joblib = None
try:
    from core.utils import get_supabase_client
except ImportError:
    get_supabase_client = None

# --- CONFIGURATION ---
MODEL_PATH = os.path.join(
    os.path.dirname(__file__), '..', 'models',
    'thunderbird_market_predictor_v1.joblib',
)
NEWS_API_KEY = os.environ.get("NEWS_API_KEY")
SERPAPI_KEY = os.environ.get("SERPAPI_KEY")


def get_platform_shifts() -> Optional[Dict[str, str]]:
    """Calculates REAL 7-day shift from Supabase. Returns None if it fails.

    Compares the mean ``usage_count`` per platform over the last 7 days
    against the preceding 7 days (via the
    ``get_platform_trend_data_last_14_days`` RPC).

    Returns:
        Mapping like ``{"instagram_shift": "+12%"}`` for each of
        instagram/tiktok/youtube that has data in both halves of the
        window, or None when the client, data, or query is unavailable.
    """
    if not get_supabase_client:
        return None
    print(" - Calculating REAL platform shifts from DB...")
    try:
        supabase = get_supabase_client()
        response = supabase.rpc('get_platform_trend_data_last_14_days').execute()
        if not response.data or len(response.data) < 2:
            return None
        df = pd.DataFrame(response.data)
        df['date'] = pd.to_datetime(df['date'])
        # NOTE(review): naive local time is compared against DB timestamps —
        # assumes the RPC also returns naive/local datetimes; confirm.
        seven_days_ago = datetime.now() - pd.Timedelta(days=7)
        recent = df[df['date'] >= seven_days_ago]
        prev = df[df['date'] < seven_days_ago]
        if prev.empty or recent.empty:
            return None
        avg_recent = recent.groupby('platform')['usage_count'].mean()
        avg_prev = prev.groupby('platform')['usage_count'].mean()
        shifts = {}
        for p in ['instagram', 'tiktok', 'youtube']:
            # Skip platforms missing from either window or with a zero
            # baseline (division guard).
            if p in avg_recent and p in avg_prev and avg_prev[p] > 0:
                change = ((avg_recent[p] - avg_prev[p]) / avg_prev[p]) * 100
                shifts[f"{p}_shift"] = f"{'+' if change > 0 else ''}{round(change)}%"
        return shifts
    except Exception as e:
        print(f" - āŒ DB Error calculating shifts: {e}")
        return None


def get_external_trends() -> dict:
    """Orchestrator function to fetch all real-world data.

    Each data source is independent: a failure in one (logged to stdout)
    leaves that key at its empty default and does not affect the others.

    Returns:
        dict with keys ``news_headlines`` (list of {"title", "url"}),
        ``breakout_keyword`` (str | None), ``trending_audio``
        ({"name", "coverArtUrl"} | None) and ``platform_shifts``
        (dict | None, see :func:`get_platform_shifts`).
    """
    print("šŸš€ [Thunderbird Engine] Fetching all external trends...")
    results = {
        "news_headlines": [],
        "breakout_keyword": None,
        "trending_audio": None,
        "platform_shifts": None,
    }
    # 1. NewsAPI
    if NEWS_API_KEY and NewsApiClient:
        try:
            newsapi = NewsApiClient(api_key=NEWS_API_KEY)
            top_headlines = newsapi.get_everything(
                q='("influencer marketing")',
                language='en',
                sort_by='relevancy',
                page_size=5,
            )
            if top_headlines.get('articles'):
                results["news_headlines"] = [
                    {"title": a['title'], "url": a['url']}
                    for a in top_headlines['articles']
                ]
        except Exception as e:
            print(f" - āš ļø NewsAPI failed: {e}")
    # 2. SerpApi for Google Trends & TikTok
    if SERPAPI_KEY and GoogleSearch:
        try:
            # Google Trends: take the top daily trending search as the
            # breakout keyword.
            params = {
                "engine": "google_trends_trending_now",
                "frequency": "daily",
                "api_key": SERPAPI_KEY,
            }
            search = GoogleSearch(params)
            res = search.get_dict()
            if res.get("trending_searches"):
                results["breakout_keyword"] = res["trending_searches"][0]["title"]
            # TikTok Trends: first trending video that carries music info.
            params = {"engine": "tiktok_trending", "api_key": SERPAPI_KEY}
            search = GoogleSearch(params)
            res = search.get_dict()
            if res.get("trending_videos"):
                for video in res["trending_videos"]:
                    if video.get("music"):
                        results["trending_audio"] = {
                            "name": video["music"].get("title"),
                            "coverArtUrl": video["music"].get("cover_thumb"),
                        }
                        break
        except Exception as e:
            print(f" - āŒ SerpApi failed: {e}")
    # 3. Platform Shifts from DB
    results["platform_shifts"] = get_platform_shifts()
    return results


def predict_niche_trends() -> dict:
    """Uses the REAL trained ML pipeline to predict future interest.

    Loads the persisted sklearn pipeline, recovers the known niches from
    its one-hot encoder, and predicts 12 monthly values per niche.

    Returns:
        ``{"trend_predictions": {niche: [{"date": "YYYY-MM", "value": int},
        ...]}}`` — empty inner dict on any failure (missing model file,
        missing joblib, schema mismatch), so the chart simply renders empty.
    """
    print("\nšŸš€ [Thunderbird Engine] Using REAL ML pipeline for predictions...")
    try:
        pipeline = joblib.load(MODEL_PATH)
        encoder = pipeline.named_steps['preprocessor'].named_transformers_['cat']
        all_niches = [
            cat.replace('niche_', '')
            for cat in encoder.get_feature_names_out(['niche'])
        ]
        # 'ME' = month-end frequency (pandas >= 2.2 alias).
        future_dates = pd.date_range(start=datetime.now(), periods=12, freq='ME')
        predictions = {}
        for niche in all_niches:
            future_df = pd.DataFrame({
                'month_of_year': future_dates.month,
                'niche': [niche] * 12,
                'trend_score': 50,  # neutral baseline score for all months
            })
            predicted_values = pipeline.predict(
                future_df[['niche', 'trend_score', 'month_of_year']]
            )
            # int() truncates toward zero; clamp at 0 so the chart never
            # shows negative interest.
            predictions[niche] = [
                {"date": dt.strftime('%Y-%m'), "value": max(0, int(val))}
                for dt, val in zip(future_dates, predicted_values)
            ]
        return {"trend_predictions": predictions}
    except Exception as e:
        print(f" - āŒ REAL Prediction Failed: {e}. Chart will be empty.")
        return {"trend_predictions": {}}


def decode_market_trend(topic: str, llm_instance) -> Dict[str, str]:
    """
    Decodes a keyword into a strategy using a simplified prompt for small models.

    Args:
        topic: The trend keyword to analyze.
        llm_instance: A llama.cpp-style callable returning
            ``{"choices": [{"text": ...}]}``, or None when the model is
            offline.

    Returns:
        ``{"summary", "impact", "strategy"}`` — one sentence each, with
        per-key fallbacks when the model produces fewer than three
        sentences, and a fixed offline response when ``llm_instance`` is
        None or raises.
    """
    print(f"🧠 [Thunderbird] Decoding Trend with SIMPLE prompt: {topic}")
    offline_response = {
        "summary": "AI Analyst is offline.",
        "impact": "Please try again.",
        "strategy": "System is rebooting.",
    }
    if not llm_instance:
        return offline_response

    # === THE FINAL, SIMPLEST PROMPT ===
    # We ask for a simple paragraph and let Python do the formatting.
    prompt = f"""[INST] You are an expert marketing strategist. Analyze this trend: "{topic}". Write a short, three-sentence briefing about it. 
[/INST]"""
    try:
        response = llm_instance(prompt, max_tokens=150, temperature=0.7, echo=False)
        full_text = response['choices'][0]['text'].strip()

        # Split the paragraph into sentences; drop blank fragments so an
        # empty LLM reply falls through to the fallbacks instead of
        # yielding an empty summary.
        sentences = [s for s in re.split(r'(?<=\.)\s+', full_text) if s.strip()]

        # Assign each sentence to a key, with fallbacks
        summary = sentences[0] if len(sentences) > 0 else "Analysis in progress."
        impact = sentences[1] if len(sentences) > 1 else "Impact is being evaluated."
        strategy = sentences[2] if len(sentences) > 2 else "Awaiting actionable strategy."

        return {"summary": summary, "impact": impact, "strategy": strategy}
    except Exception as e:
        print(f" - āŒ LLM Error: {e}")
        return offline_response