""" Cogni-Engine v1 — Cognitive Engine (Brain) The central coordinator that connects all components. Processes user requests through three stages: 1. UNDERSTAND — Parse intent, extract entities, build query 2. REASON — Search graph, traverse, build reasoning chains 3. RESPOND — Generate natural language from chains Also manages conversation sessions and knowledge extraction. """ import time import threading from typing import List, Dict, Optional, Tuple, Any import numpy as np import config import utils from knowledge import KnowledgeGraph, Node, Edge, ReasoningChain from language import LanguageGenerator from thinker import Thinker # ═══════════════════════════════════════════════════════════ # CONVERSATION SESSION # ═══════════════════════════════════════════════════════════ class Session: """ Tracks a multi-turn conversation. Maintains context window for coherent dialogue. """ __slots__ = [ 'id', 'messages', 'context_entities', 'context_node_ids', 'system_prompt', 'personality', 'last_active', 'turn_count' ] def __init__(self, session_id: str, system_prompt: str = ""): self.id = session_id self.messages: List[dict] = [] # [{role, content}] self.context_entities: List[str] = [] self.context_node_ids: List[str] = [] self.system_prompt = system_prompt self.personality = utils.parse_system_prompt(system_prompt) self.last_active = time.time() self.turn_count = 0 def add_message(self, role: str, content: str): """Add a message to conversation history.""" self.messages.append({"role": role, "content": content}) # Trim to context window max_messages = config.CONTEXT_WINDOW_TURNS * 2 # user + assistant pairs if len(self.messages) > max_messages: # Keep system prompt awareness but trim old messages self.messages = self.messages[-max_messages:] self.last_active = time.time() self.turn_count += 1 def add_context_entities(self, entities: List[str]): """Add discovered entities to session context.""" for e in entities: if e not in self.context_entities: self.context_entities.append(e) # Keep last N entities self.context_entities = self.context_entities[-30:] def add_context_nodes(self, node_ids: List[str]): """Add discovered node IDs to session context.""" for nid in node_ids: if nid not in self.context_node_ids: self.context_node_ids.append(nid) self.context_node_ids = self.context_node_ids[-50:] def get_context_text(self) -> str: """Get combined context from recent messages.""" recent = self.messages[-config.CONTEXT_WINDOW_TURNS * 2:] parts = [] for msg in recent: if msg["role"] == "user": parts.append(msg["content"]) return " ".join(parts) def is_expired(self) -> bool: """Check if session has expired.""" return (time.time() - self.last_active) > (config.SESSION_TIMEOUT_MINUTES * 60) # ═══════════════════════════════════════════════════════════ # SESSION MANAGER # ═══════════════════════════════════════════════════════════ class SessionManager: """Manages active conversation sessions.""" def __init__(self): self._sessions: Dict[str, Session] = {} self._lock = threading.Lock() def get_or_create( self, session_id: str = None, system_prompt: str = "" ) -> Session: """Get existing session or create new one.""" with self._lock: if session_id and session_id in self._sessions: session = self._sessions[session_id] session.last_active = time.time() # Update system prompt if changed if system_prompt and system_prompt != session.system_prompt: session.system_prompt = system_prompt session.personality = utils.parse_system_prompt(system_prompt) return session # Create new session new_id = session_id or config.generate_session_id() session = Session(new_id, system_prompt) self._sessions[new_id] = session return session def remove(self, session_id: str): """Remove a session.""" with self._lock: self._sessions.pop(session_id, None) def cleanup_expired(self): """Remove expired sessions.""" with self._lock: expired = [ sid for sid, s in self._sessions.items() if s.is_expired() ] for sid in expired: del self._sessions[sid] if expired: print(f"[SESSION] Cleaned up {len(expired)} expired sessions") @property def active_count(self) -> int: return len(self._sessions) # ═══════════════════════════════════════════════════════════ # BRAIN — MAIN COGNITIVE ENGINE # ═══════════════════════════════════════════════════════════ class Brain: """ Central cognitive engine. Coordinates understanding, reasoning, and response generation. Usage: brain = Brain(graph, thinker) response = brain.process_message(messages, session_id) """ def __init__(self, graph: KnowledgeGraph, thinker: Thinker): self.graph = graph self.thinker = thinker self.language = LanguageGenerator() self.sessions = SessionManager() # Processing stats self._total_requests = 0 self._total_response_time = 0.0 self._avg_confidence = 0.0 # ─────────────────────────────────────────────────── # MAIN ENTRY POINT # ─────────────────────────────────────────────────── def process_message( self, messages: List[dict], session_id: str = None, temperature: float = None ) -> dict: """ Process a chat completion request. Args: messages: List of {role, content} messages (OpenAI format) session_id: Optional session ID for multi-turn temperature: Response variation (0-1) Returns: { "response": str, # The generated response text "session_id": str, # Session ID for continuity "confidence": float, # Response confidence "reasoning_depth": int, # How deep the reasoning went "nodes_traversed": int, # How many nodes were visited "chains_used": int, # How many reasoning chains "thinking_cycles": int, # Total thinker cycles so far "processing_time_ms": int # How long this took } """ start_time = time.time() if temperature is None: temperature = config.DEFAULT_TEMPERATURE # ── Extract system prompt and user message ── system_prompt = "" user_message = "" conversation_history = [] for msg in messages: role = msg.get("role", "") content = msg.get("content", "") if role == "system": system_prompt = content elif role == "user": user_message = content conversation_history.append(msg) elif role == "assistant": conversation_history.append(msg) if not user_message: return self._empty_response(session_id, start_time) # ── Get or create session ── session = self.sessions.get_or_create(session_id, system_prompt) session.add_message("user", user_message) # ── STAGE 1: UNDERSTAND ── query_analysis = self._understand( user_message, session, temperature ) # ── STAGE 2: REASON ── reasoning_result = self._reason(query_analysis, session) # ── STAGE 3: RESPOND ── response_text = self._respond( reasoning_result, query_analysis, session ) # ── Post-processing ── session.add_message("assistant", response_text) # Extract knowledge from user message (async-safe) self._extract_user_knowledge(user_message) # Reinforce used chains and edges self._reinforce_used_knowledge(reasoning_result) # Update stats processing_time = time.time() - start_time self._total_requests += 1 self._total_response_time += processing_time result = { "response": response_text, "session_id": session.id, "confidence": reasoning_result.get("confidence", 0.0), "reasoning_depth": reasoning_result.get("max_depth", 0), "nodes_traversed": reasoning_result.get("nodes_traversed", 0), "chains_used": len(reasoning_result.get("chains", [])), "thinking_cycles": self.thinker.total_cycles, "processing_time_ms": int(processing_time * 1000) } if config.LOG_API_REQUESTS: print( f"[BRAIN] Request processed: " f"confidence={result['confidence']:.2f}, " f"depth={result['reasoning_depth']}, " f"nodes={result['nodes_traversed']}, " f"chains={result['chains_used']}, " f"time={result['processing_time_ms']}ms" ) return result # ═══════════════════════════════════════════════════ # STAGE 1: UNDERSTAND # ═══════════════════════════════════════════════════ def _understand( self, message: str, session: Session, temperature: float ) -> dict: """ Parse user message to understand what is being asked. Returns query_analysis dict: { intent: str, intent_confidence: float, entities: [str], keywords: [str], query_vector: np.ndarray, temperature: float, is_followup: bool, context_entities: [str], query_text: str, confidence: float (initial, from intent detection) } """ # ── Detect intent ── intent, intent_confidence = utils.detect_intent(message) # ── Check if this is a follow-up ── is_followup = self._is_followup(message, session) if is_followup and intent == "general": intent = "followup" # ── Extract entities ── entities = utils.extract_entities_simple(message) # ── Extract keywords ── keywords = utils.extract_keywords(message, max_keywords=15) # If no entities found, use keywords as entities if not entities and keywords: entities = keywords[:5] # ── Build context-enriched query ── query_parts = [message] if is_followup and session.context_entities: # Add recent context for follow-up questions query_parts.extend(session.context_entities[-5:]) query_text = " ".join(query_parts) # ── Compute query vector ── query_vector = utils.text_to_vector_tfidf(query_text) # ── If follow-up, blend with previous context vector ── if is_followup and session.context_node_ids: context_vectors = [] for nid in session.context_node_ids[-5:]: node = self.graph.get_node(nid) if node: context_vectors.append(node.vector) if context_vectors: context_mean = utils.vector_mean(context_vectors) # Blend: 70% current query + 30% context query_vector = utils.normalize( query_vector * 0.7 + context_mean * 0.3 ) # ── Update session context ── session.add_context_entities(entities) return { "intent": intent, "intent_confidence": intent_confidence, "entities": entities, "keywords": keywords, "query_vector": query_vector, "temperature": temperature, "is_followup": is_followup, "context_entities": list(session.context_entities), "query_text": query_text, "confidence": intent_confidence } def _is_followup(self, message: str, session: Session) -> bool: """Detect if message is a follow-up to previous conversation.""" if session.turn_count == 0: return False message_lower = message.lower().strip() # Short messages after conversation likely follow-ups if len(message_lower.split()) <= 5 and session.turn_count > 0: return True # Pronoun references followup_indicators = [ "itu", "tersebut", "nya", "dia", "mereka", "lanjutkan", "jelaskan lagi", "maksudnya", "terus", "lalu", "bagaimana dengan", "it", "that", "they", "them", "those", "what about", "how about", "tell me more", "continue", "go on", "elaborate", "dan", "juga", "selain itu", ] for indicator in followup_indicators: if indicator in message_lower: return True return False # ═══════════════════════════════════════════════════ # STAGE 2: REASON # ═══════════════════════════════════════════════════ def _reason(self, query_analysis: dict, session: Session) -> dict: """ Search knowledge graph and build reasoning chains. Returns reasoning_result dict: { chains: [ReasoningChain], matched_nodes: [(Node, float)], confidence: float, max_depth: int, nodes_traversed: int, direct_nodes: [Node], direct_edges: [Edge] } """ query_vector = query_analysis["query_vector"] entities = query_analysis["entities"] intent = query_analysis["intent"] temperature = query_analysis["temperature"] # ── Step 1: Find matching nodes ── matched_nodes = self._find_relevant_nodes( query_vector, entities, session ) if not matched_nodes: return { "chains": [], "matched_nodes": [], "confidence": 0.0, "max_depth": 0, "nodes_traversed": 0, "direct_nodes": [], "direct_edges": [] } # Track traversed nodes all_traversed_ids = set() # ── Step 2: Build reasoning chains ── start_node_ids = [node.id for node, _ in matched_nodes[:5]] all_traversed_ids.update(start_node_ids) chains = self.graph.build_reasoning_chains( start_nodes=start_node_ids, max_chains=config.MAX_CHAINS_PER_RESPONSE, max_depth=config.MAX_TRAVERSAL_DEPTH ) # Track all nodes in chains for chain in chains: for item_id in chain.path: if item_id in self.graph.nodes: all_traversed_ids.add(item_id) # ── Step 3: Intent-specific reasoning ── if intent == "relation" and len(entities) >= 2: relation_chains = self._reason_relation(entities, temperature) chains.extend(relation_chains) elif intent == "compare" and len(entities) >= 2: compare_chains = self._reason_comparison(entities, temperature) chains.extend(compare_chains) elif intent == "cause": cause_chains = self._reason_causation(start_node_ids, temperature) chains.extend(cause_chains) # ── Step 4: Deduplicate and sort chains ── seen_chain_ids = set() unique_chains = [] for c in chains: if c.id not in seen_chain_ids: seen_chain_ids.add(c.id) unique_chains.append(c) unique_chains.sort(key=lambda c: c.confidence, reverse=True) unique_chains = unique_chains[:config.MAX_CHAINS_PER_RESPONSE] # ── Step 5: Calculate overall confidence ── if unique_chains: chain_confidences = [c.confidence for c in unique_chains] max_match_sim = matched_nodes[0][1] if matched_nodes else 0.0 overall_confidence = ( max(chain_confidences) * 0.4 + (sum(chain_confidences) / len(chain_confidences)) * 0.3 + max_match_sim * 0.3 ) else: overall_confidence = matched_nodes[0][1] * 0.4 if matched_nodes else 0.0 overall_confidence = utils.clamp(overall_confidence, 0.0, 1.0) # ── Step 6: Calculate max reasoning depth ── max_depth = 0 for chain in unique_chains: node_count = sum(1 for i in chain.path if i in self.graph.nodes) max_depth = max(max_depth, node_count) # ── Collect direct nodes and edges for fallback generation ── direct_nodes = [node for node, _ in matched_nodes[:10]] direct_edges = [] for node in direct_nodes: direct_edges.extend(self.graph.get_all_edges_for(node.id)[:5]) # Update session context with discovered nodes session.add_context_nodes(list(all_traversed_ids)[:20]) return { "chains": unique_chains, "matched_nodes": matched_nodes, "confidence": round(overall_confidence, 4), "max_depth": max_depth, "nodes_traversed": len(all_traversed_ids), "direct_nodes": direct_nodes, "direct_edges": direct_edges } def _find_relevant_nodes( self, query_vector: np.ndarray, entities: List[str], session: Session ) -> List[Tuple[Node, float]]: """ Find nodes relevant to the query using multiple strategies. Combines vector similarity with entity matching. """ all_matches: Dict[str, Tuple[Node, float]] = {} # ── Strategy 1: Vector similarity search ── vector_matches = self.graph.find_similar_nodes( query_vector, top_k=config.MAX_NODES_PER_SEARCH, min_similarity=0.2 ) for node, sim in vector_matches: if node.id not in all_matches or sim > all_matches[node.id][1]: all_matches[node.id] = (node, sim) # ── Strategy 2: Entity exact/fuzzy match ── for entity in entities: # Exact match exact_node = self.graph.get_node_by_content(entity) if exact_node: # Boost exact matches existing_sim = all_matches.get(exact_node.id, (None, 0))[1] all_matches[exact_node.id] = (exact_node, max(existing_sim, 0.95)) # Fuzzy match via vector entity_vector = utils.text_to_vector_tfidf(entity) entity_matches = self.graph.find_similar_nodes( entity_vector, top_k=5, min_similarity=0.4 ) for node, sim in entity_matches: # Boost because it matched an entity directly boosted_sim = min(sim * 1.2, 1.0) if node.id not in all_matches or boosted_sim > all_matches[node.id][1]: all_matches[node.id] = (node, boosted_sim) # ── Strategy 3: Context-based (for follow-ups) ── if session.context_node_ids: for ctx_nid in session.context_node_ids[-5:]: ctx_node = self.graph.get_node(ctx_nid) if ctx_node: sim = utils.cosine_similarity(query_vector, ctx_node.vector) if sim > 0.3: # Context nodes get moderate boost boosted = min(sim * 1.1, 1.0) if ctx_nid not in all_matches or boosted > all_matches[ctx_nid][1]: all_matches[ctx_nid] = (ctx_node, boosted) # Sort by similarity descending results = sorted( all_matches.values(), key=lambda x: x[1], reverse=True ) return results[:config.MAX_NODES_PER_SEARCH] def _reason_relation( self, entities: List[str], temperature: float ) -> List[ReasoningChain]: """Find relationship between two entities.""" if len(entities) < 2: return [] chains = [] # Find nodes for both entities node_a = self.graph.get_node_by_content(entities[0]) node_b = self.graph.get_node_by_content(entities[1]) if not node_a: matches = self.graph.find_similar_to_text(entities[0], top_k=1, min_similarity=0.4) if matches: node_a = matches[0][0] if not node_b: matches = self.graph.find_similar_to_text(entities[1], top_k=1, min_similarity=0.4) if matches: node_b = matches[0][0] if not node_a or not node_b: return [] # Find paths between them paths = self.graph.find_paths( node_a.id, node_b.id, max_depth=config.MAX_TRAVERSAL_DEPTH, max_paths=3 ) for path in paths: confidence = self._score_path(path) chain = ReasoningChain( chain_id=config.generate_chain_id(path), path=path, conclusion=f"{entities[0]} → {entities[1]}", confidence=confidence ) chains.append(chain) # Also try reverse direction reverse_paths = self.graph.find_paths( node_b.id, node_a.id, max_depth=config.MAX_TRAVERSAL_DEPTH, max_paths=2 ) for path in reverse_paths: confidence = self._score_path(path) chain = ReasoningChain( chain_id=config.generate_chain_id(path), path=path, conclusion=f"{entities[1]} → {entities[0]}", confidence=confidence ) chains.append(chain) return chains def _reason_comparison( self, entities: List[str], temperature: float ) -> List[ReasoningChain]: """Build comparison reasoning between entities.""" if len(entities) < 2: return [] chains = [] for entity in entities[:2]: matches = self.graph.find_similar_to_text( entity, top_k=1, min_similarity=0.3 ) if matches: node = matches[0][0] entity_chains = self.graph.build_reasoning_chains( [node.id], max_chains=2, max_depth=4 ) chains.extend(entity_chains) return chains def _reason_causation( self, start_node_ids: List[str], temperature: float ) -> List[ReasoningChain]: """Follow causal chains from starting nodes.""" chains = [] for nid in start_node_ids[:3]: # Follow "causes" edges specifically current = nid path = [current] visited = {current} for _ in range(config.MAX_TRAVERSAL_DEPTH): cause_edges = [ e for e in self.graph.get_edges_from(current) if e.relation in ("causes", "leads_to", "results_in") and e.to_node not in visited ] if not cause_edges: break best = max(cause_edges, key=lambda e: e.confidence) path.append(best.id) path.append(best.to_node) visited.add(best.to_node) current = best.to_node if len(path) >= 3: confidence = self._score_path(path) chain = ReasoningChain( chain_id=config.generate_chain_id(path), path=path, conclusion="causal_chain", confidence=confidence ) chains.append(chain) return chains def _score_path(self, path: list) -> float: """Score a path for confidence.""" edge_scores = [] for item_id in path: edge = self.graph.get_edge(item_id) if edge: edge_scores.append(edge.weight * edge.confidence) if not edge_scores: return 0.3 avg = sum(edge_scores) / len(edge_scores) length_penalty = 1.0 / (1.0 + 0.1 * len(edge_scores)) return utils.clamp(avg * length_penalty, 0.0, 1.0) # ═══════════════════════════════════════════════════ # STAGE 3: RESPOND # ═══════════════════════════════════════════════════ def _respond( self, reasoning_result: dict, query_analysis: dict, session: Session ) -> str: """ Generate natural language response from reasoning results. Uses compositional language generation. """ chains = reasoning_result.get("chains", []) confidence = reasoning_result.get("confidence", 0.0) direct_nodes = reasoning_result.get("direct_nodes", []) direct_edges = reasoning_result.get("direct_edges", []) personality = session.personality lang = personality.get("language", config.DEFAULT_LANGUAGE) # Merge confidence from reasoning into query analysis query_analysis_with_confidence = dict(query_analysis) query_analysis_with_confidence["confidence"] = confidence graph_stats = self.graph.get_stats() # ── Primary path: Chain-based generation ── if chains: response = self.language.generate_response( chains=chains, query_analysis=query_analysis_with_confidence, personality=personality, all_nodes=self.graph.nodes, all_edges=self.graph.edges, graph_stats=graph_stats ) if response and response.strip(): return response # ── Fallback: Direct node-based generation ── if direct_nodes: response = self.language.generate_from_direct_nodes( nodes=direct_nodes, edges=direct_edges, query_analysis=query_analysis_with_confidence, personality=personality, all_nodes=self.graph.nodes, lang=lang ) if response and response.strip(): return response # ── Last resort: Honest uncertainty ── return self._generate_uncertainty_response( query_analysis, personality, graph_stats, lang ) def _generate_uncertainty_response( self, query_analysis: dict, personality: dict, graph_stats: dict, lang: str ) -> str: """ Generate an honest uncertainty response. Built compositionally — NOT a static template. """ # Force uncertainty structure query_analysis_low = dict(query_analysis) query_analysis_low["confidence"] = 0.1 # Create minimal chains list (empty) response = self.language.generate_response( chains=[], query_analysis=query_analysis_low, personality=personality, all_nodes=self.graph.nodes, all_edges=self.graph.edges, graph_stats=graph_stats ) if response and response.strip(): return response # Absolute last fallback (should rarely reach here) entities = query_analysis.get("entities", []) topic = ", ".join(entities[:2]) if entities else "topik tersebut" rng = utils.seeded_random(utils.variation_seed()) if lang == "id": options = [ f"Saya belum memiliki informasi yang cukup mengenai {topic}. " f"Pengetahuan saya akan berkembang seiring waktu dan dengan " f"penambahan data yang relevan.", f"Mengenai {topic}, pemahaman saya masih terbatas saat ini. " f"Dengan berjalannya waktu dan penambahan informasi, saya akan " f"mampu membahas topik ini dengan lebih baik.", f"Topik {topic} belum tercakup secara memadai dalam pengetahuan " f"saya saat ini. Saya terus belajar dan memperluas pemahaman " f"saya secara mandiri.", ] else: options = [ f"I don't have sufficient information about {topic} yet. " f"My knowledge grows over time and with the addition of " f"relevant data.", f"Regarding {topic}, my understanding is currently limited. " f"As time goes on and more information is added, I'll be " f"able to discuss this topic more thoroughly.", f"The topic of {topic} isn't yet well covered in my " f"knowledge base. I'm continuously learning and expanding " f"my understanding autonomously.", ] return rng.choice(options) # ─────────────────────────────────────────────────── # POST-PROCESSING # ─────────────────────────────────────────────────── def _extract_user_knowledge(self, message: str): """ Extract knowledge from user message. Delegates to thinker for knowledge extraction. Does NOT store raw message. """ try: self.thinker.extract_from_user_message(message) except Exception as e: if config.LOG_THINKING_DETAILS: print(f"[BRAIN] Knowledge extraction error: {e}") def _reinforce_used_knowledge(self, reasoning_result: dict): """Reinforce edges and chains that were used in this response.""" chains = reasoning_result.get("chains", []) for chain in chains: # Save and reinforce chain self.graph.save_chain(chain) self.graph.reinforce_chain(chain.id) # ─────────────────────────────────────────────────── # UTILITY RESPONSES # ─────────────────────────────────────────────────── def _empty_response(self, session_id: str, start_time: float) -> dict: """Return response for empty/invalid input.""" processing_time = time.time() - start_time return { "response": "", "session_id": session_id or "", "confidence": 0.0, "reasoning_depth": 0, "nodes_traversed": 0, "chains_used": 0, "thinking_cycles": self.thinker.total_cycles, "processing_time_ms": int(processing_time * 1000) } # ─────────────────────────────────────────────────── # STATUS & STATS # ─────────────────────────────────────────────────── def get_status(self) -> dict: """Get comprehensive brain status.""" graph_stats = self.graph.get_stats() thinker_status = self.thinker.get_status() intelligence_score = self.graph.get_intelligence_score() avg_response_time = ( (self._total_response_time / self._total_requests * 1000) if self._total_requests > 0 else 0 ) return { "alive": True, "intelligence_score": round(intelligence_score, 2), # Graph stats "graph": { "total_nodes": graph_stats["total_nodes"], "total_edges": graph_stats["total_edges"], "total_chains": graph_stats["total_chains"], "inferred_nodes": graph_stats["inferred_nodes"], "inferred_edges": graph_stats["inferred_edges"], "max_abstraction_depth": graph_stats["max_abstraction_depth"], "avg_connections": graph_stats["avg_connections"], "avg_confidence": graph_stats["avg_confidence"], "inference_ratio": graph_stats["inference_ratio"], }, # Thinker stats "thinker": { "running": thinker_status["running"], "current_phase": thinker_status["current_phase"], "total_cycles": thinker_status["total_cycles"], "interval_seconds": thinker_status["interval_seconds"], "metrics": thinker_status["metrics"], }, # API stats "api": { "total_requests": self._total_requests, "avg_response_time_ms": round(avg_response_time, 1), "active_sessions": self.sessions.active_count, }, # Memory stats "memory": self.graph.memory.get_db_stats(), } def cleanup(self): """Periodic cleanup tasks.""" self.sessions.cleanup_expired() def shutdown(self): """Graceful shutdown.""" print("[BRAIN] Shutting down...") self.thinker.stop() self.graph.force_sync() self.graph.memory.shutdown() print("[BRAIN] Shutdown complete.")