|
|
| import logging
|
| from typing import List, Optional, Dict, Any
|
| from backend.db.models import Universe, Axiom, Theorem
|
| from backend.db.session import SessionLocal
|
|
|
class AxiomLineage:
    """
    Tracks axiom evolution and lineage for research and explainability.

    The lineage is an in-memory mapping of ``child_id -> parent_id``; each
    child has at most one recorded parent, and re-adding a child overwrites
    its previous parent.
    """

    def __init__(self) -> None:
        # Maps a child axiom id to the id of the axiom it was derived from.
        self.lineage: Dict[int, int] = {}

    def add(self, child_id: int, parent_id: int) -> None:
        """Record that ``child_id`` was derived from ``parent_id``."""
        self.lineage[child_id] = parent_id

    def get_lineage(self, axiom_id: int) -> List[int]:
        """
        Return the ancestors of ``axiom_id``, nearest parent first.

        The walk follows recorded parent links until it reaches an id with no
        recorded parent. An id with no recorded parent yields an empty list.

        If the recorded links form a cycle (including an axiom recorded as
        its own parent), the walk stops just before it would revisit an id,
        so the result never contains the starting id or any repeated id.
        """
        ancestors: List[int] = []
        # Ids already walked; without this a cyclic mapping would loop forever.
        visited = {axiom_id}
        current = axiom_id
        while current in self.lineage:
            parent = self.lineage[current]
            if parent in visited:
                break
            ancestors.append(parent)
            visited.add(parent)
            current = parent
        return ancestors
|
|
|
class UniverseGenerator:
    """
    Advanced universe and axiom management for mathematical universe simulation.

    Wraps a database session to create and query ``Universe``, ``Axiom`` and
    ``Theorem`` rows, and keeps an in-memory ``AxiomLineage`` recording which
    axiom each derived axiom came from. The lineage lives only on this
    instance; nothing in this class persists it.

    Every write is committed as a single transaction and rolled back on
    failure, so the session stays usable after an error.
    """

    def __init__(self, db_session=None, logger=None):
        """
        Args:
            db_session: Session to use. When omitted, a new one is obtained
                from ``SessionLocal()`` and is closed by :meth:`close`.
            logger: Logger to use; defaults to the ``"UniverseGenerator"``
                logger.
        """
        # Test against None instead of truthiness so a valid-but-falsy object
        # passed by the caller is never silently replaced.
        self._owns_session = db_session is None
        self.db = SessionLocal() if db_session is None else db_session
        self.logger = logging.getLogger("UniverseGenerator") if logger is None else logger
        self.axiom_lineage = AxiomLineage()

    def close(self) -> None:
        """
        Close the session if this instance created it.

        A session injected by the caller is left open for its owner to manage.
        """
        if getattr(self, "_owns_session", False):
            self.db.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def _commit(self) -> None:
        """Commit the current transaction; roll back and re-raise on failure."""
        try:
            self.db.commit()
        except Exception:
            # Without a rollback the session would stay in a failed state and
            # reject every later operation.
            self.db.rollback()
            raise

    @staticmethod
    def _apply_filters(query, model, filters: Optional[Dict[str, Any]]):
        """
        Narrow ``query`` with one equality filter per ``filters`` entry.

        Each key is looked up as an attribute of ``model``; an unknown key
        raises ``AttributeError``.
        """
        if filters:
            for attr_name, expected in filters.items():
                query = query.filter(getattr(model, attr_name) == expected)
        return query

    def create_universe(self, name: str, description: str, universe_type: str, axioms: List[str], metadata: Optional[Dict[str, Any]] = None) -> Universe:
        """
        Create a new universe with initial axioms and optional metadata.

        The universe and all of its axioms are written in one transaction:
        either everything is committed or nothing is.

        Args:
            name: Universe name.
            description: Universe description.
            universe_type: Universe type label.
            axioms: Statements of the initial axioms; stored as given, without
                the validation that :meth:`add_axiom` performs.
            metadata: Accepted for interface compatibility; currently unused.

        Returns:
            The refreshed ``Universe`` instance.

        Raises:
            Exception: Whatever the session raises; the transaction is rolled
                back before the error propagates.
        """
        universe = Universe(name=name, description=description, universe_type=universe_type)
        try:
            self.db.add(universe)
            # Flush sends the INSERT so universe.id is available for the
            # axioms without ending the transaction (assumes a
            # database-generated primary key).
            self.db.flush()
            for axiom_text in axioms:
                self.db.add(Axiom(universe_id=universe.id, statement=axiom_text))
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise
        self.db.refresh(universe)
        self.logger.info(f"Universe created: {universe.name} (id={universe.id})")
        return universe

    def get_universe(self, universe_id: int) -> Optional[Universe]:
        """Return the universe with the given id, or ``None`` if there is none."""
        return self.db.query(Universe).filter(Universe.id == universe_id).first()

    def list_universes(self, filters: Optional[Dict[str, Any]] = None) -> List[Universe]:
        """
        Return all universes, optionally narrowed by equality filters.

        Args:
            filters: Mapping of ``Universe`` attribute name to required value.

        Raises:
            AttributeError: If a filter key is not an attribute of ``Universe``.
        """
        query = self._apply_filters(self.db.query(Universe), Universe, filters)
        return query.all()

    def add_axiom(self, universe_id: int, statement: str, parent_axiom_id: Optional[int] = None, metadata: Optional[Dict[str, Any]] = None) -> Axiom:
        """
        Add an axiom to a universe and optionally record its parent.

        Args:
            universe_id: Id of the universe receiving the axiom.
            statement: Axiom text; must have at least 3 characters once
                stripped. It is stored as given (not stripped).
            parent_axiom_id: Id of the axiom this one derives from, if any.
            metadata: Accepted for interface compatibility; currently unused.

        Returns:
            The refreshed ``Axiom`` instance.

        Raises:
            ValueError: If the statement is empty or too short, or an axiom
                with the same statement already exists in the universe.
        """
        if not statement or len(statement.strip()) < 3:
            self.logger.error("Axiom statement too short or empty.")
            raise ValueError("Axiom statement too short or empty.")
        existing = (
            self.db.query(Axiom)
            .filter(Axiom.universe_id == universe_id, Axiom.statement == statement)
            .first()
        )
        if existing:
            self.logger.error("Axiom already exists in this universe.")
            raise ValueError("Axiom already exists in this universe.")
        axiom = Axiom(universe_id=universe_id, statement=statement)
        self.db.add(axiom)
        self._commit()
        self.db.refresh(axiom)
        # Compare with None so a parent id of 0 is still recorded.
        if parent_axiom_id is not None:
            self._track_axiom_lineage(axiom.id, parent_axiom_id)
        self.logger.info(f"Axiom added: {axiom.statement} (id={axiom.id}) to universe {universe_id}")
        return axiom

    def evolve_axiom(self, axiom_id: int, new_statement: str) -> Axiom:
        """
        Replace an axiom with a new version in the same universe.

        The existing axiom is marked inactive (``is_active = 0``) and the new
        axiom is inserted in the same transaction, so a failure cannot leave
        the old axiom deactivated without a successor. The old axiom is
        recorded as the parent of the new one.

        Args:
            axiom_id: Id of the axiom to evolve.
            new_statement: Text of the replacement axiom; must have at least
                3 characters once stripped.

        Returns:
            The refreshed replacement ``Axiom``.

        Raises:
            ValueError: If the axiom does not exist, the new statement is
                empty or too short, or the universe already has an axiom
                with that statement.
        """
        axiom = self.db.query(Axiom).filter(Axiom.id == axiom_id).first()
        if not axiom:
            self.logger.error("Axiom not found.")
            raise ValueError("Axiom not found.")
        if not new_statement or len(new_statement.strip()) < 3:
            self.logger.error("New axiom statement too short or empty.")
            raise ValueError("New axiom statement too short or empty.")
        existing = (
            self.db.query(Axiom)
            .filter(Axiom.universe_id == axiom.universe_id, Axiom.statement == new_statement)
            .first()
        )
        if existing:
            self.logger.error("Axiom already exists in this universe.")
            raise ValueError("Axiom already exists in this universe.")
        new_axiom = Axiom(universe_id=axiom.universe_id, statement=new_statement)
        axiom.is_active = 0
        self.db.add(new_axiom)
        # One commit covers both the deactivation and the insert.
        self._commit()
        self.db.refresh(new_axiom)
        self._track_axiom_lineage(new_axiom.id, axiom.id)
        self.logger.info(f"Axiom evolved: {axiom.id} -> {new_axiom.id}")
        return new_axiom

    def batch_add_axioms(self, universe_id: int, statements: List[str]) -> List[Axiom]:
        """
        Add several axioms, skipping (and logging) any that fail.

        This is deliberately best-effort: one bad statement does not abort
        the batch.

        Args:
            universe_id: Id of the universe receiving the axioms.
            statements: Axiom statements to add, in order.

        Returns:
            The axioms that were added successfully, in input order.
        """
        added = []
        for stmt in statements:
            try:
                added.append(self.add_axiom(universe_id, stmt))
            except ValueError as e:
                # Validation or duplicate rejection; the session is unaffected.
                self.logger.warning(f"Failed to add axiom '{stmt}': {e}")
            except Exception as e:
                # Reset the session so the remaining statements can still be
                # attempted after a database-level failure.
                self.db.rollback()
                self.logger.warning(f"Failed to add axiom '{stmt}': {e}")
        return added

    def _track_axiom_lineage(self, child_id: int, parent_id: int):
        """Record ``parent_id`` as the parent of ``child_id`` in the in-memory lineage."""
        self.axiom_lineage.add(child_id, parent_id)
        self.logger.info(f"Axiom lineage: parent {parent_id} -> child {child_id}")

    def get_axiom_lineage(self, axiom_id: int) -> List[int]:
        """
        Return the lineage recorded for ``axiom_id`` by this instance.

        Only relationships tracked through this instance are known; nothing
        is read from the database.
        """
        return self.axiom_lineage.get_lineage(axiom_id)

    def create_theorem(self, universe_id: int, statement: str, proof: str, metadata: Optional[Dict[str, Any]] = None) -> Theorem:
        """
        Create a theorem in a universe.

        Args:
            universe_id: Id of the universe the theorem belongs to.
            statement: Theorem statement.
            proof: Proof text.
            metadata: Accepted for interface compatibility; currently unused.

        Returns:
            The refreshed ``Theorem`` instance.
        """
        theorem = Theorem(universe_id=universe_id, statement=statement, proof=proof)
        self.db.add(theorem)
        self._commit()
        self.db.refresh(theorem)
        self.logger.info(f"Theorem created: {theorem.statement} (id={theorem.id}) in universe {universe_id}")
        return theorem

    def list_theorems(self, universe_id: int, filters: Optional[Dict[str, Any]] = None) -> List[Theorem]:
        """
        Return the theorems of a universe, optionally narrowed by equality filters.

        Args:
            universe_id: Id of the universe whose theorems are listed.
            filters: Mapping of ``Theorem`` attribute name to required value.

        Raises:
            AttributeError: If a filter key is not an attribute of ``Theorem``.
        """
        query = self.db.query(Theorem).filter(Theorem.universe_id == universe_id)
        return self._apply_filters(query, Theorem, filters).all()
|
|
|
|
|
|
|
|
|
|
|