Spaces:
Configuration error
Configuration error
| """ | |
| DataArchitectAgent - Autonomous Data Preparation Agent | |
| ======================================================= | |
| Takes raw CSV/JSON datasets and transforms them into high-quality | |
| HuggingFace-ready JSONL format for fine-tuning. | |
| """ | |
| import json | |
| import re | |
| from pathlib import Path | |
| from dataclasses import dataclass, field | |
| from typing import Optional, List, Dict, Any, Tuple | |
| import pandas as pd | |
| from rich.console import Console | |
| from rich.progress import Progress, SpinnerColumn, TextColumn | |
| from rich.table import Table | |
| console = Console() | |
@dataclass
class DatasetAnalysis:
    """Analysis results for a dataset.

    Declared as a dataclass so it can be built with keyword arguments and so
    that ``issues`` gets a fresh list per instance via ``default_factory``.
    """

    total_rows: int
    valid_rows: int
    invalid_rows: int
    duplicate_rows: int
    detected_columns: Dict[str, str]  # column_name -> detected_type
    instruction_column: Optional[str] = None
    input_column: Optional[str] = None
    output_column: Optional[str] = None
    quality_score: float = 0.0  # fraction of rows counted as valid (0.0 - 1.0)
    issues: List[str] = field(default_factory=list)
@dataclass
class CleaningConfig:
    """Configuration for data cleaning.

    Declared as a dataclass so individual settings can be overridden at
    construction time, e.g. ``CleaningConfig(min_response_length=5)``.
    """

    # Length bounds are in characters and are inclusive.
    min_instruction_length: int = 10
    max_instruction_length: int = 2048
    min_response_length: int = 20
    max_response_length: int = 4096
    remove_duplicates: bool = True
    remove_empty: bool = True
    remove_special_chars: bool = False
    quality_threshold: float = 0.7
class DataArchitectAgent:
    """
    Autonomous agent for data preparation and cleaning.

    This agent analyzes raw datasets, identifies instruction-response pairs,
    cleans the data, and formats it for HuggingFace fine-tuning.
    """

    # Common column name patterns for auto-detection.
    # Patterns are regexes searched anywhere in the lower-cased column name.
    INSTRUCTION_PATTERNS = [
        r'instruction', r'prompt', r'question', r'query', r'input_text',
        r'human', r'user', r'request', r'ask', r'command'
    ]
    INPUT_PATTERNS = [
        r'context', r'input', r'background', r'reference', r'document'
    ]
    OUTPUT_PATTERNS = [
        r'output', r'response', r'answer', r'completion', r'reply',
        r'assistant', r'bot', r'generated', r'target'
    ]

    def __init__(self, config: Optional["CleaningConfig"] = None):
        """Initialize the DataArchitectAgent.

        Args:
            config: Cleaning settings; a default CleaningConfig is used when omitted.
        """
        self.config = config or CleaningConfig()
        # Filled in by analyze_dataset(); later steps fall back to its column mapping.
        self.analysis: Optional["DatasetAnalysis"] = None

    def load_dataset(self, path: str) -> pd.DataFrame:
        """
        Load a dataset from CSV or JSON file.

        Args:
            path: Path to the dataset file (.csv, .json or .jsonl)

        Returns:
            Loaded DataFrame

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the file extension is not supported.
        """
        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"Dataset not found: {path}")

        console.print(f"[bold blue]📂 Loading dataset:[/] {path}")

        suffix = path.suffix.lower()
        if suffix == '.csv':
            df = pd.read_csv(path)
        elif suffix == '.jsonl':
            # JSON Lines: one JSON object per line
            df = pd.read_json(path, lines=True)
        elif suffix == '.json':
            df = pd.read_json(path)
        else:
            raise ValueError(f"Unsupported file format: {path.suffix}")

        console.print(f"[green]✓ Loaded {len(df)} rows with {len(df.columns)} columns[/]")
        return df

    def _match_column_pattern(self, column: str, patterns: List[str]) -> bool:
        """Check if a column name matches any of the given patterns."""
        # Column labels are not guaranteed to be strings (e.g. integer headers),
        # so convert before lower-casing instead of crashing on .lower().
        column_lower = str(column).lower()
        return any(re.search(pattern, column_lower) for pattern in patterns)

    def _detect_column_type(self, column: str) -> str:
        """Detect the type of a column based on its name.

        Returns:
            'instruction', 'input', 'output' or 'unknown'. Pattern groups are
            checked in that order, so the first matching group wins.
        """
        if self._match_column_pattern(column, self.INSTRUCTION_PATTERNS):
            return 'instruction'
        if self._match_column_pattern(column, self.INPUT_PATTERNS):
            return 'input'
        if self._match_column_pattern(column, self.OUTPUT_PATTERNS):
            return 'output'
        return 'unknown'

    @staticmethod
    def _text_lengths(series: pd.Series) -> pd.Series:
        """Return the stripped text length of each value; missing values count as 0."""
        lengths = series.astype(str).str.strip().str.len()
        # Without this, a missing value would be measured as the text 'nan'/'None'.
        return lengths.where(series.notna(), 0)

    def analyze_dataset(self, df: pd.DataFrame) -> "DatasetAnalysis":
        """
        Analyze a dataset to understand its structure and quality.

        A row counts as valid when both the instruction and the output text
        (whitespace-stripped) fall inside the configured length bounds. If
        either column could not be detected, every row counts as invalid.

        Args:
            df: Input DataFrame

        Returns:
            DatasetAnalysis with detected columns and quality metrics
        """
        console.print("\n[bold blue]🔍 Analyzing dataset structure...[/]")

        # Detect column types; the first column of each type is the one used.
        detected_columns = {}
        first_of_type = {}
        for col in df.columns:
            col_type = self._detect_column_type(col)
            detected_columns[col] = col_type
            first_of_type.setdefault(col_type, col)
        instruction_col = first_of_type.get('instruction')
        input_col = first_of_type.get('input')
        output_col = first_of_type.get('output')

        # Check for required columns
        issues = []
        if instruction_col is None:
            issues.append("❌ No instruction/prompt column detected")
        if output_col is None:
            issues.append("❌ No output/response column detected")

        # Analyze row validity (vectorized instead of a per-row Python loop)
        total_rows = len(df)
        valid_rows = 0
        if instruction_col is not None and output_col is not None:
            instruction_ok = self._text_lengths(df[instruction_col]).between(
                self.config.min_instruction_length,
                self.config.max_instruction_length,
            )
            output_ok = self._text_lengths(df[output_col]).between(
                self.config.min_response_length,
                self.config.max_response_length,
            )
            valid_rows = int((instruction_ok & output_ok).sum())
        invalid_rows = total_rows - valid_rows

        # Count duplicates (as a plain int rather than a numpy integer)
        duplicate_rows = 0
        if instruction_col is not None:
            duplicate_rows = int(df[instruction_col].duplicated().sum())

        # Calculate quality score
        quality_score = valid_rows / total_rows if total_rows > 0 else 0.0

        self.analysis = DatasetAnalysis(
            total_rows=total_rows,
            valid_rows=valid_rows,
            invalid_rows=invalid_rows,
            duplicate_rows=duplicate_rows,
            detected_columns=detected_columns,
            instruction_column=instruction_col,
            input_column=input_col,
            output_column=output_col,
            quality_score=quality_score,
            issues=issues
        )

        # Display analysis results
        self._display_analysis()
        return self.analysis

    def _display_analysis(self):
        """Display the analysis results in a formatted table."""
        analysis = self.analysis
        if not analysis:
            return

        table = Table(title="Dataset Analysis", show_header=True)
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="green")
        table.add_row("Total Rows", str(analysis.total_rows))
        table.add_row("Valid Rows", str(analysis.valid_rows))
        table.add_row("Invalid Rows", str(analysis.invalid_rows))
        table.add_row("Duplicate Rows", str(analysis.duplicate_rows))
        table.add_row("Quality Score", f"{analysis.quality_score:.2%}")
        console.print(table)

        # Show detected columns
        console.print("\n[bold]Detected Column Mappings:[/]")
        console.print(f" • Instruction: [cyan]{analysis.instruction_column or 'Not detected'}[/]")
        console.print(f" • Input/Context: [cyan]{analysis.input_column or 'Not detected'}[/]")
        console.print(f" • Output/Response: [cyan]{analysis.output_column or 'Not detected'}[/]")

        if analysis.issues:
            console.print("\n[bold red]Issues Found:[/]")
            for issue in analysis.issues:
                console.print(f" {issue}")

    @staticmethod
    def _strip_text_columns(
        df: pd.DataFrame,
        required_cols: List[str],
        optional_col: Optional[str] = None
    ) -> pd.DataFrame:
        """Return a copy of df whose text columns are strings with outer whitespace removed.

        required_cols are expected to contain no missing values. Missing values
        in optional_col become empty strings.
        """
        result = df.copy()
        for col in required_cols:
            # astype(str) keeps the .str accessor from failing on non-string columns.
            result[col] = result[col].astype(str).str.strip()
        if optional_col and optional_col in result.columns:
            values = result[optional_col]
            result[optional_col] = values.astype(str).str.strip().where(values.notna(), '')
        return result

    @staticmethod
    def _filter_by_length(
        df: pd.DataFrame,
        column: str,
        min_length: int,
        max_length: int
    ) -> pd.DataFrame:
        """Keep only rows whose text in column has min_length <= len <= max_length."""
        lengths = df[column].str.len()
        return df[lengths.between(min_length, max_length)]

    def clean_data(
        self,
        df: pd.DataFrame,
        instruction_col: Optional[str] = None,
        input_col: Optional[str] = None,
        output_col: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Clean and validate the dataset.

        Steps, in order: drop rows missing an instruction or output, strip
        whitespace, drop duplicate instructions (if enabled), then apply the
        configured length bounds. Stripping runs first so that padding cannot
        satisfy a minimum length or hide a duplicate.

        Args:
            df: Input DataFrame (not modified)
            instruction_col: Override instruction column name
            input_col: Override input column name
            output_col: Override output column name

        Returns:
            Cleaned DataFrame

        Raises:
            ValueError: If no instruction or output column is given or detected.
        """
        console.print("\n[bold blue]🧹 Cleaning dataset...[/]")

        # Use detected columns if not specified
        if self.analysis:
            instruction_col = instruction_col or self.analysis.instruction_column
            input_col = input_col or self.analysis.input_column
            output_col = output_col or self.analysis.output_column

        if not instruction_col or not output_col:
            raise ValueError("Instruction and output columns are required")

        required_cols = [instruction_col, output_col]
        df_clean = df.copy()
        original_count = len(df_clean)

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            # Remove empty values
            task = progress.add_task("Removing empty values...", total=None)
            df_clean = df_clean.dropna(subset=required_cols)
            progress.update(task, completed=True)

            # Clean text
            task = progress.add_task("Cleaning text...", total=None)
            df_clean = self._strip_text_columns(df_clean, required_cols, input_col)
            progress.update(task, completed=True)

            # Remove duplicates
            if self.config.remove_duplicates:
                task = progress.add_task("Removing duplicates...", total=None)
                df_clean = df_clean.drop_duplicates(subset=[instruction_col])
                progress.update(task, completed=True)

            # Filter by length constraints
            task = progress.add_task("Applying length filters...", total=None)
            df_clean = self._filter_by_length(
                df_clean,
                instruction_col,
                self.config.min_instruction_length,
                self.config.max_instruction_length,
            )
            df_clean = self._filter_by_length(
                df_clean,
                output_col,
                self.config.min_response_length,
                self.config.max_response_length,
            )
            progress.update(task, completed=True)

        removed_count = original_count - len(df_clean)
        console.print(f"[green]✓ Cleaned dataset: {len(df_clean)} rows remaining ({removed_count} removed)[/]")
        return df_clean

    @staticmethod
    def _as_text(value: Any) -> str:
        """Render a cell value as text, mapping missing values (None/NaN/NA) to ''."""
        if value is None or value is pd.NA:
            return ''
        # NaN is the only float that is not equal to itself.
        if isinstance(value, float) and value != value:
            return ''
        return str(value)

    @staticmethod
    def _build_entry(
        instruction: str,
        context: str,
        output: str,
        system_prompt: str
    ) -> Dict[str, Any]:
        """Build one training record carrying both Alpaca-style fields and a chat transcript."""
        user_content = instruction
        if context:
            user_content += f"\n\nContext: {context}"
        return {
            # Alpaca-style instruction format
            "instruction": instruction,
            "input": context,
            "output": output,
            "system": system_prompt,
            # Chat format for compatibility
            "conversations": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
                {"role": "assistant", "content": output}
            ]
        }

    def format_for_training(
        self,
        df: pd.DataFrame,
        goal: str,
        output_path: str,
        instruction_col: Optional[str] = None,
        input_col: Optional[str] = None,
        output_col: Optional[str] = None
    ) -> str:
        """
        Format the dataset into HuggingFace-ready JSONL.

        Args:
            df: Cleaned DataFrame
            goal: Training goal/purpose (e.g., 'medical_assistant')
            output_path: Path to save the JSONL file (parent directories are created)
            instruction_col: Instruction column name
            input_col: Input/context column name
            output_col: Output/response column name

        Returns:
            Path to the created JSONL file

        Raises:
            ValueError: If no instruction or output column is given or detected.
        """
        console.print(f"\n[bold blue]📝 Formatting for training goal: [cyan]{goal}[/][/]")

        # Use detected columns if not specified
        if self.analysis:
            instruction_col = instruction_col or self.analysis.instruction_column
            input_col = input_col or self.analysis.input_column
            output_col = output_col or self.analysis.output_column

        if not instruction_col or not output_col:
            raise ValueError("Instruction and output columns are required")

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Create system prompt based on goal
        system_prompt = self._generate_system_prompt(goal)
        has_context = bool(input_col) and input_col in df.columns

        formatted_data = []
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            task = progress.add_task("Formatting entries...", total=len(df))
            for _, row in df.iterrows():
                # A missing context cell must not leak into prompts as the text "nan".
                context = self._as_text(row[input_col]) if has_context else ''
                formatted_data.append(
                    self._build_entry(
                        str(row[instruction_col]),
                        context,
                        str(row[output_col]),
                        system_prompt,
                    )
                )
                progress.advance(task)

        # Write JSONL: one JSON object per line, non-ASCII text kept readable
        with open(output_path, 'w', encoding='utf-8') as f:
            f.writelines(
                json.dumps(entry, ensure_ascii=False) + '\n'
                for entry in formatted_data
            )

        console.print(f"[green]✓ Created training file: {output_path}[/]")
        console.print(f" • Total samples: {len(formatted_data)}")
        console.print(" • Format: JSONL (Alpaca-style + Chat format)")
        return str(output_path)

    def _generate_system_prompt(self, goal: str) -> str:
        """Generate a system prompt based on the training goal.

        The goal is lower-cased with '_' and '-' treated as spaces; the first
        template whose keyword occurs in it is returned, otherwise a generic
        prompt that embeds the goal as given.
        """
        goal_lower = goal.lower().replace('_', ' ').replace('-', ' ')

        # Common goal templates (checked in insertion order)
        templates = {
            'medical': "You are a knowledgeable medical assistant. Provide accurate, helpful medical information while always recommending users consult healthcare professionals for specific medical advice.",
            'legal': "You are a legal information assistant. Provide helpful legal information while noting that you are not a lawyer and users should consult legal professionals for specific legal advice.",
            'coding': "You are an expert programming assistant. Help users write clean, efficient, and well-documented code. Explain your solutions clearly.",
            'customer': "You are a helpful customer service assistant. Be polite, professional, and focused on solving customer issues efficiently.",
            'education': "You are an educational assistant. Explain concepts clearly and adapt your explanations to the user's level of understanding.",
            'writing': "You are a skilled writing assistant. Help users improve their writing with clear, constructive feedback and suggestions.",
            'assistant': "You are a helpful AI assistant. Provide accurate, useful responses while being conversational and engaging."
        }

        # Find matching template
        for key, prompt in templates.items():
            if key in goal_lower:
                return prompt

        # Default template
        return f"You are a specialized AI assistant for {goal}. Provide helpful, accurate, and relevant responses to user queries."

    def process(
        self,
        input_path: str,
        output_path: str,
        goal: str,
        instruction_col: Optional[str] = None,
        input_col: Optional[str] = None,
        output_col: Optional[str] = None
    ) -> Tuple[str, "DatasetAnalysis"]:
        """
        Complete end-to-end processing pipeline.

        Loads the dataset, analyzes it, warns if the quality score is below
        the configured threshold, cleans it and writes the training file.

        Args:
            input_path: Path to input dataset
            output_path: Path for output JSONL
            goal: Training goal
            instruction_col: Override instruction column
            input_col: Override input column
            output_col: Override output column

        Returns:
            Tuple of (output_path, analysis)
        """
        console.print("\n" + "="*60)
        console.print("[bold magenta]🏗️ DATA ARCHITECT AGENT[/]")
        console.print("="*60)

        # Load
        df = self.load_dataset(input_path)

        # Analyze
        analysis = self.analyze_dataset(df)

        # Check quality (a warning only; processing continues)
        if analysis.quality_score < self.config.quality_threshold:
            console.print(f"[yellow]⚠️ Warning: Quality score ({analysis.quality_score:.2%}) below threshold ({self.config.quality_threshold:.2%})[/]")

        # Explicit overrides win over the detected column mapping.
        columns = {
            "instruction_col": instruction_col or analysis.instruction_column,
            "input_col": input_col or analysis.input_column,
            "output_col": output_col or analysis.output_column,
        }

        # Clean
        df_clean = self.clean_data(df, **columns)

        # Format
        final_path = self.format_for_training(
            df_clean,
            goal=goal,
            output_path=output_path,
            **columns
        )

        console.print("\n[bold green]✅ Data preparation complete![/]")
        return final_path, analysis
if __name__ == "__main__":
    # Command-line entry point: expects <input_file> and <goal> as arguments.
    import sys

    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        print("Usage: python data_architect.py <input_file> <goal>")
        sys.exit(1)

    source_file, training_goal = cli_args[0], cli_args[1]
    # The output location is derived from the goal name.
    destination = f"./output/processed_data/{training_goal}_training.jsonl"

    DataArchitectAgent().process(source_file, destination, training_goal)