Auto-FineTune-Ops / agents /data_architect.py
aneeb15's picture
Initial release of Auto-FineTune-Ops
d4398e6
"""
DataArchitectAgent - Autonomous Data Preparation Agent
=======================================================
Takes raw CSV/JSON datasets and transforms them into high-quality
HuggingFace-ready JSONL format for fine-tuning.
"""
import json
import re
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, Tuple
import pandas as pd
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
# Module-wide rich console; every status message in this file is printed through it.
console = Console()
@dataclass
class DatasetAnalysis:
    """Structure and quality summary produced for one dataset.

    Attributes:
        total_rows: Number of rows in the analyzed dataset.
        valid_rows: Rows that satisfied the configured length limits.
        invalid_rows: Rows that did not.
        duplicate_rows: Number of repeated values in the instruction column.
        detected_columns: Maps each column name to its detected role
            ('instruction', 'input', 'output' or 'unknown').
        instruction_column: Column holding instructions, if one was found.
        input_column: Column holding optional context, if one was found.
        output_column: Column holding responses, if one was found.
        quality_score: Fraction of valid rows (0.0 for an empty dataset).
        issues: Human-readable problems found during analysis.
    """

    # --- required row counters (positional order is part of the interface) ---
    total_rows: int
    valid_rows: int
    invalid_rows: int
    duplicate_rows: int

    # column_name -> detected_type
    detected_columns: Dict[str, str]

    # --- optional results, filled in when detection succeeds ---
    instruction_column: Optional[str] = None
    input_column: Optional[str] = None
    output_column: Optional[str] = None
    quality_score: float = 0.0

    # A factory is used so instances never share one list.
    issues: List[str] = field(default_factory=list)
@dataclass
class CleaningConfig:
    """Tunable limits and switches for dataset cleaning.

    Attributes:
        min_instruction_length: Shortest accepted instruction, in characters.
        max_instruction_length: Longest accepted instruction, in characters.
        min_response_length: Shortest accepted response, in characters.
        max_response_length: Longest accepted response, in characters.
        remove_duplicates: Drop rows whose instruction repeats an earlier one.
        remove_empty: Flag for dropping empty rows.
        remove_special_chars: Flag for stripping special characters.
        quality_threshold: Quality score below which a warning is shown.
    """

    # Character-count bounds for the instruction text.
    min_instruction_length: int = 10
    max_instruction_length: int = 2048

    # Character-count bounds for the response text.
    min_response_length: int = 20
    max_response_length: int = 4096

    # Behaviour switches.
    remove_duplicates: bool = True
    # NOTE(review): DataArchitectAgent, as visible in this file, never reads
    # the next two flags - confirm whether they are consumed elsewhere.
    remove_empty: bool = True
    remove_special_chars: bool = False

    # Compared against DatasetAnalysis.quality_score (a 0..1 fraction).
    quality_threshold: float = 0.7
class DataArchitectAgent:
    """
    Autonomous agent for data preparation and cleaning.

    The agent loads a raw CSV/JSON/JSONL dataset, guesses which columns hold
    the instruction, optional context and response, filters rows by length,
    and writes the result as JSONL for HuggingFace fine-tuning.
    """

    # Regex fragments searched (with ``re.search``) inside the lower-cased
    # column name. Roles are tried in the order instruction -> input -> output,
    # so a name matching several lists gets the first role.
    INSTRUCTION_PATTERNS = [
        r'instruction', r'prompt', r'question', r'query', r'input_text',
        r'human', r'user', r'request', r'ask', r'command'
    ]
    INPUT_PATTERNS = [
        r'context', r'input', r'background', r'reference', r'document'
    ]
    OUTPUT_PATTERNS = [
        r'output', r'response', r'answer', r'completion', r'reply',
        r'assistant', r'bot', r'generated', r'target'
    ]

    def __init__(self, config: "Optional[CleaningConfig]" = None):
        """Initialize the DataArchitectAgent.

        Args:
            config: Cleaning limits; a default ``CleaningConfig`` is used
                when omitted.
        """
        self.config = config if config is not None else CleaningConfig()
        # Filled in by analyze_dataset(); later steps fall back to it for
        # column names that are not passed explicitly.
        self.analysis: "Optional[DatasetAnalysis]" = None

    def load_dataset(self, path: str) -> pd.DataFrame:
        """
        Load a dataset from a CSV, JSON or JSONL file.

        Args:
            path: Path to the dataset file

        Returns:
            Loaded DataFrame

        Raises:
            FileNotFoundError: If ``path`` does not exist.
            ValueError: If the file extension is not .csv, .json or .jsonl.
        """
        dataset_path = Path(path)
        if not dataset_path.exists():
            raise FileNotFoundError(f"Dataset not found: {dataset_path}")

        console.print(f"[bold blue]📂 Loading dataset:[/] {dataset_path}")

        suffix = dataset_path.suffix.lower()
        if suffix == '.csv':
            df = pd.read_csv(dataset_path)
        elif suffix == '.jsonl':
            # JSON Lines: one JSON object per line.
            df = pd.read_json(dataset_path, lines=True)
        elif suffix == '.json':
            df = pd.read_json(dataset_path)
        else:
            raise ValueError(f"Unsupported file format: {dataset_path.suffix}")

        console.print(f"[green]✓ Loaded {len(df)} rows with {len(df.columns)} columns[/]")
        return df

    def _match_column_pattern(self, column: str, patterns: List[str]) -> bool:
        """Check if a column name matches any of the given patterns.

        The name is converted with ``str()`` first so that non-string column
        labels (e.g. integer labels) do not raise.
        """
        column_lower = str(column).lower()
        return any(re.search(pattern, column_lower) for pattern in patterns)

    def _detect_column_type(self, column: str) -> str:
        """Return 'instruction', 'input', 'output' or 'unknown' for a column name."""
        role_patterns = (
            ('instruction', self.INSTRUCTION_PATTERNS),
            ('input', self.INPUT_PATTERNS),
            ('output', self.OUTPUT_PATTERNS),
        )
        for role, patterns in role_patterns:
            if self._match_column_pattern(column, patterns):
                return role
        return 'unknown'

    @staticmethod
    def _length_mask(series: pd.Series, min_len: int, max_len: int) -> pd.Series:
        """Flag values that are present and whose stripped text length is within bounds.

        Args:
            series: Column to check; values are converted with ``str``.
            min_len: Minimum accepted length (inclusive).
            max_len: Maximum accepted length (inclusive).

        Returns:
            Boolean Series aligned with ``series``; missing values are False.
        """
        text_len = series.astype(str).str.strip().str.len()
        # notna() is taken from the raw column: astype(str) may turn a
        # missing value into the text 'nan', which must not count as content.
        return (series.notna() & text_len.between(min_len, max_len)).astype(bool)

    def _column_validity(self, df: pd.DataFrame, column: Optional[str],
                         min_len: int, max_len: int) -> pd.Series:
        """Per-row validity for one role column; all False if the column is absent."""
        if column is None or column not in df.columns:
            return pd.Series(False, index=df.index, dtype=bool)
        return self._length_mask(df[column], min_len, max_len)

    def analyze_dataset(
        self,
        df: pd.DataFrame,
        instruction_col: Optional[str] = None,
        input_col: Optional[str] = None,
        output_col: Optional[str] = None
    ) -> "DatasetAnalysis":
        """
        Analyze a dataset to understand its structure and quality.

        A row is valid when both its instruction and its output are present
        and their whitespace-stripped lengths fall inside the configured
        limits. The result is stored on ``self.analysis`` and displayed.

        Args:
            df: Input DataFrame
            instruction_col: Optional override for the instruction column
            input_col: Optional override for the input/context column
            output_col: Optional override for the output column

        Returns:
            DatasetAnalysis with detected columns and quality metrics
        """
        console.print("\n[bold blue]🔍 Analyzing dataset structure...[/]")

        # Detect column roles by name; the first column of each role wins.
        detected_columns: Dict[Any, str] = {}
        first_by_role: Dict[str, Any] = {}
        for col in df.columns:
            role = self._detect_column_type(col)
            detected_columns[col] = role
            first_by_role.setdefault(role, col)

        # Explicit overrides take precedence over name-based detection.
        instruction_col = instruction_col or first_by_role.get('instruction')
        input_col = input_col or first_by_role.get('input')
        output_col = output_col or first_by_role.get('output')

        issues: List[str] = []
        if instruction_col is None:
            issues.append("❌ No instruction/prompt column detected")
        if output_col is None:
            issues.append("❌ No output/response column detected")
        for role_col in (instruction_col, input_col, output_col):
            # Only reachable through an override naming a non-existent column.
            if role_col is not None and role_col not in df.columns:
                issues.append(f"❌ Column '{role_col}' not found in dataset")

        # Vectorized row validity (replaces a per-row Python loop).
        instruction_ok = self._column_validity(
            df, instruction_col,
            self.config.min_instruction_length, self.config.max_instruction_length,
        )
        output_ok = self._column_validity(
            df, output_col,
            self.config.min_response_length, self.config.max_response_length,
        )
        total_rows = len(df)
        valid_rows = int((instruction_ok & output_ok).sum())
        invalid_rows = total_rows - valid_rows

        # Duplicates are counted on the instruction column only.
        duplicate_rows = 0
        if instruction_col is not None and instruction_col in df.columns:
            # int(): pandas returns a numpy integer here.
            duplicate_rows = int(df[instruction_col].duplicated().sum())

        quality_score = valid_rows / total_rows if total_rows > 0 else 0.0

        self.analysis = DatasetAnalysis(
            total_rows=total_rows,
            valid_rows=valid_rows,
            invalid_rows=invalid_rows,
            duplicate_rows=duplicate_rows,
            detected_columns=detected_columns,
            instruction_column=instruction_col,
            input_column=input_col,
            output_column=output_col,
            quality_score=quality_score,
            issues=issues
        )

        self._display_analysis()
        return self.analysis

    def _display_analysis(self):
        """Display the stored analysis as a table plus column mappings and issues."""
        analysis = self.analysis
        if analysis is None:
            return

        table = Table(title="Dataset Analysis", show_header=True)
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="green")
        metric_rows = (
            ("Total Rows", str(analysis.total_rows)),
            ("Valid Rows", str(analysis.valid_rows)),
            ("Invalid Rows", str(analysis.invalid_rows)),
            ("Duplicate Rows", str(analysis.duplicate_rows)),
            ("Quality Score", f"{analysis.quality_score:.2%}"),
        )
        for metric, value in metric_rows:
            table.add_row(metric, value)
        console.print(table)

        # Show detected columns
        console.print("\n[bold]Detected Column Mappings:[/]")
        console.print(f" • Instruction: [cyan]{analysis.instruction_column or 'Not detected'}[/]")
        console.print(f" • Input/Context: [cyan]{analysis.input_column or 'Not detected'}[/]")
        console.print(f" • Output/Response: [cyan]{analysis.output_column or 'Not detected'}[/]")

        if analysis.issues:
            console.print("\n[bold red]Issues Found:[/]")
            for issue in analysis.issues:
                console.print(f" {issue}")

    @staticmethod
    def _normalize_text_columns(
        df: pd.DataFrame,
        instruction_col: str,
        input_col: Optional[str],
        output_col: str
    ) -> pd.DataFrame:
        """Return a copy of ``df`` whose role columns hold stripped strings.

        The instruction and output columns are expected to contain no missing
        values (the caller drops those rows first). Missing context values
        become ''. Casting with ``astype(str)`` lets numeric columns through,
        where the ``.str`` accessor alone would raise.
        """
        result = df.copy()
        for col in (instruction_col, output_col):
            result[col] = result[col].astype(str).str.strip()
        if input_col and input_col in result.columns:
            raw_context = result[input_col]
            result[input_col] = (
                raw_context.astype(str).str.strip().where(raw_context.notna(), '')
            )
        return result

    def clean_data(
        self,
        df: pd.DataFrame,
        instruction_col: Optional[str] = None,
        input_col: Optional[str] = None,
        output_col: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Clean and validate the dataset.

        Steps, in order: drop rows missing an instruction or output, convert
        the role columns to stripped text, apply the length limits, then
        (optionally) drop duplicate instructions. Text is stripped *before*
        lengths are measured and duplicates are compared, so padded or
        whitespace-only values cannot slip through, and de-duplication only
        ever keeps rows that passed the filters. Rows with a missing
        instruction or output are always dropped; ``config.remove_empty`` is
        not consulted here.

        Args:
            df: Input DataFrame (not modified)
            instruction_col: Override instruction column name
            input_col: Override input column name
            output_col: Override output column name

        Returns:
            Cleaned DataFrame

        Raises:
            ValueError: If no instruction or output column can be resolved.
        """
        console.print("\n[bold blue]🧹 Cleaning dataset...[/]")

        # Use detected columns if not specified
        if self.analysis:
            instruction_col = instruction_col or self.analysis.instruction_column
            input_col = input_col or self.analysis.input_column
            output_col = output_col or self.analysis.output_column

        if not instruction_col or not output_col:
            raise ValueError("Instruction and output columns are required")

        original_count = len(df)
        cfg = self.config

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            # Remove empty values
            task = progress.add_task("Removing empty values...", total=None)
            df_clean = df.dropna(subset=[instruction_col, output_col])
            progress.update(task, completed=True)

            # Clean text (works on a fresh copy, leaving the caller's frame alone)
            task = progress.add_task("Cleaning text...", total=None)
            df_clean = self._normalize_text_columns(
                df_clean, instruction_col, input_col, output_col
            )
            progress.update(task, completed=True)

            # Filter by length constraints
            task = progress.add_task("Applying length filters...", total=None)
            keep = self._length_mask(
                df_clean[instruction_col],
                cfg.min_instruction_length, cfg.max_instruction_length,
            ) & self._length_mask(
                df_clean[output_col],
                cfg.min_response_length, cfg.max_response_length,
            )
            df_clean = df_clean[keep]
            progress.update(task, completed=True)

            # Remove duplicates (first surviving occurrence is kept)
            if cfg.remove_duplicates:
                task = progress.add_task("Removing duplicates...", total=None)
                df_clean = df_clean.drop_duplicates(subset=[instruction_col])
                progress.update(task, completed=True)

        removed_count = original_count - len(df_clean)
        console.print(f"[green]✓ Cleaned dataset: {len(df_clean)} rows remaining ({removed_count} removed)[/]")
        return df_clean

    @staticmethod
    def _cell_to_text(value: Any) -> str:
        """Convert one cell to text, mapping missing scalars (None/NaN/NA) to ''."""
        # is_scalar() guards pd.isna(), which returns an array for list-like cells.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ''
        return str(value)

    @staticmethod
    def _build_entry(instruction: str, context: str, output: str,
                     system_prompt: str) -> Dict[str, Any]:
        """Build one training record holding both Alpaca-style fields and a chat transcript."""
        user_content = instruction
        if context:
            user_content += f"\n\nContext: {context}"
        return {
            # Alpaca-style instruction format
            "instruction": instruction,
            "input": context,
            "output": output,
            "system": system_prompt,
            # Chat format for compatibility
            "conversations": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
                {"role": "assistant", "content": output},
            ],
        }

    def format_for_training(
        self,
        df: pd.DataFrame,
        goal: str,
        output_path: str,
        instruction_col: Optional[str] = None,
        input_col: Optional[str] = None,
        output_col: Optional[str] = None
    ) -> str:
        """
        Format the dataset into HuggingFace-ready JSONL.

        One JSON object is written per row. Parent directories of
        ``output_path`` are created as needed and an existing file is
        overwritten.

        Args:
            df: Cleaned DataFrame
            goal: Training goal/purpose (e.g., 'medical_assistant')
            output_path: Path to save the JSONL file
            instruction_col: Instruction column name
            input_col: Input/context column name
            output_col: Output/response column name

        Returns:
            Path to the created JSONL file

        Raises:
            ValueError: If no instruction or output column can be resolved.
        """
        console.print(f"\n[bold blue]📝 Formatting for training goal: [cyan]{goal}[/][/]")

        # Use detected columns if not specified
        if self.analysis:
            instruction_col = instruction_col or self.analysis.instruction_column
            input_col = input_col or self.analysis.input_column
            output_col = output_col or self.analysis.output_column

        if not instruction_col or not output_col:
            raise ValueError("Instruction and output columns are required")

        target_path = Path(output_path)
        target_path.parent.mkdir(parents=True, exist_ok=True)

        # Create system prompt based on goal
        system_prompt = self._generate_system_prompt(goal)

        # Iterate columns directly rather than rows: row-wise iteration can
        # coerce values to a common dtype (e.g. turning the int 1 into 1.0).
        instructions = df[instruction_col]
        outputs = df[output_col]
        if input_col and input_col in df.columns:
            contexts = df[input_col]
        else:
            contexts = [''] * len(df)

        formatted_data = []
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            task = progress.add_task("Formatting entries...", total=len(df))
            for raw_instruction, raw_context, raw_output in zip(instructions, contexts, outputs):
                formatted_data.append(
                    self._build_entry(
                        str(raw_instruction),
                        # A missing context becomes '' instead of the text 'nan'.
                        self._cell_to_text(raw_context),
                        str(raw_output),
                        system_prompt,
                    )
                )
                progress.advance(task)

        # Write JSONL
        with open(target_path, 'w', encoding='utf-8') as f:
            f.writelines(
                json.dumps(entry, ensure_ascii=False) + '\n'
                for entry in formatted_data
            )

        console.print(f"[green]✓ Created training file: {target_path}[/]")
        console.print(f" • Total samples: {len(formatted_data)}")
        console.print(" • Format: JSONL (Alpaca-style + Chat format)")
        return str(target_path)

    def _generate_system_prompt(self, goal: str) -> str:
        """Generate a system prompt based on the training goal.

        The first template whose keyword occurs in the normalized goal
        (lower-cased, with '_' and '-' turned into spaces) is used; otherwise
        a generic prompt mentioning the goal verbatim is returned.
        """
        goal_lower = goal.lower().replace('_', ' ').replace('-', ' ')

        # Common goal templates; order matters because the first match wins
        # (e.g. 'medical_assistant' resolves to 'medical', not 'assistant').
        templates = {
            'medical': "You are a knowledgeable medical assistant. Provide accurate, helpful medical information while always recommending users consult healthcare professionals for specific medical advice.",
            'legal': "You are a legal information assistant. Provide helpful legal information while noting that you are not a lawyer and users should consult legal professionals for specific legal advice.",
            'coding': "You are an expert programming assistant. Help users write clean, efficient, and well-documented code. Explain your solutions clearly.",
            'customer': "You are a helpful customer service assistant. Be polite, professional, and focused on solving customer issues efficiently.",
            'education': "You are an educational assistant. Explain concepts clearly and adapt your explanations to the user's level of understanding.",
            'writing': "You are a skilled writing assistant. Help users improve their writing with clear, constructive feedback and suggestions.",
            'assistant': "You are a helpful AI assistant. Provide accurate, useful responses while being conversational and engaging."
        }

        for key, prompt in templates.items():
            if key in goal_lower:
                return prompt

        # Default template
        return f"You are a specialized AI assistant for {goal}. Provide helpful, accurate, and relevant responses to user queries."

    def process(
        self,
        input_path: str,
        output_path: str,
        goal: str,
        instruction_col: Optional[str] = None,
        input_col: Optional[str] = None,
        output_col: Optional[str] = None
    ) -> Tuple[str, "DatasetAnalysis"]:
        """
        Complete end-to-end processing pipeline: load, analyze, clean, format.

        Column overrides are passed to the analysis as well, so the reported
        quality score describes the columns that are actually used.

        Args:
            input_path: Path to input dataset
            output_path: Path for output JSONL
            goal: Training goal
            instruction_col: Override instruction column
            input_col: Override input column
            output_col: Override output column

        Returns:
            Tuple of (output_path, analysis)
        """
        console.print("\n" + "="*60)
        console.print("[bold magenta]🏗️ DATA ARCHITECT AGENT[/]")
        console.print("="*60)

        # Load
        df = self.load_dataset(input_path)

        # Analyze
        analysis = self.analyze_dataset(
            df,
            instruction_col=instruction_col,
            input_col=input_col,
            output_col=output_col
        )

        # Check quality (warning only; processing continues)
        if analysis.quality_score < self.config.quality_threshold:
            console.print(f"[yellow]⚠️ Warning: Quality score ({analysis.quality_score:.2%}) below threshold ({self.config.quality_threshold:.2%})[/]")

        # Resolve the columns once and reuse them for both remaining steps.
        resolved = {
            'instruction_col': instruction_col or analysis.instruction_column,
            'input_col': input_col or analysis.input_column,
            'output_col': output_col or analysis.output_column,
        }

        # Clean
        df_clean = self.clean_data(df, **resolved)

        # Format
        final_path = self.format_for_training(
            df_clean,
            goal=goal,
            output_path=output_path,
            **resolved
        )

        console.print("\n[bold green]✅ Data preparation complete![/]")
        return final_path, analysis
if __name__ == "__main__":
    # Minimal command-line entry point:
    #   python data_architect.py <input_file> <goal>
    # The output always goes to ./output/processed_data/<goal>_training.jsonl.
    import sys

    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        print("Usage: python data_architect.py <input_file> <goal>")
        sys.exit(1)

    # Any arguments after the first two are ignored.
    input_file, goal = cli_args[0], cli_args[1]
    output_file = f"./output/processed_data/{goal}_training.jsonl"

    agent = DataArchitectAgent()
    agent.process(input_file, output_file, goal)