""" DataArchitectAgent - Autonomous Data Preparation Agent ======================================================= Takes raw CSV/JSON datasets and transforms them into high-quality HuggingFace-ready JSONL format for fine-tuning. """ import json import re from pathlib import Path from dataclasses import dataclass, field from typing import Optional, List, Dict, Any, Tuple import pandas as pd from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn from rich.table import Table console = Console() @dataclass class DatasetAnalysis: """Analysis results for a dataset.""" total_rows: int valid_rows: int invalid_rows: int duplicate_rows: int detected_columns: Dict[str, str] # column_name -> detected_type instruction_column: Optional[str] = None input_column: Optional[str] = None output_column: Optional[str] = None quality_score: float = 0.0 issues: List[str] = field(default_factory=list) @dataclass class CleaningConfig: """Configuration for data cleaning.""" min_instruction_length: int = 10 max_instruction_length: int = 2048 min_response_length: int = 20 max_response_length: int = 4096 remove_duplicates: bool = True remove_empty: bool = True remove_special_chars: bool = False quality_threshold: float = 0.7 class DataArchitectAgent: """ Autonomous agent for data preparation and cleaning. This agent analyzes raw datasets, identifies instruction-response pairs, cleans the data, and formats it for HuggingFace fine-tuning. """ # Common column name patterns for auto-detection INSTRUCTION_PATTERNS = [ r'instruction', r'prompt', r'question', r'query', r'input_text', r'human', r'user', r'request', r'ask', r'command' ] INPUT_PATTERNS = [ r'context', r'input', r'background', r'reference', r'document' ] OUTPUT_PATTERNS = [ r'output', r'response', r'answer', r'completion', r'reply', r'assistant', r'bot', r'generated', r'target' ] def __init__(self, config: Optional[CleaningConfig] = None): """Initialize the DataArchitectAgent.""" self.config = config or CleaningConfig() self.analysis: Optional[DatasetAnalysis] = None def load_dataset(self, path: str) -> pd.DataFrame: """ Load a dataset from CSV or JSON file. Args: path: Path to the dataset file Returns: Loaded DataFrame """ path = Path(path) if not path.exists(): raise FileNotFoundError(f"Dataset not found: {path}") console.print(f"[bold blue]๐Ÿ“‚ Loading dataset:[/] {path}") if path.suffix.lower() == '.csv': df = pd.read_csv(path) elif path.suffix.lower() in ['.json', '.jsonl']: if path.suffix.lower() == '.jsonl': df = pd.read_json(path, lines=True) else: df = pd.read_json(path) else: raise ValueError(f"Unsupported file format: {path.suffix}") console.print(f"[green]โœ“ Loaded {len(df)} rows with {len(df.columns)} columns[/]") return df def _match_column_pattern(self, column: str, patterns: List[str]) -> bool: """Check if a column name matches any of the given patterns.""" column_lower = column.lower() for pattern in patterns: if re.search(pattern, column_lower): return True return False def _detect_column_type(self, column: str) -> str: """Detect the type of a column based on its name.""" if self._match_column_pattern(column, self.INSTRUCTION_PATTERNS): return 'instruction' elif self._match_column_pattern(column, self.INPUT_PATTERNS): return 'input' elif self._match_column_pattern(column, self.OUTPUT_PATTERNS): return 'output' return 'unknown' def analyze_dataset(self, df: pd.DataFrame) -> DatasetAnalysis: """ Analyze a dataset to understand its structure and quality. Args: df: Input DataFrame Returns: DatasetAnalysis with detected columns and quality metrics """ console.print("\n[bold blue]๐Ÿ” Analyzing dataset structure...[/]") # Detect column types detected_columns = {} instruction_col = None input_col = None output_col = None for col in df.columns: col_type = self._detect_column_type(col) detected_columns[col] = col_type if col_type == 'instruction' and instruction_col is None: instruction_col = col elif col_type == 'input' and input_col is None: input_col = col elif col_type == 'output' and output_col is None: output_col = col # Count issues issues = [] valid_rows = 0 invalid_rows = 0 # Check for required columns if instruction_col is None: issues.append("โŒ No instruction/prompt column detected") if output_col is None: issues.append("โŒ No output/response column detected") # Analyze row validity for _, row in df.iterrows(): is_valid = True if instruction_col: inst_val = str(row.get(instruction_col, '')) if len(inst_val) < self.config.min_instruction_length: is_valid = False elif len(inst_val) > self.config.max_instruction_length: is_valid = False else: is_valid = False if output_col: out_val = str(row.get(output_col, '')) if len(out_val) < self.config.min_response_length: is_valid = False elif len(out_val) > self.config.max_response_length: is_valid = False else: is_valid = False if is_valid: valid_rows += 1 else: invalid_rows += 1 # Count duplicates duplicate_rows = 0 if instruction_col: duplicate_rows = df[instruction_col].duplicated().sum() # Calculate quality score quality_score = valid_rows / len(df) if len(df) > 0 else 0.0 self.analysis = DatasetAnalysis( total_rows=len(df), valid_rows=valid_rows, invalid_rows=invalid_rows, duplicate_rows=duplicate_rows, detected_columns=detected_columns, instruction_column=instruction_col, input_column=input_col, output_column=output_col, quality_score=quality_score, issues=issues ) # Display analysis results self._display_analysis() return self.analysis def _display_analysis(self): """Display the analysis results in a formatted table.""" if not self.analysis: return table = Table(title="Dataset Analysis", show_header=True) table.add_column("Metric", style="cyan") table.add_column("Value", style="green") table.add_row("Total Rows", str(self.analysis.total_rows)) table.add_row("Valid Rows", str(self.analysis.valid_rows)) table.add_row("Invalid Rows", str(self.analysis.invalid_rows)) table.add_row("Duplicate Rows", str(self.analysis.duplicate_rows)) table.add_row("Quality Score", f"{self.analysis.quality_score:.2%}") console.print(table) # Show detected columns console.print("\n[bold]Detected Column Mappings:[/]") console.print(f" โ€ข Instruction: [cyan]{self.analysis.instruction_column or 'Not detected'}[/]") console.print(f" โ€ข Input/Context: [cyan]{self.analysis.input_column or 'Not detected'}[/]") console.print(f" โ€ข Output/Response: [cyan]{self.analysis.output_column or 'Not detected'}[/]") if self.analysis.issues: console.print("\n[bold red]Issues Found:[/]") for issue in self.analysis.issues: console.print(f" {issue}") def clean_data( self, df: pd.DataFrame, instruction_col: Optional[str] = None, input_col: Optional[str] = None, output_col: Optional[str] = None ) -> pd.DataFrame: """ Clean and validate the dataset. Args: df: Input DataFrame instruction_col: Override instruction column name input_col: Override input column name output_col: Override output column name Returns: Cleaned DataFrame """ console.print("\n[bold blue]๐Ÿงน Cleaning dataset...[/]") # Use detected columns if not specified if self.analysis: instruction_col = instruction_col or self.analysis.instruction_column input_col = input_col or self.analysis.input_column output_col = output_col or self.analysis.output_column if not instruction_col or not output_col: raise ValueError("Instruction and output columns are required") df_clean = df.copy() original_count = len(df_clean) with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console ) as progress: # Remove empty values task = progress.add_task("Removing empty values...", total=None) df_clean = df_clean.dropna(subset=[instruction_col, output_col]) progress.update(task, completed=True) # Remove duplicates if self.config.remove_duplicates: task = progress.add_task("Removing duplicates...", total=None) df_clean = df_clean.drop_duplicates(subset=[instruction_col]) progress.update(task, completed=True) # Filter by length constraints task = progress.add_task("Applying length filters...", total=None) # Instruction length filter df_clean = df_clean[ df_clean[instruction_col].str.len() >= self.config.min_instruction_length ] df_clean = df_clean[ df_clean[instruction_col].str.len() <= self.config.max_instruction_length ] # Response length filter df_clean = df_clean[ df_clean[output_col].str.len() >= self.config.min_response_length ] df_clean = df_clean[ df_clean[output_col].str.len() <= self.config.max_response_length ] progress.update(task, completed=True) # Clean text task = progress.add_task("Cleaning text...", total=None) df_clean[instruction_col] = df_clean[instruction_col].str.strip() df_clean[output_col] = df_clean[output_col].str.strip() if input_col and input_col in df_clean.columns: df_clean[input_col] = df_clean[input_col].fillna('').str.strip() progress.update(task, completed=True) removed_count = original_count - len(df_clean) console.print(f"[green]โœ“ Cleaned dataset: {len(df_clean)} rows remaining ({removed_count} removed)[/]") return df_clean def format_for_training( self, df: pd.DataFrame, goal: str, output_path: str, instruction_col: Optional[str] = None, input_col: Optional[str] = None, output_col: Optional[str] = None ) -> str: """ Format the dataset into HuggingFace-ready JSONL. Args: df: Cleaned DataFrame goal: Training goal/purpose (e.g., 'medical_assistant') output_path: Path to save the JSONL file instruction_col: Instruction column name input_col: Input/context column name output_col: Output/response column name Returns: Path to the created JSONL file """ console.print(f"\n[bold blue]๐Ÿ“ Formatting for training goal: [cyan]{goal}[/][/]") # Use detected columns if not specified if self.analysis: instruction_col = instruction_col or self.analysis.instruction_column input_col = input_col or self.analysis.input_column output_col = output_col or self.analysis.output_column if not instruction_col or not output_col: raise ValueError("Instruction and output columns are required") output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) # Create system prompt based on goal system_prompt = self._generate_system_prompt(goal) formatted_data = [] with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console ) as progress: task = progress.add_task("Formatting entries...", total=len(df)) for _, row in df.iterrows(): instruction = str(row[instruction_col]) output = str(row[output_col]) context = str(row.get(input_col, '')) if input_col and input_col in df.columns else '' # Format as Alpaca-style instruction format entry = { "instruction": instruction, "input": context, "output": output, "system": system_prompt } # Also create chat format for compatibility entry["conversations"] = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": instruction + (f"\n\nContext: {context}" if context else "")}, {"role": "assistant", "content": output} ] formatted_data.append(entry) progress.advance(task) # Write JSONL with open(output_path, 'w', encoding='utf-8') as f: for entry in formatted_data: f.write(json.dumps(entry, ensure_ascii=False) + '\n') console.print(f"[green]โœ“ Created training file: {output_path}[/]") console.print(f" โ€ข Total samples: {len(formatted_data)}") console.print(f" โ€ข Format: JSONL (Alpaca-style + Chat format)") return str(output_path) def _generate_system_prompt(self, goal: str) -> str: """Generate a system prompt based on the training goal.""" goal_lower = goal.lower().replace('_', ' ').replace('-', ' ') # Common goal templates templates = { 'medical': "You are a knowledgeable medical assistant. Provide accurate, helpful medical information while always recommending users consult healthcare professionals for specific medical advice.", 'legal': "You are a legal information assistant. Provide helpful legal information while noting that you are not a lawyer and users should consult legal professionals for specific legal advice.", 'coding': "You are an expert programming assistant. Help users write clean, efficient, and well-documented code. Explain your solutions clearly.", 'customer': "You are a helpful customer service assistant. Be polite, professional, and focused on solving customer issues efficiently.", 'education': "You are an educational assistant. Explain concepts clearly and adapt your explanations to the user's level of understanding.", 'writing': "You are a skilled writing assistant. Help users improve their writing with clear, constructive feedback and suggestions.", 'assistant': "You are a helpful AI assistant. Provide accurate, useful responses while being conversational and engaging." } # Find matching template for key, prompt in templates.items(): if key in goal_lower: return prompt # Default template return f"You are a specialized AI assistant for {goal}. Provide helpful, accurate, and relevant responses to user queries." def process( self, input_path: str, output_path: str, goal: str, instruction_col: Optional[str] = None, input_col: Optional[str] = None, output_col: Optional[str] = None ) -> Tuple[str, DatasetAnalysis]: """ Complete end-to-end processing pipeline. Args: input_path: Path to input dataset output_path: Path for output JSONL goal: Training goal instruction_col: Override instruction column input_col: Override input column output_col: Override output column Returns: Tuple of (output_path, analysis) """ console.print("\n" + "="*60) console.print("[bold magenta]๐Ÿ—๏ธ DATA ARCHITECT AGENT[/]") console.print("="*60) # Load df = self.load_dataset(input_path) # Analyze analysis = self.analyze_dataset(df) # Check quality if analysis.quality_score < self.config.quality_threshold: console.print(f"[yellow]โš ๏ธ Warning: Quality score ({analysis.quality_score:.2%}) below threshold ({self.config.quality_threshold:.2%})[/]") # Clean df_clean = self.clean_data( df, instruction_col=instruction_col or analysis.instruction_column, input_col=input_col or analysis.input_column, output_col=output_col or analysis.output_column ) # Format final_path = self.format_for_training( df_clean, goal=goal, output_path=output_path, instruction_col=instruction_col or analysis.instruction_column, input_col=input_col or analysis.input_column, output_col=output_col or analysis.output_column ) console.print("\n[bold green]โœ… Data preparation complete![/]") return final_path, analysis if __name__ == "__main__": # Example usage import sys if len(sys.argv) < 3: print("Usage: python data_architect.py ") sys.exit(1) input_file = sys.argv[1] goal = sys.argv[2] output_file = f"./output/processed_data/{goal}_training.jsonl" agent = DataArchitectAgent() agent.process(input_file, output_file, goal)