""" Forge Engine - Main orchestrator for the multi-agent reasoning forge. Coordinates the full forge cycle: concept -> problem_generator -> each agent analyzes -> critic evaluates -> (feedback loop: weak agents revise) -> synthesis_engine -> training example Supports three modes: 1. forge_single() — Original single-pass (fast, good for bulk generation) 2. forge_with_feedback() — Closed critic loop (agents revise based on scores) 3. forge_with_debate() — Multi-turn debate (agents challenge each other) Outputs JSONL training data in OpenAI chat format. """ import json import os import sys import random from typing import TextIO from reasoning_forge.agents.newton_agent import NewtonAgent from reasoning_forge.agents.quantum_agent import QuantumAgent from reasoning_forge.agents.ethics_agent import EthicsAgent from reasoning_forge.agents.philosophy_agent import PhilosophyAgent from reasoning_forge.agents.davinci_agent import DaVinciAgent from reasoning_forge.agents.empathy_agent import EmpathyAgent from reasoning_forge.agents.critic_agent import CriticAgent from reasoning_forge.synthesis_engine import SynthesisEngine from reasoning_forge.problem_generator import ProblemGenerator from reasoning_forge.epistemic_metrics import EpistemicMetrics SYSTEM_PROMPT = ( "You are Codette, a multi-perspective reasoning AI. You analyze concepts " "by examining them through multiple intellectual lenses -- physics, " "philosophy, ethics, creative invention, and human empathy -- then " "synthesize a unified understanding that is richer than any single " "perspective. You think carefully, acknowledge uncertainty, and connect " "abstract reasoning to concrete human experience." ) # Score below which an agent gets sent back for revision _REVISION_THRESHOLD = 0.6 class ForgeEngine: """Main orchestrator for multi-agent reasoning data generation.""" def __init__(self): # Initialize all reasoning agents self.newton = NewtonAgent() self.quantum = QuantumAgent() self.ethics = EthicsAgent() self.philosophy = PhilosophyAgent() self.davinci = DaVinciAgent() self.empathy = EmpathyAgent() self.critic = CriticAgent() self.analysis_agents = [ self.newton, self.quantum, self.ethics, self.philosophy, self.davinci, self.empathy, ] # Initialize supporting engines self.synthesis = SynthesisEngine() self.problem_generator = ProblemGenerator() self.epistemic = EpistemicMetrics() def forge_single(self, concept: str) -> dict: """Run full forge cycle on one concept (original single-pass mode). The cycle: 1. Generate reasoning problems from the concept. 2. Each analysis agent produces its perspective. 3. The critic evaluates the ensemble. 4. The synthesis engine combines everything. 5. Package as a training example. Args: concept: The concept text to forge. Returns: Training example dict in OpenAI chat format. """ # Step 1: Generate reasoning problems problems = self.problem_generator.generate_problems(concept) # Step 2: Each agent analyzes the concept analyses = {} for agent in self.analysis_agents: analyses[agent.name] = agent.analyze(concept) # Step 3: Critic evaluates the ensemble critique = self.critic.evaluate_ensemble(concept, analyses) # Step 4: Synthesis engine combines everything synthesized_response = self.synthesis.synthesize( concept, analyses, critique ) # Step 5: Build the user prompt if problems and random.random() < 0.5: problem_type, problem_text = random.choice(problems) user_content = problem_text else: user_content = ( f"Analyze this concept from multiple perspectives:\n\n{concept}" ) # Step 6: Compute RC+xi epistemic metrics epistemic_report = self.epistemic.full_epistemic_report( analyses, synthesized_response ) # Step 7: Package as training example training_example = { "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_content}, {"role": "assistant", "content": synthesized_response}, ], "metadata": { "concept": concept, "agent_scores": critique.get("agent_scores", {}), "overall_quality": critique.get("overall_quality", 0.0), "problems_generated": len(problems), "problem_types": [p[0] for p in problems], "redundancies_found": len(critique.get("redundancies", [])), "missing_perspectives": len( critique.get("missing_perspectives", []) ), "epistemic_tension": epistemic_report.get("tension_magnitude", 0), "ensemble_coherence": epistemic_report.get("ensemble_coherence", 0), "perspective_coverage": epistemic_report.get("perspective_coverage", {}), "tension_productivity": epistemic_report.get("tension_productivity", {}), }, } return training_example # -- Closed Critic Feedback Loop (new) --------------------------------- def forge_with_feedback( self, concept: str, max_revisions: int = 2, ) -> dict: """Run forge with closed critic feedback loop. After initial analysis, the critic scores each agent. Agents scoring below the revision threshold are sent back with specific critique for a second attempt. The best version (original or revised) is kept. Args: concept: The concept text to forge. max_revisions: Maximum revision rounds per weak agent. Returns: Training example dict with revision metadata. """ problems = self.problem_generator.generate_problems(concept) # Initial analysis pass analyses = {} for agent in self.analysis_agents: analyses[agent.name] = agent.analyze(concept) revision_counts = {agent.name: 0 for agent in self.analysis_agents} for revision_round in range(max_revisions): critique = self.critic.evaluate_ensemble(concept, analyses) agent_scores = critique.get("agent_scores", {}) suggestions = critique.get("improvement_suggestions", []) # Find agents below threshold weak_agents = [ agent for agent in self.analysis_agents if agent_scores.get(agent.name, {}).get("combined", 1.0) < _REVISION_THRESHOLD ] if not weak_agents: break # All agents above threshold — converged for agent in weak_agents: score = agent_scores.get(agent.name, {}) # Build revision directive from critic feedback directive = self._build_revision_directive( agent.name, score, suggestions, concept ) # Agent re-analyzes with the directive prepended to concept revised = agent.analyze(f"{directive}\n\n{concept}") # Keep revision only if it scores better (evaluate in full ensemble context) old_score = score.get("combined", 0) test_analyses = dict(analyses) test_analyses[agent.name] = revised new_critique = self.critic.evaluate_ensemble( concept, test_analyses ) new_score = new_critique.get("agent_scores", {}).get( agent.name, {} ).get("combined", 0) if new_score > old_score: analyses[agent.name] = revised revision_counts[agent.name] += 1 # Final critique and synthesis final_critique = self.critic.evaluate_ensemble(concept, analyses) synthesized = self.synthesis.synthesize(concept, analyses, final_critique) epistemic_report = self.epistemic.full_epistemic_report(analyses, synthesized) if problems and random.random() < 0.5: problem_type, problem_text = random.choice(problems) user_content = problem_text else: user_content = f"Analyze this concept from multiple perspectives:\n\n{concept}" return { "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_content}, {"role": "assistant", "content": synthesized}, ], "metadata": { "concept": concept, "agent_scores": final_critique.get("agent_scores", {}), "overall_quality": final_critique.get("overall_quality", 0.0), "problems_generated": len(problems), "revision_counts": revision_counts, "total_revisions": sum(revision_counts.values()), "epistemic_tension": epistemic_report.get("tension_magnitude", 0), "ensemble_coherence": epistemic_report.get("ensemble_coherence", 0), "tension_productivity": epistemic_report.get("tension_productivity", {}), "forge_mode": "feedback_loop", }, } # -- Multi-Turn Debate (new) ------------------------------------------- def forge_with_debate( self, concept: str, debate_rounds: int = 2, ) -> dict: """Run forge with multi-turn agent debate. Each round: 1. All agents produce their analysis. 2. Random pairs are formed for cross-perspective challenge. 3. Each agent in a pair sees the other's analysis and produces a response that engages with it. 4. Epistemic tension is tracked per round. 5. After all rounds, synthesis incorporates debate history. Args: concept: The concept text to forge. debate_rounds: Number of debate rounds. Returns: Training example with debate history and tension decay metrics. """ problems = self.problem_generator.generate_problems(concept) # Round 0: initial analyses analyses = {} for agent in self.analysis_agents: analyses[agent.name] = agent.analyze(concept) round_analyses = [dict(analyses)] # snapshot for tension tracking debate_log = [] for round_num in range(debate_rounds): # Form random pairs (odd agent out debates the first agent) agents_shuffled = list(self.analysis_agents) random.shuffle(agents_shuffled) pairs = [] for i in range(0, len(agents_shuffled) - 1, 2): pairs.append((agents_shuffled[i], agents_shuffled[i + 1])) if len(agents_shuffled) % 2 == 1: pairs.append((agents_shuffled[-1], agents_shuffled[0])) round_debates = [] for agent_a, agent_b in pairs: # Agent A sees B's analysis and responds challenge_prompt = ( f"Another perspective on '{concept}' argues:\n\n" f"{analyses[agent_b.name]}\n\n" f"Respond to this from your {agent_a.perspective} perspective. " f"Where do you agree, disagree, or see complementary insights?" ) response_a = agent_a.analyze(challenge_prompt) # Agent B sees A's response counter_prompt = ( f"A {agent_a.perspective} perspective responded to your analysis " f"of '{concept}':\n\n{response_a}\n\n" f"Integrate their insights with your own view." ) response_b = agent_b.analyze(counter_prompt) # Update analyses with debate-enriched versions analyses[agent_a.name] = response_a analyses[agent_b.name] = response_b round_debates.append({ "pair": f"{agent_a.name}_vs_{agent_b.name}", "challenge": response_a[:200], "counter": response_b[:200], }) debate_log.append({ "round": round_num + 1, "debates": round_debates, }) round_analyses.append(dict(analyses)) # Track tension decay across rounds convergence = self.epistemic.score_debate_convergence(round_analyses) # Final critique and synthesis critique = self.critic.evaluate_ensemble(concept, analyses) synthesized = self.synthesis.synthesize(concept, analyses, critique) epistemic_report = self.epistemic.full_epistemic_report(analyses, synthesized) if problems and random.random() < 0.5: problem_type, problem_text = random.choice(problems) user_content = problem_text else: user_content = f"Analyze this concept from multiple perspectives:\n\n{concept}" return { "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_content}, {"role": "assistant", "content": synthesized}, ], "metadata": { "concept": concept, "agent_scores": critique.get("agent_scores", {}), "overall_quality": critique.get("overall_quality", 0.0), "problems_generated": len(problems), "debate_rounds": debate_rounds, "debate_log": debate_log, "tension_decay": convergence, "epistemic_tension": epistemic_report.get("tension_magnitude", 0), "ensemble_coherence": epistemic_report.get("ensemble_coherence", 0), "tension_productivity": epistemic_report.get("tension_productivity", {}), "forge_mode": "debate", }, } # -- Helpers ----------------------------------------------------------- def _build_revision_directive( self, agent_name: str, score: dict, suggestions: list, concept: str, ) -> str: """Build a revision directive for a weak agent.""" parts = [ f"[REVISION REQUESTED for {agent_name}]", f"Your previous analysis scored {score.get('combined', 0):.2f}/1.00.", ] if score.get("logical_clarity", 1) < 0.5: parts.append( "Improve logical clarity: use connectives (therefore, because, however), " "avoid vague language, structure your argument explicitly." ) if score.get("conceptual_accuracy", 1) < 0.5: parts.append( "Improve conceptual accuracy: engage directly with the specific concept, " "use domain vocabulary, avoid generic placeholder framing." ) if suggestions: parts.append(f"Critic suggests: {suggestions[0]}") parts.append("Reanalyze with these improvements:") return " ".join(parts) def forge_batch( self, concept: str, variants: int = 3 ) -> list[dict]: """Generate multiple training examples from one concept. Uses different problem framings and agent template selections to produce varied training data from the same concept. Args: concept: The concept text. variants: Number of variants to generate. Returns: List of training example dicts. """ examples = [] for _ in range(variants): example = self.forge_single(concept) examples.append(example) return examples def forge_dataset( self, concepts: list[str], output_path: str, variants_per_concept: int = 1, verbose: bool = False, ) -> dict: """Run forge on a list of concepts and write JSONL output. Args: concepts: List of concept strings. output_path: Path to output JSONL file. variants_per_concept: Number of training examples per concept. verbose: Whether to print progress. Returns: Summary dict with counts and quality statistics. """ os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) total_examples = 0 total_quality = 0.0 quality_scores = [] with open(output_path, "w", encoding="utf-8") as f: for i, concept in enumerate(concepts): if verbose: print( f"[{i + 1}/{len(concepts)}] Forging: " f"{concept[:60]}{'...' if len(concept) > 60 else ''}", file=sys.stderr, ) for variant in range(variants_per_concept): example = self.forge_single(concept) quality = example["metadata"]["overall_quality"] # Write the messages (without metadata) for training training_record = {"messages": example["messages"]} f.write(json.dumps(training_record, ensure_ascii=False) + "\n") total_examples += 1 total_quality += quality quality_scores.append(quality) summary = { "total_examples": total_examples, "total_concepts": len(concepts), "variants_per_concept": variants_per_concept, "output_path": output_path, "avg_quality": round(total_quality / max(1, total_examples), 3), "min_quality": round(min(quality_scores) if quality_scores else 0, 3), "max_quality": round(max(quality_scores) if quality_scores else 0, 3), } if verbose: print(f"\nForge complete: {summary}", file=sys.stderr) return summary def forge_from_dataset( self, input_jsonl: str, output_path: str, concept_field: str = "text", variants_per_concept: int = 1, verbose: bool = False, ) -> dict: """Read an existing JSONL dataset and run forge on each entry. Expects each line to be a JSON object with a text field containing the concept. Supports common field names: 'text', 'concept', 'content', 'input', 'question', 'prompt'. Args: input_jsonl: Path to input JSONL file. output_path: Path to output JSONL file. concept_field: Name of the field containing the concept text. variants_per_concept: Number of training examples per concept. verbose: Whether to print progress. Returns: Summary dict with counts and quality statistics. """ # Candidate field names to try candidate_fields = [ concept_field, "text", "concept", "content", "input", "question", "prompt", ] concepts = [] with open(input_jsonl, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: continue try: record = json.loads(line) except json.JSONDecodeError: if verbose: print( f"Warning: skipping malformed JSON on line {line_num}", file=sys.stderr, ) continue # Try candidate fields in order concept_text = None if isinstance(record, dict): for field in candidate_fields: if field in record and isinstance(record[field], str): concept_text = record[field].strip() break # Fallback: if record has 'messages', extract user content if concept_text is None and "messages" in record: for msg in record["messages"]: if msg.get("role") == "user": concept_text = msg["content"].strip() break elif isinstance(record, str): concept_text = record.strip() if concept_text: concepts.append(concept_text) if verbose: print( f"Loaded {len(concepts)} concepts from {input_jsonl}", file=sys.stderr, ) return self.forge_dataset( concepts, output_path, variants_per_concept=variants_per_concept, verbose=verbose, ) def forge_single_detailed(self, concept: str) -> dict: """Run forge cycle and return all intermediate outputs. Useful for debugging, inspection, and quality analysis. Args: concept: The concept text. Returns: Dict with all intermediate results: { "concept": str, "problems": [(type, text), ...], "analyses": {agent_name: analysis_text, ...}, "critique": {...}, "synthesis": str, "training_example": {...}, } """ problems = self.problem_generator.generate_problems(concept) analyses = {} for agent in self.analysis_agents: analyses[agent.name] = agent.analyze(concept) critique = self.critic.evaluate_ensemble(concept, analyses) synthesized = self.synthesis.synthesize(concept, analyses, critique) user_content = ( f"Analyze this concept from multiple perspectives:\n\n{concept}" ) training_example = { "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_content}, {"role": "assistant", "content": synthesized}, ], } return { "concept": concept, "problems": problems, "analyses": analyses, "critique": critique, "synthesis": synthesized, "training_example": training_example, }