File size: 14,332 Bytes
0913c52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81fdfc6
 
 
 
 
 
0913c52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c76d17b
 
 
 
 
0913c52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81fdfc6
 
 
 
 
 
0913c52
 
81fdfc6
 
 
0913c52
 
 
 
 
 
 
 
 
 
 
 
 
 
2f99c61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0913c52
 
2f99c61
0913c52
 
2f99c61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0913c52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
"""
Data Workflow

Partial workflow that only runs DataAgent for data analysis.
Useful for debugging the data analysis phase independently.
"""

import shutil
from pathlib import Path
from typing import Literal

from loguru import logger
from pydantic import BaseModel, PrivateAttr

from scievo.agents import data_agent
from scievo.agents.data_agent.state import DataAgentState
from scievo.core.brain import Brain
from scievo.core.code_env import LocalEnv
from scievo.prompts import PROMPTS
from scievo.workflows.utils import get_separator


class DataWorkflow(BaseModel):
    """
    Data Workflow - runs only the DataAgent for data analysis.

    This workflow executes:
    1. DataAgent - Analyzes input data, produces data_analysis.md

    Usage:
        workflow = DataWorkflow(
            data_path="data/data.csv",
            workspace_path="workspace",
        )
        workflow.run()
        print(workflow.data_summary)
    """

    # ==================== INPUT ====================
    data_path: Path
    workspace_path: Path
    recursion_limit: int = 100
    data_desc: str | None = None  # Optional additional description of the data

    # Memory directories (optional - if None, will create new Brain session)
    sess_dir: Path | None = None
    long_term_mem_dir: Path | None = None
    project_mem_dir: Path | None = None
    session_name: str | None = None  # Only used if sess_dir is None

    # ==================== INTERNAL STATE ====================
    current_phase: Literal["init", "data_analysis", "complete", "failed"] = "init"

    # ==================== OUTPUT ====================
    final_status: Literal["success", "failed"] | None = None
    data_summary: str = ""
    data_agent_history: list = []
    data_agent_intermediate_state: list[dict] = []
    error_message: str | None = None

    # Paper subagent results (from DataAgentState)
    papers: list[dict] = []
    datasets: list[dict] = []
    metrics: list[dict] = []
    paper_search_summary: str | None = None

    # Internal: compiled graph (lazy loaded)
    _data_agent_graph: object = PrivateAttr(default=None)

    def _ensure_graph(self):
        """Lazily compile the DataAgent graph (idempotent)."""
        if self._data_agent_graph is None:
            self._data_agent_graph = data_agent.build().compile()

    def _setup_directories(self):
        """Setup workspace and memory directories.

        If sess_dir is provided (from FullWorkflow), use it.
        Otherwise, create new Brain session (standalone mode).
        """
        # Setup workspace
        self.workspace_path.mkdir(parents=True, exist_ok=True)

        # Only create Brain session if directories not provided
        if self.sess_dir is None:
            logger.debug("No sess_dir provided, creating new Brain session")
            brain = Brain.instance()
            if self.session_name:
                brain_session = Brain.new_session_named(self.session_name)
            else:
                brain_session = Brain.new_session()

            # Set memory directories from Brain
            self.sess_dir = brain_session.session_dir
            self.long_term_mem_dir = brain.brain_dir / "mem_long_term"
            self.project_mem_dir = brain.brain_dir / "mem_project"

            # Ensure memory directories exist
            self.long_term_mem_dir.mkdir(parents=True, exist_ok=True)
            self.project_mem_dir.mkdir(parents=True, exist_ok=True)
        else:
            logger.debug(f"Using provided sess_dir: {self.sess_dir}")

        # Ensure short_term directory exists in session directory
        short_term_dir = Path(self.sess_dir) / "short_term"
        short_term_dir.mkdir(parents=True, exist_ok=True)
        logger.debug(f"Short-term memory directory: {short_term_dir}")

        logger.info(f"Session directory: {self.sess_dir}")
        logger.debug(f"Long-term memory: {self.long_term_mem_dir}")
        logger.debug(f"Project memory: {self.project_mem_dir}")

    def run(self) -> "DataWorkflow":
        """
        Run the data analysis workflow.

        Compiles the agent graph (if needed), prepares directories,
        executes the DataAgent, and finalizes status fields.

        Returns:
            self (for chaining)
        """
        self._ensure_graph()
        self._setup_directories()

        logger.info(get_separator())
        logger.info("Starting Data Workflow")
        logger.info(get_separator())

        success = self._run_data_agent()

        self._finalize(success)

        return self

    def _run_data_agent(self) -> bool:
        """
        Run DataAgent to analyze the input data.

        Populates data_summary, history, intermediate state, and the
        paper-subagent result fields on success; sets error_message and
        marks the phase as failed on any exception.

        Returns:
            True if successful, False if failed
        """
        logger.info("Running DataAgent for data analysis")
        self.current_phase = "data_analysis"

        # Construct query for data analysis
        data_query = PROMPTS.data.user_prompt.render(
            dir=str(self.data_path),
            data_desc=self.data_desc,
        )

        # Prepare state
        data_state = DataAgentState(
            workspace=LocalEnv(self.workspace_path),
            sess_dir=Path(self.sess_dir),
            long_term_mem_dir=Path(self.long_term_mem_dir),
            project_mem_dir=Path(self.project_mem_dir),
            user_query=data_query,
            data_desc=self.data_desc,
            talk_mode=False,
        )

        try:
            result = self._data_agent_graph.invoke(
                data_state,
                {"recursion_limit": self.recursion_limit},
            )
            # invoke() returns a plain dict; re-validate into the typed state
            result_state = DataAgentState(**result)

            # Extract data summary from history
            self.data_agent_history = result_state.history
            self.data_agent_intermediate_state = result_state.intermediate_state
            self.data_summary = self._extract_data_summary(result_state)

            # Extract paper subagent results
            self.papers = result_state.papers
            self.datasets = result_state.datasets
            self.metrics = result_state.metrics
            self.paper_search_summary = result_state.paper_search_summary

            logger.info("DataAgent completed successfully")
            logger.debug(f"Data summary: {len(self.data_summary)} chars")
            logger.debug(
                f"Papers: {len(self.papers)}, Datasets: {len(self.datasets)}, Metrics: {len(self.metrics)}"
            )
            return True

        except Exception as e:
            logger.exception("DataAgent failed")
            self.error_message = f"DataAgent failed: {e}"
            self.current_phase = "failed"
            return False

    def _extract_data_summary(self, result_state: DataAgentState) -> str:
        """Extract data summary from DataAgent state.

        Tries, in order: the explicit output_summary field, the
        generate_summary node output, the last assistant message,
        saved markdown files in the workspace, and finally a synthetic
        summary assembled from whatever partial results exist.
        """
        # First try to read from output_summary field
        if result_state.output_summary:
            return result_state.output_summary

        # Fallback 1: Try to extract from generate_summary node in intermediate_state
        for item in reversed(result_state.intermediate_state):
            if item.get("node_name") == "generate_summary":
                output = item.get("output", "")
                if output and output != "No summary generated":
                    logger.info("Extracted summary from intermediate_state")
                    return output

        # Fallback 2: Try to extract from last assistant message in history
        # (the summary might be in the last message)
        if result_state.history:
            for msg in reversed(result_state.history):
                if hasattr(msg, "role") and msg.role == "assistant":
                    if hasattr(msg, "content") and msg.content:
                        content = msg.content
                        # Check if this looks like a summary (not an error message)
                        if content and not content.startswith("Failed to generate"):
                            logger.info("Extracted summary from last assistant message")
                            return content

        # Fallback 3: Try to read saved analysis.md file
        # (explicit utf-8: the default locale encoding is platform-dependent)
        analysis_file = self.workspace_path / "analysis.md"
        if analysis_file.exists():
            logger.info("Extracted summary from analysis.md file")
            return analysis_file.read_text(encoding="utf-8")

        # Fallback 4: Try data_analysis.md
        data_analysis_file = self.workspace_path / "data_analysis.md"
        if data_analysis_file.exists():
            logger.info("Extracted summary from data_analysis.md file")
            return data_analysis_file.read_text(encoding="utf-8")

        # Fallback 5: Generate a basic summary from available information
        logger.warning("No summary found, generating basic summary from available data")
        summary_parts = []

        if result_state.paper_search_summary:
            summary_parts.append(f"## Paper Search Results\n{result_state.paper_search_summary}")

        if result_state.papers:
            summary_parts.append(
                f"\n## Papers Found\nFound {len(result_state.papers)} relevant papers."
            )

        if result_state.datasets:
            summary_parts.append(
                f"\n## Datasets Found\nFound {len(result_state.datasets)} relevant datasets."
            )

        if result_state.metrics:
            summary_parts.append(
                f"\n## Metrics\nExtracted {len(result_state.metrics)} evaluation metrics."
            )

        if result_state.intermediate_state:
            # Include some intermediate outputs
            summary_parts.append("\n## Analysis Process")
            for item in result_state.intermediate_state[-5:]:  # Last 5 items
                node_name = item.get("node_name", "unknown")
                output = item.get("output", "")
                if output and len(output) < 500:  # Only include short outputs
                    summary_parts.append(f"\n### {node_name}\n{output[:300]}...")

        if summary_parts:
            return "\n".join(summary_parts)

        # Last resort: return a minimal summary
        return "Data analysis completed. No detailed summary available. Please check the intermediate states for more information."

    def _finalize(self, success: bool):
        """Finalize the workflow: set final_status and current_phase.

        Note: on failure, current_phase was already set to "failed"
        by _run_data_agent, so only final_status needs updating.
        """
        logger.info("Finalizing data workflow")

        if success:
            self.final_status = "success"
            self.current_phase = "complete"
        else:
            self.final_status = "failed"

        logger.info(get_separator())
        logger.info(f"Data Workflow completed: {self.final_status}")
        logger.info(get_separator())

    def save_summary(self, path: str | Path | None = None) -> Path:
        """Save the data summary to a file.

        Args:
            path: Destination file; defaults to <workspace>/data_analysis.md.

        Returns:
            The resolved Path the summary was written to.
        """
        if path is None:
            path = self.workspace_path / "data_analysis.md"
        path = Path(path)
        # A caller-supplied path may point into a directory that does not
        # exist yet; create it so write_text doesn't raise FileNotFoundError.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(self.data_summary, encoding="utf-8")
        logger.info(f"Data summary saved to {path}")
        return path


def run_data_workflow(
    data_path: str | Path,
    workspace_path: str | Path,
    recursion_limit: int = 100,
    session_name: str | None = None,
    sess_dir: str | Path | None = None,
    long_term_mem_dir: str | Path | None = None,
    project_mem_dir: str | Path | None = None,
    data_desc: str | None = None,
) -> DataWorkflow:
    """
    Convenience function to run the data analysis workflow.

    Builds a DataWorkflow from the given arguments (coercing string paths
    to Path objects) and immediately runs it to completion.

    Args:
        data_path: Path to the data file or directory to analyze
        workspace_path: Workspace directory for the analysis
        recursion_limit: Recursion limit for DataAgent (default=100)
        session_name: Optional custom session name (only used if sess_dir is None)
        sess_dir: Optional session directory (if None, creates new Brain session)
        long_term_mem_dir: Optional long-term memory directory
        project_mem_dir: Optional project memory directory
        data_desc: Optional additional description of the data

    Returns:
        DataWorkflow: Completed workflow with results

    Example:
        >>> # Standalone mode (creates new Brain session)
        >>> result = run_data_workflow(
        ...     data_path="data/data.csv",
        ...     workspace_path="workspace",
        ... )
        >>> print(result.data_summary)

        >>> # With provided directories (e.g., from FullWorkflow)
        >>> result = run_data_workflow(
        ...     data_path="data/data.csv",
        ...     workspace_path="workspace",
        ...     sess_dir=Path("brain/ss_existing"),
        ...     long_term_mem_dir=Path("brain/mem_long_term"),
        ...     project_mem_dir=Path("brain/mem_project"),
        ... )

    Note:
        When sess_dir is None, creates new Brain session automatically:
        - Session dir: Created via Brain.new_session()
        - Long-term memory: brain_dir/mem_long_term
        - Project memory: brain_dir/mem_project
    """

    def _as_optional_path(value: str | Path | None) -> Path | None:
        # Truthiness check matches the original: None and "" both map to None.
        return Path(value) if value else None

    wf = DataWorkflow(
        data_path=Path(data_path),
        workspace_path=Path(workspace_path),
        recursion_limit=recursion_limit,
        sess_dir=_as_optional_path(sess_dir),
        long_term_mem_dir=_as_optional_path(long_term_mem_dir),
        project_mem_dir=_as_optional_path(project_mem_dir),
        session_name=session_name,
        data_desc=data_desc,
    )
    return wf.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Data Workflow - Run DataAgent for data analysis",
        prog="python -m scievo.workflows.data_workflow",
    )
    parser.add_argument("data_path", help="Path to the data file or directory to analyze")
    parser.add_argument("workspace_path", help="Workspace directory for the workflow")
    parser.add_argument(
        "--recursion-limit",
        type=int,
        default=100,
        help="Recursion limit for DataAgent (default: 100)",
    )
    parser.add_argument(
        "--session-name",
        default=None,
        help="Custom session name (otherwise uses timestamp)",
    )
    # Expose data_desc on the CLI; run_data_workflow already supports it.
    parser.add_argument(
        "--data-desc",
        default=None,
        help="Optional additional description of the data",
    )

    args = parser.parse_args()

    result = run_data_workflow(
        data_path=args.data_path,
        workspace_path=args.workspace_path,
        recursion_limit=args.recursion_limit,
        session_name=args.session_name,
        data_desc=args.data_desc,
    )

    print("\n" + get_separator())
    print("DATA WORKFLOW COMPLETE")
    print(get_separator())
    print(f"\nStatus: {result.final_status}")
    print(f"\nData Summary:\n{result.data_summary}")