CogniEngine / main.py
sadidft's picture
Update main.py
5394701 verified
"""
Cogni-Engine v1 — Main Entry Point
FastAPI server with an OpenAI-compatible API.
Manages startup, background work (the thinker thread plus the keep-alive
and session-cleanup asyncio tasks), and shutdown.
Endpoints:
POST /v1/chat/completions — Chat (OpenAI-compatible)
GET /v1/models — Model list (OpenAI-compatible)
GET /v1/status — Brain status & intelligence score
POST /v1/data/upload — Upload JSONL data
GET /v1/health — Health check / keep-alive (no auth)
GET /v1/graph/stats — Detailed graph statistics
GET / — Service info
"""
import asyncio
import heapq
import hmac
import json
import os
import signal
import sys
import threading
import time
import traceback
from collections import Counter
from contextlib import asynccontextmanager
from typing import List, Optional

import uvicorn
import httpx
from fastapi import FastAPI, Request, HTTPException, Header, UploadFile, File
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware

import config
from memory import Memory
from knowledge import KnowledgeGraph
from thinker import Thinker
from brain import Brain
# ═══════════════════════════════════════════════════════════
# GLOBAL STATE
# ═══════════════════════════════════════════════════════════
brain: Optional[Brain] = None  # the running engine; stays None until lifespan() startup succeeds
startup_time: float = 0  # epoch seconds recorded by initialize_engine(); 0 means startup has not run
_keep_alive_task: Optional[asyncio.Task] = None  # self-ping task created by lifespan()
_cleanup_task: Optional[asyncio.Task] = None  # periodic cleanup task created by lifespan()
_ready: bool = False  # set True once initialize_engine() returns without raising; gates most endpoints
# ═══════════════════════════════════════════════════════════
# STARTUP & SHUTDOWN
# ═══════════════════════════════════════════════════════════
def initialize_engine() -> Brain:
    """
    Initialize the entire Cogni-Engine and return the ready Brain.

    Steps:
      1. Print and validate config (validation errors are reported, not fatal)
      2. Initialize the memory layer (TiDB); if it fails, run memory-only
      3. Load the knowledge graph from memory
      4. Build the thinker and the brain
      5. Ensure the data directory and its README exist
      6. Start the thinker thread

    The data directory is prepared *before* the thinker is started: files
    placed in config.DATA_DIR are auto-ingested by the thinker, so it must
    not be running before that directory exists. Doing this first also
    means a failure to create the directory aborts startup before any
    background thread has been launched.

    Side effect: sets the module-level ``startup_time``.

    Returns:
        The constructed Brain, with its thinker already started.
    """
    global startup_time
    startup_time = time.time()
    print("=" * 55)
    print(" COGNI-ENGINE v1 — Starting Up")
    print("=" * 55)

    # ── Step 1: Config ──
    config.print_config_summary()
    if not config.validate_config():
        print("[INIT] Config validation has errors. Continuing with warnings...")

    # ── Step 2: Memory (TiDB) ──
    print("\n[INIT] Initializing memory layer...")
    memory = Memory()
    db_connected = memory.initialize()
    if db_connected:
        print("[INIT] Database connected successfully.")
    else:
        print("[INIT] Database not connected. Running in memory-only mode.")
        print("[INIT] WARNING: All data will be lost on restart!")

    # ── Step 3: Knowledge Graph ──
    print("\n[INIT] Initializing knowledge graph...")
    graph = KnowledgeGraph(memory)
    graph.load_from_memory()
    stats = graph.get_stats()
    score = graph.get_intelligence_score()
    print(f"[INIT] Graph loaded: {stats['total_nodes']} nodes, "
          f"{stats['total_edges']} edges, score={score:.2f}")

    # ── Step 4: Thinker + Brain ──
    print("\n[INIT] Initializing thinker...")
    thinker = Thinker(graph)
    print("\n[INIT] Initializing brain...")
    engine = Brain(graph, thinker)

    # ── Step 5: Data directory (must exist before the thinker scans it) ──
    os.makedirs(config.DATA_DIR, exist_ok=True)
    readme_path = os.path.join(config.DATA_DIR, "README.md")
    if not os.path.exists(readme_path):
        _create_data_readme(readme_path)

    # ── Step 6: Start thinker ──
    thinker.start()

    elapsed = time.time() - startup_time
    print(f"\n[INIT] Cogni-Engine ready in {elapsed:.1f}s")
    print("=" * 55)
    return engine
def _create_data_readme(path: str) -> None:
    """
    Write a README.md describing the JSONL data format to ``path``.

    Best-effort: if the file cannot be written (missing or read-only
    directory, disk full, ...) the problem is logged to stdout and startup
    continues, instead of the failure being silently discarded.

    Args:
        path: Destination file path; an existing file is overwritten.
    """
    content = """# Cogni-Engine Data Directory
Place your `.jsonl` files here. Each file will be automatically
detected and ingested by the thinking engine.
## Format
One JSON object per line. Required fields: `type` and `content`.
```json
{"type":"fact","content":"Earth orbits the Sun in 365.25 days","tags":["astronomy"]}
{"type":"definition","term":"API","content":"Application Programming Interface"}
{"type":"relation","from":"Python","to":"Django","relation":"has_framework"}
```
## Supported Types
fact, definition, explanation, description, property, statistic,
measurement, term, abbreviation, jargon, slang, idiom, synonym,
antonym, quote, rule, example, analogy, opinion, paragraph,
relation, cause_effect, comparison, hierarchy, composition,
dependency, contradiction, timeline, process, procedure, event,
history, change, qa, custom_*
## Optional Fields
tags, source, confidence, language, domain, related, metadata
See full documentation for details on each type.
"""
    try:
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)
    except OSError as e:
        # The README is documentation only; never let it block startup.
        print(f"[INIT] Could not write data README at {path}: {e}")
# ═══════════════════════════════════════════════════════════
# AUTHENTICATION
# ═══════════════════════════════════════════════════════════
def verify_api_key(authorization: Optional[str]) -> bool:
    """
    Check an ``Authorization`` header value against the configured API keys.

    Accepts ``Bearer <key>`` with a case-insensitive scheme; whitespace
    around the key is ignored. Each configured key is compared with
    ``hmac.compare_digest`` and the loop never exits early, so response
    timing does not reveal how much of a guessed key was correct.

    Args:
        authorization: Raw header value, or None when the header is absent.

    Returns:
        True if the bearer token equals one of ``config.API_KEYS``.
    """
    if not authorization:
        return False
    parts = authorization.split(" ", 1)
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return False
    supplied = parts[1].strip().encode("utf-8")
    # Check against every configured key (constant-time per comparison).
    matched = False
    for key in config.API_KEYS:
        if isinstance(key, str) and hmac.compare_digest(supplied, key.encode("utf-8")):
            matched = True
    return matched
def require_auth(authorization: Optional[str] = Header(None, alias="Authorization")):
    """
    FastAPI dependency guarding a route with bearer-key authentication.

    Returns None when the header carries a valid key; otherwise aborts the
    request with HTTP 401 and a hint about the expected header format.
    """
    if verify_api_key(authorization):
        return
    raise HTTPException(
        status_code=401,
        detail="Invalid or missing API key. Use 'Authorization: Bearer <key>'",
    )
# ═══════════════════════════════════════════════════════════
# KEEP-ALIVE & BACKGROUND TASKS
# ═══════════════════════════════════════════════════════════
async def keep_alive_loop():
    """
    Periodically GET this server's own /v1/health so the HF Space stays awake.

    A no-op when config.KEEP_ALIVE_ENABLED is false. Otherwise waits 10 s
    for the server to start, then pings every config.KEEP_ALIVE_INTERVAL
    seconds forever. Failed pings are logged and never stop the loop;
    successful ones are logged only when config.LOG_THINKING_DETAILS is set.
    """
    if config.KEEP_ALIVE_ENABLED:
        origin = f"http://localhost:{config.PORT}"
        pause = config.KEEP_ALIVE_INTERVAL
        # Give the server a head start before the first self-ping.
        await asyncio.sleep(10)
        async with httpx.AsyncClient() as http:
            while True:
                try:
                    reply = await http.get(f"{origin}/v1/health", timeout=10)
                    if config.LOG_THINKING_DETAILS:
                        print(f"[KEEP-ALIVE] Ping OK: {reply.status_code}")
                except Exception as exc:
                    print(f"[KEEP-ALIVE] Ping failed: {exc}")
                await asyncio.sleep(pause)
async def cleanup_loop():
    """
    Run brain.cleanup() forever, once per config.SESSION_CLEANUP_INTERVAL.

    Per the original intent this expires old sessions and flushes buffers.
    Any error is logged and followed by an extra 60 s back-off before the
    next cycle begins.
    """
    while True:
        try:
            await asyncio.sleep(config.SESSION_CLEANUP_INTERVAL)
            engine = brain  # may still be None if startup failed
            if engine:
                engine.cleanup()
        except Exception as exc:
            print(f"[CLEANUP] Error: {exc}")
            await asyncio.sleep(60)
# ═══════════════════════════════════════════════════════════
# FASTAPI APP
# ═══════════════════════════════════════════════════════════
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage the engine's startup and shutdown lifecycle.

    Startup: build the engine with initialize_engine() and launch the
    keep-alive and cleanup background tasks. A startup failure is logged
    and leaves ``_ready`` False, so endpoints answer 503 instead of the
    process exiting.

    Shutdown: cancel the background tasks and wait (bounded) for them to
    finish unwinding *before* shutting the engine down, so no cleanup cycle
    is still touching the brain while it is being torn down.
    """
    global brain, _keep_alive_task, _cleanup_task, _ready
    # ── Startup ──
    try:
        brain = initialize_engine()
        _ready = True
        # Start background tasks
        _keep_alive_task = asyncio.create_task(keep_alive_loop())
        _cleanup_task = asyncio.create_task(cleanup_loop())
    except Exception as e:
        print(f"[FATAL] Startup failed: {e}")
        traceback.print_exc()
        _ready = False
    yield
    # ── Shutdown ──
    print("\n[SHUTDOWN] Shutting down Cogni-Engine...")
    pending = [task for task in (_keep_alive_task, _cleanup_task) if task]
    for task in pending:
        task.cancel()
    if pending:
        # cancel() only requests cancellation; wait for the tasks to actually
        # stop, bounded so a misbehaving task cannot hang shutdown.
        await asyncio.wait(pending, timeout=5.0)
    if brain:
        brain.shutdown()
    print("[SHUTDOWN] Complete.")
app = FastAPI(
    lifespan=lifespan,
    title="Cogni-Engine v1",
    version="1.0.0",
    description="Self-Evolving Knowledge AI",
)
# CORS: every origin, method and header is accepted so browser-based
# clients can call the API directly.
# NOTE(review): allow_credentials=True together with a wildcard origin may
# cause Starlette to echo the caller's Origin on credentialed requests; auth
# here is a bearer header, so credentials may be unnecessary — confirm
# before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# ═══════════════════════════════════════════════════════════
# API ENDPOINTS
# ═══════════════════════════════════════════════════════════
# ───────────────────────────────────────────────────
# POST /v1/chat/completions — OpenAI Compatible
# ───────────────────────────────────────────────────
def _word_count(message) -> int:
    """
    Return the whitespace-separated word count of one chat message's text.

    Used as a rough stand-in for token counts in the ``usage`` block. It
    handles the shapes chat clients send:
      - plain string content;
      - a list of content parts, where only ``text`` strings are counted;
      - missing or None content, and non-dict messages, which count as 0.
    Any of the last two would otherwise raise AttributeError after the
    brain had already produced a reply.
    """
    content = message.get("content") if isinstance(message, dict) else None
    if isinstance(content, str):
        return len(content.split())
    if isinstance(content, list):
        return sum(
            len(part["text"].split())
            for part in content
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        )
    return 0


@app.post("/v1/chat/completions")
async def chat_completions(
    request: Request,
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    OpenAI-compatible chat completions endpoint.
    Compatible with LibreChat, Open WebUI, ChatBox, etc.

    Request body (JSON object):
      messages     — required, non-empty list of chat messages
      temperature  — optional number, clamped to [0.0, 1.0]; absent or null
                     means config.DEFAULT_TEMPERATURE
      session_id   — optional custom field passed through to the brain

    Errors: 401 invalid key, 503 engine not ready, 400 malformed body /
    messages / temperature, 500 if the brain raises.
    """
    # Auth
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    if not _ready or not brain:
        raise HTTPException(status_code=503, detail="Engine not ready")

    # Parse request body
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body")
    if not isinstance(body, dict):
        # Valid JSON but not an object (e.g. a list): body.get would raise.
        raise HTTPException(status_code=400, detail="Invalid JSON body")

    messages = body.get("messages", [])
    if not messages:
        raise HTTPException(status_code=400, detail="'messages' field required")
    if not isinstance(messages, list):
        raise HTTPException(status_code=400, detail="'messages' must be a list")

    raw_temperature = body.get("temperature")
    if raw_temperature is None:
        raw_temperature = config.DEFAULT_TEMPERATURE
    try:
        temperature = max(0.0, min(1.0, float(raw_temperature)))
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=400, detail="'temperature' must be a number"
        ) from None

    # Extract session ID from request (custom field); None lets the brain decide
    session_id = body.get("session_id", None)

    # Process through brain
    try:
        result = brain.process_message(
            messages=messages,
            session_id=session_id,
            temperature=temperature
        )
    except Exception as e:
        print(f"[API] Processing error: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Internal processing error")

    # Format as OpenAI-compatible response ("tokens" are approximated by words)
    reply = result["response"]
    prompt_tokens = sum(_word_count(m) for m in messages)
    completion_tokens = len(reply.split())
    response_id = f"cogni-{config.generate_session_id()}"
    return JSONResponse(content={
        "id": response_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": "cogni-engine-v1",
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": reply
                },
                "finish_reason": "stop"
            }
        ],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
            # Cogni-specific metadata
            "cogni_metadata": {
                "confidence": result["confidence"],
                "reasoning_depth": result["reasoning_depth"],
                "nodes_traversed": result["nodes_traversed"],
                "chains_used": result["chains_used"],
                "thinking_cycles": result["thinking_cycles"],
                "processing_time_ms": result["processing_time_ms"],
                "session_id": result["session_id"]
            }
        }
    })
# ───────────────────────────────────────────────────
# GET /v1/status — Brain Status
# ───────────────────────────────────────────────────
@app.get("/v1/status")
async def get_status(
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    Return the brain's status report extended with uptime information.

    401 without a valid key. When the engine is not ready (or no brain
    exists) it answers {"alive": False, "ready": False} with status 200.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    if not (_ready and brain):
        return JSONResponse(content={"alive": False, "ready": False})

    payload = brain.get_status()
    elapsed = time.time() - startup_time
    payload.update(
        uptime=utils.format_duration(elapsed),
        uptime_seconds=round(elapsed, 0),
        started_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(startup_time)),
    )
    return JSONResponse(content=payload)
# ───────────────────────────────────────────────────
# POST /v1/data/upload — Upload JSONL Data
# ───────────────────────────────────────────────────
def _validate_jsonl_sample(lines, limit=10):
    """
    Validate the first ``limit`` lines of a JSONL upload.

    A line is valid when it parses as a JSON *object* containing both
    ``type`` and ``content``. Blank lines are skipped but still use up a slot
    in the sample. Checking isinstance(dict) first matters: for a JSON
    number, ``"type" in entry`` raises TypeError, and for a JSON string it
    does a substring test that could wrongly pass.

    Returns:
        (valid_count, errors) where errors are human-readable strings with
        1-based line numbers.
    """
    # NOTE(review): the data README's own `relation` example has no `content`
    # field, so such lines are reported here as invalid — confirm which
    # fields each type really requires.
    valid = 0
    errors = []
    for i, line in enumerate(lines[:limit]):
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError as e:
            errors.append(f"Line {i+1}: invalid JSON — {e}")
            continue
        if not isinstance(entry, dict):
            errors.append(f"Line {i+1}: expected a JSON object")
        elif "type" not in entry or "content" not in entry:
            errors.append(f"Line {i+1}: missing 'type' or 'content' field")
        else:
            valid += 1
    return valid, errors


@app.post("/v1/data/upload")
async def upload_data(
    file: UploadFile = File(...),
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    Upload a JSONL data file.
    File will be saved to config.DATA_DIR and auto-ingested by the thinker.

    Errors: 401 invalid key, 503 engine not ready, 400 wrong extension /
    unreadable or non-UTF-8 data / no valid entries in the first 10 lines,
    413 larger than config.MAX_REQUEST_SIZE_MB, 500 if saving fails.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    if not _ready:
        raise HTTPException(status_code=503, detail="Engine not ready")

    # Validate file extension
    filename = file.filename or "upload.jsonl"
    if not any(filename.endswith(ext) for ext in config.SUPPORTED_DATA_EXTENSIONS):
        raise HTTPException(
            status_code=400,
            detail=f"Only {config.SUPPORTED_DATA_EXTENSIONS} files are supported"
        )

    # Read at most one byte past the limit, so an oversized upload is
    # rejected without pulling all of it into memory.
    max_bytes = config.MAX_REQUEST_SIZE_MB * 1024 * 1024
    try:
        content = await file.read(int(max_bytes) + 1)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to read file: {e}")
    if len(content) > max_bytes:
        raise HTTPException(
            status_code=413,
            detail=f"File too large (max {config.MAX_REQUEST_SIZE_MB}MB)"
        )
    size_mb = len(content) / (1024 * 1024)

    try:
        # utf-8-sig also accepts (and drops) a leading BOM, which json.loads rejects.
        content_str = content.decode("utf-8-sig")
    except UnicodeDecodeError as e:
        raise HTTPException(status_code=400, detail=f"Failed to read file: {e}")

    # Validate JSONL format (check first few lines)
    lines = content_str.strip().split('\n')
    valid_lines, errors = _validate_jsonl_sample(lines)
    if valid_lines == 0:
        raise HTTPException(
            status_code=400,
            detail={
                "error": "No valid JSONL entries found",
                "details": errors[:5]
            }
        )

    # Sanitize the name: path separators and other specials become "_",
    # so the file cannot escape DATA_DIR.
    safe_filename = "".join(
        c if c.isalnum() or c in "._-" else "_"
        for c in filename
    )
    # Add timestamp to prevent overwrites
    ts = int(time.time())
    if not safe_filename.startswith(f"{ts}_"):
        safe_filename = f"{ts}_{safe_filename}"
    save_path = os.path.join(config.DATA_DIR, safe_filename)

    # Write to a side file and rename into place, so the thinker never sees a
    # half-written data file. Assumes the thinker only picks up names ending
    # in a supported extension, so the ".part" file is ignored — confirm.
    tmp_path = save_path + ".part"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(content_str)
        os.replace(tmp_path, save_path)
    except Exception as e:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=f"Failed to save file: {e}")

    return JSONResponse(content={
        "status": "uploaded",
        "filename": safe_filename,
        "total_lines": len(lines),
        "valid_lines_sampled": valid_lines,
        "validation_errors": errors[:5] if errors else [],
        "size_mb": round(size_mb, 2),
        "message": (
            f"File saved. Thinker will auto-ingest on next INGEST cycle "
            f"(within ~{config.THINKING_INTERVAL_SLOW}s)."
        )
    })
# ───────────────────────────────────────────────────
# GET /v1/health — Health Check
# ───────────────────────────────────────────────────
@app.get("/v1/health")
async def health_check():
    """
    Unauthenticated health probe, hit by the keep-alive self-ping and
    external monitors.

    Until the engine is ready: 503 with status "starting". Afterwards: 200
    with status "healthy", uptime and, when a brain exists, thinker progress.
    """
    if _ready:
        uptime = time.time() - startup_time
        payload = {
            "status": "healthy",
            "timestamp": utils.timestamp_now(),
            "uptime": utils.format_duration(uptime),
        }
        if brain:
            thinker = brain.thinker
            payload["thinking_cycles"] = thinker.total_cycles
            payload["thinker_phase"] = thinker.current_phase
            payload["thinker_running"] = thinker.is_running
        return JSONResponse(content=payload)

    return JSONResponse(
        status_code=503,
        content={"status": "starting", "timestamp": utils.timestamp_now()},
    )
# ───────────────────────────────────────────────────
# GET /v1/graph/stats — Detailed Graph Statistics
# ───────────────────────────────────────────────────
@app.get("/v1/graph/stats")
async def graph_stats(
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    Get detailed knowledge graph statistics.

    Returns the intelligence score, overview stats, and node-type, edge-relation
    and node-source distributions. Also returns the 20 nodes with the highest
    weight * connections, thinker metrics, DB stats and uptime.
    401 without a valid key, 503 when the engine is not ready.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    if not _ready or not brain:
        raise HTTPException(status_code=503, detail="Engine not ready")

    graph = brain.graph
    stats = graph.get_stats()
    intelligence = graph.get_intelligence_score()
    db_stats = graph.memory.get_db_stats()
    thinker_metrics = brain.thinker.metrics

    # The thinker runs in a background thread holding this same graph. Take
    # one snapshot of each collection instead of iterating the live dicts
    # several times, which can fail with "dictionary changed size during
    # iteration". For plain dicts in CPython, list() copies in one C-level pass.
    nodes = list(graph.nodes.values())
    edges = list(graph.edges.values())

    type_counts = dict(Counter(node.type for node in nodes))
    relation_counts = dict(Counter(edge.relation for edge in edges))
    # Known sources are always reported, even with a zero count.
    source_counts = {"data": 0, "inferred": 0, "user_chat": 0}
    source_counts.update(Counter(node.source for node in nodes))

    # Same ordering as sorted(..., reverse=True)[:20], without a full sort.
    top_nodes = heapq.nlargest(20, nodes, key=lambda n: n.weight * n.connections)

    return JSONResponse(content={
        "intelligence_score": round(intelligence, 2),
        "overview": stats,
        "node_types": type_counts,
        "edge_relations": relation_counts,
        "node_sources": source_counts,
        "top_nodes": [
            {
                "id": n.id,
                "content": utils.truncate_text(n.content, 100),
                "type": n.type,
                "weight": round(n.weight, 3),
                "connections": n.connections
            }
            for n in top_nodes
        ],
        "thinker_metrics": thinker_metrics,
        "database": db_stats,
        "uptime_seconds": round(time.time() - startup_time, 0)
    })
# ───────────────────────────────────────────────────
# GET /v1/models — Model List (OpenAI Compat)
# ───────────────────────────────────────────────────
@app.get("/v1/models")
async def list_models(
    authorization: Optional[str] = Header(None, alias="Authorization")
):
    """
    OpenAI-compatible model listing (some clients require it).

    Always advertises the single model "cogni-engine-v1". The reported
    intelligence score is 0.0 until a brain exists. 401 without a valid key.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")

    score = brain.graph.get_intelligence_score() if brain else 0.0
    model_card = {
        "id": "cogni-engine-v1",
        "object": "model",
        "created": int(startup_time),
        "owned_by": "cogni-engine",
        "permission": [],
        "root": "cogni-engine-v1",
        "parent": None,
        "meta": {
            "type": "self-evolving-knowledge-ai",
            "intelligence_score": round(score, 2),
        },
    }
    return JSONResponse(content={"object": "list", "data": [model_card]})
# ───────────────────────────────────────────────────
# Root endpoint
# ───────────────────────────────────────────────────
@app.get("/")
async def root():
    """
    Unauthenticated landing endpoint.

    Returns the service name, run state, uptime and endpoint map. Once a
    brain exists it also includes the intelligence score, thinking-cycle
    count and total node count.
    """
    uptime = (time.time() - startup_time) if startup_time else 0
    info = {
        "name": "Cogni-Engine v1",
        "description": "Self-Evolving Knowledge AI",
        "status": "running" if _ready else "starting",
        "uptime": utils.format_duration(uptime),
        "api_docs": "/docs",
        "endpoints": {
            "chat": "POST /v1/chat/completions",
            "status": "GET /v1/status",
            "upload": "POST /v1/data/upload",
            "health": "GET /v1/health",
            "stats": "GET /v1/graph/stats",
            "models": "GET /v1/models",
        },
    }
    if brain:
        graph = brain.graph
        info.update(
            intelligence_score=round(graph.get_intelligence_score(), 2),
            thinking_cycles=brain.thinker.total_cycles,
            total_nodes=graph.get_stats()["total_nodes"],
        )
    return JSONResponse(content=info)
# ═══════════════════════════════════════════════════════════
# SIGNAL HANDLING
# ═══════════════════════════════════════════════════════════
def handle_shutdown_signal(signum, frame):
    """
    Handle graceful shutdown on SIGTERM/SIGINT: shut the engine down once, then exit.

    The module-level ``brain`` is cleared *before* shutdown. If the lifespan
    shutdown path also runs while SystemExit unwinds the server, its
    ``if brain:`` guard then skips a second brain.shutdown() on the same engine.

    Args:
        signum: The signal number received.
        frame: The interrupted stack frame (unused).

    Raises:
        SystemExit: always, with exit code 0.
    """
    global brain
    print(f"\n[SIGNAL] Received signal {signum}. Initiating shutdown...")
    engine, brain = brain, None
    if engine:
        engine.shutdown()
    sys.exit(0)
# ═══════════════════════════════════════════════════════════
# IMPORTS NEEDED BY THIS FILE
# ═══════════════════════════════════════════════════════════
import utils # noqa: E402 — needed for utility functions used in endpoints
# ═══════════════════════════════════════════════════════════
# ENTRY POINT
# ═══════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Route termination (SIGTERM) and Ctrl-C (SIGINT) to the graceful handler.
    # NOTE(review): uvicorn.run() installs its own signal handling, which
    # likely replaces these handlers — confirm for the pinned uvicorn version.
    for _sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(_sig, handle_shutdown_signal)

    print(f"\n[MAIN] Starting Cogni-Engine on port {config.PORT}...")
    # NOTE(review): this inspects API_KEY while auth checks config.API_KEYS —
    # confirm which environment variable config actually reads.
    print(f"[MAIN] API Key: {'SET' if os.environ.get('API_KEY') else 'AUTO-GENERATED'}")
    print(f"[MAIN] API Docs: http://localhost:{config.PORT}/docs\n")

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=config.PORT,
        log_level=config.LOG_LEVEL.lower(),
        access_log=config.LOG_API_REQUESTS,
        timeout_keep_alive=65,
        # Exactly one worker: the thinker thread lives inside this process.
        workers=1,
    )