Spaces:
Running
Running
| """ | |
| Full SciEvo Workflow | |
| Complete workflow that chains DataAgent and ExperimentAgent from scratch. | |
| This workflow takes raw data, analyzes it, generates experiment code, | |
| executes it, and produces final metrics. | |
| For partial workflows (e.g., starting from existing data analysis), see: | |
| - data_workflow.py: Only runs DataAgent | |
| - experiment_workflow.py: Only runs ExperimentAgent | |
| """ | |
| import shutil | |
| from pathlib import Path | |
| from typing import Literal | |
| from loguru import logger | |
| from pydantic import BaseModel, PrivateAttr | |
| from scievo.core.brain import Brain | |
| from scievo.workflows.data_workflow import DataWorkflow | |
| from scievo.workflows.experiment_workflow import ExperimentWorkflow | |
| from scievo.workflows.utils import get_separator | |
| class FullWorkflow(BaseModel): | |
| """ | |
| Full SciEvo Workflow - chains DataAgent and ExperimentAgent from scratch. | |
| This workflow executes: | |
| 1. DataWorkflow - Analyzes input data, produces data_analysis.md | |
| 2. ExperimentWorkflow - Generates code, executes experiments, produces metrics | |
| Internally uses DataWorkflow and ExperimentWorkflow for better modularity. | |
| Usage: | |
| workflow = FullWorkflow( | |
| data_path="data/data.csv", | |
| workspace_path="workspace", | |
| user_query="Train an SVR model", | |
| ) | |
| workflow.run() | |
| print(workflow.final_summary) | |
| """ | |
| # ==================== INPUT ==================== | |
| data_path: Path | |
| workspace_path: Path | |
| user_query: str | |
| repo_source: str | None = None | |
| max_revisions: int = 5 | |
| data_agent_recursion_limit: int = 100 | |
| experiment_agent_recursion_limit: int = 100 | |
| session_name: str | None = None # Optional custom session name | |
| data_desc: str | None = None # Optional additional description of the data | |
| # ==================== INTERNAL STATE ==================== | |
| current_phase: Literal["init", "data_analysis", "experiment", "complete", "failed"] = "init" | |
| data_summary: str = "" | |
| data_agent_history: list = [] | |
| # Paper subagent results (from DataWorkflow) | |
| papers: list[dict] = [] | |
| datasets: list[dict] = [] | |
| metrics: list[dict] = [] | |
| paper_search_summary: str | None = None | |
| # Brain-managed directories (initialized in _setup_brain) | |
| sess_dir: Path | None = None | |
| long_term_mem_dir: Path | None = None | |
| project_mem_dir: Path | None = None | |
| # ==================== OUTPUT ==================== | |
| final_status: Literal["success", "failed", "max_revisions_reached"] | None = None | |
| final_summary: str = "" | |
| execution_results: list = [] | |
| error_message: str | None = None | |
| # Internal: sub-workflows | |
| _data_workflow: DataWorkflow | None = PrivateAttr(default=None) | |
| _experiment_workflow: ExperimentWorkflow | None = PrivateAttr(default=None) | |
| def _setup_brain(self): | |
| """Setup Brain session and memory directories.""" | |
| # Setup workspace | |
| self.workspace_path.mkdir(parents=True, exist_ok=True) | |
| # Get Brain instance (uses brain_dir or BRAIN_DIR env) | |
| brain = Brain.instance() | |
| # Create session | |
| if self.session_name: | |
| brain_session = Brain.new_session_named(self.session_name) | |
| else: | |
| brain_session = Brain.new_session() | |
| # Set memory directories from Brain | |
| self.sess_dir = brain_session.session_dir | |
| self.long_term_mem_dir = brain.brain_dir / "mem_long_term" | |
| self.project_mem_dir = brain.brain_dir / "mem_project" | |
| # Ensure memory directories exist | |
| self.long_term_mem_dir.mkdir(parents=True, exist_ok=True) | |
| self.project_mem_dir.mkdir(parents=True, exist_ok=True) | |
| logger.info(f"Brain session: {self.sess_dir}") | |
| logger.debug(f"Long-term memory: {self.long_term_mem_dir}") | |
| logger.debug(f"Project memory: {self.project_mem_dir}") | |
| def run(self) -> "FullWorkflow": | |
| """ | |
| Run the complete workflow: DataWorkflow -> ExperimentWorkflow. | |
| Returns: | |
| self (for chaining) | |
| """ | |
| # Step 0: Setup Brain session | |
| self._setup_brain() | |
| logger.info(get_separator()) | |
| logger.info("Starting Full SciEvo Workflow") | |
| logger.info(get_separator()) | |
| # Step 1: Run DataWorkflow | |
| if not self._run_data_phase(): | |
| self._finalize() | |
| return self | |
| # Step 2: Run ExperimentWorkflow | |
| self._run_experiment_phase() | |
| # Step 3: Finalize | |
| self._finalize() | |
| return self | |
| def _run_data_phase(self) -> bool: | |
| """ | |
| Run DataWorkflow to analyze the input data. | |
| Returns: | |
| True if successful, False if failed | |
| """ | |
| logger.info("Phase 1: Running DataWorkflow for data analysis") | |
| self.current_phase = "data_analysis" | |
| self._data_workflow = DataWorkflow( | |
| data_path=self.data_path, | |
| workspace_path=self.workspace_path, | |
| recursion_limit=self.data_agent_recursion_limit, | |
| data_desc=self.data_desc, | |
| # Pass Brain-managed directories | |
| sess_dir=self.sess_dir, | |
| long_term_mem_dir=self.long_term_mem_dir, | |
| project_mem_dir=self.project_mem_dir, | |
| ) | |
| try: | |
| self._data_workflow.run() | |
| if self._data_workflow.final_status == "success": | |
| self.data_summary = self._data_workflow.data_summary | |
| self.data_agent_history = self._data_workflow.data_agent_history | |
| # Extract paper subagent results | |
| self.papers = self._data_workflow.papers | |
| self.datasets = self._data_workflow.datasets | |
| self.metrics = self._data_workflow.metrics | |
| self.paper_search_summary = self._data_workflow.paper_search_summary | |
| self._data_workflow.save_summary() | |
| logger.info("DataWorkflow completed successfully") | |
| return True | |
| else: | |
| self.error_message = self._data_workflow.error_message | |
| self.current_phase = "failed" | |
| return False | |
| except Exception as e: | |
| logger.exception("DataWorkflow failed") | |
| self.error_message = f"DataWorkflow failed: {e}" | |
| self.current_phase = "failed" | |
| return False | |
| def _run_experiment_phase(self) -> bool: | |
| """ | |
| Run ExperimentWorkflow to generate and execute experiments. | |
| Returns: | |
| True if successful, False if failed | |
| """ | |
| logger.info("Phase 2: Running ExperimentWorkflow") | |
| self.current_phase = "experiment" | |
| self._experiment_workflow = ExperimentWorkflow( | |
| workspace_path=self.workspace_path, | |
| user_query=self.user_query, | |
| data_summary=self.data_summary, | |
| repo_source=self.repo_source, | |
| max_revisions=self.max_revisions, | |
| recursion_limit=self.experiment_agent_recursion_limit, | |
| # Pass Brain-managed directories (for future use in ExperimentAgent) | |
| sess_dir=self.sess_dir, | |
| long_term_mem_dir=self.long_term_mem_dir, | |
| project_mem_dir=self.project_mem_dir, | |
| ) | |
| try: | |
| self._experiment_workflow.run() | |
| # Extract results from experiment workflow | |
| self.final_status = self._experiment_workflow.final_status | |
| self.execution_results = self._experiment_workflow.execution_results | |
| self.final_summary = self._compose_summary() | |
| self.current_phase = "complete" | |
| logger.info(f"ExperimentWorkflow completed: {self.final_status}") | |
| return True | |
| except Exception as e: | |
| logger.exception("ExperimentWorkflow failed") | |
| self.error_message = f"ExperimentWorkflow failed: {e}" | |
| self.current_phase = "failed" | |
| self.final_status = "failed" | |
| return False | |
| def _compose_summary(self) -> str: | |
| """Compose the final summary.""" | |
| exp_summary = ( | |
| self._experiment_workflow.final_summary if self._experiment_workflow else "N/A" | |
| ) | |
| current_revision = ( | |
| self._experiment_workflow.current_revision if self._experiment_workflow else 0 | |
| ) | |
| return f"""# Full SciEvo Workflow Summary | |
| ## Data Analysis | |
| {self.data_summary} | |
| --- | |
| ## Experiment Results | |
| {exp_summary} | |
| --- | |
| ## Workflow Metadata | |
| - **Data Path**: {self.data_path} | |
| - **Workspace**: {self.workspace_path} | |
| - **Repo Source**: {self.repo_source or 'Not specified'} | |
| - **Final Status**: {self.final_status} | |
| - **Total Revisions**: {current_revision} | |
| """ | |
| def _finalize(self): | |
| """Finalize the workflow.""" | |
| logger.info("Finalizing workflow") | |
| if self.current_phase == "failed": | |
| self.final_summary = f"# Workflow Failed\n\nError: {self.error_message}" | |
| elif not self.final_summary: | |
| self.final_summary = "# Workflow Completed\n\nNo summary available." | |
| logger.info(get_separator()) | |
| logger.info(f"Workflow completed: {self.final_status}") | |
| logger.info(get_separator()) | |
| def save_summary(self, path: str | Path | None = None) -> Path: | |
| """Save the final summary to a file.""" | |
| if path is None: | |
| path = self.workspace_path / "workflow_summary.md" | |
| path = Path(path) | |
| path.write_text(self.final_summary) | |
| logger.info(f"Summary saved to {path}") | |
| return path | |
| def run_full_workflow( | |
| data_path: str | Path, | |
| workspace_path: str | Path, | |
| user_query: str, | |
| repo_source: str | None = None, | |
| max_revisions: int = 5, | |
| data_agent_recursion_limit: int = 100, | |
| experiment_agent_recursion_limit: int = 100, | |
| session_name: str | None = None, | |
| data_desc: str | None = None, | |
| ) -> FullWorkflow: | |
| """ | |
| Convenience function to run the full SciEvo workflow. | |
| Args: | |
| data_path: Path to the data file or directory to analyze | |
| workspace_path: Workspace directory for the experiment | |
| user_query: User's experiment objective | |
| repo_source: Optional repository source (local path or git URL) | |
| max_revisions: Maximum revision loops for experiment agent | |
| data_agent_recursion_limit: Recursion limit for DataAgent (default=100) | |
| experiment_agent_recursion_limit: Recursion limit for ExperimentAgent (default=100) | |
| session_name: Optional custom session name (otherwise uses timestamp) | |
| data_desc: Optional additional description of the data | |
| Returns: | |
| FullWorkflow: Completed workflow with results | |
| Example: | |
| >>> result = run_full_workflow( | |
| ... data_path="data/data.csv", | |
| ... workspace_path="workspace", | |
| ... user_query="Train an SVR model to predict prices", | |
| ... ) | |
| >>> print(result.final_summary) | |
| Note: | |
| Memory directories are managed by Brain singleton at FullWorkflow level: | |
| - Session dir: Created via Brain.new_session() | |
| - Long-term memory: brain_dir/mem_long_term | |
| - Project memory: brain_dir/mem_project | |
| These directories are then passed to DataWorkflow and ExperimentWorkflow. | |
| """ | |
| workflow = FullWorkflow( | |
| data_path=data_path, | |
| workspace_path=workspace_path, | |
| user_query=user_query, | |
| repo_source=repo_source, | |
| max_revisions=max_revisions, | |
| data_agent_recursion_limit=data_agent_recursion_limit, | |
| experiment_agent_recursion_limit=experiment_agent_recursion_limit, | |
| session_name=session_name, | |
| data_desc=data_desc, | |
| ) | |
| return workflow.run() | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser( | |
| description="Full SciEvo Workflow - Run complete workflow (DataAgent -> ExperimentAgent)", | |
| prog="python -m scievo.workflows.full_workflow", | |
| ) | |
| parser.add_argument("data_path", help="Path to the data file or directory to analyze") | |
| parser.add_argument("workspace_path", help="Workspace directory for the experiment") | |
| parser.add_argument("user_query", help="User's experiment objective") | |
| parser.add_argument( | |
| "repo_source", | |
| nargs="?", | |
| default=None, | |
| help="Optional repository source (local path or git URL)", | |
| ) | |
| parser.add_argument( | |
| "--max-revisions", | |
| type=int, | |
| default=5, | |
| help="Maximum revision loops for ExperimentAgent (default: 5)", | |
| ) | |
| parser.add_argument( | |
| "--data-recursion-limit", | |
| type=int, | |
| default=100, | |
| help="Recursion limit for DataAgent (default: 100)", | |
| ) | |
| parser.add_argument( | |
| "--experiment-recursion-limit", | |
| type=int, | |
| default=100, | |
| help="Recursion limit for ExperimentAgent (default: 100)", | |
| ) | |
| parser.add_argument( | |
| "--session-name", | |
| default=None, | |
| help="Custom session name (otherwise uses timestamp)", | |
| ) | |
| parser.add_argument( | |
| "--data-desc", | |
| default=None, | |
| help="Optional additional description of the data", | |
| ) | |
| args = parser.parse_args() | |
| result = run_full_workflow( | |
| data_path=args.data_path, | |
| workspace_path=args.workspace_path, | |
| user_query=args.user_query, | |
| repo_source=args.repo_source, | |
| max_revisions=args.max_revisions, | |
| data_agent_recursion_limit=args.data_recursion_limit, | |
| experiment_agent_recursion_limit=args.experiment_recursion_limit, | |
| session_name=args.session_name, | |
| data_desc=args.data_desc, | |
| ) | |
| print("\n" + get_separator()) | |
| print("FULL WORKFLOW COMPLETE") | |
| print(get_separator()) | |
| print(f"\nStatus: {result.final_status}") | |
| print(f"\nFinal Summary:\n{result.final_summary}") | |