"""FastAPI web app with SSE streaming, markdown rendering, and modern UI.""" import asyncio import json import logging import time import traceback from contextlib import asynccontextmanager from pathlib import Path from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from infj_bot.core.brain import DriftBrain from infj_bot.core.commands import BotState, handle_command from infj_bot.core.config import DEFAULT_AUTHORIZED_TARGETS from infj_bot.core.plugins.documents import DocumentStore from infj_bot.core.plugins.goals import GoalsDB from infj_bot.core.plugins.growth import growth_profile from infj_bot.core.history import ChatHistory from infj_bot.core.memory import DriftMemory from infj_bot.core.prompt_builder import build_chat_prompt from infj_bot.core.security_defense import scan_input from infj_bot.core.tools import format_tool_inventory from infj_bot.core.cognitive_orchestrator import CognitiveOrchestrator from infj_bot.core.phi_council import COUNCIL_MAPPING logger = logging.getLogger("infj_bot") brain = DriftBrain() memory = DriftMemory() history = ChatHistory() state = BotState(authorized_targets=set(DEFAULT_AUTHORIZED_TARGETS)) goals_db = GoalsDB() doc_store = DocumentStore() async def background_drift_cycle(): """Background task to run drift cycles and compute aliveness metrics.""" from infj_bot.core.being import get_being from infj_bot.core.homeostasis import get_homeostasis from infj_bot.core.shadow import get_shadow from infj_bot.core.dii_tracker import get_dii_tracker from infj_bot.core.config import STRONG_CONTINUOUS_MODE, BACKGROUND_CYCLE_SECONDS from infj_bot.core.cognitive_orchestrator import CognitiveOrchestrator from infj_bot.core.global_workspace import get_workspace if not STRONG_CONTINUOUS_MODE: return logger.info( f"Starting Strong Continuous Drift Cycle (every {BACKGROUND_CYCLE_SECONDS}s)" ) being = get_being() homeostasis = get_homeostasis() shadow = get_shadow() tracker = get_dii_tracker() orchestrator = CognitiveOrchestrator() workspace = get_workspace() while True: try: await asyncio.sleep(BACKGROUND_CYCLE_SECONDS) # Inner thoughts thought = being.free_thought() if thought: logger.info(f"Background Thought: {thought['content']}") # Shadow reflection shadow.background_tick(being=being) # Homeostasis regulation homeostasis.background_cycle(being=being) # Metric tracking tracker.compute( being=being, workspace=workspace, homeostasis=homeostasis, shadow=shadow, orchestrator=orchestrator, ) except Exception as e: logger.error(f"Error in background drift cycle: {e}") STATIC_DIR = Path(__file__).resolve().parent / "static" STATIC_DIR.mkdir(exist_ok=True) @asynccontextmanager async def lifespan(_app: FastAPI): # Start background drift cycle bg_task = asyncio.create_task(background_drift_cycle()) yield bg_task.cancel() try: await bg_task except asyncio.CancelledError: pass app = FastAPI(title="PHI // Drift", lifespan=lifespan) from infj_bot.core.gateway import HardenedGatewayMiddleware app.add_middleware(HardenedGatewayMiddleware) from infj_bot.interfaces.governor_metrics_api import build_governor_router app.include_router(build_governor_router(lambda: brain)) # Serve static files if any exist if STATIC_DIR.exists(): app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") INDEX_HTML = r""" PHI · Drift
""" @app.get("/", response_class=HTMLResponse) async def root(): return INDEX_HTML @app.get("/api/growth") async def api_growth(): return growth_profile(memory, state.turns) async def read_json(request: Request): try: return await request.json() except Exception: return None @app.post("/api/chat") async def api_chat(request: Request): payload = await read_json(request) if payload is None: return JSONResponse({"error": "invalid JSON body"}, status_code=400) message = payload.get("message", "").strip() if not message: return JSONResponse({"error": "message is required"}, status_code=400) sec = scan_input(message, mode=state.mode) if sec.blocked: return JSONResponse({"reply": sec.refusal_message, "security": sec.to_dict()}) if sec.warn: message = sec.sanitized_input or message prompt, emotion, dissonance = build_chat_prompt( message, state, memory, goals_db=goals_db, doc_store=doc_store, prefs=state.prefs, ) output = await asyncio.to_thread(brain.agent_turn, prompt, tools_enabled=True, raw_user_input=message, mode=state.mode) try: await asyncio.to_thread(brain.evaluate_last, prompt, output) except Exception: pass importance = min( 0.95, 0.45 + emotion["intensity"] * 0.3 + dissonance["score"] * 0.15 ) await asyncio.to_thread( memory.save_interaction, message, output, mode=state.mode, emotion=emotion, importance=importance, dissonance=dissonance, ) await asyncio.to_thread( history.append, message, output, state.mode, emotion, dissonance ) state.turns += 1 # ── Aliveness Tracking (DII) ── try: from infj_bot.core.being import get_being from infj_bot.core.homeostasis import get_homeostasis from infj_bot.core.shadow import get_shadow from infj_bot.core.dii_tracker import get_dii_tracker from infj_bot.core.global_workspace import get_workspace tracker = get_dii_tracker() tracker.compute( being=get_being(), workspace=get_workspace(), homeostasis=get_homeostasis(), shadow=get_shadow(), orchestrator=CognitiveOrchestrator(), ) except Exception: pass return {"reply": output} @app.post("/api/chat/stream") async def api_chat_stream(request: Request): payload = await read_json(request) if payload is None: return StreamingResponse( (f"data: {json.dumps({'error': 'invalid JSON body'})}\n\n" for _ in [1]), media_type="text/event-stream", ) message = payload.get("message", "").strip() if not message: return StreamingResponse( (f"data: {json.dumps({'error': 'message is required'})}\n\n" for _ in [1]), media_type="text/event-stream", ) sec = scan_input(message, mode=state.mode) if sec.blocked: refusal = sec.refusal_message or "I can't process that request." return StreamingResponse( (f"data: {json.dumps({'chunk': refusal})}\n\n" for _ in [1]), media_type="text/event-stream", ) if sec.warn: message = sec.sanitized_input or message prompt, emotion, dissonance = build_chat_prompt( message, state, memory, goals_db=goals_db, doc_store=doc_store, prefs=state.prefs, ) async def event_generator(): try: # Run synchronous stream in a thread to avoid blocking the event loop chunks = await asyncio.to_thread( lambda: list(brain.agent_turn_stream(prompt, tools_enabled=True, raw_user_input=message, mode=state.mode)) ) for chunk in chunks: yield f"data: {json.dumps({'chunk': chunk})}\n\n" yield "data: [DONE]\n\n" output = "".join(chunks) try: await asyncio.to_thread(brain.evaluate_last, prompt, output) except Exception: pass importance = min( 0.95, 0.45 + emotion["intensity"] * 0.3 + dissonance["score"] * 0.15 ) await asyncio.to_thread( memory.save_interaction, message, output, mode=state.mode, emotion=emotion, importance=importance, dissonance=dissonance, ) await asyncio.to_thread( history.append, message, output, state.mode, emotion, dissonance ) state.turns += 1 # ── Aliveness Tracking (DII) ── try: from infj_bot.core.being import get_being from infj_bot.core.homeostasis import get_homeostasis from infj_bot.core.shadow import get_shadow from infj_bot.core.dii_tracker import get_dii_tracker from infj_bot.core.global_workspace import get_workspace tracker = get_dii_tracker() tracker.compute( being=get_being(), workspace=get_workspace(), homeostasis=get_homeostasis(), shadow=get_shadow(), orchestrator=CognitiveOrchestrator(), ) except Exception: pass except Exception as exc: traceback.print_exc() yield f"data: {json.dumps({'error': f'{type(exc).__name__}: {exc}'})}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(event_generator(), media_type="text/event-stream") @app.post("/api/command") async def api_command(request: Request): payload = await read_json(request) if payload is None: return JSONResponse({"error": "invalid JSON body"}, status_code=400) reply = handle_command( payload.get("command", ""), payload.get("args", ""), state, brain, memory, history, goals_db, doc_store, ) return {"reply": reply} @app.get("/api/tools") async def api_tools(): return {"reply": format_tool_inventory()} @app.get("/api/phi") async def api_phi(): from infj_bot.core.being import get_being from infj_bot.core.homeostasis import get_homeostasis from infj_bot.core.phi_proxy import PhiProxy from infj_bot.adapters.cognition_adapter import adapter as cog_adapter being = get_being() homeo = get_homeostasis() iit = PhiProxy() return { "company": "PHI", "model": "Drift", "phi": iit.state.phi, "council": COUNCIL_MAPPING, "subjective": being.state.to_dict() if hasattr(being, "state") else {}, "needs": homeo.get_all_needs() if hasattr(homeo, "get_all_needs") else {}, "free_energy": homeo.compute_free_energy( 0, 0.1, 0.9 ), # Placeholder inputs for test "status": cog_adapter.get_status(), } @app.get("/api/hive") async def api_hive(): try: from infj_bot.hive_mind.orchestrator import HiveOrchestrator orch = HiveOrchestrator() return orch.get_status() except Exception as e: return {"error": str(e)} @app.get("/api/health") async def api_health(): try: from infj_bot.hive_mind.orchestrator import HiveOrchestrator orch = HiveOrchestrator() hive_status = orch.get_status() except Exception: hive_status = "offline" return { "ok": True, "company": "PHI", "model": "Drift", "memory_count": memory.count(), "turns": state.turns, "mode": state.mode, "hive": hive_status, } @app.get("/api/dii") async def api_dii(): from infj_bot.core.dii_tracker import get_dii_tracker tracker = get_dii_tracker() return tracker.get_trend(n=20) @app.get("/api/dii/history") async def api_dii_history(limit: int = 100): from infj_bot.core.dii_tracker import get_dii_tracker tracker = get_dii_tracker() return {"history": tracker.get_history(limit=limit)} @app.get("/api/observer") async def api_observer(): """Full real-time cognitive state for the observer dashboard.""" from infj_bot.core.being import get_being from infj_bot.core.homeostasis import get_homeostasis from infj_bot.core.shadow import get_shadow from infj_bot.core.dii_tracker import get_dii_tracker from infj_bot.core.global_workspace import get_workspace being = get_being() homeo = get_homeostasis() shadow = get_shadow() dii = get_dii_tracker() ws = get_workspace() # Shadow radar radar = {} try: radar = ( {k: round(v, 2) for k, v in shadow.radar.items()} if hasattr(shadow, "radar") else {} ) except Exception: pass # Homeostasis needs needs = {} try: for name, need in homeo.needs.items(): needs[name] = { "current": round(need.current, 2), "setpoint": round(need.setpoint, 2), "trend": round(need.trend, 2), } except Exception: pass # DII dii_data = dii.get_trend(n=20) return { "timestamp": time.time(), "being": { "mood": being.state.mood if being else "unknown", "energy": round(being.state.energy, 2) if being else 0.5, "curiosity": round(being.state.curiosity, 2) if being else 0.5, "attachment": round(being.state.attachment, 2) if being else 0.5, "self_awareness": round(being.agency.self_awareness, 2) if being else 0.5, "volition": round(being.agency.volition, 2) if being else 0.5, "autonomy_drive": round(being.agency.autonomy_drive, 2) if being else 0.5, "working_memory_size": len(being.working_memory) if being else 0, }, "homeostasis": { "needs": needs, "allostatic_load": round(homeo.allostatic_load, 2) if homeo else 0.0, "crisis_mode": homeo.crisis_mode if homeo else False, "weather": homeo.weather if homeo else "clear", "mood_ema": round(homeo.mood_ema, 2) if homeo else 0.5, }, "shadow": { "radar": radar, "integration_level": round(shadow._state.integration_level, 2) if hasattr(shadow, "_state") else 0.0, "dominant_archetype": shadow._state.dominant_archetype if hasattr(shadow, "_state") else "", }, "workspace": { "contents_count": len(ws._submissions) if ws else 0, "spotlight": ws.state.spotlight.source if (ws and ws.state and ws.state.spotlight) else "none", }, "dii": dii_data, } if __name__ == "__main__": import uvicorn import os host = os.getenv("DRIFT_API_HOST", "127.0.0.1") uvicorn.run(app, host=host, port=8765)