""" Orchestrator Module for DReamMachine Manages the 7-step multi-agent dream cycle """ import logging import time from typing import Dict, List, Optional, Any from datetime import datetime import yaml from prompt_manager import PromptManager from llm_agent import LLMAgent from data_logger import DataLogger logger = logging.getLogger(__name__) class DreamOrchestrator: """ Orchestrates the 7-step dream cycle across multiple LLM agents Steps: A.1. Setup - Initialize session & constraints A.2. Dream & Generate - 3x Dreamer LLMs create ideas A.3. Log & Narrate - Writer/Logger/Narrator refine output A.4. Deep Think & Verify - Deep Thinker evaluates feasibility A.5. Curate & Grade - Curator scores the concept A.6. Data Storage - Save everything A.7. Reforge Loop - Decide next stage or new prompt """ def __init__(self, config_path: str = "config.yaml", hf_token: Optional[str] = None): """ Initialize the Dream Orchestrator Args: config_path: Path to configuration file hf_token: HuggingFace API token """ # Load configuration with open(config_path, 'r') as f: self.config = yaml.safe_load(f) # Initialize modules self.prompt_manager = PromptManager(config_path) self.llm_agent = LLMAgent(config_path, hf_token) self.data_logger = DataLogger(config_path, hf_token) # Orchestration settings orch_config = self.config.get('orchestration', {}) self.max_iterations = orch_config.get('max_iterations', 1000) self.run_interval = orch_config.get('run_interval', 3600) self.max_runtime = orch_config.get('max_runtime', 21600) # Thresholds thresholds = orch_config.get('auto_advance_threshold', {}) self.feasibility_min = thresholds.get('feasibility_min', 7) self.originality_min = thresholds.get('originality_min', 5) # Life stage progression self.life_stages = ['init_1_25', 'mid_26_50', 'late_51_75', 'final_76_100'] self.current_stage_index = 0 # Session tracking self.session_history = [] self.current_idea_context = None logger.info("DreamOrchestrator initialized successfully") def run_dream_round( self, stage: Optional[str] = None, previous_context: Optional[str] = None ) -> Dict[str, Any]: """ Execute a complete dream round (steps A.1 through A.7) Args: stage: Life stage name (or None to use current) previous_context: Context from previous stage (for reforge) Returns: Complete session data dictionary """ logger.info("=" * 80) logger.info(f"Starting Dream Round - Stage: {stage or 'init_1_25'}") logger.info("=" * 80) session_start_time = time.time() # A.1. Setup - Initialize Session & Constraints logger.info("\n[A.1] SETUP - Initializing session...") stage = stage or 'init_1_25' initial_prompt = self.prompt_manager.get_life_stage_prompt(stage, previous_context) # A.2. Dream & Generate - 3x Dreamer LLMs logger.info("\n[A.2] DREAM & GENERATE - Running dreamer models...") dream_outputs = self._run_dreamers(initial_prompt) # A.3. Log & Narrate - Writer/Logger/Narrator logger.info("\n[A.3] LOG & NARRATE - Refining outputs...") refinement_results = self._run_refinement(dream_outputs) pitch_narrative = refinement_results['pitch'] technical_components = refinement_results['technical'] final_presentation = refinement_results['presentation'] # A.4. Deep Think & Verify - Feasibility Check logger.info("\n[A.4] DEEP THINK & VERIFY - Evaluating feasibility...") feasibility_report = self._run_deep_thinker(technical_components) # A.5. Curate & Grade - Final Evaluation logger.info("\n[A.5] CURATE & GRADE - Final scoring...") curator_scorecard = self._run_curator(pitch_narrative, feasibility_report) # A.6. Data Storage - Archive Session logger.info("\n[A.6] DATA STORAGE - Archiving session...") session_data = { 'life_stage': stage, 'initial_prompt': initial_prompt, 'dream_outputs': dream_outputs, 'pitch_narrative': pitch_narrative, 'technical_components': technical_components, 'final_presentation': final_presentation, 'feasibility_report': feasibility_report, 'curator_scorecard': curator_scorecard, 'execution_time_seconds': time.time() - session_start_time } session_id = self.data_logger.log_session_data(session_data) session_data['session_id'] = session_id # A.7. Reforge Loop - Iteration Prep logger.info("\n[A.7] REFORGE LOOP - Determining next action...") next_action = self._determine_next_action(curator_scorecard, stage, session_data) session_data['next_action'] = next_action # Log summary self._log_session_summary(session_data) self.session_history.append(session_data) return session_data def _run_dreamers(self, prompt: str) -> List[str]: """ Step A.2: Run multiple dreamer models in parallel Args: prompt: The dream prompt Returns: List of dream outputs """ logger.info("Generating creative visions from 3 dreamer models...") try: dreams = self.llm_agent.run_parallel_dreamers(prompt, num_dreamers=3) for i, dream in enumerate(dreams, 1): logger.info(f"Dreamer {i} generated {len(dream)} characters") return dreams except Exception as e: logger.error(f"Error in dreamer stage: {str(e)}") return [f"[Error: {str(e)}]"] * 3 def _run_refinement(self, dream_outputs: List[str]) -> Dict[str, str]: """ Step A.3: Run Writer, Logger, and Narrator to refine outputs Args: dream_outputs: Raw dream texts from dreamers Returns: Dictionary with pitch, technical, and presentation outputs """ logger.info("Refining dream outputs...") try: # Writer: Create coherent pitch writer_prompt = self.prompt_manager.get_writer_prompt(dream_outputs) pitch = self.llm_agent.get_writer_output(writer_prompt) logger.info(f"Writer created pitch ({len(pitch)} characters)") # Logger: Extract technical components logger_prompt = self.prompt_manager.get_logger_prompt(pitch) technical = self.llm_agent.get_logger_output(logger_prompt) logger.info(f"Logger extracted technical components ({len(technical)} characters)") # Narrator: Create final presentation narrator_prompt = self.prompt_manager.get_narrator_prompt(pitch, technical) presentation = self.llm_agent.get_narrator_output(narrator_prompt) logger.info(f"Narrator created presentation ({len(presentation)} characters)") return { 'pitch': pitch, 'technical': technical, 'presentation': presentation } except Exception as e: logger.error(f"Error in refinement stage: {str(e)}") return { 'pitch': f"[Error: {str(e)}]", 'technical': f"[Error: {str(e)}]", 'presentation': f"[Error: {str(e)}]" } def _run_deep_thinker(self, technical_components: str) -> str: """ Step A.4: Run deep thinker to evaluate feasibility Args: technical_components: Technical specification Returns: Feasibility report """ logger.info("Running feasibility analysis...") try: prompt = self.prompt_manager.get_deep_thinker_prompt(technical_components) report = self.llm_agent.get_deep_thinker_output(prompt) logger.info(f"Deep Thinker completed analysis ({len(report)} characters)") return report except Exception as e: logger.error(f"Error in deep thinker stage: {str(e)}") return f"[Error in feasibility analysis: {str(e)}]" def _run_curator(self, pitch: str, feasibility_report: str) -> Dict[str, Any]: """ Step A.5: Run curator to score and evaluate Args: pitch: The narrative pitch feasibility_report: Feasibility analysis Returns: Curator scorecard dictionary """ logger.info("Running final curation and scoring...") try: prompt = self.prompt_manager.get_curator_prompt(pitch, feasibility_report) scorecard = self.llm_agent.get_curator_score(prompt) logger.info(f"Curator assigned scores:") logger.info(f" - Originality: {scorecard.get('originality', 'N/A')}") logger.info(f" - Feasibility: {scorecard.get('feasibility', 'N/A')}") logger.info(f" - Global Impact: {scorecard.get('global_impact', 'N/A')}") logger.info(f" - Narrative Coherence: {scorecard.get('narrative_coherence', 'N/A')}") logger.info(f" - Reforge Flag: {scorecard.get('reforge_flag', False)}") return scorecard except Exception as e: logger.error(f"Error in curator stage: {str(e)}") return { 'originality': 0, 'feasibility': 0, 'global_impact': 0, 'narrative_coherence': 0, 'reforge_flag': False, 'overall_assessment': f'Error: {str(e)}' } def _determine_next_action( self, scorecard: Dict[str, Any], current_stage: str, session_data: Dict[str, Any] ) -> Dict[str, Any]: """ Step A.7: Determine next action based on scorecard Args: scorecard: Curator scorecard current_stage: Current life stage session_data: Full session data Returns: Dictionary describing next action """ reforge_flag = scorecard.get('reforge_flag', False) current_index = self.life_stages.index(current_stage) if reforge_flag and current_index < len(self.life_stages) - 1: # Advance to next life stage next_stage = self.life_stages[current_index + 1] action = { 'type': 'advance', 'next_stage': next_stage, 'reason': 'Idea meets criteria for advancement', 'context': session_data['pitch_narrative'] } logger.info(f"✓ Advancing idea to next stage: {next_stage}") elif reforge_flag and current_index == len(self.life_stages) - 1: # Completed all stages successfully action = { 'type': 'complete', 'reason': 'Idea successfully completed all life stages', 'final_assessment': scorecard.get('overall_assessment', '') } logger.info("✓ Idea has completed all life stages successfully!") else: # Archive and start new idea action = { 'type': 'new_idea', 'reason': f'Scores did not meet threshold (F:{scorecard.get("feasibility")}, O:{scorecard.get("originality")})', 'next_stage': 'init_1_25' } logger.info("→ Archiving idea and starting fresh with new prompt") return action def _log_session_summary(self, session_data: Dict[str, Any]) -> None: """Log a human-readable summary of the session""" logger.info("\n" + "=" * 80) logger.info("SESSION SUMMARY") logger.info("=" * 80) logger.info(f"Session ID: {session_data.get('session_id', 'N/A')}") logger.info(f"Life Stage: {session_data.get('life_stage', 'N/A')}") logger.info(f"Execution Time: {session_data.get('execution_time_seconds', 0):.2f}s") scorecard = session_data.get('curator_scorecard', {}) logger.info(f"\nScores:") logger.info(f" Originality: {scorecard.get('originality', 'N/A')}/10") logger.info(f" Feasibility: {scorecard.get('feasibility', 'N/A')}/10") logger.info(f" Global Impact: {scorecard.get('global_impact', 'N/A')}/10") logger.info(f" Narrative Coherence: {scorecard.get('narrative_coherence', 'N/A')}/10") logger.info(f"\nReforge Flag: {scorecard.get('reforge_flag', False)}") next_action = session_data.get('next_action', {}) logger.info(f"\nNext Action: {next_action.get('type', 'unknown')}") logger.info(f"Reason: {next_action.get('reason', 'N/A')}") logger.info("=" * 80 + "\n") def run_batch_mode(self, num_rounds: int = 10, sleep_between: int = 0) -> List[Dict[str, Any]]: """ Run multiple dream rounds in batch mode Args: num_rounds: Number of rounds to run sleep_between: Seconds to sleep between rounds Returns: List of all session data """ logger.info(f"Starting batch mode: {num_rounds} rounds") results = [] for i in range(num_rounds): logger.info(f"\n### BATCH ROUND {i + 1}/{num_rounds} ###\n") try: # Determine stage and context based on previous session if self.session_history and self.session_history[-1]['next_action']['type'] == 'advance': last_session = self.session_history[-1] stage = last_session['next_action']['next_stage'] context = last_session['next_action']['context'] else: stage = 'init_1_25' context = None # Run dream round session_data = self.run_dream_round(stage=stage, previous_context=context) results.append(session_data) # Sleep between rounds if configured if sleep_between > 0 and i < num_rounds - 1: logger.info(f"Sleeping {sleep_between} seconds before next round...") time.sleep(sleep_between) except Exception as e: logger.error(f"Error in batch round {i + 1}: {str(e)}") continue logger.info(f"\nBatch mode completed: {len(results)} successful rounds") return results def run_scheduled_mode(self) -> None: """ Run in scheduled mode (for HuggingFace Spaces) Runs until max_runtime is reached """ logger.info(f"Starting scheduled mode (max runtime: {self.max_runtime}s)") start_time = time.time() round_count = 0 while True: elapsed = time.time() - start_time if elapsed >= self.max_runtime: logger.info(f"Max runtime reached ({self.max_runtime}s). Stopping.") break if round_count >= self.max_iterations: logger.info(f"Max iterations reached ({self.max_iterations}). Stopping.") break try: logger.info(f"\n### SCHEDULED ROUND {round_count + 1} (Elapsed: {elapsed:.0f}s) ###\n") # Determine stage and context if self.session_history and self.session_history[-1]['next_action']['type'] == 'advance': last_session = self.session_history[-1] stage = last_session['next_action']['next_stage'] context = last_session['next_action']['context'] else: stage = 'init_1_25' context = None # Run dream round self.run_dream_round(stage=stage, previous_context=context) round_count += 1 # Wait before next iteration if self.run_interval > 0: logger.info(f"Waiting {self.run_interval}s before next round...") time.sleep(self.run_interval) except Exception as e: logger.error(f"Error in scheduled round: {str(e)}") time.sleep(60) # Wait 1 minute on error before retrying logger.info(f"\nScheduled mode completed: {round_count} rounds in {time.time() - start_time:.0f}s") # Convenience function def create_orchestrator(config_path: str = "config.yaml", hf_token: Optional[str] = None) -> DreamOrchestrator: """Create and return a configured DreamOrchestrator""" return DreamOrchestrator(config_path, hf_token)