Spaces:
Building
Building
| """ | |
| YAML Schema Validator for Reference Lists Configuration | |
| Handles detection and validation of old vs new schema formats. | |
| """ | |
| import yaml | |
| from typing import Dict, Any, List, Optional, Tuple | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class SchemaValidator:
    """Validates and detects YAML schema formats for reference lists.

    The class is a stateless namespace: every method is a ``@classmethod``
    operating only on its arguments and the two class-level field sets below.
    (The original code declared ``cls`` parameters but omitted the
    ``@classmethod`` decorators, so calls such as
    ``SchemaValidator.detect_schema_version(config)`` raised ``TypeError``.)
    """

    # Fields whose presence in an entry marks it as using the NEW schema.
    NEW_SCHEMA_FIELDS = {
        'analysis_type',
        'log_transformable',
        'selectable_measures',
        'default_measures',
        'default_log_transforms',
    }

    # Fields whose presence in an entry marks it as using the OLD schema.
    OLD_SCHEMA_FIELDS = {
        'files',  # Old schema uses files.token/files.lemma
    }

    @classmethod
    def detect_schema_version(cls, config_data: Dict[str, Any]) -> str:
        """
        Detect whether configuration uses old or new schema.

        Args:
            config_data: Parsed YAML configuration data, expected shape
                ``{language: {ngram_type: {entry_name: entry_config}}}``.

        Returns:
            ``'old'``, ``'new'``, or ``'mixed'``. Falls back to ``'old'``
            when the config is empty or carries no recognisable indicators.
        """
        # Empty/None configs carry no indicators; use the default assumption.
        if not config_data:
            return 'old'

        old_count = 0
        new_count = 0

        # Walk every language/type/entry combination, skipping any level
        # that is not the expected dict (e.g. stray scalar values).
        for lang_data in config_data.values():
            if not isinstance(lang_data, dict):
                continue
            for type_data in lang_data.values():
                if not isinstance(type_data, dict):
                    continue
                for entry_config in type_data.values():
                    if not isinstance(entry_config, dict):
                        continue
                    if any(field in entry_config for field in cls.OLD_SCHEMA_FIELDS):
                        old_count += 1
                    if any(field in entry_config for field in cls.NEW_SCHEMA_FIELDS):
                        new_count += 1

        if old_count > 0 and new_count == 0:
            return 'old'
        elif new_count > 0 and old_count == 0:
            return 'new'
        elif old_count > 0 and new_count > 0:
            return 'mixed'
        else:
            # Default assumption if no clear indicators
            return 'old'

    @classmethod
    def validate_old_schema(cls, entry_config: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """
        Validate a single entry against the old schema format.

        Args:
            entry_config: Single entry configuration.

        Returns:
            Tuple of ``(is_valid, error_messages)``; ``error_messages`` is
            empty when the entry is valid.
        """
        errors: List[str] = []

        # Required fields for old schema
        required_fields = {'display_name', 'description', 'files', 'format', 'columns', 'enabled'}
        for field in required_fields:
            if field not in entry_config:
                errors.append(f"Missing required field: {field}")

        # 'files' must be a dict holding at least one of the two variants.
        if 'files' in entry_config:
            files = entry_config['files']
            if not isinstance(files, dict):
                errors.append("'files' must be a dictionary")
            else:
                if 'token' not in files and 'lemma' not in files:
                    errors.append("'files' must contain at least 'token' or 'lemma'")

        # 'columns' must map column names to their definitions.
        if 'columns' in entry_config:
            columns = entry_config['columns']
            if not isinstance(columns, dict):
                errors.append("'columns' must be a dictionary")

        return len(errors) == 0, errors

    @classmethod
    def validate_new_schema(cls, entry_config: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """
        Validate a single entry against the new schema format.

        Args:
            entry_config: Single entry configuration.

        Returns:
            Tuple of ``(is_valid, error_messages)``; ``error_messages`` is
            empty when the entry is valid.
        """
        errors: List[str] = []

        # Required fields for new schema
        required_fields = {
            'display_name', 'description', 'file', 'format', 'columns',
            'enabled', 'analysis_type', 'log_transformable',
            'selectable_measures', 'default_measures', 'default_log_transforms',
        }
        for field in required_fields:
            if field not in entry_config:
                errors.append(f"Missing required field: {field}")

        # analysis_type is a closed set.
        if 'analysis_type' in entry_config:
            analysis_type = entry_config['analysis_type']
            if analysis_type not in ['token', 'lemma']:
                errors.append(f"'analysis_type' must be 'token' or 'lemma', got: {analysis_type}")

        # The four measure-selection fields must all be lists.
        list_fields = ['log_transformable', 'selectable_measures', 'default_measures', 'default_log_transforms']
        for field in list_fields:
            if field in entry_config:
                value = entry_config[field]
                if not isinstance(value, list):
                    errors.append(f"'{field}' must be a list, got: {type(value).__name__}")

        # New schema uses a single 'file' path instead of the old 'files' dict.
        if 'file' in entry_config:
            file_path = entry_config['file']
            if not isinstance(file_path, str):
                errors.append("'file' must be a string path")

        return len(errors) == 0, errors

    @classmethod
    def get_schema_migration_plan(cls, config_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generate a migration plan for converting old schema to new schema.

        Args:
            config_data: Current configuration data.

        Returns:
            Dict with keys ``current_schema``, ``requires_migration``,
            ``entries_to_migrate``, ``entries_to_split`` and
            ``new_entries_count``. Each old-schema entry splits into one new
            entry per recognised file variant ('token'/'lemma').
        """
        schema_version = cls.detect_schema_version(config_data)

        migration_plan: Dict[str, Any] = {
            'current_schema': schema_version,
            'requires_migration': schema_version in ['old', 'mixed'],
            'entries_to_migrate': [],
            'entries_to_split': [],
            'new_entries_count': 0,
        }

        if not migration_plan['requires_migration']:
            return migration_plan

        # Analyze entries that need migration
        for language, lang_data in config_data.items():
            if not isinstance(lang_data, dict):
                continue
            for ngram_type, type_data in lang_data.items():
                if not isinstance(type_data, dict):
                    continue
                for entry_name, entry_config in type_data.items():
                    if not isinstance(entry_config, dict):
                        continue
                    # Only entries still carrying the old 'files' dict migrate.
                    if 'files' in entry_config:
                        files = entry_config['files']
                        if isinstance(files, dict):
                            # Each recognised variant becomes its own new entry.
                            file_count = len([k for k in files.keys() if k in ['token', 'lemma']])
                            migration_plan['entries_to_migrate'].append({
                                'language': language,
                                'type': ngram_type,
                                'name': entry_name,
                                'files': list(files.keys()),
                                'will_create': file_count,
                            })
                            migration_plan['new_entries_count'] += file_count

        return migration_plan

    @classmethod
    def create_default_new_schema_fields(cls, measure_names: List[str],
                                         analysis_type: str = 'token') -> Dict[str, Any]:
        """
        Create default values for new schema fields based on measure names.

        Args:
            measure_names: List of available measure names from columns.
            analysis_type: ``'token'`` or ``'lemma'``.

        Returns:
            Dictionary with default values for the five new-schema fields.
        """
        # Classify measures by name so we can pick sensible defaults.
        frequency_measures: List[str] = []
        association_measures: List[str] = []
        for measure in measure_names:
            measure_lower = measure.lower()
            if any(term in measure_lower for term in ['freq', 'frequency', 'count']):
                frequency_measures.append(measure)
            elif any(term in measure_lower for term in ['mi', 't_score', 'delta_p', 'ap_collex']):
                association_measures.append(measure)
            # Unrecognised measures (incl. psycholinguistic ones) get no
            # special default treatment.

        # Only frequency measures should ever be log-transformed.
        log_transformable = frequency_measures
        selectable_measures = measure_names

        # Smart default selection: prefer frequencies, then associations.
        if frequency_measures:
            default_measures = frequency_measures[:2]  # First 2 frequency measures
        elif association_measures:
            # Prefer MI and T-score for associations
            default_measures = [m for m in association_measures
                                if any(pref in m.lower() for pref in ['mi', 't_score'])][:2]
        else:
            default_measures = measure_names[:2] if len(measure_names) >= 2 else measure_names

        # Default log transforms apply only to defaulted frequency measures.
        default_log_transforms = [m for m in default_measures if m in frequency_measures]

        return {
            'analysis_type': analysis_type,
            'log_transformable': log_transformable,
            'selectable_measures': selectable_measures,
            'default_measures': default_measures,
            'default_log_transforms': default_log_transforms,
        }
def load_and_validate_config(config_path: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Load and validate a YAML configuration file.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        Tuple of ``(config_data, validation_results)``. On any load/parse
        failure the function does not raise; it returns ``({}, results)``
        with ``is_valid: False`` and the error message in ``errors``.
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config_data = yaml.safe_load(f)

        # safe_load returns None for an empty file; normalise to {} so an
        # empty config validates instead of crashing into the except branch.
        if config_data is None:
            config_data = {}

        schema_version = SchemaValidator.detect_schema_version(config_data)
        migration_plan = SchemaValidator.get_schema_migration_plan(config_data)

        validation_results = {
            'schema_version': schema_version,
            'migration_plan': migration_plan,
            'is_valid': True,
            'errors': []
        }
        return config_data, validation_results

    # Broad catch is deliberate: this is a boundary function whose contract
    # is to report failures in the result dict rather than propagate them.
    except Exception as e:
        logger.error(f"Error loading config file {config_path}: {e}")
        return {}, {
            'schema_version': 'unknown',
            'migration_plan': {},
            'is_valid': False,
            'errors': [str(e)]
        }
| if __name__ == "__main__": | |
| # Test the validator | |
| config_data, validation_results = load_and_validate_config("config/reference_lists.yaml") | |
| print(f"Schema version: {validation_results['schema_version']}") | |
| print(f"Migration plan: {validation_results['migration_plan']}") | |