# coding_lab_server.py
# Antigravity Autonomous Coding Lab Server
# Powered by CAT V3 (Graph-MoE) & Ollama qwen2.5-coder:3b
"""HTTP backend for the Antigravity Autonomous Coding Lab.

Serves the single-page front end (``LAB_HTML``), a JSON status endpoint
(``/api/status``) and a Server-Sent-Events endpoint (``/api/agent/run``,
GET or POST) that first routes the question through the CAT V3 model and
then drives the autonomous coding agent, streaming progress events.

The heavy dependencies (torch, ``cat_v3``, ``agent_executor``) are optional:
when they cannot be imported ``BACKEND_AVAILABLE`` is False, routing falls
back to a canned result and agent runs report an error event.
"""

import json
import math
import os
import sys
import threading
import traceback
import urllib.parse
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path

# Ensure workspace root is in python path
sys.path.append(str(Path(__file__).parent))

try:
    import torch
    from cat_v3.model import CATV3Model
    from agent_executor import OllamaClient, SandboxExecutor, AutonomousCodingAgent
    BACKEND_AVAILABLE = True
except ImportError:
    BACKEND_AVAILABLE = False

# Lazily populated by load_system(); stays None until a load succeeds.
_LOADED_SYSTEM = None
# The server handles each request in its own thread, so the lazy load is
# serialized to avoid loading the checkpoint twice concurrently.
_LOAD_LOCK = threading.Lock()

_CHECKPOINT_PATH = "checkpoints/cat_v3/cat_v3_model.pt"

# Concept names excluded from the node/edge lists sent to the client.
# NOTE(review): both entries are empty strings in this file; they look like
# placeholder/special-token names that may have been lost — confirm.
_SKIPPED_CONCEPTS = ("", "")


def _load_checkpoint():
    """Build the CAT V3 system dict from the checkpoint, or return None.

    Prints a diagnostic and returns None when the backend imports are
    missing, the checkpoint file does not exist, or loading raises.
    """
    if not BACKEND_AVAILABLE:
        print("Backend dependencies not available.")
        return None

    if not os.path.exists(_CHECKPOINT_PATH):
        print(f"Checkpoint {_CHECKPOINT_PATH} not found. Run training first.")
        return None

    try:
        device = "cpu"
        # SECURITY NOTE: torch.load unpickles the file, and this checkpoint
        # stores whole objects (vocab, tokenizer), so only load checkpoints
        # you produced yourself.
        # NOTE(review): recent torch releases default to weights_only=True,
        # which may reject these pickled objects — confirm against the
        # installed torch version.
        checkpoint = torch.load(_CHECKPOINT_PATH, map_location=device)
        vocab = checkpoint["vocab"]
        tokenizer = checkpoint["tokenizer"]
        expert_graphs = checkpoint["expert_graphs"]

        model = CATV3Model(
            num_concepts=vocab.size(),
            tokenizer_vocab_size=tokenizer.vocab_size(),
            pad_id=tokenizer.pad_id,
            eos_id=tokenizer.eos_id,
            expert_graphs=expert_graphs,
            concept_dim=128,
            hidden_size=128,
            path_length=8,
            top_m=8,
            decoder_vocab_size=tokenizer.vocab_size(),
        ).to(device)
        model.load_state_dict(checkpoint["model_state_dict"])
        model.eval()
    except Exception as e:
        # Top-level boundary: any failure here just means "no model".
        print(f"Error loading CAT V3 model: {e}")
        return None

    print("CAT V3 model loaded successfully for Coding Lab.")
    return {
        "model": model,
        "vocab": vocab,
        "tokenizer": tokenizer,
        "expert_graphs": expert_graphs,
        "device": device,
    }


def load_system():
    """Return the cached CAT V3 system dict, loading it on first use.

    Returns:
        A dict with keys ``model``, ``vocab``, ``tokenizer``,
        ``expert_graphs`` and ``device``, or None when the model cannot be
        loaded. A failed load is not cached, so the next call retries.
    """
    global _LOADED_SYSTEM
    if _LOADED_SYSTEM is not None:
        return _LOADED_SYSTEM
    with _LOAD_LOCK:
        # Re-check under the lock: another thread may have finished loading
        # while this one was waiting.
        if _LOADED_SYSTEM is None:
            _LOADED_SYSTEM = _load_checkpoint()
        return _LOADED_SYSTEM


def _fallback_routing():
    """Return the canned routing result used when no model is loaded."""
    return {
        "reasoning_path": ["data_input", "sort", "syntax", "control_flow"],
        "activated_domains": ["coding", "mathematics"],
        "domain_probabilities": {
            "mechanical": 0.05,
            "civil": 0.05,
            "electrical": 0.02,
            "physics": 0.08,
            "mathematics": 0.72,
            "english": 0.12,
            "coding": 0.95,
        },
        "nodes": [
            {"name": "syntax", "activation": 0.8},
            {"name": "sort", "activation": 0.9},
        ],
        "edges": [],
    }


def predict_v3_routing(question):
    """Route *question* through CAT V3 and describe the result for the UI.

    Args:
        question: The task text entered by the user.

    Returns:
        A dict with keys ``reasoning_path``, ``activated_domains``,
        ``domain_probabilities``, ``nodes`` and ``edges``. When no model is
        loaded a fixed simulated result is returned instead.
    """
    loaded = load_system()
    if not loaded:
        # Fallback simulated routing if no checkpoint
        return _fallback_routing()

    model = loaded["model"]
    vocab = loaded["vocab"]
    tokenizer = loaded["tokenizer"]
    device = loaded["device"]

    domains = ["mechanical", "civil", "electrical", "physics",
               "mathematics", "english", "coding"]

    with torch.no_grad():
        input_ids, attention_mask = tokenizer.encode(question, max_length=32)
        input_ids = input_ids.unsqueeze(0).to(device)
        attention_mask = attention_mask.unsqueeze(0).to(device)

        outputs = model.generate_response(
            input_ids=input_ids,
            attention_mask=attention_mask,
            router_top_k=2,
            router_threshold=0.4,
        )

        # Index 0 selects the single question in the batch built above.
        router_probs = outputs["router_probs"][0].cpu().tolist()
        active_mask = outputs["router_mask"][0].cpu().tolist()

        fusion_report = model.fusion.get_symbolic_report(
            vocab=vocab,
            expert_reports=outputs["expert_reports"],
            router_mask=outputs["router_mask"],
            domain_names=domains,
        )[0]

    activated_domains = [
        name for name, is_active in zip(domains, active_mask) if is_active
    ]
    domain_probs = dict(zip(domains, router_probs))

    if fusion_report["reasoning_paths"]:
        reasoning_path = fusion_report["reasoning_paths"][0]
    else:
        # Fallback path if empty
        reasoning_path = ["data_input", "syntax", "control_flow"]

    reported_concepts = fusion_report["concepts"]
    nodes_list = [
        {"name": name, "activation": 0.95 if name in reported_concepts else 0.05}
        for name in vocab.concepts
        if name not in _SKIPPED_CONCEPTS
    ]

    edges_list = []
    for d_name in activated_domains:
        edge_index = model.experts[d_name].edge_index.cpu()
        # Row 0 holds source concept ids and row 1 target ids; converting
        # each row once avoids a tensor .item() call per edge.
        sources = edge_index[0].tolist()
        targets = edge_index[1].tolist()
        for src_id, dst_id in zip(sources, targets):
            u = vocab.id_to_concept[src_id]
            v = vocab.id_to_concept[dst_id]
            if u not in _SKIPPED_CONCEPTS and v not in _SKIPPED_CONCEPTS:
                edges_list.append({"from": u, "to": v, "weight": 1.0})

    return {
        "reasoning_path": reasoning_path,
        "activated_domains": activated_domains,
        "domain_probabilities": domain_probs,
        "nodes": nodes_list,
        "edges": edges_list,
    }


class CodingLabRequestHandler(BaseHTTPRequestHandler):
    """Request handler for the lab page, status API and agent SSE stream."""

    def log_message(self, format, *args):
        """Suppress the default per-request access log."""
        pass

    # ── routing ────────────────────────────────────────────────────────

    def do_GET(self):
        """Serve the page, ``/api/status`` or a query-string agent run."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path == "/" or parsed.path == "/index.html":
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.end_headers()
            self.wfile.write(LAB_HTML.encode("utf-8"))
        elif parsed.path == "/api/status":
            self._send_json(200, self._collect_status())
        elif parsed.path == "/api/agent/run":
            query_params = urllib.parse.parse_qs(parsed.query)
            try:
                run_args = self._parse_run_request(
                    query_params.get("question", [""])[0],
                    query_params.get("language", ["python"])[0],
                    query_params.get("max_iterations", ["3"])[0],
                    query_params.get("temperature", ["0.2"])[0],
                )
            except (ValueError, TypeError, AttributeError) as e:
                self._send_request_error(e)
                return
            self._stream_agent_run(*run_args)
        else:
            self.send_response(404)
            self.end_headers()

    def do_POST(self):
        """Start an agent run described by a JSON request body."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path == "/api/agent/run":
            try:
                content_length = int(self.headers.get("Content-Length", 0))
                if content_length < 0:
                    # A negative length would make read() wait for EOF.
                    raise ValueError("Invalid Content-Length.")
                body = self.rfile.read(content_length)
                data = json.loads(body.decode("utf-8"))
                run_args = self._parse_run_request(
                    data.get("question", ""),
                    data.get("language", "python"),
                    data.get("max_iterations", 3),
                    data.get("temperature", 0.2),
                )
            except (ValueError, TypeError, AttributeError) as e:
                # Covers malformed JSON/encoding (ValueError subclasses),
                # non-object bodies and wrongly typed fields.
                self._send_request_error(e)
                return
            self._stream_agent_run(*run_args)
        else:
            self.send_response(404)
            self.end_headers()

    # ── helpers ────────────────────────────────────────────────────────

    @staticmethod
    def _parse_run_request(question, language, max_iterations, temperature):
        """Validate raw run parameters.

        Returns:
            ``(question, language, max_iterations)`` with the strings
            stripped, the language lower-cased and the count as an int.

        Raises:
            ValueError: on a non-numeric count/temperature or an empty
                question.
        """
        question = question.strip()
        language = language.strip().lower()
        max_iterations = int(max_iterations)
        # NOTE(review): temperature is validated but not forwarded to
        # anything in this file — confirm whether the Ollama client or the
        # agent should receive it.
        float(temperature)
        if not question:
            raise ValueError("Question cannot be empty.")
        return question, language, max_iterations

    def _send_json(self, status, payload):
        """Write *payload* as a JSON response with the given status code."""
        try:
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps(payload).encode("utf-8"))
        except OSError:
            pass  # Connection closed

    def _send_request_error(self, exc):
        """Report a failure that happened before any response was started.

        The 500 status is kept for compatibility with existing clients,
        even though a 4xx code would describe validation failures better.
        """
        traceback.print_exc()
        self._send_json(500, {"error": str(exc)})

    @staticmethod
    def _collect_status():
        """Gather the data returned by ``/api/status``."""
        if not BACKEND_AVAILABLE:
            # OllamaClient/SandboxExecutor are undefined when the guarded
            # import failed; report that instead of raising NameError.
            # NOTE(review): an empty list is assumed to be an acceptable
            # "no runtimes" value for the front end — confirm.
            return {
                "ollama_connected": False,
                "ollama_message": "Backend dependencies not available.",
                "runtimes": [],
                "cat_v3_loaded": False,
            }
        ollama = OllamaClient()
        conn_ok, conn_msg = ollama.check_connection()
        return {
            "ollama_connected": conn_ok,
            "ollama_message": conn_msg,
            "runtimes": SandboxExecutor.get_supported_runtimes(),
            "cat_v3_loaded": load_system() is not None,
        }

    def _stream_agent_run(self, question, language, max_iterations):
        """Run routing plus the agent loop, streaming progress as SSE.

        Each event is one ``data:`` line holding a JSON object of the form
        ``{"event": <type>, "data": <payload>}``. Once the stream headers
        are out, failures are reported as an ``error`` event rather than a
        second HTTP status line.
        """
        # Set headers for Server-Sent Events (SSE)
        # NOTE(review): the response carries no length and asks for
        # keep-alive, so the client must decide when the stream is over
        # (presumably on "done"/"error") — confirm against the front end.
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()

        def send_event(event_type, payload):
            event_data = {"event": event_type, "data": payload}
            try:
                frame = f"data: {json.dumps(event_data)}\n\n".encode("utf-8")
            except (TypeError, ValueError) as e:
                # Best effort: drop an unserializable event, but say so.
                print(f"Dropping unserializable '{event_type}' event: {e}")
                return
            try:
                self.wfile.write(frame)
                self.wfile.flush()
            except OSError:
                pass  # Connection closed

        try:
            # Step 1: Run CAT V3 Routing
            send_event("status", "Running query through CAT V3 Graph-MoE router...")
            routing_res = predict_v3_routing(question)
            send_event("routing", {
                "reasoning_path": routing_res["reasoning_path"],
                "activated_domains": routing_res["activated_domains"],
                "domain_probabilities": routing_res["domain_probabilities"],
                "nodes": routing_res["nodes"],
                "edges": routing_res["edges"],
            })

            # Step 2: Initialize Agent
            if not BACKEND_AVAILABLE:
                send_event("error", "Backend dependencies not available.")
                return
            ollama = OllamaClient(model="qwen2.5-coder:3b")
            conn_ok, conn_msg = ollama.check_connection()
            if not conn_ok:
                send_event("error", f"Ollama Error: {conn_msg}")
                return

            sandbox = SandboxExecutor()
            agent = AutonomousCodingAgent(ollama, sandbox)

            def agent_callback(msg):
                # Direct forwarding of callback logs to SSE client
                send_event("agent_log", msg)

            # Step 3: Run the loop
            run_res = agent.run_agent_loop(
                task=question,
                language=language,
                concept_path=routing_res["reasoning_path"],
                max_iterations=max_iterations,
                callback=agent_callback,
            )

            # Step 4: Finished
            send_event("done", run_res)
        except Exception as e:
            # Request boundary: headers are already sent, so the failure is
            # surfaced inside the stream.
            traceback.print_exc()
            send_event("error", str(e))


# ─── FRONTEND HTML / CSS / JS ───────────────────────────────────────────────
# NOTE(review): the page below contains only text, no markup, styles or
# script — it looks as if the tags were stripped at some point. It is kept
# exactly as found; restore the real page from version control if so.
LAB_HTML = r""" Antigravity Autonomous Coding Lab

Antigravity Coding Lab

CAT V3 (Graph-MoE) + Ollama qwen2.5-coder:3b Autonomous Agent Loop

Ollama: Checking...
Model: Checking...

Configuration

0.2
3

Runtime Diagnostics

Concept Planning Graph (CAT V3)

MoE Routing Probabilities

00:00:00SystemAgent ready. Enter task details and click run.
// Code will be shown here...
Execution stdout/stderr output will be displayed here...
"""


def run(port=8002):
    """Serve the Coding Lab on *port* until interrupted with Ctrl-C."""
    server_address = ('', port)
    # The context manager closes the listening socket on every exit path.
    with ThreadingHTTPServer(server_address, CodingLabRequestHandler) as httpd:
        print(f"Starting Antigravity Autonomous Coding Lab server on port {port}...")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nShutting down server.")


if __name__ == "__main__":
    port = 8002
    if len(sys.argv) > 1:
        try:
            port = int(sys.argv[1])
        except ValueError:
            print(f"Ignoring invalid port {sys.argv[1]!r}; using {port}.")
    run(port)