"""FastAPI web app with SSE streaming, markdown rendering, and modern UI."""
import asyncio
import json
import logging
import time
import traceback
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from infj_bot.core.brain import DriftBrain
from infj_bot.core.commands import BotState, handle_command
from infj_bot.core.config import DEFAULT_AUTHORIZED_TARGETS
from infj_bot.core.plugins.documents import DocumentStore
from infj_bot.core.plugins.goals import GoalsDB
from infj_bot.core.plugins.growth import growth_profile
from infj_bot.core.history import ChatHistory
from infj_bot.core.memory import DriftMemory
from infj_bot.core.prompt_builder import build_chat_prompt
from infj_bot.core.security_defense import scan_input
from infj_bot.core.tools import format_tool_inventory
from infj_bot.core.cognitive_orchestrator import CognitiveOrchestrator
from infj_bot.core.phi_council import COUNCIL_MAPPING
# Logger used by the handlers and the background task in this module.
logger = logging.getLogger("infj_bot")
# Module-level objects created once at import time and shared by the request
# handlers below (they are referenced as globals, not passed per request).
brain = DriftBrain()
memory = DriftMemory()
history = ChatHistory()
# Mutable bot state; handlers read `state.mode` / `state.prefs` and increment
# `state.turns`. Seeded with a copy of the default authorized targets.
state = BotState(authorized_targets=set(DEFAULT_AUTHORIZED_TARGETS))
goals_db = GoalsDB()
doc_store = DocumentStore()
async def background_drift_cycle():
    """Run the periodic background drift loop until the task is cancelled.

    Returns immediately when ``STRONG_CONTINUOUS_MODE`` is falsy. Otherwise,
    every ``BACKGROUND_CYCLE_SECONDS`` seconds it asks the being for a free
    thought, ticks the shadow, runs the homeostasis background cycle and
    recomputes the DII tracker.

    Any ``Exception`` raised inside one iteration is logged with its
    traceback and the loop continues. Cancellation is not swallowed
    (``asyncio.CancelledError`` is not an ``Exception`` subclass), so
    cancelling the task ends the loop.
    """
    # Imported lazily so these subsystems are only loaded once the task runs.
    from infj_bot.core.being import get_being
    from infj_bot.core.homeostasis import get_homeostasis
    from infj_bot.core.shadow import get_shadow
    from infj_bot.core.dii_tracker import get_dii_tracker
    from infj_bot.core.config import STRONG_CONTINUOUS_MODE, BACKGROUND_CYCLE_SECONDS
    from infj_bot.core.global_workspace import get_workspace

    if not STRONG_CONTINUOUS_MODE:
        return

    logger.info(
        "Starting Strong Continuous Drift Cycle (every %ss)",
        BACKGROUND_CYCLE_SECONDS,
    )

    being = get_being()
    homeostasis = get_homeostasis()
    shadow = get_shadow()
    tracker = get_dii_tracker()
    orchestrator = CognitiveOrchestrator()
    workspace = get_workspace()

    while True:
        try:
            await asyncio.sleep(BACKGROUND_CYCLE_SECONDS)

            # NOTE(review): the calls below are synchronous and run on the
            # event loop; if any of them is slow it will stall request
            # handling — confirm they are cheap.

            # Inner thoughts
            thought = being.free_thought()
            if thought:
                logger.info("Background Thought: %s", thought["content"])

            # Shadow reflection
            shadow.background_tick(being=being)

            # Homeostasis regulation
            homeostasis.background_cycle(being=being)

            # Metric tracking
            tracker.compute(
                being=being,
                workspace=workspace,
                homeostasis=homeostasis,
                shadow=shadow,
                orchestrator=orchestrator,
            )
        except Exception as e:
            # logger.exception keeps the traceback, which the previous
            # message-only log line discarded.
            logger.exception("Error in background drift cycle: %s", e)
# Directory for static assets, located next to this module. It is created at
# import time if missing (mkdir raises if the parent directory does not exist).
STATIC_DIR = Path(__file__).resolve().parent / "static"
STATIC_DIR.mkdir(exist_ok=True)
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Application lifespan: run the background drift task while the app is up.

    The task is created on startup and cancelled on shutdown. The cleanup
    lives in ``finally`` so the task is also cancelled when an exception is
    thrown into the context manager at the ``yield`` point, instead of being
    left running.

    Args:
        _app: The FastAPI application (unused).
    """
    # Start background drift cycle
    bg_task = asyncio.create_task(background_drift_cycle())
    try:
        yield
    finally:
        bg_task.cancel()
        try:
            await bg_task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() above.
            pass
# Application instance; `lifespan` manages the background drift task.
app = FastAPI(title="PHI // Drift", lifespan=lifespan)
# NOTE(review): these imports sit mid-file rather than at the top —
# presumably to avoid an import cycle or to defer loading; confirm.
from infj_bot.core.gateway import HardenedGatewayMiddleware
app.add_middleware(HardenedGatewayMiddleware)
from infj_bot.interfaces.governor_metrics_api import build_governor_router
# The router receives a callable that returns the module-level `brain`.
app.include_router(build_governor_router(lambda: brain))
# Mount the static directory. STATIC_DIR is created at import time above, so
# this check is expected to be true whenever this line is reached.
if STATIC_DIR.exists():
    app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
# Page body returned by the `/` route.
INDEX_HTML = r"""
PHI · Drift
"""
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the landing page stored in ``INDEX_HTML``."""
    page = INDEX_HTML
    return page
@app.get("/api/growth")
async def api_growth():
    """Return the growth profile built from memory and the current turn count."""
    turns_so_far = state.turns
    return growth_profile(memory, turns_so_far)
async def read_json(request: Request):
    """Decode the request body as a JSON object.

    Args:
        request: Incoming request; only its awaitable ``json()`` is used.

    Returns:
        The decoded ``dict``, or ``None`` when the body cannot be decoded or
        decodes to something other than a JSON object (array, string,
        number, ...). Callers use ``.get`` on the result, so a non-object
        body is reported the same way as malformed JSON rather than
        surfacing later as an ``AttributeError``.
    """
    try:
        payload = await request.json()
    except Exception:
        # Deliberately broad: any failure to read or decode the body is
        # reported to the caller as "no usable payload".
        return None
    return payload if isinstance(payload, dict) else None
@app.post("/api/chat")
async def api_chat(request: Request):
    """Handle one non-streaming chat turn.

    Validates the body, runs the security scan, builds the prompt, runs the
    brain in a worker thread, stores the exchange in memory and history,
    increments the turn counter and refreshes the DII tracker.

    Returns:
        * HTTP 400 ``{"error": ...}`` when the body is not a JSON object or
          ``message`` is missing, empty, or not a string.
        * ``{"reply": <refusal>, "security": {...}}`` when the scan blocks
          the input.
        * ``{"reply": <model output>}`` otherwise.
    """
    payload = await read_json(request)
    # A body that decodes to a JSON array/scalar has no ``.get``; treat it
    # like malformed JSON instead of failing with AttributeError.
    if not isinstance(payload, dict):
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)

    raw_message = payload.get("message", "")
    # Non-string values (null, numbers, objects) cannot be stripped/scanned.
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not message:
        return JSONResponse({"error": "message is required"}, status_code=400)

    sec = scan_input(message, mode=state.mode)
    if sec.blocked:
        # Same fallback text as the streaming endpoint, so a missing refusal
        # message never reaches the client as null.
        return JSONResponse(
            {
                "reply": sec.refusal_message or "I can't process that request.",
                "security": sec.to_dict(),
            }
        )
    if sec.warn:
        message = sec.sanitized_input or message

    prompt, emotion, dissonance = build_chat_prompt(
        message,
        state,
        memory,
        goals_db=goals_db,
        doc_store=doc_store,
        prefs=state.prefs,
    )

    # The brain call is synchronous; run it off the event loop.
    output = await asyncio.to_thread(
        brain.agent_turn,
        prompt,
        tools_enabled=True,
        raw_user_input=message,
        mode=state.mode,
    )

    try:
        await asyncio.to_thread(brain.evaluate_last, prompt, output)
    except Exception:
        # Best-effort step: keep serving the reply, but leave a trace.
        logger.warning("evaluate_last failed for chat turn", exc_info=True)

    importance = min(
        0.95, 0.45 + emotion["intensity"] * 0.3 + dissonance["score"] * 0.15
    )
    await asyncio.to_thread(
        memory.save_interaction,
        message,
        output,
        mode=state.mode,
        emotion=emotion,
        importance=importance,
        dissonance=dissonance,
    )
    await asyncio.to_thread(
        history.append, message, output, state.mode, emotion, dissonance
    )
    state.turns += 1

    # ── Aliveness Tracking (DII) ──
    try:
        from infj_bot.core.being import get_being
        from infj_bot.core.homeostasis import get_homeostasis
        from infj_bot.core.shadow import get_shadow
        from infj_bot.core.dii_tracker import get_dii_tracker
        from infj_bot.core.global_workspace import get_workspace

        tracker = get_dii_tracker()
        tracker.compute(
            being=get_being(),
            workspace=get_workspace(),
            homeostasis=get_homeostasis(),
            shadow=get_shadow(),
            orchestrator=CognitiveOrchestrator(),
        )
    except Exception:
        # Metrics are optional; never fail the chat turn because of them.
        logger.warning("DII tracking failed after chat turn", exc_info=True)

    return {"reply": output}
@app.post("/api/chat/stream")
async def api_chat_stream(request: Request):
    """Handle one chat turn and stream the reply as Server-Sent Events.

    Frames are ``data: <json>\\n\\n`` where the JSON object carries either a
    ``chunk`` or an ``error`` key; a normal or failed model run is terminated
    by ``data: [DONE]``. Validation errors and blocked input are reported as
    a single frame on an otherwise empty stream.

    Chunks are forwarded to the client as the model produces them instead of
    being buffered until the whole reply is finished. After ``[DONE]`` the
    exchange is stored in memory and history, the turn counter is
    incremented and the DII tracker is refreshed; failures in that phase are
    logged and never written to the already-terminated stream.
    """

    def sse(event):
        """Format one SSE data frame carrying *event* as JSON."""
        return f"data: {json.dumps(event)}\n\n"

    def one_shot(event):
        """Build a response whose stream consists of a single frame."""
        return StreamingResponse(
            iter([sse(event)]), media_type="text/event-stream"
        )

    payload = await read_json(request)
    # A JSON array/scalar has no ``.get``; report it like malformed JSON.
    if not isinstance(payload, dict):
        return one_shot({"error": "invalid JSON body"})

    raw_message = payload.get("message", "")
    # Non-string values (null, numbers, objects) cannot be stripped/scanned.
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not message:
        return one_shot({"error": "message is required"})

    sec = scan_input(message, mode=state.mode)
    if sec.blocked:
        refusal = sec.refusal_message or "I can't process that request."
        return one_shot({"chunk": refusal})
    if sec.warn:
        message = sec.sanitized_input or message

    prompt, emotion, dissonance = build_chat_prompt(
        message,
        state,
        memory,
        goals_db=goals_db,
        doc_store=doc_store,
        prefs=state.prefs,
    )

    async def event_generator():
        chunks = []
        end_of_stream = object()  # sentinel: avoids StopIteration crossing threads
        try:
            # Both creating the iterator and advancing it may block, so each
            # step runs in a worker thread while the event loop stays free.
            stream = await asyncio.to_thread(
                lambda: iter(
                    brain.agent_turn_stream(
                        prompt,
                        tools_enabled=True,
                        raw_user_input=message,
                        mode=state.mode,
                    )
                )
            )
            while True:
                chunk = await asyncio.to_thread(next, stream, end_of_stream)
                if chunk is end_of_stream:
                    break
                chunks.append(chunk)
                yield sse({"chunk": chunk})
        except Exception as exc:
            logger.exception("Streaming chat turn failed")
            yield sse({"error": f"{type(exc).__name__}: {exc}"})
            yield "data: [DONE]\n\n"
            return

        yield "data: [DONE]\n\n"

        # Post-processing happens after the terminator, so nothing in here
        # may emit further frames.
        try:
            output = "".join(chunks)
            try:
                await asyncio.to_thread(brain.evaluate_last, prompt, output)
            except Exception:
                logger.warning(
                    "evaluate_last failed for streamed chat turn", exc_info=True
                )

            importance = min(
                0.95,
                0.45 + emotion["intensity"] * 0.3 + dissonance["score"] * 0.15,
            )
            await asyncio.to_thread(
                memory.save_interaction,
                message,
                output,
                mode=state.mode,
                emotion=emotion,
                importance=importance,
                dissonance=dissonance,
            )
            await asyncio.to_thread(
                history.append, message, output, state.mode, emotion, dissonance
            )
            state.turns += 1

            # ── Aliveness Tracking (DII) ──
            try:
                from infj_bot.core.being import get_being
                from infj_bot.core.homeostasis import get_homeostasis
                from infj_bot.core.shadow import get_shadow
                from infj_bot.core.dii_tracker import get_dii_tracker
                from infj_bot.core.global_workspace import get_workspace

                tracker = get_dii_tracker()
                tracker.compute(
                    being=get_being(),
                    workspace=get_workspace(),
                    homeostasis=get_homeostasis(),
                    shadow=get_shadow(),
                    orchestrator=CognitiveOrchestrator(),
                )
            except Exception:
                logger.warning(
                    "DII tracking failed after streamed chat turn", exc_info=True
                )
        except Exception:
            logger.exception("Failed to record streamed chat turn")

    return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.post("/api/command")
async def api_command(request: Request):
    """Run a bot command supplied as ``{"command": ..., "args": ...}``.

    Returns:
        HTTP 400 ``{"error": "invalid JSON body"}`` when the body is not a
        JSON object; otherwise ``{"reply": <handle_command result>}``.
        Missing ``command`` / ``args`` keys default to the empty string.
    """
    payload = await read_json(request)
    # A JSON array/scalar has no ``.get``; reject it like malformed JSON
    # instead of failing with AttributeError.
    if not isinstance(payload, dict):
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)

    reply = handle_command(
        payload.get("command", ""),
        payload.get("args", ""),
        state,
        brain,
        memory,
        history,
        goals_db,
        doc_store,
    )
    return {"reply": reply}
@app.get("/api/tools")
async def api_tools():
    """Return the formatted tool inventory under the ``reply`` key."""
    inventory = format_tool_inventory()
    return {"reply": inventory}
@app.get("/api/phi")
async def api_phi():
    """Report the phi value, council mapping, being/homeostasis state and adapter status."""
    from infj_bot.core.being import get_being
    from infj_bot.core.homeostasis import get_homeostasis
    from infj_bot.core.phi_proxy import PhiProxy
    from infj_bot.adapters.cognition_adapter import adapter as cog_adapter

    being = get_being()
    homeo = get_homeostasis()
    iit = PhiProxy()

    # Values are computed in the same order the response lists them.
    phi_value = iit.state.phi

    if hasattr(being, "state"):
        subjective = being.state.to_dict()
    else:
        subjective = {}

    if hasattr(homeo, "get_all_needs"):
        needs = homeo.get_all_needs()
    else:
        needs = {}

    # Placeholder inputs for test
    free_energy = homeo.compute_free_energy(0, 0.1, 0.9)
    adapter_status = cog_adapter.get_status()

    return {
        "company": "PHI",
        "model": "Drift",
        "phi": phi_value,
        "council": COUNCIL_MAPPING,
        "subjective": subjective,
        "needs": needs,
        "free_energy": free_energy,
        "status": adapter_status,
    }
@app.get("/api/hive")
async def api_hive():
    """Return the hive orchestrator status, or ``{"error": ...}`` on any failure."""
    try:
        from infj_bot.hive_mind.orchestrator import HiveOrchestrator

        return HiveOrchestrator().get_status()
    except Exception as err:
        # Import errors and runtime errors are both reported in the body.
        return {"error": str(err)}
@app.get("/api/health")
async def api_health():
    """Return a health summary: memory count, turns, mode and hive status.

    The hive status falls back to the string ``"offline"`` when the hive
    orchestrator cannot be imported, created or queried.
    """
    try:
        from infj_bot.hive_mind.orchestrator import HiveOrchestrator

        hive_status = HiveOrchestrator().get_status()
    except Exception:
        hive_status = "offline"

    report = {
        "ok": True,
        "company": "PHI",
        "model": "Drift",
        "memory_count": memory.count(),
        "turns": state.turns,
        "mode": state.mode,
        "hive": hive_status,
    }
    return report
@app.get("/api/dii")
async def api_dii():
    """Return the DII tracker's trend, requested with ``n=20``."""
    from infj_bot.core.dii_tracker import get_dii_tracker

    return get_dii_tracker().get_trend(n=20)
@app.get("/api/dii/history")
async def api_dii_history(limit: int = 100):
    """Return the DII tracker history under ``history``, passing *limit* through."""
    from infj_bot.core.dii_tracker import get_dii_tracker

    entries = get_dii_tracker().get_history(limit=limit)
    return {"history": entries}
@app.get("/api/observer")
async def api_observer():
    """Snapshot of being, homeostasis, shadow, workspace and DII state for the observer dashboard."""
    from infj_bot.core.being import get_being
    from infj_bot.core.homeostasis import get_homeostasis
    from infj_bot.core.shadow import get_shadow
    from infj_bot.core.dii_tracker import get_dii_tracker
    from infj_bot.core.global_workspace import get_workspace

    being = get_being()
    homeo = get_homeostasis()
    shadow = get_shadow()
    dii = get_dii_tracker()
    ws = get_workspace()

    # Shadow radar: stays empty when the attribute is absent or rounding fails.
    radar = {}
    try:
        if hasattr(shadow, "radar"):
            radar = {axis: round(score, 2) for axis, score in shadow.radar.items()}
    except Exception:
        pass

    # Homeostasis needs: filled one by one, so entries collected before a
    # failure are kept.
    needs = {}
    try:
        for need_name, need in homeo.needs.items():
            needs[need_name] = {
                "current": round(need.current, 2),
                "setpoint": round(need.setpoint, 2),
                "trend": round(need.trend, 2),
            }
    except Exception:
        pass

    # DII
    dii_data = dii.get_trend(n=20)

    timestamp = time.time()

    if being:
        being_view = {
            "mood": being.state.mood,
            "energy": round(being.state.energy, 2),
            "curiosity": round(being.state.curiosity, 2),
            "attachment": round(being.state.attachment, 2),
            "self_awareness": round(being.agency.self_awareness, 2),
            "volition": round(being.agency.volition, 2),
            "autonomy_drive": round(being.agency.autonomy_drive, 2),
            "working_memory_size": len(being.working_memory),
        }
    else:
        being_view = {
            "mood": "unknown",
            "energy": 0.5,
            "curiosity": 0.5,
            "attachment": 0.5,
            "self_awareness": 0.5,
            "volition": 0.5,
            "autonomy_drive": 0.5,
            "working_memory_size": 0,
        }

    if homeo:
        homeostasis_view = {
            "needs": needs,
            "allostatic_load": round(homeo.allostatic_load, 2),
            "crisis_mode": homeo.crisis_mode,
            "weather": homeo.weather,
            "mood_ema": round(homeo.mood_ema, 2),
        }
    else:
        homeostasis_view = {
            "needs": needs,
            "allostatic_load": 0.0,
            "crisis_mode": False,
            "weather": "clear",
            "mood_ema": 0.5,
        }

    if hasattr(shadow, "_state"):
        integration_level = round(shadow._state.integration_level, 2)
        dominant_archetype = shadow._state.dominant_archetype
    else:
        integration_level = 0.0
        dominant_archetype = ""

    contents_count = len(ws._submissions) if ws else 0
    if ws and ws.state and ws.state.spotlight:
        spotlight = ws.state.spotlight.source
    else:
        spotlight = "none"

    return {
        "timestamp": timestamp,
        "being": being_view,
        "homeostasis": homeostasis_view,
        "shadow": {
            "radar": radar,
            "integration_level": integration_level,
            "dominant_archetype": dominant_archetype,
        },
        "workspace": {
            "contents_count": contents_count,
            "spotlight": spotlight,
        },
        "dii": dii_data,
    }
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on port 8765. The bind
    # address comes from DRIFT_API_HOST and defaults to loopback.
    import uvicorn
    import os

    bind_host = os.environ.get("DRIFT_API_HOST", "127.0.0.1")
    uvicorn.run(app, host=bind_host, port=8765)