| |
| """ |
| Dataset Builder |
| |
| Creates and manages finetuning datasets from legislation analysis results. |
| Handles data formatting, validation, and export in multiple formats. |
| """ |
|
|
| import os |
| import json |
| import time |
| from typing import List, Dict, Any, Optional, Tuple |
| from pathlib import Path |
| import pandas as pd |
| from datetime import datetime |
| import uuid |
|
|
class DatasetBuilder:
    """Builder for creating finetuning datasets from legislation analysis"""

    def __init__(self, output_dir: str = "datasets"):
        """
        Initialize the dataset builder

        Args:
            output_dir: Directory to save datasets (created if missing,
                including intermediate directories)
        """
        self.output_dir = Path(output_dir)
        # parents=True so a nested path like "out/run1/datasets" works;
        # the original exist_ok-only call raised FileNotFoundError for it.
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Builder-level bookkeeping. analysis_types / legislation_sources are
        # sets here for cheap dedup; they are converted to lists on export.
        self.metadata = {
            'version': '1.0',
            'created_at': datetime.now().isoformat(),
            'total_entries': 0,
            'analysis_types': set(),
            'legislation_sources': set(),
            'quality_metrics': {}
        }
|
|
| def create_finetuning_dataset(self, analysis_results: List[Dict[str, Any]], |
| dataset_name: str = None, |
| include_metadata: bool = True) -> Dict[str, Any]: |
| """ |
| Create a finetuning dataset from analysis results |
| |
| Args: |
| analysis_results: List of analysis results from LLM analyzer |
| dataset_name: Name for the dataset (optional) |
| include_metadata: Whether to include metadata in the dataset |
| |
| Returns: |
| Dataset information and statistics |
| """ |
| if not dataset_name: |
| timestamp = int(time.time()) |
| dataset_name = f"nz_legislation_dataset_{timestamp}" |
|
|
| dataset_entries = [] |
| successful_entries = 0 |
|
|
| for result in analysis_results: |
| if 'error' in result: |
| continue |
|
|
| |
| entry = self._create_finetuning_entry(result) |
| if entry: |
| dataset_entries.append(entry) |
| successful_entries += 1 |
|
|
| |
| if 'analysis_type' in result: |
| self.metadata['analysis_types'].add(result['analysis_type']) |
|
|
| |
| self.metadata['total_entries'] = len(dataset_entries) |
| self.metadata['created_at'] = datetime.now().isoformat() |
|
|
| |
| self._calculate_quality_metrics(dataset_entries) |
|
|
| |
| dataset = { |
| 'metadata': dict(self.metadata), |
| 'entries': dataset_entries |
| } |
|
|
| if include_metadata: |
| dataset['metadata'].update({ |
| 'dataset_name': dataset_name, |
| 'successful_entries': successful_entries, |
| 'total_input_results': len(analysis_results), |
| 'success_rate': successful_entries / len(analysis_results) if analysis_results else 0 |
| }) |
|
|
| return dataset |
|
|
| def _create_finetuning_entry(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]: |
| """ |
| Create a single finetuning dataset entry |
| |
| Args: |
| result: Analysis result from LLM analyzer |
| |
| Returns: |
| Finetuning entry or None if invalid |
| """ |
| try: |
| |
| chunk = result.get('chunk', '') |
| structured_analysis = result.get('structured_analysis', {}) |
| response = result.get('response', '') |
|
|
| |
| prompt = self._create_prompt(chunk, result.get('analysis_type', 'standard')) |
|
|
| |
| response_text = self._create_response(structured_analysis, response) |
|
|
| if not prompt or not response_text: |
| return None |
|
|
| |
| entry = { |
| 'id': str(uuid.uuid4()), |
| 'prompt': prompt, |
| 'response': response_text, |
| 'metadata': { |
| 'chunk_size': len(chunk), |
| 'word_count': len(chunk.split()), |
| 'analysis_type': result.get('analysis_type', 'standard'), |
| 'model_config': result.get('model_config', {}), |
| 'confidence_score': structured_analysis.get('confidence_score', 0), |
| 'analysis_quality': structured_analysis.get('analysis_quality', 'unknown'), |
| 'created_at': datetime.now().isoformat() |
| }, |
| 'raw_data': { |
| 'original_chunk': chunk, |
| 'structured_analysis': structured_analysis, |
| 'raw_response': response |
| } |
| } |
|
|
| return entry |
|
|
| except Exception as e: |
| print(f"Error creating finetuning entry: {e}") |
| return None |
|
|
| def _create_prompt(self, chunk: str, analysis_type: str) -> str: |
| """ |
| Create a standardized prompt for the finetuning dataset |
| |
| Args: |
| chunk: Text chunk to analyze |
| analysis_type: Type of analysis |
| |
| Returns: |
| Formatted prompt |
| """ |
| analysis_configs = { |
| 'standard': { |
| 'depth': 'Standard', |
| 'focus': 'loopholes, ambiguities, and unintended consequences' |
| }, |
| 'detailed': { |
| 'depth': 'Detailed', |
| 'focus': 'loopholes, ambiguities, unintended consequences, and implementation issues' |
| }, |
| 'comprehensive': { |
| 'depth': 'Comprehensive', |
| 'focus': 'all aspects including policy conflicts and enforcement challenges' |
| } |
| } |
|
|
| config = analysis_configs.get(analysis_type, analysis_configs['standard']) |
|
|
| prompt = f"""You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities. |
| |
| LEGISLATION TEXT: |
| {chunk} |
| |
| TASK: Analyze this legislative text for potential loopholes, ambiguities, or unintended consequences. |
| |
| ANALYSIS DEPTH: {config['depth']} |
| FOCUS AREAS: {config['focus']} |
| |
| Provide a structured analysis covering: |
| 1. Text Meaning - Explain what the text means and its intended purpose |
| 2. Key Assumptions - Identify any assumptions that could be exploited |
| 3. Exploitable Interpretations - Discuss how the text could be interpreted in unintended ways |
| 4. Critical Loopholes - Identify specific loopholes or ambiguities |
| 5. Circumvention Strategies - Suggest practical methods for exploiting these loopholes |
| |
| Format your response clearly with section headers.""" |
|
|
| return prompt |
|
|
| def _create_response(self, structured_analysis: Dict[str, Any], raw_response: str) -> str: |
| """ |
| Create a standardized response format for the finetuning dataset |
| |
| Args: |
| structured_analysis: Structured analysis data |
| raw_response: Raw LLM response |
| |
| Returns: |
| Formatted response |
| """ |
| sections = [] |
|
|
| |
| if structured_analysis.get('text_meaning'): |
| sections.append(f"**Text Meaning:** {structured_analysis['text_meaning']}") |
|
|
| |
| if structured_analysis.get('key_assumptions'): |
| assumptions = structured_analysis['key_assumptions'] |
| if assumptions: |
| sections.append("**Key Assumptions:**") |
| for i, assumption in enumerate(assumptions, 1): |
| sections.append(f"{i}. {assumption}") |
|
|
| |
| if structured_analysis.get('exploitable_interpretations'): |
| interpretations = structured_analysis['exploitable_interpretations'] |
| if interpretations: |
| sections.append("**Exploitable Interpretations:**") |
| for i, interpretation in enumerate(interpretations, 1): |
| sections.append(f"{i}. {interpretation}") |
|
|
| |
| if structured_analysis.get('critical_loopholes'): |
| loopholes = structured_analysis['critical_loopholes'] |
| if loopholes: |
| sections.append("**Critical Loopholes:**") |
| for i, loophole in enumerate(loopholes, 1): |
| sections.append(f"{i}. {loophole}") |
|
|
| |
| if structured_analysis.get('circumvention_strategies'): |
| strategies = structured_analysis['circumvention_strategies'] |
| if strategies: |
| sections.append("**Circumvention Strategies:**") |
| for i, strategy in enumerate(strategies, 1): |
| sections.append(f"{i}. {strategy}") |
|
|
| |
| if structured_analysis.get('recommendations'): |
| recommendations = structured_analysis['recommendations'] |
| if recommendations: |
| sections.append("**Recommendations:**") |
| for i, rec in enumerate(recommendations, 1): |
| sections.append(f"{i}. {rec}") |
|
|
| return "\n\n".join(sections) if sections else raw_response |
|
|
| def _calculate_quality_metrics(self, entries: List[Dict[str, Any]]): |
| """Calculate quality metrics for the dataset""" |
| if not entries: |
| return |
|
|
| confidence_scores = [] |
| analysis_qualities = {'high': 0, 'medium': 0, 'low': 0, 'unknown': 0} |
|
|
| for entry in entries: |
| metadata = entry.get('metadata', {}) |
| confidence = metadata.get('confidence_score', 0) |
| quality = metadata.get('analysis_quality', 'unknown') |
|
|
| confidence_scores.append(confidence) |
| analysis_qualities[quality] = analysis_qualities.get(quality, 0) + 1 |
|
|
| self.metadata['quality_metrics'] = { |
| 'average_confidence': sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0, |
| 'max_confidence': max(confidence_scores) if confidence_scores else 0, |
| 'min_confidence': min(confidence_scores) if confidence_scores else 0, |
| 'quality_distribution': analysis_qualities, |
| 'total_entries': len(entries) |
| } |
|
|
| def save_dataset(self, dataset: Dict[str, Any], format_type: str = 'json', |
| filename: str = None) -> str: |
| """ |
| Save dataset in specified format |
| |
| Args: |
| dataset: Dataset to save |
| format_type: Format ('json', 'jsonl', 'csv', 'excel') |
| filename: Output filename (optional) |
| |
| Returns: |
| Path to saved file |
| """ |
| if not filename: |
| timestamp = int(time.time()) |
| filename = f"nz_legislation_dataset_{timestamp}" |
|
|
| |
| if not filename.endswith(f'.{format_type}'): |
| filename += f'.{format_type}' |
|
|
| filepath = self.output_dir / filename |
|
|
| try: |
| if format_type == 'json': |
| with open(filepath, 'w', encoding='utf-8') as f: |
| json.dump(dataset, f, indent=2, ensure_ascii=False) |
|
|
| elif format_type == 'jsonl': |
| with open(filepath, 'w', encoding='utf-8') as f: |
| for entry in dataset.get('entries', []): |
| json.dump(entry, f, ensure_ascii=False) |
| f.write('\n') |
|
|
| elif format_type == 'csv': |
| self._save_as_csv(dataset, filepath) |
|
|
| elif format_type == 'excel': |
| self._save_as_excel(dataset, filepath) |
|
|
| else: |
| raise ValueError(f"Unsupported format: {format_type}") |
|
|
| return str(filepath) |
|
|
| except Exception as e: |
| raise Exception(f"Error saving dataset: {e}") |
|
|
| def _save_as_csv(self, dataset: Dict[str, Any], filepath: Path): |
| """Save dataset as CSV""" |
| entries = dataset.get('entries', []) |
|
|
| if not entries: |
| |
| df = pd.DataFrame(columns=['id', 'prompt', 'response', 'metadata']) |
| df.to_csv(filepath, index=False) |
| return |
|
|
| |
| csv_data = [] |
| for entry in entries: |
| csv_row = { |
| 'id': entry.get('id', ''), |
| 'prompt': entry.get('prompt', ''), |
| 'response': entry.get('response', ''), |
| 'confidence_score': entry.get('metadata', {}).get('confidence_score', 0), |
| 'analysis_type': entry.get('metadata', {}).get('analysis_type', ''), |
| 'chunk_size': entry.get('metadata', {}).get('chunk_size', 0), |
| 'word_count': entry.get('metadata', {}).get('word_count', 0), |
| 'analysis_quality': entry.get('metadata', {}).get('analysis_quality', ''), |
| 'created_at': entry.get('metadata', {}).get('created_at', '') |
| } |
| csv_data.append(csv_row) |
|
|
| df = pd.DataFrame(csv_data) |
| df.to_csv(filepath, index=False, encoding='utf-8') |
|
|
| def _save_as_excel(self, dataset: Dict[str, Any], filepath: Path): |
| """Save dataset as Excel with multiple sheets""" |
| entries = dataset.get('entries', []) |
|
|
| with pd.ExcelWriter(filepath, engine='openpyxl') as writer: |
| |
| if entries: |
| csv_data = [] |
| for entry in entries: |
| csv_row = { |
| 'id': entry.get('id', ''), |
| 'prompt': entry.get('prompt', ''), |
| 'response': entry.get('response', ''), |
| 'confidence_score': entry.get('metadata', {}).get('confidence_score', 0), |
| 'analysis_type': entry.get('metadata', {}).get('analysis_type', ''), |
| 'chunk_size': entry.get('metadata', {}).get('chunk_size', 0), |
| 'word_count': entry.get('metadata', {}).get('word_count', 0), |
| 'analysis_quality': entry.get('metadata', {}).get('analysis_quality', ''), |
| 'created_at': entry.get('metadata', {}).get('created_at', '') |
| } |
| csv_data.append(csv_row) |
|
|
| df_main = pd.DataFrame(csv_data) |
| df_main.to_excel(writer, sheet_name='Dataset', index=False) |
|
|
| |
| metadata_df = pd.DataFrame([dataset.get('metadata', {})]) |
| metadata_df.to_excel(writer, sheet_name='Metadata', index=False) |
|
|
| |
| quality_data = dataset.get('metadata', {}).get('quality_metrics', {}) |
| if quality_data: |
| quality_df = pd.DataFrame([quality_data]) |
| quality_df.to_excel(writer, sheet_name='Quality_Metrics', index=False) |
|
|
| def load_dataset(self, filepath: str) -> Dict[str, Any]: |
| """ |
| Load a dataset from file |
| |
| Args: |
| filepath: Path to dataset file |
| |
| Returns: |
| Loaded dataset |
| """ |
| filepath = Path(filepath) |
|
|
| if not filepath.exists(): |
| raise FileNotFoundError(f"Dataset file not found: {filepath}") |
|
|
| try: |
| if filepath.suffix == '.json': |
| with open(filepath, 'r', encoding='utf-8') as f: |
| return json.load(f) |
|
|
| elif filepath.suffix == '.jsonl': |
| entries = [] |
| with open(filepath, 'r', encoding='utf-8') as f: |
| for line in f: |
| if line.strip(): |
| entries.append(json.loads(line)) |
|
|
| return { |
| 'metadata': { |
| 'loaded_from': str(filepath), |
| 'total_entries': len(entries) |
| }, |
| 'entries': entries |
| } |
|
|
| elif filepath.suffix in ['.csv', '.xlsx', '.xls']: |
| return self._load_from_spreadsheet(filepath) |
|
|
| else: |
| raise ValueError(f"Unsupported file format: {filepath.suffix}") |
|
|
| except Exception as e: |
| raise Exception(f"Error loading dataset: {e}") |
|
|
| def _load_from_spreadsheet(self, filepath: Path) -> Dict[str, Any]: |
| """Load dataset from spreadsheet format""" |
| try: |
| if filepath.suffix == '.csv': |
| df = pd.read_csv(filepath) |
| else: |
| df = pd.read_excel(filepath) |
|
|
| |
| entries = [] |
| for _, row in df.iterrows(): |
| entry = { |
| 'id': row.get('id', str(uuid.uuid4())), |
| 'prompt': row.get('prompt', ''), |
| 'response': row.get('response', ''), |
| 'metadata': { |
| 'confidence_score': row.get('confidence_score', 0), |
| 'analysis_type': row.get('analysis_type', 'standard'), |
| 'chunk_size': row.get('chunk_size', 0), |
| 'word_count': row.get('word_count', 0), |
| 'analysis_quality': row.get('analysis_quality', 'unknown'), |
| 'created_at': row.get('created_at', datetime.now().isoformat()) |
| } |
| } |
| entries.append(entry) |
|
|
| return { |
| 'metadata': { |
| 'loaded_from': str(filepath), |
| 'total_entries': len(entries), |
| 'original_format': filepath.suffix[1:] |
| }, |
| 'entries': entries |
| } |
|
|
| except Exception as e: |
| raise Exception(f"Error loading spreadsheet: {e}") |
|
|
| def merge_datasets(self, datasets: List[Dict[str, Any]], |
| output_name: str = None) -> Dict[str, Any]: |
| """ |
| Merge multiple datasets into one |
| |
| Args: |
| datasets: List of datasets to merge |
| output_name: Name for merged dataset |
| |
| Returns: |
| Merged dataset |
| """ |
| if not datasets: |
| return self.create_finetuning_dataset([]) |
|
|
| merged_entries = [] |
| all_analysis_types = set() |
| all_sources = set() |
|
|
| for dataset in datasets: |
| entries = dataset.get('entries', []) |
| merged_entries.extend(entries) |
|
|
| metadata = dataset.get('metadata', {}) |
| all_analysis_types.update(metadata.get('analysis_types', [])) |
| all_sources.update(metadata.get('legislation_sources', [])) |
|
|
| |
| merged_dataset = { |
| 'metadata': { |
| 'version': '1.0', |
| 'created_at': datetime.now().isoformat(), |
| 'dataset_name': output_name or f"merged_dataset_{int(time.time())}", |
| 'total_entries': len(merged_entries), |
| 'analysis_types': list(all_analysis_types), |
| 'legislation_sources': list(all_sources), |
| 'merged_from': len(datasets), |
| 'success_rate': 1.0 |
| }, |
| 'entries': merged_entries |
| } |
|
|
| |
| self._calculate_quality_metrics(merged_entries) |
| merged_dataset['metadata']['quality_metrics'] = self.metadata['quality_metrics'] |
|
|
| return merged_dataset |
|
|
| def validate_dataset(self, dataset: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Validate dataset quality and completeness |
| |
| Args: |
| dataset: Dataset to validate |
| |
| Returns: |
| Validation results |
| """ |
| validation = { |
| 'is_valid': True, |
| 'issues': [], |
| 'warnings': [], |
| 'statistics': {} |
| } |
|
|
| entries = dataset.get('entries', []) |
| metadata = dataset.get('metadata', {}) |
|
|
| |
| if not isinstance(entries, list): |
| validation['issues'].append("Entries must be a list") |
| validation['is_valid'] = False |
| return validation |
|
|
| if not entries: |
| validation['warnings'].append("Dataset is empty") |
| return validation |
|
|
| |
| valid_entries = 0 |
| total_confidence = 0 |
|
|
| for i, entry in enumerate(entries): |
| if not isinstance(entry, dict): |
| validation['issues'].append(f"Entry {i} is not a dictionary") |
| continue |
|
|
| |
| required_fields = ['id', 'prompt', 'response'] |
| for field in required_fields: |
| if field not in entry: |
| validation['issues'].append(f"Entry {i} missing required field: {field}") |
|
|
| |
| prompt = entry.get('prompt', '') |
| response = entry.get('response', '') |
|
|
| if len(prompt.strip()) < 10: |
| validation['warnings'].append(f"Entry {i} has very short prompt") |
|
|
| if len(response.strip()) < 10: |
| validation['warnings'].append(f"Entry {i} has very short response") |
|
|
| |
| confidence = entry.get('metadata', {}).get('confidence_score', 0) |
| total_confidence += confidence |
|
|
| valid_entries += 1 |
|
|
| |
| validation['statistics'] = { |
| 'total_entries': len(entries), |
| 'valid_entries': valid_entries, |
| 'average_confidence': total_confidence / valid_entries if valid_entries > 0 else 0, |
| 'validation_rate': valid_entries / len(entries) if entries else 0 |
| } |
|
|
| return validation |
|
|
| def get_dataset_statistics(self, dataset: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Get comprehensive statistics about the dataset |
| |
| Args: |
| dataset: Dataset to analyze |
| |
| Returns: |
| Dataset statistics |
| """ |
| entries = dataset.get('entries', []) |
|
|
| if not entries: |
| return {'total_entries': 0} |
|
|
| |
| stats = { |
| 'total_entries': len(entries), |
| 'total_prompts': len([e for e in entries if e.get('prompt')]), |
| 'total_responses': len([e for e in entries if e.get('response')]), |
| 'average_prompt_length': 0, |
| 'average_response_length': 0, |
| 'confidence_distribution': {}, |
| 'analysis_type_distribution': {}, |
| 'quality_distribution': {} |
| } |
|
|
| |
| prompt_lengths = [len(e.get('prompt', '')) for e in entries if e.get('prompt')] |
| response_lengths = [len(e.get('response', '')) for e in entries if e.get('response')] |
|
|
| if prompt_lengths: |
| stats['average_prompt_length'] = sum(prompt_lengths) / len(prompt_lengths) |
| if response_lengths: |
| stats['average_response_length'] = sum(response_lengths) / len(response_lengths) |
|
|
| |
| for entry in entries: |
| metadata = entry.get('metadata', {}) |
|
|
| |
| confidence = metadata.get('confidence_score', 0) |
| conf_range = f"{(confidence // 20) * 20}-{(confidence // 20) * 20 + 19}" |
| stats['confidence_distribution'][conf_range] = stats['confidence_distribution'].get(conf_range, 0) + 1 |
|
|
| |
| analysis_type = metadata.get('analysis_type', 'unknown') |
| stats['analysis_type_distribution'][analysis_type] = stats['analysis_type_distribution'].get(analysis_type, 0) + 1 |
|
|
| |
| quality = metadata.get('analysis_quality', 'unknown') |
| stats['quality_distribution'][quality] = stats['quality_distribution'].get(quality, 0) + 1 |
|
|
| return stats |
|
|