"""
Workflow State Management
Stores intermediate results and metadata between steps to minimize LLM context.
"""
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
class WorkflowState:
    """
    Structured state object that holds workflow context between pipeline steps.

    Stores intermediate results (profiling, quality checks, cleaning, feature
    engineering, modeling, visualization) so each step can be handed a minimal,
    targeted context instead of the full LLM conversation history.
    """

    def __init__(self):
        # Per-step result payloads; None until the corresponding step runs.
        self.dataset_info: Optional[Dict[str, Any]] = None
        self.profiling_summary: Optional[Dict[str, Any]] = None
        self.quality_issues: Optional[Dict[str, Any]] = None
        self.cleaning_results: Optional[Dict[str, Any]] = None
        self.feature_engineering: Optional[Dict[str, Any]] = None
        self.modeling_results: Optional[Dict[str, Any]] = None
        self.visualization_paths: List[str] = []
        # Path of the most recent version of the working dataset; updated by
        # cleaning/feature-engineering steps that emit an output file.
        self.current_file: Optional[str] = None
        self.target_column: Optional[str] = None
        self.task_type: Optional[str] = None  # 'classification', 'regression', etc.
        # Ordered log of completed step markers (repeats possible on re-runs).
        self.steps_completed: List[str] = []
        # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated
        # (Python 3.12) and returns a naive datetime; now(timezone.utc) is the
        # recommended replacement and isoformat() includes the +00:00 offset.
        self.created_at = datetime.now(timezone.utc).isoformat()

    def update_dataset_info(self, info: Dict[str, Any]):
        """Store basic dataset metadata (schema, shape, etc.)"""
        self.dataset_info = info
        self.current_file = info.get('file_path')
        self.steps_completed.append('dataset_loaded')

    def update_profiling(self, summary: Dict[str, Any]):
        """Store profiling results summary"""
        self.profiling_summary = summary
        self.steps_completed.append('profiling_complete')

    def update_quality(self, issues: Dict[str, Any]):
        """Store data quality assessment"""
        self.quality_issues = issues
        self.steps_completed.append('quality_checked')

    def update_cleaning(self, results: Dict[str, Any]):
        """Store cleaning/preprocessing results.

        If the step produced a new file ('output_file'), it becomes the
        current working dataset for subsequent steps.
        """
        self.cleaning_results = results
        if results.get('output_file'):
            self.current_file = results['output_file']
        self.steps_completed.append('data_cleaned')

    def update_features(self, results: Dict[str, Any]):
        """Store feature engineering results.

        Like cleaning, an 'output_file' entry advances the working dataset.
        """
        self.feature_engineering = results
        if results.get('output_file'):
            self.current_file = results['output_file']
        self.steps_completed.append('features_engineered')

    def update_modeling(self, results: Dict[str, Any]):
        """Store model training results"""
        self.modeling_results = results
        self.steps_completed.append('model_trained')

    def add_visualization(self, path: str):
        """Track a generated visualization file path"""
        self.visualization_paths.append(path)

    def get_context_for_step(self, step_name: str) -> Dict[str, Any]:
        """
        Get minimal context needed for a specific step.

        This replaces sending full conversation history to the LLM: every
        step sees the common keys (current file, target, task type, progress)
        plus only the upstream results it actually needs. Unknown step names
        get just the common keys.
        """
        context = {
            'current_file': self.current_file,
            'target_column': self.target_column,
            'task_type': self.task_type,
            'steps_completed': self.steps_completed
        }
        # Step-specific context slicing
        if step_name == 'profiling':
            context['dataset_info'] = self.dataset_info
        elif step_name == 'quality_check':
            context['dataset_info'] = self.dataset_info
            context['profiling'] = self.profiling_summary
        elif step_name == 'cleaning':
            context['quality_issues'] = self.quality_issues
            context['profiling'] = self.profiling_summary
        elif step_name == 'feature_engineering':
            context['cleaning_results'] = self.cleaning_results
            context['dataset_info'] = self.dataset_info
        elif step_name == 'modeling':
            context['feature_engineering'] = self.feature_engineering
            context['cleaning_results'] = self.cleaning_results
            context['target_column'] = self.target_column
            context['task_type'] = self.task_type
        elif step_name == 'visualization':
            context['modeling_results'] = self.modeling_results
            context['dataset_info'] = self.dataset_info
        return context

    def to_dict(self) -> Dict[str, Any]:
        """Serialize state to a plain dict for storage/debugging"""
        return {
            'dataset_info': self.dataset_info,
            'profiling_summary': self.profiling_summary,
            'quality_issues': self.quality_issues,
            'cleaning_results': self.cleaning_results,
            'feature_engineering': self.feature_engineering,
            'modeling_results': self.modeling_results,
            'visualization_paths': self.visualization_paths,
            'current_file': self.current_file,
            'target_column': self.target_column,
            'task_type': self.task_type,
            'steps_completed': self.steps_completed,
            'created_at': self.created_at
        }

    def save_to_file(self, path: str):
        """Save state to a JSON file, creating parent directories as needed"""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        # Pin UTF-8: the platform-default encoding is not portable and can
        # break round-tripping of state files between machines.
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(self.to_dict(), f, indent=2)

    @classmethod
    def load_from_file(cls, path: str) -> 'WorkflowState':
        """Load state from a JSON file previously written by save_to_file.

        Missing keys fall back to None (or empty lists for list-valued
        fields), so partially-written files still load.
        """
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        state = cls()
        state.dataset_info = data.get('dataset_info')
        state.profiling_summary = data.get('profiling_summary')
        state.quality_issues = data.get('quality_issues')
        state.cleaning_results = data.get('cleaning_results')
        state.feature_engineering = data.get('feature_engineering')
        state.modeling_results = data.get('modeling_results')
        state.visualization_paths = data.get('visualization_paths', [])
        state.current_file = data.get('current_file')
        state.target_column = data.get('target_column')
        state.task_type = data.get('task_type')
        state.steps_completed = data.get('steps_completed', [])
        state.created_at = data.get('created_at')
        return state