# orbit-engine / orbit_engineering.py
# Codex Deploy
# Add persistent learning support for mounted storage
# 49bd248
import os
import re
from datetime import datetime
import orbit_core
# Largest gap, in seconds, allowed between two consecutive beliefs for them
# to count as part of the same (most recent) cycle.
RECENT_CYCLE_GAP_SECONDS = 10 * 60

# How many follow-up questions and open questions a session presents.
FOLLOW_UP_COUNT = 3
OPEN_COUNT = 3

# A subject longer than this many tokens is not used to group beliefs.
MAX_SUBJECT_TOKENS = 4
# Words that mark the start of a statement's predicate. When a statement is
# scanned for its subject, scanning stops at the first of these words.
PREDICATE_WORDS = {
    "is", "are", "was", "were", "be", "being", "been",
    "can", "cannot", "comes", "come", "has", "have", "had", "uses", "use",
    "predicts", "explains", "describes", "models", "treats", "links",
    "includes", "supports", "works", "helps", "states", "places",
    "implies", "unifies", "requires", "reduces", "postulates", "defines",
    "applies", "combines", "underlies", "represents", "attributes",
    "organizes", "groups", "fits", "connects", "informs", "operates",
    "gained", "distinguishes", "accounts", "interprets", "relies",
    "provides", "proposes", "remains", "follows", "transformed",
}
# Articles dropped from the front of a statement before its subject is read.
LEADING_ARTICLES = {"a", "an", "the"}

# A subject whose first token is one of these is too vague to group beliefs by.
VAGUE_SUBJECTS = {
    "it", "this", "that", "they",
    "these", "those", "someone", "something",
}
def load_beliefs():
    """Return the belief list from ``orbit_core.load_beliefs()``.

    A message is printed and an empty list returned when the loader raises
    ``ValueError`` or ``OSError``, or when it returns something other than
    a list.
    """
    format_message = "Beliefs file is not in the expected format."
    try:
        loaded = orbit_core.load_beliefs()
    except ValueError:
        print(format_message)
        return []
    except OSError:
        print("Could not open beliefs file.")
        return []

    if isinstance(loaded, list):
        return loaded
    print(format_message)
    return []
def normalize_text(text):
    """Return *text* lower-cased, de-punctuated and single-spaced.

    A leading list number (``"1."`` / ``"2)"``) and a leading run of ``-``
    or ``*`` bullet characters are removed first; every remaining character
    that is neither a word character nor whitespace becomes a space.
    """
    lowered = text.lower()
    without_number = re.sub(r"^\s*\d+\s*[\.\)]\s*", "", lowered)
    without_bullet = re.sub(r"^\s*[-*]+\s*", "", without_number)
    spaced = re.sub(r"[^\w\s]", " ", without_bullet)
    # split() with no arguments already discards leading/trailing whitespace.
    return " ".join(spaced.split())
def statement_tokens(statement):
    """Return the whitespace-separated tokens of the normalized statement."""
    normalized = normalize_text(statement)
    return normalized.split()
def looks_like_fragment(statement):
    """Return True when the statement has at most two tokens."""
    token_count = len(statement_tokens(statement))
    return token_count < 3
def belief_subject(statement):
    """Return the subject of *statement* as a normalized string.

    Leading articles are skipped, then tokens are collected up to (not
    including) the first predicate word. If nothing is collected, the whole
    normalized statement is returned instead.
    """
    tokens = statement_tokens(statement)

    # Step over every leading article.
    start = 0
    while start < len(tokens) and tokens[start] in LEADING_ARTICLES:
        start += 1

    subject_words = []
    for word in tokens[start:]:
        if word in PREDICATE_WORDS:
            break
        subject_words.append(word)

    if subject_words:
        return " ".join(subject_words)
    return normalize_text(statement)
def subject_is_usable(subject):
    """Return True when *subject* is specific and short enough to group by.

    A subject is rejected when it is empty, starts with a vague word, or has
    more than ``MAX_SUBJECT_TOKENS`` tokens.
    """
    words = subject.split()
    if not words or words[0] in VAGUE_SUBJECTS:
        return False
    return len(words) <= MAX_SUBJECT_TOKENS
def parse_created_at(value):
    """Parse an ISO-8601 timestamp string into a ``datetime``.

    Returns ``None`` when *value* is empty, is not a string, or is not a
    valid ISO-8601 timestamp.
    """
    if not value:
        return None
    try:
        return datetime.fromisoformat(value)
    except (TypeError, ValueError):
        # ValueError: malformed string. TypeError: a truthy non-string value
        # (for example a numeric epoch); treat it as unparseable instead of
        # letting one bad belief abort the whole session.
        return None
def recent_cycle_beliefs(beliefs):
    """Return the beliefs that belong to the most recent cycle, newest first.

    Beliefs with a parseable ``created_at`` are walked from newest to
    oldest; the walk stops at the first gap between neighbours that exceeds
    ``RECENT_CYCLE_GAP_SECONDS``. When no belief has a parseable timestamp,
    the last 20 beliefs are returned in their original order.
    """
    stamped = []
    for entry in beliefs:
        moment = parse_created_at(entry.get("created_at"))
        if moment is not None:
            stamped.append((moment, entry))

    if not stamped:
        return beliefs[-20:]

    newest_first = sorted(stamped, key=lambda pair: pair[0], reverse=True)
    later_time, newest_belief = newest_first[0]
    cycle = [newest_belief]
    for moment, entry in newest_first[1:]:
        if (later_time - moment).total_seconds() > RECENT_CYCLE_GAP_SECONDS:
            break
        cycle.append(entry)
        later_time = moment
    return cycle
def belief_map_by_id(beliefs):
    """Return a dict mapping each truthy ``id`` to its belief.

    Beliefs without an id are left out; when an id repeats, the later
    belief replaces the earlier one.
    """
    return {
        entry.get("id"): entry
        for entry in beliefs
        if entry.get("id")
    }
def relation_ids(belief):
    """Return the truthy ``belief_id`` of every dict entry in ``relations``.

    Entries that are not dicts, or that carry no ``belief_id``, are skipped.
    """
    return [
        entry.get("belief_id")
        for entry in belief.get("relations", [])
        if isinstance(entry, dict) and entry.get("belief_id")
    ]
def cluster_recent_subjects(recent_beliefs):
    """Group beliefs by usable subject and return the groups, largest first.

    Returns a list of ``(subject, beliefs)`` tuples ordered by descending
    group size, then descending total ``support_count``, then subject name.
    Beliefs whose subject is not usable are left out.
    """
    by_subject = {}
    for entry in recent_beliefs:
        subject = belief_subject(entry.get("statement", ""))
        if subject_is_usable(subject):
            by_subject.setdefault(subject, []).append(entry)

    def rank(cluster):
        name, members = cluster
        total_support = sum(member.get("support_count", 0) for member in members)
        return (-len(members), -total_support, name)

    return sorted(by_subject.items(), key=rank)
def definition_phrase(subject, statement):
    """Extract the short defining phrase from "<subject> is a ..." statements.

    Returns the words that follow the subject and one of the recognised
    copula prefixes, cut at the first qualifying connector (" with ",
    " that ", ...). Returns ``None`` when the statement does not start with
    the subject, no prefix matches, the phrase contains a digit, or the
    phrase is longer than five words.
    """
    text = normalize_text(statement)
    lead = subject + " "
    if not text.startswith(lead):
        return None

    rest = text[len(lead):]
    copulas = ("is a ", "is an ", "is the ", "are ", "was a ", "was an ")
    matched = next((copula for copula in copulas if rest.startswith(copula)), None)
    if matched is None:
        return None

    phrase = rest[len(matched):]
    # Each connector is applied in turn to whatever is left of the phrase;
    # split() returns the phrase unchanged when the connector is absent.
    for connector in (" with ", " that ", " which ", " using ", " through "):
        phrase = phrase.split(connector, 1)[0]

    has_digit = any(char.isdigit() for char in phrase)
    if has_digit or len(phrase.split()) > 5:
        return None
    return phrase.strip()
def follow_up_question(subject, beliefs):
    """Return one follow-up question about *subject* based on its beliefs.

    A defining phrase found in the statements takes precedence; otherwise
    the wording depends on whether any statement mentions "predicts" or one
    of "explains" / "describes" / "models". A generic question is the
    fallback.
    """
    statements = [entry.get("statement", "") for entry in beliefs]
    explain_question = f"What does {subject} explain, and where are its limits?"

    candidates = (definition_phrase(subject, text) for text in statements)
    phrases = [phrase for phrase in candidates if phrase]
    if phrases:
        # Prefer phrases not starting with "theory"/"model", then shorter ones.
        # NOTE(review): "hypothesis" is treated like "theory"/"model" in the
        # check below but is not deprioritised here -- confirm that is intended.
        best = min(
            phrases,
            key=lambda item: (
                item.startswith("theory"),
                item.startswith("model"),
                len(item),
                item,
            ),
        )
        if best.startswith(("theory", "model", "hypothesis")):
            return explain_question
        return f"What defines {best}?"

    normalized = [normalize_text(text) for text in statements]
    if any("predicts" in text for text in normalized):
        return f"What evidence or observations matter most for understanding {subject}?"
    explanatory_verbs = ("explains", "describes", "models")
    if any(verb in text for text in normalized for verb in explanatory_verbs):
        return explain_question
    return f"What should Orbit understand next about {subject}?"
def contradiction_pairs(beliefs, excluded_ids):
    """Return each pair of beliefs linked by a "contradicts" relation.

    Every pair appears once, as a two-item list ordered by belief id. A
    pair is dropped when both of its ids are in *excluded_ids*. Pairs whose
    question can be phrased as "Under what conditions ..." sort first, then
    pairs with the higher ``contradiction_count``, then by statement text.

    Beliefs without an id, relation entries that are not dicts, and
    relations pointing at unknown ids are ignored.
    """
    belief_map = belief_map_by_id(beliefs)
    pairs = []
    seen = set()
    for belief in beliefs:
        belief_id = belief.get("id")
        if not belief_id:
            # A belief without an id is absent from belief_map, so it cannot
            # be looked up below (and None cannot be sorted against a str).
            continue
        # "or []" also covers an explicit ``"relations": None``.
        for relation in belief.get("relations") or []:
            # Same guard relation_ids() applies to malformed entries.
            if not isinstance(relation, dict):
                continue
            if relation.get("type") != "contradicts":
                continue
            other_id = relation.get("belief_id")
            if not other_id or other_id not in belief_map:
                continue
            # Sorting the ids makes A->B and B->A map to the same key.
            pair_key = tuple(sorted((belief_id, other_id)))
            if pair_key in seen:
                continue
            seen.add(pair_key)
            if excluded_ids and pair_key[0] in excluded_ids and pair_key[1] in excluded_ids:
                continue
            pairs.append([belief_map[pair_key[0]], belief_map[pair_key[1]]])
    pairs.sort(
        key=lambda pair: (
            not contradiction_question(pair).startswith("Under what conditions"),
            -max(pair[0].get("contradiction_count", 0), pair[1].get("contradiction_count", 0)),
            pair[0].get("statement", ""),
            pair[1].get("statement", ""),
        )
    )
    return pairs
def trailing_phrase(statement, subject):
    """Return what the statement says after *subject* and any leading auxiliaries.

    Returns ``""`` when the statement's tokens do not begin with the
    subject's tokens. Otherwise leading "is"/"are"/"was"/"were"/"can" tokens
    and then a single "not" are skipped, and the rest is joined with spaces.
    """
    tokens = statement_tokens(statement)
    subject_words = subject.split()
    cursor = len(subject_words)
    if tokens[:cursor] != subject_words:
        return ""

    auxiliaries = {"is", "are", "was", "were", "can"}
    while cursor < len(tokens) and tokens[cursor] in auxiliaries:
        cursor += 1
    if cursor < len(tokens) and tokens[cursor] == "not":
        cursor += 1
    return " ".join(tokens[cursor:]).strip()
def contradiction_question(pair):
    """Return a question asking how to resolve a contradicting belief pair.

    When both statements share a subject and at least one contains the word
    "cannot", the question asks under what conditions the other statement
    holds. In every other case it asks how to reconcile the two statements
    verbatim.
    """
    first = pair[0].get("statement", "")
    second = pair[1].get("statement", "")
    subject = belief_subject(first)
    other_subject = belief_subject(second)

    if subject == other_subject:
        # Padding with spaces makes this a whole-word match.
        first_negated = " cannot " in f" {normalize_text(first)} "
        second_negated = " cannot " in f" {normalize_text(second)} "
        if first_negated or second_negated:
            positive = second if first_negated else first
            phrase = trailing_phrase(positive, subject)
            if phrase:
                return f"Under what conditions does {subject} {phrase}?"
    return f'How should Orbit reconcile "{first}" and "{second}"?'
def isolated_candidates(beliefs, excluded_subjects):
    """Return beliefs with no support and no contradictions, best first.

    Fragments and beliefs whose subject is in *excluded_subjects* are left
    out. Results are ordered so that statements not starting with an article
    come first, then by descending ``confidence``, then by descending token
    count, then by statement text.
    """
    def is_isolated(entry):
        statement = entry.get("statement", "")
        if belief_subject(statement) in excluded_subjects:
            return False
        if looks_like_fragment(statement):
            return False
        return (
            entry.get("support_count", 0) == 0
            and entry.get("contradiction_count", 0) == 0
        )

    def rank(entry):
        statement = entry.get("statement", "")
        tokens = statement_tokens(statement)
        # Non-fragments always have at least three tokens, so [0] is safe.
        return (
            tokens[0] in LEADING_ARTICLES,
            -entry.get("confidence", 0),
            -len(tokens),
            statement,
        )

    return sorted((entry for entry in beliefs if is_isolated(entry)), key=rank)
def build_follow_up_questions(beliefs):
    """Build the follow-up questions for the most recent cycle.

    Returns ``(questions, recent_beliefs)``. Questions come first from
    subject clusters, then from a generic per-subject question for each
    recent belief, and finally from a fixed filler question until there are
    ``FOLLOW_UP_COUNT`` of them.
    """
    recent_beliefs = recent_cycle_beliefs(beliefs)
    questions = []
    asked = set()

    def offer(question):
        """Record an unseen question; return True once the quota is reached."""
        if question in asked:
            return False
        asked.add(question)
        questions.append(question)
        return len(questions) == FOLLOW_UP_COUNT

    for subject, members in cluster_recent_subjects(recent_beliefs):
        if offer(follow_up_question(subject, members)):
            break

    if len(questions) < FOLLOW_UP_COUNT:
        for entry in recent_beliefs:
            subject = belief_subject(entry.get("statement", ""))
            if not subject_is_usable(subject):
                continue
            if offer(f"What should Orbit understand next about {subject}?"):
                break

    # The filler may repeat; it only pads the list to the fixed length.
    while len(questions) < FOLLOW_UP_COUNT:
        questions.append("What should Orbit understand next about its most recent beliefs?")
    return questions, recent_beliefs
def build_open_questions(beliefs, recent_beliefs):
    """Build the open questions about beliefs outside the recent cycle.

    Questions are gathered in stages: one about a contradiction, one about
    the strongest fragment, one about an isolated belief sharing that
    fragment's leading token, then further isolated beliefs, and finally a
    fixed filler question until there are ``OPEN_COUNT`` questions.
    """
    questions = []
    asked = set()
    recent_ids = {entry.get("id") for entry in recent_beliefs if entry.get("id")}
    recent_subjects = {
        belief_subject(entry.get("statement", ""))
        for entry in recent_beliefs
        if entry.get("statement")
    }

    # Stage 1: at most one contradiction question (the best-ranked pair).
    for pair in contradiction_pairs(beliefs, recent_ids):
        contradiction = contradiction_question(pair)
        if contradiction in asked:
            continue
        asked.add(contradiction)
        questions.append(contradiction)
        if len(questions) == OPEN_COUNT:
            return questions
        break

    # Stage 2: the fragment with the most support, then the most confidence.
    fragments = []
    for entry in beliefs:
        text = entry.get("statement", "")
        subject = belief_subject(text)
        if looks_like_fragment(text) and subject not in recent_subjects:
            fragments.append(entry)

    fragment_subject = None
    if fragments:
        fragment = sorted(
            fragments,
            key=lambda entry: (
                -entry.get("support_count", 0),
                -entry.get("confidence", 0),
                entry.get("statement", ""),
            ),
        )[0]
        fragment_text = fragment.get("statement", "")
        fragment_words = statement_tokens(fragment_text)
        # The fragment's first token is what stage 3 matches subjects against.
        if fragment_words:
            fragment_subject = fragment_words[0]
        else:
            fragment_subject = belief_subject(fragment_text)
        fragment_question = f'What relationship was intended by "{fragment_text}"?'
        if fragment_question not in asked:
            asked.add(fragment_question)
            questions.append(fragment_question)

    # Stage 3: one isolated belief whose subject equals the fragment's token.
    if len(questions) < OPEN_COUNT and fragment_subject:
        for entry in isolated_candidates(beliefs, recent_subjects):
            if belief_subject(entry.get("statement", "")) != fragment_subject:
                continue
            related = f'What broader definition or context would help explain "{entry.get("statement", "")}"?'
            if related in asked:
                continue
            asked.add(related)
            questions.append(related)
            break

    # Stage 4: fill the remaining slots with other isolated beliefs.
    if len(questions) < OPEN_COUNT:
        for entry in isolated_candidates(beliefs, recent_subjects):
            isolated = f'What broader definition or context would help explain "{entry.get("statement", "")}"?'
            if isolated in asked:
                continue
            asked.add(isolated)
            questions.append(isolated)
            if len(questions) == OPEN_COUNT:
                break

    # Stage 5: pad with a fixed question (may repeat).
    while len(questions) < OPEN_COUNT:
        questions.append("What part of Orbit's current graph still needs clearer context?")
    return questions
def show_questions(follow_up_questions, open_questions):
    """Print the session banner, both question lists, and the instructions.

    Follow-up questions are numbered from 1. Open questions continue the
    numbering from the number of follow-up questions actually printed, so
    the sequence stays contiguous whatever the list lengths are.
    """
    follow_up_questions = list(follow_up_questions)

    print("Orbit engineering mode started.")
    print()
    print("Follow-up questions:")
    for index, question in enumerate(follow_up_questions, 1):
        print(f"{index}. {question}")
    print()
    print("Open questions:")
    # Continue from the last follow-up number rather than a fixed constant.
    first_open_number = len(follow_up_questions) + 1
    for number, question in enumerate(open_questions, first_open_number):
        print(f"{number}. {question}")
    print()
    print("Answer freely.")
    print('Type "end session" when finished.')
def collect_answers():
    """Read answers from standard input until the user ends the session.

    Returns the list of non-empty, stripped answers. The session ends when
    the user types "end session" (surrounding whitespace and letter case
    are ignored) or when standard input is closed.
    """
    answers = []
    while True:
        try:
            answer = input("> ")
        except EOFError:
            # Closed stdin (Ctrl-D, piped input): keep what was collected
            # instead of crashing and losing the session.
            return answers
        cleaned = answer.strip()
        # Normalise before comparing so " End Session " is not stored as an
        # answer; choose_learning() normalises its input the same way.
        if cleaned.lower() == "end session":
            return answers
        if cleaned:
            answers.append(cleaned)
def write_session_intake(answers):
    """Return the answers as intake text, one per line.

    Each answer is followed by a newline; an empty answer list gives an
    empty string. Nothing is written to disk here.
    """
    if answers:
        return "".join(answer + "\n" for answer in answers)
    return ""
def run_learning(answers):
    """Run ``orbit_core.intake()`` on the session answers.

    The intake file is temporarily replaced with the session's answers and
    then restored to its previous content, whether or not the intake
    succeeds. If the file did not exist beforehand it is left empty.
    """
    intake_path = orbit_core.INTAKE_FILE

    # Back up the raw bytes so the restore is exact. A text-mode round trip
    # (read as utf-8-sig, write as utf-8) would drop a byte-order mark and
    # normalise line endings in the user's file.
    intake_backup = b""
    if os.path.exists(intake_path):
        with open(intake_path, "rb") as f:
            intake_backup = f.read()

    session_intake = write_session_intake(answers)
    try:
        with open(intake_path, "w", encoding="utf-8") as f:
            f.write(session_intake)
        orbit_core.intake()
    finally:
        with open(intake_path, "wb") as f:
            f.write(intake_backup)
def choose_learning():
    """Prompt until the user enters ``y`` or ``n`` and return that letter.

    Input is stripped and lower-cased before it is checked; any other
    input prints a reminder and prompts again.
    """
    accepted = ("y", "n")
    choice = input("> ").strip().lower()
    while choice not in accepted:
        print("Enter y or n.")
        choice = input("> ").strip().lower()
    return choice
def main():
    """Run one interactive engineering session.

    Loads the beliefs, shows the follow-up and open questions, collects the
    user's answers, and then either runs learning on them or discards them.
    Returns immediately when no beliefs could be loaded.
    """
    beliefs = load_beliefs()
    if not beliefs:
        return

    follow_up_questions, recent_beliefs = build_follow_up_questions(beliefs)
    open_questions = build_open_questions(beliefs, recent_beliefs)
    show_questions(follow_up_questions, open_questions)

    answers = collect_answers()
    print()
    print("Session complete.")
    print(f"{len(answers)} answers collected.")
    print("Run learning now? [y/n]")

    wants_learning = choose_learning() == "y"
    if not wants_learning:
        # Both messages are read as belonging to the "n" branch -- the source
        # indentation was lost; confirm against the original file.
        print("Session discarded.")
        print("No learning applied.")
        return
    run_learning(answers)
# Start the interactive session only when this file is run as a script,
# not when it is imported.
if __name__ == "__main__":
    main()