DreamMachine / orchestrator.py
Dave Roby
Fix model availability issues and disable Zero GPU
77a06d0
"""
Orchestrator Module for DReamMachine
Manages the 7-step multi-agent dream cycle
"""
import logging
import time
from typing import Dict, List, Optional, Any
from datetime import datetime
import yaml
from prompt_manager import PromptManager
from llm_agent import LLMAgent
from data_logger import DataLogger
# Module-level logger, named after this module, used by every method below
logger = logging.getLogger(__name__)
class DreamOrchestrator:
    """
    Orchestrates the 7-step dream cycle across multiple LLM agents

    Steps:
        A.1. Setup - Initialize session & constraints
        A.2. Dream & Generate - 3x Dreamer LLMs create ideas
        A.3. Log & Narrate - Writer/Logger/Narrator refine output
        A.4. Deep Think & Verify - Deep Thinker evaluates feasibility
        A.5. Curate & Grade - Curator scores the concept
        A.6. Data Storage - Save everything
        A.7. Reforge Loop - Decide next stage or new prompt

    Every completed round is appended to ``session_history``; the batch and
    scheduled runners read the last entry to decide which stage to run next.
    """

    # Stage used when no stage is given and whenever a fresh idea is started
    DEFAULT_STAGE = 'init_1_25'
    # Seconds to wait after a failed scheduled round before retrying
    ERROR_RETRY_DELAY = 60

    def __init__(self, config_path: str = "config.yaml", hf_token: Optional[str] = None):
        """
        Initialize the Dream Orchestrator

        Args:
            config_path: Path to configuration file
            hf_token: HuggingFace API token
        """
        # Load configuration. An empty YAML document parses to None, so fall
        # back to an empty dict instead of crashing on the .get() calls below.
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f) or {}

        # Initialize modules (each one reads the config file itself)
        self.prompt_manager = PromptManager(config_path)
        self.llm_agent = LLMAgent(config_path, hf_token)
        self.data_logger = DataLogger(config_path, hf_token)

        # Orchestration settings ("or {}" also covers keys present but null)
        orch_config = self.config.get('orchestration') or {}
        self.max_iterations = orch_config.get('max_iterations', 1000)
        self.run_interval = orch_config.get('run_interval', 3600)
        self.max_runtime = orch_config.get('max_runtime', 21600)

        # Thresholds
        # NOTE(review): these are loaded but never consulted in this class;
        # advancement is driven solely by the scorecard's 'reforge_flag'.
        # Presumably the flag is derived from them elsewhere - confirm.
        thresholds = orch_config.get('auto_advance_threshold') or {}
        self.feasibility_min = thresholds.get('feasibility_min', 7)
        self.originality_min = thresholds.get('originality_min', 5)

        # Life stage progression (ordered; the last entry is the final stage)
        self.life_stages = ['init_1_25', 'mid_26_50', 'late_51_75', 'final_76_100']
        self.current_stage_index = 0

        # Session tracking
        self.session_history = []
        self.current_idea_context = None

        logger.info("DreamOrchestrator initialized successfully")

    def run_dream_round(
        self,
        stage: Optional[str] = None,
        previous_context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Execute a complete dream round (steps A.1 through A.7)

        Args:
            stage: Life stage name (or None to use the default first stage)
            previous_context: Context from previous stage (for reforge)

        Returns:
            Complete session data dictionary, including 'session_id' and
            'next_action'. The same dictionary is appended to
            ``session_history``.
        """
        stage = stage or self.DEFAULT_STAGE

        logger.info("=" * 80)
        logger.info(f"Starting Dream Round - Stage: {stage}")
        logger.info("=" * 80)
        session_start_time = time.time()

        # A.1. Setup - Initialize Session & Constraints
        logger.info("\n[A.1] SETUP - Initializing session...")
        initial_prompt = self.prompt_manager.get_life_stage_prompt(stage, previous_context)

        # A.2. Dream & Generate - 3x Dreamer LLMs
        logger.info("\n[A.2] DREAM & GENERATE - Running dreamer models...")
        dream_outputs = self._run_dreamers(initial_prompt)

        # A.3. Log & Narrate - Writer/Logger/Narrator
        logger.info("\n[A.3] LOG & NARRATE - Refining outputs...")
        refinement_results = self._run_refinement(dream_outputs)
        pitch_narrative = refinement_results['pitch']
        technical_components = refinement_results['technical']
        final_presentation = refinement_results['presentation']

        # A.4. Deep Think & Verify - Feasibility Check
        logger.info("\n[A.4] DEEP THINK & VERIFY - Evaluating feasibility...")
        feasibility_report = self._run_deep_thinker(technical_components)

        # A.5. Curate & Grade - Final Evaluation
        logger.info("\n[A.5] CURATE & GRADE - Final scoring...")
        curator_scorecard = self._run_curator(pitch_narrative, feasibility_report)

        # A.6. Data Storage - Archive Session
        logger.info("\n[A.6] DATA STORAGE - Archiving session...")
        session_data = {
            'life_stage': stage,
            'initial_prompt': initial_prompt,
            'dream_outputs': dream_outputs,
            'pitch_narrative': pitch_narrative,
            'technical_components': technical_components,
            'final_presentation': final_presentation,
            'feasibility_report': feasibility_report,
            'curator_scorecard': curator_scorecard,
            'execution_time_seconds': time.time() - session_start_time
        }
        session_id = self.data_logger.log_session_data(session_data)
        session_data['session_id'] = session_id

        # A.7. Reforge Loop - Iteration Prep
        logger.info("\n[A.7] REFORGE LOOP - Determining next action...")
        next_action = self._determine_next_action(curator_scorecard, stage, session_data)
        session_data['next_action'] = next_action

        # Log summary
        self._log_session_summary(session_data)
        self.session_history.append(session_data)
        return session_data

    def _run_dreamers(self, prompt: str, num_dreamers: int = 3) -> List[str]:
        """
        Step A.2: Run multiple dreamer models in parallel

        Args:
            prompt: The dream prompt
            num_dreamers: How many dreamer outputs to request (default 3)

        Returns:
            List of dream outputs. If the dreamer call fails, a list of
            ``num_dreamers`` identical "[Error: ...]" placeholder strings is
            returned instead of raising.
        """
        logger.info(f"Generating creative visions from {num_dreamers} dreamer models...")
        try:
            dreams = self.llm_agent.run_parallel_dreamers(prompt, num_dreamers=num_dreamers)
            for i, dream in enumerate(dreams, 1):
                logger.info(f"Dreamer {i} generated {len(dream)} characters")
            return dreams
        except Exception as e:
            # Best-effort: keep the round going with placeholder outputs
            logger.error(f"Error in dreamer stage: {str(e)}")
            return [f"[Error: {str(e)}]"] * num_dreamers

    def _run_refinement(self, dream_outputs: List[str]) -> Dict[str, str]:
        """
        Step A.3: Run Writer, Logger, and Narrator to refine outputs

        Args:
            dream_outputs: Raw dream texts from dreamers

        Returns:
            Dictionary with 'pitch', 'technical', and 'presentation' outputs.
            If any of the three steps fails, all three values are replaced by
            the same "[Error: ...]" placeholder.
        """
        logger.info("Refining dream outputs...")
        try:
            # Writer: Create coherent pitch
            writer_prompt = self.prompt_manager.get_writer_prompt(dream_outputs)
            pitch = self.llm_agent.get_writer_output(writer_prompt)
            logger.info(f"Writer created pitch ({len(pitch)} characters)")

            # Logger: Extract technical components (fed by the pitch)
            logger_prompt = self.prompt_manager.get_logger_prompt(pitch)
            technical = self.llm_agent.get_logger_output(logger_prompt)
            logger.info(f"Logger extracted technical components ({len(technical)} characters)")

            # Narrator: Create final presentation (fed by pitch + technical)
            narrator_prompt = self.prompt_manager.get_narrator_prompt(pitch, technical)
            presentation = self.llm_agent.get_narrator_output(narrator_prompt)
            logger.info(f"Narrator created presentation ({len(presentation)} characters)")

            return {
                'pitch': pitch,
                'technical': technical,
                'presentation': presentation
            }
        except Exception as e:
            logger.error(f"Error in refinement stage: {str(e)}")
            placeholder = f"[Error: {str(e)}]"
            return {
                'pitch': placeholder,
                'technical': placeholder,
                'presentation': placeholder
            }

    def _run_deep_thinker(self, technical_components: str) -> str:
        """
        Step A.4: Run deep thinker to evaluate feasibility

        Args:
            technical_components: Technical specification

        Returns:
            Feasibility report, or an "[Error in feasibility analysis: ...]"
            placeholder string if the step fails
        """
        logger.info("Running feasibility analysis...")
        try:
            prompt = self.prompt_manager.get_deep_thinker_prompt(technical_components)
            report = self.llm_agent.get_deep_thinker_output(prompt)
            logger.info(f"Deep Thinker completed analysis ({len(report)} characters)")
            return report
        except Exception as e:
            logger.error(f"Error in deep thinker stage: {str(e)}")
            return f"[Error in feasibility analysis: {str(e)}]"

    def _run_curator(self, pitch: str, feasibility_report: str) -> Dict[str, Any]:
        """
        Step A.5: Run curator to score and evaluate

        Args:
            pitch: The narrative pitch
            feasibility_report: Feasibility analysis

        Returns:
            Curator scorecard dictionary. If the step fails, an all-zero
            scorecard with 'reforge_flag' False and the error text in
            'overall_assessment' is returned.
        """
        logger.info("Running final curation and scoring...")
        try:
            prompt = self.prompt_manager.get_curator_prompt(pitch, feasibility_report)
            scorecard = self.llm_agent.get_curator_score(prompt)
            logger.info("Curator assigned scores:")
            logger.info(f" - Originality: {scorecard.get('originality', 'N/A')}")
            logger.info(f" - Feasibility: {scorecard.get('feasibility', 'N/A')}")
            logger.info(f" - Global Impact: {scorecard.get('global_impact', 'N/A')}")
            logger.info(f" - Narrative Coherence: {scorecard.get('narrative_coherence', 'N/A')}")
            logger.info(f" - Reforge Flag: {scorecard.get('reforge_flag', False)}")
            return scorecard
        except Exception as e:
            logger.error(f"Error in curator stage: {str(e)}")
            return {
                'originality': 0,
                'feasibility': 0,
                'global_impact': 0,
                'narrative_coherence': 0,
                'reforge_flag': False,
                'overall_assessment': f'Error: {str(e)}'
            }

    def _determine_next_action(
        self,
        scorecard: Dict[str, Any],
        current_stage: str,
        session_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Step A.7: Determine next action based on scorecard

        Args:
            scorecard: Curator scorecard (only 'reforge_flag' drives the
                decision; the scores are used for the reason text)
            current_stage: Current life stage
            session_data: Full session data (must contain 'pitch_narrative'
                when the idea advances)

        Returns:
            Dictionary describing next action; its 'type' is one of
            'advance', 'complete' or 'new_idea'
        """
        reforge_flag = scorecard.get('reforge_flag', False)

        try:
            current_index = self.life_stages.index(current_stage)
        except ValueError:
            # The session has already been computed and archived at this
            # point; do not lose it over a stage name we cannot place.
            logger.warning(f"Unknown life stage '{current_stage}'; starting fresh with new prompt")
            return {
                'type': 'new_idea',
                'reason': f"Unknown life stage '{current_stage}'",
                'next_stage': self.DEFAULT_STAGE
            }

        final_index = len(self.life_stages) - 1

        if reforge_flag and current_index < final_index:
            # Advance to next life stage
            next_stage = self.life_stages[current_index + 1]
            action = {
                'type': 'advance',
                'next_stage': next_stage,
                'reason': 'Idea meets criteria for advancement',
                'context': session_data['pitch_narrative']
            }
            logger.info(f"✓ Advancing idea to next stage: {next_stage}")
        elif reforge_flag:
            # Already at the final stage: completed all stages successfully
            action = {
                'type': 'complete',
                'reason': 'Idea successfully completed all life stages',
                'final_assessment': scorecard.get('overall_assessment', '')
            }
            logger.info("✓ Idea has completed all life stages successfully!")
        else:
            # Archive and start new idea
            action = {
                'type': 'new_idea',
                'reason': f'Scores did not meet threshold (F:{scorecard.get("feasibility")}, O:{scorecard.get("originality")})',
                'next_stage': self.DEFAULT_STAGE
            }
            logger.info("→ Archiving idea and starting fresh with new prompt")
        return action

    def _log_session_summary(self, session_data: Dict[str, Any]) -> None:
        """Log a human-readable summary of the session"""
        logger.info("\n" + "=" * 80)
        logger.info("SESSION SUMMARY")
        logger.info("=" * 80)
        logger.info(f"Session ID: {session_data.get('session_id', 'N/A')}")
        logger.info(f"Life Stage: {session_data.get('life_stage', 'N/A')}")
        logger.info(f"Execution Time: {session_data.get('execution_time_seconds', 0):.2f}s")

        scorecard = session_data.get('curator_scorecard', {})
        logger.info("\nScores:")
        logger.info(f" Originality: {scorecard.get('originality', 'N/A')}/10")
        logger.info(f" Feasibility: {scorecard.get('feasibility', 'N/A')}/10")
        logger.info(f" Global Impact: {scorecard.get('global_impact', 'N/A')}/10")
        logger.info(f" Narrative Coherence: {scorecard.get('narrative_coherence', 'N/A')}/10")
        logger.info(f"\nReforge Flag: {scorecard.get('reforge_flag', False)}")

        next_action = session_data.get('next_action', {})
        logger.info(f"\nNext Action: {next_action.get('type', 'unknown')}")
        logger.info(f"Reason: {next_action.get('reason', 'N/A')}")
        logger.info("=" * 80 + "\n")

    def _next_stage_and_context(self) -> tuple:
        """
        Pick the stage and context for the upcoming round

        Returns:
            ``(stage, context)``: the last session's 'next_stage' and
            'context' when its next action was 'advance', otherwise the
            default first stage with no context.
        """
        if self.session_history:
            last_action = self.session_history[-1]['next_action']
            if last_action['type'] == 'advance':
                return last_action['next_stage'], last_action['context']
        return self.DEFAULT_STAGE, None

    def run_batch_mode(self, num_rounds: int = 10, sleep_between: int = 0) -> List[Dict[str, Any]]:
        """
        Run multiple dream rounds in batch mode

        A round that raises is logged and skipped; it does not abort the
        batch and is not included in the result.

        Args:
            num_rounds: Number of rounds to run
            sleep_between: Seconds to sleep between rounds

        Returns:
            List of session data for the rounds that completed
        """
        logger.info(f"Starting batch mode: {num_rounds} rounds")
        results = []
        for i in range(num_rounds):
            logger.info(f"\n### BATCH ROUND {i + 1}/{num_rounds} ###\n")
            try:
                # Determine stage and context based on previous session
                stage, context = self._next_stage_and_context()

                # Run dream round
                session_data = self.run_dream_round(stage=stage, previous_context=context)
                results.append(session_data)

                # Sleep between rounds if configured (never after the last one)
                if sleep_between > 0 and i < num_rounds - 1:
                    logger.info(f"Sleeping {sleep_between} seconds before next round...")
                    time.sleep(sleep_between)
            except Exception as e:
                logger.error(f"Error in batch round {i + 1}: {str(e)}")
                continue
        logger.info(f"\nBatch mode completed: {len(results)} successful rounds")
        return results

    def run_scheduled_mode(self) -> None:
        """
        Run in scheduled mode (for HuggingFace Spaces)

        Runs rounds until ``max_runtime`` seconds have elapsed or
        ``max_iterations`` rounds have completed. Between rounds it waits
        ``run_interval`` seconds (``ERROR_RETRY_DELAY`` after a failed
        round), but never longer than the remaining runtime budget and
        never after the final allowed round.
        """
        logger.info(f"Starting scheduled mode (max runtime: {self.max_runtime}s)")
        start_time = time.time()
        round_count = 0

        while True:
            elapsed = time.time() - start_time
            if elapsed >= self.max_runtime:
                logger.info(f"Max runtime reached ({self.max_runtime}s). Stopping.")
                break
            if round_count >= self.max_iterations:
                logger.info(f"Max iterations reached ({self.max_iterations}). Stopping.")
                break

            try:
                logger.info(f"\n### SCHEDULED ROUND {round_count + 1} (Elapsed: {elapsed:.0f}s) ###\n")
                # Determine stage and context
                stage, context = self._next_stage_and_context()
                # Run dream round
                self.run_dream_round(stage=stage, previous_context=context)
                round_count += 1
                pause = self.run_interval
                round_failed = False
            except Exception as e:
                logger.error(f"Error in scheduled round: {str(e)}")
                pause = self.ERROR_RETRY_DELAY  # back off before retrying
                round_failed = True

            # No point waiting if the iteration cap was just reached; the
            # check at the top of the loop will stop immediately.
            if round_count >= self.max_iterations:
                continue

            # Never sleep past the runtime budget
            remaining = self.max_runtime - (time.time() - start_time)
            pause = min(pause, remaining)
            if pause > 0:
                if not round_failed:
                    logger.info(f"Waiting {pause:.0f}s before next round...")
                time.sleep(pause)

        logger.info(f"\nScheduled mode completed: {round_count} rounds in {time.time() - start_time:.0f}s")
# Convenience function
def create_orchestrator(config_path: str = "config.yaml", hf_token: Optional[str] = None) -> DreamOrchestrator:
    """
    Build a ready-to-use DreamOrchestrator

    Args:
        config_path: Path to configuration file
        hf_token: HuggingFace API token

    Returns:
        A DreamOrchestrator constructed from the given arguments
    """
    orchestrator = DreamOrchestrator(config_path, hf_token)
    return orchestrator