|
|
| """Codette Web Server — Zero-Dependency Local AI Chat
|
|
|
| Pure Python stdlib HTTP server with SSE streaming.
|
| No Flask, no FastAPI, no npm, no node — just Python.
|
|
|
| Usage:
|
| python codette_server.py # Start on port 7860
|
| python codette_server.py --port 8080 # Custom port
|
| python codette_server.py --no-browser # Don't auto-open browser
|
|
|
| Architecture:
|
| - http.server for static files + REST API
|
| - Server-Sent Events (SSE) for streaming responses
|
| - Threading for background model loading/inference
|
| - CodetteOrchestrator for routing + generation
|
| - CodetteSession for Cocoon-backed memory
|
| """
|
|
|
| import os, sys, json, time, threading, queue, argparse, webbrowser, traceback
|
| from pathlib import Path
|
| from http.server import HTTPServer, SimpleHTTPRequestHandler
|
| from urllib.parse import urlparse, parse_qs
|
| from io import BytesIO
|
|
|
|
|
# --- Environment bootstrap -------------------------------------------------
# NOTE(review): these are machine-specific absolute paths (J:\ drive) —
# portable deployments should move them into configuration.
_site = r"J:\Lib\site-packages"
if _site not in sys.path:
    sys.path.insert(0, _site)
# Prepend the bundled native-DLL directory so compiled extensions resolve.
os.environ["PATH"] = r"J:\Lib\site-packages\Library\bin" + os.pathsep + os.environ.get("PATH", "")
try:
    # Force UTF-8 console output; replace unencodable characters rather than
    # crashing on Windows code pages.
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
except Exception:
    # Older Pythons / redirected streams may lack reconfigure(); best-effort.
    pass


# Make sibling modules (codette_session, codette_orchestrator, ...) importable
# regardless of the current working directory.
_inference_dir = str(Path(__file__).parent)
if _inference_dir not in sys.path:
    sys.path.insert(0, _inference_dir)
|
|
|
| from codette_session import (
|
| CodetteSession, SessionStore, ADAPTER_COLORS, AGENT_NAMES
|
| )
|
|
|
|
|
# --- Orchestrator state (lazily loaded by _get_orchestrator) ---------------
_orchestrator = None
_orchestrator_lock = threading.Lock()          # guards orchestrator creation
_inference_semaphore = threading.Semaphore(1)  # serializes model inference
_orchestrator_status = {"state": "idle", "message": "Not loaded"}
_orchestrator_status_lock = threading.Lock()   # guards _orchestrator_status
_load_error = None                             # last load failure message


# --- Optional Phase 6/7 bridge ---------------------------------------------
_forge_bridge = None
_use_phase6 = True


# --- Session state ----------------------------------------------------------
# NOTE(review): annotated as CodetteSession but initialized to None; the
# effective type is Optional[CodetteSession] (set for real in main()).
_session: CodetteSession = None
_session_store: SessionStore = None
_session_lock = threading.Lock()


# --- Request/response plumbing between HTTP handlers and worker threads ----
_request_queue = queue.Queue()      # work items consumed by _worker_thread
_response_queues = {}               # req_id -> per-request queue.Queue
_response_queues_lock = threading.Lock()
_queue_creation_times = {}          # req_id -> creation timestamp (for reaping)


# --- Worker thread registry (restarted by _monitor_worker_health) ----------
_worker_threads = []
_worker_threads_lock = threading.Lock()
|
|
|
|
|
def _get_orchestrator():
    """Lazy-load the orchestrator (first call takes ~60s).

    Double-checked locking: the fast path returns the cached instance
    without taking _orchestrator_lock; the slow path loads the model while
    holding the lock so only one thread ever constructs it.

    Returns the CodetteOrchestrator on success, or None on load failure
    (with the failure message recorded in _load_error and the status dict).
    """
    global _orchestrator, _orchestrator_status, _load_error, _forge_bridge
    # Fast path: already loaded, no lock needed.
    if _orchestrator is not None:
        return _orchestrator

    with _orchestrator_lock:
        # Re-check under the lock: another thread may have finished loading
        # while we waited.
        if _orchestrator is not None:
            return _orchestrator

        with _orchestrator_status_lock:
            _orchestrator_status.update({"state": "loading", "message": "Loading Codette model..."})
        print("\n Loading CodetteOrchestrator...")

        try:
            # Imported lazily so the server can start before the heavy model
            # dependencies are pulled in.
            from codette_orchestrator import CodetteOrchestrator
            _orchestrator = CodetteOrchestrator(verbose=True)

            with _orchestrator_status_lock:
                _orchestrator_status.update({
                    "state": "ready",
                    "message": f"Ready — {len(_orchestrator.available_adapters)} adapters",
                    "adapters": _orchestrator.available_adapters,
                })
            print(f" Orchestrator ready: {_orchestrator.available_adapters}")

            # Optional Phase 6/7 bridge; failure here is non-fatal and falls
            # back to the orchestrator's lightweight routing.
            print(f" [DEBUG] _use_phase6 = {_use_phase6}")
            if _use_phase6:
                try:
                    print(f" [DEBUG] Importing CodetteForgeBridge...")
                    from codette_forge_bridge import CodetteForgeBridge
                    print(f" [DEBUG] Creating bridge instance...")
                    _forge_bridge = CodetteForgeBridge(_orchestrator, use_phase6=True, use_phase7=True, verbose=True)
                    print(f" Phase 6 bridge initialized")
                    print(f" Phase 7 Executive Controller initialized")
                    with _orchestrator_status_lock:
                        _orchestrator_status.update({"phase6": "enabled", "phase7": "enabled"})
                except Exception as e:
                    print(f" Phase 6/7 bridge failed (using lightweight routing): {e}")
                    # NOTE(review): redundant re-import; traceback is already
                    # imported at module level.
                    import traceback
                    traceback.print_exc()
                    with _orchestrator_status_lock:
                        _orchestrator_status.update({"phase6": "disabled", "phase7": "disabled"})
            else:
                print(f" [DEBUG] Phase 6 disabled (_use_phase6=False)")

            return _orchestrator
        except Exception as e:
            # Record the failure so API handlers can surface it to clients.
            _load_error = str(e)
            with _orchestrator_status_lock:
                _orchestrator_status.update({"state": "error", "message": f"Load failed: {e}"})
            print(f" ERROR loading orchestrator: {e}")
            traceback.print_exc()
            return None
|
|
|
|
|
def _cleanup_orphaned_queues():
    """Reap response queues that have been registered for over five minutes.

    Runs forever in a daemon thread. Without this, a client that abandons a
    request (disconnect, browser close) would leave its per-request queue in
    the module registries indefinitely — a slow memory leak.
    """
    while True:
        try:
            time.sleep(60)
            cutoff = time.time() - 300  # anything registered before this is stale

            with _response_queues_lock:
                stale_ids = [
                    rid
                    for rid, born in list(_queue_creation_times.items())
                    if born < cutoff
                ]
                for rid in stale_ids:
                    _response_queues.pop(rid, None)
                    _queue_creation_times.pop(rid, None)

                if stale_ids:
                    print(f" Cleaned up {len(stale_ids)} orphaned response queues")
        except Exception as e:
            # Never let the reaper die; log and keep looping.
            print(f" WARNING: Cleanup thread error: {e}")
|
|
|
|
|
def _monitor_worker_health():
    """Monitor worker threads and restart any that have died.

    This ensures the system remains responsive even if a worker crashes.
    Runs forever in a daemon thread, polling every 5 seconds.
    """
    while True:
        try:
            time.sleep(5)

            with _worker_threads_lock:
                # Partition the registry into alive / dead, keeping indices so
                # dead workers can be replaced in place.
                alive_workers = []
                dead_workers = []

                for i, worker in enumerate(_worker_threads):
                    if worker.is_alive():
                        alive_workers.append((i, worker))
                    else:
                        dead_workers.append(i)

                if dead_workers:
                    print(f" WARNING: Detected {len(dead_workers)} dead worker(s): {dead_workers}")
                    for i in dead_workers:
                        print(f" Restarting worker thread {i}...")
                        # Same target/name as the originals started in main().
                        new_worker = threading.Thread(target=_worker_thread, daemon=True, name=f"worker-{i}")
                        new_worker.start()
                        # In-place replacement keeps list indices stable.
                        _worker_threads[i] = new_worker
                    print(f" Worker threads restarted successfully")

                # NOTE(review): len(_response_queues) is read here without
                # _response_queues_lock, and qsize() is approximate — the
                # numbers feed log output only, but confirm that is intended.
                work_queue_size = _request_queue.qsize()
                if work_queue_size > 0:
                    print(f" Worker status: {len(alive_workers)} alive, {len(_response_queues)} pending requests, {work_queue_size} queued")

        except Exception as e:
            # Keep the monitor itself alive no matter what.
            print(f" WARNING: Worker health monitor error: {e}")
|
|
|
|
|
def _worker_thread():
    """Background worker that processes inference requests.

    Loop: pull a work item from _request_queue, look up the client's
    per-request response queue, run inference (serialized through
    _inference_semaphore), and push "thinking" / "complete" / "error"
    events back to the client. A None work item is the shutdown sentinel.
    """
    while True:
        try:
            # Short timeout so the loop stays responsive to the sentinel.
            request = _request_queue.get(timeout=1.0)
        except queue.Empty:
            continue

        # None is the shutdown sentinel (enqueued by main() on Ctrl+C).
        if request is None:
            break

        req_id = request["id"]

        # The HTTP handler registered this queue before enqueuing the work;
        # if it is gone, the client already timed out / disconnected.
        with _response_queues_lock:
            response_q = _response_queues.get(req_id)

        if not response_q:
            print(f" WARNING: Orphaned request {req_id} (response queue missing)")
            continue

        try:
            orch = _get_orchestrator()
            if orch is None:
                # Model failed to load — report and move on to the next item.
                try:
                    response_q.put({"error": _load_error or "Model failed to load"})
                except (queue.Full, RuntimeError) as e:
                    print(f" ERROR: Failed to queue error response: {e}")
                continue

            query = request["query"]
            adapter = request.get("adapter")
            max_adapters = request.get("max_adapters", 2)

            # Early "thinking" event so the client gets immediate feedback.
            try:
                response_q.put({"event": "thinking", "adapter": adapter or "auto"})
            except (queue.Full, RuntimeError) as e:
                print(f" ERROR: Failed to queue thinking event: {e}")
                continue

            # Serialize inference; give up if another request holds the
            # semaphore for more than 2 minutes.
            acquired = _inference_semaphore.acquire(timeout=120)
            if not acquired:
                try:
                    response_q.put({"error": "Inference queue full, request timed out after 2 minutes"})
                except (queue.Full, RuntimeError):
                    pass
                continue

            try:
                # Prefer the Phase 6/7 bridge when it initialized; otherwise
                # fall back to the orchestrator's keyword routing.
                if _forge_bridge:
                    result = _forge_bridge.generate(query, adapter=adapter, max_adapters=max_adapters)
                else:
                    result = orch.route_and_generate(
                        query,
                        max_adapters=max_adapters,
                        strategy="keyword",
                        force_adapter=adapter if adapter and adapter != "auto" else None,
                    )

                # Placeholder for epistemic metadata (currently always None,
                # so the "epistemic" key below is never emitted).
                epistemic = None

                # route may be a dict or an object with attributes, depending
                # on which generation path produced the result.
                route = result.get("route")
                perspectives = result.get("perspectives", [])

                response_data = {
                    "event": "complete",
                    "response": result["response"],
                    "adapter": result.get("adapter",
                        result.get("adapters", ["base"])[0] if isinstance(result.get("adapters"), list) else "base"),
                    "confidence": route.get("confidence", 0) if isinstance(route, dict) else (route.confidence if route else 0),
                    "reasoning": route.get("reasoning", "") if isinstance(route, dict) else (route.reasoning if route else ""),
                    "tokens": result.get("tokens", 0),
                    "time": round(result.get("time", 0), 2),
                    "multi_perspective": route.get("multi_perspective", False) if isinstance(route, dict) else (route.multi_perspective if route else False),
                }

                if perspectives:
                    response_data["perspectives"] = perspectives

                if epistemic:
                    response_data["epistemic"] = epistemic

                tools_used = result.get("tools_used", [])
                if tools_used:
                    response_data["tools_used"] = tools_used

                # Only deliver if the handler is still waiting (the cleanup
                # thread may have reaped the queue on handler timeout).
                with _response_queues_lock:
                    response_q_still_exists = req_id in _response_queues

                if response_q_still_exists:
                    try:
                        response_q.put(response_data)
                    except (queue.Full, RuntimeError) as e:
                        print(f" ERROR: Failed to queue response: {e}")
                else:
                    print(f" WARNING: Response queue was cleaned up (handler timeout) - response dropped for {req_id}")

            except Exception as e:
                # Inference failed — try to tell the client, same reaping
                # caveat as the success path.
                print(f" ERROR during inference: {e}")
                traceback.print_exc()

                with _response_queues_lock:
                    response_q_still_exists = req_id in _response_queues

                if response_q_still_exists:
                    try:
                        response_q.put({"event": "error", "error": str(e)})
                    except (queue.Full, RuntimeError):
                        print(f" ERROR: Also failed to queue error response")
                else:
                    print(f" WARNING: Response queue was cleaned up (handler timeout) - error response dropped for {req_id}")
            finally:
                # Always release — pairs with the successful acquire above.
                _inference_semaphore.release()

        except Exception as e:
            # Catch-all so a single bad request never kills the worker loop.
            print(f" ERROR in worker thread: {e}")
            traceback.print_exc()
|
|
|
|
|
class CodetteHandler(SimpleHTTPRequestHandler):
    """Custom HTTP handler for Codette API + static files.

    GET routes:
        /api/status   — orchestrator load status
        /api/session  — current session state
        /api/sessions — list of saved sessions
        /api/adapters — adapter colors / agent names / availability
        /api/chat     — SSE streaming chat (query params: q, adapter)
        anything else — static files from ./static (/ maps to index.html)

    POST routes:
        /api/chat and /api/session/{new,load,save,export,import}

    Fixes vs. previous revision:
      * _handle_export_session now uses the computed filename in the
        Content-Disposition header (it was built but never used).
      * _handle_chat_sse cleans up its response queue under
        _response_queues_lock and also removes the _queue_creation_times
        entry, mirroring _handle_chat_post.
    """

    def __init__(self, *args, **kwargs):
        # Serve static assets from the "static" folder next to this file.
        static_dir = str(Path(__file__).parent / "static")
        super().__init__(*args, directory=static_dir, **kwargs)

    def log_message(self, format, *args):
        """Quieter logging — skip static file requests."""
        msg = format % args
        if not any(ext in msg for ext in [".css", ".js", ".ico", ".png", ".woff"]):
            print(f" [{time.strftime('%H:%M:%S')}] {msg}")

    def do_GET(self):
        """Route GET requests to API handlers or static file serving."""
        parsed = urlparse(self.path)
        path = parsed.path

        # NOTE(review): _session / _orchestrator are read without their locks
        # here; the reads feed read-only JSON responses, but confirm that
        # best-effort snapshots are acceptable.
        if path == "/api/status":
            self._json_response(_orchestrator_status)
        elif path == "/api/session":
            self._json_response(_session.get_state() if _session else {})
        elif path == "/api/sessions":
            sessions = _session_store.list_sessions() if _session_store else []
            self._json_response({"sessions": sessions})
        elif path == "/api/adapters":
            self._json_response({
                "colors": ADAPTER_COLORS,
                "agents": AGENT_NAMES,
                "available": _orchestrator.available_adapters if _orchestrator else [],
            })
        elif path == "/api/chat":
            # GET variant used by EventSource clients (SSE streaming).
            self._handle_chat_sse(parsed)
        elif path == "/":
            # Root serves the single-page UI.
            self.path = "/index.html"
            super().do_GET()
        else:
            # Fall through to static file serving.
            super().do_GET()

    def do_POST(self):
        """Route POST requests to the API handlers."""
        parsed = urlparse(self.path)
        path = parsed.path

        if path == "/api/chat":
            self._handle_chat_post()
        elif path == "/api/session/new":
            self._handle_new_session()
        elif path == "/api/session/load":
            self._handle_load_session()
        elif path == "/api/session/save":
            self._handle_save_session()
        elif path == "/api/session/export":
            self._handle_export_session()
        elif path == "/api/session/import":
            self._handle_import_session()
        else:
            self.send_error(404, "Not found")

    def _json_response(self, data, status=200):
        """Send a JSON response.

        Non-JSON-serializable values are stringified via default=str.
        Client disconnects are swallowed silently; other send failures
        are logged but never propagate to the request loop.
        """
        try:
            body = json.dumps(data, default=str).encode("utf-8")
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", len(body))
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(body)
            self.wfile.flush()
        except (ConnectionAbortedError, BrokenPipeError):
            # Client went away mid-response; nothing useful to do.
            pass
        except Exception as e:
            print(f" ERROR in _json_response: {e}")

    def _read_json_body(self):
        """Read and parse JSON POST body.

        Returns {} for an empty body. Raises json.JSONDecodeError on
        malformed JSON — handlers accepting untrusted uploads wrap this in
        try/except (see _handle_import_session).
        """
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        return json.loads(body) if body else {}

    def _handle_chat_post(self):
        """Handle chat request — queue inference, return via SSE or JSON."""
        data = self._read_json_body()
        query = data.get("query", "").strip()
        adapter = data.get("adapter")
        max_adapters = data.get("max_adapters", 2)

        if not query:
            self._json_response({"error": "Empty query"}, 400)
            return

        # Optional input sanitation via the session's guardian.
        if _session and _session.guardian:
            check = _session.guardian.check_input(query)
            if not check["safe"]:
                query = check["cleaned_text"]

        # Reject early with 503 while the model is still loading.
        with _orchestrator_status_lock:
            status_state = _orchestrator_status.get("state")
        if status_state == "loading":
            self._json_response({
                "error": "Model is still loading, please wait...",
                "status": _orchestrator_status,
            }, 503)
            return

        # Register a per-request response queue, then hand the work item to
        # the background worker.
        req_id = f"{time.time()}_{id(self)}"
        response_q = queue.Queue()

        with _response_queues_lock:
            _response_queues[req_id] = response_q
            _queue_creation_times[req_id] = time.time()

        _request_queue.put({
            "id": req_id,
            "query": query,
            "adapter": adapter,
            "max_adapters": max_adapters,
        })

        try:
            # First event is either "thinking" or an immediate error.
            thinking = response_q.get(timeout=120)
            if "error" in thinking and thinking.get("event") != "thinking":
                self._json_response(thinking, 500)
                return

            # Second event carries the final result; inference can be slow.
            result = response_q.get(timeout=1200)
            self._json_response(result)

        except queue.Empty:
            self._json_response({"error": "Request timed out"}, 504)
        finally:
            # Always unregister so the worker/reaper see the client is gone.
            with _response_queues_lock:
                _response_queues.pop(req_id, None)
                _queue_creation_times.pop(req_id, None)

    def _handle_chat_sse(self, parsed):
        """Handle SSE streaming endpoint."""
        params = parse_qs(parsed.query)
        query = params.get("q", [""])[0]
        adapter = params.get("adapter", [None])[0]

        if not query:
            self.send_error(400, "Missing query parameter 'q'")
            return

        # SSE preamble: keep the connection open, disable caching.
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Connection", "keep-alive")
        self.end_headers()

        # Same registration dance as the POST handler.
        req_id = f"sse_{time.time()}_{id(self)}"
        response_q = queue.Queue()

        with _response_queues_lock:
            _response_queues[req_id] = response_q
            _queue_creation_times[req_id] = time.time()

        _request_queue.put({
            "id": req_id,
            "query": query,
            "adapter": adapter,
            "max_adapters": 2,
        })

        try:
            # Relay worker events to the client until completion or error.
            while True:
                try:
                    event = response_q.get(timeout=300)
                except queue.Empty:
                    self._send_sse("error", {"error": "Timeout"})
                    break

                event_type = event.get("event", "message")
                self._send_sse(event_type, event)

                if event_type in ("complete", "error"):
                    break
        finally:
            # FIX: unregister under the lock and drop the creation timestamp
            # too (previously popped without the lock and left the
            # _queue_creation_times entry for the cleanup thread to reap).
            with _response_queues_lock:
                _response_queues.pop(req_id, None)
                _queue_creation_times.pop(req_id, None)

    def _send_sse(self, event_type, data):
        """Send a single Server-Sent Event frame."""
        try:
            payload = f"event: {event_type}\ndata: {json.dumps(data, default=str)}\n\n"
            self.wfile.write(payload.encode("utf-8"))
            self.wfile.flush()
        except Exception:
            # Client disconnected mid-stream; drop the frame silently.
            pass

    def _handle_new_session(self):
        """Create a new session, auto-saving the old one if it has messages."""
        global _session

        if _session and _session_store and _session.messages:
            try:
                _session_store.save(_session)
            except Exception:
                # Best-effort save; never block creating a new session.
                pass

        _session = CodetteSession()
        self._json_response({"session_id": _session.session_id})

    def _handle_load_session(self):
        """Load a previous session by ID from the session store."""
        global _session
        data = self._read_json_body()
        session_id = data.get("session_id")

        if not session_id or not _session_store:
            self._json_response({"error": "Invalid session ID"}, 400)
            return

        loaded = _session_store.load(session_id)
        if loaded:
            _session = loaded
            self._json_response({
                "session_id": _session.session_id,
                "messages": _session.messages,
                "state": _session.get_state(),
            })
        else:
            self._json_response({"error": "Session not found"}, 404)

    def _handle_save_session(self):
        """Manually save the current session."""
        if _session and _session_store:
            _session_store.save(_session)
            self._json_response({"saved": True, "session_id": _session.session_id})
        else:
            self._json_response({"error": "No active session"}, 400)

    def _handle_export_session(self):
        """Export the current session as downloadable JSON."""
        if not _session:
            self._json_response({"error": "No active session"}, 400)
            return

        export_data = _session.to_dict()
        export_data["_export_version"] = 1
        export_data["_exported_at"] = time.time()

        body = json.dumps(export_data, default=str, indent=2).encode("utf-8")
        filename = f"codette_session_{_session.session_id[:8]}.json"
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        # FIX: use the computed filename — it was built but never referenced,
        # so downloads previously carried a hard-coded placeholder name.
        self.send_header("Content-Disposition", f'attachment; filename="{filename}"')
        self.send_header("Content-Length", len(body))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def _handle_import_session(self):
        """Import a session from uploaded JSON."""
        global _session
        try:
            data = self._read_json_body()
            if not data or "session_id" not in data:
                self._json_response({"error": "Invalid session data"}, 400)
                return

            # Preserve the current session before replacing it (best-effort).
            if _session and _session_store and _session.messages:
                try:
                    _session_store.save(_session)
                except Exception:
                    pass

            _session = CodetteSession()
            _session.from_dict(data)

            # Persist the imported session immediately (best-effort).
            if _session_store:
                try:
                    _session_store.save(_session)
                except Exception:
                    pass

            self._json_response({
                "session_id": _session.session_id,
                "messages": _session.messages,
                "state": _session.get_state(),
                "imported": True,
            })
        except Exception as e:
            self._json_response({"error": f"Import failed: {e}"}, 400)
|
|
|
|
|
def main():
    """Start the Codette web server.

    Startup order: parse args -> create session/store -> start worker,
    cleanup, and health-monitor daemon threads -> kick off model loading in
    the background -> wait up to 120s for it -> serve HTTP until Ctrl+C,
    then save the session and stop the worker via the None sentinel.
    """
    global _session, _session_store, _worker_threads

    parser = argparse.ArgumentParser(description="Codette Web UI")
    parser.add_argument("--port", type=int, default=7860, help="Port (default: 7860)")
    parser.add_argument("--no-browser", action="store_true", help="Don't auto-open browser")
    args = parser.parse_args()

    print("=" * 60)
    print(" CODETTE WEB UI")
    print("=" * 60)

    # Session + persistence layer.
    _session_store = SessionStore()
    _session = CodetteSession()
    print(f" Session: {_session.session_id}")
    print(f" Cocoon: spiderweb={_session.spiderweb is not None}, "
          f"metrics={_session.metrics_engine is not None}")

    # Single worker => inference requests are processed serially.
    num_workers = 1
    with _worker_threads_lock:
        for i in range(num_workers):
            worker = threading.Thread(target=_worker_thread, daemon=True, name=f"worker-{i}")
            worker.start()
            _worker_threads.append(worker)
    print(f" Started {num_workers} worker thread for serial inference")

    # Reaps abandoned response queues (see _cleanup_orphaned_queues).
    cleanup_thread = threading.Thread(target=_cleanup_orphaned_queues, daemon=True, name="cleanup")
    cleanup_thread.start()
    print(f" Started cleanup thread for queue maintenance")

    # Restarts crashed workers (see _monitor_worker_health).
    health_monitor = threading.Thread(target=_monitor_worker_health, daemon=True, name="health-monitor")
    health_monitor.start()
    print(f" Started worker health monitor thread")

    # Load the model in the background so the HTTP server can come up first.
    threading.Thread(target=_get_orchestrator, daemon=True).start()

    # Poll load status for up to 120s; if still loading, serve anyway and let
    # it finish in the background (handlers return 503 meanwhile).
    print(f" Waiting for model to load (this takes ~60s on first startup)...")
    start_wait = time.time()
    while True:
        with _orchestrator_status_lock:
            state = _orchestrator_status.get("state")
        if state not in ("idle", "loading"):
            break
        if time.time() - start_wait > 120:
            break
        time.sleep(0.5)

    with _orchestrator_status_lock:
        state = _orchestrator_status.get("state")
    if state == "ready":
        print(f" Model loaded in {time.time() - start_wait:.0f}s")
    elif state == "loading":
        print(f" Model still loading (will continue in background)...")
    else:
        print(f" WARNING: Model load status: {_orchestrator_status}")

    # Bind to loopback only — this server is for local use.
    server = HTTPServer(("127.0.0.1", args.port), CodetteHandler)
    url = f"http://localhost:{args.port}"
    print(f"\n Server: {url}")
    print(f" Press Ctrl+C to stop\n")

    if not args.no_browser:
        # Delay slightly so the server socket is accepting before the
        # browser's first request.
        threading.Timer(1.0, lambda: webbrowser.open(url)).start()

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n Shutting down...")

    # Graceful shutdown: persist the session, stop the worker with the None
    # sentinel, and release the server socket machinery.
    if _session and _session_store and _session.messages:
        _session_store.save(_session)
        print(f" Session saved: {_session.session_id}")
    _request_queue.put(None)
    server.shutdown()
    print(" Goodbye!")
|
|
|
|
|
# Script entry point: start the server when run directly (not on import).
if __name__ == "__main__":
    main()
|
|
|