# Aetherius — services/secondary_brain.py
# NOTE(review): the following Hugging Face page metadata was captured with the
# file and converted to comments so the module parses:
#   uploader: KingOfThoughtFleuren — commit a52334d ("Update
#   services/secondary_brain.py", verified) — raw / history / blame — 20.3 kB
# ===== FILE: services/secondary_brain.py =====
import os
import json
import time
import datetime
import tempfile
from pathlib import Path
# Minimum seconds between expensive API calls per domain (1 hour)
_DOMAIN_API_COOLDOWN: int = 3600
def _safe_write(filepath: str, content: str) -> None:
    """
    Bucket-safe atomic write.

    Writes *content* to a temp file in the SAME directory as *filepath*,
    fsyncs it, then renames it over the target. Within one directory on a
    FUSE-mounted bucket, rename is atomic. Direct open('w') is NOT safe —
    a crash mid-write silently zeroes the file on object storage.

    Args:
        filepath: Destination path; its parent directories are created.
        content: Full text to write (UTF-8).

    Raises:
        OSError: If the temp file cannot be written or renamed. The temp
            file is removed before re-raising so no orphans accumulate.
    """
    dirpath = os.path.dirname(os.path.abspath(filepath))
    os.makedirs(dirpath, exist_ok=True)
    # mkstemp in the target directory guarantees same-filesystem rename.
    fd, tmp_path = tempfile.mkstemp(prefix=".tmp_sb_", dir=dirpath)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)
            f.flush()
            # Push data to storage BEFORE the rename makes it visible;
            # without this, a crash right after os.replace can still
            # publish a truncated file. Best-effort: some FUSE mounts
            # reject fsync, which must not fail the write.
            try:
                os.fsync(f.fileno())
            except OSError:
                pass
        os.replace(tmp_path, filepath)
    except Exception:
        try:
            os.remove(tmp_path)
        except FileNotFoundError:
            pass
        raise
# Tags that indicate procedural/how-to content worth extracting separately
PROCEDURAL_TAGS: set[str] = {
    "algorithm", "method", "formula", "process", "technique",
    "procedure", "tutorial", "implementation", "steps", "how-to",
    "derivation", "proof", "synthesis", "protocol", "workflow"
}
# How many legend entries a domain can hold before self-condensation runs
CONDENSATION_THRESHOLD: int = 150
# A domain becomes "active" for SQT purposes if it received a concept within this window (seconds)
ACTIVE_DOMAIN_WINDOW: int = 3600  # 1 hour
class DomainLayer:
    """
    Represents a single knowledge domain (e.g. 'coding', 'chemistry').

    Manages its own legend (JSONL, one concept per line), procedures file,
    and condensed ontology. Crystallizes automatically when first written
    to: the domain directory is created in __init__.
    """

    def __init__(self, domain_name: str, base_path: str):
        """
        Args:
            domain_name: Normalized domain identifier (e.g. 'coding').
            base_path: Secondary-brain root; this domain's files live
                under <base_path>/domains/<domain_name>.
        """
        self.domain_name = domain_name
        self.domain_dir = os.path.join(base_path, "domains", domain_name)
        os.makedirs(self.domain_dir, exist_ok=True)
        self.legend_path = os.path.join(self.domain_dir, "legend.jsonl")
        self.procedures_path = os.path.join(self.domain_dir, "procedures.jsonl")
        self.ontology_path = os.path.join(self.domain_dir, "condensed_ontology.txt")
        # Per-domain cooldown timestamps — prevent runaway API billing.
        self._last_procedure_time: float = 0.0
        self._last_condense_time: float = 0.0

    def _count_entries(self, filepath: str) -> int:
        """Return the number of non-blank lines in a JSONL file (0 if missing)."""
        if not os.path.exists(filepath):
            return 0
        with open(filepath, "r", encoding="utf-8") as f:
            return sum(1 for line in f if line.strip())

    def append_concept(self, sqt_data: dict):
        """
        Append a new SQT entry to the domain legend.

        No API call — pure file write. Missing keys in sqt_data default
        to empty values, so a partial payload never raises.
        """
        entry = {
            "sqt": sqt_data.get("sqt", ""),
            "summary": sqt_data.get("summary", ""),
            "tags": sqt_data.get("tags", []),
            "domain": self.domain_name,
            "timestamp": datetime.datetime.now().isoformat()
        }
        with open(self.legend_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
        print(f"[SecondaryBrain] Appended concept to '{self.domain_name}' domain legend.", flush=True)

    def extract_procedure(self, raw_text: str, model) -> bool:
        """
        Call the model once to extract procedural/how-to knowledge from
        raw_text and append the result to procedures.jsonl.

        Enforces a 1-hour per-domain cooldown to prevent excessive API
        billing; the cooldown is stamped on every attempt (even failed
        ones) so a flaky model cannot be hammered.

        Returns:
            True if a procedure was extracted and stored, False otherwise.
        """
        if not model:
            return False
        now = time.time()
        if (now - self._last_procedure_time) < _DOMAIN_API_COOLDOWN:
            print(f"[SecondaryBrain] '{self.domain_name}' procedure extraction on cooldown — skipping.", flush=True)
            return False
        self._last_procedure_time = now
        prompt = (
            f"You are analyzing text for procedural knowledge in the domain of '{self.domain_name}'.\n\n"
            f"--- TEXT ---\n{raw_text[:3000]}\n--- END TEXT ---\n\n"
            "If this text contains a clear method, algorithm, formula, process, or step-by-step procedure, "
            "extract it. Respond with a JSON object with these keys:\n"
            " 'found': true or false\n"
            " 'title': short name for the procedure (if found)\n"
            " 'steps': list of concise step strings (if found)\n"
            " 'domain': the knowledge domain\n\n"
            "If no clear procedure exists, return {\"found\": false}."
        )
        try:
            response = model.generate_content(prompt)
            # Models often wrap JSON in markdown fences; strip them first.
            cleaned = response.text.strip().replace("```json", "").replace("```", "")
            result = json.loads(cleaned)
            # Require all three fields before persisting anything.
            if result.get("found") and result.get("title") and result.get("steps"):
                entry = {
                    "domain": self.domain_name,
                    "title": result["title"],
                    "steps": result["steps"],
                    "timestamp": datetime.datetime.now().isoformat()
                }
                with open(self.procedures_path, "a", encoding="utf-8") as f:
                    f.write(json.dumps(entry, ensure_ascii=False) + "\n")
                print(f"[SecondaryBrain] Extracted procedure '{result['title']}' into '{self.domain_name}'.", flush=True)
                return True
        except Exception as e:
            # Best-effort: a bad response must never break assimilation.
            print(f"[SecondaryBrain] Procedure extraction error for '{self.domain_name}': {e}", flush=True)
        return False

    def condense_if_needed(self, model) -> bool:
        """
        If legend.jsonl has reached CONDENSATION_THRESHOLD entries, run
        one API call to merge redundant entries and shrink the file.

        Enforces a 1-hour per-domain cooldown. The model is checked
        BEFORE the cooldown is stamped (bug fix: previously a missing
        model still consumed the cooldown window and logged
        "condensing..." without doing anything).

        Returns:
            True if condensation ran and the legend was rewritten,
            False otherwise.
        """
        if not model:
            return False
        count = self._count_entries(self.legend_path)
        if count < CONDENSATION_THRESHOLD:
            return False
        now = time.time()
        if (now - self._last_condense_time) < _DOMAIN_API_COOLDOWN:
            print(f"[SecondaryBrain] '{self.domain_name}' condensation on cooldown ({count} entries) — skipping.", flush=True)
            return False
        print(f"[SecondaryBrain] '{self.domain_name}' legend has {count} entries — condensing...", flush=True)
        self._last_condense_time = now
        # Read all current entries, silently skipping corrupt lines.
        entries = []
        with open(self.legend_path, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    try:
                        entries.append(json.loads(line))
                    except Exception:
                        pass
        entries_text = "\n".join(
            f"- SQT: {e.get('sqt','')} | Summary: {e.get('summary','')} | Tags: {e.get('tags','')}"
            for e in entries
        )
        prompt = (
            f"You are condensing a knowledge domain legend for '{self.domain_name}'.\n\n"
            f"Below are {count} SQT legend entries. Merge redundant or overlapping concepts, "
            f"preserve all unique knowledge, and return a condensed list of AT MOST 60 entries.\n\n"
            f"--- ENTRIES ---\n{entries_text[:6000]}\n--- END ENTRIES ---\n\n"
            "Respond with a JSON array. Each item must have keys: 'sqt', 'summary', 'tags' (list).\n"
            "Return ONLY the JSON array, no explanation."
        )
        try:
            response = model.generate_content(prompt)
            cleaned = response.text.strip().replace("```json", "").replace("```", "")
            condensed = json.loads(cleaned)
            if isinstance(condensed, list) and len(condensed) > 0:
                # Build condensed legend content; re-stamp every entry.
                now_iso = datetime.datetime.now().isoformat()
                legend_lines = []
                for item in condensed:
                    item["domain"] = self.domain_name
                    item["timestamp"] = now_iso
                    legend_lines.append(json.dumps(item, ensure_ascii=False))
                # Atomic write — safe on bucket/FUSE storage.
                _safe_write(self.legend_path, "\n".join(legend_lines) + "\n")
                print(f"[SecondaryBrain] '{self.domain_name}' condensed from {count} → {len(condensed)} entries.", flush=True)
                # Also mirror a human-readable summary into the ontology file (atomic).
                ontology_lines = [f"Domain: {self.domain_name}", f"Condensed at: {now_iso}", ""]
                for item in condensed:
                    ontology_lines.append(f" [{item.get('sqt','')}] {item.get('summary','')}")
                _safe_write(self.ontology_path, "\n".join(ontology_lines))
                return True
        except Exception as e:
            print(f"[SecondaryBrain] Condensation error for '{self.domain_name}': {e}", flush=True)
        return False

    def search(self, keywords: list, top_k: int = 3) -> list:
        """
        Keyword search across legend.jsonl and procedures.jsonl.
        No API call — pure file scan.

        Scoring: +2 per keyword in a concept summary, +1 per keyword in
        a concept tag; +3 per keyword in a procedure title, +1 per
        keyword per matching step.

        Returns:
            Up to top_k dicts {"score", "type", "entry"}, ranked by
            descending score.
        """
        # Lowercase the keywords once, outside the per-entry loops.
        kws = [kw.lower() for kw in keywords]
        results = []
        # Search legend (concepts).
        if os.path.exists(self.legend_path):
            with open(self.legend_path, "r", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    try:
                        entry = json.loads(line)
                        score = 0
                        summary_lower = entry.get("summary", "").lower()
                        # str() guards against non-string tags so one bad
                        # tag doesn't discard the whole entry.
                        tags_lower = [str(t).lower() for t in entry.get("tags", [])]
                        for kw in kws:
                            if kw in summary_lower:
                                score += 2
                            if any(kw in tag for tag in tags_lower):
                                score += 1
                        if score > 0:
                            results.append({"score": score, "type": "concept", "entry": entry})
                    except Exception:
                        pass  # skip corrupt lines
        # Search procedures.
        if os.path.exists(self.procedures_path):
            with open(self.procedures_path, "r", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    try:
                        entry = json.loads(line)
                        score = 0
                        title_lower = entry.get("title", "").lower()
                        for kw in kws:
                            if kw in title_lower:
                                score += 3  # Procedures ranked higher on title match
                            for step in entry.get("steps", []):
                                if kw in str(step).lower():
                                    score += 1
                        if score > 0:
                            results.append({"score": score, "type": "procedure", "entry": entry})
                    except Exception:
                        pass  # skip corrupt lines
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:top_k]

    def get_context_snippet(self, max_entries: int = 5) -> str:
        """
        Return a readable sample of the most recent legend entries (plus
        up to 2 recent procedures) for use in SQT prompts. No API call.
        """
        lines = []
        if os.path.exists(self.legend_path):
            with open(self.legend_path, "r", encoding="utf-8") as f:
                all_lines = [l.strip() for l in f if l.strip()]
            # Take the most recent entries.
            for line in all_lines[-max_entries:]:
                try:
                    entry = json.loads(line)
                    lines.append(f" [{entry.get('sqt','')}] {entry.get('summary','')}")
                except Exception:
                    pass
        if os.path.exists(self.procedures_path):
            with open(self.procedures_path, "r", encoding="utf-8") as f:
                proc_lines = [l.strip() for l in f if l.strip()]
            for line in proc_lines[-2:]:
                try:
                    entry = json.loads(line)
                    # Guard against an empty steps list (bug fix: the
                    # bare [0] index raised and dropped the line).
                    first_step = (entry.get("steps") or [""])[0]
                    lines.append(f" [PROCEDURE] {entry.get('title','')} — {first_step}...")
                except Exception:
                    pass
        return "\n".join(lines) if lines else f"No {self.domain_name} knowledge stored yet."
class SecondaryBrain:
"""
The secondary brain node. Sits alongside the primary ontology.
Manages domain layers that crystallize automatically from SQT tags.
Searched in parallel with the primary brain during every response.
"""
def __init__(self, data_directory: str, models: dict):
self.base_path = os.path.join(data_directory, "SecondaryBrain")
self.models = models
self.index_path = os.path.join(self.base_path, "_brain_index.json")
self.domain_layers = {} # domain_name -> DomainLayer
os.makedirs(self.base_path, exist_ok=True)
self._load_index()
print(f"[SecondaryBrain] Online. {len(self.domain_layers)} domain(s) loaded: {list(self.domain_layers.keys())}", flush=True)
def _load_index(self):
"""
Loads the brain index and reinstantiates any existing domain layers.
"""
if os.path.exists(self.index_path):
try:
with open(self.index_path, "r", encoding="utf-8") as f:
index = json.load(f)
for domain_name in index.get("domains", {}).keys():
self.domain_layers[domain_name] = DomainLayer(domain_name, self.base_path)
except Exception as e:
print(f"[SecondaryBrain] Could not load brain index: {e}", flush=True)
def _save_index(self):
"""
Saves the brain index with domain stats and last_active timestamps.
"""
index = {"domains": {}}
for domain_name, layer in self.domain_layers.items():
concept_count = layer._count_entries(layer.legend_path)
# Try to get last_active from most recent legend entry
last_active = None
if os.path.exists(layer.legend_path):
try:
with open(layer.legend_path, "r", encoding="utf-8") as f:
all_lines = [l.strip() for l in f if l.strip()]
if all_lines:
last_entry = json.loads(all_lines[-1])
last_active = last_entry.get("timestamp")
except Exception:
pass
index["domains"][domain_name] = {
"concept_count": concept_count,
"last_active": last_active
}
# Atomic write β€” safe on bucket/FUSE storage
_safe_write(self.index_path, json.dumps(index, indent=2, ensure_ascii=False))
def _get_or_create_domain(self, domain_name: str) -> DomainLayer:
"""
Returns an existing domain layer or crystallizes a new one.
"""
if domain_name not in self.domain_layers:
print(f"[SecondaryBrain] Crystallizing new domain: '{domain_name}'", flush=True)
self.domain_layers[domain_name] = DomainLayer(domain_name, self.base_path)
return self.domain_layers[domain_name]
def ingest(self, sqt_data: dict, raw_text: str):
"""
Called from _orchestrate_mind_evolution after every assimilation.
Routes the SQT to the correct domain layer.
Triggers procedural extraction if warranted.
Triggers condensation if threshold exceeded.
"""
domain = sqt_data.get("domain")
if not domain:
return
domain = domain.lower().strip()
layer = self._get_or_create_domain(domain)
# 1. Append concept to domain legend (no API call)
layer.append_concept(sqt_data)
# 2. Check if procedural extraction is warranted (1 API call if yes)
tags = [t.lower() for t in sqt_data.get("tags", [])]
if any(t in PROCEDURAL_TAGS for t in tags):
model = self.models.get("logos_core") or self.models.get("logic_core")
layer.extract_procedure(raw_text, model)
# 3. Condense if over threshold (1 API call if yes, infrequent)
model = self.models.get("logos_core") or self.models.get("logic_core")
layer.condense_if_needed(model)
# 4. Update brain index
self._save_index()
def search(self, query: str, top_k: int = 3) -> str:
"""
Searches all domain layers for relevant concepts and procedures.
No API call β€” pure file scan across all domains.
Returns a formatted string ready to inject into the prompt.
"""
if not self.domain_layers:
return ""
keywords = [w for w in query.lower().split() if len(w) > 3]
if not keywords:
return ""
all_results = []
for domain_name, layer in self.domain_layers.items():
domain_results = layer.search(keywords, top_k=top_k)
for r in domain_results:
r["domain"] = domain_name
all_results.append(r)
# Sort all results across all domains by score
all_results.sort(key=lambda x: x["score"], reverse=True)
top_results = all_results[:top_k]
if not top_results:
return ""
output_lines = []
for r in top_results:
domain = r["domain"]
entry = r["entry"]
if r["type"] == "concept":
output_lines.append(
f"[{domain.upper()}] {entry.get('summary', '')} (SQT: {entry.get('sqt', '')})"
)
elif r["type"] == "procedure":
steps_preview = " β†’ ".join(entry.get("steps", [])[:3])
output_lines.append(
f"[{domain.upper()} PROCEDURE] {entry.get('title', '')}: {steps_preview}"
)
return "\n".join(output_lines)
def get_active_domain(self) -> str | None:
"""
Returns the name of the most recently active domain if it was
active within ACTIVE_DOMAIN_WINDOW seconds. Otherwise returns None.
Used by the continuum loop to decide whether to fire a domain SQT.
"""
if not os.path.exists(self.index_path):
return None
try:
with open(self.index_path, "r", encoding="utf-8") as f:
index = json.load(f)
except Exception:
return None
now = datetime.datetime.now()
best_domain = None
best_timestamp = None
for domain_name, stats in index.get("domains", {}).items():
last_active = stats.get("last_active")
if not last_active:
continue
try:
dt = datetime.datetime.fromisoformat(last_active)
elapsed = (now - dt).total_seconds()
if elapsed <= ACTIVE_DOMAIN_WINDOW:
if best_timestamp is None or dt > best_timestamp:
best_timestamp = dt
best_domain = domain_name
except Exception:
pass
return best_domain
def get_domain_context_snippet(self, domain: str) -> str:
"""
Returns a readable context snippet for a domain.
Used by _handle_domain_sqt in continuum_loop to ground the prompt.
No API call.
"""
domain = domain.lower().strip()
if domain not in self.domain_layers:
return f"No knowledge stored yet for domain '{domain}'."
return self.domain_layers[domain].get_context_snippet(max_entries=6)