Spaces:
Runtime error
Runtime error
| """ | |
| Orchestrator Module for DReamMachine | |
| Manages the 7-step multi-agent dream cycle | |
| """ | |
| import logging | |
| import time | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| import yaml | |
| from prompt_manager import PromptManager | |
| from llm_agent import LLMAgent | |
| from data_logger import DataLogger | |
| logger = logging.getLogger(__name__) | |
| class DreamOrchestrator: | |
| """ | |
| Orchestrates the 7-step dream cycle across multiple LLM agents | |
| Steps: | |
| A.1. Setup - Initialize session & constraints | |
| A.2. Dream & Generate - 3x Dreamer LLMs create ideas | |
| A.3. Log & Narrate - Writer/Logger/Narrator refine output | |
| A.4. Deep Think & Verify - Deep Thinker evaluates feasibility | |
| A.5. Curate & Grade - Curator scores the concept | |
| A.6. Data Storage - Save everything | |
| A.7. Reforge Loop - Decide next stage or new prompt | |
| """ | |
| def __init__(self, config_path: str = "config.yaml", hf_token: Optional[str] = None): | |
| """ | |
| Initialize the Dream Orchestrator | |
| Args: | |
| config_path: Path to configuration file | |
| hf_token: HuggingFace API token | |
| """ | |
| # Load configuration | |
| with open(config_path, 'r') as f: | |
| self.config = yaml.safe_load(f) | |
| # Initialize modules | |
| self.prompt_manager = PromptManager(config_path) | |
| self.llm_agent = LLMAgent(config_path, hf_token) | |
| self.data_logger = DataLogger(config_path, hf_token) | |
| # Orchestration settings | |
| orch_config = self.config.get('orchestration', {}) | |
| self.max_iterations = orch_config.get('max_iterations', 1000) | |
| self.run_interval = orch_config.get('run_interval', 3600) | |
| self.max_runtime = orch_config.get('max_runtime', 21600) | |
| # Thresholds | |
| thresholds = orch_config.get('auto_advance_threshold', {}) | |
| self.feasibility_min = thresholds.get('feasibility_min', 7) | |
| self.originality_min = thresholds.get('originality_min', 5) | |
| # Life stage progression | |
| self.life_stages = ['init_1_25', 'mid_26_50', 'late_51_75', 'final_76_100'] | |
| self.current_stage_index = 0 | |
| # Session tracking | |
| self.session_history = [] | |
| self.current_idea_context = None | |
| logger.info("DreamOrchestrator initialized successfully") | |
| def run_dream_round( | |
| self, | |
| stage: Optional[str] = None, | |
| previous_context: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Execute a complete dream round (steps A.1 through A.7) | |
| Args: | |
| stage: Life stage name (or None to use current) | |
| previous_context: Context from previous stage (for reforge) | |
| Returns: | |
| Complete session data dictionary | |
| """ | |
| logger.info("=" * 80) | |
| logger.info(f"Starting Dream Round - Stage: {stage or 'init_1_25'}") | |
| logger.info("=" * 80) | |
| session_start_time = time.time() | |
| # A.1. Setup - Initialize Session & Constraints | |
| logger.info("\n[A.1] SETUP - Initializing session...") | |
| stage = stage or 'init_1_25' | |
| initial_prompt = self.prompt_manager.get_life_stage_prompt(stage, previous_context) | |
| # A.2. Dream & Generate - 3x Dreamer LLMs | |
| logger.info("\n[A.2] DREAM & GENERATE - Running dreamer models...") | |
| dream_outputs = self._run_dreamers(initial_prompt) | |
| # A.3. Log & Narrate - Writer/Logger/Narrator | |
| logger.info("\n[A.3] LOG & NARRATE - Refining outputs...") | |
| refinement_results = self._run_refinement(dream_outputs) | |
| pitch_narrative = refinement_results['pitch'] | |
| technical_components = refinement_results['technical'] | |
| final_presentation = refinement_results['presentation'] | |
| # A.4. Deep Think & Verify - Feasibility Check | |
| logger.info("\n[A.4] DEEP THINK & VERIFY - Evaluating feasibility...") | |
| feasibility_report = self._run_deep_thinker(technical_components) | |
| # A.5. Curate & Grade - Final Evaluation | |
| logger.info("\n[A.5] CURATE & GRADE - Final scoring...") | |
| curator_scorecard = self._run_curator(pitch_narrative, feasibility_report) | |
| # A.6. Data Storage - Archive Session | |
| logger.info("\n[A.6] DATA STORAGE - Archiving session...") | |
| session_data = { | |
| 'life_stage': stage, | |
| 'initial_prompt': initial_prompt, | |
| 'dream_outputs': dream_outputs, | |
| 'pitch_narrative': pitch_narrative, | |
| 'technical_components': technical_components, | |
| 'final_presentation': final_presentation, | |
| 'feasibility_report': feasibility_report, | |
| 'curator_scorecard': curator_scorecard, | |
| 'execution_time_seconds': time.time() - session_start_time | |
| } | |
| session_id = self.data_logger.log_session_data(session_data) | |
| session_data['session_id'] = session_id | |
| # A.7. Reforge Loop - Iteration Prep | |
| logger.info("\n[A.7] REFORGE LOOP - Determining next action...") | |
| next_action = self._determine_next_action(curator_scorecard, stage, session_data) | |
| session_data['next_action'] = next_action | |
| # Log summary | |
| self._log_session_summary(session_data) | |
| self.session_history.append(session_data) | |
| return session_data | |
| def _run_dreamers(self, prompt: str) -> List[str]: | |
| """ | |
| Step A.2: Run multiple dreamer models in parallel | |
| Args: | |
| prompt: The dream prompt | |
| Returns: | |
| List of dream outputs | |
| """ | |
| logger.info("Generating creative visions from 3 dreamer models...") | |
| try: | |
| dreams = self.llm_agent.run_parallel_dreamers(prompt, num_dreamers=3) | |
| for i, dream in enumerate(dreams, 1): | |
| logger.info(f"Dreamer {i} generated {len(dream)} characters") | |
| return dreams | |
| except Exception as e: | |
| logger.error(f"Error in dreamer stage: {str(e)}") | |
| return [f"[Error: {str(e)}]"] * 3 | |
| def _run_refinement(self, dream_outputs: List[str]) -> Dict[str, str]: | |
| """ | |
| Step A.3: Run Writer, Logger, and Narrator to refine outputs | |
| Args: | |
| dream_outputs: Raw dream texts from dreamers | |
| Returns: | |
| Dictionary with pitch, technical, and presentation outputs | |
| """ | |
| logger.info("Refining dream outputs...") | |
| try: | |
| # Writer: Create coherent pitch | |
| writer_prompt = self.prompt_manager.get_writer_prompt(dream_outputs) | |
| pitch = self.llm_agent.get_writer_output(writer_prompt) | |
| logger.info(f"Writer created pitch ({len(pitch)} characters)") | |
| # Logger: Extract technical components | |
| logger_prompt = self.prompt_manager.get_logger_prompt(pitch) | |
| technical = self.llm_agent.get_logger_output(logger_prompt) | |
| logger.info(f"Logger extracted technical components ({len(technical)} characters)") | |
| # Narrator: Create final presentation | |
| narrator_prompt = self.prompt_manager.get_narrator_prompt(pitch, technical) | |
| presentation = self.llm_agent.get_narrator_output(narrator_prompt) | |
| logger.info(f"Narrator created presentation ({len(presentation)} characters)") | |
| return { | |
| 'pitch': pitch, | |
| 'technical': technical, | |
| 'presentation': presentation | |
| } | |
| except Exception as e: | |
| logger.error(f"Error in refinement stage: {str(e)}") | |
| return { | |
| 'pitch': f"[Error: {str(e)}]", | |
| 'technical': f"[Error: {str(e)}]", | |
| 'presentation': f"[Error: {str(e)}]" | |
| } | |
| def _run_deep_thinker(self, technical_components: str) -> str: | |
| """ | |
| Step A.4: Run deep thinker to evaluate feasibility | |
| Args: | |
| technical_components: Technical specification | |
| Returns: | |
| Feasibility report | |
| """ | |
| logger.info("Running feasibility analysis...") | |
| try: | |
| prompt = self.prompt_manager.get_deep_thinker_prompt(technical_components) | |
| report = self.llm_agent.get_deep_thinker_output(prompt) | |
| logger.info(f"Deep Thinker completed analysis ({len(report)} characters)") | |
| return report | |
| except Exception as e: | |
| logger.error(f"Error in deep thinker stage: {str(e)}") | |
| return f"[Error in feasibility analysis: {str(e)}]" | |
| def _run_curator(self, pitch: str, feasibility_report: str) -> Dict[str, Any]: | |
| """ | |
| Step A.5: Run curator to score and evaluate | |
| Args: | |
| pitch: The narrative pitch | |
| feasibility_report: Feasibility analysis | |
| Returns: | |
| Curator scorecard dictionary | |
| """ | |
| logger.info("Running final curation and scoring...") | |
| try: | |
| prompt = self.prompt_manager.get_curator_prompt(pitch, feasibility_report) | |
| scorecard = self.llm_agent.get_curator_score(prompt) | |
| logger.info(f"Curator assigned scores:") | |
| logger.info(f" - Originality: {scorecard.get('originality', 'N/A')}") | |
| logger.info(f" - Feasibility: {scorecard.get('feasibility', 'N/A')}") | |
| logger.info(f" - Global Impact: {scorecard.get('global_impact', 'N/A')}") | |
| logger.info(f" - Narrative Coherence: {scorecard.get('narrative_coherence', 'N/A')}") | |
| logger.info(f" - Reforge Flag: {scorecard.get('reforge_flag', False)}") | |
| return scorecard | |
| except Exception as e: | |
| logger.error(f"Error in curator stage: {str(e)}") | |
| return { | |
| 'originality': 0, | |
| 'feasibility': 0, | |
| 'global_impact': 0, | |
| 'narrative_coherence': 0, | |
| 'reforge_flag': False, | |
| 'overall_assessment': f'Error: {str(e)}' | |
| } | |
| def _determine_next_action( | |
| self, | |
| scorecard: Dict[str, Any], | |
| current_stage: str, | |
| session_data: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Step A.7: Determine next action based on scorecard | |
| Args: | |
| scorecard: Curator scorecard | |
| current_stage: Current life stage | |
| session_data: Full session data | |
| Returns: | |
| Dictionary describing next action | |
| """ | |
| reforge_flag = scorecard.get('reforge_flag', False) | |
| current_index = self.life_stages.index(current_stage) | |
| if reforge_flag and current_index < len(self.life_stages) - 1: | |
| # Advance to next life stage | |
| next_stage = self.life_stages[current_index + 1] | |
| action = { | |
| 'type': 'advance', | |
| 'next_stage': next_stage, | |
| 'reason': 'Idea meets criteria for advancement', | |
| 'context': session_data['pitch_narrative'] | |
| } | |
| logger.info(f"✓ Advancing idea to next stage: {next_stage}") | |
| elif reforge_flag and current_index == len(self.life_stages) - 1: | |
| # Completed all stages successfully | |
| action = { | |
| 'type': 'complete', | |
| 'reason': 'Idea successfully completed all life stages', | |
| 'final_assessment': scorecard.get('overall_assessment', '') | |
| } | |
| logger.info("✓ Idea has completed all life stages successfully!") | |
| else: | |
| # Archive and start new idea | |
| action = { | |
| 'type': 'new_idea', | |
| 'reason': f'Scores did not meet threshold (F:{scorecard.get("feasibility")}, O:{scorecard.get("originality")})', | |
| 'next_stage': 'init_1_25' | |
| } | |
| logger.info("→ Archiving idea and starting fresh with new prompt") | |
| return action | |
| def _log_session_summary(self, session_data: Dict[str, Any]) -> None: | |
| """Log a human-readable summary of the session""" | |
| logger.info("\n" + "=" * 80) | |
| logger.info("SESSION SUMMARY") | |
| logger.info("=" * 80) | |
| logger.info(f"Session ID: {session_data.get('session_id', 'N/A')}") | |
| logger.info(f"Life Stage: {session_data.get('life_stage', 'N/A')}") | |
| logger.info(f"Execution Time: {session_data.get('execution_time_seconds', 0):.2f}s") | |
| scorecard = session_data.get('curator_scorecard', {}) | |
| logger.info(f"\nScores:") | |
| logger.info(f" Originality: {scorecard.get('originality', 'N/A')}/10") | |
| logger.info(f" Feasibility: {scorecard.get('feasibility', 'N/A')}/10") | |
| logger.info(f" Global Impact: {scorecard.get('global_impact', 'N/A')}/10") | |
| logger.info(f" Narrative Coherence: {scorecard.get('narrative_coherence', 'N/A')}/10") | |
| logger.info(f"\nReforge Flag: {scorecard.get('reforge_flag', False)}") | |
| next_action = session_data.get('next_action', {}) | |
| logger.info(f"\nNext Action: {next_action.get('type', 'unknown')}") | |
| logger.info(f"Reason: {next_action.get('reason', 'N/A')}") | |
| logger.info("=" * 80 + "\n") | |
| def run_batch_mode(self, num_rounds: int = 10, sleep_between: int = 0) -> List[Dict[str, Any]]: | |
| """ | |
| Run multiple dream rounds in batch mode | |
| Args: | |
| num_rounds: Number of rounds to run | |
| sleep_between: Seconds to sleep between rounds | |
| Returns: | |
| List of all session data | |
| """ | |
| logger.info(f"Starting batch mode: {num_rounds} rounds") | |
| results = [] | |
| for i in range(num_rounds): | |
| logger.info(f"\n### BATCH ROUND {i + 1}/{num_rounds} ###\n") | |
| try: | |
| # Determine stage and context based on previous session | |
| if self.session_history and self.session_history[-1]['next_action']['type'] == 'advance': | |
| last_session = self.session_history[-1] | |
| stage = last_session['next_action']['next_stage'] | |
| context = last_session['next_action']['context'] | |
| else: | |
| stage = 'init_1_25' | |
| context = None | |
| # Run dream round | |
| session_data = self.run_dream_round(stage=stage, previous_context=context) | |
| results.append(session_data) | |
| # Sleep between rounds if configured | |
| if sleep_between > 0 and i < num_rounds - 1: | |
| logger.info(f"Sleeping {sleep_between} seconds before next round...") | |
| time.sleep(sleep_between) | |
| except Exception as e: | |
| logger.error(f"Error in batch round {i + 1}: {str(e)}") | |
| continue | |
| logger.info(f"\nBatch mode completed: {len(results)} successful rounds") | |
| return results | |
| def run_scheduled_mode(self) -> None: | |
| """ | |
| Run in scheduled mode (for HuggingFace Spaces) | |
| Runs until max_runtime is reached | |
| """ | |
| logger.info(f"Starting scheduled mode (max runtime: {self.max_runtime}s)") | |
| start_time = time.time() | |
| round_count = 0 | |
| while True: | |
| elapsed = time.time() - start_time | |
| if elapsed >= self.max_runtime: | |
| logger.info(f"Max runtime reached ({self.max_runtime}s). Stopping.") | |
| break | |
| if round_count >= self.max_iterations: | |
| logger.info(f"Max iterations reached ({self.max_iterations}). Stopping.") | |
| break | |
| try: | |
| logger.info(f"\n### SCHEDULED ROUND {round_count + 1} (Elapsed: {elapsed:.0f}s) ###\n") | |
| # Determine stage and context | |
| if self.session_history and self.session_history[-1]['next_action']['type'] == 'advance': | |
| last_session = self.session_history[-1] | |
| stage = last_session['next_action']['next_stage'] | |
| context = last_session['next_action']['context'] | |
| else: | |
| stage = 'init_1_25' | |
| context = None | |
| # Run dream round | |
| self.run_dream_round(stage=stage, previous_context=context) | |
| round_count += 1 | |
| # Wait before next iteration | |
| if self.run_interval > 0: | |
| logger.info(f"Waiting {self.run_interval}s before next round...") | |
| time.sleep(self.run_interval) | |
| except Exception as e: | |
| logger.error(f"Error in scheduled round: {str(e)}") | |
| time.sleep(60) # Wait 1 minute on error before retrying | |
| logger.info(f"\nScheduled mode completed: {round_count} rounds in {time.time() - start_time:.0f}s") | |
| # Convenience function | |
| def create_orchestrator(config_path: str = "config.yaml", hf_token: Optional[str] = None) -> DreamOrchestrator: | |
| """Create and return a configured DreamOrchestrator""" | |
| return DreamOrchestrator(config_path, hf_token) | |