""" Cogni-Engine v1 — Main Entry Point FastAPI server with OpenAI-compatible API. Manages startup, background threads (thinker + keep-alive), and shutdown. Endpoints: POST /v1/chat/completions — Chat (OpenAI-compatible) GET /v1/status — Brain status & intelligence score POST /v1/data/upload — Upload JSONL data GET /v1/health — Health check / keep-alive GET /v1/graph/stats — Detailed graph statistics """ import os import sys import time import signal import asyncio import threading import traceback from typing import Optional, List from contextlib import asynccontextmanager import uvicorn import httpx from fastapi import FastAPI, Request, HTTPException, Header, UploadFile, File from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware import config from memory import Memory from knowledge import KnowledgeGraph from thinker import Thinker from brain import Brain # ═══════════════════════════════════════════════════════════ # GLOBAL STATE # ═══════════════════════════════════════════════════════════ brain: Optional[Brain] = None startup_time: float = 0 _keep_alive_task: Optional[asyncio.Task] = None _cleanup_task: Optional[asyncio.Task] = None _ready = False # ═══════════════════════════════════════════════════════════ # STARTUP & SHUTDOWN # ═══════════════════════════════════════════════════════════ def initialize_engine() -> Brain: """ Initialize the entire Cogni-Engine: 1. Validate config 2. Connect to TiDB 3. Load knowledge graph from DB 4. Start thinker thread """ global startup_time startup_time = time.time() print("=" * 55) print(" COGNI-ENGINE v1 — Starting Up") print("=" * 55) # ── Step 1: Config ── config.print_config_summary() config_valid = config.validate_config() if not config_valid: print("[INIT] Config validation has errors. 
Continuing with warnings...") # ── Step 2: Memory (TiDB) ── print("\n[INIT] Initializing memory layer...") memory = Memory() db_connected = memory.initialize() if db_connected: print("[INIT] Database connected successfully.") else: print("[INIT] Database not connected. Running in memory-only mode.") print("[INIT] WARNING: All data will be lost on restart!") # ── Step 3: Knowledge Graph ── print("\n[INIT] Initializing knowledge graph...") graph = KnowledgeGraph(memory) graph.load_from_memory() stats = graph.get_stats() score = graph.get_intelligence_score() print(f"[INIT] Graph loaded: {stats['total_nodes']} nodes, " f"{stats['total_edges']} edges, score={score:.2f}") # ── Step 4: Thinker ── print("\n[INIT] Initializing thinker...") thinker = Thinker(graph) # ── Step 5: Brain ── print("\n[INIT] Initializing brain...") engine = Brain(graph, thinker) # ── Step 6: Start thinker ── thinker.start() # ── Step 7: Ensure data directory ── os.makedirs(config.DATA_DIR, exist_ok=True) readme_path = os.path.join(config.DATA_DIR, "README.md") if not os.path.exists(readme_path): _create_data_readme(readme_path) elapsed = time.time() - startup_time print(f"\n[INIT] Cogni-Engine ready in {elapsed:.1f}s") print("=" * 55) return engine def _create_data_readme(path: str): """Create README.md in data directory with format guide.""" content = """# Cogni-Engine Data Directory Place your `.jsonl` files here. Each file will be automatically detected and ingested by the thinking engine. ## Format One JSON object per line. Required fields: `type` and `content`. 
```json {"type":"fact","content":"Earth orbits the Sun in 365.25 days","tags":["astronomy"]} {"type":"definition","term":"API","content":"Application Programming Interface"} {"type":"relation","from":"Python","to":"Django","relation":"has_framework"} ``` ## Supported Types fact, definition, explanation, description, property, statistic, measurement, term, abbreviation, jargon, slang, idiom, synonym, antonym, quote, rule, example, analogy, opinion, paragraph, relation, cause_effect, comparison, hierarchy, composition, dependency, contradiction, timeline, process, procedure, event, history, change, qa, custom_* ## Optional Fields tags, source, confidence, language, domain, related, metadata See full documentation for details on each type. """ try: with open(path, 'w', encoding='utf-8') as f: f.write(content) except Exception: pass # ═══════════════════════════════════════════════════════════ # AUTHENTICATION # ═══════════════════════════════════════════════════════════ def verify_api_key(authorization: Optional[str]) -> bool: if not authorization: return False parts = authorization.split(" ", 1) if len(parts) != 2 or parts[0].lower() != "bearer": return False token = parts[1].strip() return token in config.API_KEYS # Cek semua keys def require_auth(authorization: Optional[str] = Header(None, alias="Authorization")): """Dependency that enforces authentication.""" if not verify_api_key(authorization): raise HTTPException( status_code=401, detail="Invalid or missing API key. 
Use 'Authorization: Bearer '" ) # ═══════════════════════════════════════════════════════════ # KEEP-ALIVE & BACKGROUND TASKS # ═══════════════════════════════════════════════════════════ async def keep_alive_loop(): """Self-ping to prevent HF Space from sleeping.""" if not config.KEEP_ALIVE_ENABLED: return base_url = f"http://localhost:{config.PORT}" interval = config.KEEP_ALIVE_INTERVAL # Wait for server to be ready await asyncio.sleep(10) async with httpx.AsyncClient() as client: while True: try: response = await client.get( f"{base_url}/v1/health", timeout=10 ) if config.LOG_THINKING_DETAILS: print(f"[KEEP-ALIVE] Ping OK: {response.status_code}") except Exception as e: print(f"[KEEP-ALIVE] Ping failed: {e}") await asyncio.sleep(interval) async def cleanup_loop(): """Periodic cleanup of expired sessions and buffer flush.""" while True: try: await asyncio.sleep(config.SESSION_CLEANUP_INTERVAL) if brain: brain.cleanup() except Exception as e: print(f"[CLEANUP] Error: {e}") await asyncio.sleep(60) # ═══════════════════════════════════════════════════════════ # FASTAPI APP # ═══════════════════════════════════════════════════════════ @asynccontextmanager async def lifespan(app: FastAPI): """Manage startup and shutdown lifecycle.""" global brain, _keep_alive_task, _cleanup_task, _ready # ── Startup ── try: brain = initialize_engine() _ready = True # Start background tasks _keep_alive_task = asyncio.create_task(keep_alive_loop()) _cleanup_task = asyncio.create_task(cleanup_loop()) except Exception as e: print(f"[FATAL] Startup failed: {e}") traceback.print_exc() _ready = False yield # ── Shutdown ── print("\n[SHUTDOWN] Shutting down Cogni-Engine...") if _keep_alive_task: _keep_alive_task.cancel() if _cleanup_task: _cleanup_task.cancel() if brain: brain.shutdown() print("[SHUTDOWN] Complete.") app = FastAPI( title="Cogni-Engine v1", description="Self-Evolving Knowledge AI", version="1.0.0", lifespan=lifespan ) # CORS — allow all origins for API access 
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    # NOTE(review): browsers refuse "Access-Control-Allow-Origin: *" on
    # credentialed requests. Clients here authenticate with a Bearer header,
    # so credentials are probably unnecessary — confirm before tightening.
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ═══════════════════════════════════════════════════════════
# API ENDPOINTS
# ═══════════════════════════════════════════════════════════

def _count_words(content) -> int:
    """
    Approximate a token count as the number of whitespace-separated words.

    Handles the message-content shapes this server may receive:
    a string; ``None`` (counted as 0); or a list of content parts, where
    only dict parts with a string ``"text"`` value are counted. Anything
    else counts as 0 rather than raising.
    """
    if isinstance(content, str):
        return len(content.split())
    if isinstance(content, list):
        return sum(
            len(part["text"].split())
            for part in content
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        )
    return 0


# ───────────────────────────────────────────────────
# POST /v1/chat/completions — OpenAI Compatible
# ───────────────────────────────────────────────────

@app.post("/v1/chat/completions")
async def chat_completions(
    request: Request,
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    OpenAI-compatible chat completions endpoint.

    Compatible with LibreChat, Open WebUI, ChatBox, etc.
    Malformed client input is rejected with 400 instead of surfacing as 500.
    """
    # Auth
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")

    if not _ready or not brain:
        raise HTTPException(status_code=503, detail="Engine not ready")

    # Parse request body
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON body")

    messages = body.get("messages", [])
    if not messages:
        raise HTTPException(status_code=400, detail="'messages' field required")
    if not isinstance(messages, list):
        raise HTTPException(status_code=400, detail="'messages' must be a list")

    # Clients often send an explicit `"temperature": null`; treat it as unset.
    raw_temperature = body.get("temperature")
    if raw_temperature is None:
        raw_temperature = config.DEFAULT_TEMPERATURE
    try:
        temperature = float(raw_temperature)
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=400, detail="'temperature' must be a number"
        ) from None
    temperature = max(0.0, min(1.0, temperature))

    # Optional custom field; the brain receives None when it is absent.
    session_id = body.get("session_id", None)

    # Process through brain
    try:
        result = brain.process_message(
            messages=messages,
            session_id=session_id,
            temperature=temperature
        )
    except Exception as e:
        print(f"[API] Processing error: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Internal processing error")

    response_text = result["response"]
    prompt_tokens = sum(
        _count_words(m.get("content")) for m in messages if isinstance(m, dict)
    )
    completion_tokens = _count_words(response_text)

    # Format as OpenAI-compatible response
    response_id = f"cogni-{config.generate_session_id()}"

    return JSONResponse(content={
        "id": response_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": "cogni-engine-v1",
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response_text
                },
                "finish_reason": "stop"
            }
        ],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
            # Cogni-specific metadata (kept inside "usage", where existing
            # clients already read it).
            "cogni_metadata": {
                "confidence": result["confidence"],
                "reasoning_depth": result["reasoning_depth"],
                "nodes_traversed": result["nodes_traversed"],
                "chains_used": result["chains_used"],
                "thinking_cycles": result["thinking_cycles"],
                "processing_time_ms": result["processing_time_ms"],
                "session_id": result["session_id"]
            }
        }
    })


# ───────────────────────────────────────────────────
# GET /v1/status — Brain Status
# ───────────────────────────────────────────────────

@app.get("/v1/status")
async def get_status(
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    Return brain status and intelligence metrics, plus uptime fields.

    Returns ``{"alive": false, "ready": false}`` when the engine is not ready.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")

    if not _ready or not brain:
        return JSONResponse(content={"alive": False, "ready": False})

    status = brain.get_status()

    # Add uptime
    uptime_seconds = time.time() - startup_time
    status["uptime"] = utils.format_duration(uptime_seconds)
    status["uptime_seconds"] = round(uptime_seconds, 0)
    status["started_at"] = time.strftime(
        "%Y-%m-%dT%H:%M:%SZ", time.gmtime(startup_time)
    )

    return JSONResponse(content=status)


# ───────────────────────────────────────────────────
# POST /v1/data/upload — Upload JSONL Data
# ───────────────────────────────────────────────────

@app.post("/v1/data/upload")
async def upload_data(
    file: UploadFile = File(...),
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    Upload a JSONL data file.

    The file is saved to ``config.DATA_DIR`` under a timestamped, sanitized
    name, to be auto-ingested by the thinker. The first 10 lines are
    sampled: at least one must be a JSON object with "type" and "content".

    Raises:
        HTTPException: 401 bad key, 503 not ready, 400 bad extension/encoding/
            content, 413 too large, 500 save failure.
    """
    import json  # local: only this endpoint parses JSON

    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")

    if not _ready:
        raise HTTPException(status_code=503, detail="Engine not ready")

    # Validate file extension
    filename = file.filename or "upload.jsonl"
    if not any(filename.endswith(ext) for ext in config.SUPPORTED_DATA_EXTENSIONS):
        raise HTTPException(
            status_code=400,
            detail=f"Only {config.SUPPORTED_DATA_EXTENSIONS} files are supported"
        )

    # Read content
    try:
        content = await file.read()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to read file: {e}")

    # Validate size before spending time decoding it
    size_mb = len(content) / (1024 * 1024)
    if size_mb > config.MAX_REQUEST_SIZE_MB:
        raise HTTPException(
            status_code=413,
            detail=f"File too large: {size_mb:.1f}MB (max {config.MAX_REQUEST_SIZE_MB}MB)"
        )

    # "utf-8-sig" strips a leading BOM (common in files saved on Windows),
    # which would otherwise make the first line invalid JSON.
    try:
        content_str = content.decode("utf-8-sig")
    except UnicodeDecodeError as e:
        raise HTTPException(status_code=400, detail=f"Failed to read file: {e}")

    # Validate JSONL format (check first few lines)
    lines = content_str.strip().split('\n')
    valid_lines = 0
    errors = []

    for i, line in enumerate(lines[:10]):
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError as e:
            errors.append(f"Line {i+1}: invalid JSON — {e}")
            continue
        # Membership tests on a non-dict would be wrong (substring match on a
        # JSON string) or raise (TypeError on a JSON number).
        if not isinstance(entry, dict):
            errors.append(f"Line {i+1}: expected a JSON object")
        elif "type" not in entry or "content" not in entry:
            errors.append(f"Line {i+1}: missing 'type' or 'content' field")
        else:
            valid_lines += 1

    if valid_lines == 0:
        raise HTTPException(
            status_code=400,
            detail={
                "error": "No valid JSONL entries found",
                "details": errors[:5]
            }
        )

    # Save to data directory; keep only [A-Za-z0-9._-]-style characters so the
    # client-supplied name cannot contain path separators.
    safe_filename = "".join(
        c if c.isalnum() or c in "._-" else "_"
        for c in filename
    )

    # Add timestamp to prevent overwrites
    ts = int(time.time())
    if not safe_filename.startswith(f"{ts}_"):
        safe_filename = f"{ts}_{safe_filename}"

    save_path = os.path.join(config.DATA_DIR, safe_filename)
    # Write to a temporary name first, then rename atomically, so the thinker
    # never sees a half-written file. Presumably it only ingests files with a
    # supported extension, which the ".part" suffix avoids — confirm.
    tmp_path = save_path + ".part"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(content_str)
        os.replace(tmp_path, save_path)
    except OSError as e:
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # best-effort removal of the partial file
        raise HTTPException(status_code=500, detail=f"Failed to save file: {e}")

    return JSONResponse(content={
        "status": "uploaded",
        "filename": safe_filename,
        "total_lines": len(lines),
        "valid_lines_sampled": valid_lines,
        "validation_errors": errors[:5] if errors else [],
        "size_mb": round(size_mb, 2),
        "message": (
            f"File saved. Thinker will auto-ingest on next INGEST cycle "
            f"(within ~{config.THINKING_INTERVAL_SLOW}s)."
        )
    })


# ───────────────────────────────────────────────────
# GET /v1/health — Health Check
# ───────────────────────────────────────────────────

@app.get("/v1/health")
async def health_check():
    """
    Health check endpoint.

    Used by keep-alive self-ping and external monitoring.
    No authentication required. Returns 503 with status "starting" while
    the engine is not ready.
    """
    if not _ready:
        return JSONResponse(
            status_code=503,
            content={
                "status": "starting",
                "timestamp": utils.timestamp_now()
            }
        )

    uptime = time.time() - startup_time
    status_data = {
        "status": "healthy",
        "timestamp": utils.timestamp_now(),
        "uptime": utils.format_duration(uptime)
    }

    if brain:
        status_data["thinking_cycles"] = brain.thinker.total_cycles
        status_data["thinker_phase"] = brain.thinker.current_phase
        status_data["thinker_running"] = brain.thinker.is_running

    return JSONResponse(content=status_data)


# ───────────────────────────────────────────────────
# GET /v1/graph/stats — Detailed Graph Statistics
# ───────────────────────────────────────────────────

@app.get("/v1/graph/stats")
async def graph_stats(
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    Return detailed knowledge graph statistics.

    Includes counts by node type, edge relation and node source, the 20
    nodes with the highest ``weight * connections``, thinker metrics and
    DB stats.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")

    if not _ready or not brain:
        raise HTTPException(status_code=503, detail="Engine not ready")

    graph = brain.graph
    stats = graph.get_stats()
    intelligence = graph.get_intelligence_score()
    db_stats = graph.memory.get_db_stats()
    thinker_metrics = brain.thinker.metrics

    # Snapshot the containers before iterating. The thinker runs in a
    # background thread over the same graph and may add entries
    # concurrently; iterating a dict that changes size raises RuntimeError.
    nodes = list(graph.nodes.values())
    edges = list(graph.edges.values())

    # Node type and source distributions (single pass)
    type_counts = {}
    source_counts = {"data": 0, "inferred": 0, "user_chat": 0}
    for node in nodes:
        type_counts[node.type] = type_counts.get(node.type, 0) + 1
        source_counts[node.source] = source_counts.get(node.source, 0) + 1

    # Edge relation distribution
    relation_counts = {}
    for edge in edges:
        relation_counts[edge.relation] = relation_counts.get(edge.relation, 0) + 1

    # Top weighted nodes
    top_nodes = sorted(
        nodes,
        key=lambda n: n.weight * n.connections,
        reverse=True
    )[:20]

    return JSONResponse(content={
        "intelligence_score": round(intelligence, 2),
        "overview": stats,
        "node_types": type_counts,
        "edge_relations": relation_counts,
        "node_sources": source_counts,
        "top_nodes": [
            {
                "id": n.id,
                "content": utils.truncate_text(n.content, 100),
                "type": n.type,
                "weight": round(n.weight, 3),
                "connections": n.connections
            }
            for n in top_nodes
        ],
        "thinker_metrics": thinker_metrics,
        "database": db_stats,
        "uptime_seconds": round(time.time() - startup_time, 0)
    })


# ───────────────────────────────────────────────────
# GET /v1/models — Model List (OpenAI Compat)
# ───────────────────────────────────────────────────

@app.get("/v1/models")
async def list_models(
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    List available models (a single "cogni-engine-v1" entry).

    OpenAI-compatible endpoint required by some clients.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")

    intelligence = 0.0
    if brain:
        intelligence = brain.graph.get_intelligence_score()

    return JSONResponse(content={
        "object": "list",
        "data": [
            {
                "id": "cogni-engine-v1",
                "object": "model",
                "created": int(startup_time),
                "owned_by": "cogni-engine",
                "permission": [],
                "root": "cogni-engine-v1",
                "parent": None,
                "meta": {
                    "type": "self-evolving-knowledge-ai",
                    "intelligence_score": round(intelligence, 2)
                }
            }
        ]
    })


# ───────────────────────────────────────────────────
# Root endpoint
# ───────────────────────────────────────────────────

@app.get("/")
async def root():
    """Return basic service info and an endpoint directory (no auth)."""
    uptime = time.time() - startup_time if startup_time else 0

    info = {
        "name": "Cogni-Engine v1",
        "description": "Self-Evolving Knowledge AI",
        "status": "running" if _ready else "starting",
        "uptime": utils.format_duration(uptime),
        "api_docs": "/docs",
        "endpoints": {
            "chat": "POST /v1/chat/completions",
            "status": "GET /v1/status",
            "upload": "POST /v1/data/upload",
            "health": "GET /v1/health",
            "stats": "GET /v1/graph/stats",
            "models": "GET /v1/models"
        }
    }

    if brain:
        info["intelligence_score"] = round(
            brain.graph.get_intelligence_score(), 2
        )
        info["thinking_cycles"] = brain.thinker.total_cycles
        info["total_nodes"] = brain.graph.get_stats()["total_nodes"]

    return JSONResponse(content=info)


# ═══════════════════════════════════════════════════════════
# SIGNAL HANDLING
# ═══════════════════════════════════════════════════════════

def handle_shutdown_signal(signum, frame):
    """
    Shut the brain down and exit with status 0 on SIGTERM/SIGINT.

    NOTE(review): uvicorn.run() installs its own SIGINT/SIGTERM handling
    while serving and runs the lifespan shutdown, which already calls
    brain.shutdown(). Depending on the uvicorn version, this handler is
    either never reached or is re-invoked after the server exits, which
    would call brain.shutdown() a second time. Confirm that
    brain.shutdown() is idempotent.
    """
    print(f"\n[SIGNAL] Received signal {signum}. Initiating shutdown...")
    if brain:
        brain.shutdown()
    sys.exit(0)


# ═══════════════════════════════════════════════════════════
# IMPORTS NEEDED BY THIS FILE
# ═══════════════════════════════════════════════════════════

import utils  # noqa: E402 — needed for utility functions used in endpoints


# ═══════════════════════════════════════════════════════════
# ENTRY POINT
# ═══════════════════════════════════════════════════════════

if __name__ == "__main__":
    # Register signal handlers
    signal.signal(signal.SIGTERM, handle_shutdown_signal)
    signal.signal(signal.SIGINT, handle_shutdown_signal)

    print(f"\n[MAIN] Starting Cogni-Engine on port {config.PORT}...")
    print(f"[MAIN] API Key: {'SET' if os.environ.get('API_KEY') else 'AUTO-GENERATED'}")
    print(f"[MAIN] API Docs: http://localhost:{config.PORT}/docs\n")

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=config.PORT,
        log_level=config.LOG_LEVEL.lower(),
        access_log=config.LOG_API_REQUESTS,
        timeout_keep_alive=65,
        # Single worker — thinker thread runs in-process
        workers=1
    )