scider / scievo /workflows /full_workflow.py
harry-lu-0708's picture
clean HF Space commit (no binary history)
0913c52
"""
Full SciEvo Workflow
Complete workflow that chains DataAgent and ExperimentAgent from scratch.
This workflow takes raw data, analyzes it, generates experiment code,
executes it, and produces final metrics.
For partial workflows (e.g., starting from existing data analysis), see:
- data_workflow.py: Only runs DataAgent
- experiment_workflow.py: Only runs ExperimentAgent
"""
import shutil
from pathlib import Path
from typing import Literal
from loguru import logger
from pydantic import BaseModel, PrivateAttr
from scievo.core.brain import Brain
from scievo.workflows.data_workflow import DataWorkflow
from scievo.workflows.experiment_workflow import ExperimentWorkflow
from scievo.workflows.utils import get_separator
class FullWorkflow(BaseModel):
"""
Full SciEvo Workflow - chains DataAgent and ExperimentAgent from scratch.
This workflow executes:
1. DataWorkflow - Analyzes input data, produces data_analysis.md
2. ExperimentWorkflow - Generates code, executes experiments, produces metrics
Internally uses DataWorkflow and ExperimentWorkflow for better modularity.
Usage:
workflow = FullWorkflow(
data_path="data/data.csv",
workspace_path="workspace",
user_query="Train an SVR model",
)
workflow.run()
print(workflow.final_summary)
"""
# ==================== INPUT ====================
data_path: Path
workspace_path: Path
user_query: str
repo_source: str | None = None
max_revisions: int = 5
data_agent_recursion_limit: int = 100
experiment_agent_recursion_limit: int = 100
session_name: str | None = None # Optional custom session name
data_desc: str | None = None # Optional additional description of the data
# ==================== INTERNAL STATE ====================
current_phase: Literal["init", "data_analysis", "experiment", "complete", "failed"] = "init"
data_summary: str = ""
data_agent_history: list = []
# Paper subagent results (from DataWorkflow)
papers: list[dict] = []
datasets: list[dict] = []
metrics: list[dict] = []
paper_search_summary: str | None = None
# Brain-managed directories (initialized in _setup_brain)
sess_dir: Path | None = None
long_term_mem_dir: Path | None = None
project_mem_dir: Path | None = None
# ==================== OUTPUT ====================
final_status: Literal["success", "failed", "max_revisions_reached"] | None = None
final_summary: str = ""
execution_results: list = []
error_message: str | None = None
# Internal: sub-workflows
_data_workflow: DataWorkflow | None = PrivateAttr(default=None)
_experiment_workflow: ExperimentWorkflow | None = PrivateAttr(default=None)
def _setup_brain(self):
"""Setup Brain session and memory directories."""
# Setup workspace
self.workspace_path.mkdir(parents=True, exist_ok=True)
# Get Brain instance (uses brain_dir or BRAIN_DIR env)
brain = Brain.instance()
# Create session
if self.session_name:
brain_session = Brain.new_session_named(self.session_name)
else:
brain_session = Brain.new_session()
# Set memory directories from Brain
self.sess_dir = brain_session.session_dir
self.long_term_mem_dir = brain.brain_dir / "mem_long_term"
self.project_mem_dir = brain.brain_dir / "mem_project"
# Ensure memory directories exist
self.long_term_mem_dir.mkdir(parents=True, exist_ok=True)
self.project_mem_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Brain session: {self.sess_dir}")
logger.debug(f"Long-term memory: {self.long_term_mem_dir}")
logger.debug(f"Project memory: {self.project_mem_dir}")
def run(self) -> "FullWorkflow":
"""
Run the complete workflow: DataWorkflow -> ExperimentWorkflow.
Returns:
self (for chaining)
"""
# Step 0: Setup Brain session
self._setup_brain()
logger.info(get_separator())
logger.info("Starting Full SciEvo Workflow")
logger.info(get_separator())
# Step 1: Run DataWorkflow
if not self._run_data_phase():
self._finalize()
return self
# Step 2: Run ExperimentWorkflow
self._run_experiment_phase()
# Step 3: Finalize
self._finalize()
return self
def _run_data_phase(self) -> bool:
"""
Run DataWorkflow to analyze the input data.
Returns:
True if successful, False if failed
"""
logger.info("Phase 1: Running DataWorkflow for data analysis")
self.current_phase = "data_analysis"
self._data_workflow = DataWorkflow(
data_path=self.data_path,
workspace_path=self.workspace_path,
recursion_limit=self.data_agent_recursion_limit,
data_desc=self.data_desc,
# Pass Brain-managed directories
sess_dir=self.sess_dir,
long_term_mem_dir=self.long_term_mem_dir,
project_mem_dir=self.project_mem_dir,
)
try:
self._data_workflow.run()
if self._data_workflow.final_status == "success":
self.data_summary = self._data_workflow.data_summary
self.data_agent_history = self._data_workflow.data_agent_history
# Extract paper subagent results
self.papers = self._data_workflow.papers
self.datasets = self._data_workflow.datasets
self.metrics = self._data_workflow.metrics
self.paper_search_summary = self._data_workflow.paper_search_summary
self._data_workflow.save_summary()
logger.info("DataWorkflow completed successfully")
return True
else:
self.error_message = self._data_workflow.error_message
self.current_phase = "failed"
return False
except Exception as e:
logger.exception("DataWorkflow failed")
self.error_message = f"DataWorkflow failed: {e}"
self.current_phase = "failed"
return False
def _run_experiment_phase(self) -> bool:
"""
Run ExperimentWorkflow to generate and execute experiments.
Returns:
True if successful, False if failed
"""
logger.info("Phase 2: Running ExperimentWorkflow")
self.current_phase = "experiment"
self._experiment_workflow = ExperimentWorkflow(
workspace_path=self.workspace_path,
user_query=self.user_query,
data_summary=self.data_summary,
repo_source=self.repo_source,
max_revisions=self.max_revisions,
recursion_limit=self.experiment_agent_recursion_limit,
# Pass Brain-managed directories (for future use in ExperimentAgent)
sess_dir=self.sess_dir,
long_term_mem_dir=self.long_term_mem_dir,
project_mem_dir=self.project_mem_dir,
)
try:
self._experiment_workflow.run()
# Extract results from experiment workflow
self.final_status = self._experiment_workflow.final_status
self.execution_results = self._experiment_workflow.execution_results
self.final_summary = self._compose_summary()
self.current_phase = "complete"
logger.info(f"ExperimentWorkflow completed: {self.final_status}")
return True
except Exception as e:
logger.exception("ExperimentWorkflow failed")
self.error_message = f"ExperimentWorkflow failed: {e}"
self.current_phase = "failed"
self.final_status = "failed"
return False
def _compose_summary(self) -> str:
"""Compose the final summary."""
exp_summary = (
self._experiment_workflow.final_summary if self._experiment_workflow else "N/A"
)
current_revision = (
self._experiment_workflow.current_revision if self._experiment_workflow else 0
)
return f"""# Full SciEvo Workflow Summary
## Data Analysis
{self.data_summary}
---
## Experiment Results
{exp_summary}
---
## Workflow Metadata
- **Data Path**: {self.data_path}
- **Workspace**: {self.workspace_path}
- **Repo Source**: {self.repo_source or 'Not specified'}
- **Final Status**: {self.final_status}
- **Total Revisions**: {current_revision}
"""
def _finalize(self):
"""Finalize the workflow."""
logger.info("Finalizing workflow")
if self.current_phase == "failed":
self.final_summary = f"# Workflow Failed\n\nError: {self.error_message}"
elif not self.final_summary:
self.final_summary = "# Workflow Completed\n\nNo summary available."
logger.info(get_separator())
logger.info(f"Workflow completed: {self.final_status}")
logger.info(get_separator())
def save_summary(self, path: str | Path | None = None) -> Path:
"""Save the final summary to a file."""
if path is None:
path = self.workspace_path / "workflow_summary.md"
path = Path(path)
path.write_text(self.final_summary)
logger.info(f"Summary saved to {path}")
return path
def run_full_workflow(
data_path: str | Path,
workspace_path: str | Path,
user_query: str,
repo_source: str | None = None,
max_revisions: int = 5,
data_agent_recursion_limit: int = 100,
experiment_agent_recursion_limit: int = 100,
session_name: str | None = None,
data_desc: str | None = None,
) -> FullWorkflow:
"""
Convenience function to run the full SciEvo workflow.
Args:
data_path: Path to the data file or directory to analyze
workspace_path: Workspace directory for the experiment
user_query: User's experiment objective
repo_source: Optional repository source (local path or git URL)
max_revisions: Maximum revision loops for experiment agent
data_agent_recursion_limit: Recursion limit for DataAgent (default=100)
experiment_agent_recursion_limit: Recursion limit for ExperimentAgent (default=100)
session_name: Optional custom session name (otherwise uses timestamp)
data_desc: Optional additional description of the data
Returns:
FullWorkflow: Completed workflow with results
Example:
>>> result = run_full_workflow(
... data_path="data/data.csv",
... workspace_path="workspace",
... user_query="Train an SVR model to predict prices",
... )
>>> print(result.final_summary)
Note:
Memory directories are managed by Brain singleton at FullWorkflow level:
- Session dir: Created via Brain.new_session()
- Long-term memory: brain_dir/mem_long_term
- Project memory: brain_dir/mem_project
These directories are then passed to DataWorkflow and ExperimentWorkflow.
"""
workflow = FullWorkflow(
data_path=data_path,
workspace_path=workspace_path,
user_query=user_query,
repo_source=repo_source,
max_revisions=max_revisions,
data_agent_recursion_limit=data_agent_recursion_limit,
experiment_agent_recursion_limit=experiment_agent_recursion_limit,
session_name=session_name,
data_desc=data_desc,
)
return workflow.run()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Full SciEvo Workflow - Run complete workflow (DataAgent -> ExperimentAgent)",
prog="python -m scievo.workflows.full_workflow",
)
parser.add_argument("data_path", help="Path to the data file or directory to analyze")
parser.add_argument("workspace_path", help="Workspace directory for the experiment")
parser.add_argument("user_query", help="User's experiment objective")
parser.add_argument(
"repo_source",
nargs="?",
default=None,
help="Optional repository source (local path or git URL)",
)
parser.add_argument(
"--max-revisions",
type=int,
default=5,
help="Maximum revision loops for ExperimentAgent (default: 5)",
)
parser.add_argument(
"--data-recursion-limit",
type=int,
default=100,
help="Recursion limit for DataAgent (default: 100)",
)
parser.add_argument(
"--experiment-recursion-limit",
type=int,
default=100,
help="Recursion limit for ExperimentAgent (default: 100)",
)
parser.add_argument(
"--session-name",
default=None,
help="Custom session name (otherwise uses timestamp)",
)
parser.add_argument(
"--data-desc",
default=None,
help="Optional additional description of the data",
)
args = parser.parse_args()
result = run_full_workflow(
data_path=args.data_path,
workspace_path=args.workspace_path,
user_query=args.user_query,
repo_source=args.repo_source,
max_revisions=args.max_revisions,
data_agent_recursion_limit=args.data_recursion_limit,
experiment_agent_recursion_limit=args.experiment_recursion_limit,
session_name=args.session_name,
data_desc=args.data_desc,
)
print("\n" + get_separator())
print("FULL WORKFLOW COMPLETE")
print(get_separator())
print(f"\nStatus: {result.final_status}")
print(f"\nFinal Summary:\n{result.final_summary}")