Spaces:
Sleeping
Sleeping
| """ | |
| Cogni-Engine v1 — Knowledge Graph Engine | |
| In-memory graph structure with nodes, edges, traversal, similarity search. | |
| This is the core data structure that represents all knowledge. | |
| The "brain matter" — where concepts live and connect. | |
| """ | |
| import time | |
| import threading | |
| import json | |
| from typing import List, Dict, Optional, Set, Tuple, Any | |
| from collections import defaultdict | |
| import numpy as np | |
| import config | |
| import utils | |
| from memory import Memory | |
| # ═══════════════════════════════════════════════════════════ | |
| # DATA STRUCTURES | |
| # ═══════════════════════════════════════════════════════════ | |
class Node:
    """A single knowledge node in the graph.

    ``__slots__`` is declared, so instances carry only the attributes listed
    there and no per-instance ``__dict__``.
    """

    __slots__ = [
        'id', 'type', 'content', 'vector', 'weight',
        'connections', 'source', 'created_at', 'updated_at',
        '_dirty'
    ]

    def __init__(
        self,
        node_id: str,
        node_type: str,
        content: str,
        vector: np.ndarray = None,
        weight: float = 1.0,
        connections: int = 0,
        source: str = "data",
        created_at: str = "",
        updated_at: str = ""
    ):
        """Create a node.

        Args:
            node_id: Identifier stored as ``id``.
            node_type: Category label stored as ``type``.
            content: Text payload of the node.
            vector: Embedding; when ``None`` a float32 zero vector of length
                ``config.VECTOR_DIM`` is used.
            weight: Node weight.
            connections: Stored connection counter.
            source: Origin label of the node.
            created_at: Creation timestamp; an empty value is replaced by
                ``utils.timestamp_now()``.
            updated_at: Update timestamp; an empty value is replaced by
                ``utils.timestamp_now()``.
        """
        self.id = node_id
        self.type = node_type
        self.content = content
        self.vector = vector if vector is not None else np.zeros(config.VECTOR_DIM, dtype=np.float32)
        self.weight = weight
        self.connections = connections
        self.source = source
        self.created_at = created_at or utils.timestamp_now()
        self.updated_at = updated_at or utils.timestamp_now()
        # Fresh nodes start clean; mark_dirty() flips this.
        self._dirty = False

    def to_dict(self) -> dict:
        """Serialize to dict for DB storage (weight rounded to 6 decimals)."""
        return {
            "id": self.id,
            "type": self.type,
            "content": self.content,
            "vector": utils.vector_to_list(self.vector),
            "weight": round(self.weight, 6),
            "connections": self.connections,
            "source": self.source,
            "created_at": self.created_at,
            "updated_at": self.updated_at
        }

    # Declared static so the factory works whether it is called on the class
    # or on an instance; without the decorator an instance call would bind
    # the instance itself to ``data``.
    @staticmethod
    def from_dict(data: dict) -> 'Node':
        """Deserialize from dict.

        ``data["id"]`` is required; every other key falls back to a default.
        A missing or falsy ``"vector"`` yields the constructor's zero vector.
        """
        vector = None
        if data.get("vector"):
            vector = utils.list_to_vector(data["vector"])
        return Node(
            node_id=data["id"],
            node_type=data.get("type", "fact"),
            content=data.get("content", ""),
            vector=vector,
            weight=float(data.get("weight", 1.0)),
            connections=int(data.get("connections", 0)),
            source=data.get("source", "data"),
            created_at=data.get("created_at", ""),
            updated_at=data.get("updated_at", "")
        )

    def mark_dirty(self):
        """Mark this node as needing DB sync and refresh ``updated_at``."""
        self._dirty = True
        self.updated_at = utils.timestamp_now()
class Edge:
    """A directed relationship between two nodes."""

    __slots__ = [
        'id', 'from_node', 'to_node', 'relation', 'weight',
        'confidence', 'source', 'used_count', 'created_at',
        '_dirty'
    ]

    def __init__(
        self,
        edge_id: str,
        from_node: str,
        to_node: str,
        relation: str = "related_to",
        weight: float = 1.0,
        confidence: float = 1.0,
        source: str = "data",
        used_count: int = 0,
        created_at: str = ""
    ):
        """Create an edge from ``from_node`` to ``to_node`` (both node ids).

        An empty ``created_at`` is replaced by ``utils.timestamp_now()``.
        """
        self.id = edge_id
        self.from_node = from_node
        self.to_node = to_node
        self.relation = relation
        self.weight = weight
        self.confidence = confidence
        self.source = source
        self.used_count = used_count
        self.created_at = created_at or utils.timestamp_now()
        # Fresh edges start clean; mark_dirty() flips this.
        self._dirty = False

    def to_dict(self) -> dict:
        """Serialize to dict for DB storage (weight/confidence rounded to 6 decimals)."""
        return {
            "id": self.id,
            "from_node": self.from_node,
            "to_node": self.to_node,
            "relation": self.relation,
            "weight": round(self.weight, 6),
            "confidence": round(self.confidence, 6),
            "source": self.source,
            "used_count": self.used_count,
            "created_at": self.created_at
        }

    # Declared static so the factory also works when reached through an
    # instance; undecorated, an instance call would bind the instance to
    # ``data``.
    @staticmethod
    def from_dict(data: dict) -> 'Edge':
        """Deserialize from dict.

        ``"id"``, ``"from_node"`` and ``"to_node"`` are required; the other
        keys fall back to the constructor defaults.
        """
        return Edge(
            edge_id=data["id"],
            from_node=data["from_node"],
            to_node=data["to_node"],
            relation=data.get("relation", "related_to"),
            weight=float(data.get("weight", 1.0)),
            confidence=float(data.get("confidence", 1.0)),
            source=data.get("source", "data"),
            used_count=int(data.get("used_count", 0)),
            created_at=data.get("created_at", "")
        )

    def mark_dirty(self):
        """Mark edge as needing DB sync."""
        self._dirty = True
class ReasoningChain:
    """A discovered path of reasoning through the graph."""

    __slots__ = [
        'id', 'path', 'conclusion', 'confidence',
        'used_count', 'created_at'
    ]

    def __init__(
        self,
        chain_id: str,
        path: list,
        conclusion: str = "",
        confidence: float = 0.5,
        used_count: int = 0,
        created_at: str = ""
    ):
        """Create a chain.

        Args:
            chain_id: Identifier stored as ``id``.
            path: Alternating ids, ``[node_id, edge_id, node_id, ...]``.
            conclusion: Text summary of the chain.
            confidence: Score assigned to the chain.
            used_count: Usage counter.
            created_at: Creation timestamp; an empty value is replaced by
                ``utils.timestamp_now()``.
        """
        self.id = chain_id
        self.path = path  # [node_id, edge_id, node_id, edge_id, ...]
        self.conclusion = conclusion
        self.confidence = confidence
        self.used_count = used_count
        self.created_at = created_at or utils.timestamp_now()

    def to_dict(self) -> dict:
        """Serialize to dict for DB storage (confidence rounded to 6 decimals)."""
        return {
            "id": self.id,
            "path": self.path,
            "conclusion": self.conclusion,
            "confidence": round(self.confidence, 6),
            "used_count": self.used_count,
            "created_at": self.created_at
        }

    # Declared static so the factory also works when reached through an
    # instance; undecorated, an instance call would bind the instance to
    # ``data``.
    @staticmethod
    def from_dict(data: dict) -> 'ReasoningChain':
        """Deserialize from dict; only ``"id"`` is required."""
        return ReasoningChain(
            chain_id=data["id"],
            path=data.get("path", []),
            conclusion=data.get("conclusion", ""),
            confidence=float(data.get("confidence", 0.5)),
            used_count=int(data.get("used_count", 0)),
            created_at=data.get("created_at", "")
        )
| # ═══════════════════════════════════════════════════════════ | |
| # KNOWLEDGE GRAPH | |
| # ═══════════════════════════════════════════════════════════ | |
| class KnowledgeGraph: | |
| """ | |
| In-memory knowledge graph with persistence via Memory. | |
| Structure: | |
| - nodes: dict of Node objects indexed by id | |
| - edges: dict of Edge objects indexed by id | |
| - adjacency_out: node_id → list of edge_ids (outgoing) | |
| - adjacency_in: node_id → list of edge_ids (incoming) | |
| - vector_index: numpy matrix of all node vectors for fast search | |
| - chains: dict of ReasoningChain objects | |
| Thread-safe via read-write lock: | |
| - Multiple readers allowed simultaneously | |
| - Writers get exclusive access | |
| """ | |
def __init__(self, memory: Memory):
    """Set up an empty graph bound to ``memory`` for persistence.

    Nothing is loaded here; all containers start empty.
    """
    self.memory = memory

    # Primary stores, each keyed by object id.
    self.nodes: Dict[str, Node] = {}
    self.edges: Dict[str, Edge] = {}
    self.chains: Dict[str, ReasoningChain] = {}

    # Adjacency: node id -> list of edge ids leaving / entering that node.
    self._adj_out: Dict[str, List[str]] = defaultdict(list)
    self._adj_in: Dict[str, List[str]] = defaultdict(list)

    # Similarity-search cache: matrix rows line up with _vector_node_ids.
    # It starts dirty so the first search triggers a build.
    self._vector_matrix: Optional[np.ndarray] = None
    self._vector_node_ids: List[str] = []
    self._vector_index_dirty = True

    # NOTE: this is a re-entrant mutex (threading.RLock), not a
    # reader/writer lock; the same thread may acquire it recursively.
    self._lock = threading.RLock()

    # Aggregate statistics, zeroed until recomputed.
    self._stats = dict(
        total_nodes=0,
        total_edges=0,
        total_chains=0,
        inferred_nodes=0,
        inferred_edges=0,
        max_abstraction_depth=0,
        avg_connections=0.0,
        avg_confidence=0.0,
    )
| # ─────────────────────────────────────────────────── | |
| # INITIALIZATION | |
| # ─────────────────────────────────────────────────── | |
def load_from_memory(self) -> bool:
    """
    Load entire graph from TiDB via Memory.
    Called once at startup.

    The state returned by ``memory.load_full_state()`` is read defensively:
    a missing (or ``None``) ``"nodes"``, ``"edges"`` or ``"chains"`` entry is
    treated as empty instead of raising ``KeyError``.

    Returns:
        Always ``True``.
    """
    state = self.memory.load_full_state() or {}
    node_rows = state.get("nodes") or []
    edge_rows = state.get("edges") or []
    chain_rows = state.get("chains") or []

    if not state.get("loaded", False) and not node_rows:
        print("[GRAPH] No existing state found. Starting fresh.")
        self._rebuild_stats()
        return True

    with self._lock:
        # Load nodes
        for node_data in node_rows:
            node = Node.from_dict(node_data)
            self.nodes[node.id] = node

        # Load edges and index them by both endpoints
        for edge_data in edge_rows:
            edge = Edge.from_dict(edge_data)
            self.edges[edge.id] = edge
            self._adj_out[edge.from_node].append(edge.id)
            self._adj_in[edge.to_node].append(edge.id)

        # Load chains
        for chain_data in chain_rows:
            chain = ReasoningChain.from_dict(chain_data)
            self.chains[chain.id] = chain

        # Derived structures must reflect the freshly loaded data.
        self._rebuild_vector_index()
        self._rebuild_stats()

    print(f"[GRAPH] Loaded: {len(self.nodes)} nodes, "
          f"{len(self.edges)} edges, {len(self.chains)} chains")
    return True
| # ─────────────────────────────────────────────────── | |
| # NODE OPERATIONS | |
| # ─────────────────────────────────────────────────── | |
def add_node(
    self,
    content: str,
    node_type: str = "fact",
    source: str = "data",
    weight: float = None,
    vector: np.ndarray = None,
    node_id: str = None,
    tags: List[str] = None
) -> Optional[Node]:
    """
    Add a new node to the graph.
    If node with same id exists, reinforce its weight instead.

    Args:
        content: Node text; surrounding whitespace is stripped.
        node_type: Type label, also used when deriving the id.
        source: Origin label; ``"data"`` selects
            ``config.DATA_KNOWLEDGE_CONFIDENCE`` as the default weight, any
            other value ``config.USER_KNOWLEDGE_CONFIDENCE``.
        weight: Explicit weight, or ``None`` for the source default.
        vector: Explicit embedding, or ``None`` to derive one from content.
        node_id: Explicit id, or ``None`` to derive one from content/type.
        tags: Optional tag strings. For a newly created node each non-blank
            tag becomes a ``"concept"`` node linked by a ``"related_to"``
            edge. Tags are ignored when the node already exists.

    Returns:
        The new or existing node, or ``None`` if content is blank or the
        node limit has been reached.
    """
    if not content or not content.strip():
        return None
    content = content.strip()

    if node_id is None:
        node_id = config.generate_node_id(content, node_type)

    # Generate vector if not provided
    if vector is None:
        vector = utils.text_to_vector_tfidf(content)

    # Register content with TF-IDF corpus
    tokens = utils.tokenize(content, remove_stopwords=True)
    utils.tfidf.add_document(tokens)

    if weight is None:
        weight = (config.DATA_KNOWLEDGE_CONFIDENCE
                  if source == "data"
                  else config.USER_KNOWLEDGE_CONFIDENCE)

    with self._lock:
        existing = self.nodes.get(node_id)
        if existing is not None:
            # Seen again: reinforce weight, capped at WEIGHT_MAX.
            existing.weight = min(
                existing.weight * config.WEIGHT_REINFORCE,
                config.WEIGHT_MAX
            )
            existing.mark_dirty()
            self.memory.save_node(existing.to_dict())
            return existing

        # Safety check
        if len(self.nodes) >= config.MAX_GRAPH_MEMORY_NODES:
            print(f"[GRAPH] Node limit reached ({config.MAX_GRAPH_MEMORY_NODES}). Skipping.")
            return None

        node = Node(
            node_id=node_id,
            node_type=node_type,
            content=content,
            vector=vector,
            weight=weight,
            connections=0,
            source=source
        )
        self.nodes[node_id] = node
        self._vector_index_dirty = True

        # Buffer for DB write
        node._dirty = True
        self.memory.save_node(node.to_dict())

        # Create edges from tags
        for raw_tag in tags or ():
            # Normalise first so the id computed here is the id the tag node
            # is stored under; blank tags cannot become nodes.
            tag = raw_tag.strip() if isinstance(raw_tag, str) else ""
            if not tag:
                continue
            tag_id = config.generate_node_id(tag, "concept")
            if tag_id not in self.nodes:
                self.add_node(
                    content=tag,
                    node_type="concept",
                    source=source,
                    weight=weight * 0.8,
                    node_id=tag_id
                )
            # add_edge itself rejects self-loops and unknown endpoints.
            self.add_edge(
                from_id=node_id,
                to_id=tag_id,
                relation="related_to",
                source=source,
                confidence=weight * 0.7
            )
        return node
def get_node(self, node_id: str) -> Optional[Node]:
    """Return the node stored under ``node_id``, or ``None`` if absent."""
    return self.nodes[node_id] if node_id in self.nodes else None
def get_node_by_content(self, content: str, node_type: str = "") -> Optional[Node]:
    """Find a node by exact content match.

    The lookup id is derived from the stripped content and ``node_type``
    through ``config.generate_node_id``. Returns ``None`` when no node is
    stored under that id.
    NOTE(review): the default ``node_type`` here is ``""`` while ``add_node``
    defaults to ``"fact"`` - confirm callers pass the type explicitly.
    """
    lookup_id = config.generate_node_id(content.strip(), node_type)
    return self.nodes.get(lookup_id)
def remove_node(self, node_id: str) -> bool:
    """Delete a node together with every edge touching it.

    Returns ``False`` when the id is unknown, ``True`` after removal.
    """
    with self._lock:
        if node_id not in self.nodes:
            return False

        # Snapshot the attached edge ids (outgoing first, then incoming);
        # the adjacency lists are mutated while edges are being removed.
        attached = set(
            list(self._adj_out.get(node_id, []))
            + list(self._adj_in.get(node_id, []))
        )
        for attached_id in attached:
            self._remove_edge_internal(attached_id)

        # Drop the node's own adjacency buckets.
        for index in (self._adj_out, self._adj_in):
            index.pop(node_id, None)

        del self.nodes[node_id]
        self._vector_index_dirty = True

        # Queue the delete with the persistence layer.
        self.memory.delete_node(node_id)
        return True
def update_node_weight(self, node_id: str, new_weight: float):
    """Set a node's weight, clamped to [WEIGHT_MIN, WEIGHT_MAX], and persist it.

    Unknown ids are ignored.
    """
    with self._lock:
        target = self.nodes.get(node_id)
        if not target:
            return
        target.weight = utils.clamp(new_weight, config.WEIGHT_MIN, config.WEIGHT_MAX)
        target.mark_dirty()
        self.memory.save_node(target.to_dict())
def get_nodes_by_type(self, node_type: str) -> List[Node]:
    """Return every node whose ``type`` equals ``node_type``, in storage order."""
    return list(filter(lambda node: node.type == node_type, self.nodes.values()))
def get_nodes_by_source(self, source: str) -> List[Node]:
    """Return every node whose ``source`` equals ``source``, in storage order."""
    return list(filter(lambda node: node.source == source, self.nodes.values()))
def get_weakest_nodes(self, limit: int = 50) -> List[Node]:
    """Return up to ``limit`` nodes in ascending weight order (pruning candidates)."""
    ranked = list(self.nodes.values())
    ranked.sort(key=lambda node: node.weight)
    return ranked[:limit]
def get_least_connected_nodes(self, limit: int = 50) -> List[Node]:
    """Return up to ``limit`` nodes in ascending ``connections`` order."""
    ranked = list(self.nodes.values())
    ranked.sort(key=lambda node: node.connections)
    return ranked[:limit]
| # ─────────────────────────────────────────────────── | |
| # EDGE OPERATIONS | |
| # ─────────────────────────────────────────────────── | |
def add_edge(
    self,
    from_id: str,
    to_id: str,
    relation: str = "related_to",
    weight: float = 1.0,
    confidence: float = 1.0,
    source: str = "data",
    edge_id: str = None
) -> Optional[Edge]:
    """
    Create a directed edge ``from_id -> to_id``, or reinforce it if the
    edge id is already present.

    Returns the new or reinforced edge. Returns ``None`` for a self-loop,
    an unknown endpoint, or when the edge limit has been reached.
    """
    # Reject self-loops and edges that touch a node we do not hold.
    if from_id == to_id:
        return None
    if not (from_id in self.nodes and to_id in self.nodes):
        return None

    key = edge_id if edge_id is not None else config.generate_edge_id(from_id, to_id, relation)

    with self._lock:
        if key in self.edges:
            known = self.edges[key]
            # Weight grows multiplicatively up to WEIGHT_MAX.
            boosted = known.weight * config.WEIGHT_REINFORCE
            known.weight = min(boosted, config.WEIGHT_MAX)
            # Confidence: mean of old and incoming, nudged up 5%, capped at 1.
            blended = (known.confidence + confidence) / 2.0 * 1.05
            known.confidence = min(blended, 1.0)
            known.mark_dirty()
            self.memory.save_edge(known.to_dict())
            return known

        # Capacity guard
        if len(self.edges) >= config.MAX_GRAPH_MEMORY_EDGES:
            print(f"[GRAPH] Edge limit reached ({config.MAX_GRAPH_MEMORY_EDGES}). Skipping.")
            return None

        created = Edge(
            edge_id=key,
            from_node=from_id,
            to_node=to_id,
            relation=relation,
            weight=weight,
            confidence=confidence,
            source=source
        )
        self.edges[key] = created
        self._adj_out[from_id].append(key)
        self._adj_in[to_id].append(key)

        # Both endpoints gain one connection.
        for endpoint in (from_id, to_id):
            self.nodes[endpoint].connections += 1

        # Queue the write with the persistence layer.
        created._dirty = True
        self.memory.save_edge(created.to_dict())
        return created
def get_edge(self, edge_id: str) -> Optional[Edge]:
    """Return the edge stored under ``edge_id``, or ``None`` if absent."""
    return self.edges[edge_id] if edge_id in self.edges else None
def get_edges_from(self, node_id: str) -> List[Edge]:
    """Return the outgoing edges of ``node_id``.

    Edge ids listed in the adjacency index but missing from ``self.edges``
    are skipped.
    """
    outgoing = []
    for candidate_id in self._adj_out.get(node_id, []):
        if candidate_id in self.edges:
            outgoing.append(self.edges[candidate_id])
    return outgoing
def get_edges_to(self, node_id: str) -> List[Edge]:
    """Return the incoming edges of ``node_id``.

    Edge ids listed in the adjacency index but missing from ``self.edges``
    are skipped.
    """
    incoming = []
    for candidate_id in self._adj_in.get(node_id, []):
        if candidate_id in self.edges:
            incoming.append(self.edges[candidate_id])
    return incoming
def get_all_edges_for(self, node_id: str) -> List[Edge]:
    """Return outgoing edges followed by incoming edges for ``node_id``."""
    outgoing = self.get_edges_from(node_id)
    incoming = self.get_edges_to(node_id)
    return [*outgoing, *incoming]
def get_neighbors(self, node_id: str) -> List[Tuple[Node, Edge]]:
    """Return ``(neighbor, edge)`` pairs for every edge touching ``node_id``.

    Targets of outgoing edges come first, then sources of incoming edges.
    Edges whose other endpoint is not in ``self.nodes`` are skipped.
    """
    via_outgoing = [
        (self.nodes.get(edge.to_node), edge)
        for edge in self.get_edges_from(node_id)
    ]
    via_incoming = [
        (self.nodes.get(edge.from_node), edge)
        for edge in self.get_edges_to(node_id)
    ]
    return [(other, edge) for other, edge in via_outgoing + via_incoming if other]
def edge_exists(self, from_id: str, to_id: str, relation: str = None) -> bool:
    """Report whether an edge ``from_id -> to_id`` is present.

    With ``relation=None`` any relation matches; otherwise the edge's
    relation must equal ``relation``.
    """
    outgoing = (self.edges.get(eid) for eid in self._adj_out.get(from_id, []))
    return any(
        bool(edge)
        and edge.to_node == to_id
        and (relation is None or edge.relation == relation)
        for edge in outgoing
    )
def remove_edge(self, edge_id: str) -> bool:
    """Remove an edge while holding the graph lock; returns whether it existed."""
    with self._lock:
        was_removed = self._remove_edge_internal(edge_id)
    return was_removed
def _remove_edge_internal(self, edge_id: str) -> bool:
    """Remove an edge without taking the lock (callers must hold it).

    Unlinks the edge from both adjacency indexes, decrements the connection
    counter of each endpoint still present (never below zero), drops the
    edge and queues the delete with ``memory``. Returns ``False`` when the
    id is unknown.
    """
    edge = self.edges.get(edge_id)
    if not edge:
        return False

    # Unlink from the outgoing index of the source and the incoming index
    # of the target.
    for index, endpoint in ((self._adj_out, edge.from_node), (self._adj_in, edge.to_node)):
        bucket = index.get(endpoint, [])
        if edge_id in bucket:
            bucket.remove(edge_id)

    # Each surviving endpoint loses one connection.
    for endpoint in (edge.from_node, edge.to_node):
        endpoint_node = self.nodes.get(endpoint)
        if endpoint_node:
            endpoint_node.connections = max(0, endpoint_node.connections - 1)

    del self.edges[edge_id]
    self.memory.delete_edge(edge_id)
    return True
def reinforce_edge(self, edge_id: str, factor: float = None):
    """Multiply an edge's weight by ``factor`` (capped at WEIGHT_MAX).

    Also bumps ``used_count`` and persists the edge. ``factor`` defaults to
    ``config.WEIGHT_REINFORCE``. Unknown ids are ignored.
    """
    multiplier = config.WEIGHT_REINFORCE if factor is None else factor
    with self._lock:
        edge = self.edges.get(edge_id)
        if not edge:
            return
        edge.weight = min(edge.weight * multiplier, config.WEIGHT_MAX)
        edge.used_count += 1
        edge.mark_dirty()
        self.memory.save_edge(edge.to_dict())
def decay_edge(self, edge_id: str, factor: float = None):
    """Multiply an edge's weight by ``factor`` (floored at WEIGHT_MIN) and persist it.

    ``factor`` defaults to ``config.WEIGHT_DECAY_RATE``. Unknown ids are
    ignored.
    """
    multiplier = config.WEIGHT_DECAY_RATE if factor is None else factor
    with self._lock:
        edge = self.edges.get(edge_id)
        if not edge:
            return
        edge.weight = max(edge.weight * multiplier, config.WEIGHT_MIN)
        edge.mark_dirty()
        self.memory.save_edge(edge.to_dict())
def get_weakest_edges(self, limit: int = 100, source_filter: str = "inferred") -> List[Edge]:
    """Return up to ``limit`` edges in ascending weight order (pruning candidates).

    Only edges whose ``source`` equals ``source_filter`` are considered;
    pass ``None`` to consider every edge.
    """
    if source_filter is None:
        pool = list(self.edges.values())
    else:
        pool = [edge for edge in self.edges.values() if edge.source == source_filter]
    pool.sort(key=lambda edge: edge.weight)
    return pool[:limit]
| # ─────────────────────────────────────────────────── | |
| # VECTOR INDEX & SIMILARITY SEARCH | |
| # ─────────────────────────────────────────────────── | |
def _rebuild_vector_index(self):
    """Rebuild the stacked vector matrix used for batch similarity search.

    Only nodes whose vector is present and has length ``config.VECTOR_DIM``
    are indexed. Row ``i`` of ``_vector_matrix`` belongs to
    ``_vector_node_ids[i]``. With nothing to index the matrix has shape
    ``(0, VECTOR_DIM)``. Clears the dirty flag.
    """
    with self._lock:
        indexed = [
            (nid, node.vector)
            for nid, node in self.nodes.items()
            if node.vector is not None and len(node.vector) == config.VECTOR_DIM
        ]
        if indexed:
            matrix = np.array([vec for _, vec in indexed], dtype=np.float32)
        else:
            # Covers both an empty graph and a graph with no usable vectors.
            matrix = np.zeros((0, config.VECTOR_DIM), dtype=np.float32)

        self._vector_matrix = matrix
        self._vector_node_ids = [nid for nid, _ in indexed]
        self._vector_index_dirty = False
def _ensure_vector_index(self):
    """Rebuild the vector index only when it is flagged dirty."""
    if not self._vector_index_dirty:
        return
    self._rebuild_vector_index()
def find_similar_nodes(
    self,
    query_vector: np.ndarray,
    top_k: int = None,
    min_similarity: float = 0.0,
    exclude_ids: Set[str] = None,
    type_filter: str = None
) -> List[Tuple[Node, float]]:
    """
    Rank indexed nodes by similarity to ``query_vector``.

    Args:
        query_vector: Vector compared against every indexed node vector via
            ``utils.batch_cosine_similarity``.
        top_k: Maximum results; defaults to ``config.MAX_NODES_PER_SEARCH``.
        min_similarity: Scores below this value are dropped.
        exclude_ids: Node ids to leave out.
        type_filter: When set, keep only nodes of this type.

    Returns:
        ``(node, score)`` pairs, highest score first. Empty when the index
        holds no vectors.
    """
    limit = config.MAX_NODES_PER_SEARCH if top_k is None else top_k

    self._ensure_vector_index()
    if self._vector_matrix.shape[0] == 0:
        return []

    # One batched comparison against the whole matrix.
    scores = utils.batch_cosine_similarity(query_vector, self._vector_matrix)

    hits = []
    for row, raw_score in enumerate(scores):
        score = float(raw_score)
        if score < min_similarity:
            continue
        candidate_id = self._vector_node_ids[row]
        if exclude_ids and candidate_id in exclude_ids:
            continue
        candidate = self.nodes.get(candidate_id)
        # Skip ids no longer in the graph and nodes of the wrong type.
        if not candidate or (type_filter and candidate.type != type_filter):
            continue
        hits.append((candidate, score))

    ranked = sorted(hits, key=lambda pair: pair[1], reverse=True)
    return ranked[:limit]
def find_similar_to_text(
    self,
    text: str,
    top_k: int = None,
    min_similarity: float = 0.0,
    exclude_ids: Set[str] = None,
    type_filter: str = None
) -> List[Tuple[Node, float]]:
    """
    Rank nodes by similarity to free text.

    The text is vectorised with ``utils.text_to_vector_tfidf`` and all other
    arguments are handed unchanged to ``find_similar_nodes``.
    """
    as_vector = utils.text_to_vector_tfidf(text)
    matches = self.find_similar_nodes(
        as_vector, top_k, min_similarity,
        exclude_ids, type_filter
    )
    return matches
def find_similar_to_node(
    self,
    node_id: str,
    top_k: int = None,
    min_similarity: float = None
) -> List[Tuple[Node, float]]:
    """Rank nodes by similarity to an existing node, excluding that node.

    Returns an empty list for an unknown id. ``min_similarity`` defaults to
    ``config.SIMILARITY_THRESHOLD``.
    """
    anchor = self.nodes.get(node_id)
    if not anchor:
        return []

    floor = config.SIMILARITY_THRESHOLD if min_similarity is None else min_similarity
    return self.find_similar_nodes(
        anchor.vector, top_k, floor,
        exclude_ids={node_id}
    )
| # ─────────────────────────────────────────────────── | |
| # GRAPH TRAVERSAL | |
| # ─────────────────────────────────────────────────── | |
def traverse_bfs(
    self,
    start_ids: List[str],
    max_depth: int = None,
    max_nodes: int = 100
) -> Dict[str, Tuple[int, List[str]]]:
    """
    Breadth-first traversal from starting nodes.

    Edges are followed in both directions (via ``get_neighbors``).

    Args:
        start_ids: Seed node ids; ids not in the graph are ignored.
        max_depth: Maximum hop count from a seed; defaults to
            ``config.MAX_TRAVERSAL_DEPTH``.
        max_nodes: Cap on the total number of nodes returned. Seeds are
            always included; discovery stops as soon as the cap is reached.

    Returns:
        ``{node_id: (depth, path)}`` where ``path`` alternates node and edge
        ids from the seed: ``[node_id, edge_id, node_id, ...]``.
    """
    # Local import: a deque gives O(1) pops from the left, unlike list.pop(0).
    from collections import deque

    if max_depth is None:
        max_depth = config.MAX_TRAVERSAL_DEPTH

    visited = {}  # node_id -> (depth, path)
    frontier = deque()
    for sid in start_ids:
        if sid in self.nodes and sid not in visited:
            visited[sid] = (0, [sid])
            frontier.append(sid)

    while frontier and len(visited) < max_nodes:
        current_id = frontier.popleft()
        depth, path = visited[current_id]
        if depth >= max_depth:
            continue
        for neighbor, edge in self.get_neighbors(current_id):
            if neighbor.id in visited:
                continue
            visited[neighbor.id] = (depth + 1, path + [edge.id, neighbor.id])
            frontier.append(neighbor.id)
            # Enforce the cap per discovery so a single high-degree node
            # cannot push the result past max_nodes.
            if len(visited) >= max_nodes:
                return visited
    return visited
def traverse_weighted_random(
    self,
    start_id: str,
    max_depth: int = None,
    temperature: float = 0.7
) -> List[Tuple[str, str]]:
    """
    Take a weighted random walk of at most ``max_depth`` steps.

    At each step an unvisited neighbour is drawn by
    ``utils.weighted_choice`` with weight
    ``edge.weight * edge.confidence * node.weight``. The walk stops early
    when no unvisited neighbour remains.

    Returns:
        ``[(node_id, edge_id), ...]`` in visiting order. The first entry is
        ``(start_id, "")``. Empty when ``start_id`` is unknown.
    """
    if max_depth is None:
        max_depth = config.MAX_TRAVERSAL_DEPTH
    if start_id not in self.nodes:
        return []

    walk = [(start_id, "")]
    seen = {start_id}
    cursor = start_id

    for _ in range(max_depth):
        # Only neighbours not yet on the walk are eligible.
        options = [
            (node, edge)
            for node, edge in self.get_neighbors(cursor)
            if node.id not in seen
        ]
        if not options:
            break

        scores = [edge.weight * edge.confidence * node.weight for node, edge in options]
        next_node, via_edge = utils.weighted_choice(options, scores, temperature)

        cursor = next_node.id
        seen.add(cursor)
        walk.append((cursor, via_edge.id))

    return walk
def find_paths(
    self,
    from_id: str,
    to_id: str,
    max_depth: int = None,
    max_paths: int = 5
) -> List[List[str]]:
    """
    Collect up to ``max_paths`` simple paths between two nodes by
    depth-first search.

    Edges are followed in both directions (via ``get_neighbors``). A path
    never revisits a node and has at most ``max_depth`` hops
    (default ``config.MAX_TRAVERSAL_DEPTH``).

    Returns:
        Paths shaped ``[node_id, edge_id, node_id, ...]``. Empty when either
        endpoint is unknown.
    """
    if max_depth is None:
        max_depth = config.MAX_TRAVERSAL_DEPTH
    if not (from_id in self.nodes and to_id in self.nodes):
        return []

    found = []

    def explore(node_id: str, trail: list, on_trail: set, hops: int):
        # Stop once enough paths are collected or the hop budget is spent.
        if len(found) >= max_paths or hops > max_depth:
            return
        if node_id == to_id:
            found.append(trail)
            return
        for neighbor, edge in self.get_neighbors(node_id):
            if neighbor.id in on_trail:
                continue
            # Each branch gets its own trail and visited set, so no explicit
            # backtracking is needed.
            explore(
                neighbor.id,
                trail + [edge.id, neighbor.id],
                on_trail | {neighbor.id},
                hops + 1,
            )

    explore(from_id, [from_id], {from_id}, 0)
    return found
| # ─────────────────────────────────────────────────── | |
| # REASONING CHAINS | |
| # ─────────────────────────────────────────────────── | |
def build_reasoning_chains(
    self,
    start_nodes: List[str],
    max_chains: int = None,
    max_depth: int = None
) -> List[ReasoningChain]:
    """
    Generate candidate reasoning chains from the given start nodes.

    For each known start node two strategies are used:
      1. up to ``min(3, max_chains)`` weighted random walks;
      2. one greedy walk along the strongest outgoing edges.
    Chains are de-duplicated by id (first occurrence wins), sorted by
    confidence descending and truncated to ``max_chains``.

    ``max_chains`` defaults to ``config.MAX_CHAINS_PER_RESPONSE`` and
    ``max_depth`` to ``config.MAX_TRAVERSAL_DEPTH``.
    """
    if max_chains is None:
        max_chains = config.MAX_CHAINS_PER_RESPONSE
    if max_depth is None:
        max_depth = config.MAX_TRAVERSAL_DEPTH

    def as_chain(path: list) -> ReasoningChain:
        # Score, summarise and wrap one path.
        confidence = self._score_chain(path)
        conclusion = self._chain_to_conclusion(path)
        return ReasoningChain(
            chain_id=config.generate_chain_id(path),
            path=path,
            conclusion=conclusion,
            confidence=confidence
        )

    candidates = []
    walks_per_start = min(3, max_chains)

    for start_id in start_nodes:
        if start_id not in self.nodes:
            continue

        # Strategy 1: several weighted random walks
        for _ in range(walks_per_start):
            walk = self.traverse_weighted_random(start_id, max_depth)
            if len(walk) < 2:
                continue
            # Flatten (node, edge) steps into [node, edge, node, ...]; the
            # first step has an empty edge id and contributes only its node.
            flat = []
            for step_node, step_edge in walk:
                if step_edge:
                    flat.append(step_edge)
                flat.append(step_node)
            candidates.append(as_chain(flat))

        # Strategy 2: greedy walk over the strongest edges
        greedy = self._follow_strongest_path(start_id, max_depth)
        if len(greedy) >= 3:
            candidates.append(as_chain(greedy))

    # De-duplicate by chain id, keeping first occurrence and order.
    by_id = {}
    for chain in candidates:
        by_id.setdefault(chain.id, chain)

    ranked = sorted(by_id.values(), key=lambda chain: chain.confidence, reverse=True)
    return ranked[:max_chains]
def _follow_strongest_path(self, start_id: str, max_depth: int) -> list:
    """Greedily walk outgoing edges, always taking the strongest one.

    Strength is ``edge.weight * edge.confidence``. Targets already on the
    path or missing from the graph are not eligible. The walk ends after
    ``max_depth`` steps or when no eligible edge remains.

    Returns:
        ``[start_id, edge_id, node_id, ...]``.
    """
    trail = [start_id]
    seen = {start_id}
    cursor = start_id

    for _ in range(max_depth):
        options = [
            edge for edge in self.get_edges_from(cursor)
            if not (edge.to_node in seen or edge.to_node not in self.nodes)
        ]
        if not options:
            break

        strongest = max(options, key=lambda edge: edge.weight * edge.confidence)
        cursor = strongest.to_node
        trail += [strongest.id, cursor]
        seen.add(cursor)

    return trail
def _score_chain(self, path: list) -> float:
    """
    Score a reasoning chain in [0, 1].

    The score is (mean of ``edge.weight * edge.confidence``) times (mean
    node weight, or 0.5 if no node is recognised) times a length penalty
    ``1 / (1 + 0.1 * edge_count)``. Paths shorter than three items, or with
    no recognised edge, score 0.0.
    """
    if len(path) < 3:
        return 0.0

    # An id found in self.edges counts as an edge; otherwise it counts as a
    # node if found in self.nodes; anything else is ignored.
    edge_scores = [
        self.edges[item].weight * self.edges[item].confidence
        for item in path
        if item in self.edges
    ]
    if not edge_scores:
        return 0.0
    node_weights = [
        self.nodes[item].weight
        for item in path
        if item not in self.edges and item in self.nodes
    ]

    mean_edge = sum(edge_scores) / len(edge_scores)
    mean_node = sum(node_weights) / len(node_weights) if node_weights else 0.5

    # Shorter chains are treated as more reliable.
    brevity = 1.0 / (1.0 + 0.1 * len(edge_scores))

    return utils.clamp(mean_edge * mean_node * brevity, 0.0, 1.0)
def _chain_to_conclusion(self, path: list) -> str:
    """
    Render a chain path as text.

    The contents of the path items that are known nodes are joined with
    `` → `` in path order; edge ids and unknown ids are skipped. Returns an
    empty string when no item is a known node.
    """
    resolved = (self.nodes.get(item_id) for item_id in path)
    labels = [node.content for node in resolved if node]
    return " → ".join(labels) if labels else ""
def save_chain(self, chain: ReasoningChain):
    """Store a chain under its id and hand its dict form to ``memory``."""
    serialized = chain.to_dict()
    with self._lock:
        self.chains[chain.id] = chain
        self.memory.save_chain(serialized)
def reinforce_chain(self, chain_id: str):
    """Reward a chain that was used in a response.

    Increments ``used_count``, raises confidence by 2% (capped at 1.0),
    persists the chain, then reinforces every edge on its path. Unknown
    chain ids are ignored.
    """
    with self._lock:
        chain = self.chains.get(chain_id)
        if not chain:
            return
        chain.used_count += 1
        chain.confidence = min(chain.confidence * 1.02, 1.0)
        self.memory.save_chain(chain.to_dict())

        # Path items that are edge ids get reinforced too. reinforce_edge
        # re-acquires the lock, which the re-entrant lock permits.
        edge_ids = [item for item in chain.path if item in self.edges]
        for edge_id in edge_ids:
            self.reinforce_edge(edge_id)
| # ─────────────────────────────────────────────────── | |
| # MERGE & PRUNE | |
| # ─────────────────────────────────────────────────── | |
def merge_nodes(self, node_id_keep: str, node_id_remove: str) -> bool:
    """
    Merge two redundant nodes. Keep the first, remove the second.
    Redirect all edges from removed node to kept node.

    The kept node gains half of the removed node's weight (capped at
    ``config.WEIGHT_MAX``) and its vector becomes the normalised average of
    both vectors. A redirected edge that would be a self-loop, or that
    already exists with the same relation, is dropped.

    Returns:
        ``True`` on success. ``False`` if either node is missing or both
        ids refer to the same node.
    """
    # Merging a node into itself would end by removing the very node that
    # is supposed to survive, so refuse up front.
    if node_id_keep == node_id_remove:
        return False

    with self._lock:
        keep = self.nodes.get(node_id_keep)
        remove = self.nodes.get(node_id_remove)
        if not keep or not remove:
            return False

        # Combine weights
        keep.weight = min(keep.weight + remove.weight * 0.5, config.WEIGHT_MAX)

        # Average vectors
        keep.vector = utils.normalize(
            utils.vector_add(keep.vector, remove.vector) / 2.0
        )

        # Redirect edges
        for edge in self.get_all_edges_for(node_id_remove):
            new_from = node_id_keep if edge.from_node == node_id_remove else edge.from_node
            new_to = node_id_keep if edge.to_node == node_id_remove else edge.to_node
            if new_from == new_to:
                continue  # Would create self-loop
            # Create redirected edge if doesn't exist
            if not self.edge_exists(new_from, new_to, edge.relation):
                self.add_edge(
                    from_id=new_from,
                    to_id=new_to,
                    relation=edge.relation,
                    weight=edge.weight,
                    confidence=edge.confidence,
                    source=edge.source
                )

        # Remove the merged node (and its old edges)
        self.remove_node(node_id_remove)

        keep.mark_dirty()
        self.memory.save_node(keep.to_dict())
        self._vector_index_dirty = True
        return True
def prune_weak_edges(self, threshold: Optional[float] = None) -> int:
    """Remove inferred edges whose weight is below a threshold.

    Only edges with ``source == "inferred"`` are candidates; edges from any
    other source are kept regardless of weight.

    Args:
        threshold: Weight cut-off. Defaults to
            ``config.PRUNE_WEIGHT_THRESHOLD`` when None.

    Returns:
        Number of edges removed.
    """
    if threshold is None:
        threshold = config.PRUNE_WEIGHT_THRESHOLD

    # Scan and delete under the same lock so another thread cannot change
    # self.edges between building the list and acting on it.
    with self._lock:
        weak_ids = [
            edge.id
            for edge in self.edges.values()
            if edge.weight < threshold and edge.source == "inferred"
        ]
        for edge_id in weak_ids:
            self._remove_edge_internal(edge_id)

    return len(weak_ids)
def prune_orphan_nodes(self) -> int:
    """Remove inferred nodes that have no connections and a low weight.

    A node is pruned when ``connections == 0``, its weight is below
    ``config.WEIGHT_MIN * 2`` and its source is ``"inferred"``. Each pruned
    node is deleted from ``self.nodes`` and from ``self.memory``; the
    vector index is flagged dirty when anything was removed.

    Returns:
        Number of nodes actually removed.
    """
    removed = 0

    # Scan and delete under the same lock so the candidate list cannot go
    # stale (or the dict change size mid-iteration) because of another
    # thread.
    with self._lock:
        orphan_ids = [
            node.id
            for node in self.nodes.values()
            if node.connections == 0
            and node.weight < config.WEIGHT_MIN * 2
            and node.source == "inferred"
        ]
        for node_id in orphan_ids:
            if node_id in self.nodes:
                del self.nodes[node_id]
                self.memory.delete_node(node_id)
                removed += 1

        if removed:
            self._vector_index_dirty = True

    return removed
def find_redundant_pairs(self, limit: int = 20) -> List[Tuple[str, str, float]]:
    """Collect same-type node pairs whose vectors are nearly identical.

    Two nodes form a pair when they share a ``type`` and their cosine
    similarity is at least ``config.MERGE_THRESHOLD``. Scanning stops as
    soon as ``limit`` pairs have been collected.

    Returns:
        A list of ``(node_id_1, node_id_2, similarity)`` tuples.
    """
    self._ensure_vector_index()

    found = []
    candidates = list(self.nodes.values())

    # Above 500 nodes the pairwise scan runs on a random sample of 500
    # (drawn without replacement) to bound the quadratic cost.
    if len(candidates) > 500:
        picked = np.random.choice(len(candidates), 500, replace=False)
        candidates = [candidates[idx] for idx in picked]

    for position, first in enumerate(candidates):
        for second in candidates[position + 1:]:
            # Only nodes of the same type are merge candidates.
            if first.type != second.type:
                continue

            score = utils.cosine_similarity(first.vector, second.vector)
            if score >= config.MERGE_THRESHOLD:
                found.append((first.id, second.id, score))
                if len(found) >= limit:
                    return found

    return found
| # ─────────────────────────────────────────────────── | |
| # STATISTICS | |
| # ─────────────────────────────────────────────────── | |
def _rebuild_stats(self):
    """Recompute ``self._stats`` from the current nodes, edges and chains.

    The resulting dict holds totals, counts of inferred nodes and edges,
    the deepest abstraction level, and rounded averages for connections,
    edge confidence and chain length. Empty collections yield zero
    averages instead of a division error.
    """
    node_count = len(self.nodes)
    edge_count = len(self.edges)
    chain_count = len(self.chains)

    # One pass over the nodes: inferred count, connection total, max depth.
    inferred_node_count = 0
    connection_total = 0
    deepest = 0
    for node in self.nodes.values():
        if node.source == "inferred":
            inferred_node_count += 1
        connection_total += node.connections
        if node.type == "abstraction":
            deepest = max(deepest, self._get_abstraction_depth(node.id))

    # One pass over the edges: inferred count and confidence total.
    inferred_edge_count = 0
    confidence_total = 0
    for edge in self.edges.values():
        if edge.source == "inferred":
            inferred_edge_count += 1
        confidence_total += edge.confidence

    mean_connections = connection_total / node_count if node_count > 0 else 0.0
    mean_confidence = confidence_total / edge_count if edge_count > 0 else 0.0

    path_length_total = 0
    for chain in self.chains.values():
        path_length_total += len(chain.path)

    # max(..., 1) keeps the ratios defined when there are no edges/chains.
    self._stats = {
        "total_nodes": node_count,
        "total_edges": edge_count,
        "total_chains": chain_count,
        "inferred_nodes": inferred_node_count,
        "inferred_edges": inferred_edge_count,
        "max_abstraction_depth": deepest,
        "avg_connections": round(mean_connections, 2),
        "avg_confidence": round(mean_confidence, 4),
        "inference_ratio": round(inferred_edge_count / max(edge_count, 1), 4),
        "avg_chain_length": round(path_length_total / max(chain_count, 1), 2),
    }
def _get_abstraction_depth(self, node_id: str, visited: Optional[set] = None) -> int:
    """Return the abstraction depth of a node by following ``instance_of`` edges.

    Incoming ``instance_of`` edges are followed recursively. A node with a
    child of non-zero depth is one level deeper than its deepest child.
    Otherwise the depth is 1 for nodes of type ``"abstraction"`` or
    ``"meta_abstraction"`` and 0 for anything else, including unknown ids.

    Args:
        node_id: Id of the node to measure.
        visited: Ids already seen during this traversal; used as a cycle
            guard. Callers normally leave it as None.

    Returns:
        The computed depth (0 when the node was already visited).
    """
    if visited is None:
        visited = set()
    if node_id in visited:
        return 0
    visited.add(node_id)

    # NOTE(review): `visited` is shared across sibling branches, so a node
    # first reached through a short path reports 0 when it is reached again
    # through a longer one; the result can depend on edge order.
    deepest_child = 0
    for edge in self.get_edges_to(node_id):
        if edge.relation == "instance_of":
            child_depth = self._get_abstraction_depth(edge.from_node, visited)
            deepest_child = max(deepest_child, child_depth)

    if deepest_child > 0:
        return deepest_child + 1

    # Look the node up directly instead of building a placeholder Node just
    # to read its type.
    node = self.nodes.get(node_id)
    if node is not None and node.type in ("abstraction", "meta_abstraction"):
        return 1
    return 0
def get_stats(self) -> dict:
    """Recompute the graph statistics and return a copy of them.

    The returned dict is a shallow copy, so callers may modify it without
    touching ``self._stats``.
    """
    self._rebuild_stats()
    snapshot = dict(self._stats)
    return snapshot
def get_intelligence_score(self) -> float:
    """Refresh the statistics and score them.

    Returns whatever ``utils.calculate_intelligence_score`` produces for
    the freshly rebuilt ``self._stats``.
    """
    self._rebuild_stats()
    fresh_stats = self._stats
    return utils.calculate_intelligence_score(fresh_stats)
| # ─────────────────────────────────────────────────── | |
| # SYNC | |
| # ─────────────────────────────────────────────────── | |
def sync(self) -> Optional[dict]:
    """Let the memory layer flush buffered changes if it decides to.

    Delegates to ``self.memory.flush_if_needed()`` and returns its result
    unchanged.
    """
    outcome = self.memory.flush_if_needed()
    return outcome
def force_sync(self) -> dict:
    """Flush all buffered changes through the memory layer right away.

    Delegates to ``self.memory.flush()`` and returns its result unchanged.
    """
    outcome = self.memory.flush()
    return outcome
| # ─────────────────────────────────────────────────── | |
| # DEBUG / INSPECTION | |
| # ─────────────────────────────────────────────────── | |
def describe_node(self, node_id: str) -> Optional[dict]:
    """Build a summary of one node and its neighbors for inspection.

    Returns:
        None when the id is unknown. Otherwise a dict with the node's id,
        type, content, weight, connection count and source, plus a
        ``"neighbors"`` list with one entry per ``(node, edge)`` pair from
        ``get_neighbors``; neighbor content is passed through
        ``utils.truncate_text(..., 80)``.
    """
    node = self.nodes.get(node_id)
    if not node:
        return None

    adjacent = self.get_neighbors(node_id)

    description = {
        "id": node.id,
        "type": node.type,
        "content": node.content,
        "weight": node.weight,
        "connections": node.connections,
        "source": node.source,
    }

    neighbor_rows = []
    for other, link in adjacent:
        neighbor_rows.append({
            "node_id": other.id,
            "content": utils.truncate_text(other.content, 80),
            "relation": link.relation,
            "edge_weight": link.weight,
            "edge_confidence": link.confidence,
        })
    description["neighbors"] = neighbor_rows

    return description