#!/usr/bin/env python3
"""Codette Web Server — Zero-Dependency Local AI Chat
Pure Python stdlib HTTP server with SSE streaming.
No Flask, no FastAPI, no npm, no node — just Python.
Usage:
    python codette_server.py               # Start on port 7860
    python codette_server.py --port 8080   # Custom port
    python codette_server.py --no-browser  # Don't auto-open browser

Architecture:
    - http.server for static files + REST API
    - Server-Sent Events (SSE) for streaming responses
    - Threading for background model loading/inference
    - CodetteOrchestrator for routing + generation
    - CodetteSession for Cocoon-backed memory
"""
import os, sys, json, time, threading, queue, argparse, webbrowser, traceback
from pathlib import Path
from http.server import HTTPServer, SimpleHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from io import BytesIO
# Auto-configure environment (hard-coded, machine-specific site-packages path and native library dir)
_site = r"J:\Lib\site-packages"
if _site not in sys.path:
sys.path.insert(0, _site)
os.environ["PATH"] = r"J:\Lib\site-packages\Library\bin" + os.pathsep + os.environ.get("PATH", "")
try:
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
except Exception:
pass
# Project imports
_inference_dir = str(Path(__file__).parent)
if _inference_dir not in sys.path:
sys.path.insert(0, _inference_dir)
from codette_session import (
CodetteSession, SessionStore, ADAPTER_COLORS, AGENT_NAMES
)
# Lazy import orchestrator (heavy — loads llama_cpp)
_orchestrator = None
_orchestrator_lock = threading.Lock()
_inference_semaphore = threading.Semaphore(1) # Limit to 1 concurrent inference (llama.cpp can't parallelize)
_orchestrator_status = {"state": "idle", "message": "Not loaded"}
_orchestrator_status_lock = threading.Lock() # Protect _orchestrator_status from race conditions
_load_error = None
# Phase 6 bridge (optional, wraps orchestrator)
_forge_bridge = None
_use_phase6 = True # ENABLED: Foundation restoration (memory kernel + stability field) wrapped in ForgeEngine + Phase 7 routing
# Current session
_session: "CodetteSession | None" = None
_session_store: "SessionStore | None" = None
_session_lock = threading.Lock()
# Request queue for thread-safe model access
_request_queue = queue.Queue()
_response_queues = {} # request_id -> queue.Queue
_response_queues_lock = threading.Lock() # Protect _response_queues from race conditions
_queue_creation_times = {} # Track when each queue was created for cleanup
# Worker threads for health monitoring
_worker_threads = []
_worker_threads_lock = threading.Lock()
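# Request lifecycle (how the globals above fit together):
#   1. An HTTP handler creates a per-request queue.Queue, registers it in
#      _response_queues keyed by req_id, records its creation time, and puts
#      the request dict on _request_queue.
#   2. The single worker thread pulls the request, acquires _inference_semaphore,
#      runs the orchestrator (or the Phase 6 bridge), and puts "thinking" /
#      "complete" / "error" events on the per-request queue.
#   3. The handler drains its queue with a timeout and returns JSON (POST) or
#      streams the events as SSE (GET).
#   4. _cleanup_orphaned_queues reaps queues older than 5 minutes and
#      _monitor_worker_health restarts any worker thread that has died.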
def _get_orchestrator():
"""Lazy-load the orchestrator (first call takes ~60s)."""
global _orchestrator, _orchestrator_status, _load_error, _forge_bridge
if _orchestrator is not None:
return _orchestrator
with _orchestrator_lock:
if _orchestrator is not None:
return _orchestrator
with _orchestrator_status_lock:
_orchestrator_status.update({"state": "loading", "message": "Loading Codette model..."})
print("\n Loading CodetteOrchestrator...")
try:
from codette_orchestrator import CodetteOrchestrator
_orchestrator = CodetteOrchestrator(verbose=True)
with _orchestrator_status_lock:
_orchestrator_status.update({
"state": "ready",
"message": f"Ready — {len(_orchestrator.available_adapters)} adapters",
"adapters": _orchestrator.available_adapters,
})
print(f" Orchestrator ready: {_orchestrator.available_adapters}")
# Initialize Phase 6 bridge with Phase 7 routing (wraps orchestrator with ForgeEngine + Executive Controller)
print(f" [DEBUG] _use_phase6 = {_use_phase6}")
if _use_phase6:
try:
print(f" [DEBUG] Importing CodetteForgeBridge...")
from codette_forge_bridge import CodetteForgeBridge
print(f" [DEBUG] Creating bridge instance...")
_forge_bridge = CodetteForgeBridge(_orchestrator, use_phase6=True, use_phase7=True, verbose=True)
print(f" Phase 6 bridge initialized")
print(f" Phase 7 Executive Controller initialized")
with _orchestrator_status_lock:
_orchestrator_status.update({"phase6": "enabled", "phase7": "enabled"})
except Exception as e:
print(f" Phase 6/7 bridge failed (using lightweight routing): {e}")
import traceback
traceback.print_exc()
with _orchestrator_status_lock:
_orchestrator_status.update({"phase6": "disabled", "phase7": "disabled"})
else:
print(f" [DEBUG] Phase 6 disabled (_use_phase6=False)")
return _orchestrator
except Exception as e:
_load_error = str(e)
with _orchestrator_status_lock:
_orchestrator_status.update({"state": "error", "message": f"Load failed: {e}"})
print(f" ERROR loading orchestrator: {e}")
traceback.print_exc()
return None
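# Note: a client can poll GET /api/status until "state" == "ready"; while the
# model is still loading, POST /api/chat answers 503 (see _handle_chat_post).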
def _cleanup_orphaned_queues():
"""Periodically clean up response queues that are older than 5 minutes.
This prevents memory leaks from accumulating abandoned request queues.
"""
while True:
try:
time.sleep(60) # Run cleanup every 60 seconds
now = time.time()
with _response_queues_lock:
# Find queues older than 5 minutes (300 seconds)
orphaned = []
for req_id, creation_time in list(_queue_creation_times.items()):
if now - creation_time > 300:
orphaned.append(req_id)
# Remove orphaned queues
for req_id in orphaned:
_response_queues.pop(req_id, None)
_queue_creation_times.pop(req_id, None)
if orphaned:
print(f" Cleaned up {len(orphaned)} orphaned response queues")
except Exception as e:
print(f" WARNING: Cleanup thread error: {e}")
def _monitor_worker_health():
"""Monitor worker threads and restart any that have died.
This ensures the system remains responsive even if a worker crashes.
"""
while True:
try:
time.sleep(5) # Check every 5 seconds
with _worker_threads_lock:
# Check each worker thread
alive_workers = []
dead_workers = []
for i, worker in enumerate(_worker_threads):
if worker.is_alive():
alive_workers.append((i, worker))
else:
dead_workers.append(i)
# Log and restart any dead workers
if dead_workers:
print(f" WARNING: Detected {len(dead_workers)} dead worker(s): {dead_workers}")
for i in dead_workers:
print(f" Restarting worker thread {i}...")
new_worker = threading.Thread(target=_worker_thread, daemon=True, name=f"worker-{i}")
new_worker.start()
_worker_threads[i] = new_worker
print(f" Worker threads restarted successfully")
# Log current work queue status periodically
work_queue_size = _request_queue.qsize()
if work_queue_size > 0:
print(f" Worker status: {len(alive_workers)} alive, {len(_response_queues)} pending requests, {work_queue_size} queued")
except Exception as e:
print(f" WARNING: Worker health monitor error: {e}")
def _worker_thread():
"""Background worker that processes inference requests."""
# NOTE: Session handling disabled for now due to scoping issues
# TODO: Refactor session management to avoid UnboundLocalError
while True:
try:
request = _request_queue.get(timeout=1.0)
except queue.Empty:
continue
if request is None:
break # Shutdown signal
req_id = request["id"]
# Get response queue with thread lock (prevent race condition)
with _response_queues_lock:
response_q = _response_queues.get(req_id)
if not response_q:
print(f" WARNING: Orphaned request {req_id} (response queue missing)")
continue
try:
orch = _get_orchestrator()
if orch is None:
try:
response_q.put({"error": _load_error or "Model failed to load"})
except (queue.Full, RuntimeError) as e:
print(f" ERROR: Failed to queue error response: {e}")
continue
query = request["query"]
adapter = request.get("adapter") # None = auto-route
max_adapters = request.get("max_adapters", 2)
# Send "thinking" event
try:
response_q.put({"event": "thinking", "adapter": adapter or "auto"})
except (queue.Full, RuntimeError) as e:
print(f" ERROR: Failed to queue thinking event: {e}")
continue
# Route and generate — limit to 1 concurrent inference to avoid memory exhaustion
# Add timeout to prevent deadlock if inference gets stuck
acquired = _inference_semaphore.acquire(timeout=120)
if not acquired:
try:
response_q.put({"error": "Inference queue full, request timed out after 2 minutes"})
except (queue.Full, RuntimeError):
pass
continue
try:
if _forge_bridge:
result = _forge_bridge.generate(query, adapter=adapter, max_adapters=max_adapters)
else:
result = orch.route_and_generate(
query,
max_adapters=max_adapters,
strategy="keyword",
force_adapter=adapter if adapter and adapter != "auto" else None,
)
# Update session DISABLED - session handling deferred
# (was causing UnboundLocalError due to scoping issues)
epistemic = None
# Extract route info from result (if available from ForgeEngine)
route = result.get("route")
perspectives = result.get("perspectives", [])
# Build response
response_data = {
"event": "complete",
"response": result["response"],
"adapter": result.get("adapter",
result.get("adapters", ["base"])[0] if isinstance(result.get("adapters"), list) else "base"),
"confidence": route.get("confidence", 0) if isinstance(route, dict) else (route.confidence if route else 0),
"reasoning": route.get("reasoning", "") if isinstance(route, dict) else (route.reasoning if route else ""),
"tokens": result.get("tokens", 0),
"time": round(result.get("time", 0), 2),
"multi_perspective": route.get("multi_perspective", False) if isinstance(route, dict) else (route.multi_perspective if route else False),
}
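                # Shape of the payload handed back to the HTTP handler (values below are
                # illustrative, not taken from a real run):
                #   {"event": "complete", "response": "...", "adapter": "base",
                #    "confidence": 0.8, "reasoning": "keyword match", "tokens": 256,
                #    "time": 12.34, "multi_perspective": False}
                # plus optional keys added below: "perspectives", "epistemic", "tools_used".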
# Add perspectives if available
if perspectives:
response_data["perspectives"] = perspectives
# Cocoon state DISABLED (requires session handling refactoring)
# Add epistemic report if available
if epistemic:
response_data["epistemic"] = epistemic
# Add tool usage info if any tools were called
tools_used = result.get("tools_used", [])
if tools_used:
response_data["tools_used"] = tools_used
# RE-CHECK response queue still exists (handler may have cleaned it up if timeout fired)
with _response_queues_lock:
response_q_still_exists = req_id in _response_queues
if response_q_still_exists:
try:
response_q.put(response_data)
except (queue.Full, RuntimeError) as e:
print(f" ERROR: Failed to queue response: {e}")
else:
print(f" WARNING: Response queue was cleaned up (handler timeout) - response dropped for {req_id}")
except Exception as e:
print(f" ERROR during inference: {e}")
traceback.print_exc()
# DEFENSIVE: RE-CHECK response queue before putting error
with _response_queues_lock:
response_q_still_exists = req_id in _response_queues
if response_q_still_exists:
try:
response_q.put({"event": "error", "error": str(e)})
except (queue.Full, RuntimeError):
print(f" ERROR: Also failed to queue error response")
else:
print(f" WARNING: Response queue was cleaned up (handler timeout) - error response dropped for {req_id}")
finally:
# Always release the semaphore
_inference_semaphore.release()
except Exception as e:
print(f" ERROR in worker thread: {e}")
traceback.print_exc()
class CodetteHandler(SimpleHTTPRequestHandler):
"""Custom HTTP handler for Codette API + static files."""
# Serve static files from inference/static/
def __init__(self, *args, **kwargs):
static_dir = str(Path(__file__).parent / "static")
super().__init__(*args, directory=static_dir, **kwargs)
def log_message(self, format, *args):
"""Quieter logging — skip static file requests."""
msg = format % args
if not any(ext in msg for ext in [".css", ".js", ".ico", ".png", ".woff"]):
print(f" [{time.strftime('%H:%M:%S')}] {msg}")
def do_GET(self):
parsed = urlparse(self.path)
path = parsed.path
# API routes
if path == "/api/status":
self._json_response(_orchestrator_status)
elif path == "/api/session":
self._json_response(_session.get_state() if _session else {})
elif path == "/api/sessions":
sessions = _session_store.list_sessions() if _session_store else []
self._json_response({"sessions": sessions})
elif path == "/api/adapters":
self._json_response({
"colors": ADAPTER_COLORS,
"agents": AGENT_NAMES,
"available": _orchestrator.available_adapters if _orchestrator else [],
})
elif path == "/api/chat":
# SSE endpoint for streaming
self._handle_chat_sse(parsed)
elif path == "/":
# Serve index.html
self.path = "/index.html"
super().do_GET()
else:
super().do_GET()
def do_POST(self):
parsed = urlparse(self.path)
path = parsed.path
if path == "/api/chat":
self._handle_chat_post()
elif path == "/api/session/new":
self._handle_new_session()
elif path == "/api/session/load":
self._handle_load_session()
elif path == "/api/session/save":
self._handle_save_session()
elif path == "/api/session/export":
self._handle_export_session()
elif path == "/api/session/import":
self._handle_import_session()
else:
self.send_error(404, "Not found")
def _json_response(self, data, status=200):
"""Send a JSON response."""
try:
body = json.dumps(data, default=str).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", len(body))
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(body)
self.wfile.flush()
except (ConnectionAbortedError, BrokenPipeError):
# Client disconnected before response was fully sent — this is normal
pass
except Exception as e:
print(f" ERROR in _json_response: {e}")
def _read_json_body(self):
"""Read and parse JSON POST body."""
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length)
return json.loads(body) if body else {}
def _handle_chat_post(self):
"""Handle chat request — queue inference, return via SSE or JSON."""
data = self._read_json_body()
query = data.get("query", "").strip()
adapter = data.get("adapter")
max_adapters = data.get("max_adapters", 2)
if not query:
self._json_response({"error": "Empty query"}, 400)
return
# Guardian input check
if _session and _session.guardian:
check = _session.guardian.check_input(query)
if not check["safe"]:
query = check["cleaned_text"]
# Check if orchestrator is loading
with _orchestrator_status_lock:
status_state = _orchestrator_status.get("state")
if status_state == "loading":
self._json_response({
"error": "Model is still loading, please wait...",
"status": _orchestrator_status,
}, 503)
return
# Queue the request
req_id = f"{time.time()}_{id(self)}"
response_q = queue.Queue()
# Add with thread lock
with _response_queues_lock:
_response_queues[req_id] = response_q
_queue_creation_times[req_id] = time.time()
_request_queue.put({
"id": req_id,
"query": query,
"adapter": adapter,
"max_adapters": max_adapters,
})
# Wait for response (with timeout)
try:
# First wait for thinking event
thinking = response_q.get(timeout=120)
if "error" in thinking and thinking.get("event") != "thinking":
self._json_response(thinking, 500)
return
# Wait for complete event (multi-perspective can take 15+ min on CPU)
result = response_q.get(timeout=1200) # 20 min max for inference
self._json_response(result)
except queue.Empty:
self._json_response({"error": "Request timed out"}, 504)
finally:
# Clean up with thread lock
with _response_queues_lock:
_response_queues.pop(req_id, None)
_queue_creation_times.pop(req_id, None)
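    # Illustrative client call for the blocking endpoint above (field values are
    # examples; omit "adapter" or pass "auto" to let the router choose):
    #   curl -X POST http://localhost:7860/api/chat \
    #     -H "Content-Type: application/json" \
    #     -d '{"query": "Summarize SSE in one paragraph", "adapter": "auto", "max_adapters": 2}'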
def _handle_chat_sse(self, parsed):
"""Handle SSE streaming endpoint."""
params = parse_qs(parsed.query)
query = params.get("q", [""])[0]
adapter = params.get("adapter", [None])[0]
if not query:
self.send_error(400, "Missing query parameter 'q'")
return
# Set up SSE headers
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Connection", "keep-alive")
self.end_headers()
# Queue request
req_id = f"sse_{time.time()}_{id(self)}"
response_q = queue.Queue()
# Add with thread lock
with _response_queues_lock:
_response_queues[req_id] = response_q
_queue_creation_times[req_id] = time.time()
_request_queue.put({
"id": req_id,
"query": query,
"adapter": adapter,
"max_adapters": 2,
})
try:
# Stream events
while True:
try:
event = response_q.get(timeout=300)
except queue.Empty:
self._send_sse("error", {"error": "Timeout"})
break
event_type = event.get("event", "message")
self._send_sse(event_type, event)
if event_type in ("complete", "error"):
break
        finally:
            # Clean up under the lock and drop the creation timestamp, mirroring _handle_chat_post
            with _response_queues_lock:
                _response_queues.pop(req_id, None)
                _queue_creation_times.pop(req_id, None)
def _send_sse(self, event_type, data):
"""Send a Server-Sent Event."""
try:
payload = f"event: {event_type}\ndata: {json.dumps(data, default=str)}\n\n"
self.wfile.write(payload.encode("utf-8"))
self.wfile.flush()
except Exception:
pass
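    # Wire format produced above (what an EventSource client sees), e.g.:
    #   event: thinking
    #   data: {"event": "thinking", "adapter": "auto"}
    #
    #   event: complete
    #   data: {"event": "complete", "response": "...", "adapter": "base", ...}
    #
    # Minimal stdlib consumer sketch (illustrative only, not used by the shipped UI):
    #   import urllib.request, urllib.parse
    #   url = "http://localhost:7860/api/chat?" + urllib.parse.urlencode({"q": "hello"})
    #   with urllib.request.urlopen(url) as resp:
    #       for raw in resp:                      # iterate the SSE stream line by line
    #           print(raw.decode("utf-8"), end="")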
def _handle_new_session(self):
"""Create a new session."""
global _session
# Save current session first
if _session and _session_store and _session.messages:
try:
_session_store.save(_session)
except Exception:
pass
_session = CodetteSession()
self._json_response({"session_id": _session.session_id})
def _handle_load_session(self):
"""Load a previous session."""
global _session
data = self._read_json_body()
session_id = data.get("session_id")
if not session_id or not _session_store:
self._json_response({"error": "Invalid session ID"}, 400)
return
loaded = _session_store.load(session_id)
if loaded:
_session = loaded
self._json_response({
"session_id": _session.session_id,
"messages": _session.messages,
"state": _session.get_state(),
})
else:
self._json_response({"error": "Session not found"}, 404)
def _handle_save_session(self):
"""Manually save current session."""
if _session and _session_store:
_session_store.save(_session)
self._json_response({"saved": True, "session_id": _session.session_id})
else:
self._json_response({"error": "No active session"}, 400)
def _handle_export_session(self):
"""Export current session as downloadable JSON."""
if not _session:
self._json_response({"error": "No active session"}, 400)
return
export_data = _session.to_dict()
export_data["_export_version"] = 1
export_data["_exported_at"] = time.time()
body = json.dumps(export_data, default=str, indent=2).encode("utf-8")
filename = f"codette_session_{_session.session_id[:8]}.json"
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Disposition", f'attachment; filename="{filename}"')
self.send_header("Content-Length", len(body))
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(body)
def _handle_import_session(self):
"""Import a session from uploaded JSON."""
global _session
try:
data = self._read_json_body()
if not data or "session_id" not in data:
self._json_response({"error": "Invalid session data"}, 400)
return
# Save current session before importing
if _session and _session_store and _session.messages:
try:
_session_store.save(_session)
except Exception:
pass
_session = CodetteSession()
_session.from_dict(data)
# Save imported session to store
if _session_store:
try:
_session_store.save(_session)
except Exception:
pass
self._json_response({
"session_id": _session.session_id,
"messages": _session.messages,
"state": _session.get_state(),
"imported": True,
})
except Exception as e:
self._json_response({"error": f"Import failed: {e}"}, 400)
def main():
global _session, _session_store, _worker_threads
parser = argparse.ArgumentParser(description="Codette Web UI")
parser.add_argument("--port", type=int, default=7860, help="Port (default: 7860)")
parser.add_argument("--no-browser", action="store_true", help="Don't auto-open browser")
args = parser.parse_args()
print("=" * 60)
print(" CODETTE WEB UI")
print("=" * 60)
# Initialize session
_session_store = SessionStore()
_session = CodetteSession()
print(f" Session: {_session.session_id}")
print(f" Cocoon: spiderweb={_session.spiderweb is not None}, "
f"metrics={_session.metrics_engine is not None}")
# Start worker thread for request processing
# NOTE: Only 1 worker needed — llama.cpp cannot parallelize inference.
# With 1 semaphore + 1 worker, we avoid idle threads and deadlock risk.
# Multiple workers would just spin waiting for the semaphore.
num_workers = 1
with _worker_threads_lock:
for i in range(num_workers):
worker = threading.Thread(target=_worker_thread, daemon=True, name=f"worker-{i}")
worker.start()
_worker_threads.append(worker)
print(f" Started {num_workers} worker thread for serial inference")
# Start cleanup thread for orphaned response queues
cleanup_thread = threading.Thread(target=_cleanup_orphaned_queues, daemon=True, name="cleanup")
cleanup_thread.start()
print(f" Started cleanup thread for queue maintenance")
# Start worker health monitor thread
health_monitor = threading.Thread(target=_monitor_worker_health, daemon=True, name="health-monitor")
health_monitor.start()
print(f" Started worker health monitor thread")
# Start model loading in background
threading.Thread(target=_get_orchestrator, daemon=True).start()
# Wait for model to load (up to 120 seconds)
print(f" Waiting for model to load (this takes ~60s on first startup)...")
start_wait = time.time()
while True:
with _orchestrator_status_lock:
state = _orchestrator_status.get("state")
if state not in ("idle", "loading"):
break
if time.time() - start_wait > 120:
break
time.sleep(0.5)
with _orchestrator_status_lock:
state = _orchestrator_status.get("state")
if state == "ready":
print(f" Model loaded in {time.time() - start_wait:.0f}s")
elif state == "loading":
print(f" Model still loading (will continue in background)...")
else:
print(f" WARNING: Model load status: {_orchestrator_status}")
# Start server
server = HTTPServer(("127.0.0.1", args.port), CodetteHandler)
url = f"http://localhost:{args.port}"
print(f"\n Server: {url}")
print(f" Press Ctrl+C to stop\n")
# Open browser
if not args.no_browser:
threading.Timer(1.0, lambda: webbrowser.open(url)).start()
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n Shutting down...")
# Save session
if _session and _session_store and _session.messages:
_session_store.save(_session)
print(f" Session saved: {_session.session_id}")
_request_queue.put(None) # Shutdown worker
server.shutdown()
print(" Goodbye!")
if __name__ == "__main__":
main()