""" Data Processor for MangoMAS Local Training This module processes the original MangoMAS datasets from JSONL format into training-ready datasets with proper splits and preprocessing. """ import argparse import json import logging from pathlib import Path from typing import Dict, List, Tuple import yaml from sklearn.model_selection import train_test_split from tqdm import tqdm logger = logging.getLogger(__name__) class MangoMASDataProcessor: """Process MangoMAS datasets for local training.""" def __init__( self, input_dir, output_dir=None, min_length: int = 10, max_length: int = 2048, config_path: str = None, ): """Initialize with input/output directories or config path for flexibility.""" # Support both test interface and config-driven approach if config_path is not None: # Config-driven initialization (original functionality) with open(config_path, "r") as f: self.config = yaml.safe_load(f) self.data_config = self.config["data"] self.agents_config = self.config["agents"] self.input_dir = Path(input_dir) if input_dir else None self.output_dir = Path(output_dir) if output_dir else Path("/Volumes/Mango_MAS/data/processed") self.min_length = self.data_config.get("preprocessing", {}).get( "min_length", min_length ) self.max_length = self.data_config.get("preprocessing", {}).get( "max_length", max_length ) else: # Direct initialization (test interface) self.input_dir = Path(input_dir) self.output_dir = Path(output_dir) if output_dir else Path("/Volumes/Mango_MAS/data/processed") self.min_length = min_length self.max_length = max_length self.config = None self.data_config = None self.agents_config = None logging.basicConfig(level=logging.INFO) def process_datasets( self, input_dir: str, output_dir: str = "/Volumes/Mango_MAS/data/processed" ) -> None: """ Process all agent datasets from input directory. 
    def process_datasets(
        self, input_dir: str, output_dir: str = "/Volumes/Mango_MAS/data/processed"
    ) -> None:
        """
        Process all agent datasets from the input directory.

        Args:
            input_dir: Directory containing original JSONL files
            output_dir: Directory to save processed datasets
        """
        input_path = Path(input_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Map each agent type to its source JSONL file.
        datasets = {
            "infrastructure": input_path / "infrastructure_agent_synthetic_prompts.jsonl",
            "devsecops": input_path / "devsecops_agent_synthetic_prompts.jsonl",
            "risk_assessment": input_path / "risk_assessment_agent_synthetic_prompts.jsonl",
        }

        for agent_type, file_path in datasets.items():
            if file_path.exists():
                logger.info(f"Processing {agent_type} dataset from {file_path}")
                self._process_single_dataset(file_path, output_path, agent_type)
            else:
                logger.warning(f"Dataset file not found: {file_path}")

    def _process_single_dataset(
        self, input_file: Path, output_dir: Path, agent_type: str
    ) -> None:
        """Process a single agent dataset."""
        # Load data
        data = self._load_jsonl(input_file)
        logger.info(f"Loaded {len(data)} samples for {agent_type}")

        # Clean and preprocess
        cleaned_data = self._clean_data(data)
        logger.info(f"After cleaning: {len(cleaned_data)} samples")

        # Convert to training format
        training_data = self._convert_to_training_format(cleaned_data, agent_type)

        # Create splits
        train_data, val_data, test_data = self._create_splits(training_data)

        # Save processed datasets
        self._save_datasets(train_data, val_data, test_data, output_dir, agent_type)
        logger.info(
            f"Saved {agent_type} dataset: "
            f"{len(train_data)} train, {len(val_data)} val, {len(test_data)} test"
        )

    def _load_jsonl(self, file_path: Path) -> List[Dict]:
        """Load data from a JSONL file, skipping malformed lines."""
        data = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line_num, line in enumerate(f, 1):
                try:
                    data.append(json.loads(line.strip()))
                except json.JSONDecodeError as e:
                    logger.warning(f"Skipping invalid JSON on line {line_num}: {e}")
        return data

    def _clean_data(self, data: List[Dict]) -> List[Dict]:
        """Clean and validate the data."""
        cleaned = []
        for item in tqdm(data, desc="Cleaning data"):
            # Check required fields
            if not all(key in item for key in ["instruction", "input", "output"]):
                continue

            # Check text lengths against the bounds resolved in __init__
            # (works for both the config-driven and test interfaces; the
            # previous direct self.data_config lookup crashed when no config
            # was loaded).
            input_text = f"{item['instruction']} {item['input']}"
            output_text = item["output"]
            if len(input_text) < self.min_length or len(input_text) > self.max_length:
                continue
            if (
                len(output_text) < self.min_length
                or len(output_text) > self.max_length
            ):
                continue

            cleaned.append(item)

        # Remove duplicates if configured
        preprocessing = (self.data_config or {}).get("preprocessing", {})
        if preprocessing.get("remove_duplicates"):
            cleaned = self._remove_duplicates(cleaned)

        return cleaned

    def _remove_duplicates(self, data: List[Dict]) -> List[Dict]:
        """Remove duplicate entries based on input text."""
        seen_inputs = set()
        unique_data = []
        for item in data:
            input_text = f"{item['instruction']} {item['input']}"
            if input_text not in seen_inputs:
                seen_inputs.add(input_text)
                unique_data.append(item)
        logger.info(f"Removed {len(data) - len(unique_data)} duplicates")
        return unique_data

    def _validate_sample(self, sample: Dict) -> bool:
        """Validate a single sample for required fields and length constraints."""
        # Check required fields
        required_fields = ["instruction", "input", "output", "agent_type"]
        if not all(key in sample for key in required_fields):
            return False

        # Check text lengths
        combined_text = f"{sample['instruction']} {sample['input']} {sample['output']}"
        if len(combined_text) < self.min_length or len(combined_text) > self.max_length:
            return False

        return True
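    # A hedged example of a raw record that passes _validate_sample, based on
    # the field names checked above; the field contents are illustrative:
    #
    #   {"instruction": "Review this Terraform plan for risks.",
    #    "input": "resource \"aws_s3_bucket\" ...",
    #    "output": "The plan creates a public bucket; restrict the ACL...",
    #    "agent_type": "infrastructure"}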
    def _clean_text(self, text: str) -> str:
        """Clean text by collapsing runs of whitespace into single spaces."""
        return re.sub(r"\s+", " ", text.strip())

    def _format_conversation(self, sample: Dict) -> Dict:
        """Format a sample into a conversation format suitable for training."""
        # Create conversation text
        if sample.get("input", "").strip():
            conversation_text = (
                f"Human: {sample['instruction']}\n{sample['input']}\n\n"
                f"Assistant: {sample['output']}"
            )
        else:
            conversation_text = (
                f"Human: {sample['instruction']}\n\nAssistant: {sample['output']}"
            )

        return {
            "text": conversation_text,
            "agent_type": sample["agent_type"],
            "instruction": sample["instruction"],
            "input": sample["input"],
            "output": sample["output"],
        }

    def _split_dataset(
        self,
        data: List[Dict],
        train_ratio: float = 0.8,
        val_ratio: float = 0.1,
        test_ratio: float = 0.1,
    ) -> Tuple[List[Dict], List[Dict], List[Dict]]:
        """Split the dataset into train/validation/test sets."""
        if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-6:
            raise ValueError(
                f"Split ratios must sum to 1.0, got {train_ratio + val_ratio + test_ratio}"
            )

        if not data:
            return [], [], []
        if len(data) == 1:
            return data, [], []

        # First split: train vs (val + test)
        train_data, temp_data = train_test_split(
            data, test_size=(val_ratio + test_ratio), random_state=42, shuffle=True
        )

        # Second split: val vs test
        if temp_data and val_ratio > 0 and test_ratio > 0:
            val_ratio_normalized = val_ratio / (val_ratio + test_ratio)
            val_data, test_data = train_test_split(
                temp_data,
                test_size=(1 - val_ratio_normalized),
                random_state=42,
                shuffle=True,
            )
        elif val_ratio > 0:
            val_data, test_data = temp_data, []
        else:
            val_data, test_data = [], temp_data

        return train_data, val_data, test_data

    def _calculate_stats(self, data: List[Dict]) -> Dict:
        """Calculate summary statistics for the dataset."""
        if not data:
            return {
                "total_samples": 0,
                "avg_length": 0,
                "min_length": 0,
                "max_length": 0,
                "agent_distribution": {},
            }

        lengths = [len(item.get("text", "")) for item in data]
        agent_counts = {}
        for item in data:
            agent = item.get("agent_type", "unknown")
            agent_counts[agent] = agent_counts.get(agent, 0) + 1

        return {
            "total_samples": len(data),
            "avg_length": sum(lengths) / len(lengths),
            "min_length": min(lengths),
            "max_length": max(lengths),
            "agent_distribution": agent_counts,
        }

    def _load_agent_data(self, agent_type: str) -> List[Dict]:
        """Load data for a specific agent type."""
        if not self.input_dir:
            return []

        # Look for files matching the agent type. We intentionally call glob
        # even if the directory may not exist in test environments, since
        # tests patch pathlib.Path.glob.
        pattern = f"*{agent_type}*.jsonl"
        matching_files = list(self.input_dir.glob(pattern))

        data = []
        for file_path in matching_files:
            data.extend(self._load_jsonl(file_path))
        return data
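    # A hedged illustration of the shapes flowing through the test-interface
    # pipeline; all values below are illustrative. _format_conversation emits
    # flat records like
    #
    #   {"text": "Human: ...\n\nAssistant: ...", "agent_type": "devsecops",
    #    "instruction": "...", "input": "...", "output": "..."}
    #
    # and _calculate_stats summarizes a list of them as
    #
    #   {"total_samples": 120, "avg_length": 843.5, "min_length": 52,
    #    "max_length": 1990, "agent_distribution": {"devsecops": 120}}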
pattern = f"*{agent_type}*.jsonl" matching_files = list(self.input_dir.glob(pattern)) data = [] for file_path in matching_files: file_data = self._load_jsonl(file_path) data.extend(file_data) return data def _save_jsonl(self, data: List[Dict], output_path: Path) -> None: """Save data to JSONL file.""" output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "w", encoding="utf-8") as f: for item in data: f.write(json.dumps(item, ensure_ascii=False) + "\n") def _save_stats(self, stats: Dict, output_path: Path) -> None: """Save statistics to JSON file.""" output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "w", encoding="utf-8") as f: json.dump(stats, f, indent=2, ensure_ascii=False) def process_agent( self, agent_type: str, train_ratio: float = 0.8, val_ratio: float = 0.1, test_ratio: float = 0.1, ) -> None: """Process data for a single agent type.""" # Load data data = self._load_agent_data(agent_type) if not data: raise ValueError(f"No valid data found for agent type: {agent_type}") # Validate and clean data valid_data = [] for sample in data: if self._validate_sample(sample): formatted = self._format_conversation(sample) valid_data.append(formatted) if not valid_data: raise ValueError( f"No valid data found after processing for agent type: {agent_type}" ) # Remove duplicates unique_data = self._remove_duplicates(valid_data) # Split dataset train_data, val_data, test_data = self._split_dataset( unique_data, train_ratio, val_ratio, test_ratio ) # Save datasets self.output_dir.mkdir(parents=True, exist_ok=True) self._save_jsonl(train_data, self.output_dir / f"{agent_type}_train.jsonl") self._save_jsonl(val_data, self.output_dir / f"{agent_type}_val.jsonl") self._save_jsonl(test_data, self.output_dir / f"{agent_type}_test.jsonl") # Save statistics stats = self._calculate_stats(unique_data) self._save_stats(stats, self.output_dir / f"{agent_type}_stats.json") logger.info( f"Processed {agent_type}: {len(train_data)} train, {len(val_data)} val, {len(test_data)} test samples" ) def _convert_to_training_format( self, data: List[Dict], agent_type: str ) -> List[Dict]: """Convert to format suitable for training.""" training_data = [] for item in data: # Create conversation format suitable for language modeling conversation = { "messages": [ { "role": "system", "content": f"You are a {agent_type.replace('_', ' ')} specialist. 
" f"Provide expert recommendations and analysis.", }, { "role": "user", "content": f"{item['instruction']}\n\n{item['input']}", }, {"role": "assistant", "content": item["output"]}, ], "metadata": item.get("metadata", {}), "agent_type": agent_type, } training_data.append(conversation) return training_data def _create_splits( self, data: List[Dict] ) -> Tuple[List[Dict], List[Dict], List[Dict]]: """Create train/validation/test splits.""" train_size = self.data_config["train_split"] val_size = self.data_config["validation_split"] test_size = self.data_config["test_split"] # Normalize splits to sum to 1 total = train_size + val_size + test_size train_size /= total val_size /= total test_size /= total # First split: train vs (val + test) train_data, temp_data = train_test_split( data, test_size=(val_size + test_size), random_state=42, shuffle=True ) # Second split: val vs test val_ratio = val_size / (val_size + test_size) val_data, test_data = train_test_split( temp_data, test_size=(1 - val_ratio), random_state=42, shuffle=True ) return train_data, val_data, test_data def _save_datasets( self, train_data: List[Dict], val_data: List[Dict], test_data: List[Dict], output_dir: Path, agent_type: str, ) -> None: """Save processed datasets to files.""" datasets = {"train": train_data, "validation": val_data, "test": test_data} for split_name, split_data in datasets.items(): output_file = output_dir / f"{agent_type}_{split_name}.jsonl" with open(output_file, "w", encoding="utf-8") as f: for item in split_data: f.write(json.dumps(item, ensure_ascii=False) + "\n") logger.info(f"Saved {len(split_data)} samples to {output_file}") def create_combined_dataset(self, output_dir: str = "/Volumes/Mango_MAS/data/processed") -> None: """Create combined dataset with all agent types for multi-task training.""" output_path = Path(output_dir) # Collect all processed data all_train_data = [] all_val_data = [] all_test_data = [] for agent_type in self.agents_config.keys(): for split in ["train", "validation", "test"]: file_path = output_path / f"{agent_type}_{split}.jsonl" if file_path.exists(): data = self._load_jsonl(file_path) if split == "train": all_train_data.extend(data) elif split == "validation": all_val_data.extend(data) else: all_test_data.extend(data) # Shuffle combined datasets import random random.seed(42) random.shuffle(all_train_data) random.shuffle(all_val_data) random.shuffle(all_test_data) # Save combined datasets combined_datasets = { "train": all_train_data, "validation": all_val_data, "test": all_test_data, } for split_name, split_data in combined_datasets.items(): output_file = output_path / f"combined_{split_name}.jsonl" with open(output_file, "w", encoding="utf-8") as f: for item in split_data: f.write(json.dumps(item, ensure_ascii=False) + "\n") logger.info( f"Saved combined {split_name} dataset: {len(split_data)} samples" ) def generate_statistics(self, output_dir: str = "/Volumes/Mango_MAS/data/processed") -> Dict: """Generate statistics about the processed datasets.""" output_path = Path(output_dir) stats = {} for agent_type in list(self.agents_config.keys()) + ["combined"]: agent_stats = {} for split in ["train", "validation", "test"]: file_path = output_path / f"{agent_type}_{split}.jsonl" if file_path.exists(): data = self._load_jsonl(file_path) # Calculate statistics lengths = [] for item in data: if "messages" in item: # Calculate total text length total_length = sum( len(msg["content"]) for msg in item["messages"] ) lengths.append(total_length) agent_stats[split] = { "count": 
    def generate_statistics(
        self, output_dir: str = "/Volumes/Mango_MAS/data/processed"
    ) -> Dict:
        """Generate statistics about the processed datasets."""
        output_path = Path(output_dir)
        stats = {}

        for agent_type in list(self.agents_config.keys()) + ["combined"]:
            agent_stats = {}
            for split in ["train", "validation", "test"]:
                file_path = output_path / f"{agent_type}_{split}.jsonl"
                if file_path.exists():
                    data = self._load_jsonl(file_path)

                    # Total character length across all messages per sample.
                    lengths = []
                    for item in data:
                        if "messages" in item:
                            total_length = sum(
                                len(msg["content"]) for msg in item["messages"]
                            )
                            lengths.append(total_length)

                    agent_stats[split] = {
                        "count": len(data),
                        "avg_length": sum(lengths) / len(lengths) if lengths else 0,
                        "min_length": min(lengths) if lengths else 0,
                        "max_length": max(lengths) if lengths else 0,
                    }
            stats[agent_type] = agent_stats

        # Save statistics
        stats_file = output_path / "dataset_statistics.json"
        with open(stats_file, "w") as f:
            json.dump(stats, f, indent=2)

        logger.info(f"Generated dataset statistics: {stats_file}")
        return stats


def main():
    parser = argparse.ArgumentParser(
        description="Process MangoMAS datasets for local training"
    )
    parser.add_argument(
        "--input_dir",
        type=str,
        default="/Users/iancruickshank/Documents/Model/mangomas-datasets/agents/",
        help="Directory containing original JSONL files",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default="/Volumes/Mango_MAS/data/processed",
        help="Directory to save processed datasets",
    )
    parser.add_argument(
        "--config",
        type=str,
        default="config/training/distillation.yaml",
        help="Path to configuration file",
    )
    parser.add_argument(
        "--create_combined",
        action="store_true",
        help="Create combined multi-agent dataset",
    )

    args = parser.parse_args()

    # Initialize the processor. The config must be passed via the keyword
    # argument; passing it positionally would be treated as input_dir and the
    # config-driven path would never activate.
    processor = MangoMASDataProcessor(
        args.input_dir, args.output_dir, config_path=args.config
    )

    # Process datasets
    processor.process_datasets(args.input_dir, args.output_dir)

    # Create combined dataset if requested
    if args.create_combined:
        processor.create_combined_dataset(args.output_dir)

    # Generate statistics
    stats = processor.generate_statistics(args.output_dir)

    print("\nDataset Statistics:")
    print("=" * 50)
    for agent_type, agent_stats in stats.items():
        print(f"\n{agent_type.upper()}:")
        for split, split_stats in agent_stats.items():
            print(
                f"  {split}: {split_stats['count']} samples, "
                f"avg length: {split_stats['avg_length']:.0f} chars"
            )


if __name__ == "__main__":
    main()
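# A hedged example invocation of the CLI defined in main(); the module
# filename and paths are illustrative assumptions:
#
#   python data_processor.py \
#       --input_dir /path/to/mangomas-datasets/agents/ \
#       --output_dir /path/to/processed \
#       --config config/training/distillation.yaml \
#       --create_combined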