""" Data Workflow Partial workflow that only runs DataAgent for data analysis. Useful for debugging the data analysis phase independently. """ import shutil from pathlib import Path from typing import Literal from loguru import logger from pydantic import BaseModel, PrivateAttr from scievo.agents import data_agent from scievo.agents.data_agent.state import DataAgentState from scievo.core.brain import Brain from scievo.core.code_env import LocalEnv from scievo.prompts import PROMPTS from scievo.workflows.utils import get_separator class DataWorkflow(BaseModel): """ Data Workflow - runs only the DataAgent for data analysis. This workflow executes: 1. DataAgent - Analyzes input data, produces data_analysis.md Usage: workflow = DataWorkflow( data_path="data/data.csv", workspace_path="workspace", ) workflow.run() print(workflow.data_summary) """ # ==================== INPUT ==================== data_path: Path workspace_path: Path recursion_limit: int = 100 data_desc: str | None = None # Optional additional description of the data # Memory directories (optional - if None, will create new Brain session) sess_dir: Path | None = None long_term_mem_dir: Path | None = None project_mem_dir: Path | None = None session_name: str | None = None # Only used if sess_dir is None # ==================== INTERNAL STATE ==================== current_phase: Literal["init", "data_analysis", "complete", "failed"] = "init" # ==================== OUTPUT ==================== final_status: Literal["success", "failed"] | None = None data_summary: str = "" data_agent_history: list = [] data_agent_intermediate_state: list[dict] = [] error_message: str | None = None # Paper subagent results (from DataAgentState) papers: list[dict] = [] datasets: list[dict] = [] metrics: list[dict] = [] paper_search_summary: str | None = None # Internal: compiled graph (lazy loaded) _data_agent_graph: object = PrivateAttr(default=None) def _ensure_graph(self): """Lazily compile agent graph.""" if self._data_agent_graph is None: self._data_agent_graph = data_agent.build().compile() def _setup_directories(self): """Setup workspace and memory directories. If sess_dir is provided (from FullWorkflow), use it. Otherwise, create new Brain session (standalone mode). """ # Setup workspace self.workspace_path.mkdir(parents=True, exist_ok=True) # Only create Brain session if directories not provided if self.sess_dir is None: logger.debug("No sess_dir provided, creating new Brain session") brain = Brain.instance() if self.session_name: brain_session = Brain.new_session_named(self.session_name) else: brain_session = Brain.new_session() # Set memory directories from Brain self.sess_dir = brain_session.session_dir self.long_term_mem_dir = brain.brain_dir / "mem_long_term" self.project_mem_dir = brain.brain_dir / "mem_project" # Ensure memory directories exist self.long_term_mem_dir.mkdir(parents=True, exist_ok=True) self.project_mem_dir.mkdir(parents=True, exist_ok=True) else: logger.debug(f"Using provided sess_dir: {self.sess_dir}") # Ensure short_term directory exists in session directory short_term_dir = Path(self.sess_dir) / "short_term" short_term_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"Short-term memory directory: {short_term_dir}") logger.info(f"Session directory: {self.sess_dir}") logger.debug(f"Long-term memory: {self.long_term_mem_dir}") logger.debug(f"Project memory: {self.project_mem_dir}") def run(self) -> "DataWorkflow": """ Run the data analysis workflow. Returns: self (for chaining) """ self._ensure_graph() self._setup_directories() logger.info(get_separator()) logger.info("Starting Data Workflow") logger.info(get_separator()) success = self._run_data_agent() self._finalize(success) return self def _run_data_agent(self) -> bool: """ Run DataAgent to analyze the input data. Returns: True if successful, False if failed """ logger.info("Running DataAgent for data analysis") self.current_phase = "data_analysis" # Construct query for data analysis data_query = PROMPTS.data.user_prompt.render( dir=str(self.data_path), data_desc=self.data_desc, ) # Prepare state data_state = DataAgentState( workspace=LocalEnv(self.workspace_path), sess_dir=Path(self.sess_dir), long_term_mem_dir=Path(self.long_term_mem_dir), project_mem_dir=Path(self.project_mem_dir), user_query=data_query, data_desc=self.data_desc, talk_mode=False, ) try: result = self._data_agent_graph.invoke( data_state, {"recursion_limit": self.recursion_limit}, ) result_state = DataAgentState(**result) # Extract data summary from history self.data_agent_history = result_state.history self.data_agent_intermediate_state = result_state.intermediate_state self.data_summary = self._extract_data_summary(result_state) # Extract paper subagent results self.papers = result_state.papers self.datasets = result_state.datasets self.metrics = result_state.metrics self.paper_search_summary = result_state.paper_search_summary logger.info("DataAgent completed successfully") logger.debug(f"Data summary: {len(self.data_summary)} chars") logger.debug( f"Papers: {len(self.papers)}, Datasets: {len(self.datasets)}, Metrics: {len(self.metrics)}" ) return True except Exception as e: logger.exception("DataAgent failed") self.error_message = f"DataAgent failed: {e}" self.current_phase = "failed" return False def _extract_data_summary(self, result_state: DataAgentState) -> str: """Extract data summary from DataAgent state.""" # First try to read from output_summary field if result_state.output_summary: return result_state.output_summary # Fallback 1: Try to extract from generate_summary node in intermediate_state for item in reversed(result_state.intermediate_state): if item.get("node_name") == "generate_summary": output = item.get("output", "") if output and output != "No summary generated": logger.info("Extracted summary from intermediate_state") return output # Fallback 2: Try to extract from last assistant message in history # (the summary might be in the last message) if result_state.history: for msg in reversed(result_state.history): if hasattr(msg, "role") and msg.role == "assistant": if hasattr(msg, "content") and msg.content: content = msg.content # Check if this looks like a summary (not an error message) if content and not content.startswith("Failed to generate"): logger.info("Extracted summary from last assistant message") return content # Fallback 3: Try to read saved analysis.md file analysis_file = self.workspace_path / "analysis.md" if analysis_file.exists(): logger.info("Extracted summary from analysis.md file") return analysis_file.read_text() # Fallback 4: Try data_analysis.md data_analysis_file = self.workspace_path / "data_analysis.md" if data_analysis_file.exists(): logger.info("Extracted summary from data_analysis.md file") return data_analysis_file.read_text() # Fallback 5: Generate a basic summary from available information logger.warning("No summary found, generating basic summary from available data") summary_parts = [] if result_state.paper_search_summary: summary_parts.append(f"## Paper Search Results\n{result_state.paper_search_summary}") if result_state.papers: summary_parts.append( f"\n## Papers Found\nFound {len(result_state.papers)} relevant papers." ) if result_state.datasets: summary_parts.append( f"\n## Datasets Found\nFound {len(result_state.datasets)} relevant datasets." ) if result_state.metrics: summary_parts.append( f"\n## Metrics\nExtracted {len(result_state.metrics)} evaluation metrics." ) if result_state.intermediate_state: # Include some intermediate outputs summary_parts.append("\n## Analysis Process") for item in result_state.intermediate_state[-5:]: # Last 5 items node_name = item.get("node_name", "unknown") output = item.get("output", "") if output and len(output) < 500: # Only include short outputs summary_parts.append(f"\n### {node_name}\n{output[:300]}...") if summary_parts: return "\n".join(summary_parts) # Last resort: return a minimal summary return "Data analysis completed. No detailed summary available. Please check the intermediate states for more information." def _finalize(self, success: bool): """Finalize the workflow.""" logger.info("Finalizing data workflow") if success: self.final_status = "success" self.current_phase = "complete" else: self.final_status = "failed" logger.info(get_separator()) logger.info(f"Data Workflow completed: {self.final_status}") logger.info(get_separator()) def save_summary(self, path: str | Path | None = None) -> Path: """Save the data summary to a file.""" if path is None: path = self.workspace_path / "data_analysis.md" path = Path(path) path.write_text(self.data_summary) logger.info(f"Data summary saved to {path}") return path def run_data_workflow( data_path: str | Path, workspace_path: str | Path, recursion_limit: int = 100, session_name: str | None = None, sess_dir: str | Path | None = None, long_term_mem_dir: str | Path | None = None, project_mem_dir: str | Path | None = None, data_desc: str | None = None, ) -> DataWorkflow: """ Convenience function to run the data analysis workflow. Args: data_path: Path to the data file or directory to analyze workspace_path: Workspace directory for the analysis recursion_limit: Recursion limit for DataAgent (default=100) session_name: Optional custom session name (only used if sess_dir is None) sess_dir: Optional session directory (if None, creates new Brain session) long_term_mem_dir: Optional long-term memory directory project_mem_dir: Optional project memory directory data_desc: Optional additional description of the data Returns: DataWorkflow: Completed workflow with results Example: >>> # Standalone mode (creates new Brain session) >>> result = run_data_workflow( ... data_path="data/data.csv", ... workspace_path="workspace", ... ) >>> print(result.data_summary) >>> # With provided directories (e.g., from FullWorkflow) >>> result = run_data_workflow( ... data_path="data/data.csv", ... workspace_path="workspace", ... sess_dir=Path("brain/ss_existing"), ... long_term_mem_dir=Path("brain/mem_long_term"), ... project_mem_dir=Path("brain/mem_project"), ... ) Note: When sess_dir is None, creates new Brain session automatically: - Session dir: Created via Brain.new_session() - Long-term memory: brain_dir/mem_long_term - Project memory: brain_dir/mem_project """ workflow = DataWorkflow( data_path=Path(data_path), workspace_path=Path(workspace_path), recursion_limit=recursion_limit, sess_dir=Path(sess_dir) if sess_dir else None, long_term_mem_dir=Path(long_term_mem_dir) if long_term_mem_dir else None, project_mem_dir=Path(project_mem_dir) if project_mem_dir else None, session_name=session_name, data_desc=data_desc, ) return workflow.run() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser( description="Data Workflow - Run DataAgent for data analysis", prog="python -m scievo.workflows.data_workflow", ) parser.add_argument("data_path", help="Path to the data file or directory to analyze") parser.add_argument("workspace_path", help="Workspace directory for the workflow") parser.add_argument( "--recursion-limit", type=int, default=100, help="Recursion limit for DataAgent (default: 100)", ) parser.add_argument( "--session-name", default=None, help="Custom session name (otherwise uses timestamp)", ) args = parser.parse_args() result = run_data_workflow( data_path=args.data_path, workspace_path=args.workspace_path, recursion_limit=args.recursion_limit, session_name=args.session_name, ) print("\n" + get_separator()) print("DATA WORKFLOW COMPLETE") print(get_separator()) print(f"\nStatus: {result.final_status}") print(f"\nData Summary:\n{result.data_summary}")