""" Cogni-Engine v1 — 24/7 Thinking Loop The autonomous cognitive process that never stops. 8 phases of continuous reasoning that make the AI smarter over time. Phases: 1. INGEST — Scan /data/ folder, parse new JSONL files 2. CONNECT — Find hidden connections between nodes via similarity 3. INFER — Transitive & analogical inference to discover new knowledge 4. ABSTRACT — Cluster similar nodes into higher-level abstractions 5. STRENGTHEN/WEAKEN — Reinforce used edges, decay unused ones 6. COMPRESS — Merge redundant nodes, prune dead edges 7. VALIDATE — Check logical consistency, resolve contradictions 8. SELF-QUESTION — Generate and answer internal questions to find gaps """ import os import json import time import threading import traceback from typing import List, Dict, Optional, Tuple import numpy as np import config import utils from knowledge import KnowledgeGraph, Node, Edge, ReasoningChain # ═══════════════════════════════════════════════════════════ # THINKER ENGINE # ═══════════════════════════════════════════════════════════ class Thinker: """ Autonomous thinking engine. Runs in a background thread, continuously processing and enriching the knowledge graph. """ def __init__(self, graph: KnowledgeGraph): self.graph = graph self._thread: Optional[threading.Thread] = None self._running = False self._paused = False # Thinking state self._cycle_count = 0 self._total_cycles = 0 self._current_phase = "init" self._phase_index = 0 self._interval = config.THINKING_INTERVAL_FAST self._operations_this_cycle = 0 # File tracking self._file_checksums: Dict[str, str] = {} # Phase definitions (ordered) self._phases = [ ("ingest", self._phase_ingest), ("connect", self._phase_connect), ("infer", self._phase_infer), ("abstract", self._phase_abstract), ("strengthen", self._phase_strengthen_weaken), ("compress", self._phase_compress), ("validate", self._phase_validate), ("self_question", self._phase_self_question), ] # Metrics self._metrics = { "nodes_ingested": 0, "edges_ingested": 0, "connections_found": 0, "inferences_made": 0, "abstractions_created": 0, "edges_reinforced": 0, "edges_decayed": 0, "nodes_merged": 0, "edges_pruned": 0, "nodes_pruned": 0, "contradictions_resolved": 0, "self_questions_asked": 0, "self_questions_answered": 0, } # ─────────────────────────────────────────────────── # LIFECYCLE # ─────────────────────────────────────────────────── def start(self): """Start the thinking loop in a background thread.""" if self._running: print("[THINKER] Already running.") return # Load previous state self._load_state() self._running = True self._thread = threading.Thread( target=self._thinking_loop, name="CogniThinker", daemon=True ) self._thread.start() print(f"[THINKER] Started. Resuming from cycle {self._total_cycles}.") def stop(self): """Stop the thinking loop gracefully.""" if not self._running: return print("[THINKER] Stopping...") self._running = False if self._thread: self._thread.join(timeout=30) self._save_state() print("[THINKER] Stopped.") def pause(self): """Pause thinking (for heavy API load).""" self._paused = True def resume(self): """Resume thinking.""" self._paused = False @property def is_running(self) -> bool: return self._running @property def current_phase(self) -> str: return self._current_phase @property def total_cycles(self) -> int: return self._total_cycles @property def metrics(self) -> dict: return dict(self._metrics) def get_status(self) -> dict: """Get detailed thinker status.""" return { "running": self._running, "paused": self._paused, "current_phase": self._current_phase, "cycle_count": self._cycle_count, "total_cycles": self._total_cycles, "interval_seconds": self._interval, "operations_last_cycle": self._operations_this_cycle, "metrics": dict(self._metrics) } # ─────────────────────────────────────────────────── # STATE PERSISTENCE # ─────────────────────────────────────────────────── def _load_state(self): """Load thinking state from DB.""" state = self.graph.memory.load_thinking_state() self._total_cycles = state.get("total_cycles", 0) self._cycle_count = state.get("current_cycle", 0) self._current_phase = state.get("phase", "init") saved_metrics = state.get("metrics", {}) if saved_metrics: for key in self._metrics: if key in saved_metrics: self._metrics[key] = saved_metrics[key] # Load file checksums self._file_checksums = self.graph.memory.load_file_checksums() def _save_state(self): """Save thinking state to DB.""" self.graph.memory.save_thinking_state({ "current_cycle": self._cycle_count, "total_cycles": self._total_cycles, "cursor_position": "", "phase": self._current_phase, "metrics": dict(self._metrics) }) # ─────────────────────────────────────────────────── # MAIN LOOP # ─────────────────────────────────────────────────── def _thinking_loop(self): """ Main thinking loop. Runs continuously until stopped. Cycles through all 8 phases, adapting speed based on activity. """ print("[THINKER] Thinking loop started.") while self._running: try: # Wait if paused if self._paused: time.sleep(1) continue # Execute current phase self._operations_this_cycle = 0 phase_name, phase_func = self._phases[self._phase_index] self._current_phase = phase_name try: phase_func() except Exception as e: print(f"[THINKER] Error in phase '{phase_name}': {e}") if config.LOG_THINKING_DETAILS: traceback.print_exc() # Advance to next phase self._phase_index = (self._phase_index + 1) % len(self._phases) # Count cycles (one full rotation = 1 cycle) if self._phase_index == 0: self._cycle_count += 1 self._total_cycles += 1 # Periodic state save if self._total_cycles % config.SYNC_INTERVAL_CYCLES == 0: self._save_state() self.graph.sync() # Adaptive speed self._adapt_speed() # Log progress periodically if self._total_cycles % 100 == 0 and config.LOG_THINKING_DETAILS: stats = self.graph.get_stats() score = self.graph.get_intelligence_score() print( f"[THINKER] Cycle {self._total_cycles}: " f"nodes={stats['total_nodes']}, " f"edges={stats['total_edges']}, " f"inferred={stats['inferred_edges']}, " f"score={score:.2f}" ) # Sleep between phases time.sleep(self._interval / len(self._phases)) except Exception as e: print(f"[THINKER] Loop error: {e}") traceback.print_exc() time.sleep(5) # Recovery pause print("[THINKER] Thinking loop ended.") def _adapt_speed(self): """Adjust thinking speed based on activity.""" if self._operations_this_cycle > config.THINKING_STABILITY_THRESHOLD: # Active: think faster self._interval = max( config.THINKING_INTERVAL_FAST, self._interval * 0.9 ) else: # Stable: think slower self._interval = min( config.THINKING_INTERVAL_SLOW, self._interval * 1.1 ) # ═══════════════════════════════════════════════════ # PHASE 1: INGEST # ═══════════════════════════════════════════════════ def _phase_ingest(self): """ Scan /data/ folder for new or changed JSONL files. Parse entries and create nodes + edges in graph. """ if not os.path.exists(config.DATA_DIR): return files = [] for fname in os.listdir(config.DATA_DIR): if any(fname.endswith(ext) for ext in config.SUPPORTED_DATA_EXTENSIONS): files.append(fname) if not files: return for fname in files: filepath = os.path.join(config.DATA_DIR, fname) try: with open(filepath, 'r', encoding='utf-8') as f: content = f.read() except Exception as e: print(f"[THINKER/INGEST] Error reading {fname}: {e}") continue # Check if file has changed checksum = utils.hash_file_content(content) if self._file_checksums.get(fname) == checksum: continue # File unchanged, skip print(f"[THINKER/INGEST] Processing file: {fname}") lines = content.strip().split('\n') processed = 0 for line_num, line in enumerate(lines): if processed >= config.MAX_LINES_PER_INGEST: break line = line.strip() if not line: continue try: entry = json.loads(line) except json.JSONDecodeError: continue self._ingest_entry(entry, source=fname) processed += 1 # Mark file as processed self._file_checksums[fname] = checksum self.graph.memory.save_file_checksum(fname, checksum, processed) self._operations_this_cycle += processed print(f"[THINKER/INGEST] Processed {processed} entries from {fname}") def _ingest_entry(self, entry: dict, source: str = "data"): """ Ingest a single data entry into the knowledge graph. Creates nodes and edges based on entry type and fields. """ entry_type = entry.get("type", "fact") content = entry.get("content", "").strip() if not content: return tags = entry.get("tags", []) confidence = entry.get("confidence", config.DATA_KNOWLEDGE_CONFIDENCE) domain = entry.get("domain", "") related = entry.get("related", []) # ── Create main content node ── main_node = self.graph.add_node( content=content, node_type=self._map_entry_type_to_node_type(entry_type), source="data", weight=confidence, tags=tags ) if not main_node: return self._metrics["nodes_ingested"] += 1 # ── Handle domain as a concept node ── if domain: domain_node = self.graph.add_node( content=domain, node_type="concept", source="data", weight=0.8 ) if domain_node: self.graph.add_edge( from_id=main_node.id, to_id=domain_node.id, relation="part_of", confidence=0.8, source="data" ) self._metrics["edges_ingested"] += 1 # ── Handle related topics ── for rel_topic in related: rel_node = self.graph.add_node( content=rel_topic, node_type="concept", source="data", weight=0.7 ) if rel_node: self.graph.add_edge( from_id=main_node.id, to_id=rel_node.id, relation="related_to", confidence=0.7, source="data" ) self._metrics["edges_ingested"] += 1 # ── Type-specific handling ── self._ingest_type_specific(entry, main_node, entry_type) def _ingest_type_specific(self, entry: dict, main_node: Node, entry_type: str): """Handle type-specific fields for data entries.""" # ── relation type: explicit from/to ── if entry_type == "relation": from_content = entry.get("from", "") to_content = entry.get("to", "") relation = entry.get("relation", "related_to") if from_content and to_content: from_node = self.graph.add_node( content=from_content, node_type="entity", source="data" ) to_node = self.graph.add_node( content=to_content, node_type="entity", source="data" ) if from_node and to_node: self.graph.add_edge( from_id=from_node.id, to_id=to_node.id, relation=relation, confidence=entry.get("confidence", 0.9), source="data" ) self._metrics["edges_ingested"] += 1 # ── definition / term: term node + defined_as edge ── elif entry_type in ("definition", "term"): term = entry.get("term", "") if term: term_node = self.graph.add_node( content=term, node_type="entity", source="data" ) if term_node: self.graph.add_edge( from_id=term_node.id, to_id=main_node.id, relation="defined_as", confidence=0.95, source="data" ) self._metrics["edges_ingested"] += 1 # ── cause_effect: cause → effect ── elif entry_type == "cause_effect": cause = entry.get("cause", "") effect = entry.get("effect", "") if cause and effect: cause_node = self.graph.add_node( content=cause, node_type="concept", source="data" ) effect_node = self.graph.add_node( content=effect, node_type="concept", source="data" ) if cause_node and effect_node: self.graph.add_edge( from_id=cause_node.id, to_id=effect_node.id, relation="causes", confidence=entry.get("confidence", 0.85), source="data" ) self._metrics["edges_ingested"] += 1 # ── hierarchy: parent → children ── elif entry_type == "hierarchy": parent = entry.get("parent", "") children = entry.get("children", []) if parent and children: parent_node = self.graph.add_node( content=parent, node_type="concept", source="data" ) if parent_node: for child in children: child_node = self.graph.add_node( content=child, node_type="entity", source="data" ) if child_node: self.graph.add_edge( from_id=child_node.id, to_id=parent_node.id, relation="is_a", confidence=0.9, source="data" ) self._metrics["edges_ingested"] += 1 # ── comparison: subject_a ↔ subject_b ── elif entry_type == "comparison": subj_a = entry.get("subject_a", "") subj_b = entry.get("subject_b", "") if subj_a and subj_b: node_a = self.graph.add_node( content=subj_a, node_type="entity", source="data" ) node_b = self.graph.add_node( content=subj_b, node_type="entity", source="data" ) if node_a and node_b: self.graph.add_edge( from_id=node_a.id, to_id=node_b.id, relation="related_to", confidence=0.8, source="data" ) self.graph.add_edge( from_id=node_b.id, to_id=node_a.id, relation="related_to", confidence=0.8, source="data" ) self._metrics["edges_ingested"] += 2 # ── qa: question → answer ── elif entry_type == "qa": question = entry.get("question", "") answer = entry.get("answer", "") if question and answer: q_node = self.graph.add_node( content=question, node_type="concept", source="data" ) a_node = self.graph.add_node( content=answer, node_type="fact", source="data" ) if q_node and a_node: self.graph.add_edge( from_id=q_node.id, to_id=a_node.id, relation="defined_as", confidence=0.9, source="data" ) self._metrics["edges_ingested"] += 1 # ── synonym: bidirectional synonym_of ── elif entry_type == "synonym": terms = entry.get("terms", []) for i in range(len(terms)): for j in range(i + 1, len(terms)): node_i = self.graph.add_node( content=terms[i], node_type="entity", source="data" ) node_j = self.graph.add_node( content=terms[j], node_type="entity", source="data" ) if node_i and node_j: self.graph.add_edge( from_id=node_i.id, to_id=node_j.id, relation="synonym_of", confidence=0.9, source="data" ) self.graph.add_edge( from_id=node_j.id, to_id=node_i.id, relation="synonym_of", confidence=0.9, source="data" ) self._metrics["edges_ingested"] += 2 # ── process / procedure: sequential steps ── elif entry_type in ("process", "procedure"): steps = entry.get("steps", []) title = entry.get("title", "") if title: title_node = self.graph.add_node( content=title, node_type="concept", source="data" ) if title_node: self.graph.add_edge( from_id=main_node.id, to_id=title_node.id, relation="defined_as", confidence=0.85, source="data" ) prev_step_node = None for step_text in steps: step_node = self.graph.add_node( content=step_text, node_type="fact", source="data" ) if step_node: self.graph.add_edge( from_id=step_node.id, to_id=main_node.id, relation="part_of", confidence=0.8, source="data" ) if prev_step_node: self.graph.add_edge( from_id=prev_step_node.id, to_id=step_node.id, relation="follows", confidence=0.9, source="data" ) self._metrics["edges_ingested"] += 1 prev_step_node = step_node self._metrics["edges_ingested"] += 1 # ── quote: author + content ── elif entry_type == "quote": author = entry.get("author", "") if author: author_node = self.graph.add_node( content=author, node_type="entity", source="data" ) if author_node: self.graph.add_edge( from_id=main_node.id, to_id=author_node.id, relation="created_by", confidence=0.9, source="data" ) self._metrics["edges_ingested"] += 1 # ── event: actors, location, date ── elif entry_type == "event": actors = entry.get("actors", []) location = entry.get("location", "") for actor in actors: actor_node = self.graph.add_node( content=actor, node_type="entity", source="data" ) if actor_node: self.graph.add_edge( from_id=actor_node.id, to_id=main_node.id, relation="related_to", confidence=0.85, source="data" ) self._metrics["edges_ingested"] += 1 if location: loc_node = self.graph.add_node( content=location, node_type="entity", source="data" ) if loc_node: self.graph.add_edge( from_id=main_node.id, to_id=loc_node.id, relation="located_in", confidence=0.85, source="data" ) self._metrics["edges_ingested"] += 1 # ── analogy: subject ↔ analogy ── elif entry_type == "analogy": subject = entry.get("subject", "") analogy_text = entry.get("analogy", "") if subject and analogy_text: subj_node = self.graph.add_node( content=subject, node_type="concept", source="data" ) ana_node = self.graph.add_node( content=analogy_text, node_type="concept", source="data" ) if subj_node and ana_node: self.graph.add_edge( from_id=subj_node.id, to_id=ana_node.id, relation="analogous_to", confidence=0.75, source="data" ) self._metrics["edges_ingested"] += 1 # ── Paragraph: extract keywords as connected entities ── elif entry_type == "paragraph": keywords = utils.extract_keywords(entry.get("content", ""), max_keywords=10) for kw in keywords: kw_node = self.graph.add_node( content=kw, node_type="concept", source="data", weight=0.6 ) if kw_node: self.graph.add_edge( from_id=main_node.id, to_id=kw_node.id, relation="related_to", confidence=0.6, source="data" ) self._metrics["edges_ingested"] += 1 def _map_entry_type_to_node_type(self, entry_type: str) -> str: """Map data entry type to graph node type.""" type_map = { "fact": "fact", "definition": "definition", "explanation": "fact", "description": "fact", "property": "fact", "statistic": "fact", "measurement": "fact", "term": "definition", "abbreviation": "definition", "jargon": "definition", "slang": "definition", "idiom": "definition", "synonym": "entity", "antonym": "entity", "quote": "fact", "rule": "fact", "example": "fact", "analogy": "concept", "opinion": "fact", "paragraph": "fact", "relation": "fact", "cause_effect": "fact", "comparison": "fact", "hierarchy": "concept", "composition": "concept", "dependency": "fact", "contradiction": "fact", "timeline": "fact", "process": "fact", "procedure": "fact", "event": "fact", "history": "fact", "change": "fact", "qa": "fact", } if entry_type.startswith("custom_"): return "fact" return type_map.get(entry_type, "fact") # ═══════════════════════════════════════════════════ # PHASE 2: CONNECT # ═══════════════════════════════════════════════════ def _phase_connect(self): """ Find hidden connections between nodes. Nodes with high vector similarity but no edge → create edge. Focuses on least-connected nodes first. """ candidates = self.graph.get_least_connected_nodes( limit=config.THINKING_BATCH_SIZE ) connections_made = 0 for node in candidates: if connections_made >= config.THINKING_BATCH_SIZE: break # Find similar nodes similar = self.graph.find_similar_to_node( node.id, top_k=10, min_similarity=config.SIMILARITY_THRESHOLD ) for similar_node, similarity in similar: # Skip if edge already exists (either direction) if self.graph.edge_exists(node.id, similar_node.id): continue if self.graph.edge_exists(similar_node.id, node.id): continue # Create new connection edge = self.graph.add_edge( from_id=node.id, to_id=similar_node.id, relation="similar_to", weight=similarity, confidence=similarity * 0.9, source="inferred" ) if edge: connections_made += 1 self._operations_this_cycle += 1 if connections_made > 0: self._metrics["connections_found"] += connections_made if config.LOG_THINKING_DETAILS: print(f"[THINKER/CONNECT] Found {connections_made} new connections") # ═══════════════════════════════════════════════════ # PHASE 3: INFER # ═══════════════════════════════════════════════════ def _phase_infer(self): """ Transitive and analogical inference. If A→B and B→C, maybe A→C. Discovers knowledge not present in original data. """ inferences_made = 0 # ── Transitive Inference ── inferences_made += self._transitive_inference() # ── Analogical Inference ── inferences_made += self._analogical_inference() if inferences_made > 0: self._metrics["inferences_made"] += inferences_made self._operations_this_cycle += inferences_made if config.LOG_THINKING_DETAILS: print(f"[THINKER/INFER] Made {inferences_made} inferences") def _transitive_inference(self) -> int: """ If A→B and B→C exist, infer A→C with decayed confidence. Limited per cycle to prevent explosion. """ count = 0 # Sample a batch of nodes to check node_ids = list(self.graph.nodes.keys()) if len(node_ids) > config.THINKING_BATCH_SIZE: sample_indices = np.random.choice( len(node_ids), config.THINKING_BATCH_SIZE, replace=False ) node_ids = [node_ids[i] for i in sample_indices] for node_a_id in node_ids: if count >= config.MAX_INFERENCES_PER_CYCLE: break edges_ab = self.graph.get_edges_from(node_a_id) for edge_ab in edges_ab: if count >= config.MAX_INFERENCES_PER_CYCLE: break node_b_id = edge_ab.to_node edges_bc = self.graph.get_edges_from(node_b_id) for edge_bc in edges_bc: node_c_id = edge_bc.to_node # Skip self-loops and existing edges if node_c_id == node_a_id: continue if self.graph.edge_exists(node_a_id, node_c_id): continue # Calculate inferred confidence inferred_confidence = ( edge_ab.confidence * edge_bc.confidence * config.INFERENCE_DECAY ) if inferred_confidence < config.INFERENCE_CONFIDENCE_MIN: continue # Determine inferred relation inferred_relation = self._infer_relation( edge_ab.relation, edge_bc.relation ) # Create inferred edge edge = self.graph.add_edge( from_id=node_a_id, to_id=node_c_id, relation=inferred_relation, weight=inferred_confidence, confidence=inferred_confidence, source="inferred" ) if edge: count += 1 if count >= config.MAX_INFERENCES_PER_CYCLE: break return count def _analogical_inference(self) -> int: """ If A relates to B like C relates to ?, find ? using vector arithmetic. A - B ≈ C - ? → ? ≈ C - A + B """ count = 0 # Find pairs with strong, specific relations strong_edges = [ e for e in self.graph.edges.values() if e.confidence > 0.7 and e.relation not in ("similar_to", "related_to") ] if len(strong_edges) < 2: return 0 # Sample pairs to compare sample_size = min(20, len(strong_edges)) sampled = np.random.choice(len(strong_edges), sample_size, replace=False) for i in sampled: if count >= config.MAX_INFERENCES_PER_CYCLE // 4: break edge = strong_edges[i] node_a = self.graph.get_node(edge.from_node) node_b = self.graph.get_node(edge.to_node) if not node_a or not node_b: continue # Find nodes similar to A (potential C candidates) similar_to_a = self.graph.find_similar_to_node( node_a.id, top_k=5, min_similarity=config.ANALOGICAL_SIMILARITY_MIN ) for node_c, sim_ac in similar_to_a: if node_c.id == node_b.id: continue # Vector arithmetic: ? ≈ C - A + B target_vector = utils.normalize( node_c.vector - node_a.vector + node_b.vector ) # Find nearest to target vector candidates = self.graph.find_similar_nodes( target_vector, top_k=3, min_similarity=config.ANALOGICAL_SIMILARITY_MIN, exclude_ids={node_a.id, node_b.id, node_c.id} ) for candidate_node, sim_score in candidates: if self.graph.edge_exists(node_c.id, candidate_node.id, edge.relation): continue inferred_confidence = sim_ac * sim_score * config.INFERENCE_DECAY if inferred_confidence < config.INFERENCE_CONFIDENCE_MIN: continue new_edge = self.graph.add_edge( from_id=node_c.id, to_id=candidate_node.id, relation=edge.relation, weight=inferred_confidence, confidence=inferred_confidence, source="inferred" ) if new_edge: count += 1 break # One analogy per C return count def _infer_relation(self, rel_ab: str, rel_bc: str) -> str: """Determine relation type for transitive inference A→C from A→B→C.""" # Same relation → same if rel_ab == rel_bc: return rel_ab # Specific known transitive patterns transitive_map = { ("is_a", "is_a"): "is_a", ("part_of", "part_of"): "part_of", ("is_a", "has"): "has", ("is_a", "located_in"): "located_in", ("part_of", "located_in"): "located_in", ("is_a", "used_for"): "used_for", ("causes", "causes"): "causes", ("follows", "follows"): "follows", ("requires", "requires"): "requires", ("instance_of", "is_a"): "instance_of", } return transitive_map.get((rel_ab, rel_bc), "inferred_relation") # ═══════════════════════════════════════════════════ # PHASE 4: ABSTRACT # ═══════════════════════════════════════════════════ def _phase_abstract(self): """ Cluster similar nodes into abstraction nodes. Creates higher-level concepts from concrete instances. Recursive: abstractions can be abstracted further. """ # Skip if graph is too small if len(self.graph.nodes) < config.CLUSTER_MIN_SIZE * 2: return abstractions_created = 0 # ── Level 1: Concrete → Abstraction ── abstractions_created += self._create_abstractions( source_types=["entity", "fact", "concept"], abstraction_type="abstraction" ) # ── Level 2+: Abstraction → Meta-Abstraction ── if self._total_cycles % (config.COMPRESS_INTERVAL * 2) == 0: existing_abstractions = self.graph.get_nodes_by_type("abstraction") if len(existing_abstractions) >= config.CLUSTER_MIN_SIZE * 2: abstractions_created += self._create_abstractions( source_types=["abstraction"], abstraction_type="meta_abstraction" ) if abstractions_created > 0: self._metrics["abstractions_created"] += abstractions_created self._operations_this_cycle += abstractions_created if config.LOG_THINKING_DETAILS: print(f"[THINKER/ABSTRACT] Created {abstractions_created} abstractions") def _create_abstractions( self, source_types: List[str], abstraction_type: str ) -> int: """Create abstraction nodes from clusters of source-typed nodes.""" # Gather source nodes source_nodes = [] for stype in source_types: source_nodes.extend(self.graph.get_nodes_by_type(stype)) if len(source_nodes) < config.CLUSTER_MIN_SIZE: return 0 # Build vector matrix for clustering vectors = np.array( [n.vector for n in source_nodes], dtype=np.float32 ) node_ids = [n.id for n in source_nodes] # Find natural clusters clusters = utils.find_natural_clusters( vectors, similarity_threshold=config.CLUSTER_SIMILARITY_INTRA ) count = 0 for cluster_indices in clusters: if count >= config.THINKING_BATCH_SIZE // 2: break # Check if this cluster already has an abstraction member_ids = [node_ids[i] for i in cluster_indices] already_abstracted = False for mid in member_ids: for edge in self.graph.get_edges_from(mid): if edge.relation == "instance_of": already_abstracted = True break if already_abstracted: break if already_abstracted: continue # Compute centroid cluster_vectors = vectors[cluster_indices] centroid = utils.vector_mean(list(cluster_vectors)) # Generate label from common keywords all_content = " ".join( self.graph.nodes[node_ids[i]].content for i in cluster_indices if node_ids[i] in self.graph.nodes ) keywords = utils.extract_keywords(all_content, max_keywords=5) label = " + ".join(keywords[:3]) if keywords else "abstract_concept" # Check depth limit current_depth = 0 if abstraction_type == "meta_abstraction": for i in cluster_indices: nid = node_ids[i] depth = self.graph._get_abstraction_depth(nid) current_depth = max(current_depth, depth) if current_depth >= config.MAX_ABSTRACTION_DEPTH: continue # Create abstraction node abs_node = self.graph.add_node( content=f"[{abstraction_type}] {label}", node_type=abstraction_type, source="inferred", vector=utils.normalize(centroid), weight=config.ABSTRACTION_MIN_CONFIDENCE ) if not abs_node: continue # Link members to abstraction for i in cluster_indices: member_id = node_ids[i] self.graph.add_edge( from_id=member_id, to_id=abs_node.id, relation="instance_of", weight=0.8, confidence=0.8, source="inferred" ) count += 1 return count # ═══════════════════════════════════════════════════ # PHASE 5: STRENGTHEN / WEAKEN # ═══════════════════════════════════════════════════ def _phase_strengthen_weaken(self): """ Strengthen edges that are frequently used. Weaken edges that haven't been used. Nodes with more connections get slight weight boost. """ # ── Weaken unused edges (periodic) ── if self._total_cycles % config.WEIGHT_DECAY_INTERVAL_CYCLES == 0: decay_count = 0 edges = list(self.graph.edges.values()) for edge in edges: if edge.used_count == 0 and edge.source == "inferred": self.graph.decay_edge(edge.id) decay_count += 1 self._metrics["edges_decayed"] += decay_count self._operations_this_cycle += decay_count if config.LOG_THINKING_DETAILS and decay_count > 0: print(f"[THINKER/WEAKEN] Decayed {decay_count} unused edges") # ── Boost well-connected nodes ── nodes = list(self.graph.nodes.values()) sample_size = min(config.THINKING_BATCH_SIZE, len(nodes)) if sample_size == 0: return sampled = np.random.choice(len(nodes), sample_size, replace=False) reinforced = 0 for idx in sampled: node = nodes[idx] if node.connections > 3: bonus = config.NODE_WEIGHT_CONNECTION_BONUS * min(node.connections, 20) new_weight = min(node.weight + bonus, config.WEIGHT_MAX) if new_weight != node.weight: self.graph.update_node_weight(node.id, new_weight) reinforced += 1 self._metrics["edges_reinforced"] += reinforced self._operations_this_cycle += reinforced # ═══════════════════════════════════════════════════ # PHASE 6: COMPRESS # ═══════════════════════════════════════════════════ def _phase_compress(self): """ Merge redundant nodes. Prune dead edges. Prune orphan nodes. Keep the graph efficient and clean. """ if self._total_cycles % config.COMPRESS_INTERVAL != 0: return # ── Merge redundant nodes ── redundant_pairs = self.graph.find_redundant_pairs(limit=10) merged = 0 for id_keep, id_remove, similarity in redundant_pairs: if self.graph.merge_nodes(id_keep, id_remove): merged += 1 # ── Prune weak edges ── pruned_edges = self.graph.prune_weak_edges() # ── Prune orphan nodes ── pruned_nodes = self.graph.prune_orphan_nodes() self._metrics["nodes_merged"] += merged self._metrics["edges_pruned"] += pruned_edges self._metrics["nodes_pruned"] += pruned_nodes self._operations_this_cycle += merged + pruned_edges + pruned_nodes total_ops = merged + pruned_edges + pruned_nodes if config.LOG_THINKING_DETAILS and total_ops > 0: print( f"[THINKER/COMPRESS] Merged {merged} nodes, " f"pruned {pruned_edges} edges, {pruned_nodes} orphan nodes" ) # ═══════════════════════════════════════════════════ # PHASE 7: VALIDATE # ═══════════════════════════════════════════════════ def _phase_validate(self): """ Check logical consistency of the graph. Resolve contradictions. Detect and break circular inferences. """ if self._total_cycles % config.VALIDATE_INTERVAL != 0: return resolved = 0 # ── Detect contradictions ── resolved += self._resolve_contradictions() # ── Detect circular inferences ── resolved += self._break_circular_inferences() if resolved > 0: self._metrics["contradictions_resolved"] += resolved self._operations_this_cycle += resolved if config.LOG_THINKING_DETAILS: print(f"[THINKER/VALIDATE] Resolved {resolved} issues") def _resolve_contradictions(self) -> int: """ Find and resolve contradictions. If A→B (positive) and A→¬B (opposite_of) exist, keep higher confidence. """ resolved = 0 # Sample nodes to check node_ids = list(self.graph.nodes.keys()) sample_size = min(config.THINKING_BATCH_SIZE, len(node_ids)) if sample_size == 0: return 0 sampled = np.random.choice(len(node_ids), sample_size, replace=False) for idx in sampled: node_id = node_ids[idx] edges_out = self.graph.get_edges_from(node_id) # Group edges by target target_edges: Dict[str, List[Edge]] = {} for edge in edges_out: key = edge.to_node if key not in target_edges: target_edges[key] = [] target_edges[key].append(edge) # Check for contradictory relations to same target for target_id, edges in target_edges.items(): if len(edges) < 2: continue # Check for opposing relations contradictory_pairs = { ("causes", "prevents"), ("is_a", "opposite_of"), ("synonym_of", "opposite_of"), ("requires", "prevents"), } for i in range(len(edges)): for j in range(i + 1, len(edges)): pair = (edges[i].relation, edges[j].relation) reverse_pair = (edges[j].relation, edges[i].relation) if pair in contradictory_pairs or reverse_pair in contradictory_pairs: # Keep higher confidence, remove lower if edges[i].confidence >= edges[j].confidence: self.graph.remove_edge(edges[j].id) else: self.graph.remove_edge(edges[i].id) resolved += 1 return resolved def _break_circular_inferences(self) -> int: """ Detect inference chains that loop back on themselves. Break the weakest link in each cycle. """ broken = 0 # Sample inferred edges inferred_edges = [ e for e in self.graph.edges.values() if e.source == "inferred" ] sample_size = min(config.THINKING_BATCH_SIZE, len(inferred_edges)) if sample_size == 0: return 0 sampled_indices = np.random.choice( len(inferred_edges), sample_size, replace=False ) for idx in sampled_indices: edge = inferred_edges[idx] # Check if this edge creates a cycle # Simple: does a path exist from to_node back to from_node? paths = self.graph.find_paths( edge.to_node, edge.from_node, max_depth=4, max_paths=1 ) if paths: # Cycle detected — remove weakest edge in cycle cycle_path = [edge.from_node, edge.id] + paths[0] weakest_edge_id = None weakest_weight = float('inf') for item_id in cycle_path: if item_id in self.graph.edges: e = self.graph.edges[item_id] if e.weight < weakest_weight and e.source == "inferred": weakest_weight = e.weight weakest_edge_id = e.id if weakest_edge_id: self.graph.remove_edge(weakest_edge_id) broken += 1 return broken # ═══════════════════════════════════════════════════ # PHASE 8: SELF-QUESTION # ═══════════════════════════════════════════════════ def _phase_self_question(self): """ Generate internal questions to fill knowledge gaps. Ask: "What connects X to Y?" where X and Y are distant but possibly related. If a new chain is found, save it. """ if self._total_cycles % config.SELF_QUESTION_INTERVAL != 0: return if len(self.graph.nodes) < 10: return questions_asked = 0 questions_answered = 0 # Strategy 1: Find disconnected clusters and try to bridge them questions_answered += self._bridge_disconnected() questions_asked += 3 # Strategy 2: Explore high-weight nodes that lack certain relation types questions_answered += self._fill_relation_gaps() questions_asked += 3 # Strategy 3: Challenge existing weak inferences questions_answered += self._challenge_weak_inferences() questions_asked += 2 self._metrics["self_questions_asked"] += questions_asked self._metrics["self_questions_answered"] += questions_answered self._operations_this_cycle += questions_answered if config.LOG_THINKING_DETAILS and questions_answered > 0: print( f"[THINKER/SELF-Q] Asked {questions_asked} questions, " f"answered {questions_answered}" ) def _bridge_disconnected(self) -> int: """Try to find connections between disconnected subgraphs.""" connected = 0 # Pick two random nodes that have no path between them node_ids = list(self.graph.nodes.keys()) if len(node_ids) < 10: return 0 for _ in range(3): idx = np.random.choice(len(node_ids), 2, replace=False) id_a, id_b = node_ids[idx[0]], node_ids[idx[1]] node_a = self.graph.get_node(id_a) node_b = self.graph.get_node(id_b) if not node_a or not node_b: continue # Are they already connected? paths = self.graph.find_paths(id_a, id_b, max_depth=4, max_paths=1) if paths: continue # Can we connect them via vector similarity? sim = utils.cosine_similarity(node_a.vector, node_b.vector) if sim > config.SIMILARITY_THRESHOLD * 0.8: # They're somewhat similar but not connected → connect edge = self.graph.add_edge( from_id=id_a, to_id=id_b, relation="inferred_relation", weight=sim * 0.7, confidence=sim * 0.6, source="inferred" ) if edge: connected += 1 return connected def _fill_relation_gaps(self) -> int: """Find high-weight nodes missing common relations and try to fill them.""" filled = 0 # Get well-known nodes important_nodes = sorted( self.graph.nodes.values(), key=lambda n: n.weight * n.connections, reverse=True )[:20] common_relations = ["is_a", "part_of", "has", "used_for", "related_to"] for node in important_nodes[:5]: existing_relations = set() for edge in self.graph.get_edges_from(node.id): existing_relations.add(edge.relation) for relation in common_relations: if relation in existing_relations: continue # Can we find a target for this relation via similarity? # Look for nodes that commonly have this relation candidates = self.graph.find_similar_to_node( node.id, top_k=5, min_similarity=config.SIMILARITY_THRESHOLD ) for candidate, sim in candidates: # Check if candidate has this relation type outgoing candidate_rels = [ e.relation for e in self.graph.get_edges_from(candidate.id) ] if relation in candidate_rels: # This candidate has the relation → node might too for edge in self.graph.get_edges_from(candidate.id): if edge.relation == relation: target = self.graph.get_node(edge.to_node) if target and not self.graph.edge_exists( node.id, target.id, relation ): confidence = sim * edge.confidence * config.INFERENCE_DECAY if confidence >= config.INFERENCE_CONFIDENCE_MIN: new_edge = self.graph.add_edge( from_id=node.id, to_id=target.id, relation=relation, weight=confidence, confidence=confidence, source="inferred" ) if new_edge: filled += 1 break break # One fill per missing relation if filled >= 5: break return filled def _challenge_weak_inferences(self) -> int: """ Re-examine weak inferred edges. If supporting evidence exists, strengthen. If contradicting evidence exists, remove. """ improved = 0 weak_edges = self.graph.get_weakest_edges( limit=20, source_filter="inferred" ) for edge in weak_edges: from_node = self.graph.get_node(edge.from_node) to_node = self.graph.get_node(edge.to_node) if not from_node or not to_node: continue # Check if there's additional evidence # (other paths between these nodes) paths = self.graph.find_paths( edge.from_node, edge.to_node, max_depth=4, max_paths=3 ) # Filter paths that don't use this edge alternative_paths = [ p for p in paths if edge.id not in p ] if alternative_paths: # Multiple paths support this edge → strengthen support_factor = 1.0 + 0.05 * len(alternative_paths) new_weight = min( edge.weight * support_factor, config.WEIGHT_MAX ) self.graph.edges[edge.id].weight = new_weight self.graph.edges[edge.id].confidence = min( edge.confidence * support_factor, 1.0 ) self.graph.edges[edge.id].mark_dirty() self.graph.memory.save_edge(edge.to_dict()) improved += 1 else: # No alternative support → further weaken if edge.weight < config.PRUNE_WEIGHT_THRESHOLD * 2: self.graph.remove_edge(edge.id) improved += 1 return improved # ═══════════════════════════════════════════════════ # USER KNOWLEDGE EXTRACTION # ═══════════════════════════════════════════════════ def extract_from_user_message(self, message: str): """ Extract knowledge from a user's chat message. Called by brain.py after processing a user request. Does NOT store the raw message — only extracted knowledge. """ if not message or len(message.strip()) < 10: return message = message.strip() # Extract keywords keywords = utils.extract_keywords(message, max_keywords=15) if len(keywords) < 2: return # Extract entities entities = utils.extract_entities_simple(message) # Create entity nodes entity_nodes = [] for entity in entities[:5]: node = self.graph.add_node( content=entity, node_type="entity", source="user_chat", weight=config.USER_KNOWLEDGE_CONFIDENCE ) if node: entity_nodes.append(node) # Create concept nodes from keywords not already entities entity_lower = {e.lower() for e in entities} for kw in keywords: if kw.lower() not in entity_lower: node = self.graph.add_node( content=kw, node_type="concept", source="user_chat", weight=config.USER_KNOWLEDGE_CONFIDENCE * 0.7 ) # If message contains informational content, create fact node if len(message) > 30 and any( p in message.lower() for p in [ "adalah", "merupakan", "yaitu", "ialah", "is", "are", "means", "defined" ] ): fact_node = self.graph.add_node( content=message[:500], node_type="fact", source="user_chat", weight=config.USER_KNOWLEDGE_CONFIDENCE ) # Connect fact to entities mentioned if fact_node: for en in entity_nodes: self.graph.add_edge( from_id=fact_node.id, to_id=en.id, relation="related_to", confidence=config.USER_KNOWLEDGE_CONFIDENCE * 0.8, source="user_chat" )