Spaces:
Build error
Build error
| # ===== FILE: services/ontology_query_engine.py ===== | |
| import os | |
| import json | |
| from collections import deque | |
| import services.config as config | |
class OntologyQueryEngine:
    """
    Graph-traversal engine over Aetherius's A-SMDL semantic network.

    Reads OntologyArchitect's supertoken_legend.jsonl (supplemented by
    ontology_index.json when present) and builds an in-memory adjacency map
    keyed by concept name / SQT token. Provides BFS graph walks, path
    finding, and concept clustering.

    Edges are directed: a concept's ``related_concepts`` list names the
    nodes reachable from it. Neighbour names are matched against graph keys
    exactly (after stripping whitespace); only the concept names passed to
    the query API get a case-insensitive fallback.
    """

    def __init__(self, data_directory=None):
        """
        Record file locations; nothing is read from disk until the first query.

        Args:
            data_directory: Directory holding the ontology files. Falls back
                to ``config.DATA_DIR`` when omitted or empty.
        """
        self.data_directory = data_directory or config.DATA_DIR
        self.legend_file = os.path.join(self.data_directory, "supertoken_legend.jsonl")
        self.index_file = os.path.join(self.data_directory, "ontology_index.json")
        # concept -> {definition, domain, related_concepts[], source}
        self.graph: dict = {}
        self._loaded = False
        print("[OntologyQueryEngine] Semantic query engine online.", flush=True)

    # -- Graph construction ---------------------------------------------------

    def _ensure_loaded(self):
        """Lazy-load the graph on first query so boot time is unaffected."""
        if self._loaded:
            return
        # Legend first: index entries only fill in concepts the legend lacks.
        self._load_from_legend()
        self._load_from_index()
        self._loaded = True

    @staticmethod
    def _normalise_related(raw):
        """
        Coerce a raw ``related_concepts`` value into a clean list of names.

        Accepts a comma-separated string or a list/tuple. Blank items and
        non-string items are dropped and every name is stripped. Any other
        value (including ``None`` from a JSON ``null``) yields ``[]``.
        """
        if isinstance(raw, str):
            raw = raw.split(",")
        elif not isinstance(raw, (list, tuple)):
            return []
        return [item.strip() for item in raw if isinstance(item, str) and item.strip()]

    def _build_node(self, raw, source):
        """
        Build the stored node dict from one raw record.

        ``definition`` and ``domain`` are normalised so that a JSON ``null``
        becomes ``""``; a non-string domain is stringified so that domain
        searches and tallies never hit a non-string value.
        """
        definition = raw.get("definition")
        domain = raw.get("domain")
        return {
            "definition": "" if definition is None else definition,
            "domain": "" if domain is None else str(domain),
            "related_concepts": self._normalise_related(raw.get("related_concepts")),
            "source": source,
        }

    @staticmethod
    def _entry_key(entry):
        """Return the legend entry's key: 'sqt' if usable, else 'term', else ''."""
        for field in ("sqt", "term"):
            value = entry.get(field)
            if isinstance(value, str) and value.strip():
                return value.strip()
        return ""

    def _load_from_legend(self):
        """
        Ingest supertoken_legend.jsonl, one JSON object per line.

        Blank lines, unparseable lines, non-object lines and entries without
        a usable key are skipped individually, so a single malformed record
        no longer discards the rest of the file. A later entry with the same
        key replaces an earlier one.
        """
        if not os.path.exists(self.legend_file):
            return
        try:
            with open(self.legend_file, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        entry = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(entry, dict):
                        continue
                    key = self._entry_key(entry)
                    if not key:
                        continue
                    self.graph[key] = self._build_node(entry, "legend")
        except Exception as e:
            # Loading is deliberately best-effort: report and keep whatever
            # was ingested before the failure (I/O or decoding error).
            print(f"[OntologyQueryEngine] WARNING loading legend: {e}", flush=True)

    def _load_from_index(self):
        """
        Supplement the graph with ontology_index.json if present.

        The file is expected to hold a JSON object mapping concept name to
        record. Concepts already present in the graph are left untouched and
        non-object records are skipped.
        """
        if not os.path.exists(self.index_file):
            return
        try:
            with open(self.index_file, "r", encoding="utf-8") as f:
                index = json.load(f)
        except Exception as e:
            # Best-effort, same as the legend loader.
            print(f"[OntologyQueryEngine] WARNING loading index: {e}", flush=True)
            return
        if not isinstance(index, dict):
            return
        for key, data in index.items():
            if key in self.graph or not isinstance(data, dict):
                continue
            self.graph[key] = self._build_node(data, "index")

    def reload(self):
        """Force a full reload on the next query; call after OntologyArchitect writes."""
        self._loaded = False
        self.graph = {}

    # -- Internal lookup helpers ----------------------------------------------

    def _resolve(self, name):
        """
        Map a user-supplied concept name to a graph key.

        An exact match (after stripping) always wins; otherwise the first key
        that matches case-insensitively is used. Returns ``None`` when nothing
        matches or when ``name`` is not a string.
        """
        if not isinstance(name, str):
            return None
        wanted = name.strip()
        if wanted in self.graph:
            return wanted
        lowered = wanted.lower()
        return next((k for k in self.graph if k.lower() == lowered), None)

    def _neighbours(self, concept):
        """Yield the neighbours of ``concept`` that exist as graph nodes, in listed order."""
        node = self.graph.get(concept) or {}
        for raw in node.get("related_concepts") or []:
            if not isinstance(raw, str):
                continue
            name = raw.strip()
            if name and name in self.graph:
                yield name

    # -- Query API ------------------------------------------------------------

    def query_graph(self, start_concept: str, max_depth: int = 3) -> dict:
        """
        Breadth-first walk from ``start_concept`` up to ``max_depth`` hops.

        Returns:
            ``{"start", "max_depth", "nodes_found", "subgraph"}`` where
            ``subgraph`` maps each visited concept to its node data plus a
            ``depth_from_start`` field, or ``{"error", "graph_size"}`` when
            the start concept cannot be resolved.
        """
        self._ensure_loaded()
        start = self._resolve(start_concept)
        if start is None:
            return {"error": f"Concept '{start_concept}' not found in ontology.",
                    "graph_size": len(self.graph)}

        visited: dict = {}
        seen = {start}
        queue = deque([(start, 0)])
        while queue:
            concept, depth = queue.popleft()
            node = self.graph.get(concept)
            if not node:
                continue
            visited[concept] = {**node, "depth_from_start": depth}
            if depth >= max_depth:
                continue  # depth budget exhausted: record the node, don't expand it
            for neighbour in self._neighbours(concept):
                if neighbour not in seen:
                    seen.add(neighbour)
                    queue.append((neighbour, depth + 1))

        return {
            "start": start,
            "max_depth": max_depth,
            "nodes_found": len(visited),
            "subgraph": visited,
        }

    def find_path(self, concept_a: str, concept_b: str) -> dict:
        """
        BFS shortest path from ``concept_a`` to ``concept_b`` along directed edges.

        Returns:
            ``{"path": [...], "length": hops}`` on success (a concept's path
            to itself has length 0); ``{"path": [], "length": -1, "message"}``
            when both concepts exist but are not connected; or
            ``{"error", "path": []}`` when either concept is unknown.
        """
        self._ensure_loaded()
        a = self._resolve(concept_a)
        if a is None:
            return {"error": f"Start concept '{concept_a}' not found.", "path": []}
        b = self._resolve(concept_b)
        if b is None:
            return {"error": f"End concept '{concept_b}' not found.", "path": []}
        if a == b:
            return {"path": [a], "length": 0}

        # Standard BFS with parent tracking; `parent` doubles as the visited set.
        parent = {a: None}
        queue = deque([a])
        while queue:
            current = queue.popleft()
            for neighbour in self._neighbours(current):
                if neighbour in parent:
                    continue
                parent[neighbour] = current
                if neighbour == b:
                    # Walk the parent chain back to the start, then flip it.
                    path = []
                    node = b
                    while node is not None:
                        path.append(node)
                        node = parent[node]
                    path.reverse()
                    return {"path": path, "length": len(path) - 1}
                queue.append(neighbour)

        return {"path": [], "length": -1,
                "message": f"No path found between '{a}' and '{b}'."}

    def cluster_around(self, concept: str) -> dict:
        """
        Return the immediate neighbourhood of a concept.

        The cluster holds the concept itself (``role: "center"``) and each of
        its direct neighbours that exist in the graph (``role: "neighbour"``),
        with their full node data. Useful for contextual understanding
        without deep traversal.

        Returns:
            ``{"center", "cluster_size", "cluster"}``, or
            ``{"error", "cluster": {}}`` when the concept is unknown.
        """
        self._ensure_loaded()
        key = self._resolve(concept)
        if key is None:
            return {"error": f"Concept '{concept}' not found.", "cluster": {}}

        cluster = {key: {**self.graph[key], "role": "center"}}
        for neighbour in self._neighbours(key):
            # A self-reference overwrites the centre entry, as it always has.
            cluster[neighbour] = {**self.graph[neighbour], "role": "neighbour"}
        return {
            "center": key,
            "cluster_size": len(cluster),
            "cluster": cluster,
        }

    def search_by_domain(self, domain: str) -> list:
        """
        Return every concept whose domain contains ``domain`` (case-insensitive).

        Matching is by substring, so an empty string matches every concept.
        A non-string ``domain`` yields an empty list.
        """
        self._ensure_loaded()
        if not isinstance(domain, str):
            return []
        domain_lower = domain.lower()
        return [
            {"concept": k, "definition": v.get("definition", ""),
             "related_concepts": v.get("related_concepts", [])}
            for k, v in self.graph.items()
            # `or ""` guards nodes whose domain is missing or None.
            if domain_lower in str(v.get("domain") or "").lower()
        ]

    def stats(self) -> dict:
        """
        Summarise the loaded ontology graph.

        Returns:
            ``{"total_concepts", "total_edges", "domains"}`` where
            ``total_edges`` counts listed related concepts (including ones
            that do not resolve to a node) and ``domains`` maps each domain
            to its concept count, with empty domains tallied as "unknown".
        """
        self._ensure_loaded()
        total_edges = sum(len(v.get("related_concepts") or []) for v in self.graph.values())
        domains = {}
        for v in self.graph.values():
            d = v.get("domain", "unknown") or "unknown"
            domains[d] = domains.get(d, 0) + 1
        return {
            "total_concepts": len(self.graph),
            "total_edges": total_edges,
            "domains": domains,
        }