# aiupdates / app.py
# Nhughes09
# V8.9: Final Import & Reliability Fix
# 8b585ba
import os
import re
import time
import json
import logging
import subprocess
import signal
from datetime import datetime, timedelta
from typing import List, Dict, Any
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from huggingface_hub import HfApi # Still needed for dataset persistence if TOKEN exists
import uvicorn
try:
import ollama
except ImportError:
ollama = None
# Configure logging: root handler at INFO level, plus the named logger used by
# every function in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ai-impact-tracker")
# Application object; the route handlers and the startup hook in this file
# are registered on it through decorators.
app = FastAPI()
# Persistence Configuration (HF Datasets)
DATASET_REPO_ID = "ndwdgda/ai-impact-history" # Hardcoded as requested
# File name of the history inside the dataset repo.
HISTORY_FILENAME = "history.json"
# Path of the local working copy, relative to the current working directory.
LOCAL_HISTORY_FILE = "history.json"
# Templates are looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")
# --- Persistence Logic (HF Datasets) ---
def load_history() -> List[Dict[str, Any]]:
    """Load the update history, refreshing it from the HF dataset when possible.

    When ``HF_TOKEN`` is set, the history file is first downloaded from
    ``DATASET_REPO_ID`` into the current directory. Whether or not that
    download succeeds, the result is then read from the local copy, so a
    transient network error no longer discards history that is already on
    disk.

    Returns:
        The list of history entries; empty when no usable local file exists.
    """
    token = os.environ.get("HF_TOKEN")
    if not token:
        logger.warning("HF_TOKEN missing. Using local/empty history.")
        return _load_local_history()

    api = HfApi(token=token)
    try:
        # Download into the working directory so the file lands where
        # _load_local_history() reads it.
        api.hf_hub_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            filename=HISTORY_FILENAME,
            local_dir=".",
            local_dir_use_symlinks=False
        )
        logger.info(f"Downloaded history from {DATASET_REPO_ID}")
    except Exception as e:
        # Best effort: fall through to whatever local copy exists (none on a
        # first run, in which case the helper returns an empty list).
        logger.warning(f"Could not download history (might be first run): {e}")
    return _load_local_history()
def _load_local_history() -> List[Dict[str, Any]]:
    """Read the local history JSON file and normalise it to a list.

    Returns the parsed list as-is, wraps a single top-level object in a
    one-element list, and returns an empty list when the file is missing,
    unreadable, or holds any other JSON type.
    """
    if not os.path.exists(LOCAL_HISTORY_FILE):
        return []

    try:
        with open(LOCAL_HISTORY_FILE, "r") as handle:
            payload = json.load(handle)
    except Exception as err:
        logger.error(f"Error reading local history: {err}")
        return []

    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        return [payload]
    return []
def save_update(data: Dict[str, Any]):
    """Persist a new snapshot locally and, when a token is set, to the HF dataset.

    The snapshot is placed at the front of the existing history, the history
    is trimmed to its 50 newest entries, and the result is written to
    ``LOCAL_HISTORY_FILE``. If that write fails, nothing is uploaded.

    Args:
        data: The snapshot to store; its ``date`` value is used in the
            commit message of the upload.
    """
    # Newest first, capped at 50 entries.
    entries = ([data] + load_history())[:50]

    # The local copy is written first; the upload below sends this file.
    try:
        with open(LOCAL_HISTORY_FILE, "w") as handle:
            json.dump(entries, handle, indent=2)
    except Exception as write_err:
        logger.error(f"Failed to save local history: {write_err}")
        return

    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        logger.warning("HF_TOKEN missing. Skipping upload.")
        return

    try:
        hub = HfApi(token=hf_token)
        # Make sure the (private) dataset repo exists; a failure here is only
        # a warning because the upload may still succeed.
        try:
            hub.create_repo(
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                private=True,
                exist_ok=True
            )
        except Exception as repo_err:
            logger.warning(f"Repo creation check failed (might already exist or permission issue): {repo_err}")
        hub.upload_file(
            path_or_fileobj=LOCAL_HISTORY_FILE,
            path_in_repo=HISTORY_FILENAME,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            commit_message=f"Auto-update: {data.get('date')}"
        )
        logger.info(f"Uploaded history to {DATASET_REPO_ID}")
    except Exception as upload_err:
        logger.error(f"Failed to upload history to HF: {upload_err}")
def get_latest_data() -> Dict[str, Any]:
    """Return the newest history entry, back-filled with default values.

    The first entry of the loaded history is completed in place: any key
    missing at any nesting level is taken from the defaults below, an empty
    or non-list ``monthly_trend`` is reset, and a non-mapping value stored
    where a nested mapping is expected is replaced by the default mapping.

    Returns:
        The completed newest entry, or the bare defaults when the history is
        empty or its first entry is not a JSON object.
    """
    defaults = {
        "date": datetime.now().strftime("%Y-%m-%d"),
        "ai_layoffs": [],
        "ai_case_studies": [],
        "strategic_intel": [],
        "robot_production": [],
        "robot_news": [],
        "ai_advancements": [],
        "ubi_updates": [],
        "labor_signals": [],
        "displaced_jobs_current": "0.0M",
        "displaced_jobs_2026_projection": "0.0M",
        "unemployment_stats": {
            "u3_official": "4.2%",
            "real_jobless": "10.5%",
            "underemployed": "11.8%",
            "malemployed_grads": "4.2M"
        },
        "treasury_signal": "RECESSIONARY (-75k net)",
        "stats": {
            "layoffs_2025": "1.2M",
            "layoffs_2026": "15.4k",
            "net_us_2026": "-45,000",
            "monthly_trend": [
                {"month": "Jan 25", "count": 8000}, {"month": "Feb 25", "count": 9500},
                {"month": "Mar 25", "count": 12000}, {"month": "Apr 25", "count": 11000},
                {"month": "May 25", "count": 14000}, {"month": "Jun 25", "count": 13500},
                {"month": "Jul 25", "count": 15000}, {"month": "Aug 25", "count": 16000},
                {"month": "Sep 25", "count": 18000}, {"month": "Oct 25", "count": 21000},
                {"month": "Nov 25", "count": 25000}, {"month": "Dec 25", "count": 32000}
            ],
            "ai_live": {
                "status": "IDLE",
                "model": "Local/Cloud Hybrid",
                "prompt_preview": "System initialized.",
                "raw_response": "Neural Core online."
            }
        },
        "ai_live": {
            "status": "IDLE",
            "model": "Local/Cloud Hybrid",
            "prompt_preview": "System initialized.",
            "raw_response": "Neural Core online."
        }
    }

    # Deep default enforcement for nested structures
    def deep_verify(d, expected):
        """Fill ``d`` in place so that it has every key of ``expected``."""
        for k, v in expected.items():
            if k not in d:
                d[k] = v
            elif isinstance(v, dict):
                if isinstance(d[k], dict):
                    deep_verify(d[k], v)
                else:
                    # A scalar/list where a mapping is required would break
                    # every consumer that indexes into it; fall back.
                    d[k] = v
            elif k == "monthly_trend" and (not isinstance(d[k], list) or not d[k]):
                d[k] = v  # Force revert if corrupted

    history = load_history()
    latest = history[0] if history else None
    if not isinstance(latest, dict):
        # Empty history, or a first entry that is not a JSON object.
        return defaults
    deep_verify(latest, defaults)
    return latest
# --- AI Agent Logic ---
import feedparser
from duckduckgo_search import DDGS
# --- Context Fetching (The "Eyes") ---
def fetch_realtime_context():
    """Gather real-time text from RSS and Web Search for the agents.

    Headlines from a fixed set of RSS feeds and result snippets from a fixed
    set of DuckDuckGo queries are concatenated per category. Individual feed
    or query failures are logged and skipped, so the function returns
    whatever could be collected.

    Returns:
        A ``(context, counts)`` tuple. ``context`` maps ``"economy"``,
        ``"robotics"`` and ``"tech"`` to the collected text; ``counts`` maps
        the same keys to the number of items that text was built from.
    """
    logger.info("Fetching real-time context...")
    context = {"economy": "", "robotics": "", "tech": ""}
    counts = {"economy": 0, "robotics": 0, "tech": 0}

    # 1. RSS Feeds (Fast, reliable news)
    # Using multiple sources to guarantee data availability
    feeds = {
        "economy": [
            "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=10000350",  # CNBC Economy
            "http://feeds.marketwatch.com/marketwatch/topstories/"  # MarketWatch Top
        ],
        "tech": [
            "https://techcrunch.com/category/artificial-intelligence/feed/",
            "https://www.theverge.com/rss/artificial-intelligence/index.xml",
            "https://www.wired.com/feed/tag/ai/latest/rss"
        ],
        "robotics": [
            "https://www.therobotreport.com/feed/",
            "https://www.sciencedaily.com/rss/computers_math/robotics.xml"
        ]
    }
    for category, urls in feeds.items():
        headlines = []
        for url in urls:
            try:
                parsed = feedparser.parse(url)
                # Up to 15 entries per feed to give the agents enough volume.
                for entry in parsed.entries[:15]:
                    title = entry.get("title")
                    if not title:
                        # Skip just this entry; attribute access on a missing
                        # title used to abort the rest of the feed.
                        continue
                    headlines.append(f"- {title} ({entry.get('published', '')})")
            except Exception as e:
                logger.error(f"RSS Fail {url}: {e}")
        counts[category] += len(headlines)
        context[category] += "\n".join(headlines)

    # 2. Web Search (Specific Queries)
    # Using DDGS for targeted info that RSS misses.
    # Each row is (category, query, fallback query or None).
    searches = [
        # Auditor Searches (Economy)
        ("economy", "US economy jobs report tax receipts 2025", "US unemployment rate real numbers 2026"),
        ("economy", "US consumer spending trends December 2025", None),
        ("economy", "Daily Treasury Statement tax receipts withholding trend 2025", None),  # Labor Signals
        ("economy", "WARN act notices California Texas New York December 2025", None),  # Labor Signals
        # Robotics Searches
        ("robotics", "Tesla Optimus robot production numbers 2025 2026", "Humanoid robot deployments 2025"),
        ("robotics", "Figure 02 robot BMW manufacturing details", None),
        ("robotics", "Agility Robotics Digit deployment Amazon GXO", None),
        # Tech Searches (Layoffs & AI) -- Heavy Load for 20+ items
        ("tech", "List of all tech layoffs December 2025 January 2026", "Tech company layoffs tracker 2026"),
        ("tech", "Google Amazon Microsoft Meta layoffs 2026", None),
        ("tech", "New AI model releases December 2025 January 2026", "OpenAI Anthropic Google DeepMind news 2026"),
        ("tech", "Companies replacing humans with AI case studies 2026", None),
        ("tech", "Universal Basic Income pilot programs 2025 2026 updates", None),
        ("tech", "Guaranteed Income pilots California New York 2026", None),  # UBI
    ]
    try:
        ddgs = DDGS()
        for key, query, fallback_query in searches:
            time.sleep(2)  # Pause between queries
            try:
                # list(... or []) tolerates a None or generator return value.
                results = list(ddgs.text(query, max_results=12) or [])
                if len(results) < 5 and fallback_query:
                    time.sleep(1)
                    results += list(ddgs.text(fallback_query, max_results=10) or [])
                if results:
                    counts[key] += len(results)
                    context[key] += "\n\nSEARCH RESULTS:\n" + "\n".join(
                        f"- {r.get('title', '')}: {r.get('body', '')}" for r in results
                    )
            except Exception as ex:
                logger.error(f"Search '{query}' failed: {ex}")
    except Exception as e:
        logger.error(f"Search Fail: {e}")

    logger.info(f"Context Counts: {counts}")  # DEBUG LOG
    return context, counts
# --- AI Agent Logic (3-Stage Specialist Swarm) ---
# AGENT 1: THE STRATEGIST (Macro, Audit & Prediction)
# Template filled with str.format() in run_ai_research(): {current_date} and
# {context} are the placeholders; doubled braces ({{ }}) are literal braces of
# the JSON example shown to the model. From the reply, run_ai_research() merges
# "stats" into the stored stats and copies "unemployment_stats",
# "labor_signals", "displaced_jobs_current" and
# "displaced_jobs_2026_projection" to the top level of the snapshot.
STRATEGIST_PROMPT = """
You are the Chief Strategist for the AI Unemployment Tracker.
Date: {current_date}. Focus: Overall Macro Impact.
## DATA CONTEXT:
{context}
TASK: Combine Macro Audit and Future Predictions into a single JSON.
1. **Labor Audit**: Calculate U-3 vs Real Jobless (~10.5%).
2. **Macro Stats**:
- Total Layoffs 2025 (Full Year Est): 1.2M.
- Total Layoffs 2026 (YTD): Calculate from context.
- Net US Employment 2026: (Created - Displaced).
3. **Monthly Trend**: Provide a 13-month array of Global Layoffs starting Jan 2025 to Jan 2026.
- 2025 Baseline: Jan-Dec scaling from 10k to 35k.
- 2026 Data: Use context for Jan 26.
Output ONLY JSON:
{{
"stats": {{
"layoffs_2025": "1.2M",
"layoffs_2026": "45k",
"net_us_2026": "-12,000",
"monthly_trend": [
{{"month": "Jan 25", "count": 10000}}, ..., {{"month": "Jan 26", "count": 15000}}
]
}},
"unemployment_stats": {{
"u3_official": "4.2%",
"real_jobless": "10.5%",
"underemployed": "11.8%",
"analysis": "..."
}},
"labor_signals": [
{{"signal": "...", "date": "..."}}
],
"displaced_jobs_current": "2.4M",
"displaced_jobs_2026_projection": "4.2M"
}}
"""
# AGENT 2: THE LIBRARIAN (Feeds, Robotics & Research)
# Template filled with str.format() in run_ai_research(): {current_date} and
# {context} are the placeholders; doubled braces ({{ }}) are literal braces.
# The parsed reply is merged wholesale into the snapshot with dict.update(),
# so every top-level key the model returns ends up in the stored data.
LIBRARIAN_PROMPT = """
You are the Lead Librarian for the AI Unemployment Tracker.
Date: {current_date}. Focus: Event Discovery.
## DATA CONTEXT:
{context}
TASK: Discover and summarize all specific events.
1. **AI Layoffs**: 20+ items (Company, Role, Jobs Lost, Date).
2. **Robot News**: 20+ items (Hardware, Units, Impact).
3. **Case Studies**: 10+ items (Company, Tool, Savings).
4. **Frontier/UBI**: 20+ items (Models, Policy, Pilots).
Output ONLY JSON:
{{
"ai_layoffs": [...],
"robot_news": [...],
"robot_production": [...],
"ai_case_studies": [...],
"ai_advancements": [...],
"ubi_updates": [...]
}}
"""
# AGENT 3: THE HISTORIAN (Long-Term Memory & Trend Chaining)
# Template filled with str.format() in run_ai_research(): {current_date},
# {history} (bullet list of stored "cycle_summary" values) and
# {current_research} (truncated Strategist/Librarian replies) are the
# placeholders; doubled braces ({{ }}) are literal braces. The reply's
# "trend_analysis" and "cycle_summary" values are stored on the snapshot.
HISTORIAN_PROMPT = """
You are the Lead Historian for the AI Unemployment Tracker.
Date: {current_date}.
## HISTORICAL MEMORY (Summaries of last 5 cycles):
{history}
## CURRENT RESEARCH:
{current_research}
TASK: Synthesize if the new data indicates an ACCELERATION or DECELERATION of AI displacement compared to recent memory.
1. Return a concise analysis.
2. Generate a 'cycle_summary' (1 paragraph) that will be stored in memory for the next cycle.
Output ONLY JSON:
{{
"trend_analysis": "...",
"cycle_summary": "..."
}}
"""
# Earlier prompts were consolidated into the Strategist and Librarian prompts above; the Historian prompt is separate.
# Global state for live logs to avoid disk writes for every step.
# Keys: "status" (current phase label), "model" (label of the model in use),
# "prompt_preview" (short excerpt of the injected context) and "raw_response"
# (append-only text log). run_ai_research() replaces this dict at the start of
# every cycle and overwrites "model" after a successful Ollama call, so the
# values below are only the pre-first-cycle placeholders.
LIVE_LOG_STATE = {
"status": "IDLE",
"model": "meta-llama/Llama-3.1-8B-Instruct",
"prompt_preview": "Waiting for next cycle...",
"raw_response": "Ready."
}
def update_live_state(status, log_append=None):
    """Set the live status label and optionally extend the running log text.

    Args:
        status: New value for the ``"status"`` field.
        log_append: Text appended to ``"raw_response"``; skipped when empty
            or ``None``.
    """
    LIVE_LOG_STATE["status"] = status
    if not log_append:
        return
    LIVE_LOG_STATE["raw_response"] += log_append
@app.get("/api/live_log")
async def get_live_log():
    """Serve the in-memory live-log dictionary as it currently stands."""
    return LIVE_LOG_STATE
def run_ai_research():
    """Run one research cycle (context fetch, then three LLM stages).

    The cycle resets the live log, gathers RSS/search context, and then runs
    the Strategist, Librarian and Historian prompts against the local Ollama
    server. Each stage is independent: a failure is logged, reported in the
    live log, and the cycle continues with the next stage. The snapshot is
    saved via ``save_update`` after every stage that produced data and once
    more at the end.

    Returns:
        The merged data snapshot (always a dict).
    """
    global LIVE_LOG_STATE

    # Resolve the Ollama settings before anything else: the log reset below
    # needs the model name. (Assigning these further down made them locals
    # that were read before assignment.)
    ollama_url = os.environ.get("OLLAMA_HOST", "http://localhost:11434")
    default_model = os.environ.get("OLLAMA_MODEL", "qwen2.5:7b")

    # RESET LOG
    LIVE_LOG_STATE = {
        "status": "INITIALIZING SWARM...",
        "model": default_model,
        "prompt_preview": "Gathering RSS feeds and Search contexts...",
        "raw_response": "--- SYSTEM KERNEL START ---\n"
    }

    def safe_extract(content):
        """Parse the span from the first '{' to the last '}' as a JSON object.

        Returns an empty dict when there is no such span or it is not valid
        JSON (models often wrap the JSON in chatter or truncate it).
        """
        if not content:
            return {}
        try:
            match = re.search(r'\{[\s\S]*\}', content)
            if not match:
                return {}
            parsed = json.loads(match.group(0))
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def call_ollama(prompt, model=None):
        """Send one user prompt to Ollama; return the reply text or None."""
        if not ollama:
            update_live_state("ENGINE MISSING", "\n[ERROR] Ollama library not found locally.\n")
            return None
        target_model = model or default_model
        try:
            update_live_state("LOCAL GPU PROCESSING...")
            # Create a client instance with the specific host
            client_local = ollama.Client(host=ollama_url)
            response = client_local.chat(model=target_model, messages=[
                {'role': 'user', 'content': prompt},
            ])
            LIVE_LOG_STATE["model"] = f"LOCAL:{target_model}"
            return response['message']['content']
        except Exception as e:
            err_msg = str(e)
            logger.warning(f"Ollama Fail at {ollama_url}: {err_msg}")
            if "Connection refused" in err_msg or "failed to connect" in err_msg.lower():
                update_live_state("LOCAL GPU OFFLINE", f"\n[BRIDGE] Could not reach {ollama_url}. Ensure Ollama is running on your Mac.\n")
            else:
                update_live_state("LOCAL ERROR", f"\n[OLLAMA ERROR] {err_msg[:60]}...\n")
            return None

    # Step 0: Gather Real-World Context
    update_live_state("FETCHING CONTEXT (RSS/Search)...")
    # Start from empty context so a failed fetch degrades to "no context"
    # (the same state as when every feed and query fails) instead of leaving
    # the name unbound for the stages below.
    context_data = {"economy": "", "robotics": "", "tech": ""}
    try:
        context_data, counts = fetch_realtime_context()
        context_preview = (
            f"--- CONTEXT GATHERED ---\n"
            f"ECONOMY: {len(context_data['economy'])} chars ({counts.get('economy', 0)} items)\n"
            f"ROBOTICS: {len(context_data['robotics'])} chars ({counts.get('robotics', 0)} items)\n"
            f"TECH: {len(context_data['tech'])} chars ({counts.get('tech', 0)} items)\n"
            f"Sources: CNBC, MarketWatch, TechCrunch, Wired, TheVerge, RobotReport, DuckDuckGo\n\n"
        )
        LIVE_LOG_STATE["prompt_preview"] = f"Injected Context:\n{context_data['economy'][:300]}...\n..."
        update_live_state("CONTEXT LOADED", context_preview)
    except Exception as e:
        logger.error(f"Context fail: {e}")
        update_live_state("CONTEXT FAILED", f"\nERROR: {e}\n")

    today_str = datetime.now().strftime("%Y-%m-%d")
    final_data = get_latest_data()

    # Raw stage replies; kept as empty strings on failure so the Historian
    # stage can always build its prompt.
    c1 = ""
    c2 = ""

    # --- STAGE 1: THE STRATEGIST (Macro) ---
    update_live_state("STAGE 1: STRATEGIST (Macro & Prediction)...")
    try:
        macro_context = context_data["economy"] + "\n" + context_data["tech"]
        p1 = STRATEGIST_PROMPT.format(current_date=today_str, context=macro_context)
        c1 = call_ollama(p1)
        if not c1:
            raise Exception("Local Engine Failed Stage 1")
        j1 = safe_extract(c1)
        if j1:
            # Safe Stats Merge
            if "stats" in j1 and isinstance(j1["stats"], dict):
                final_data["stats"].update(j1["stats"])
            # Flatten root keys
            for k in ["unemployment_stats", "labor_signals", "displaced_jobs_current", "displaced_jobs_2026_projection"]:
                if k in j1:
                    final_data[k] = j1[k]
            save_update(final_data)
        else:
            logger.warning("Strategist reply contained no parseable JSON.")
        update_live_state("STRATEGIST COMPLETE", f"\n--- STRATEGIST OUTPUT ---\n{c1[:300]}...\n")
    except Exception as e:
        logger.error(f"Strategist Failed: {e}")
        update_live_state("STRATEGIST FAILED", f"\nERROR: {e}\n")

    # --- STAGE 2: THE LIBRARIAN (Research) ---
    update_live_state("STAGE 2: LIBRARIAN (Events discover)...")
    try:
        lib_context = context_data["tech"] + "\n" + context_data["robotics"]
        p2 = LIBRARIAN_PROMPT.format(current_date=today_str, context=lib_context)
        c2 = call_ollama(p2)
        if not c2:
            raise Exception("Local Engine Failed Stage 2")
        j2 = safe_extract(c2)
        if j2:
            final_data.update(j2)
            save_update(final_data)
        else:
            logger.warning("Librarian reply contained no parseable JSON.")
        update_live_state("LIBRARIAN COMPLETE", f"\n--- LIBRARIAN OUTPUT ---\n{c2[:300]}...\n")
    except Exception as e:
        logger.error(f"Librarian Failed: {e}")
        update_live_state("LIBRARIAN FAILED", f"\nERROR: {e}\n")

    # --- STAGE 3: THE HISTORIAN (Memory & Synthesis) ---
    update_live_state("STAGE 3: HISTORIAN (Memory Synthesis)...")
    try:
        history = load_history()
        past_summaries = [
            h.get("cycle_summary", "")
            for h in history[:5]
            if isinstance(h, dict) and h.get("cycle_summary")
        ]
        history_text = "\n".join([f"- {s}" for s in past_summaries]) if past_summaries else "No prior memories."
        # `or ""` covers a stage whose call returned None before raising.
        current_research = f"Strategist: {(c1 or '')[:500]}...\nLibrarian: {(c2 or '')[:500]}..."
        p3 = HISTORIAN_PROMPT.format(current_date=today_str, history=history_text, current_research=current_research)
        c3 = call_ollama(p3)
        if not c3:
            raise Exception("Local Engine Failed Stage 3")
        j3 = safe_extract(c3)
        if j3:
            final_data["trend_analysis"] = j3.get("trend_analysis", "Market parity.")
            final_data["cycle_summary"] = j3.get("cycle_summary", "Research cycle concluded.")
            save_update(final_data)
        else:
            logger.warning("Historian reply contained no parseable JSON.")
        update_live_state("HISTORIAN COMPLETE", f"\n--- HISTORIAN OUTPUT ---\n{c3[:300]}...\n")
    except Exception as e:
        logger.error(f"Historian Failed: {e}")
        update_live_state("HISTORIAN FAILED", f"\nERROR: {e}\n")
        final_data["cycle_summary"] = "AI Swarm error. Operating on raw context counts."

    LIVE_LOG_STATE["status"] = "SWARM COMPLETE"
    LIVE_LOG_STATE["raw_response"] += "\n--- PROCESS FINISHED ---\n"
    # Store a copy: the global log keeps changing after this cycle, and the
    # persisted/returned snapshot should not change with it.
    final_data["ai_live"] = dict(LIVE_LOG_STATE)
    final_data["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    save_update(final_data)
    update_live_state("IDLE (Next Cycle in 30m)")
    return final_data
# --- Routes ---
import threading
# ... (imports)
# Global state for next update time (None until the first cycle has finished)
NEXT_UPDATE_TIME = None


def background_updater():
    """Run a research cycle every 30 minutes, forever.

    After a successful cycle the loop sleeps 30 minutes; after a failed one
    it retries in 1 minute. ``NEXT_UPDATE_TIME`` is refreshed on both paths
    so readers of it never see a timestamp left over from an earlier cycle.
    """
    global NEXT_UPDATE_TIME
    while True:
        try:
            logger.info("Starting scheduled 30-minute update...")
            run_ai_research()
        except Exception as e:
            logger.error(f"Background update failed: {e}")
            pause_seconds = 60  # Retry in 1 min on failure
            succeeded = False
        else:
            pause_seconds = 1800  # Regular 30-minute cadence
            succeeded = True

        NEXT_UPDATE_TIME = datetime.now() + timedelta(seconds=pause_seconds)
        if succeeded:
            logger.info(f"Update complete. Next update at {NEXT_UPDATE_TIME}")
        else:
            logger.info(f"Retrying update at {NEXT_UPDATE_TIME}")
        time.sleep(pause_seconds)
@app.on_event("startup")
def startup_event():
    """Startup hook: load history, probe Ollama, start the updater thread.

    The Ollama probe only affects the live-log status text; the background
    updater thread is started regardless of its outcome.
    """
    # Load history
    load_history()

    # Check Ollama status
    if not ollama:
        logger.info("Ollama library not found. Using Cloud-only mode.")
        update_live_state("CLOUD MODE", "\n--- NEURAL CORE: CLOUD-ONLY MODE ACTIVE. ---\n")
    else:
        try:
            # Quick ping to check if local server is up
            logger.info("Pinging local Ollama server...")
            listing = ollama.list()
            # The reply is either an object with a .models attribute or a
            # mapping with a "models" key, depending on the library version.
            if hasattr(listing, 'models'):
                installed = listing.models
            else:
                installed = listing.get('models', [])
            model_names = [
                getattr(item, 'model', getattr(item, 'name', None)) or str(item)
                for item in installed
            ]
            logger.info(f"Ollama is READY on local M1 GPU. Models: {model_names}")
            update_live_state("LOCAL ENGINE READY", f"\n--- NEURAL CORE: OLLAMA READY (M1 GPU) ---\nINSTALLED MODELS: {', '.join(model_names)}\n")
        except Exception as probe_err:
            logger.warning(f"Ollama is not running locally. Falling back to cloud. Error: {probe_err}")
            update_live_state("CLOUD FALLBACK MODE", "\n--- NEURAL CORE: OLLAMA OFFLINE. CLOUD ACTIVE. ---\n")

    # Start background thread (daemon, so it does not block interpreter exit)
    updater = threading.Thread(target=background_updater, daemon=True)
    updater.start()
    logger.info("Background 30-min updater started.")
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render ``index.html`` with the latest data and the next-update time.

    The template receives ``data`` (the latest snapshot) and ``next_update``
    (a POSIX timestamp). Before the first cycle has finished, the timestamp
    falls back to one minute from now.
    """
    data = get_latest_data()

    # Calculate time remaining for UI
    if NEXT_UPDATE_TIME:
        next_update_timestamp = NEXT_UPDATE_TIME.timestamp()
    else:
        next_update_timestamp = (datetime.now() + timedelta(minutes=1)).timestamp()

    page_context = {
        "request": request,
        "data": data,
        "next_update": next_update_timestamp
    }
    return templates.TemplateResponse("index.html", page_context)
@app.post("/update")
async def trigger_update():
    """Run a research cycle on demand and report whether it produced data.

    ``run_ai_research`` is blocking (network calls, sleeps, LLM inference),
    so it is run in the event loop's default executor; calling it directly
    from this coroutine would stall every other request until it finished.

    Returns:
        ``{"status": "success"}`` when the cycle returned data, otherwise
        ``{"status": "error"}``.
    """
    import asyncio  # local import keeps this handler self-contained

    loop = asyncio.get_running_loop()
    new_data = await loop.run_in_executor(None, run_ai_research)
    if new_data:
        return {"status": "success"}
    return {"status": "error"}
def force_clear_port(port):
    """Send SIGTERM to processes listening on ``port`` (macOS/Unix, needs lsof).

    Only listening sockets are considered, so processes that merely hold a
    client connection to the port are left alone, and the current process is
    never signalled. A PID that cannot be parsed or has already exited is
    skipped without abandoning the remaining ones. All failures are logged;
    nothing is raised.

    Args:
        port: TCP port number to free.
    """
    try:
        # -t: bare PIDs; -sTCP:LISTEN: listeners only.
        result = subprocess.check_output(
            ["lsof", "-ti", f":{port}", "-sTCP:LISTEN"]
        )
    except subprocess.CalledProcessError:
        # No process found on that port
        return
    except Exception as e:
        logger.error(f"[SINGULARITY] Cleanup failed: {e}")
        return

    own_pid = os.getpid()
    signalled = False
    for pid_text in result.decode().split():
        try:
            pid = int(pid_text)
        except ValueError:
            continue
        if pid == own_pid:
            continue
        logger.info(f"[SINGULARITY] Terminating zombie process {pid} on port {port}...")
        try:
            os.kill(pid, signal.SIGTERM)
            signalled = True
        except ProcessLookupError:
            # Exited between the lsof call and the signal; nothing to do.
            continue
        except Exception as e:
            logger.error(f"[SINGULARITY] Cleanup failed: {e}")

    if signalled:
        time.sleep(1)  # Wait for port to clear
if __name__ == "__main__":
    # 100% LOCAL SINGULARITY MODE
    # The port comes from the PORT environment variable, defaulting to 7860.
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"--- SINGULARITY STARTUP (Port {port}) ---")

    # 1. Free the port from any previous instance before binding.
    force_clear_port(port)

    # 2. Serve on all interfaces; a startup failure is logged, not re-raised.
    try:
        uvicorn.run(app, host="0.0.0.0", port=port)
    except Exception as exc:
        logger.error(f"FATAL: Could not bind to port {port}: {exc}")