#!/usr/bin/env python3 """ Dataset Builder Creates and manages finetuning datasets from legislation analysis results. Handles data formatting, validation, and export in multiple formats. """ import os import json import time from typing import List, Dict, Any, Optional, Tuple from pathlib import Path import pandas as pd from datetime import datetime import uuid class DatasetBuilder: """Builder for creating finetuning datasets from legislation analysis""" def __init__(self, output_dir: str = "datasets"): """ Initialize the dataset builder Args: output_dir: Directory to save datasets """ self.output_dir = Path(output_dir) self.output_dir.mkdir(exist_ok=True) # Dataset metadata self.metadata = { 'version': '1.0', 'created_at': datetime.now().isoformat(), 'total_entries': 0, 'analysis_types': set(), 'legislation_sources': set(), 'quality_metrics': {} } def create_finetuning_dataset(self, analysis_results: List[Dict[str, Any]], dataset_name: str = None, include_metadata: bool = True) -> Dict[str, Any]: """ Create a finetuning dataset from analysis results Args: analysis_results: List of analysis results from LLM analyzer dataset_name: Name for the dataset (optional) include_metadata: Whether to include metadata in the dataset Returns: Dataset information and statistics """ if not dataset_name: timestamp = int(time.time()) dataset_name = f"nz_legislation_dataset_{timestamp}" dataset_entries = [] successful_entries = 0 for result in analysis_results: if 'error' in result: continue # Create finetuning entry entry = self._create_finetuning_entry(result) if entry: dataset_entries.append(entry) successful_entries += 1 # Update metadata if 'analysis_type' in result: self.metadata['analysis_types'].add(result['analysis_type']) # Update metadata self.metadata['total_entries'] = len(dataset_entries) self.metadata['created_at'] = datetime.now().isoformat() # Calculate quality metrics self._calculate_quality_metrics(dataset_entries) # Create dataset structure dataset = { 'metadata': dict(self.metadata), 'entries': dataset_entries } if include_metadata: dataset['metadata'].update({ 'dataset_name': dataset_name, 'successful_entries': successful_entries, 'total_input_results': len(analysis_results), 'success_rate': successful_entries / len(analysis_results) if analysis_results else 0 }) return dataset def _create_finetuning_entry(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Create a single finetuning dataset entry Args: result: Analysis result from LLM analyzer Returns: Finetuning entry or None if invalid """ try: # Extract key components chunk = result.get('chunk', '') structured_analysis = result.get('structured_analysis', {}) response = result.get('response', '') # Create the prompt (input) prompt = self._create_prompt(chunk, result.get('analysis_type', 'standard')) # Create the response (output) - structured format response_text = self._create_response(structured_analysis, response) if not prompt or not response_text: return None # Create entry entry = { 'id': str(uuid.uuid4()), 'prompt': prompt, 'response': response_text, 'metadata': { 'chunk_size': len(chunk), 'word_count': len(chunk.split()), 'analysis_type': result.get('analysis_type', 'standard'), 'model_config': result.get('model_config', {}), 'confidence_score': structured_analysis.get('confidence_score', 0), 'analysis_quality': structured_analysis.get('analysis_quality', 'unknown'), 'created_at': datetime.now().isoformat() }, 'raw_data': { 'original_chunk': chunk, 'structured_analysis': structured_analysis, 'raw_response': response } } return entry except Exception as e: print(f"Error creating finetuning entry: {e}") return None def _create_prompt(self, chunk: str, analysis_type: str) -> str: """ Create a standardized prompt for the finetuning dataset Args: chunk: Text chunk to analyze analysis_type: Type of analysis Returns: Formatted prompt """ analysis_configs = { 'standard': { 'depth': 'Standard', 'focus': 'loopholes, ambiguities, and unintended consequences' }, 'detailed': { 'depth': 'Detailed', 'focus': 'loopholes, ambiguities, unintended consequences, and implementation issues' }, 'comprehensive': { 'depth': 'Comprehensive', 'focus': 'all aspects including policy conflicts and enforcement challenges' } } config = analysis_configs.get(analysis_type, analysis_configs['standard']) prompt = f"""You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities. LEGISLATION TEXT: {chunk} TASK: Analyze this legislative text for potential loopholes, ambiguities, or unintended consequences. ANALYSIS DEPTH: {config['depth']} FOCUS AREAS: {config['focus']} Provide a structured analysis covering: 1. Text Meaning - Explain what the text means and its intended purpose 2. Key Assumptions - Identify any assumptions that could be exploited 3. Exploitable Interpretations - Discuss how the text could be interpreted in unintended ways 4. Critical Loopholes - Identify specific loopholes or ambiguities 5. Circumvention Strategies - Suggest practical methods for exploiting these loopholes Format your response clearly with section headers.""" return prompt def _create_response(self, structured_analysis: Dict[str, Any], raw_response: str) -> str: """ Create a standardized response format for the finetuning dataset Args: structured_analysis: Structured analysis data raw_response: Raw LLM response Returns: Formatted response """ sections = [] # Text Meaning if structured_analysis.get('text_meaning'): sections.append(f"**Text Meaning:** {structured_analysis['text_meaning']}") # Key Assumptions if structured_analysis.get('key_assumptions'): assumptions = structured_analysis['key_assumptions'] if assumptions: sections.append("**Key Assumptions:**") for i, assumption in enumerate(assumptions, 1): sections.append(f"{i}. {assumption}") # Exploitable Interpretations if structured_analysis.get('exploitable_interpretations'): interpretations = structured_analysis['exploitable_interpretations'] if interpretations: sections.append("**Exploitable Interpretations:**") for i, interpretation in enumerate(interpretations, 1): sections.append(f"{i}. {interpretation}") # Critical Loopholes if structured_analysis.get('critical_loopholes'): loopholes = structured_analysis['critical_loopholes'] if loopholes: sections.append("**Critical Loopholes:**") for i, loophole in enumerate(loopholes, 1): sections.append(f"{i}. {loophole}") # Circumvention Strategies if structured_analysis.get('circumvention_strategies'): strategies = structured_analysis['circumvention_strategies'] if strategies: sections.append("**Circumvention Strategies:**") for i, strategy in enumerate(strategies, 1): sections.append(f"{i}. {strategy}") # Recommendations if structured_analysis.get('recommendations'): recommendations = structured_analysis['recommendations'] if recommendations: sections.append("**Recommendations:**") for i, rec in enumerate(recommendations, 1): sections.append(f"{i}. {rec}") return "\n\n".join(sections) if sections else raw_response def _calculate_quality_metrics(self, entries: List[Dict[str, Any]]): """Calculate quality metrics for the dataset""" if not entries: return confidence_scores = [] analysis_qualities = {'high': 0, 'medium': 0, 'low': 0, 'unknown': 0} for entry in entries: metadata = entry.get('metadata', {}) confidence = metadata.get('confidence_score', 0) quality = metadata.get('analysis_quality', 'unknown') confidence_scores.append(confidence) analysis_qualities[quality] = analysis_qualities.get(quality, 0) + 1 self.metadata['quality_metrics'] = { 'average_confidence': sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0, 'max_confidence': max(confidence_scores) if confidence_scores else 0, 'min_confidence': min(confidence_scores) if confidence_scores else 0, 'quality_distribution': analysis_qualities, 'total_entries': len(entries) } def save_dataset(self, dataset: Dict[str, Any], format_type: str = 'json', filename: str = None) -> str: """ Save dataset in specified format Args: dataset: Dataset to save format_type: Format ('json', 'jsonl', 'csv', 'excel') filename: Output filename (optional) Returns: Path to saved file """ if not filename: timestamp = int(time.time()) filename = f"nz_legislation_dataset_{timestamp}" # Ensure filename has correct extension if not filename.endswith(f'.{format_type}'): filename += f'.{format_type}' filepath = self.output_dir / filename try: if format_type == 'json': with open(filepath, 'w', encoding='utf-8') as f: json.dump(dataset, f, indent=2, ensure_ascii=False) elif format_type == 'jsonl': with open(filepath, 'w', encoding='utf-8') as f: for entry in dataset.get('entries', []): json.dump(entry, f, ensure_ascii=False) f.write('\n') elif format_type == 'csv': self._save_as_csv(dataset, filepath) elif format_type == 'excel': self._save_as_excel(dataset, filepath) else: raise ValueError(f"Unsupported format: {format_type}") return str(filepath) except Exception as e: raise Exception(f"Error saving dataset: {e}") def _save_as_csv(self, dataset: Dict[str, Any], filepath: Path): """Save dataset as CSV""" entries = dataset.get('entries', []) if not entries: # Create empty CSV with headers df = pd.DataFrame(columns=['id', 'prompt', 'response', 'metadata']) df.to_csv(filepath, index=False) return # Flatten the data for CSV csv_data = [] for entry in entries: csv_row = { 'id': entry.get('id', ''), 'prompt': entry.get('prompt', ''), 'response': entry.get('response', ''), 'confidence_score': entry.get('metadata', {}).get('confidence_score', 0), 'analysis_type': entry.get('metadata', {}).get('analysis_type', ''), 'chunk_size': entry.get('metadata', {}).get('chunk_size', 0), 'word_count': entry.get('metadata', {}).get('word_count', 0), 'analysis_quality': entry.get('metadata', {}).get('analysis_quality', ''), 'created_at': entry.get('metadata', {}).get('created_at', '') } csv_data.append(csv_row) df = pd.DataFrame(csv_data) df.to_csv(filepath, index=False, encoding='utf-8') def _save_as_excel(self, dataset: Dict[str, Any], filepath: Path): """Save dataset as Excel with multiple sheets""" entries = dataset.get('entries', []) with pd.ExcelWriter(filepath, engine='openpyxl') as writer: # Main dataset sheet if entries: csv_data = [] for entry in entries: csv_row = { 'id': entry.get('id', ''), 'prompt': entry.get('prompt', ''), 'response': entry.get('response', ''), 'confidence_score': entry.get('metadata', {}).get('confidence_score', 0), 'analysis_type': entry.get('metadata', {}).get('analysis_type', ''), 'chunk_size': entry.get('metadata', {}).get('chunk_size', 0), 'word_count': entry.get('metadata', {}).get('word_count', 0), 'analysis_quality': entry.get('metadata', {}).get('analysis_quality', ''), 'created_at': entry.get('metadata', {}).get('created_at', '') } csv_data.append(csv_row) df_main = pd.DataFrame(csv_data) df_main.to_excel(writer, sheet_name='Dataset', index=False) # Metadata sheet metadata_df = pd.DataFrame([dataset.get('metadata', {})]) metadata_df.to_excel(writer, sheet_name='Metadata', index=False) # Quality metrics sheet quality_data = dataset.get('metadata', {}).get('quality_metrics', {}) if quality_data: quality_df = pd.DataFrame([quality_data]) quality_df.to_excel(writer, sheet_name='Quality_Metrics', index=False) def load_dataset(self, filepath: str) -> Dict[str, Any]: """ Load a dataset from file Args: filepath: Path to dataset file Returns: Loaded dataset """ filepath = Path(filepath) if not filepath.exists(): raise FileNotFoundError(f"Dataset file not found: {filepath}") try: if filepath.suffix == '.json': with open(filepath, 'r', encoding='utf-8') as f: return json.load(f) elif filepath.suffix == '.jsonl': entries = [] with open(filepath, 'r', encoding='utf-8') as f: for line in f: if line.strip(): entries.append(json.loads(line)) return { 'metadata': { 'loaded_from': str(filepath), 'total_entries': len(entries) }, 'entries': entries } elif filepath.suffix in ['.csv', '.xlsx', '.xls']: return self._load_from_spreadsheet(filepath) else: raise ValueError(f"Unsupported file format: {filepath.suffix}") except Exception as e: raise Exception(f"Error loading dataset: {e}") def _load_from_spreadsheet(self, filepath: Path) -> Dict[str, Any]: """Load dataset from spreadsheet format""" try: if filepath.suffix == '.csv': df = pd.read_csv(filepath) else: df = pd.read_excel(filepath) # Convert back to dataset format entries = [] for _, row in df.iterrows(): entry = { 'id': row.get('id', str(uuid.uuid4())), 'prompt': row.get('prompt', ''), 'response': row.get('response', ''), 'metadata': { 'confidence_score': row.get('confidence_score', 0), 'analysis_type': row.get('analysis_type', 'standard'), 'chunk_size': row.get('chunk_size', 0), 'word_count': row.get('word_count', 0), 'analysis_quality': row.get('analysis_quality', 'unknown'), 'created_at': row.get('created_at', datetime.now().isoformat()) } } entries.append(entry) return { 'metadata': { 'loaded_from': str(filepath), 'total_entries': len(entries), 'original_format': filepath.suffix[1:] }, 'entries': entries } except Exception as e: raise Exception(f"Error loading spreadsheet: {e}") def merge_datasets(self, datasets: List[Dict[str, Any]], output_name: str = None) -> Dict[str, Any]: """ Merge multiple datasets into one Args: datasets: List of datasets to merge output_name: Name for merged dataset Returns: Merged dataset """ if not datasets: return self.create_finetuning_dataset([]) merged_entries = [] all_analysis_types = set() all_sources = set() for dataset in datasets: entries = dataset.get('entries', []) merged_entries.extend(entries) metadata = dataset.get('metadata', {}) all_analysis_types.update(metadata.get('analysis_types', [])) all_sources.update(metadata.get('legislation_sources', [])) # Create merged dataset merged_dataset = { 'metadata': { 'version': '1.0', 'created_at': datetime.now().isoformat(), 'dataset_name': output_name or f"merged_dataset_{int(time.time())}", 'total_entries': len(merged_entries), 'analysis_types': list(all_analysis_types), 'legislation_sources': list(all_sources), 'merged_from': len(datasets), 'success_rate': 1.0 # Assuming all entries are valid }, 'entries': merged_entries } # Recalculate quality metrics self._calculate_quality_metrics(merged_entries) merged_dataset['metadata']['quality_metrics'] = self.metadata['quality_metrics'] return merged_dataset def validate_dataset(self, dataset: Dict[str, Any]) -> Dict[str, Any]: """ Validate dataset quality and completeness Args: dataset: Dataset to validate Returns: Validation results """ validation = { 'is_valid': True, 'issues': [], 'warnings': [], 'statistics': {} } entries = dataset.get('entries', []) metadata = dataset.get('metadata', {}) # Check basic structure if not isinstance(entries, list): validation['issues'].append("Entries must be a list") validation['is_valid'] = False return validation if not entries: validation['warnings'].append("Dataset is empty") return validation # Validate entries valid_entries = 0 total_confidence = 0 for i, entry in enumerate(entries): if not isinstance(entry, dict): validation['issues'].append(f"Entry {i} is not a dictionary") continue # Check required fields required_fields = ['id', 'prompt', 'response'] for field in required_fields: if field not in entry: validation['issues'].append(f"Entry {i} missing required field: {field}") # Check prompt and response quality prompt = entry.get('prompt', '') response = entry.get('response', '') if len(prompt.strip()) < 10: validation['warnings'].append(f"Entry {i} has very short prompt") if len(response.strip()) < 10: validation['warnings'].append(f"Entry {i} has very short response") # Check confidence score confidence = entry.get('metadata', {}).get('confidence_score', 0) total_confidence += confidence valid_entries += 1 # Calculate statistics validation['statistics'] = { 'total_entries': len(entries), 'valid_entries': valid_entries, 'average_confidence': total_confidence / valid_entries if valid_entries > 0 else 0, 'validation_rate': valid_entries / len(entries) if entries else 0 } return validation def get_dataset_statistics(self, dataset: Dict[str, Any]) -> Dict[str, Any]: """ Get comprehensive statistics about the dataset Args: dataset: Dataset to analyze Returns: Dataset statistics """ entries = dataset.get('entries', []) if not entries: return {'total_entries': 0} # Basic statistics stats = { 'total_entries': len(entries), 'total_prompts': len([e for e in entries if e.get('prompt')]), 'total_responses': len([e for e in entries if e.get('response')]), 'average_prompt_length': 0, 'average_response_length': 0, 'confidence_distribution': {}, 'analysis_type_distribution': {}, 'quality_distribution': {} } # Calculate averages prompt_lengths = [len(e.get('prompt', '')) for e in entries if e.get('prompt')] response_lengths = [len(e.get('response', '')) for e in entries if e.get('response')] if prompt_lengths: stats['average_prompt_length'] = sum(prompt_lengths) / len(prompt_lengths) if response_lengths: stats['average_response_length'] = sum(response_lengths) / len(response_lengths) # Distribution analysis for entry in entries: metadata = entry.get('metadata', {}) # Confidence distribution confidence = metadata.get('confidence_score', 0) conf_range = f"{(confidence // 20) * 20}-{(confidence // 20) * 20 + 19}" stats['confidence_distribution'][conf_range] = stats['confidence_distribution'].get(conf_range, 0) + 1 # Analysis type distribution analysis_type = metadata.get('analysis_type', 'unknown') stats['analysis_type_distribution'][analysis_type] = stats['analysis_type_distribution'].get(analysis_type, 0) + 1 # Quality distribution quality = metadata.get('analysis_quality', 'unknown') stats['quality_distribution'][quality] = stats['quality_distribution'].get(quality, 0) + 1 return stats