"""AI Impact Tracker.

FastAPI app that, every 30 minutes, gathers real-time context (RSS feeds +
DuckDuckGo searches), runs a 3-stage local-LLM "specialist swarm" (Ollama),
and persists the merged results to a Hugging Face dataset repo.
"""

import json
import logging
import os
import re
import signal
import subprocess
import threading
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List

import feedparser
import uvicorn
from duckduckgo_search import DDGS
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi  # Still needed for dataset persistence if TOKEN exists

try:
    import ollama
except ImportError:
    ollama = None

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ai-impact-tracker")

app = FastAPI()

# Persistence Configuration (HF Datasets)
DATASET_REPO_ID = "ndwdgda/ai-impact-history"  # Hardcoded as requested
HISTORY_FILENAME = "history.json"
LOCAL_HISTORY_FILE = "history.json"

# Local LLM engine configuration.
# NOTE: these MUST live at module level. They were previously assigned inside
# run_ai_research() *after* the first statement that read DEFAULT_MODEL, which
# made them function locals and raised UnboundLocalError on every cycle.
OLLAMA_URL = os.environ.get("OLLAMA_HOST", "http://localhost:11434")
DEFAULT_MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5:7b")

# Templates
templates = Jinja2Templates(directory="templates")


# --- Persistence Logic (HF Datasets) ---

def load_history() -> List[Dict[str, Any]]:
    """Download and load history from HF Dataset.

    Falls back to the local file (or an empty list) when no token is set or
    the download fails (e.g. first run, repo not created yet).
    """
    token = os.environ.get("HF_TOKEN")
    if not token:
        logger.warning("HF_TOKEN missing. Using local/empty history.")
        return _load_local_history()
    api = HfApi(token=token)
    try:
        # Download file into CWD so _load_local_history() can read it.
        api.hf_hub_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            filename=HISTORY_FILENAME,
            local_dir=".",
            local_dir_use_symlinks=False,
        )
        logger.info(f"Downloaded history from {DATASET_REPO_ID}")
        return _load_local_history()
    except Exception as e:
        logger.warning(f"Could not download history (might be first run): {e}")
        return []


def _load_local_history() -> List[Dict[str, Any]]:
    """Helper to load local JSON file.

    Always returns a list: a stored dict is wrapped, anything else yields [].
    """
    if os.path.exists(LOCAL_HISTORY_FILE):
        try:
            with open(LOCAL_HISTORY_FILE, "r") as f:
                history = json.load(f)
            if isinstance(history, list):
                return history
            return [history] if isinstance(history, dict) else []
        except Exception as e:
            logger.error(f"Error reading local history: {e}")
    return []


def save_update(data: Dict[str, Any]):
    """Save update to local file AND upload to HF Dataset.

    Prepends `data` to the (reloaded) history, keeps the 50 most recent
    entries, writes locally, then best-effort uploads when HF_TOKEN exists.
    """
    history = load_history()
    # Prepend new update
    history.insert(0, data)
    # limit to 50
    history = history[:50]

    # Save locally first
    try:
        with open(LOCAL_HISTORY_FILE, "w") as f:
            json.dump(history, f, indent=2)
    except Exception as e:
        logger.error(f"Failed to save local history: {e}")
        return

    # Upload to HF
    token = os.environ.get("HF_TOKEN")
    if token:
        try:
            api = HfApi(token=token)
            # Ensure repo exists (create private if not)
            try:
                api.create_repo(
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    private=True,
                    exist_ok=True,
                )
            except Exception as e:
                logger.warning(f"Repo creation check failed (might already exist or permission issue): {e}")
            api.upload_file(
                path_or_fileobj=LOCAL_HISTORY_FILE,
                path_in_repo=HISTORY_FILENAME,
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                commit_message=f"Auto-update: {data.get('date')}",
            )
            logger.info(f"Uploaded history to {DATASET_REPO_ID}")
        except Exception as e:
            logger.error(f"Failed to upload history to HF: {e}")
    else:
        logger.warning("HF_TOKEN missing. Skipping upload.")


def get_latest_data() -> Dict[str, Any]:
    """Get most recent data.

    Returns the newest history entry with any missing/corrupted keys filled
    from a full set of defaults (deep-merged), or the defaults themselves if
    there is no history yet.
    """
    defaults = {
        "date": datetime.now().strftime("%Y-%m-%d"),
        "ai_layoffs": [],
        "ai_case_studies": [],
        "strategic_intel": [],
        "robot_production": [],
        "robot_news": [],
        "ai_advancements": [],
        "ubi_updates": [],
        "labor_signals": [],
        "displaced_jobs_current": "0.0M",
        "displaced_jobs_2026_projection": "0.0M",
        "unemployment_stats": {
            "u3_official": "4.2%",
            "real_jobless": "10.5%",
            "underemployed": "11.8%",
            "malemployed_grads": "4.2M",
        },
        "treasury_signal": "RECESSIONARY (-75k net)",
        "stats": {
            "layoffs_2025": "1.2M",
            "layoffs_2026": "15.4k",
            "net_us_2026": "-45,000",
            "monthly_trend": [
                {"month": "Jan 25", "count": 8000},
                {"month": "Feb 25", "count": 9500},
                {"month": "Mar 25", "count": 12000},
                {"month": "Apr 25", "count": 11000},
                {"month": "May 25", "count": 14000},
                {"month": "Jun 25", "count": 13500},
                {"month": "Jul 25", "count": 15000},
                {"month": "Aug 25", "count": 16000},
                {"month": "Sep 25", "count": 18000},
                {"month": "Oct 25", "count": 21000},
                {"month": "Nov 25", "count": 25000},
                {"month": "Dec 25", "count": 32000},
            ],
            "ai_live": {
                "status": "IDLE",
                "model": "Local/Cloud Hybrid",
                "prompt_preview": "System initialized.",
                "raw_response": "Neural Core online.",
            },
        },
        "ai_live": {
            "status": "IDLE",
            "model": "Local/Cloud Hybrid",
            "prompt_preview": "System initialized.",
            "raw_response": "Neural Core online.",
        },
    }

    # Deep default enforcement for nested structures
    def deep_verify(d, defaults):
        for k, v in defaults.items():
            if k not in d:
                d[k] = v
            elif isinstance(v, dict) and isinstance(d[k], dict):
                deep_verify(d[k], v)
            elif k == "monthly_trend" and (not isinstance(d[k], list) or len(d[k]) == 0):
                d[k] = v  # Force revert if corrupted

    history = load_history()
    if history:
        latest = history[0]
        deep_verify(latest, defaults)
        return latest
    return defaults


# --- Context Fetching (The "Eyes") ---

def fetch_realtime_context():
    """Gather real-time text from RSS and Web Search for the agents.

    Returns (context, counts): per-category concatenated headline text and
    per-category item counts for the "economy", "robotics" and "tech" buckets.
    """
    logger.info("Fetching real-time context...")
    context = {"economy": "", "robotics": "", "tech": ""}
    counts = {"economy": 0, "robotics": 0, "tech": 0}

    # 1. RSS Feeds (Fast, reliable news)
    # Using multiple sources to guarantee data availability
    feeds = {
        "economy": [
            "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=10000350",  # CNBC Economy
            "http://feeds.marketwatch.com/marketwatch/topstories/",  # MarketWatch Top
        ],
        "tech": [
            "https://techcrunch.com/category/artificial-intelligence/feed/",
            "https://www.theverge.com/rss/artificial-intelligence/index.xml",
            "https://www.wired.com/feed/tag/ai/latest/rss",
        ],
        "robotics": [
            "https://www.therobotreport.com/feed/",
            "https://www.sciencedaily.com/rss/computers_math/robotics.xml",
        ],
    }

    for category, urls in feeds.items():
        text_buffer = []
        for url in urls:
            try:
                f = feedparser.parse(url)
                # Increased to 15 to ensure we get volume for the 20-item quota
                for entry in f.entries[:15]:
                    text_buffer.append(f"- {entry.title} ({entry.get('published', '')})")
            except Exception as e:
                logger.error(f"RSS Fail {url}: {e}")
        count = len(text_buffer)
        counts[category] += count
        context[category] += "\n".join(text_buffer)

    # 2. Web Search (Specific Queries)
    # Using DDGS for targeted info that RSS misses
    try:
        ddgs = DDGS()

        # Helper for robust search
        def robust_search(query, key, fallback_query=None):
            time.sleep(2)  # Increased delay for heavier load
            try:
                # Increased max_results to ensure 20+ items can be found
                results = ddgs.text(query, max_results=12)
                if len(results) < 5 and fallback_query:
                    time.sleep(1)
                    results += ddgs.text(fallback_query, max_results=10)
                if results:
                    count = len(results)
                    counts[key] += count
                    context[key] += "\n\nSEARCH RESULTS:\n" + "\n".join(
                        [f"- {r['title']}: {r['body']}" for r in results]
                    )
            except Exception as ex:
                logger.error(f"Search '{query}' failed: {ex}")

        # Auditor Searches (Economy)
        robust_search("US economy jobs report tax receipts 2025", "economy", "US unemployment rate real numbers 2026")
        robust_search("US consumer spending trends December 2025", "economy")
        # "withholding" was previously misspelled "widtholding", which broke this query.
        robust_search("Daily Treasury Statement tax receipts withholding trend 2025", "economy")  # Specific for Labor Signals
        robust_search("WARN act notices California Texas New York December 2025", "economy")  # Specific for Labor Signals

        # Robotics Searches
        robust_search("Tesla Optimus robot production numbers 2025 2026", "robotics", "Humanoid robot deployments 2025")
        robust_search("Figure 02 robot BMW manufacturing details", "robotics")
        robust_search("Agility Robotics Digit deployment Amazon GXO", "robotics")

        # Tech Searches (Layoffs & AI) -- Heavy Load for 20+ items
        robust_search("List of all tech layoffs December 2025 January 2026", "tech", "Tech company layoffs tracker 2026")
        robust_search("Google Amazon Microsoft Meta layoffs 2026", "tech")
        robust_search("New AI model releases December 2025 January 2026", "tech", "OpenAI Anthropic Google DeepMind news 2026")
        robust_search("Companies replacing humans with AI case studies 2026", "tech")
        robust_search("Universal Basic Income pilot programs 2025 2026 updates", "tech")
        robust_search("Guaranteed Income pilots California New York 2026", "tech")  # Specific for UBI
    except Exception as e:
        logger.error(f"Search Fail: {e}")

    logger.info(f"Context Counts: {counts}")  # DEBUG LOG
    return context, counts


# --- AI Agent Logic (3-Stage Specialist Swarm) ---

# AGENT 1: THE STRATEGIST (Macro, Audit & Prediction)
STRATEGIST_PROMPT = """
You are the Chief Strategist for the AI Unemployment Tracker. Date: {current_date}.
Focus: Overall Macro Impact.

## DATA CONTEXT:
{context}

TASK: Combine Macro Audit and Future Predictions into a single JSON.
1. **Labor Audit**: Calculate U-3 vs Real Jobless (~10.5%).
2. **Macro Stats**:
   - Total Layoffs 2025 (Full Year Est): 1.2M.
   - Total Layoffs 2026 (YTD): Calculate from context.
   - Net US Employment 2026: (Created - Displaced).
3. **Monthly Trend**: Provide a 13-month array of Global Layoffs starting Jan 2025 to Jan 2026.
   - 2025 Baseline: Jan-Dec scaling from 10k to 35k.
   - 2026 Data: Use context for Jan 26.

Output ONLY JSON:
{{
  "stats": {{
    "layoffs_2025": "1.2M",
    "layoffs_2026": "45k",
    "net_us_2026": "-12,000",
    "monthly_trend": [ {{"month": "Jan 25", "count": 10000}}, ..., {{"month": "Jan 26", "count": 15000}} ]
  }},
  "unemployment_stats": {{ "u3_official": "4.2%", "real_jobless": "10.5%", "underemployed": "11.8%", "analysis": "..." }},
  "labor_signals": [ {{"signal": "...", "date": "..."}} ],
  "displaced_jobs_current": "2.4M",
  "displaced_jobs_2026_projection": "4.2M"
}}
"""

# AGENT 2: THE LIBRARIAN (Feeds, Robotics & Research)
LIBRARIAN_PROMPT = """
You are the Lead Librarian for the AI Unemployment Tracker. Date: {current_date}.
Focus: Event Discovery.

## DATA CONTEXT:
{context}

TASK: Discover and summarize all specific events.
1. **AI Layoffs**: 20+ items (Company, Role, Jobs Lost, Date).
2. **Robot News**: 20+ items (Hardware, Units, Impact).
3. **Case Studies**: 10+ items (Company, Tool, Savings).
4. **Frontier/UBI**: 20+ items (Models, Policy, Pilots).

Output ONLY JSON:
{{
  "ai_layoffs": [...],
  "robot_news": [...],
  "robot_production": [...],
  "ai_case_studies": [...],
  "ai_advancements": [...],
  "ubi_updates": [...]
}}
"""

# AGENT 3: THE HISTORIAN (Long-Term Memory & Trend Chaining)
HISTORIAN_PROMPT = """
You are the Lead Historian for the AI Unemployment Tracker. Date: {current_date}.

## HISTORICAL MEMORY (Summaries of last 5 cycles):
{history}

## CURRENT RESEARCH:
{current_research}

TASK: Synthesize if the new data indicates an ACCELERATION or DECELERATION of AI displacement compared to recent memory.
1. Return a concise analysis.
2. Generate a 'cycle_summary' (1 paragraph) that will be stored in memory for the next cycle.

Output ONLY JSON:
{{
  "trend_analysis": "...",
  "cycle_summary": "..."
}}
"""

# Prompts consolidated into Strategist and Librarian.

# Global state for live logs to avoid disk writes for every step
LIVE_LOG_STATE = {
    "status": "IDLE",
    "model": "meta-llama/Llama-3.1-8B-Instruct",
    "prompt_preview": "Waiting for next cycle...",
    "raw_response": "Ready.",
}


def update_live_state(status, log_append=None):
    """Update the global live log state."""
    global LIVE_LOG_STATE
    LIVE_LOG_STATE["status"] = status
    if log_append:
        LIVE_LOG_STATE["raw_response"] += log_append


@app.get("/api/live_log")
async def get_live_log():
    """Return the current live log state."""
    return LIVE_LOG_STATE


def run_ai_research():
    """Run 3-Stage Specialist Swarm with Context Injection.

    Stage 1 (Strategist): macro stats. Stage 2 (Librarian): event lists.
    Stage 3 (Historian): trend synthesis from prior cycle summaries.
    Each stage's partial result is persisted via save_update() so a later
    failure does not lose earlier work. Returns the final merged data dict.
    """
    global LIVE_LOG_STATE
    # RESET LOG
    LIVE_LOG_STATE = {
        "status": "INITIALIZING SWARM...",
        "model": DEFAULT_MODEL,
        "prompt_preview": "Gathering RSS feeds and Search contexts...",
        "raw_response": "--- SYSTEM KERNEL START ---\n",
    }

    # Pre-bind context so a fetch failure below cannot cause a NameError
    # when the stages read it.
    context_data = {"economy": "", "robotics": "", "tech": ""}
    counts = {}

    # Step 0: Gather Real-World Context
    update_live_state("FETCHING CONTEXT (RSS/Search)...")
    try:
        context_data, counts = fetch_realtime_context()
        context_preview = (
            f"--- CONTEXT GATHERED ---\n"
            f"ECONOMY: {len(context_data['economy'])} chars ({counts.get('economy', 0)} items)\n"
            f"ROBOTICS: {len(context_data['robotics'])} chars ({counts.get('robotics', 0)} items)\n"
            f"TECH: {len(context_data['tech'])} chars ({counts.get('tech', 0)} items)\n"
            f"Sources: CNBC, MarketWatch, TechCrunch, Wired, TheVerge, RobotReport, DuckDuckGo\n\n"
        )
        LIVE_LOG_STATE["prompt_preview"] = f"Injected Context:\n{context_data['economy'][:300]}...\n..."
        update_live_state("CONTEXT LOADED", context_preview)
    except Exception as e:
        logger.error(f"Context fail: {e}")

    today = datetime.now()
    today_str = today.strftime("%Y-%m-%d")
    final_data = get_latest_data()

    # helper for safe JSON extract
    def safe_extract(content):
        if not content:
            return {}
        try:
            # Look for last JSON block (sometimes AI chats before/after)
            matches = re.findall(r'\{[\s\S]*\}', content)
            if matches:
                # Try to parse the largest block
                best_block = max(matches, key=len)
                return json.loads(best_block)
        except json.JSONDecodeError:
            pass
        return {}

    def call_ollama(prompt, model=None):
        if not ollama:
            update_live_state("ENGINE MISSING", "\n[ERROR] Ollama library not found locally.\n")
            return None
        target_model = model or DEFAULT_MODEL
        try:
            update_live_state("LOCAL GPU PROCESSING...")
            # Create a client instance with the specific host
            client_local = ollama.Client(host=OLLAMA_URL)
            response = client_local.chat(model=target_model, messages=[
                {'role': 'user', 'content': prompt},
            ])
            LIVE_LOG_STATE["model"] = f"LOCAL:{target_model}"
            return response['message']['content']
        except Exception as e:
            err_msg = str(e)
            logger.warning(f"Ollama Fail at {OLLAMA_URL}: {err_msg}")
            if "Connection refused" in err_msg or "failed to connect" in err_msg.lower():
                update_live_state("LOCAL GPU OFFLINE", f"\n[BRIDGE] Could not reach {OLLAMA_URL}. Ensure Ollama is running on your Mac.\n")
            else:
                update_live_state("LOCAL ERROR", f"\n[OLLAMA ERROR] {err_msg[:60]}...\n")
            return None

    # Pre-bind stage outputs so Stage 3 cannot NameError when an earlier
    # stage failed before assignment.
    c1 = None
    c2 = None

    # --- STAGE 1: THE STRATEGIST (Macro) ---
    update_live_state("STAGE 1: STRATEGIST (Macro & Prediction)...")
    try:
        macro_context = context_data["economy"] + "\n" + context_data["tech"]
        p1 = STRATEGIST_PROMPT.format(current_date=today_str, context=macro_context)
        c1 = call_ollama(p1)
        if not c1:
            raise Exception("Local Engine Failed Stage 1")
        j1 = safe_extract(c1)
        if j1:
            # Safe Stats Merge
            if "stats" in j1 and isinstance(j1["stats"], dict):
                final_data["stats"].update(j1["stats"])
            # Flatten root keys
            for k in ["unemployment_stats", "labor_signals", "displaced_jobs_current", "displaced_jobs_2026_projection"]:
                if k in j1:
                    final_data[k] = j1[k]
            save_update(final_data)
            update_live_state("STRATEGIST COMPLETE", f"\n--- STRATEGIST OUTPUT ---\n{c1[:300]}...\n")
    except Exception as e:
        logger.error(f"Strategist Failed: {e}")
        update_live_state("STRATEGIST FAILED", f"\nERROR: {e}\n")

    # --- STAGE 2: THE LIBRARIAN (Research) ---
    update_live_state("STAGE 2: LIBRARIAN (Events discover)...")
    try:
        lib_context = context_data["tech"] + "\n" + context_data["robotics"]
        p2 = LIBRARIAN_PROMPT.format(current_date=today_str, context=lib_context)
        c2 = call_ollama(p2)
        if not c2:
            raise Exception("Local Engine Failed Stage 2")
        j2 = safe_extract(c2)
        if j2:
            final_data.update(j2)
            save_update(final_data)
            update_live_state("LIBRARIAN COMPLETE", f"\n--- LIBRARIAN OUTPUT ---\n{c2[:300]}...\n")
    except Exception as e:
        logger.error(f"Librarian Failed: {e}")
        update_live_state("LIBRARIAN FAILED", f"\nERROR: {e}\n")

    # --- STAGE 3: THE HISTORIAN (Memory & Synthesis) ---
    update_live_state("STAGE 3: HISTORIAN (Memory Synthesis)...")
    try:
        history = load_history()
        past_summaries = [h.get("cycle_summary", "") for h in history[:5] if h.get("cycle_summary")]
        history_text = "\n".join([f"- {s}" for s in past_summaries]) if past_summaries else "No prior memories."
        # (c1/c2 may be None when an earlier stage failed; fall back to "")
        current_research = f"Strategist: {(c1 or '')[:500]}...\nLibrarian: {(c2 or '')[:500]}..."
        p3 = HISTORIAN_PROMPT.format(current_date=today_str, history=history_text, current_research=current_research)
        c3 = call_ollama(p3)
        if not c3:
            raise Exception("Local Engine Failed Stage 3")
        j3 = safe_extract(c3)
        if j3:
            final_data["trend_analysis"] = j3.get("trend_analysis", "Market parity.")
            final_data["cycle_summary"] = j3.get("cycle_summary", "Research cycle concluded.")
            save_update(final_data)
            update_live_state("HISTORIAN COMPLETE", f"\n--- HISTORIAN OUTPUT ---\n{c3[:300]}...\n")
    except Exception as e:
        logger.error(f"Historian Failed: {e}")
        final_data["cycle_summary"] = "AI Swarm error. Operating on raw context counts."

    LIVE_LOG_STATE["status"] = "SWARM COMPLETE"
    LIVE_LOG_STATE["raw_response"] += "\n--- PROCESS FINISHED ---\n"
    final_data["ai_live"] = LIVE_LOG_STATE
    final_data["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    save_update(final_data)
    update_live_state("IDLE (Next Cycle in 30m)")
    return final_data


# --- Routes ---

# Global state for next update time
NEXT_UPDATE_TIME = None


def background_updater():
    """Run update every 30 minutes."""
    global NEXT_UPDATE_TIME
    while True:
        try:
            logger.info("Starting scheduled 30-minute update...")
            run_ai_research()
            # Set next update time
            NEXT_UPDATE_TIME = datetime.now() + timedelta(minutes=30)
            logger.info(f"Update complete. Next update at {NEXT_UPDATE_TIME}")
            # Sleep 30 minutes
            time.sleep(1800)
        except Exception as e:
            logger.error(f"Background update failed: {e}")
            time.sleep(60)  # Retry in 1 min on failure


@app.on_event("startup")
def startup_event():
    """Warm history, probe the local Ollama server, start the updater thread."""
    # Load history
    load_history()

    # Check Ollama status
    if ollama:
        try:
            # Quick ping to check if local server is up
            logger.info("Pinging local Ollama server...")
            resp = ollama.list()
            # Handle different library versions (dict vs object)
            models_list = resp.models if hasattr(resp, 'models') else resp.get('models', [])
            model_names = []
            for m in models_list:
                name = getattr(m, 'model', getattr(m, 'name', None)) or str(m)
                model_names.append(name)
            logger.info(f"Ollama is READY on local M1 GPU. Models: {model_names}")
            update_live_state("LOCAL ENGINE READY", f"\n--- NEURAL CORE: OLLAMA READY (M1 GPU) ---\nINSTALLED MODELS: {', '.join(model_names)}\n")
        except Exception as e:
            logger.warning(f"Ollama is not running locally. Falling back to cloud. Error: {e}")
            update_live_state("CLOUD FALLBACK MODE", "\n--- NEURAL CORE: OLLAMA OFFLINE. CLOUD ACTIVE. ---\n")
    else:
        logger.info("Ollama library not found. Using Cloud-only mode.")
        update_live_state("CLOUD MODE", "\n--- NEURAL CORE: CLOUD-ONLY MODE ACTIVE. ---\n")

    # Start background thread
    thread = threading.Thread(target=background_updater, daemon=True)
    thread.start()
    logger.info("Background 30-min updater started.")


@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the dashboard with the latest data and next-update countdown."""
    data = get_latest_data()
    # Calculate time remaining for UI
    next_update_timestamp = NEXT_UPDATE_TIME.timestamp() if NEXT_UPDATE_TIME else (datetime.now() + timedelta(minutes=1)).timestamp()
    return templates.TemplateResponse("index.html", {
        "request": request,
        "data": data,
        "next_update": next_update_timestamp,
    })


@app.post("/update")
async def trigger_update():
    """Manually trigger a full research cycle."""
    new_data = run_ai_research()
    if new_data:
        return {"status": "success"}
    return {"status": "error"}


def force_clear_port(port):
    """Detect and kill any process using the target port (macOS/Unix)."""
    try:
        # Find the PID using the port
        result = subprocess.check_output(["lsof", "-ti", f":{port}"])
        pids = result.decode().strip().split('\n')
        for pid in pids:
            if pid:
                logger.info(f"[SINGULARITY] Terminating zombie process {pid} on port {port}...")
                os.kill(int(pid), signal.SIGTERM)
        time.sleep(1)  # Wait for port to clear
    except subprocess.CalledProcessError:
        # No process found on that port
        pass
    except Exception as e:
        logger.error(f"[SINGULARITY] Cleanup failed: {e}")


if __name__ == "__main__":
    # 100% LOCAL SINGULARITY MODE
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"--- SINGULARITY STARTUP (Port {port}) ---")

    # 1. Kill any previous instances immediately
    force_clear_port(port)

    # 2. Launch fresh
    try:
        uvicorn.run(app, host="0.0.0.0", port=port)
    except Exception as e:
        logger.error(f"FATAL: Could not bind to port {port}: {e}")