"""
Cogni-Engine v1 — 24/7 Thinking Loop
The autonomous cognitive process that never stops.
8 phases of continuous reasoning that make the AI smarter over time.
Phases:
1. INGEST — Scan /data/ folder, parse new JSONL files
2. CONNECT — Find hidden connections between nodes via similarity
3. INFER — Transitive & analogical inference to discover new knowledge
4. ABSTRACT — Cluster similar nodes into higher-level abstractions
5. STRENGTHEN/WEAKEN — Reinforce used edges, decay unused ones
6. COMPRESS — Merge redundant nodes, prune dead edges
7. VALIDATE — Check logical consistency, resolve contradictions
8. SELF-QUESTION — Generate and answer internal questions to find gaps
"""
import os
import json
import time
import threading
import traceback
from typing import List, Dict, Optional
import numpy as np
import config
import utils
from knowledge import KnowledgeGraph, Node, Edge
# ═══════════════════════════════════════════════════════════
# THINKER ENGINE
# ═══════════════════════════════════════════════════════════
class Thinker:
"""
Autonomous thinking engine.
Runs in a background thread, continuously processing
and enriching the knowledge graph.
"""
def __init__(self, graph: KnowledgeGraph):
self.graph = graph
self._thread: Optional[threading.Thread] = None
self._running = False
self._paused = False
# Thinking state
self._cycle_count = 0
self._total_cycles = 0
self._current_phase = "init"
self._phase_index = 0
self._interval = config.THINKING_INTERVAL_FAST
self._operations_this_cycle = 0
# File tracking
self._file_checksums: Dict[str, str] = {}
# Phase definitions (ordered)
self._phases = [
("ingest", self._phase_ingest),
("connect", self._phase_connect),
("infer", self._phase_infer),
("abstract", self._phase_abstract),
("strengthen", self._phase_strengthen_weaken),
("compress", self._phase_compress),
("validate", self._phase_validate),
("self_question", self._phase_self_question),
]
# Metrics
self._metrics = {
"nodes_ingested": 0,
"edges_ingested": 0,
"connections_found": 0,
"inferences_made": 0,
"abstractions_created": 0,
"edges_reinforced": 0,
"edges_decayed": 0,
"nodes_merged": 0,
"edges_pruned": 0,
"nodes_pruned": 0,
"contradictions_resolved": 0,
"self_questions_asked": 0,
"self_questions_answered": 0,
}
# ───────────────────────────────────────────────────
# LIFECYCLE
# ───────────────────────────────────────────────────
def start(self):
"""Start the thinking loop in a background thread."""
if self._running:
print("[THINKER] Already running.")
return
# Load previous state
self._load_state()
self._running = True
self._thread = threading.Thread(
target=self._thinking_loop,
name="CogniThinker",
daemon=True
)
self._thread.start()
print(f"[THINKER] Started. Resuming from cycle {self._total_cycles}.")
def stop(self):
"""Stop the thinking loop gracefully."""
if not self._running:
return
print("[THINKER] Stopping...")
self._running = False
if self._thread:
self._thread.join(timeout=30)
self._save_state()
print("[THINKER] Stopped.")
def pause(self):
"""Pause thinking (for heavy API load)."""
self._paused = True
def resume(self):
"""Resume thinking."""
self._paused = False
@property
def is_running(self) -> bool:
return self._running
@property
def current_phase(self) -> str:
return self._current_phase
@property
def total_cycles(self) -> int:
return self._total_cycles
@property
def metrics(self) -> dict:
return dict(self._metrics)
def get_status(self) -> dict:
"""Get detailed thinker status."""
return {
"running": self._running,
"paused": self._paused,
"current_phase": self._current_phase,
"cycle_count": self._cycle_count,
"total_cycles": self._total_cycles,
"interval_seconds": self._interval,
"operations_last_cycle": self._operations_this_cycle,
"metrics": dict(self._metrics)
}
# ───────────────────────────────────────────────────
# STATE PERSISTENCE
# ───────────────────────────────────────────────────
def _load_state(self):
"""Load thinking state from DB."""
state = self.graph.memory.load_thinking_state()
self._total_cycles = state.get("total_cycles", 0)
self._cycle_count = state.get("current_cycle", 0)
self._current_phase = state.get("phase", "init")
saved_metrics = state.get("metrics", {})
if saved_metrics:
for key in self._metrics:
if key in saved_metrics:
self._metrics[key] = saved_metrics[key]
# Load file checksums
self._file_checksums = self.graph.memory.load_file_checksums()
def _save_state(self):
"""Save thinking state to DB."""
self.graph.memory.save_thinking_state({
"current_cycle": self._cycle_count,
"total_cycles": self._total_cycles,
"cursor_position": "",
"phase": self._current_phase,
"metrics": dict(self._metrics)
})
# ───────────────────────────────────────────────────
# MAIN LOOP
# ───────────────────────────────────────────────────
def _thinking_loop(self):
"""
Main thinking loop. Runs continuously until stopped.
Cycles through all 8 phases, adapting speed based on activity.
"""
print("[THINKER] Thinking loop started.")
while self._running:
try:
# Wait if paused
if self._paused:
time.sleep(1)
continue
# Execute current phase
self._operations_this_cycle = 0
phase_name, phase_func = self._phases[self._phase_index]
self._current_phase = phase_name
try:
phase_func()
except Exception as e:
print(f"[THINKER] Error in phase '{phase_name}': {e}")
if config.LOG_THINKING_DETAILS:
traceback.print_exc()
# Advance to next phase
self._phase_index = (self._phase_index + 1) % len(self._phases)
                # Count cycles (one full rotation through all phases = 1 cycle)
                if self._phase_index == 0:
                    self._cycle_count += 1
                    self._total_cycles += 1
                    # Periodic state save (once per completed cycle, not once per phase)
                    if self._total_cycles % config.SYNC_INTERVAL_CYCLES == 0:
                        self._save_state()
                        self.graph.sync()
                    # Log progress every 100 cycles
                    if self._total_cycles % 100 == 0 and config.LOG_THINKING_DETAILS:
                        stats = self.graph.get_stats()
                        score = self.graph.get_intelligence_score()
                        print(
                            f"[THINKER] Cycle {self._total_cycles}: "
                            f"nodes={stats['total_nodes']}, "
                            f"edges={stats['total_edges']}, "
                            f"inferred={stats['inferred_edges']}, "
                            f"score={score:.2f}"
                        )
                # Adaptive speed
                self._adapt_speed()
# Sleep between phases
time.sleep(self._interval / len(self._phases))
except Exception as e:
print(f"[THINKER] Loop error: {e}")
traceback.print_exc()
time.sleep(5) # Recovery pause
print("[THINKER] Thinking loop ended.")
def _adapt_speed(self):
"""Adjust thinking speed based on activity."""
if self._operations_this_cycle > config.THINKING_STABILITY_THRESHOLD:
# Active: think faster
self._interval = max(
config.THINKING_INTERVAL_FAST,
self._interval * 0.9
)
else:
# Stable: think slower
self._interval = min(
config.THINKING_INTERVAL_SLOW,
self._interval * 1.1
)
# ═══════════════════════════════════════════════════
# PHASE 1: INGEST
# ═══════════════════════════════════════════════════
def _phase_ingest(self):
"""
Scan /data/ folder for new or changed JSONL files.
Parse entries and create nodes + edges in graph.
"""
if not os.path.exists(config.DATA_DIR):
return
files = []
for fname in os.listdir(config.DATA_DIR):
if any(fname.endswith(ext) for ext in config.SUPPORTED_DATA_EXTENSIONS):
files.append(fname)
if not files:
return
for fname in files:
filepath = os.path.join(config.DATA_DIR, fname)
try:
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
print(f"[THINKER/INGEST] Error reading {fname}: {e}")
continue
# Check if file has changed
checksum = utils.hash_file_content(content)
if self._file_checksums.get(fname) == checksum:
continue # File unchanged, skip
print(f"[THINKER/INGEST] Processing file: {fname}")
lines = content.strip().split('\n')
processed = 0
            for line in lines:
if processed >= config.MAX_LINES_PER_INGEST:
break
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
self._ingest_entry(entry, source=fname)
processed += 1
            # Mark file as processed. Note: entries beyond MAX_LINES_PER_INGEST
            # are skipped permanently, since the checksum covers the whole file.
self._file_checksums[fname] = checksum
self.graph.memory.save_file_checksum(fname, checksum, processed)
self._operations_this_cycle += processed
print(f"[THINKER/INGEST] Processed {processed} entries from {fname}")
    def _ingest_entry(self, entry: dict, source: str = "data"):
        """
        Ingest a single data entry into the knowledge graph.
        Creates nodes and edges based on entry type and fields.
        Note: `source` receives the originating filename but is currently
        unused; nodes are tagged with the category "data".
        """
entry_type = entry.get("type", "fact")
content = entry.get("content", "").strip()
if not content:
return
tags = entry.get("tags", [])
confidence = entry.get("confidence", config.DATA_KNOWLEDGE_CONFIDENCE)
domain = entry.get("domain", "")
related = entry.get("related", [])
# ── Create main content node ──
main_node = self.graph.add_node(
content=content,
node_type=self._map_entry_type_to_node_type(entry_type),
source="data",
weight=confidence,
tags=tags
)
if not main_node:
return
self._metrics["nodes_ingested"] += 1
# ── Handle domain as a concept node ──
if domain:
domain_node = self.graph.add_node(
content=domain,
node_type="concept",
source="data",
weight=0.8
)
if domain_node:
self.graph.add_edge(
from_id=main_node.id,
to_id=domain_node.id,
relation="part_of",
confidence=0.8,
source="data"
)
self._metrics["edges_ingested"] += 1
# ── Handle related topics ──
for rel_topic in related:
rel_node = self.graph.add_node(
content=rel_topic,
node_type="concept",
source="data",
weight=0.7
)
if rel_node:
self.graph.add_edge(
from_id=main_node.id,
to_id=rel_node.id,
relation="related_to",
confidence=0.7,
source="data"
)
self._metrics["edges_ingested"] += 1
# ── Type-specific handling ──
self._ingest_type_specific(entry, main_node, entry_type)
def _ingest_type_specific(self, entry: dict, main_node: Node, entry_type: str):
"""Handle type-specific fields for data entries."""
# ── relation type: explicit from/to ──
if entry_type == "relation":
from_content = entry.get("from", "")
to_content = entry.get("to", "")
relation = entry.get("relation", "related_to")
if from_content and to_content:
from_node = self.graph.add_node(
content=from_content, node_type="entity", source="data"
)
to_node = self.graph.add_node(
content=to_content, node_type="entity", source="data"
)
if from_node and to_node:
self.graph.add_edge(
from_id=from_node.id,
to_id=to_node.id,
relation=relation,
confidence=entry.get("confidence", 0.9),
source="data"
)
self._metrics["edges_ingested"] += 1
# ── definition / term: term node + defined_as edge ──
elif entry_type in ("definition", "term"):
term = entry.get("term", "")
if term:
term_node = self.graph.add_node(
content=term, node_type="entity", source="data"
)
if term_node:
self.graph.add_edge(
from_id=term_node.id,
to_id=main_node.id,
relation="defined_as",
confidence=0.95,
source="data"
)
self._metrics["edges_ingested"] += 1
# ── cause_effect: cause → effect ──
elif entry_type == "cause_effect":
cause = entry.get("cause", "")
effect = entry.get("effect", "")
if cause and effect:
cause_node = self.graph.add_node(
content=cause, node_type="concept", source="data"
)
effect_node = self.graph.add_node(
content=effect, node_type="concept", source="data"
)
if cause_node and effect_node:
self.graph.add_edge(
from_id=cause_node.id,
to_id=effect_node.id,
relation="causes",
confidence=entry.get("confidence", 0.85),
source="data"
)
self._metrics["edges_ingested"] += 1
# ── hierarchy: parent → children ──
elif entry_type == "hierarchy":
parent = entry.get("parent", "")
children = entry.get("children", [])
if parent and children:
parent_node = self.graph.add_node(
content=parent, node_type="concept", source="data"
)
if parent_node:
for child in children:
child_node = self.graph.add_node(
content=child, node_type="entity", source="data"
)
if child_node:
self.graph.add_edge(
from_id=child_node.id,
to_id=parent_node.id,
relation="is_a",
confidence=0.9,
source="data"
)
self._metrics["edges_ingested"] += 1
# ── comparison: subject_a ↔ subject_b ──
elif entry_type == "comparison":
subj_a = entry.get("subject_a", "")
subj_b = entry.get("subject_b", "")
if subj_a and subj_b:
node_a = self.graph.add_node(
content=subj_a, node_type="entity", source="data"
)
node_b = self.graph.add_node(
content=subj_b, node_type="entity", source="data"
)
if node_a and node_b:
self.graph.add_edge(
from_id=node_a.id, to_id=node_b.id,
relation="related_to", confidence=0.8, source="data"
)
self.graph.add_edge(
from_id=node_b.id, to_id=node_a.id,
relation="related_to", confidence=0.8, source="data"
)
self._metrics["edges_ingested"] += 2
# ── qa: question → answer ──
elif entry_type == "qa":
question = entry.get("question", "")
answer = entry.get("answer", "")
if question and answer:
q_node = self.graph.add_node(
content=question, node_type="concept", source="data"
)
a_node = self.graph.add_node(
content=answer, node_type="fact", source="data"
)
if q_node and a_node:
self.graph.add_edge(
from_id=q_node.id, to_id=a_node.id,
relation="defined_as", confidence=0.9, source="data"
)
self._metrics["edges_ingested"] += 1
# ── synonym: bidirectional synonym_of ──
elif entry_type == "synonym":
terms = entry.get("terms", [])
for i in range(len(terms)):
for j in range(i + 1, len(terms)):
node_i = self.graph.add_node(
content=terms[i], node_type="entity", source="data"
)
node_j = self.graph.add_node(
content=terms[j], node_type="entity", source="data"
)
if node_i and node_j:
self.graph.add_edge(
from_id=node_i.id, to_id=node_j.id,
relation="synonym_of", confidence=0.9, source="data"
)
self.graph.add_edge(
from_id=node_j.id, to_id=node_i.id,
relation="synonym_of", confidence=0.9, source="data"
)
self._metrics["edges_ingested"] += 2
# ── process / procedure: sequential steps ──
elif entry_type in ("process", "procedure"):
steps = entry.get("steps", [])
title = entry.get("title", "")
if title:
title_node = self.graph.add_node(
content=title, node_type="concept", source="data"
)
if title_node:
                self.graph.add_edge(
                    from_id=main_node.id, to_id=title_node.id,
                    relation="defined_as", confidence=0.85, source="data"
                )
                self._metrics["edges_ingested"] += 1
prev_step_node = None
for step_text in steps:
step_node = self.graph.add_node(
content=step_text, node_type="fact", source="data"
)
if step_node:
self.graph.add_edge(
from_id=step_node.id, to_id=main_node.id,
relation="part_of", confidence=0.8, source="data"
)
if prev_step_node:
self.graph.add_edge(
from_id=prev_step_node.id, to_id=step_node.id,
relation="follows", confidence=0.9, source="data"
)
self._metrics["edges_ingested"] += 1
prev_step_node = step_node
self._metrics["edges_ingested"] += 1
# ── quote: author + content ──
elif entry_type == "quote":
author = entry.get("author", "")
if author:
author_node = self.graph.add_node(
content=author, node_type="entity", source="data"
)
if author_node:
self.graph.add_edge(
from_id=main_node.id, to_id=author_node.id,
relation="created_by", confidence=0.9, source="data"
)
self._metrics["edges_ingested"] += 1
# ── event: actors, location, date ──
elif entry_type == "event":
actors = entry.get("actors", [])
location = entry.get("location", "")
for actor in actors:
actor_node = self.graph.add_node(
content=actor, node_type="entity", source="data"
)
if actor_node:
self.graph.add_edge(
from_id=actor_node.id, to_id=main_node.id,
relation="related_to", confidence=0.85, source="data"
)
self._metrics["edges_ingested"] += 1
if location:
loc_node = self.graph.add_node(
content=location, node_type="entity", source="data"
)
if loc_node:
self.graph.add_edge(
from_id=main_node.id, to_id=loc_node.id,
relation="located_in", confidence=0.85, source="data"
)
self._metrics["edges_ingested"] += 1
# ── analogy: subject ↔ analogy ──
elif entry_type == "analogy":
subject = entry.get("subject", "")
analogy_text = entry.get("analogy", "")
if subject and analogy_text:
subj_node = self.graph.add_node(
content=subject, node_type="concept", source="data"
)
ana_node = self.graph.add_node(
content=analogy_text, node_type="concept", source="data"
)
if subj_node and ana_node:
self.graph.add_edge(
from_id=subj_node.id, to_id=ana_node.id,
relation="analogous_to", confidence=0.75, source="data"
)
self._metrics["edges_ingested"] += 1
# ── Paragraph: extract keywords as connected entities ──
elif entry_type == "paragraph":
keywords = utils.extract_keywords(entry.get("content", ""), max_keywords=10)
for kw in keywords:
kw_node = self.graph.add_node(
content=kw, node_type="concept", source="data",
weight=0.6
)
if kw_node:
self.graph.add_edge(
from_id=main_node.id, to_id=kw_node.id,
relation="related_to", confidence=0.6, source="data"
)
self._metrics["edges_ingested"] += 1
def _map_entry_type_to_node_type(self, entry_type: str) -> str:
"""Map data entry type to graph node type."""
type_map = {
"fact": "fact",
"definition": "definition",
"explanation": "fact",
"description": "fact",
"property": "fact",
"statistic": "fact",
"measurement": "fact",
"term": "definition",
"abbreviation": "definition",
"jargon": "definition",
"slang": "definition",
"idiom": "definition",
"synonym": "entity",
"antonym": "entity",
"quote": "fact",
"rule": "fact",
"example": "fact",
"analogy": "concept",
"opinion": "fact",
"paragraph": "fact",
"relation": "fact",
"cause_effect": "fact",
"comparison": "fact",
"hierarchy": "concept",
"composition": "concept",
"dependency": "fact",
"contradiction": "fact",
"timeline": "fact",
"process": "fact",
"procedure": "fact",
"event": "fact",
"history": "fact",
"change": "fact",
"qa": "fact",
}
        # Unknown and custom_* entry types default to "fact"
        return type_map.get(entry_type, "fact")
# ═══════════════════════════════════════════════════
# PHASE 2: CONNECT
# ═══════════════════════════════════════════════════
def _phase_connect(self):
"""
Find hidden connections between nodes.
Nodes with high vector similarity but no edge → create edge.
Focuses on least-connected nodes first.
"""
candidates = self.graph.get_least_connected_nodes(
limit=config.THINKING_BATCH_SIZE
)
connections_made = 0
for node in candidates:
if connections_made >= config.THINKING_BATCH_SIZE:
break
# Find similar nodes
similar = self.graph.find_similar_to_node(
node.id,
top_k=10,
min_similarity=config.SIMILARITY_THRESHOLD
)
for similar_node, similarity in similar:
# Skip if edge already exists (either direction)
if self.graph.edge_exists(node.id, similar_node.id):
continue
if self.graph.edge_exists(similar_node.id, node.id):
continue
# Create new connection
edge = self.graph.add_edge(
from_id=node.id,
to_id=similar_node.id,
relation="similar_to",
weight=similarity,
confidence=similarity * 0.9,
source="inferred"
)
if edge:
connections_made += 1
self._operations_this_cycle += 1
if connections_made > 0:
self._metrics["connections_found"] += connections_made
if config.LOG_THINKING_DETAILS:
print(f"[THINKER/CONNECT] Found {connections_made} new connections")
# ═══════════════════════════════════════════════════
# PHASE 3: INFER
# ═══════════════════════════════════════════════════
def _phase_infer(self):
"""
Transitive and analogical inference.
If A→B and B→C, maybe A→C.
Discovers knowledge not present in original data.
"""
inferences_made = 0
# ── Transitive Inference ──
inferences_made += self._transitive_inference()
# ── Analogical Inference ──
inferences_made += self._analogical_inference()
if inferences_made > 0:
self._metrics["inferences_made"] += inferences_made
self._operations_this_cycle += inferences_made
if config.LOG_THINKING_DETAILS:
print(f"[THINKER/INFER] Made {inferences_made} inferences")
def _transitive_inference(self) -> int:
"""
If A→B and B→C exist, infer A→C with decayed confidence.
Limited per cycle to prevent explosion.
"""
count = 0
# Sample a batch of nodes to check
node_ids = list(self.graph.nodes.keys())
if len(node_ids) > config.THINKING_BATCH_SIZE:
sample_indices = np.random.choice(
len(node_ids), config.THINKING_BATCH_SIZE, replace=False
)
node_ids = [node_ids[i] for i in sample_indices]
for node_a_id in node_ids:
if count >= config.MAX_INFERENCES_PER_CYCLE:
break
edges_ab = self.graph.get_edges_from(node_a_id)
for edge_ab in edges_ab:
if count >= config.MAX_INFERENCES_PER_CYCLE:
break
node_b_id = edge_ab.to_node
edges_bc = self.graph.get_edges_from(node_b_id)
for edge_bc in edges_bc:
node_c_id = edge_bc.to_node
# Skip self-loops and existing edges
if node_c_id == node_a_id:
continue
if self.graph.edge_exists(node_a_id, node_c_id):
continue
# Calculate inferred confidence
inferred_confidence = (
edge_ab.confidence *
edge_bc.confidence *
config.INFERENCE_DECAY
)
if inferred_confidence < config.INFERENCE_CONFIDENCE_MIN:
continue
# Determine inferred relation
inferred_relation = self._infer_relation(
edge_ab.relation, edge_bc.relation
)
# Create inferred edge
edge = self.graph.add_edge(
from_id=node_a_id,
to_id=node_c_id,
relation=inferred_relation,
weight=inferred_confidence,
confidence=inferred_confidence,
source="inferred"
)
if edge:
count += 1
if count >= config.MAX_INFERENCES_PER_CYCLE:
break
return count
def _analogical_inference(self) -> int:
"""
If A relates to B like C relates to ?, find ? using vector arithmetic.
A - B ≈ C - ? → ? ≈ C - A + B
"""
count = 0
# Find pairs with strong, specific relations
strong_edges = [
e for e in self.graph.edges.values()
if e.confidence > 0.7 and e.relation not in ("similar_to", "related_to")
]
if len(strong_edges) < 2:
return 0
# Sample pairs to compare
sample_size = min(20, len(strong_edges))
sampled = np.random.choice(len(strong_edges), sample_size, replace=False)
for i in sampled:
if count >= config.MAX_INFERENCES_PER_CYCLE // 4:
break
edge = strong_edges[i]
node_a = self.graph.get_node(edge.from_node)
node_b = self.graph.get_node(edge.to_node)
if not node_a or not node_b:
continue
# Find nodes similar to A (potential C candidates)
similar_to_a = self.graph.find_similar_to_node(
node_a.id, top_k=5,
min_similarity=config.ANALOGICAL_SIMILARITY_MIN
)
for node_c, sim_ac in similar_to_a:
if node_c.id == node_b.id:
continue
# Vector arithmetic: ? ≈ C - A + B
target_vector = utils.normalize(
node_c.vector - node_a.vector + node_b.vector
)
# Find nearest to target vector
candidates = self.graph.find_similar_nodes(
target_vector, top_k=3,
min_similarity=config.ANALOGICAL_SIMILARITY_MIN,
exclude_ids={node_a.id, node_b.id, node_c.id}
)
for candidate_node, sim_score in candidates:
if self.graph.edge_exists(node_c.id, candidate_node.id, edge.relation):
continue
inferred_confidence = sim_ac * sim_score * config.INFERENCE_DECAY
if inferred_confidence < config.INFERENCE_CONFIDENCE_MIN:
continue
new_edge = self.graph.add_edge(
from_id=node_c.id,
to_id=candidate_node.id,
relation=edge.relation,
weight=inferred_confidence,
confidence=inferred_confidence,
source="inferred"
)
if new_edge:
count += 1
break # One analogy per C
return count
def _infer_relation(self, rel_ab: str, rel_bc: str) -> str:
"""Determine relation type for transitive inference A→C from A→B→C."""
# Same relation → same
if rel_ab == rel_bc:
return rel_ab
# Specific known transitive patterns
transitive_map = {
("is_a", "is_a"): "is_a",
("part_of", "part_of"): "part_of",
("is_a", "has"): "has",
("is_a", "located_in"): "located_in",
("part_of", "located_in"): "located_in",
("is_a", "used_for"): "used_for",
("causes", "causes"): "causes",
("follows", "follows"): "follows",
("requires", "requires"): "requires",
("instance_of", "is_a"): "instance_of",
}
return transitive_map.get((rel_ab, rel_bc), "inferred_relation")
# ═══════════════════════════════════════════════════
# PHASE 4: ABSTRACT
# ═══════════════════════════════════════════════════
def _phase_abstract(self):
"""
Cluster similar nodes into abstraction nodes.
Creates higher-level concepts from concrete instances.
Recursive: abstractions can be abstracted further.
"""
# Skip if graph is too small
if len(self.graph.nodes) < config.CLUSTER_MIN_SIZE * 2:
return
abstractions_created = 0
# ── Level 1: Concrete → Abstraction ──
abstractions_created += self._create_abstractions(
source_types=["entity", "fact", "concept"],
abstraction_type="abstraction"
)
# ── Level 2+: Abstraction → Meta-Abstraction ──
if self._total_cycles % (config.COMPRESS_INTERVAL * 2) == 0:
existing_abstractions = self.graph.get_nodes_by_type("abstraction")
if len(existing_abstractions) >= config.CLUSTER_MIN_SIZE * 2:
abstractions_created += self._create_abstractions(
source_types=["abstraction"],
abstraction_type="meta_abstraction"
)
if abstractions_created > 0:
self._metrics["abstractions_created"] += abstractions_created
self._operations_this_cycle += abstractions_created
if config.LOG_THINKING_DETAILS:
print(f"[THINKER/ABSTRACT] Created {abstractions_created} abstractions")
def _create_abstractions(
self,
source_types: List[str],
abstraction_type: str
) -> int:
"""Create abstraction nodes from clusters of source-typed nodes."""
# Gather source nodes
source_nodes = []
for stype in source_types:
source_nodes.extend(self.graph.get_nodes_by_type(stype))
if len(source_nodes) < config.CLUSTER_MIN_SIZE:
return 0
# Build vector matrix for clustering
vectors = np.array(
[n.vector for n in source_nodes],
dtype=np.float32
)
node_ids = [n.id for n in source_nodes]
# Find natural clusters
clusters = utils.find_natural_clusters(
vectors,
similarity_threshold=config.CLUSTER_SIMILARITY_INTRA
)
count = 0
for cluster_indices in clusters:
if count >= config.THINKING_BATCH_SIZE // 2:
break
# Check if this cluster already has an abstraction
member_ids = [node_ids[i] for i in cluster_indices]
already_abstracted = False
for mid in member_ids:
for edge in self.graph.get_edges_from(mid):
if edge.relation == "instance_of":
already_abstracted = True
break
if already_abstracted:
break
if already_abstracted:
continue
# Compute centroid
cluster_vectors = vectors[cluster_indices]
centroid = utils.vector_mean(list(cluster_vectors))
# Generate label from common keywords
all_content = " ".join(
self.graph.nodes[node_ids[i]].content
for i in cluster_indices
if node_ids[i] in self.graph.nodes
)
keywords = utils.extract_keywords(all_content, max_keywords=5)
label = " + ".join(keywords[:3]) if keywords else "abstract_concept"
# Check depth limit
current_depth = 0
if abstraction_type == "meta_abstraction":
for i in cluster_indices:
nid = node_ids[i]
depth = self.graph._get_abstraction_depth(nid)
current_depth = max(current_depth, depth)
if current_depth >= config.MAX_ABSTRACTION_DEPTH:
continue
# Create abstraction node
abs_node = self.graph.add_node(
content=f"[{abstraction_type}] {label}",
node_type=abstraction_type,
source="inferred",
vector=utils.normalize(centroid),
weight=config.ABSTRACTION_MIN_CONFIDENCE
)
if not abs_node:
continue
# Link members to abstraction
for i in cluster_indices:
member_id = node_ids[i]
self.graph.add_edge(
from_id=member_id,
to_id=abs_node.id,
relation="instance_of",
weight=0.8,
confidence=0.8,
source="inferred"
)
count += 1
return count
# ═══════════════════════════════════════════════════
# PHASE 5: STRENGTHEN / WEAKEN
# ═══════════════════════════════════════════════════
def _phase_strengthen_weaken(self):
"""
Strengthen edges that are frequently used.
Weaken edges that haven't been used.
Nodes with more connections get slight weight boost.
"""
# ── Weaken unused edges (periodic) ──
if self._total_cycles % config.WEIGHT_DECAY_INTERVAL_CYCLES == 0:
decay_count = 0
edges = list(self.graph.edges.values())
for edge in edges:
if edge.used_count == 0 and edge.source == "inferred":
self.graph.decay_edge(edge.id)
decay_count += 1
self._metrics["edges_decayed"] += decay_count
self._operations_this_cycle += decay_count
if config.LOG_THINKING_DETAILS and decay_count > 0:
print(f"[THINKER/WEAKEN] Decayed {decay_count} unused edges")
# ── Boost well-connected nodes ──
nodes = list(self.graph.nodes.values())
sample_size = min(config.THINKING_BATCH_SIZE, len(nodes))
if sample_size == 0:
return
sampled = np.random.choice(len(nodes), sample_size, replace=False)
reinforced = 0
for idx in sampled:
node = nodes[idx]
if node.connections > 3:
bonus = config.NODE_WEIGHT_CONNECTION_BONUS * min(node.connections, 20)
new_weight = min(node.weight + bonus, config.WEIGHT_MAX)
if new_weight != node.weight:
self.graph.update_node_weight(node.id, new_weight)
reinforced += 1
self._metrics["edges_reinforced"] += reinforced
self._operations_this_cycle += reinforced
# ═══════════════════════════════════════════════════
# PHASE 6: COMPRESS
# ═══════════════════════════════════════════════════
def _phase_compress(self):
"""
Merge redundant nodes.
Prune dead edges.
Prune orphan nodes.
Keep the graph efficient and clean.
"""
if self._total_cycles % config.COMPRESS_INTERVAL != 0:
return
# ── Merge redundant nodes ──
redundant_pairs = self.graph.find_redundant_pairs(limit=10)
merged = 0
for id_keep, id_remove, similarity in redundant_pairs:
if self.graph.merge_nodes(id_keep, id_remove):
merged += 1
# ── Prune weak edges ──
pruned_edges = self.graph.prune_weak_edges()
# ── Prune orphan nodes ──
pruned_nodes = self.graph.prune_orphan_nodes()
self._metrics["nodes_merged"] += merged
self._metrics["edges_pruned"] += pruned_edges
self._metrics["nodes_pruned"] += pruned_nodes
        total_ops = merged + pruned_edges + pruned_nodes
        self._operations_this_cycle += total_ops
        if config.LOG_THINKING_DETAILS and total_ops > 0:
print(
f"[THINKER/COMPRESS] Merged {merged} nodes, "
f"pruned {pruned_edges} edges, {pruned_nodes} orphan nodes"
)
# ═══════════════════════════════════════════════════
# PHASE 7: VALIDATE
# ═══════════════════════════════════════════════════
def _phase_validate(self):
"""
Check logical consistency of the graph.
Resolve contradictions.
Detect and break circular inferences.
"""
if self._total_cycles % config.VALIDATE_INTERVAL != 0:
return
resolved = 0
# ── Detect contradictions ──
resolved += self._resolve_contradictions()
# ── Detect circular inferences ──
resolved += self._break_circular_inferences()
if resolved > 0:
self._metrics["contradictions_resolved"] += resolved
self._operations_this_cycle += resolved
if config.LOG_THINKING_DETAILS:
print(f"[THINKER/VALIDATE] Resolved {resolved} issues")
def _resolve_contradictions(self) -> int:
"""
Find and resolve contradictions.
If A→B (positive) and A→¬B (opposite_of) exist, keep higher confidence.
"""
resolved = 0
# Sample nodes to check
node_ids = list(self.graph.nodes.keys())
sample_size = min(config.THINKING_BATCH_SIZE, len(node_ids))
if sample_size == 0:
return 0
sampled = np.random.choice(len(node_ids), sample_size, replace=False)
for idx in sampled:
node_id = node_ids[idx]
edges_out = self.graph.get_edges_from(node_id)
# Group edges by target
target_edges: Dict[str, List[Edge]] = {}
for edge in edges_out:
key = edge.to_node
if key not in target_edges:
target_edges[key] = []
target_edges[key].append(edge)
            # Relations considered mutually contradictory
            contradictory_pairs = {
                ("causes", "prevents"),
                ("is_a", "opposite_of"),
                ("synonym_of", "opposite_of"),
                ("requires", "prevents"),
            }
            # Check for contradictory relations to the same target
            for target_id, edges in target_edges.items():
                if len(edges) < 2:
                    continue
for i in range(len(edges)):
for j in range(i + 1, len(edges)):
pair = (edges[i].relation, edges[j].relation)
reverse_pair = (edges[j].relation, edges[i].relation)
if pair in contradictory_pairs or reverse_pair in contradictory_pairs:
# Keep higher confidence, remove lower
if edges[i].confidence >= edges[j].confidence:
self.graph.remove_edge(edges[j].id)
else:
self.graph.remove_edge(edges[i].id)
resolved += 1
return resolved
def _break_circular_inferences(self) -> int:
"""
Detect inference chains that loop back on themselves.
Break the weakest link in each cycle.
"""
broken = 0
# Sample inferred edges
inferred_edges = [
e for e in self.graph.edges.values()
if e.source == "inferred"
]
sample_size = min(config.THINKING_BATCH_SIZE, len(inferred_edges))
if sample_size == 0:
return 0
sampled_indices = np.random.choice(
len(inferred_edges), sample_size, replace=False
)
for idx in sampled_indices:
edge = inferred_edges[idx]
# Check if this edge creates a cycle
# Simple: does a path exist from to_node back to from_node?
paths = self.graph.find_paths(
edge.to_node, edge.from_node,
max_depth=4, max_paths=1
)
if paths:
# Cycle detected — remove weakest edge in cycle
cycle_path = [edge.from_node, edge.id] + paths[0]
weakest_edge_id = None
weakest_weight = float('inf')
for item_id in cycle_path:
if item_id in self.graph.edges:
e = self.graph.edges[item_id]
if e.weight < weakest_weight and e.source == "inferred":
weakest_weight = e.weight
weakest_edge_id = e.id
if weakest_edge_id:
self.graph.remove_edge(weakest_edge_id)
broken += 1
return broken
# ═══════════════════════════════════════════════════
# PHASE 8: SELF-QUESTION
# ═══════════════════════════════════════════════════
def _phase_self_question(self):
"""
Generate internal questions to fill knowledge gaps.
Ask: "What connects X to Y?" where X and Y are distant but possibly related.
If a new chain is found, save it.
"""
if self._total_cycles % config.SELF_QUESTION_INTERVAL != 0:
return
if len(self.graph.nodes) < 10:
return
questions_asked = 0
questions_answered = 0
# Strategy 1: Find disconnected clusters and try to bridge them
questions_answered += self._bridge_disconnected()
questions_asked += 3
# Strategy 2: Explore high-weight nodes that lack certain relation types
questions_answered += self._fill_relation_gaps()
questions_asked += 3
# Strategy 3: Challenge existing weak inferences
questions_answered += self._challenge_weak_inferences()
questions_asked += 2
self._metrics["self_questions_asked"] += questions_asked
self._metrics["self_questions_answered"] += questions_answered
self._operations_this_cycle += questions_answered
if config.LOG_THINKING_DETAILS and questions_answered > 0:
print(
f"[THINKER/SELF-Q] Asked {questions_asked} questions, "
f"answered {questions_answered}"
)
def _bridge_disconnected(self) -> int:
"""Try to find connections between disconnected subgraphs."""
connected = 0
# Pick two random nodes that have no path between them
node_ids = list(self.graph.nodes.keys())
if len(node_ids) < 10:
return 0
for _ in range(3):
idx = np.random.choice(len(node_ids), 2, replace=False)
id_a, id_b = node_ids[idx[0]], node_ids[idx[1]]
node_a = self.graph.get_node(id_a)
node_b = self.graph.get_node(id_b)
if not node_a or not node_b:
continue
# Are they already connected?
paths = self.graph.find_paths(id_a, id_b, max_depth=4, max_paths=1)
if paths:
continue
# Can we connect them via vector similarity?
sim = utils.cosine_similarity(node_a.vector, node_b.vector)
if sim > config.SIMILARITY_THRESHOLD * 0.8:
# They're somewhat similar but not connected → connect
edge = self.graph.add_edge(
from_id=id_a,
to_id=id_b,
relation="inferred_relation",
weight=sim * 0.7,
confidence=sim * 0.6,
source="inferred"
)
if edge:
connected += 1
return connected
def _fill_relation_gaps(self) -> int:
"""Find high-weight nodes missing common relations and try to fill them."""
filled = 0
# Get well-known nodes
important_nodes = sorted(
self.graph.nodes.values(),
key=lambda n: n.weight * n.connections,
reverse=True
)[:20]
common_relations = ["is_a", "part_of", "has", "used_for", "related_to"]
for node in important_nodes[:5]:
existing_relations = set()
for edge in self.graph.get_edges_from(node.id):
existing_relations.add(edge.relation)
for relation in common_relations:
if relation in existing_relations:
continue
# Can we find a target for this relation via similarity?
# Look for nodes that commonly have this relation
candidates = self.graph.find_similar_to_node(
node.id, top_k=5,
min_similarity=config.SIMILARITY_THRESHOLD
)
for candidate, sim in candidates:
# Check if candidate has this relation type outgoing
candidate_rels = [
e.relation for e in self.graph.get_edges_from(candidate.id)
]
if relation in candidate_rels:
# This candidate has the relation → node might too
for edge in self.graph.get_edges_from(candidate.id):
if edge.relation == relation:
target = self.graph.get_node(edge.to_node)
if target and not self.graph.edge_exists(
node.id, target.id, relation
):
confidence = sim * edge.confidence * config.INFERENCE_DECAY
if confidence >= config.INFERENCE_CONFIDENCE_MIN:
new_edge = self.graph.add_edge(
from_id=node.id,
to_id=target.id,
relation=relation,
weight=confidence,
confidence=confidence,
source="inferred"
)
if new_edge:
filled += 1
break
break # One fill per missing relation
if filled >= 5:
break
return filled
def _challenge_weak_inferences(self) -> int:
"""
Re-examine weak inferred edges.
If supporting evidence exists, strengthen.
If contradicting evidence exists, remove.
"""
improved = 0
weak_edges = self.graph.get_weakest_edges(
limit=20, source_filter="inferred"
)
for edge in weak_edges:
from_node = self.graph.get_node(edge.from_node)
to_node = self.graph.get_node(edge.to_node)
if not from_node or not to_node:
continue
# Check if there's additional evidence
# (other paths between these nodes)
paths = self.graph.find_paths(
edge.from_node, edge.to_node,
max_depth=4, max_paths=3
)
            # Keep only alternative paths that do not use this edge itself
            alternative_paths = [
                p for p in paths
                if edge.id not in p
            ]
if alternative_paths:
# Multiple paths support this edge → strengthen
support_factor = 1.0 + 0.05 * len(alternative_paths)
new_weight = min(
edge.weight * support_factor,
config.WEIGHT_MAX
)
self.graph.edges[edge.id].weight = new_weight
self.graph.edges[edge.id].confidence = min(
edge.confidence * support_factor, 1.0
)
self.graph.edges[edge.id].mark_dirty()
self.graph.memory.save_edge(edge.to_dict())
improved += 1
else:
# No alternative support → further weaken
if edge.weight < config.PRUNE_WEIGHT_THRESHOLD * 2:
self.graph.remove_edge(edge.id)
improved += 1
return improved
# ═══════════════════════════════════════════════════
# USER KNOWLEDGE EXTRACTION
# ═══════════════════════════════════════════════════
    def extract_from_user_message(self, message: str):
        """
        Extract knowledge from a user's chat message.
        Called by brain.py after processing a user request.
        Stores extracted entities and keywords, plus a truncated fact
        snippet (first 500 chars) only when the message looks definitional.
        """
if not message or len(message.strip()) < 10:
return
message = message.strip()
# Extract keywords
keywords = utils.extract_keywords(message, max_keywords=15)
if len(keywords) < 2:
return
# Extract entities
entities = utils.extract_entities_simple(message)
# Create entity nodes
entity_nodes = []
for entity in entities[:5]:
node = self.graph.add_node(
content=entity,
node_type="entity",
source="user_chat",
weight=config.USER_KNOWLEDGE_CONFIDENCE
)
if node:
entity_nodes.append(node)
# Create concept nodes from keywords not already entities
entity_lower = {e.lower() for e in entities}
        for kw in keywords:
            if kw.lower() not in entity_lower:
                self.graph.add_node(
                    content=kw,
                    node_type="concept",
                    source="user_chat",
                    weight=config.USER_KNOWLEDGE_CONFIDENCE * 0.7
                )
        # If the message looks definitional, create a fact node. The cue
        # words include Indonesian copulas ("adalah", "merupakan", "yaitu",
        # "ialah", all roughly meaning "is"/"namely") alongside English ones.
        if len(message) > 30 and any(
            p in message.lower() for p in [
                "adalah", "merupakan", "yaitu", "ialah",
                "is", "are", "means", "defined"
            ]
        ):
fact_node = self.graph.add_node(
content=message[:500],
node_type="fact",
source="user_chat",
weight=config.USER_KNOWLEDGE_CONFIDENCE
)
# Connect fact to entities mentioned
if fact_node:
for en in entity_nodes:
self.graph.add_edge(
from_id=fact_node.id,
to_id=en.id,
relation="related_to",
confidence=config.USER_KNOWLEDGE_CONFIDENCE * 0.8,
source="user_chat"
)
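

# ═══════════════════════════════════════════════════════════
# MANUAL SMOKE TEST
# ═══════════════════════════════════════════════════════════
# A minimal sketch for running the engine standalone. It assumes
# KnowledgeGraph() takes no required arguments; check knowledge.py
# for the actual constructor signature.
if __name__ == "__main__":
    kg = KnowledgeGraph()
    thinker = Thinker(kg)
    thinker.start()
    try:
        time.sleep(30)  # let a few phases run
        print(json.dumps(thinker.get_status(), indent=2))
    finally:
        thinker.stop()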