Spaces:
Running
Running
| """ | |
| Experiment Workflow | |
| Partial workflow that only runs ExperimentAgent for experiment execution. | |
| Requires a pre-existing data summary (e.g., from DataWorkflow or manual input). | |
| Useful for debugging the experiment phase independently. | |
| """ | |
| import shutil | |
| from pathlib import Path | |
| from typing import Literal | |
| from loguru import logger | |
| from pydantic import BaseModel, PrivateAttr | |
| from scievo.agents import experiment_agent | |
| from scievo.agents.experiment_agent.state import ExperimentAgentState | |
| from scievo.core.code_env import LocalEnv | |
| from scievo.workflows.utils import get_separator | |
| class ExperimentWorkflow(BaseModel): | |
| """ | |
| Experiment Workflow - runs only the ExperimentAgent. | |
| This workflow executes: | |
| 1. ExperimentAgent - Generates code, executes experiments, produces metrics | |
| Requires: | |
| - data_summary: Either a string containing data analysis, or a path to data_analysis.md | |
| Usage: | |
| workflow = ExperimentWorkflow( | |
| workspace_path="workspace", | |
| user_query="Train an SVR model", | |
| data_summary="... analysis from DataAgent ...", | |
| ) | |
| workflow.run() | |
| print(workflow.final_summary) | |
| """ | |
| # ==================== INPUT ==================== | |
| workspace_path: Path | |
| user_query: str | |
| data_summary: str # Can be loaded from file or passed directly | |
| repo_source: str | None = None | |
| max_revisions: int = 5 | |
| recursion_limit: int = 100 | |
| # ==================== INTERNAL STATE ==================== | |
| current_phase: Literal["init", "experiment", "complete", "failed"] = "init" | |
| # ==================== OUTPUT ==================== | |
| final_status: Literal["success", "failed", "max_revisions_reached"] | None = None | |
| final_summary: str = "" | |
| execution_results: list = [] | |
| current_revision: int = 0 | |
| error_message: str | None = None | |
| experiment_agent_intermediate_state: list[dict] = [] | |
| # Internal: compiled graph (lazy loaded) | |
| _experiment_agent_graph: object = PrivateAttr(default=None) | |
| def _ensure_graph(self): | |
| """Lazily compile agent graph.""" | |
| if self._experiment_agent_graph is None: | |
| self._experiment_agent_graph = experiment_agent.build().compile() | |
| def _setup_directories(self): | |
| """Setup workspace directory.""" | |
| self.workspace_path.mkdir(parents=True, exist_ok=True) | |
| def from_data_analysis_file( | |
| cls, | |
| workspace_path: str | Path, | |
| user_query: str, | |
| data_analysis_path: str | Path | None = None, | |
| repo_source: str | None = None, | |
| max_revisions: int = 5, | |
| recursion_limit: int = 100, | |
| ) -> "ExperimentWorkflow": | |
| """ | |
| Create ExperimentWorkflow by loading data summary from file. | |
| Args: | |
| workspace_path: Workspace directory for the experiment | |
| user_query: User's experiment objective | |
| data_analysis_path: Path to data_analysis.md (defaults to workspace/data_analysis.md) | |
| repo_source: Optional repository source | |
| max_revisions: Maximum revision loops | |
| recursion_limit: Recursion limit for ExperimentAgent | |
| Returns: | |
| ExperimentWorkflow instance | |
| """ | |
| workspace_path = Path(workspace_path) | |
| if data_analysis_path is None: | |
| data_analysis_path = workspace_path / "data_analysis.md" | |
| else: | |
| data_analysis_path = Path(data_analysis_path) | |
| if not data_analysis_path.exists(): | |
| raise FileNotFoundError( | |
| f"Data analysis file not found: {data_analysis_path}. " | |
| "Run DataWorkflow first or provide data_summary directly." | |
| ) | |
| data_summary = data_analysis_path.read_text() | |
| return cls( | |
| workspace_path=workspace_path, | |
| user_query=user_query, | |
| data_summary=data_summary, | |
| repo_source=repo_source, | |
| max_revisions=max_revisions, | |
| recursion_limit=recursion_limit, | |
| ) | |
| def run(self) -> "ExperimentWorkflow": | |
| """ | |
| Run the experiment workflow. | |
| Returns: | |
| self (for chaining) | |
| """ | |
| self._ensure_graph() | |
| self._setup_directories() | |
| logger.info(get_separator()) | |
| logger.info("Starting Experiment Workflow") | |
| logger.info(get_separator()) | |
| success = self._run_experiment_agent() | |
| self._finalize(success) | |
| return self | |
| def _run_experiment_agent(self) -> bool: | |
| """ | |
| Run ExperimentAgent to generate and execute experiments. | |
| Returns: | |
| True if successful, False if failed | |
| """ | |
| logger.info("Running ExperimentAgent") | |
| self.current_phase = "experiment" | |
| exp_state = ExperimentAgentState( | |
| workspace=LocalEnv(self.workspace_path), | |
| data_summary=self.data_summary, | |
| user_query=self.user_query, | |
| repo_source=self.repo_source, | |
| max_revisions=self.max_revisions, | |
| ) | |
| try: | |
| result = self._experiment_agent_graph.invoke( | |
| exp_state, | |
| {"recursion_limit": self.recursion_limit}, | |
| ) | |
| result_state = ExperimentAgentState(**result) | |
| # Extract results | |
| self.final_status = result_state.final_status | |
| self.execution_results = result_state.all_execution_results | |
| self.current_revision = result_state.current_revision | |
| self.experiment_agent_intermediate_state = result_state.intermediate_state | |
| self.final_summary = self._compose_summary(result_state) | |
| self.current_phase = "complete" | |
| logger.info(f"ExperimentAgent completed: {self.final_status}") | |
| return True | |
| except Exception as e: | |
| logger.exception("ExperimentAgent failed") | |
| self.error_message = f"ExperimentAgent failed: {e}" | |
| self.current_phase = "failed" | |
| self.final_status = "failed" | |
| return False | |
| def _compose_summary(self, exp_state: ExperimentAgentState) -> str: | |
| """Compose the final summary.""" | |
| DATA_SUMMARY_LIMITS = 2000 | |
| return f"""\ | |
| === Experiment Workflow Summary === | |
| ====== Data Analysis (Input) ====== | |
| {self.data_summary[:DATA_SUMMARY_LIMITS]}{'...' if len(self.data_summary) > DATA_SUMMARY_LIMITS else ''} | |
| --- | |
| ====== Workflow Metadata ====== | |
| - **Workspace**: {self.workspace_path} | |
| - **Repo Source**: {self.repo_source or 'Not specified'} | |
| - **Final Status**: {self.final_status} | |
| - **Total Revisions**: {exp_state.current_revision} | |
| --- | |
| ====== Experiment Results ====== | |
| {exp_state.final_summary} | |
| """ | |
| def _finalize(self, success: bool): | |
| """Finalize the workflow.""" | |
| logger.info("Finalizing experiment workflow") | |
| if not success and not self.final_summary: | |
| self.final_summary = f"# Experiment Workflow Failed\n\nError: {self.error_message}" | |
| logger.info(get_separator()) | |
| logger.info(f"Experiment Workflow completed: {self.final_status}") | |
| logger.info(get_separator()) | |
| def save_summary(self, path: str | Path | None = None) -> Path: | |
| """Save the final summary to a file.""" | |
| if path is None: | |
| path = self.workspace_path / "experiment_summary.md" | |
| path = Path(path) | |
| path.write_text(self.final_summary) | |
| logger.info(f"Summary saved to {path}") | |
| return path | |
| def run_experiment_workflow( | |
| workspace_path: str | Path, | |
| user_query: str, | |
| data_summary: str | None = None, | |
| data_analysis_path: str | Path | None = None, | |
| repo_source: str | None = None, | |
| max_revisions: int = 5, | |
| recursion_limit: int = 100, | |
| ) -> ExperimentWorkflow: | |
| """ | |
| Convenience function to run the experiment workflow. | |
| Args: | |
| workspace_path: Workspace directory for the experiment | |
| user_query: User's experiment objective | |
| data_summary: Data analysis text (if not provided, loads from file) | |
| data_analysis_path: Path to data_analysis.md (defaults to workspace/data_analysis.md) | |
| repo_source: Optional repository source (local path or git URL) | |
| max_revisions: Maximum revision loops for experiment agent | |
| recursion_limit: Recursion limit for ExperimentAgent (default=100) | |
| Returns: | |
| ExperimentWorkflow: Completed workflow with results | |
| Example: | |
| >>> # Option 1: Load from file | |
| >>> result = run_experiment_workflow( | |
| ... workspace_path="workspace", | |
| ... user_query="Train an SVR model to predict prices", | |
| ... ) | |
| >>> | |
| >>> # Option 2: Pass data summary directly | |
| >>> result = run_experiment_workflow( | |
| ... workspace_path="workspace", | |
| ... user_query="Train an SVR model", | |
| ... data_summary="The dataset contains 1000 rows...", | |
| ... ) | |
| >>> print(result.final_summary) | |
| """ | |
| if data_summary is not None: | |
| workflow = ExperimentWorkflow( | |
| workspace_path=Path(workspace_path), | |
| user_query=user_query, | |
| data_summary=data_summary, | |
| repo_source=repo_source, | |
| max_revisions=max_revisions, | |
| recursion_limit=recursion_limit, | |
| ) | |
| else: | |
| workflow = ExperimentWorkflow.from_data_analysis_file( | |
| workspace_path=workspace_path, | |
| user_query=user_query, | |
| data_analysis_path=data_analysis_path, | |
| repo_source=repo_source, | |
| max_revisions=max_revisions, | |
| recursion_limit=recursion_limit, | |
| ) | |
| return workflow.run() | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser( | |
| description="Experiment Workflow - Run ExperimentAgent for code generation and execution", | |
| prog="python -m scievo.workflows.experiment_workflow", | |
| ) | |
| parser.add_argument("workspace_path", help="Workspace directory for the workflow") | |
| parser.add_argument("user_query", help="User's experiment objective") | |
| parser.add_argument( | |
| "data_analysis_path", | |
| nargs="?", | |
| default=None, | |
| help="Path to existing data_analysis.md file (optional)", | |
| ) | |
| parser.add_argument( | |
| "--recursion-limit", | |
| type=int, | |
| default=100, | |
| help="Recursion limit for ExperimentAgent (default: 100)", | |
| ) | |
| parser.add_argument( | |
| "--max-revisions", | |
| type=int, | |
| default=5, | |
| help="Maximum revision loops (default: 5)", | |
| ) | |
| args = parser.parse_args() | |
| result = run_experiment_workflow( | |
| workspace_path=args.workspace_path, | |
| user_query=args.user_query, | |
| data_analysis_path=args.data_analysis_path, | |
| recursion_limit=args.recursion_limit, | |
| max_revisions=args.max_revisions, | |
| ) | |
| print("\n" + get_separator()) | |
| print("EXPERIMENT WORKFLOW COMPLETE") | |
| print(get_separator()) | |
| print(f"\nStatus: {result.final_status}") | |
| print(f"\nFinal Summary:\n{result.final_summary}") | |