# reachify-ai-service / core / thunderbird_engine.py
# fix: Simplify LLM prompt to prevent instruction echoing (commit 58438a6, amitbhatt6075)
import os
import re
import pandas as pd
import joblib
import json
from datetime import datetime
from typing import Dict, Any, Optional
# --- FINAL, STABLE IMPORTS ---
# Optional dependencies: each third-party/project import degrades to None
# when the package is missing, so the engine can run in a reduced mode and
# callers can feature-gate with simple truthiness checks.
try:
    from newsapi import NewsApiClient  # NewsAPI SDK (headline fetching)
except ImportError:
    NewsApiClient = None
try:
    from serpapi import GoogleSearch  # SerpApi SDK (Google Trends / TikTok)
except ImportError:
    GoogleSearch = None
try:
    from core.utils import get_supabase_client  # project-local DB helper
except ImportError:
    get_supabase_client = None
# --- CONFIGURATION ---
# Trained market-predictor pipeline stored one directory above this module.
MODEL_PATH = os.path.join(os.path.dirname(__file__), '..', 'models', 'thunderbird_market_predictor_v1.joblib')
# API keys come from the environment; a missing key disables that fetcher.
NEWS_API_KEY = os.environ.get("NEWS_API_KEY")
SERPAPI_KEY = os.environ.get("SERPAPI_KEY")
def get_platform_shifts() -> Optional[Dict[str, str]]:
    """Compute real 7-day platform usage shifts from Supabase; None on any failure."""
    if not get_supabase_client:
        return None
    print(" - Calculating REAL platform shifts from DB...")
    try:
        client = get_supabase_client()
        rows = client.rpc('get_platform_trend_data_last_14_days').execute().data
        if not rows or len(rows) < 2:
            return None
        frame = pd.DataFrame(rows)
        frame['date'] = pd.to_datetime(frame['date'])
        # Split the 14-day window into the last 7 days vs the 7 before.
        cutoff = datetime.now() - pd.Timedelta(days=7)
        recent_rows = frame[frame['date'] >= cutoff]
        older_rows = frame[frame['date'] < cutoff]
        if older_rows.empty or recent_rows.empty:
            return None
        recent_avg = recent_rows.groupby('platform')['usage_count'].mean()
        older_avg = older_rows.groupby('platform')['usage_count'].mean()
        shifts: Dict[str, str] = {}
        for platform in ('instagram', 'tiktok', 'youtube'):
            # Skip platforms missing from either window or with a zero baseline.
            if platform in recent_avg and platform in older_avg and older_avg[platform] > 0:
                pct = ((recent_avg[platform] - older_avg[platform]) / older_avg[platform]) * 100
                sign = '+' if pct > 0 else ''
                shifts[f"{platform}_shift"] = f"{sign}{round(pct)}%"
        return shifts
    except Exception as e:
        print(f" - ❌ DB Error calculating shifts: {e}")
        return None
def get_external_trends() -> dict:
    """Orchestrate fetching of news, trend, audio and platform-shift signals."""
    print("πŸš€ [Thunderbird Engine] Fetching all external trends...")
    results = {
        "news_headlines": [],
        "breakout_keyword": None,
        "trending_audio": None,
        "platform_shifts": None,
    }
    # 1. NewsAPI headlines (best-effort; skipped without a key or SDK)
    if NEWS_API_KEY and NewsApiClient:
        try:
            client = NewsApiClient(api_key=NEWS_API_KEY)
            payload = client.get_everything(
                q='("influencer marketing")',
                language='en',
                sort_by='relevancy',
                page_size=5,
            )
            articles = payload.get('articles')
            if articles:
                results["news_headlines"] = [
                    {"title": a['title'], "url": a['url']} for a in articles
                ]
        except Exception as e:
            print(f" - ⚠️ NewsAPI failed: {e}")
    # 2. SerpApi: Google Trends breakout keyword + first TikTok trending audio
    if SERPAPI_KEY and GoogleSearch:
        try:
            trends = GoogleSearch({
                "engine": "google_trends_trending_now",
                "frequency": "daily",
                "api_key": SERPAPI_KEY,
            }).get_dict()
            if trends.get("trending_searches"):
                results["breakout_keyword"] = trends["trending_searches"][0]["title"]
            tiktok = GoogleSearch({
                "engine": "tiktok_trending",
                "api_key": SERPAPI_KEY,
            }).get_dict()
            # Keep only the first trending video that actually carries music.
            if tiktok.get("trending_videos"):
                for video in tiktok["trending_videos"]:
                    music = video.get("music")
                    if music:
                        results["trending_audio"] = {
                            "name": music.get("title"),
                            "coverArtUrl": music.get("cover_thumb"),
                        }
                        break
        except Exception as e:
            print(f" - ❌ SerpApi failed: {e}")
    # 3. Platform shifts computed from our own DB
    results["platform_shifts"] = get_platform_shifts()
    return results
def predict_niche_trends() -> dict:
    """Project 12 months of per-niche interest using the trained ML pipeline."""
    print("\nπŸš€ [Thunderbird Engine] Using REAL ML pipeline for predictions...")
    try:
        pipeline = joblib.load(MODEL_PATH)
        # Recover the niche labels from the fitted one-hot encoder's feature names.
        cat_encoder = pipeline.named_steps['preprocessor'].named_transformers_['cat']
        niches = [feat.replace('niche_', '') for feat in cat_encoder.get_feature_names_out(['niche'])]
        # Twelve month-end timestamps starting from now.
        horizon = pd.to_datetime(pd.date_range(start=datetime.now(), periods=12, freq='ME'))
        forecast = {}
        for niche in niches:
            features = pd.DataFrame({
                'month_of_year': horizon.month,
                'niche': [niche] * 12,
                'trend_score': 50,  # neutral baseline score fed to the model
            })
            values = pipeline.predict(features[['niche', 'trend_score', 'month_of_year']])
            forecast[niche] = [
                {"date": ts.strftime('%Y-%m'), "value": max(0, int(v))}
                for ts, v in zip(horizon, values)
            ]
        return {"trend_predictions": forecast}
    except Exception as e:
        print(f" - ❌ REAL Prediction Failed: {e}. Chart will be empty.")
        return {"trend_predictions": {}}
def decode_market_trend(topic: str, llm_instance) -> Dict[str, str]:
    """
    Decode a trend keyword into a three-part strategy briefing.

    Uses a deliberately simple prompt (small models echo complex
    instructions back) and lets Python split the reply into fields.

    Args:
        topic: The trend/keyword to analyze.
        llm_instance: Callable LLM (llama.cpp style), invoked as
            llm_instance(prompt, max_tokens=..., temperature=..., echo=False)
            and expected to return {'choices': [{'text': ...}]}.
            May be None/falsy when the model is unavailable.

    Returns:
        Dict with 'summary', 'impact' and 'strategy' keys; a canned
        offline response when the LLM is missing or raises.
    """
    print(f"🧠 [Thunderbird] Decoding Trend with SIMPLE prompt: {topic}")
    offline_response = {"summary": "AI Analyst is offline.", "impact": "Please try again.", "strategy": "System is rebooting."}
    if not llm_instance:
        return offline_response
    # === THE FINAL, SIMPLEST PROMPT ===
    # We ask for a simple paragraph and let Python do the formatting.
    prompt = f"""[INST]
You are an expert marketing strategist.
Analyze this trend: "{topic}".
Write a short, three-sentence briefing about it.
[/INST]"""
    try:
        response = llm_instance(prompt, max_tokens=150, temperature=0.7, echo=False)
        full_text = response['choices'][0]['text'].strip()
        # FIX: split after any sentence terminator (., !, ?). The previous
        # lookbehind only matched '.', so replies whose sentences ended in
        # '!' or '?' were merged into one field and 'strategy' fell back.
        sentences = re.split(r'(?<=[.!?])\s+', full_text)
        # Assign each sentence to a key, with fallbacks for short replies.
        summary = sentences[0] if len(sentences) > 0 else "Analysis in progress."
        impact = sentences[1] if len(sentences) > 1 else "Impact is being evaluated."
        strategy = sentences[2] if len(sentences) > 2 else "Awaiting actionable strategy."
        return {
            "summary": summary,
            "impact": impact,
            "strategy": strategy
        }
    except Exception as e:
        print(f" - ❌ LLM Error: {e}")
        return offline_response