Spaces:
Paused
Paused
| """ | |
| Config module with enhanced security validation and error handling. | |
| """ | |
| import yaml | |
| import os | |
| import logging | |
| import re | |
| import codecs | |
| from pathlib import Path | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from urllib.parse import urlparse | |
| import json | |
# Process-wide configuration mapping; mutated in place so that every module
# holding a reference to it observes the same contents.
config = {}


def clear_config():
    """Empty the module-level ``config`` dict in place.

    Intended for tests that need a clean configuration state. The dict
    object itself is kept (only its contents are removed), so existing
    references to ``config`` remain valid.
    """
    # No ``global`` statement is required: the name is never rebound here,
    # only the existing dict is mutated.
    config.clear()
# Use centralized logging - avoid duplicate basicConfig.
# This module only obtains a logger named after itself; it does not install
# handlers or set levels here.
logger = logging.getLogger(__name__)
class ConfigValidationError(Exception):
    """Raised when a configuration value is missing, mistyped, or inconsistent."""
class ConfigSecurityError(Exception):
    """Raised when a configuration value violates a security constraint."""
| import difflib | |
| # ============================================================================ | |
| # Known config key schema (hierarchical) | |
| # Keys map to None (leaf), set (known sub-keys), or dict (nested schema). | |
| # Used by validate_unknown_keys() to warn about typos at all nesting levels. | |
| # ============================================================================ | |
# Schema of recognized config keys. Each key maps to:
#   None -> leaf (no sub-key validation),
#   set  -> the known sub-keys of a one-level mapping,
#   dict -> a nested schema that is validated recursively.
# Every key must appear exactly once: a repeated key in a dict literal is
# silently overwritten by the later entry.
KNOWN_CONFIG_KEYS = {
    # === Core / required ===
    "item_properties": {
        "id_key", "text_key", "category_key", "kwargs",
    },
    "data_files": None,
    "task_dir": None,
    "output_annotation_dir": None,
    "output_annotation_format": None,
    "annotation_task_name": None,
    "task_description": None,
    "annotation_task_description": None,
    # === Data sources ===
    "data_directory": None,
    "data_directory_encoding": None,
    "data_sources": None,
    "data_cache": {"enabled", "ttl_seconds", "max_size_mb"},
    "watch_data_directory": None,
    "watch_poll_interval": None,
    "partial_loading": None,
    # === Annotation ===
    # ("output_annotation_format" is declared once, under Core / required.)
    "annotation_schemes": None,
    "phases": None,
    # === Auth / login ===
    "authentication": {
        "method", "providers", "user_identity_field", "database_url",
        "user_config_path", "auto_register", "allow_local_login",
        "allowed_domain", "allowed_domains", "allowed_org",
    },
    "login": {"type", "url_argument", "auto_redirect_delay", "auto_redirect_on_completion"},
    "user_config": {"allow_all_users", "users"},
    "require_password": None,
    "require_no_password": None,
    "secret_key": None,
    # === Server ===
    "server": {"port", "host", "debug"},
    "port": None,
    "host": None,
    "customjs": None,
    "customjs_hostname": None,
    "site_dir": None,
    "site_file": None,
    "persist_sessions": None,
    "session_lifetime_days": None,
    "base_html_template": None,
    # === Quality control ===
    "attention_checks": {
        "enabled", "items_file", "frequency", "probability",
        "min_response_time", "failure_handling",
    },
    "gold_standards": {
        "enabled", "items_file", "mode", "frequency",
        "accuracy", "auto_promote",
    },
    "gold_standards_file": None,
    "pre_annotation": {
        "enabled", "field", "highlight_low_confidence",
        "agreement_metrics", "predictions_file",
        "allow_modification", "show_confidence",
    },
    "agreement_metrics": {"min_overlap", "refresh_interval", "enabled"},
    "quality_control": None,
    # === AI ===
    "ai_support": {
        "enabled", "endpoint_type", "ai_config_file", "ai_config",
        "option_highlighting", "features", "cache_config",
    },
    "chat_support": {
        "enabled", "endpoint_type", "ai_config", "ui",
    },
    # === Advanced features ===
    "training": {
        "enabled", "data_file", "annotation_schemes",
        "passing_criteria", "feedback", "failure_action",
    },
    "active_learning": {
        "enabled", "classifier", "vectorizer",
        "min_annotations_per_instance", "min_instances_for_training",
        "max_instances_to_reorder", "update_frequency",
        "resolution_strategy", "random_sample_percent", "schema_names",
        "database", "model_persistence", "llm", "query_strategy",
        "hybrid_weights", "cold_start_strategy", "confidence_method",
        "classifier_params", "vectorizer_params", "calibrate_probabilities",
        "bald_params", "use_icl_ensemble", "icl_ensemble_params",
        "annotation_routing", "routing_thresholds",
    },
    "category_assignment": {
        "enabled", "category_key", "qualification", "fallback", "dynamic",
    },
    "batch_assignment": {
        "groups", "annotator_key",
    },
    "diversity_ordering": {
        "enabled", "model_name", "num_clusters", "items_per_cluster",
        "auto_clusters", "prefill_count", "batch_size",
        "recluster_threshold", "preserve_visited",
        "trigger_ai_prefetch", "cache_dir",
    },
    "diversity_config": None,
    "embedding_visualization": {
        "enabled", "sample_size", "include_all_annotated",
        "embedding_model", "image_embedding_model", "umap", "label_source",
    },
    "adjudication": {
        "enabled", "adjudicator_users", "min_annotations",
        "agreement_threshold", "fast_decision_warning_ms",
        "error_taxonomy", "similarity",
        "require_notes_on_override", "show_agreement_scores",
        "show_annotator_names",
        "output_subdir", "require_confidence",
        "show_all_items", "show_timing_data",
    },
    "database": {"type", "host", "database", "username", "password", "port",
                 "pool_size", "pool_timeout", "connection_string"},
    "bws_config": {
        "tuple_size", "num_tuples", "seed", "min_item_appearances", "scoring",
    },
    "ibws_config": {
        "tuple_size", "max_rounds", "seed", "scoring_method",
        "tuples_per_item_per_round",
    },
    "mace": {
        "enabled", "min_annotations_per_item", "trigger_every_n", "num_restarts",
        "min_items", "num_iters",
    },
    "icl_labeling": None,
    "llm_labeling": None,
    # === UI & layout ===
    "ui": None,
    "ui_config": None,
    "layout": {"grid", "breakpoints", "groups", "order", "styling"},
    "instance_display": {"fields", "layout", "resizable"},
    "format_handling": {"enabled", "default_format", "pdf", "spreadsheet"},
    "ui_language": {
        "html_lang", "html_dir",
        "next_button", "previous_button", "submit_button", "go_button",
        "retry_button", "logout",
        "labeled_badge", "in_progress_badge", "not_labeled_badge",
        "progress_label", "loading", "error_heading",
        "adjudicate", "codebook", "instructions_heading",
        "text_to_annotate", "video_to_annotate", "audio_to_annotate",
        "login_title", "login_subtitle_password", "login_subtitle_username",
        "sign_in_tab", "register_tab",
        "username_label", "password_label",
        "sign_in_button", "continue_button", "register_button",
        "forgot_password", "username_placeholder",
        "choose_username_placeholder", "create_password_placeholder",
        "sign_in_with", "or_divider",
        "powered_by", "cite_us",
    },
    "base_css": None,
    "ui_debug": None,
    "hide_navbar": None,
    "task_layout": None,
    # === Content ===
    "annotation_instructions": None,
    "annotation_codebook_url": None,
    "custom_footer_html": None,
    "header_file": None,
    "header_logo": None,
    # === Annotation features ===
    "keyword_highlight_settings": None,
    "keyword_highlights_file": None,
    "highlight_linebreaks": None,
    "list_as_text": {"text_list_prefix_type", "horizontal", "alternating_shading"},
    "jumping_to_id_disabled": None,
    "horizontal_key_bindings": None,
    "completion_code": None,
    "allow_phase_back_navigation": None,
    "require_fully_annotated": None,
    "export_include_phase_data": None,
    "export_annotation_format": None,
    "auto_export_interval": None,
    # === Media ===
    "audio_annotation": {
        "waveform_cache_dir", "waveform_look_ahead", "waveform_cache_max_size",
        "client_fallback_max_duration",
    },
    "spectrogram": None,
    "media_directory": None,
    "default_video_fps": None,
    # === External integrations ===
    "mturk": None,
    "prolific": {
        "config_file_path", "token", "study_id",
        "max_concurrent_sessions", "workload_checker_period",
        "completion_code", "sandbox_mode",
    },
    "webhooks": {"enabled", "endpoints"},
    "trace_ingestion": {"enabled", "sources", "api_key", "notify_annotators"},
    "judge_alignment": {"enabled", "ai_support", "schemas", "few_shot", "inline"},
    # Judge Calibration: LLM-as-judge auto-labeling + blind human calibration.
    # Leaf sub-dicts (sampling/human/calibration/output) are validated by
    # validate_judge_calibration_config(); kept shallow here to avoid
    # unknown-key churn while the feature stabilizes.
    "judge_calibration": {
        "enabled", "prompt", "models", "k_samples", "max_items", "fraction",
        "sampling", "human", "schemas", "calibration", "output", "state_dir",
    },
    "triage": {"enabled", "order", "default_priority", "show_badge",
               "signal_field", "invert_signal", "rules"},
    "huggingface_backup": None,
    # === Debug / logging ===
    "debug": None,
    "debug_phase": None,
    "server_debug": None,
    "verbose": None,
    "very_verbose": None,
    "debug_log": None,
    # === Agent ===
    "live_agent": None,
    "live_coding_agent": None,
    "agent_proxy": None,
    # === Legacy / multi-task ===
    "surveyflow": None,
    "prestudy": None,
    "automatic_assignment": None,
    # === Other ===
    "random_seed": None,
    "max_annotations_per_user": None,
    # Deprecated alias of num_annotators_per_item (int form). Still accepted
    # for backwards compatibility; emits a warning when both are set.
    "max_annotations_per_item": None,
    # Canonical key for heterogeneous coverage. Accepts either:
    #   int  — same cap for every item (legacy behavior)
    #   dict — { default, overlap_sample: {fraction, count, stratify_by, seed},
    #            adaptive: {enabled, disagreement_threshold, boost_to}, min }
    "num_annotators_per_item": None,
    "min_annotators_per_instance": None,
    # Per-annotator workload caps:
    #   { default: int, by_user: {user_id: int}, by_user_role: {role: int} }
    "per_annotator_quota": None,
    # qda_mode sub-keys are deliberately leaf (None): validation stops at
    # memos/codebook and does NOT recurse into their sub-keys. This is
    # intentional forward-compat — parse_qda_mode_config() routes any
    # unrecognized qda_mode.* keys into `extras` so configs can declare
    # not-yet-shipped blocks (cases/queries/smart_codes/network/media_sync)
    # without tripping unknown-key warnings. The tradeoff: a typo like
    # qda_mode.memos.enabledd is silently accepted. Revisit (deepen the
    # schema) once those sub-blocks ship and their shapes are stable.
    "qda_mode": {
        "enabled": None,
        "memos": None,
        "codebook": None,
        # Sub-blocks reserved for later phases:
        # "cases", "queries", "smart_codes", "network", "media_sync"
    },
    # Universal annotation UI feature toggles (not QDA-gated). `memos`
    # turns the memo sidebar on/off (default off in standard mode, on for
    # qda_mode/solo_mode); `visibility` is the default new-memo visibility.
    "annotation_ui": {
        "memos": None,
        "visibility": None,
    },
    # Universal full-text search (FTS5). Read-only admin search is always
    # safe; `annotator_claim` opt-in is governed by a startup
    # compatibility guard (see validate_search_assignment_compat).
    "search": {
        "enabled": None,
        "backend": None,
        "max_instances": None,
        "annotator_claim": None,
    },
    # Universal codebook. `mode` (fixed|extensible|open) governs whether
    # annotators may add codes on the fly; resolved via
    # get_codebook_mode() (defaults: qda/solo -> open, standard -> fixed;
    # a crowd backend force-locks fixed). Per-scheme opt-in is the
    # scheme-level `codebook: true` key.
    "codebook": {
        "enabled": None,
        "mode": None,
    },
    # Top-level convenience scalar mirroring codebook.mode.
    "codebook_mode": None,
    # In-vivo coding (D): single key that, with text selected in a
    # codebook-backed span scheme, opens the "code from selection"
    # composer. Default 'i'; only meaningful when a codebook span
    # scheme exists. (Schema value is None = scalar/any-value key; the
    # 'i' default lives in the defaults map, not here.)
    "codebook_invivo_key": None,
    # Universal cases: group instances into units of analysis. `key`
    # names the item-data field to group on; `auto_detect` lets QDA
    # scan participant_id/respondent_id/case_id; `attributes` lifts
    # item fields onto the case for crosstabs.
    "cases": {
        "enabled": None,
        "key": None,
        "auto_detect": None,
        "attributes": None,
    },
    "solo_mode": {
        "enabled": None,
        "labeling_models": None,
        "revision_models": None,
        "embedding": None,
        "uncertainty": None,
        "thresholds": None,
        "instance_selection": None,
        "batches": None,
        "prompt_optimization": None,
        "edge_case_rules": None,
        "labeling_functions": None,
        "confidence_routing": None,
        "confusion_analysis": None,
        "state_dir": None,
        "refinement_loop": {
            "enabled",
            "trigger_interval",
            "min_improvement",
            "max_cycles",
            "patience",
            "auto_apply_suggestions",
            "refinement_strategy",
            "validation_split_ratio",
            "eval_sample_size",
            "num_candidates",
            "min_val_size",
            "max_consecutive_failures",
            "dry_run",
            "require_approval",
            "min_val_improvement",
            "eval_temperature",
            "prefer_consistent_disagreements",
        },
    },
    "admin_api_key": None,
    "alert_time_each_instance": None,
    "assignment_strategy": None,
    "reclaim_stale_assignments": None,
    "instance_reclaim": None,
    "max_session_seconds": None,
    "env_substitution": None,
    # === Internal (set by system, not user) ===
    "config_file": None,
    "__config_file__": None,
    "_bws_pool_items": None,
}
def validate_unknown_keys(config_data, schema=None, path=""):
    """Recursively warn about unrecognized config keys and suggest corrections.

    Only logs warnings; it never raises for an unknown key and never
    modifies ``config_data``.

    Args:
        config_data: The config dict (or sub-dict) to validate. Anything that
            is not a dict is ignored.
        schema: The known-keys schema for this level (defaults to
            KNOWN_CONFIG_KEYS). Either a dict mapping key -> sub-schema
            (None, set, or dict) or a plain set of known keys.
        path: Dot-separated path prefix for nested key reporting
            (e.g., "training").
    """
    if schema is None:
        schema = KNOWN_CONFIG_KEYS
    if not isinstance(config_data, dict):
        return

    # Iterating a dict yields its keys, so this covers both schema shapes.
    known_keys = set(schema)
    # difflib can only compare strings; YAML permits non-string keys
    # (ints, bools, null), so restrict suggestions to string candidates.
    candidates = [k for k in known_keys if isinstance(k, str)]

    unknown_keys = set(config_data) - known_keys
    # Sort on the string form: sorting mixed key types directly raises TypeError.
    for key in sorted(unknown_keys, key=str):
        full_key = f"{path}.{key}" if path else str(key)
        matches = difflib.get_close_matches(str(key), candidates, n=3, cutoff=0.6)
        if matches:
            suggestions = ", ".join(f"'{m}'" for m in matches)
            logger.warning(
                "Unrecognized config key '%s'. Did you mean: %s?",
                full_key, suggestions
            )
        else:
            logger.warning(
                "Unrecognized config key '%s'. This key will be ignored.",
                full_key
            )

    # A set schema carries no sub-schemas, so there is nothing to recurse into.
    if not isinstance(schema, dict):
        return

    # Recurse into nested dicts that have sub-key schemas
    for key, sub_schema in schema.items():
        if not isinstance(sub_schema, (dict, set)):
            continue  # None (leaf) or an unsupported schema shape
        value = config_data.get(key)
        if isinstance(value, dict):
            child_path = f"{path}.{key}" if path else key
            validate_unknown_keys(value, sub_schema, child_path)
def _is_within_directory(real_path: str, real_dir: str) -> bool:
    """Return True if ``real_path`` is ``real_dir`` itself or lies beneath it."""
    # os.path.join(dir, "") appends a separator only when one is missing, so a
    # filesystem root ("/") is handled correctly instead of becoming "//".
    return real_path == real_dir or real_path.startswith(os.path.join(real_dir, ""))


def _require_contained(path: str, candidate: str, directory: str) -> None:
    """Raise ConfigSecurityError unless ``candidate`` resolves inside ``directory``."""
    try:
        real_path = os.path.realpath(candidate)
        real_dir = os.path.realpath(directory)
    except (OSError, ValueError) as e:
        raise ConfigSecurityError(f"Invalid path '{path}': {str(e)}") from e
    if not _is_within_directory(real_path, real_dir):
        raise ConfigSecurityError(f"Path '{path}' resolves to '{real_path}' which is outside the project directory '{real_dir}'")


def validate_path_security(path: str, base_dir: str, project_dir: Optional[str] = None) -> str:
    """
    Validate that a path is secure and contained within the base directory.

    Relative paths are resolved against ``base_dir``. Absolute paths must
    already resolve inside ``base_dir``. In both cases the result must also
    resolve (symlinks followed via ``os.path.realpath``) inside
    ``project_dir`` when given, otherwise inside ``base_dir``.

    Args:
        path: The path to validate
        base_dir: The base directory that should contain the path
        project_dir: The project directory for final security check (if different from base_dir)

    Returns:
        The normalized absolute path if valid (normalized with
        ``os.path.normpath``; symlinks are not resolved in the return value)

    Raises:
        ConfigSecurityError: If the path is not secure
    """
    # Check for encoded traversal patterns before normalization. Percent-encoded
    # hex digits are case-insensitive, so compare against a lowered copy.
    lowered = path.lower()
    if '....' in path or '..%2f' in lowered or '..%5c' in lowered:
        raise ConfigSecurityError(f"Encoded path traversal detected in '{path}'. Encoded traversal patterns are not allowed for security reasons.")

    normalized_path = os.path.normpath(path)

    # Allow legitimate relative paths like "../data/file.json" (up to two
    # levels of "..") but block excessive traversal.
    if normalized_path.split(os.sep).count('..') > 2:
        raise ConfigSecurityError(f"Excessive path traversal detected in '{path}'. Too many '..' components for security reasons.")

    if os.path.isabs(normalized_path):
        # Absolute paths are only allowed when they stay within the base directory.
        _require_contained(path, normalized_path, base_dir)
    else:
        # Resolve relative paths against the base directory.
        normalized_path = os.path.normpath(os.path.join(base_dir, normalized_path))

    # Final security check - the resolved path must be within the project
    # directory (or the base directory when no project directory is given).
    _require_contained(path, normalized_path, project_dir if project_dir else base_dir)
    return normalized_path
| # Optional field type specifications for validation. | |
| # Maps config key -> (expected_type, human description, allow_negative). | |
| # Only fields that are commonly misconfigured and cause silent failures. | |
| _OPTIONAL_INT_FIELDS = { | |
| "alert_time_each_instance": ("seconds to alert per instance", False), | |
| "max_annotations_per_item": ("max annotations per item", True), # -1 = unlimited | |
| "max_annotations_per_user": ("max annotations per user", True), | |
| "min_annotators_per_instance": ("minimum annotators per instance", False), | |
| "random_seed": ("random seed", True), | |
| "max_session_seconds": ("max session duration in seconds", False), | |
| } | |
| # num_annotators_per_item validated separately — it may be int OR dict. | |
| _OPTIONAL_BOOL_FIELDS = { | |
| "highlight_linebreaks": "whether to highlight linebreaks", | |
| "jumping_to_id_disabled": "whether jumping to ID is disabled", | |
| "require_fully_annotated": "whether full annotation is required", | |
| "require_password": "whether password is required", | |
| "require_no_password": "whether no-password mode is enabled", | |
| "customjs": "whether custom JS is enabled", | |
| "watch_data_directory": "whether to watch data directory for changes", | |
| "persist_sessions": "whether to persist sessions across restarts", | |
| } | |
| _VALID_ASSIGNMENT_STRATEGIES = [ | |
| "random", "fixed_order", "active_learning", "llm_confidence", | |
| "max_diversity", "least_annotated", "category_based", "diversity_clustering", | |
| "batch", "priority", | |
| ] | |
def _napi_is_int(candidate: Any) -> bool:
    """Return True for genuine integers; bool (an int subclass) is rejected."""
    return isinstance(candidate, int) and not isinstance(candidate, bool)


def _napi_is_number(candidate: Any) -> bool:
    """Return True for int or float values; bool is rejected."""
    return isinstance(candidate, (int, float)) and not isinstance(candidate, bool)


def _napi_check_overlap_sample(overlap: Any, default: int) -> None:
    """Validate the ``overlap_sample`` sub-mapping against the resolved default."""
    if not isinstance(overlap, dict):
        raise ConfigValidationError(
            "num_annotators_per_item.overlap_sample must be a mapping"
        )
    stray = set(overlap) - {"fraction", "count", "stratify_by", "seed"}
    if stray:
        raise ConfigValidationError(
            f"Unknown keys in overlap_sample: {sorted(stray)}"
        )

    # fraction and count are both mandatory: a missing key yields None,
    # which fails the type test below.
    frac = overlap.get("fraction")
    if not (_napi_is_number(frac) and 0 < frac <= 1):
        raise ConfigValidationError(
            f"overlap_sample.fraction must be in (0, 1], got {frac!r}"
        )
    count = overlap.get("count")
    if not (_napi_is_int(count) and count >= 2):
        raise ConfigValidationError(
            f"overlap_sample.count must be an integer >= 2, got {count!r}"
        )
    if count <= default:
        raise ConfigValidationError(
            "overlap_sample.count must be greater than num_annotators_per_item.default "
            f"({count} <= {default})"
        )

    stratify_by = overlap.get("stratify_by")
    if not (stratify_by is None or isinstance(stratify_by, str)):
        raise ConfigValidationError(
            f"overlap_sample.stratify_by must be a string or omitted, got {stratify_by!r}"
        )
    seed = overlap.get("seed")
    if not (seed is None or _napi_is_int(seed)):
        raise ConfigValidationError(
            f"overlap_sample.seed must be an integer, got {seed!r}"
        )


def _napi_check_adaptive(adaptive: Any, default: int) -> None:
    """Validate the ``adaptive`` sub-mapping against the resolved default."""
    if not isinstance(adaptive, dict):
        raise ConfigValidationError(
            "num_annotators_per_item.adaptive must be a mapping"
        )
    stray = set(adaptive) - {"enabled", "disagreement_threshold", "boost_to"}
    if stray:
        raise ConfigValidationError(
            f"Unknown keys in adaptive: {sorted(stray)}"
        )

    if "enabled" in adaptive and not isinstance(adaptive["enabled"], bool):
        raise ConfigValidationError(
            f"adaptive.enabled must be a boolean, got {adaptive['enabled']!r}"
        )
    thr = adaptive.get("disagreement_threshold")
    if thr is not None and not (_napi_is_number(thr) and 0 <= thr <= 1):
        raise ConfigValidationError(
            f"adaptive.disagreement_threshold must be in [0, 1], got {thr!r}"
        )
    boost = adaptive.get("boost_to")
    if boost is None:
        return
    if not (_napi_is_int(boost) and boost >= 2):
        raise ConfigValidationError(
            f"adaptive.boost_to must be an integer >= 2, got {boost!r}"
        )
    if boost <= default:
        raise ConfigValidationError(
            f"adaptive.boost_to must exceed default ({boost} <= {default})"
        )


def validate_num_annotators_per_item(value: Any) -> None:
    """
    Check that ``num_annotators_per_item`` has an accepted shape.

    Two forms pass: a non-negative integer (legacy form), or a mapping whose
    keys are drawn from ``default``, ``overlap_sample``, ``adaptive`` and
    ``min``. ``None`` is treated as "not set" and accepted.

    Raises:
        ConfigValidationError: On a wrong type, an unknown key, or an
            out-of-range or mutually inconsistent value.
    """
    if value is None:
        return

    # bool must be rejected before the int branch: True/False are ints.
    if isinstance(value, bool):
        raise ConfigValidationError(
            "'num_annotators_per_item' must be an integer or a structured mapping, "
            f"got bool: {value!r}"
        )
    if isinstance(value, int):
        if value < 0:
            raise ConfigValidationError(
                "'num_annotators_per_item' as integer must be non-negative; "
                "use 0 or omit the key for unlimited (legacy used -1)."
            )
        return
    if not isinstance(value, dict):
        raise ConfigValidationError(
            "'num_annotators_per_item' must be an integer or a mapping, "
            f"got {type(value).__name__}: {value!r}"
        )

    allowed = {"default", "overlap_sample", "adaptive", "min"}
    stray = set(value) - allowed
    if stray:
        raise ConfigValidationError(
            f"Unknown keys in num_annotators_per_item: {sorted(stray)}. "
            f"Allowed: {sorted(allowed)}"
        )

    # An omitted default counts as 1 for the comparisons that follow.
    default = value.get("default", 1)
    if not (_napi_is_int(default) and default >= 1):
        raise ConfigValidationError(
            f"num_annotators_per_item.default must be a positive integer, got {default!r}"
        )

    minimum = value.get("min")
    if minimum is not None:
        if not (_napi_is_int(minimum) and minimum >= 1):
            raise ConfigValidationError(
                f"num_annotators_per_item.min must be a positive integer, got {minimum!r}"
            )
        if minimum > default:
            raise ConfigValidationError(
                "num_annotators_per_item.min cannot exceed num_annotators_per_item.default"
            )

    overlap = value.get("overlap_sample")
    if overlap is not None:
        _napi_check_overlap_sample(overlap, default)

    adaptive = value.get("adaptive")
    if adaptive is not None:
        _napi_check_adaptive(adaptive, default)
def _quota_is_valid_count(candidate: Any) -> bool:
    """Return True for a non-negative integer; bool is rejected."""
    return (
        isinstance(candidate, int)
        and not isinstance(candidate, bool)
        and candidate >= 0
    )


def validate_per_annotator_quota(value: Any) -> None:
    """Check that ``per_annotator_quota`` is a well-formed mapping.

    Accepted keys are ``default`` (a non-negative integer) plus ``by_user``
    and ``by_user_role`` (each a mapping of non-empty string -> non-negative
    integer). ``None`` is treated as "not set" and accepted.

    Raises:
        ConfigValidationError: On a wrong type, an unknown key, or an
            invalid quota value.
    """
    if value is None:
        return
    if not isinstance(value, dict):
        raise ConfigValidationError(
            "'per_annotator_quota' must be a mapping, "
            f"got {type(value).__name__}: {value!r}"
        )

    allowed = {"default", "by_user", "by_user_role"}
    stray = set(value) - allowed
    if stray:
        raise ConfigValidationError(
            f"Unknown keys in per_annotator_quota: {sorted(stray)}. Allowed: {sorted(allowed)}"
        )

    default = value.get("default")
    if not (default is None or _quota_is_valid_count(default)):
        raise ConfigValidationError(
            f"per_annotator_quota.default must be a non-negative integer, got {default!r}"
        )

    for key in ("by_user", "by_user_role"):
        overrides = value.get(key)
        if overrides is None:
            continue
        if not isinstance(overrides, dict):
            raise ConfigValidationError(
                f"per_annotator_quota.{key} must be a mapping of name -> integer"
            )
        for k, v in overrides.items():
            # The name is checked before its quota, entry by entry.
            if not (isinstance(k, str) and k):
                raise ConfigValidationError(
                    f"per_annotator_quota.{key} keys must be non-empty strings, got {k!r}"
                )
            if not _quota_is_valid_count(v):
                raise ConfigValidationError(
                    f"per_annotator_quota.{key}[{k!r}] must be a non-negative integer, got {v!r}"
                )
def resolve_num_annotators_per_item(config_data: Dict[str, Any]) -> int:
    """
    Work out the *default* per-item annotation cap
    (used as ``ItemStateManager.max_annotations_per_item``).

    Precedence, first match wins:
      1. ``num_annotators_per_item`` given as an integer
      2. ``num_annotators_per_item.default`` when the mapping form sets it
      3. the legacy ``max_annotations_per_item`` integer
      4. ``-1``, meaning unlimited

    Booleans are never treated as integers at steps 1 and 3.
    """
    canonical = config_data.get("num_annotators_per_item")
    if isinstance(canonical, dict):
        if canonical.get("default") is not None:
            return int(canonical["default"])
    elif isinstance(canonical, int) and not isinstance(canonical, bool):
        return canonical

    # Nothing usable under the canonical key: fall back to the legacy alias.
    fallback = config_data.get("max_annotations_per_item")
    usable = isinstance(fallback, int) and not isinstance(fallback, bool)
    return fallback if usable else -1
def validate_optional_field_types(config_data: Dict[str, Any]) -> None:
    """
    Validate types for commonly misconfigured optional fields.

    Catches issues like string values for integer fields (e.g., alert_time_each_instance: "30")
    or wrong types for booleans, which would silently produce incorrect behavior at runtime.
    Also validates the structured ``num_annotators_per_item`` and
    ``per_annotator_quota`` settings, rejects conflicting values between
    ``max_annotations_per_item`` and ``num_annotators_per_item`` (emitting a
    DeprecationWarning when both are present without conflict), and checks
    ``assignment_strategy`` against the known strategy names.

    Args:
        config_data: The parsed configuration dictionary

    Raises:
        ConfigValidationError: If a field has the wrong type or value
    """
    # Validate integer fields
    for field, (desc, allow_negative) in _OPTIONAL_INT_FIELDS.items():
        if field not in config_data:
            continue
        val = config_data[field]
        # bool is a subclass of int, so it has to be excluded explicitly.
        if not isinstance(val, int) or isinstance(val, bool):
            raise ConfigValidationError(
                f"'{field}' must be an integer ({desc}), "
                f"got {type(val).__name__}: {val!r}"
            )
        if not allow_negative and val < 0:
            raise ConfigValidationError(
                f"'{field}' must be a non-negative integer ({desc}), got {val}"
            )

    # Validate boolean fields (None/null is allowed as "not set")
    for field, desc in _OPTIONAL_BOOL_FIELDS.items():
        if field not in config_data:
            continue
        val = config_data[field]
        if val is not None and not isinstance(val, bool):
            raise ConfigValidationError(
                f"'{field}' must be a boolean ({desc}), "
                f"got {type(val).__name__}: {val!r}"
            )

    # Validate num_annotators_per_item (int OR structured dict)
    if 'num_annotators_per_item' in config_data:
        validate_num_annotators_per_item(config_data['num_annotators_per_item'])

    # Validate per_annotator_quota structured dict
    if 'per_annotator_quota' in config_data:
        validate_per_annotator_quota(config_data['per_annotator_quota'])

    # Emit a deprecation warning if max_annotations_per_item is set alongside
    # num_annotators_per_item; reject silent inconsistencies (both set to
    # conflicting values).
    if 'max_annotations_per_item' in config_data and 'num_annotators_per_item' in config_data:
        legacy = config_data['max_annotations_per_item']
        canonical = config_data['num_annotators_per_item']
        if isinstance(canonical, dict):
            canonical_int = canonical.get('default')
        elif isinstance(canonical, int):
            canonical_int = canonical
        else:
            # The key is present but null (accepted by
            # validate_num_annotators_per_item): there is no value to
            # conflict with, and calling .get() on None would crash.
            canonical_int = None
        if canonical_int is not None and legacy != canonical_int:
            raise ConfigValidationError(
                "'max_annotations_per_item' and 'num_annotators_per_item' are both "
                f"set with conflicting values ({legacy} vs {canonical_int}). "
                "Drop 'max_annotations_per_item' — 'num_annotators_per_item' is the canonical key."
            )
        import warnings as _w
        _w.warn(
            "'max_annotations_per_item' is deprecated; use 'num_annotators_per_item' "
            "instead. Setting both is redundant.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Validate assignment_strategy enum
    if 'assignment_strategy' in config_data:
        strat = config_data['assignment_strategy']
        # Can be a string or a dict with a 'name' key; a dict without 'name'
        # is checked as the empty string and therefore rejected.
        strat_name = strat
        if isinstance(strat, dict):
            strat_name = strat.get('name', '')
        # Non-string values are not checked here.
        if isinstance(strat_name, str) and strat_name.lower() not in _VALID_ASSIGNMENT_STRATEGIES:
            raise ConfigValidationError(
                f"'assignment_strategy' value '{strat_name}' is not recognized. "
                f"Valid strategies: {', '.join(_VALID_ASSIGNMENT_STRATEGIES)}"
            )
def validate_judge_calibration_config(config_data: Dict[str, Any]) -> None:
    """Check the ``judge_calibration`` block, but only when it is enabled.

    The per-field rules are delegated to the typed config object's
    ``validate()`` so they live in a single place. On top of that, every
    schema name the block references must be declared in
    ``annotation_schemes``. All problems are collected and reported together.

    Raises:
        ConfigValidationError: If any validation error was collected.
    """
    block_cfg = config_data.get("judge_calibration")
    enabled = isinstance(block_cfg, dict) and block_cfg.get("enabled")
    if not enabled:
        return

    # Imported lazily so the judge_calibration package is only loaded when
    # the feature is actually switched on.
    from potato.judge_calibration.config import parse_judge_calibration_config

    parsed = parse_judge_calibration_config(config_data)
    problems = parsed.validate()

    # Cross-check schema references against declared annotation_schemes.
    declared = set()
    for scheme in (config_data.get("annotation_schemes") or []):
        if isinstance(scheme, dict):
            declared.add(scheme.get("name"))

    for name in parsed.schemas:
        if name in declared:
            continue
        problems.append(
            f"judge_calibration.schemas references unknown scheme '{name}' "
            f"(declared: {sorted(n for n in declared if n)})"
        )

    if problems:
        raise ConfigValidationError(
            "Invalid judge_calibration configuration:\n - " + "\n - ".join(problems)
        )
def validate_yaml_structure(config_data: Dict[str, Any], project_dir: str = None, config_file_dir: str = None) -> None:
    """
    Run the full set of structural checks over a parsed YAML configuration.

    The function first checks the required top-level keys, ``item_properties``
    and the data-source keys itself, then hands the configuration to the
    feature-specific validators in a fixed order.

    Args:
        config_data: The parsed YAML configuration
        project_dir: The project directory (forwarded to the training validator)
        config_file_dir: The directory containing the config file (forwarded
            to the training validator)

    Raises:
        ConfigValidationError: If the configuration is invalid
    """
    if not isinstance(config_data, dict):
        raise ConfigValidationError("Configuration must be a YAML object (dictionary)")

    # 'data_files' is deliberately absent from this tuple: it is only one of
    # three acceptable data sources (data_files / data_directory /
    # data_sources) and is handled by the dedicated check further down.
    # Requiring it here made data_directory- and data_sources-only configs
    # fail before that check could run (F-038).
    absent_top_level = [
        key
        for key in ('item_properties', 'task_dir', 'output_annotation_dir', 'annotation_task_name')
        if key not in config_data
    ]
    if absent_top_level:
        raise ConfigValidationError(f"Missing required configuration fields: {', '.join(absent_top_level)}")

    # item_properties: must be a mapping carrying the id and text keys.
    props = config_data.get('item_properties', {})
    if not isinstance(props, dict):
        raise ConfigValidationError("item_properties must be a dictionary")
    absent_props = [key for key in ('id_key', 'text_key') if key not in props]
    if absent_props:
        raise ConfigValidationError(f"Missing required item_properties: {', '.join(absent_props)}")

    # category_key is optional (category-based assignment) but may not be blank.
    if 'category_key' in props:
        category = props['category_key']
        if not (isinstance(category, str) and category.strip()):
            raise ConfigValidationError("item_properties.category_key must be a non-empty string")

    # Data sources: data_files may be empty only if another source is configured.
    file_list = config_data.get('data_files', [])
    directory_source = config_data.get('data_directory')
    extra_sources = config_data.get('data_sources')
    if not isinstance(file_list, list):
        raise ConfigValidationError("data_files must be a list")
    if not (file_list or directory_source or extra_sources):
        raise ConfigValidationError(
            "At least one data source must be configured: "
            "'data_files', 'data_directory', or 'data_sources'"
        )
    if extra_sources:
        validate_data_sources_config(config_data)

    # Server, authentication, data_directory, annotation schemes.
    for check in (
        validate_server_config,
        validate_authentication_config,
        validate_data_directory_config,
        validate_annotation_schemes,
    ):
        check(config_data)

    # Training is the only validator that also receives the directory arguments.
    validate_training_config(config_data, project_dir, config_file_dir)

    # The database validator receives just its own section.
    if 'database' in config_data:
        validate_database_config(config_data['database'])

    # Active learning, AI support, chat support, category/batch assignment,
    # diversity ordering, embedding visualization.
    for check in (
        validate_active_learning_config,
        validate_ai_support_config,
        validate_chat_support_config,
        validate_category_assignment_config,
        validate_batch_assignment_config,
        validate_diversity_config,
        validate_embedding_visualization_config,
    ):
        check(config_data)

    if 'adjudication' in config_data:
        validate_adjudication_config(config_data)

    # Quality control, assignment reclaim, instance display, format handling, layout.
    for check in (
        validate_quality_control_config,
        validate_instance_reclaim_config,
        validate_instance_display_config,
        validate_format_handling_config,
        validate_layout_config,
    ):
        check(config_data)

    # BWS, IBWS and MACE are validated only when their section is present.
    for section_key, check in (
        ('bws_config', _validate_bws_config),
        ('ibws_config', _validate_ibws_config),
        ('mace', _validate_mace_config),
    ):
        if section_key in config_data:
            check(config_data)

    # Final passes, in order: optional-field types, search-and-claim
    # compatibility (fails loudly on designs that self-selection would
    # corrupt), codebook mode, judge calibration, and the unknown-key
    # warnings across all nesting levels.
    for check in (
        validate_optional_field_types,
        validate_search_assignment_compat,
        validate_codebook_config,
        validate_judge_calibration_config,
        validate_unknown_keys,
    ):
        check(config_data)
| # Assignment strategies whose sampling/ordering self-selection breaks. | |
| _CLAIM_INCOMPATIBLE_STRATEGIES = { | |
| "random", "diversity_clustering", "max_diversity", | |
| "active_learning", "llm_confidence", "least_annotated", | |
| "category_based", "batch", | |
| } | |
def validate_search_assignment_compat(config_data: Dict[str, Any]) -> None:
    """Hard-fail when ``search.annotator_claim`` is switched on together with
    a feature that needs the platform, not the annotator, to pick the next
    item.

    Configurations without ``search.annotator_claim`` (for example read-only
    admin search) are not inspected. Solo/QDA mode, where a single coder works
    over the whole corpus, is always allowed.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If at least one incompatible feature is
            configured; the message lists every conflict found
    """
    search_cfg = config_data.get("search")
    if not (isinstance(search_cfg, dict) and search_cfg.get("annotator_claim")):
        return

    def _enabled(section_key):
        # Value of ``<section_key>.enabled``; a missing or empty section is "off".
        return (config_data.get(section_key) or {}).get("enabled")

    # Single-coder modes have no sampling/overlap invariant to protect.
    if _enabled("qda_mode") or _enabled("solo_mode"):
        return

    problems = []

    # The strategy may be given as a plain string or as a dict with a 'name'.
    strategy = config_data.get("assignment_strategy")
    if isinstance(strategy, dict):
        strategy = strategy.get("name")
    if strategy and str(strategy).lower() in _CLAIM_INCOMPATIBLE_STRATEGIES:
        problems.append(
            f"assignment_strategy: {strategy} (self-selection breaks "
            f"sampling/ordering)")

    # Any per-item annotator count above 1 implies inter-annotator overlap.
    for key in ("max_annotations_per_item", "num_annotators_per_item",
                "min_annotators_per_instance"):
        value = config_data.get(key, -1)
        if isinstance(value, dict):
            # Dict form: look at 'default' and 'overlap_sample.count'.
            overlap_cfg = value.get("overlap_sample") or {}
            counts = [
                count
                for count in (value.get("default"), overlap_cfg.get("count"))
                if count is not None
            ]
        else:
            counts = [value]
        for count in counts:
            try:
                if int(count) > 1:
                    problems.append(
                        f"{key}: {config_data[key]} (inter-annotator overlap "
                        f"cannot be guaranteed under self-selection)")
                    break
            except (TypeError, ValueError):
                # Values that cannot be read as an integer are ignored here.
                continue

    # Features with an ``enabled`` flag that conflict with self-selection.
    for section_key, reason in (
        ("attention_checks",
         "attention_checks.enabled (annotators could "
         "locate/avoid QC items)"),
        ("gold_standards",
         "gold_standards.enabled (annotators could "
         "locate/avoid gold items)"),
        ("icl_labeling",
         "icl_labeling.enabled (blind LLM-verification "
         "tasks must not be findable)"),
        ("adjudication",
         "adjudication.enabled (the adjudication queue "
         "is curated)"),
    ):
        if _enabled(section_key):
            problems.append(reason)

    # Crowdsourcing: a top-level mturk/prolific key or a matching login type.
    login_kind = (config_data.get("login") or {}).get("type")
    if ("mturk" in config_data or "prolific" in config_data
            or login_kind in ("mturk", "prolific")):
        problems.append("crowdsourcing backend (HIT = the assigned "
                        "unit; self-selection breaks payment/coverage)")

    if not problems:
        return

    header = ("search.annotator_claim: true is incompatible with this "
              "configuration:\n - ")
    footer = ("\n\nAnnotator search-and-claim is only supported with "
              "solo_mode/qda_mode, or fixed_order assignment without "
              "overlap, quality-control injection, ICL verification, "
              "adjudication, or a crowdsourcing backend. Use read-only "
              "admin search (no annotator_claim) for those designs.")
    raise ConfigValidationError(header + "\n - ".join(problems) + footer)
| _CODEBOOK_MODES = ("fixed", "extensible", "open") | |
| def _crowd_backend(config_data: Dict[str, Any]) -> bool: | |
| login_type = (config_data.get("login") or {}).get("type") | |
| return ( | |
| "mturk" in config_data or "prolific" in config_data | |
| or login_type in ("mturk", "prolific") | |
| ) | |
def get_codebook_mode(config_data: Dict[str, Any]) -> str:
    """Work out which codebook mode is actually in effect.

    Order of precedence:

    1. An explicit ``codebook_mode``, else ``codebook.mode``, stripped and
       lower-cased.
    2. ``open`` when ``qda_mode.enabled`` or ``solo_mode.enabled`` is truthy.
    3. ``fixed`` otherwise.

    Whatever the above yields, a crowd backend forces ``fixed``: annotators
    on a paid HIT must not reshape the shared codebook.

    Args:
        config_data: The configuration data

    Returns:
        The effective mode string. An explicitly requested value is returned
        without being checked against the list of valid modes.
    """
    requested = config_data.get("codebook_mode")
    if requested is None:
        requested = (config_data.get("codebook") or {}).get("mode")

    if requested is not None:
        resolved = str(requested).strip().lower()
    elif ((config_data.get("qda_mode") or {}).get("enabled")
            or (config_data.get("solo_mode") or {}).get("enabled")):
        # No explicit request: single-coder modes default to an open codebook.
        resolved = "open"
    else:
        resolved = "fixed"

    return "fixed" if _crowd_backend(config_data) else resolved
def validate_codebook_config(config_data: Dict[str, Any]) -> None:
    """Reject an invalid codebook mode and log when a crowd backend overrides
    a requested non-fixed mode.

    The mode is read from ``codebook_mode`` first and from ``codebook.mode``
    second. When neither is set there is nothing to validate. This function
    only validates and logs; it does not modify ``config_data``.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the requested mode (stripped and
            lower-cased) is not one of ``_CODEBOOK_MODES``
    """
    raw = config_data.get("codebook_mode")
    if raw is None:
        raw = (config_data.get("codebook") or {}).get("mode")
    if raw is None:
        return

    mode = str(raw).strip().lower()
    if mode not in _CODEBOOK_MODES:
        raise ConfigValidationError(
            f"codebook_mode must be one of {', '.join(_CODEBOOK_MODES)}; "
            f"got {raw!r}."
        )

    if mode != "fixed" and _crowd_backend(config_data):
        # Use the module logger, not the root-level ``logging.warning``: the
        # latter implicitly calls ``basicConfig()`` when the root logger has
        # no handlers, which this module's centralized logging setup avoids.
        logger.warning(
            "codebook_mode=%s requested with a crowdsourcing backend; "
            "force-locking to 'fixed' (paid annotators must not reshape "
            "the shared codebook).", mode)
def validate_annotation_schemes(config_data: Dict[str, Any]) -> None:
    """
    Validate where annotation schemes are declared and what they contain.

    Schemes may be declared at the top level (``annotation_schemes``) or
    inside ``phases`` (list or dictionary format), but not in both places.
    Once the per-scheme checks pass, the keyword-highlight/image check and
    the display_logic cross-reference check are run.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If annotation schemes are invalid
    """
    top_level_present = 'annotation_schemes' in config_data
    phases_present = 'phases' in config_data and config_data['phases']

    # Declaring schemes both at the top level and inside a phase is rejected.
    if top_level_present and phases_present:
        phase_cfg = config_data['phases']
        offenders = []
        if isinstance(phase_cfg, list):
            for idx, entry in enumerate(phase_cfg):
                if 'annotation_schemes' in entry:
                    offenders.append(entry.get('name', f'phase[{idx}]'))
        elif isinstance(phase_cfg, dict):
            for key, entry in phase_cfg.items():
                if key != 'order' and isinstance(entry, dict) and 'annotation_schemes' in entry:
                    offenders.append(key)
        if offenders:
            raise ConfigValidationError(
                f"Configuration has both top-level 'annotation_schemes' and phase-level "
                f"'annotation_schemes' in: {', '.join(offenders)}. "
                f"Use only one location to avoid confusion."
            )

    def _check_phase(entry, label, schemes_path):
        # A phase needs annotation_schemes, or one of file / type /
        # instrument / instruments.
        if 'annotation_schemes' in entry:
            listed = entry['annotation_schemes']
            if not isinstance(listed, list):
                raise ConfigValidationError(f"Phase {label} annotation_schemes must be a list")
            if not listed:
                raise ConfigValidationError(f"Phase {label} annotation_schemes cannot be empty")
            for pos, scheme in enumerate(listed):
                validate_single_annotation_scheme(scheme, f"{schemes_path}[{pos}]")
            return
        if any(marker in entry for marker in ('file', 'type', 'instrument', 'instruments')):
            # Legacy format or instrument-based phase: only the instrument
            # references are checked here.
            _validate_phase_instruments(entry, label)
            return
        raise ConfigValidationError(
            f"Phase {label} requires 'annotation_schemes', 'file', 'type', "
            f"'instrument', or 'instruments'"
        )

    if top_level_present:
        top_level = config_data['annotation_schemes']
        if not isinstance(top_level, list):
            raise ConfigValidationError("annotation_schemes must be a list")
        if not top_level:
            raise ConfigValidationError("annotation_schemes cannot be empty")
        for idx, scheme in enumerate(top_level):
            validate_single_annotation_scheme(scheme, f"annotation_schemes[{idx}]")
    elif phases_present:
        phase_cfg = config_data['phases']
        if isinstance(phase_cfg, list):
            # List format: a phase is labelled by its 'name', else by position.
            for idx, entry in enumerate(phase_cfg):
                label = entry.get('name', f'phase[{idx}]')
                _check_phase(entry, label, f"phases[{idx}].annotation_schemes")
        else:
            # Dictionary format: keys are phase names; 'order' is not a phase.
            for key, entry in phase_cfg.items():
                if key == 'order':
                    continue
                _check_phase(entry, key, f"phases.{key}.annotation_schemes")
    else:
        raise ConfigValidationError("Config must have either 'annotation_schemes' (top-level) or 'phases' with annotation_schemes")

    # keyword_highlight must not be enabled for image-based tasks.
    _validate_keyword_highlight_for_images(config_data)

    # display_logic cross-references (schema references, circular dependencies).
    collected = _collect_all_annotation_schemes(config_data)
    if collected:
        validate_display_logic_references(collected)
| def _collect_all_annotation_schemes(config_data: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """ | |
| Collect all annotation schemes from config, whether top-level or in phases. | |
| Args: | |
| config_data: The configuration data | |
| Returns: | |
| List of all annotation scheme dictionaries | |
| """ | |
| schemes = [] | |
| if 'annotation_schemes' in config_data: | |
| schemes.extend(config_data['annotation_schemes']) | |
| elif 'phases' in config_data: | |
| phases = config_data['phases'] | |
| if isinstance(phases, list): | |
| for phase in phases: | |
| if 'annotation_schemes' in phase: | |
| schemes.extend(phase['annotation_schemes']) | |
| elif isinstance(phases, dict): | |
| for phase_name, phase in phases.items(): | |
| if phase_name != 'order' and isinstance(phase, dict): | |
| if 'annotation_schemes' in phase: | |
| schemes.extend(phase['annotation_schemes']) | |
| return schemes | |
| def _validate_keyword_highlight_for_images(config_data: Dict[str, Any]) -> None: | |
| """ | |
| Validate that keyword_highlight is not enabled for image-based tasks. | |
| Keyword highlighting highlights text in the instance content, which doesn't | |
| make sense for images. This validation catches configuration errors early. | |
| Args: | |
| config_data: The configuration data | |
| Raises: | |
| ConfigValidationError: If keyword_highlight is enabled for an image task | |
| """ | |
| # Check if the text_key suggests this is an image-based task | |
| text_key = config_data.get('item_properties', {}).get('text_key', 'text') | |
| image_indicators = ['image', 'img', 'photo', 'picture', 'url'] | |
| is_likely_image_task = any(indicator in text_key.lower() for indicator in image_indicators) | |
| if not is_likely_image_task: | |
| return # Not an image task, no need to check | |
| # Get all annotation schemes | |
| schemes = [] | |
| if 'annotation_schemes' in config_data: | |
| schemes = config_data['annotation_schemes'] | |
| elif 'phases' in config_data: | |
| phases = config_data['phases'] | |
| if isinstance(phases, list): | |
| for phase in phases: | |
| schemes.extend(phase.get('annotation_schemes', [])) | |
| elif isinstance(phases, dict): | |
| for phase_name, phase in phases.items(): | |
| if phase_name != 'order' and isinstance(phase, dict): | |
| schemes.extend(phase.get('annotation_schemes', [])) | |
| # Check each scheme for keyword_highlight | |
| for i, scheme in enumerate(schemes): | |
| if not isinstance(scheme, dict): | |
| continue | |
| ai_support = scheme.get('ai_support', {}) | |
| if not isinstance(ai_support, dict): | |
| continue | |
| features = ai_support.get('features', {}) | |
| if not isinstance(features, dict): | |
| continue | |
| keyword_highlight = features.get('keyword_highlight', False) | |
| if keyword_highlight: | |
| scheme_name = scheme.get('name', f'scheme[{i}]') | |
| raise ConfigValidationError( | |
| f"annotation_schemes.{scheme_name}.ai_support.features.keyword_highlight is enabled, " | |
| f"but item_properties.text_key='{text_key}' suggests this is an image-based task. " | |
| f"Keyword highlighting only works with text content, not images. " | |
| f"Set keyword_highlight: false or remove it from the ai_support features." | |
| ) | |
| def _validate_bws_config(config_data: Dict[str, Any]) -> None: | |
| """ | |
| Validate Best-Worst Scaling configuration. | |
| Args: | |
| config_data: The configuration data | |
| Raises: | |
| ConfigValidationError: If the BWS config is invalid | |
| """ | |
| bws = config_data['bws_config'] | |
| if not isinstance(bws, dict): | |
| raise ConfigValidationError("bws_config must be a dictionary") | |
| if 'tuple_size' in bws: | |
| if not isinstance(bws['tuple_size'], int) or bws['tuple_size'] < 2: | |
| raise ConfigValidationError("bws_config.tuple_size must be an integer >= 2") | |
| if 'seed' in bws: | |
| if not isinstance(bws['seed'], int): | |
| raise ConfigValidationError("bws_config.seed must be an integer") | |
| if 'num_tuples' in bws and bws['num_tuples'] is not None: | |
| if not isinstance(bws['num_tuples'], int) or bws['num_tuples'] < 1: | |
| raise ConfigValidationError("bws_config.num_tuples must be a positive integer or null") | |
| if 'min_item_appearances' in bws and bws['min_item_appearances'] is not None: | |
| if not isinstance(bws['min_item_appearances'], int) or bws['min_item_appearances'] < 1: | |
| raise ConfigValidationError("bws_config.min_item_appearances must be a positive integer or null") | |
| # Validate scoring config if present | |
| scoring = bws.get('scoring', {}) | |
| if scoring: | |
| if not isinstance(scoring, dict): | |
| raise ConfigValidationError("bws_config.scoring must be a dictionary") | |
| valid_methods = ['counting', 'bradley_terry', 'plackett_luce'] | |
| method = scoring.get('method', 'counting') | |
| if method not in valid_methods: | |
| raise ConfigValidationError(f"bws_config.scoring.method must be one of: {valid_methods}") | |
| def _validate_ibws_config(config_data: Dict[str, Any]) -> None: | |
| """ | |
| Validate Iterative Best-Worst Scaling configuration. | |
| Args: | |
| config_data: The configuration data | |
| Raises: | |
| ConfigValidationError: If the IBWS config is invalid | |
| """ | |
| # Mutual exclusivity with bws_config | |
| if 'bws_config' in config_data: | |
| raise ConfigValidationError( | |
| "ibws_config and bws_config are mutually exclusive. " | |
| "Use ibws_config for iterative BWS or bws_config for standard BWS." | |
| ) | |
| ibws = config_data['ibws_config'] | |
| if not isinstance(ibws, dict): | |
| raise ConfigValidationError("ibws_config must be a dictionary") | |
| # Require at least one BWS annotation scheme | |
| schemes = config_data.get('annotation_schemes', []) | |
| has_bws_scheme = any(s.get('annotation_type') == 'bws' for s in schemes) | |
| if not has_bws_scheme: | |
| raise ConfigValidationError( | |
| "ibws_config requires at least one annotation scheme with annotation_type: bws" | |
| ) | |
| # tuple_size | |
| if 'tuple_size' in ibws: | |
| if not isinstance(ibws['tuple_size'], int) or ibws['tuple_size'] < 2: | |
| raise ConfigValidationError("ibws_config.tuple_size must be an integer >= 2") | |
| # max_rounds | |
| if 'max_rounds' in ibws and ibws['max_rounds'] is not None: | |
| if not isinstance(ibws['max_rounds'], int) or ibws['max_rounds'] < 1: | |
| raise ConfigValidationError("ibws_config.max_rounds must be a positive integer or null") | |
| # seed | |
| if 'seed' in ibws: | |
| if not isinstance(ibws['seed'], int): | |
| raise ConfigValidationError("ibws_config.seed must be an integer") | |
| # scoring_method | |
| valid_methods = ['counting', 'bradley_terry', 'plackett_luce'] | |
| if 'scoring_method' in ibws: | |
| if ibws['scoring_method'] not in valid_methods: | |
| raise ConfigValidationError( | |
| f"ibws_config.scoring_method must be one of: {valid_methods}" | |
| ) | |
| # tuples_per_item_per_round | |
| if 'tuples_per_item_per_round' in ibws: | |
| val = ibws['tuples_per_item_per_round'] | |
| if not isinstance(val, int) or val < 1: | |
| raise ConfigValidationError( | |
| "ibws_config.tuples_per_item_per_round must be a positive integer" | |
| ) | |
| def _validate_mace_config(config_data: Dict[str, Any]) -> None: | |
| """ | |
| Validate MACE competence estimation configuration. | |
| Args: | |
| config_data: The configuration data | |
| Raises: | |
| ConfigValidationError: If the MACE config is invalid | |
| """ | |
| mace = config_data.get('mace', {}) | |
| if not isinstance(mace, dict): | |
| raise ConfigValidationError("mace must be a dictionary") | |
| if not mace.get('enabled', False): | |
| return # Not enabled, skip validation | |
| # Validate numeric parameters | |
| min_annots = mace.get('min_annotations_per_item', 3) | |
| if not isinstance(min_annots, int) or min_annots < 2: | |
| raise ConfigValidationError( | |
| "mace.min_annotations_per_item must be an integer >= 2" | |
| ) | |
| trigger_n = mace.get('trigger_every_n', 10) | |
| if not isinstance(trigger_n, int) or trigger_n < 1: | |
| raise ConfigValidationError( | |
| "mace.trigger_every_n must be an integer >= 1" | |
| ) | |
| num_restarts = mace.get('num_restarts', 10) | |
| if not isinstance(num_restarts, int) or num_restarts < 1: | |
| raise ConfigValidationError( | |
| "mace.num_restarts must be an integer >= 1" | |
| ) | |
| # Warn if no categorical schemas are defined | |
| categorical_types = {'radio', 'likert', 'select', 'multiselect'} | |
| schemes = config_data.get('annotation_schemes', []) | |
| has_categorical = any( | |
| s.get('annotation_type', '') in categorical_types | |
| for s in schemes if isinstance(s, dict) | |
| ) | |
| if not has_categorical: | |
| logger.warning( | |
| "MACE is enabled but no categorical annotation schemes " | |
| "(radio, likert, select, multiselect) are defined. " | |
| "MACE will have no data to process." | |
| ) | |
| def _validate_phase_instruments(phase: Dict[str, Any], phase_name: str) -> None: | |
| """ | |
| Validate instrument references in a phase configuration. | |
| Args: | |
| phase: The phase configuration | |
| phase_name: Name of the phase for error messages | |
| Raises: | |
| ConfigValidationError: If instrument references are invalid | |
| """ | |
| # Validate single instrument reference | |
| if 'instrument' in phase: | |
| inst_id = phase['instrument'] | |
| if not isinstance(inst_id, str): | |
| raise ConfigValidationError( | |
| f"Phase {phase_name}: 'instrument' must be a string" | |
| ) | |
| try: | |
| from potato.survey_instruments import get_registry | |
| registry = get_registry() | |
| if inst_id not in registry['instruments']: | |
| available = sorted(registry['instruments'].keys())[:10] | |
| raise ConfigValidationError( | |
| f"Phase {phase_name}: Unknown instrument '{inst_id}'. " | |
| f"Available instruments: {available}..." | |
| ) | |
| except ImportError: | |
| # survey_instruments module not available - skip validation | |
| pass | |
| # Validate multiple instruments | |
| if 'instruments' in phase: | |
| inst_list = phase['instruments'] | |
| if not isinstance(inst_list, list): | |
| raise ConfigValidationError( | |
| f"Phase {phase_name}: 'instruments' must be a list" | |
| ) | |
| try: | |
| from potato.survey_instruments import get_registry | |
| registry = get_registry() | |
| for inst_id in inst_list: | |
| if not isinstance(inst_id, str): | |
| raise ConfigValidationError( | |
| f"Phase {phase_name}: All items in 'instruments' must be strings" | |
| ) | |
| if inst_id not in registry['instruments']: | |
| available = sorted(registry['instruments'].keys())[:10] | |
| raise ConfigValidationError( | |
| f"Phase {phase_name}: Unknown instrument '{inst_id}'. " | |
| f"Available instruments: {available}..." | |
| ) | |
| except ImportError: | |
| # survey_instruments module not available - skip validation | |
| pass | |
| def validate_single_annotation_scheme(scheme: Dict[str, Any], path: str) -> None: | |
| """ | |
| Validate a single annotation scheme. | |
| Args: | |
| scheme: The annotation scheme to validate | |
| path: The path in the config for error reporting | |
| Raises: | |
| ConfigValidationError: If the scheme is invalid | |
| """ | |
| if not isinstance(scheme, dict): | |
| raise ConfigValidationError(f"{path} must be a dictionary") | |
| required_fields = ['annotation_type', 'name', 'description'] | |
| missing_fields = [field for field in required_fields if field not in scheme] | |
| if missing_fields: | |
| raise ConfigValidationError(f"{path} missing required fields: {', '.join(missing_fields)}") | |
| # Validate annotation_type against the schema registry (single source of truth) | |
| from potato.server_utils.schemas.registry import schema_registry | |
| valid_types = schema_registry.get_supported_types() | |
| if scheme['annotation_type'] not in valid_types: | |
| raise ConfigValidationError(f"{path}.annotation_type must be one of: {', '.join(sorted(valid_types))}") | |
| # Registry-driven required field check: validate fields that are unconditionally | |
| # required for this type. Types with alternative forms (e.g., likert accepts either | |
| # 'labels' OR 'min_label'+'max_label'+'size') have deeper validation in the | |
| # type-specific blocks below. This check catches missing fields for types that | |
| # don't have explicit type-specific validation blocks. | |
| annotation_type = scheme['annotation_type'] | |
| _types_with_explicit_validation = { | |
| 'radio', 'multiselect', 'select', 'likert', 'slider', 'span', 'multirate', | |
| 'image_annotation', 'audio_annotation', 'video_annotation', 'tiered_annotation', | |
| 'pairwise', 'bws', 'soft_label', 'confidence', 'constant_sum', | |
| 'semantic_differential', 'ranking', 'range_slider', 'hierarchical_multiselect', | |
| 'vas', 'rubric_eval', 'error_span', 'card_sort', 'conjoint', | |
| } | |
| if annotation_type not in _types_with_explicit_validation: | |
| schema_def = schema_registry.get(annotation_type) | |
| if schema_def and schema_def.required_fields: | |
| # 'name' and 'description' are already checked above | |
| extra_required = [f for f in schema_def.required_fields | |
| if f not in ('name', 'description')] | |
| missing = [f for f in extra_required if f not in scheme] | |
| if missing: | |
| raise ConfigValidationError( | |
| f"{path} (type '{annotation_type}') missing required field(s): " | |
| f"{', '.join(missing)}" | |
| ) | |
| # Type-specific validation (deep structural checks beyond registry required_fields) | |
| if annotation_type in ['radio', 'multiselect', 'select']: | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'labels' field for {annotation_type} annotation type") | |
| if not isinstance(scheme['labels'], list): | |
| raise ConfigValidationError(f"{path}.labels must be a list") | |
| if not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels cannot be empty") | |
| elif annotation_type == 'likert': | |
| # Likert can use labels (falls back to radio) or min_label/max_label/size | |
| if 'labels' not in scheme: | |
| required_likert_fields = ['min_label', 'max_label', 'size'] | |
| missing_likert_fields = [field for field in required_likert_fields if field not in scheme] | |
| if missing_likert_fields: | |
| raise ConfigValidationError(f"{path} missing required fields for likert: {', '.join(missing_likert_fields)}") | |
| if not isinstance(scheme['size'], int) or scheme['size'] < 2: | |
| raise ConfigValidationError(f"{path}.size must be an integer >= 2") | |
| elif annotation_type == 'slider': | |
| # Slider can use labels (falls back to radio) or min_value/max_value | |
| if 'labels' not in scheme: | |
| required_slider_fields = ['min_value', 'max_value', 'starting_value'] | |
| missing_slider_fields = [field for field in required_slider_fields if field not in scheme] | |
| if missing_slider_fields: | |
| raise ConfigValidationError(f"{path} missing required fields for slider: {', '.join(missing_slider_fields)}") | |
| if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): | |
| raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") | |
| if scheme['min_value'] >= scheme['max_value']: | |
| raise ConfigValidationError(f"{path}.min_value must be less than max_value") | |
| elif annotation_type == 'span': | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'labels' field for span annotation type") | |
| if not isinstance(scheme['labels'], list): | |
| raise ConfigValidationError(f"{path}.labels must be a list") | |
| if not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels cannot be empty") | |
| elif annotation_type == 'multirate': | |
| # multirate requires 'labels' always, and either 'options' or 'options_from_data' | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing required field for multirate: labels") | |
| has_options = 'options' in scheme | |
| has_options_from_data = 'options_from_data' in scheme | |
| if not has_options and not has_options_from_data: | |
| raise ConfigValidationError(f"{path} must have either 'options' or 'options_from_data' for multirate") | |
| if has_options: | |
| if not isinstance(scheme['options'], list): | |
| raise ConfigValidationError(f"{path}.options must be a list") | |
| if not scheme['options']: | |
| raise ConfigValidationError(f"{path}.options cannot be empty") | |
| if has_options_from_data: | |
| if not isinstance(scheme['options_from_data'], str) or not scheme['options_from_data'].strip(): | |
| raise ConfigValidationError(f"{path}.options_from_data must be a non-empty string (instance data field name)") | |
| if not isinstance(scheme['labels'], list): | |
| raise ConfigValidationError(f"{path}.labels must be a list") | |
| if not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels cannot be empty") | |
| elif annotation_type == 'image_annotation': | |
| # Image annotation requires tools and labels | |
| if 'tools' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'tools' field for image_annotation type") | |
| if not isinstance(scheme['tools'], list): | |
| raise ConfigValidationError(f"{path}.tools must be a list") | |
| if not scheme['tools']: | |
| raise ConfigValidationError(f"{path}.tools cannot be empty") | |
| # Validate tools | |
| valid_tools = ['bbox', 'polygon', 'freeform', 'landmark', 'fill', 'eraser', 'brush'] | |
| invalid_tools = [t for t in scheme['tools'] if t not in valid_tools] | |
| if invalid_tools: | |
| raise ConfigValidationError(f"{path}.tools contains invalid values: {invalid_tools}. Valid tools are: {valid_tools}") | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'labels' field for image_annotation type") | |
| if not isinstance(scheme['labels'], list): | |
| raise ConfigValidationError(f"{path}.labels must be a list") | |
| if not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels cannot be empty") | |
| # Validate optional numeric fields | |
| if 'min_annotations' in scheme: | |
| if not isinstance(scheme['min_annotations'], int) or scheme['min_annotations'] < 0: | |
| raise ConfigValidationError(f"{path}.min_annotations must be a non-negative integer") | |
| if 'max_annotations' in scheme and scheme['max_annotations'] is not None: | |
| if not isinstance(scheme['max_annotations'], int) or scheme['max_annotations'] < 1: | |
| raise ConfigValidationError(f"{path}.max_annotations must be a positive integer or null") | |
| elif annotation_type == 'audio_annotation': | |
| # Validate mode | |
| valid_modes = ['label', 'questions', 'both'] | |
| mode = scheme.get('mode', 'label') | |
| if mode not in valid_modes: | |
| raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") | |
| # Validate labels for label/both modes | |
| if mode in ['label', 'both']: | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'labels' field for audio_annotation mode '{mode}'") | |
| if not isinstance(scheme['labels'], list): | |
| raise ConfigValidationError(f"{path}.labels must be a list") | |
| if not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels cannot be empty for mode '{mode}'") | |
| # Validate segment_schemes for questions/both modes | |
| if mode in ['questions', 'both']: | |
| if 'segment_schemes' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'segment_schemes' field for audio_annotation mode '{mode}'") | |
| if not isinstance(scheme['segment_schemes'], list): | |
| raise ConfigValidationError(f"{path}.segment_schemes must be a list") | |
| if not scheme['segment_schemes']: | |
| raise ConfigValidationError(f"{path}.segment_schemes cannot be empty for mode '{mode}'") | |
| # Validate optional numeric fields | |
| if 'min_segments' in scheme: | |
| if not isinstance(scheme['min_segments'], int) or scheme['min_segments'] < 0: | |
| raise ConfigValidationError(f"{path}.min_segments must be a non-negative integer") | |
| if 'max_segments' in scheme and scheme['max_segments'] is not None: | |
| if not isinstance(scheme['max_segments'], int) or scheme['max_segments'] < 1: | |
| raise ConfigValidationError(f"{path}.max_segments must be a positive integer or null") | |
| elif annotation_type == 'video_annotation': | |
| # Validate mode | |
| valid_modes = ['segment', 'frame', 'keyframe', 'tracking', 'combined'] | |
| mode = scheme.get('mode', 'segment') | |
| if mode not in valid_modes: | |
| raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") | |
| # Validate labels for segment/frame/keyframe/combined modes | |
| if mode in ['segment', 'frame', 'keyframe', 'combined']: | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'labels' field for video_annotation mode '{mode}'") | |
| if not isinstance(scheme['labels'], list): | |
| raise ConfigValidationError(f"{path}.labels must be a list") | |
| if not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels cannot be empty for mode '{mode}'") | |
| # Validate optional numeric fields | |
| if 'min_segments' in scheme: | |
| if not isinstance(scheme['min_segments'], int) or scheme['min_segments'] < 0: | |
| raise ConfigValidationError(f"{path}.min_segments must be a non-negative integer") | |
| if 'max_segments' in scheme and scheme['max_segments'] is not None: | |
| if not isinstance(scheme['max_segments'], int) or scheme['max_segments'] < 1: | |
| raise ConfigValidationError(f"{path}.max_segments must be a positive integer or null") | |
| if 'timeline_height' in scheme: | |
| if not isinstance(scheme['timeline_height'], int) or scheme['timeline_height'] < 30: | |
| raise ConfigValidationError(f"{path}.timeline_height must be an integer >= 30") | |
| if 'video_fps' in scheme: | |
| if not isinstance(scheme['video_fps'], (int, float)) or scheme['video_fps'] <= 0: | |
| raise ConfigValidationError(f"{path}.video_fps must be a positive number") | |
| elif annotation_type == 'tiered_annotation': | |
| # Validate required fields | |
| if 'tiers' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'tiers' field for tiered_annotation") | |
| if not isinstance(scheme['tiers'], list): | |
| raise ConfigValidationError(f"{path}.tiers must be a list") | |
| if not scheme['tiers']: | |
| raise ConfigValidationError(f"{path}.tiers cannot be empty") | |
| if 'source_field' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'source_field' field for tiered_annotation") | |
| # Validate media_type | |
| media_type = scheme.get('media_type', 'audio') | |
| if media_type not in ['audio', 'video']: | |
| raise ConfigValidationError(f"{path}.media_type must be 'audio' or 'video'") | |
| # Validate tiers | |
| tier_names = set() | |
| valid_tier_types = ['independent', 'dependent'] | |
| valid_constraint_types = ['time_subdivision', 'included_in', 'symbolic_association', 'symbolic_subdivision', 'none'] | |
| for i, tier in enumerate(scheme['tiers']): | |
| tier_path = f"{path}.tiers[{i}]" | |
| if not isinstance(tier, dict): | |
| raise ConfigValidationError(f"{tier_path} must be a dictionary") | |
| if 'name' not in tier: | |
| raise ConfigValidationError(f"{tier_path} missing 'name' field") | |
| tier_name = tier['name'] | |
| if tier_name in tier_names: | |
| raise ConfigValidationError(f"{tier_path} duplicate tier name: '{tier_name}'") | |
| tier_names.add(tier_name) | |
| # Validate tier_type | |
| tier_type = tier.get('tier_type', 'independent') | |
| if tier_type not in valid_tier_types: | |
| raise ConfigValidationError(f"{tier_path}.tier_type must be one of: {valid_tier_types}") | |
| # Validate dependent tier requirements | |
| if tier_type == 'dependent': | |
| if 'parent_tier' not in tier: | |
| raise ConfigValidationError(f"{tier_path} dependent tier must have 'parent_tier'") | |
| # Validate constraint_type | |
| constraint_type = tier.get('constraint_type', 'none') | |
| if constraint_type not in valid_constraint_types: | |
| raise ConfigValidationError(f"{tier_path}.constraint_type must be one of: {valid_constraint_types}") | |
| # Validate parent_tier references (second pass) | |
| for i, tier in enumerate(scheme['tiers']): | |
| parent = tier.get('parent_tier') | |
| if parent and parent not in tier_names: | |
| raise ConfigValidationError(f"{path}.tiers[{i}] references unknown parent_tier: '{parent}'") | |
| if parent and parent == tier['name']: | |
| raise ConfigValidationError(f"{path}.tiers[{i}] cannot be its own parent") | |
| # Validate optional numeric fields | |
| if 'tier_height' in scheme: | |
| if not isinstance(scheme['tier_height'], int) or scheme['tier_height'] < 20: | |
| raise ConfigValidationError(f"{path}.tier_height must be an integer >= 20") | |
| elif annotation_type == 'pairwise': | |
| # Validate mode | |
| valid_modes = ['binary', 'scale', 'multi_dimension'] | |
| mode = scheme.get('mode', 'binary') | |
| if mode not in valid_modes: | |
| raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") | |
| # Validate labels if provided | |
| if 'labels' in scheme: | |
| if not isinstance(scheme['labels'], list): | |
| raise ConfigValidationError(f"{path}.labels must be a list") | |
| if len(scheme['labels']) < 2: | |
| raise ConfigValidationError(f"{path}.labels must have at least 2 items (for A and B)") | |
| # Validate scale configuration for scale mode | |
| if mode == 'scale': | |
| scale = scheme.get('scale', {}) | |
| if not isinstance(scale, dict): | |
| raise ConfigValidationError(f"{path}.scale must be a dictionary") | |
| # Validate min/max values | |
| min_val = scale.get('min', -3) | |
| max_val = scale.get('max', 3) | |
| if not isinstance(min_val, (int, float)) or not isinstance(max_val, (int, float)): | |
| raise ConfigValidationError(f"{path}.scale.min and scale.max must be numbers") | |
| if min_val >= max_val: | |
| raise ConfigValidationError(f"{path}.scale.min must be less than scale.max") | |
| # Validate step | |
| step = scale.get('step', 1) | |
| if not isinstance(step, (int, float)) or step <= 0: | |
| raise ConfigValidationError(f"{path}.scale.step must be a positive number") | |
| # Validate scale labels if provided | |
| if 'labels' in scale: | |
| scale_labels = scale['labels'] | |
| if not isinstance(scale_labels, dict): | |
| raise ConfigValidationError(f"{path}.scale.labels must be a dictionary") | |
| # Validate multi_dimension mode | |
| if mode == 'multi_dimension': | |
| dimensions = scheme.get('dimensions', []) | |
| if not isinstance(dimensions, list) or not dimensions: | |
| raise ConfigValidationError(f"{path}.dimensions must be a non-empty list for multi_dimension mode") | |
| for i, dim in enumerate(dimensions): | |
| if not isinstance(dim, dict): | |
| raise ConfigValidationError(f"{path}.dimensions[{i}] must be a dictionary") | |
| if 'name' not in dim: | |
| raise ConfigValidationError(f"{path}.dimensions[{i}] must have a 'name' field") | |
| elif annotation_type == 'bws': | |
| # Validate tuple_size | |
| if 'tuple_size' in scheme: | |
| if not isinstance(scheme['tuple_size'], int) or scheme['tuple_size'] < 2: | |
| raise ConfigValidationError(f"{path}.tuple_size must be an integer >= 2") | |
| elif annotation_type == 'soft_label': | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'labels' field for soft_label annotation type") | |
| if not isinstance(scheme['labels'], list) or not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels must be a non-empty list") | |
| if 'total' in scheme: | |
| if not isinstance(scheme['total'], int) or scheme['total'] < 1: | |
| raise ConfigValidationError(f"{path}.total must be a positive integer") | |
| elif annotation_type == 'confidence': | |
| if 'scale_type' in scheme: | |
| if scheme['scale_type'] not in ['likert', 'slider']: | |
| raise ConfigValidationError(f"{path}.scale_type must be 'likert' or 'slider'") | |
| if 'scale_points' in scheme: | |
| if not isinstance(scheme['scale_points'], int) or scheme['scale_points'] < 2: | |
| raise ConfigValidationError(f"{path}.scale_points must be an integer >= 2") | |
| elif annotation_type == 'constant_sum': | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'labels' field for constant_sum annotation type") | |
| if not isinstance(scheme['labels'], list) or not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels must be a non-empty list") | |
| if 'total_points' in scheme: | |
| if not isinstance(scheme['total_points'], int) or scheme['total_points'] < 1: | |
| raise ConfigValidationError(f"{path}.total_points must be a positive integer") | |
| elif annotation_type == 'semantic_differential': | |
| if 'pairs' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'pairs' field for semantic_differential annotation type") | |
| if not isinstance(scheme['pairs'], list) or not scheme['pairs']: | |
| raise ConfigValidationError(f"{path}.pairs must be a non-empty list") | |
| for i, pair in enumerate(scheme['pairs']): | |
| if not isinstance(pair, list) or len(pair) != 2: | |
| raise ConfigValidationError(f"{path}.pairs[{i}] must be a list of exactly two strings") | |
| elif annotation_type == 'ranking': | |
| if 'labels' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'labels' field for ranking annotation type") | |
| if not isinstance(scheme['labels'], list) or not scheme['labels']: | |
| raise ConfigValidationError(f"{path}.labels must be a non-empty list") | |
| elif annotation_type == 'range_slider': | |
| if 'min_value' in scheme and 'max_value' in scheme: | |
| if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): | |
| raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") | |
| if scheme['min_value'] >= scheme['max_value']: | |
| raise ConfigValidationError(f"{path}.min_value must be less than max_value") | |
| elif annotation_type == 'hierarchical_multiselect': | |
| if 'taxonomy' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'taxonomy' field for hierarchical_multiselect annotation type") | |
| if not isinstance(scheme['taxonomy'], dict) or not scheme['taxonomy']: | |
| raise ConfigValidationError(f"{path}.taxonomy must be a non-empty dictionary") | |
| elif annotation_type == 'vas': | |
| if 'min_value' in scheme and 'max_value' in scheme: | |
| if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): | |
| raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") | |
| if scheme['min_value'] >= scheme['max_value']: | |
| raise ConfigValidationError(f"{path}.min_value must be less than max_value") | |
| elif annotation_type == 'rubric_eval': | |
| if 'criteria' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'criteria' field for rubric_eval annotation type") | |
| if not isinstance(scheme['criteria'], list) or not scheme['criteria']: | |
| raise ConfigValidationError(f"{path}.criteria must be a non-empty list") | |
| for i, crit in enumerate(scheme['criteria']): | |
| if not isinstance(crit, dict) or 'name' not in crit: | |
| raise ConfigValidationError(f"{path}.criteria[{i}] must be a dict with 'name'") | |
| if 'scale_points' in scheme: | |
| if not isinstance(scheme['scale_points'], int) or scheme['scale_points'] < 2: | |
| raise ConfigValidationError(f"{path}.scale_points must be an integer >= 2") | |
| elif annotation_type == 'error_span': | |
| if 'error_types' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'error_types' field for error_span annotation type") | |
| if not isinstance(scheme['error_types'], list) or not scheme['error_types']: | |
| raise ConfigValidationError(f"{path}.error_types must be a non-empty list") | |
| for i, et in enumerate(scheme['error_types']): | |
| if not isinstance(et, dict) or 'name' not in et: | |
| raise ConfigValidationError(f"{path}.error_types[{i}] must be a dict with 'name'") | |
| elif annotation_type == 'card_sort': | |
| mode = scheme.get('mode', 'closed') | |
| if mode not in ['open', 'closed']: | |
| raise ConfigValidationError(f"{path}.mode must be 'open' or 'closed'") | |
| if mode == 'closed': | |
| if 'groups' not in scheme: | |
| raise ConfigValidationError(f"{path} missing 'groups' field for card_sort in closed mode") | |
| if not isinstance(scheme['groups'], list) or not scheme['groups']: | |
| raise ConfigValidationError(f"{path}.groups must be a non-empty list for closed mode") | |
| elif annotation_type == 'conjoint': | |
| if 'attributes' not in scheme and 'profiles_field' not in scheme: | |
| raise ConfigValidationError(f"{path} requires 'attributes' or 'profiles_field' for conjoint annotation type") | |
| if 'attributes' in scheme: | |
| if not isinstance(scheme['attributes'], list) or not scheme['attributes']: | |
| raise ConfigValidationError(f"{path}.attributes must be a non-empty list") | |
| for i, attr in enumerate(scheme['attributes']): | |
| if not isinstance(attr, dict) or 'name' not in attr: | |
| raise ConfigValidationError(f"{path}.attributes[{i}] must be a dict with 'name'") | |
| if 'profiles_per_set' in scheme: | |
| if not isinstance(scheme['profiles_per_set'], int) or scheme['profiles_per_set'] < 2: | |
| raise ConfigValidationError(f"{path}.profiles_per_set must be an integer >= 2") | |
| # Validate display_logic if present | |
| if 'display_logic' in scheme: | |
| validate_display_logic_structure(scheme['display_logic'], path) | |
def validate_display_logic_structure(display_logic: Dict[str, Any], path: str) -> None:
    """
    Validate the structure of a display_logic configuration block.

    This validates the syntax and structure of a single display_logic block.
    Cross-schema validation (checking referenced schemas exist) is done
    separately in validate_display_logic_references().

    Checks performed:
        - display_logic is a dict with a non-empty 'show_when' list
        - every condition is a dict with 'schema' and 'operator'
        - the operator is a member of SUPPORTED_OPERATORS
        - the 'value' matches what the operator needs (nothing, a
          [min, max] range, a number, a compilable regex, or any non-None value)
        - the optional 'logic' field is 'all' or 'any' (defaults to 'all')

    Args:
        display_logic: The display_logic configuration
        path: Path in the config for error reporting

    Raises:
        ConfigValidationError: If the display_logic is invalid
    """
    # Resolved at call time rather than at module import.
    from potato.server_utils.display_logic import SUPPORTED_OPERATORS

    if not isinstance(display_logic, dict):
        raise ConfigValidationError(f"{path}.display_logic must be a dictionary")

    # Must have show_when
    if 'show_when' not in display_logic:
        raise ConfigValidationError(f"{path}.display_logic must have 'show_when' field")

    show_when = display_logic['show_when']
    if not isinstance(show_when, list):
        raise ConfigValidationError(f"{path}.display_logic.show_when must be a list of conditions")
    if not show_when:
        raise ConfigValidationError(f"{path}.display_logic.show_when must have at least one condition")

    # Validate each condition
    for i, condition in enumerate(show_when):
        cond_path = f"{path}.display_logic.show_when[{i}]"
        if not isinstance(condition, dict):
            raise ConfigValidationError(f"{cond_path} must be a dictionary")

        # Required fields
        if 'schema' not in condition:
            raise ConfigValidationError(f"{cond_path} missing required 'schema' field")
        if 'operator' not in condition:
            raise ConfigValidationError(f"{cond_path} missing required 'operator' field")

        operator = condition['operator']
        if operator not in SUPPORTED_OPERATORS:
            raise ConfigValidationError(
                f"{cond_path}.operator '{operator}' is not supported. "
                f"Valid operators: {list(SUPPORTED_OPERATORS.keys())}"
            )

        # Validate operator-specific value requirements
        value = condition.get('value')

        if operator in ('empty', 'not_empty'):
            # These operators take no value.
            continue

        if operator in ('in_range', 'not_in_range', 'length_in_range'):
            # Range operators need [min, max]
            if not isinstance(value, (list, tuple)):
                raise ConfigValidationError(
                    f"{cond_path}: operator '{operator}' requires a range value as [min, max]"
                )
            if len(value) != 2:
                raise ConfigValidationError(
                    f"{cond_path}: range value must have exactly 2 elements [min, max]"
                )
            # Only the conversion can raise ValueError/TypeError, so only it
            # lives inside the try body.
            try:
                min_val, max_val = float(value[0]), float(value[1])
            except (ValueError, TypeError) as e:
                raise ConfigValidationError(f"{cond_path}: range values must be numeric") from e
            if min_val > max_val:
                raise ConfigValidationError(
                    f"{cond_path}: range min ({min_val}) is greater than max ({max_val})"
                )

        elif operator in ('gt', 'gte', 'lt', 'lte', 'length_gt', 'length_lt'):
            # Numeric operators need numeric values
            if value is None:
                raise ConfigValidationError(f"{cond_path}: operator '{operator}' requires a value")
            try:
                float(value)
            except (ValueError, TypeError) as e:
                raise ConfigValidationError(
                    f"{cond_path}: operator '{operator}' requires a numeric value"
                ) from e

        elif operator == 'matches':
            # Regex operator needs a valid pattern
            if value is None:
                raise ConfigValidationError(f"{cond_path}: operator 'matches' requires a regex pattern")
            try:
                re.compile(value)
            except (re.error, TypeError) as e:
                # TypeError covers non-string patterns (e.g. a number or list
                # in the YAML), which re.compile rejects before parsing.
                raise ConfigValidationError(
                    f"{cond_path}: invalid regex pattern '{value}': {e}"
                ) from e

        elif value is None:
            # Other operators just need a non-None value
            raise ConfigValidationError(f"{cond_path}: operator '{operator}' requires a value")

    # Validate logic field if present
    logic = display_logic.get('logic', 'all')
    if logic not in ('all', 'any'):
        raise ConfigValidationError(
            f"{path}.display_logic.logic must be 'all' or 'any', got '{logic}'"
        )
def validate_display_logic_references(annotation_schemes: List[Dict[str, Any]]) -> None:
    """
    Run cross-scheme display_logic validation over all annotation schemes.

    Delegates to validate_display_logic_config() from
    potato.server_utils.display_logic, which returns a validity flag and a
    collection of error descriptions. That helper is expected to check that
    display_logic references point to existing schemas and that there are no
    circular dependencies; its implementation lives outside this module.

    This is meant to be called after every annotation scheme has been
    validated individually.

    Args:
        annotation_schemes: List of annotation scheme configurations

    Raises:
        ConfigValidationError: If the helper reports the configuration as
            invalid. The message lists each reported error on its own line.
    """
    from potato.server_utils.display_logic import validate_display_logic_config

    ok, problems = validate_display_logic_config(annotation_schemes)
    if ok:
        return

    # One bullet per reported problem, under a single header line.
    bullet_lines = [f" - {problem}" for problem in problems]
    raise ConfigValidationError(
        "Display logic validation errors:\n" + "\n".join(bullet_lines)
    )
def validate_server_config(config_data: Dict[str, Any]) -> None:
    """
    Validate server configuration section.

    The server section allows specifying server settings in the YAML config
    instead of via command-line flags. This function only checks values that
    are present; it does not apply defaults or modify config_data.

    Supported options:
        - port: Port number to run on, an integer in 1-65535 (booleans are
          rejected even though bool is an int subclass)
        - host: Host address to bind to, a non-blank string
        - debug: Flask debug mode flag, a boolean

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the server configuration is invalid
    """
    if "server" not in config_data:
        return  # server section is optional

    server_config = config_data["server"]
    if not isinstance(server_config, dict):
        raise ConfigValidationError("server configuration must be a dictionary")

    # Validate port
    if "port" in server_config:
        port = server_config["port"]
        # bool passes isinstance(..., int); without the explicit check a YAML
        # `port: true` would be accepted and treated as port 1.
        if isinstance(port, bool) or not isinstance(port, int):
            raise ConfigValidationError("server.port must be an integer")
        if not 1 <= port <= 65535:
            raise ConfigValidationError("server.port must be between 1 and 65535")

    # Validate host
    if "host" in server_config:
        host = server_config["host"]
        if not isinstance(host, str):
            raise ConfigValidationError("server.host must be a string")
        if not host.strip():
            raise ConfigValidationError("server.host cannot be empty")

    # Validate debug
    if "debug" in server_config and not isinstance(server_config["debug"], bool):
        raise ConfigValidationError("server.debug must be a boolean")
def validate_authentication_config(config_data: Dict[str, Any]) -> None:
    """
    Validate authentication configuration section.

    Validates the authentication method, OAuth/OIDC provider settings and
    their required fields, and the database backend settings. Logs a warning
    when OAuth is configured without a stable secret key.

    Checks performed:
        - 'authentication' (optional) must be a dict
        - 'method' (default 'in_memory') must be one of in_memory, database,
          clerk, oauth
        - for 'oauth': 'providers' must be a non-empty dict; each provider
          needs client_id and client_secret; providers other than 'google'
          and 'github' need discovery_url; allowed_domain / allowed_org must
          be non-empty strings and scopes a list when given;
          'user_identity_field' (default 'email') must be one of email,
          username, sub, name
        - for 'database': 'database_url', when set, must be a string starting
          with 'sqlite:///' or 'postgresql://'; 'user_config_path' must not
          be set

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the authentication configuration is invalid
    """
    if "authentication" not in config_data:
        return  # authentication section is optional

    auth_config = config_data["authentication"]
    if not isinstance(auth_config, dict):
        raise ConfigValidationError("authentication configuration must be a dictionary")

    method = auth_config.get("method", "in_memory")
    valid_methods = ["in_memory", "database", "clerk", "oauth"]
    if method not in valid_methods:
        raise ConfigValidationError(
            f"authentication.method must be one of: {', '.join(valid_methods)}. "
            f"Got: '{method}'"
        )

    # OAuth-specific validation
    if method == "oauth":
        # providers is required. An empty dict is falsy, so this one check
        # also covers the "no providers listed" case.
        providers = auth_config.get("providers")
        if not providers or not isinstance(providers, dict):
            raise ConfigValidationError(
                "authentication.providers is required when method is 'oauth' "
                "and must be a dictionary with at least one provider"
            )

        # Validate each provider
        for name, pconfig in providers.items():
            if not isinstance(pconfig, dict):
                raise ConfigValidationError(
                    f"authentication.providers.{name} must be a dictionary"
                )

            # client_id and client_secret are required
            if "client_id" not in pconfig:
                raise ConfigValidationError(
                    f"authentication.providers.{name}.client_id is required"
                )
            if "client_secret" not in pconfig:
                raise ConfigValidationError(
                    f"authentication.providers.{name}.client_secret is required"
                )

            # Generic OIDC requires discovery_url
            if name not in ("google", "github") and "discovery_url" not in pconfig:
                raise ConfigValidationError(
                    f"authentication.providers.{name} requires 'discovery_url' "
                    f"for OIDC providers (only 'google' and 'github' have built-in URLs)"
                )

            # Validate optional fields
            if "allowed_domain" in pconfig:
                domain = pconfig["allowed_domain"]
                if not isinstance(domain, str) or not domain.strip():
                    raise ConfigValidationError(
                        f"authentication.providers.{name}.allowed_domain must be a non-empty string"
                    )
            if "allowed_org" in pconfig:
                org = pconfig["allowed_org"]
                if not isinstance(org, str) or not org.strip():
                    raise ConfigValidationError(
                        f"authentication.providers.{name}.allowed_org must be a non-empty string"
                    )
            if "scopes" in pconfig and not isinstance(pconfig["scopes"], list):
                raise ConfigValidationError(
                    f"authentication.providers.{name}.scopes must be a list"
                )

        # Validate user_identity_field
        identity_field = auth_config.get("user_identity_field", "email")
        valid_fields = ["email", "username", "sub", "name"]
        if identity_field not in valid_fields:
            raise ConfigValidationError(
                f"authentication.user_identity_field must be one of: "
                f"{', '.join(valid_fields)}. Got: '{identity_field}'"
            )

        # Warn if secret_key is not set (OAuth needs stable sessions)
        if "secret_key" not in config_data and not os.environ.get("POTATO_SECRET_KEY"):
            logger.warning(
                "OAuth is configured but no 'secret_key' is set in config "
                "and POTATO_SECRET_KEY environment variable is not set. "
                "Sessions will be lost on server restart. "
                "Set 'secret_key' in config or POTATO_SECRET_KEY env var."
            )

    # Database-specific validation
    if method == "database":
        db_url = auth_config.get("database_url")
        # A non-string URL (e.g. a bare number in the YAML) is reported as a
        # validation error rather than crashing on .startswith().
        if db_url and (
            not isinstance(db_url, str)
            or not db_url.startswith(("sqlite:///", "postgresql://"))
        ):
            raise ConfigValidationError(
                "authentication.database_url must start with 'sqlite:///' or 'postgresql://'. "
                f"Got: '{db_url}'"
            )

        # Mutual exclusivity: database backend and user_config_path
        if "user_config_path" in auth_config:
            raise ConfigValidationError(
                "authentication.user_config_path cannot be used with method 'database'. "
                "The database backend handles its own user persistence."
            )
| def validate_quality_control_config(config_data: Dict[str, Any]) -> None: | |
| """ | |
| Validate quality control configuration (attention checks, gold standards, pre-annotation). | |
| Args: | |
| config_data: The configuration data | |
| Raises: | |
| ConfigValidationError: If the configuration is invalid | |
| """ | |
| # Validate attention checks config | |
| if "attention_checks" in config_data: | |
| attn_config = config_data["attention_checks"] | |
| if not isinstance(attn_config, dict): | |
| raise ConfigValidationError("attention_checks must be a dictionary") | |
| if attn_config.get("enabled", False): | |
| # Validate items_file is specified | |
| if "items_file" not in attn_config: | |
| raise ConfigValidationError("attention_checks.items_file is required when enabled") | |
| if not isinstance(attn_config["items_file"], str): | |
| raise ConfigValidationError("attention_checks.items_file must be a string path") | |
| # Validate frequency or probability (one should be set) | |
| has_frequency = "frequency" in attn_config | |
| has_probability = "probability" in attn_config | |
| if has_frequency and has_probability: | |
| raise ConfigValidationError("attention_checks: specify either 'frequency' or 'probability', not both") | |
| if has_frequency: | |
| freq = attn_config["frequency"] | |
| if not isinstance(freq, int) or freq < 1: | |
| raise ConfigValidationError("attention_checks.frequency must be a positive integer") | |
| if has_probability: | |
| prob = attn_config["probability"] | |
| if not isinstance(prob, (int, float)) or prob < 0 or prob > 1: | |
| raise ConfigValidationError("attention_checks.probability must be a number between 0 and 1") | |
| # Validate min_response_time | |
| if "min_response_time" in attn_config: | |
| min_time = attn_config["min_response_time"] | |
| if not isinstance(min_time, (int, float)) or min_time < 0: | |
| raise ConfigValidationError("attention_checks.min_response_time must be a non-negative number") | |
| # Validate failure_handling | |
| if "failure_handling" in attn_config: | |
| failure_config = attn_config["failure_handling"] | |
| if not isinstance(failure_config, dict): | |
| raise ConfigValidationError("attention_checks.failure_handling must be a dictionary") | |
| if "warn_threshold" in failure_config: | |
| warn = failure_config["warn_threshold"] | |
| if not isinstance(warn, int) or warn < 1: | |
| raise ConfigValidationError("attention_checks.failure_handling.warn_threshold must be a positive integer") | |
| if "block_threshold" in failure_config: | |
| block = failure_config["block_threshold"] | |
| if not isinstance(block, int) or block < 1: | |
| raise ConfigValidationError("attention_checks.failure_handling.block_threshold must be a positive integer") | |
| # Ensure block > warn | |
| warn = failure_config.get("warn_threshold", 2) | |
| if block <= warn: | |
| raise ConfigValidationError("attention_checks.failure_handling.block_threshold must be greater than warn_threshold") | |
| # Validate gold standards config | |
| if "gold_standards" in config_data: | |
| gold_config = config_data["gold_standards"] | |
| if not isinstance(gold_config, dict): | |
| raise ConfigValidationError("gold_standards must be a dictionary") | |
| if gold_config.get("enabled", False): | |
| # Validate items_file is specified | |
| if "items_file" not in gold_config: | |
| raise ConfigValidationError("gold_standards.items_file is required when enabled") | |
| if not isinstance(gold_config["items_file"], str): | |
| raise ConfigValidationError("gold_standards.items_file must be a string path") | |
| # Validate mode | |
| if "mode" in gold_config: | |
| valid_modes = ["training", "mixed", "separate"] | |
| if gold_config["mode"] not in valid_modes: | |
| raise ConfigValidationError(f"gold_standards.mode must be one of: {', '.join(valid_modes)}") | |
| # Validate frequency | |
| if "frequency" in gold_config: | |
| freq = gold_config["frequency"] | |
| if not isinstance(freq, int) or freq < 1: | |
| raise ConfigValidationError("gold_standards.frequency must be a positive integer") | |
| # Validate accuracy config | |
| if "accuracy" in gold_config: | |
| accuracy_config = gold_config["accuracy"] | |
| if not isinstance(accuracy_config, dict): | |
| raise ConfigValidationError("gold_standards.accuracy must be a dictionary") | |
| if "min_threshold" in accuracy_config: | |
| threshold = accuracy_config["min_threshold"] | |
| if not isinstance(threshold, (int, float)) or threshold < 0 or threshold > 1: | |
| raise ConfigValidationError("gold_standards.accuracy.min_threshold must be between 0 and 1") | |
| if "evaluation_count" in accuracy_config: | |
| count = accuracy_config["evaluation_count"] | |
| if not isinstance(count, int) or count < 1: | |
| raise ConfigValidationError("gold_standards.accuracy.evaluation_count must be a positive integer") | |
| # Validate auto_promote config | |
| if "auto_promote" in gold_config: | |
| auto_promote = gold_config["auto_promote"] | |
| if not isinstance(auto_promote, dict): | |
| raise ConfigValidationError("gold_standards.auto_promote must be a dictionary") | |
| if "min_annotators" in auto_promote: | |
| min_ann = auto_promote["min_annotators"] | |
| if not isinstance(min_ann, int) or min_ann < 2: | |
| raise ConfigValidationError("gold_standards.auto_promote.min_annotators must be an integer >= 2") | |
| if "agreement_threshold" in auto_promote: | |
| threshold = auto_promote["agreement_threshold"] | |
| if not isinstance(threshold, (int, float)) or threshold < 0.5 or threshold > 1.0: | |
| raise ConfigValidationError("gold_standards.auto_promote.agreement_threshold must be between 0.5 and 1.0") | |
| # Validate pre-annotation config | |
| if "pre_annotation" in config_data: | |
| pre_config = config_data["pre_annotation"] | |
| if not isinstance(pre_config, dict): | |
| raise ConfigValidationError("pre_annotation must be a dictionary") | |
| if pre_config.get("enabled", False): | |
| # Validate field name | |
| if "field" in pre_config: | |
| if not isinstance(pre_config["field"], str) or not pre_config["field"].strip(): | |
| raise ConfigValidationError("pre_annotation.field must be a non-empty string") | |
| # Validate highlight_low_confidence threshold | |
| if "highlight_low_confidence" in pre_config: | |
| threshold = pre_config["highlight_low_confidence"] | |
| if not isinstance(threshold, (int, float)) or threshold < 0 or threshold > 1: | |
| raise ConfigValidationError("pre_annotation.highlight_low_confidence must be between 0 and 1") | |
| # Validate agreement metrics config | |
| if "agreement_metrics" in config_data: | |
| agreement_config = config_data["agreement_metrics"] | |
| if not isinstance(agreement_config, dict): | |
| raise ConfigValidationError("agreement_metrics must be a dictionary") | |
| if "min_overlap" in agreement_config: | |
| overlap = agreement_config["min_overlap"] | |
| if not isinstance(overlap, int) or overlap < 2: | |
| raise ConfigValidationError("agreement_metrics.min_overlap must be an integer >= 2") | |
| if "refresh_interval" in agreement_config: | |
| interval = agreement_config["refresh_interval"] | |
| if not isinstance(interval, int) or interval < 10: | |
| raise ConfigValidationError("agreement_metrics.refresh_interval must be an integer >= 10 seconds") | |
def validate_instance_reclaim_config(config_data: Dict[str, Any]) -> None:
    """
    Validate abandoned assignment reclaim configuration.

    Checks the optional ``instance_reclaim`` section:
    - ``enabled`` must be a boolean
    - ``timeout_hours`` must be a positive number
    - every ``preserve_completed_annotations`` flag (top level, per
      sub-section, and per ``prolific.status_policies`` entry) must be a
      boolean
    - the ``stale``, ``manual``, ``quality_control`` and ``prolific``
      sub-sections must be dictionaries when present
    - ``prolific.status_policies`` must be a dictionary keyed by RETURNED,
      TIMED-OUT or REJECTED, each mapping to a dictionary

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the instance_reclaim configuration is invalid
    """
    if "instance_reclaim" not in config_data:
        return  # instance_reclaim is optional

    reclaim_config = config_data["instance_reclaim"]
    if not isinstance(reclaim_config, dict):
        raise ConfigValidationError("instance_reclaim must be a dictionary")

    def validate_bool(section: Dict[str, Any], path: str) -> None:
        """Require ``preserve_completed_annotations``, when present, to be a bool."""
        if "preserve_completed_annotations" in section and not isinstance(
            section["preserve_completed_annotations"], bool
        ):
            raise ConfigValidationError(f"{path}.preserve_completed_annotations must be a boolean")

    def validate_section(section_name: str) -> None:
        """Require an optional sub-section to be a dict and check its flag."""
        if section_name not in reclaim_config:
            return
        section = reclaim_config[section_name]
        if not isinstance(section, dict):
            raise ConfigValidationError(f"instance_reclaim.{section_name} must be a dictionary")
        validate_bool(section, f"instance_reclaim.{section_name}")

    if "enabled" in reclaim_config and not isinstance(reclaim_config["enabled"], bool):
        raise ConfigValidationError("instance_reclaim.enabled must be a boolean")

    if "timeout_hours" in reclaim_config:
        timeout = reclaim_config["timeout_hours"]
        # bool is a subclass of int, so it has to be excluded explicitly;
        # "not timeout > 0" (instead of "timeout <= 0") also rejects NaN.
        is_number = isinstance(timeout, (int, float)) and not isinstance(timeout, bool)
        if not is_number or not timeout > 0:
            raise ConfigValidationError("instance_reclaim.timeout_hours must be a positive number")

    validate_bool(reclaim_config, "instance_reclaim")
    for section_name in ("stale", "manual", "quality_control", "prolific"):
        validate_section(section_name)

    prolific = reclaim_config.get("prolific")
    if isinstance(prolific, dict) and "status_policies" in prolific:
        status_policies = prolific["status_policies"]
        if not isinstance(status_policies, dict):
            raise ConfigValidationError("instance_reclaim.prolific.status_policies must be a dictionary")
        valid_statuses = {"RETURNED", "TIMED-OUT", "REJECTED"}
        for status, section in status_policies.items():
            if status not in valid_statuses:
                raise ConfigValidationError(
                    "instance_reclaim.prolific.status_policies keys must be one of: RETURNED, TIMED-OUT, REJECTED"
                )
            if not isinstance(section, dict):
                raise ConfigValidationError(
                    f"instance_reclaim.prolific.status_policies.{status} must be a dictionary"
                )
            validate_bool(section, f"instance_reclaim.prolific.status_policies.{status}")
def validate_data_directory_config(config_data: Dict[str, Any]) -> None:
    """
    Validate data_directory configuration.

    This function validates the directory watching configuration options:
    - data_directory: Path to the directory containing data files
    - watch_data_directory: Whether to watch for changes (default: False)
    - watch_poll_interval: Seconds between scans (default: 5.0)

    The watch_* options are only checked when data_directory is present;
    without it the function returns immediately.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the configuration is invalid
    """
    if "data_directory" not in config_data:
        return  # data_directory is optional

    data_directory = config_data["data_directory"]
    if not isinstance(data_directory, str):
        raise ConfigValidationError("data_directory must be a string path")
    if not data_directory.strip():
        raise ConfigValidationError("data_directory cannot be empty")

    # Validate watch_data_directory if present
    if "watch_data_directory" in config_data:
        if not isinstance(config_data["watch_data_directory"], bool):
            raise ConfigValidationError("watch_data_directory must be a boolean (true/false)")

    # Validate watch_poll_interval if present
    if "watch_poll_interval" in config_data:
        interval = config_data["watch_poll_interval"]
        # bool is a subclass of int, so `true` would otherwise pass as 1 second.
        is_number = isinstance(interval, (int, float)) and not isinstance(interval, bool)
        # NaN compares False against both bounds below, so reject it here
        # (NaN is the only value that is not equal to itself).
        if not is_number or interval != interval:
            raise ConfigValidationError("watch_poll_interval must be a number")
        if interval < 1.0:
            raise ConfigValidationError("watch_poll_interval must be at least 1.0 seconds")
        if interval > 3600:
            raise ConfigValidationError("watch_poll_interval cannot exceed 3600 seconds (1 hour)")
def validate_data_sources_config(config_data: Dict[str, Any]) -> None:
    """
    Validate data_sources configuration for extended data loading.

    Each entry of ``data_sources`` must be a dictionary with a supported
    ``type``; the remaining keys are checked per type by
    ``_validate_data_source_by_type``. When at least one source is
    configured, the ``partial_loading`` and ``data_cache`` sections are
    validated as well.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the configuration is invalid
    """
    sources = config_data.get("data_sources")
    if not sources:
        # Empty or missing is fine - data_files can be used instead
        return

    if not isinstance(sources, list):
        raise ConfigValidationError("data_sources must be a list")

    # Source types understood by the per-type validator
    supported_types = (
        "file", "url", "google_drive", "dropbox",
        "s3", "huggingface", "google_sheets", "database",
    )

    for position, entry in enumerate(sources):
        if not isinstance(entry, dict):
            raise ConfigValidationError(
                f"data_sources[{position}] must be a dictionary"
            )

        kind = entry.get("type")
        if not kind:
            raise ConfigValidationError(
                f"data_sources[{position}] is missing required 'type' field"
            )
        if kind not in supported_types:
            raise ConfigValidationError(
                f"data_sources[{position}] has invalid type '{kind}'. "
                f"Valid types: {', '.join(supported_types)}"
            )

        # Type-specific validation
        _validate_data_source_by_type(entry, kind, position)

    # Related sections are only checked once data_sources itself is valid
    _validate_partial_loading_config(config_data)
    _validate_data_cache_config(config_data)
| def _validate_data_source_by_type(source: Dict, source_type: str, index: int) -> None: | |
| """Validate source-specific configuration.""" | |
| prefix = f"data_sources[{index}]" | |
| if source_type == "file": | |
| if not source.get("path"): | |
| raise ConfigValidationError(f"{prefix} (type=file) requires 'path'") | |
| elif source_type == "url": | |
| url = source.get("url") | |
| if not url: | |
| raise ConfigValidationError(f"{prefix} (type=url) requires 'url'") | |
| if not isinstance(url, str): | |
| raise ConfigValidationError(f"{prefix}.url must be a string") | |
| # Basic URL format check | |
| if not (url.startswith("http://") or url.startswith("https://")): | |
| raise ConfigValidationError( | |
| f"{prefix}.url must start with http:// or https://" | |
| ) | |
| elif source_type == "google_drive": | |
| if not source.get("url") and not source.get("file_id"): | |
| raise ConfigValidationError( | |
| f"{prefix} (type=google_drive) requires 'url' or 'file_id'" | |
| ) | |
| elif source_type == "dropbox": | |
| if not source.get("url") and not source.get("path"): | |
| raise ConfigValidationError( | |
| f"{prefix} (type=dropbox) requires 'url' or 'path'" | |
| ) | |
| # If path is provided, access_token is required | |
| if source.get("path") and not source.get("access_token"): | |
| raise ConfigValidationError( | |
| f"{prefix} (type=dropbox) requires 'access_token' when using 'path'" | |
| ) | |
| elif source_type == "s3": | |
| if not source.get("bucket"): | |
| raise ConfigValidationError(f"{prefix} (type=s3) requires 'bucket'") | |
| if not source.get("key"): | |
| raise ConfigValidationError(f"{prefix} (type=s3) requires 'key'") | |
| elif source_type == "huggingface": | |
| if not source.get("dataset"): | |
| raise ConfigValidationError( | |
| f"{prefix} (type=huggingface) requires 'dataset'" | |
| ) | |
| elif source_type == "google_sheets": | |
| if not source.get("spreadsheet_id"): | |
| raise ConfigValidationError( | |
| f"{prefix} (type=google_sheets) requires 'spreadsheet_id'" | |
| ) | |
| if not source.get("credentials_file"): | |
| raise ConfigValidationError( | |
| f"{prefix} (type=google_sheets) requires 'credentials_file'" | |
| ) | |
| elif source_type == "database": | |
| # Must have connection_string OR dialect+database | |
| if not source.get("connection_string"): | |
| if not source.get("dialect"): | |
| raise ConfigValidationError( | |
| f"{prefix} (type=database) requires 'connection_string' or 'dialect'" | |
| ) | |
| if not source.get("database") and source.get("dialect") != "sqlite": | |
| raise ConfigValidationError( | |
| f"{prefix} (type=database) requires 'database' when not using sqlite" | |
| ) | |
| # Must have query OR table | |
| if not source.get("query") and not source.get("table"): | |
| raise ConfigValidationError( | |
| f"{prefix} (type=database) requires 'query' or 'table'" | |
| ) | |
| def _validate_partial_loading_config(config_data: Dict[str, Any]) -> None: | |
| """Validate partial_loading configuration.""" | |
| partial = config_data.get("partial_loading") | |
| if not partial: | |
| return | |
| if not isinstance(partial, dict): | |
| raise ConfigValidationError("partial_loading must be a dictionary") | |
| # Validate enabled | |
| if "enabled" in partial and not isinstance(partial["enabled"], bool): | |
| raise ConfigValidationError("partial_loading.enabled must be a boolean") | |
| # Validate initial_count | |
| if "initial_count" in partial: | |
| count = partial["initial_count"] | |
| if not isinstance(count, int) or count < 1: | |
| raise ConfigValidationError( | |
| "partial_loading.initial_count must be a positive integer" | |
| ) | |
| # Validate batch_size | |
| if "batch_size" in partial: | |
| size = partial["batch_size"] | |
| if not isinstance(size, int) or size < 1: | |
| raise ConfigValidationError( | |
| "partial_loading.batch_size must be a positive integer" | |
| ) | |
| # Validate auto_load_threshold | |
| if "auto_load_threshold" in partial: | |
| threshold = partial["auto_load_threshold"] | |
| if not isinstance(threshold, (int, float)) or not (0 <= threshold <= 1): | |
| raise ConfigValidationError( | |
| "partial_loading.auto_load_threshold must be between 0.0 and 1.0" | |
| ) | |
| def _validate_data_cache_config(config_data: Dict[str, Any]) -> None: | |
| """Validate data_cache configuration.""" | |
| cache = config_data.get("data_cache") | |
| if not cache: | |
| return | |
| if not isinstance(cache, dict): | |
| raise ConfigValidationError("data_cache must be a dictionary") | |
| # Validate ttl_seconds | |
| if "ttl_seconds" in cache: | |
| ttl = cache["ttl_seconds"] | |
| if not isinstance(ttl, int) or ttl < 0: | |
| raise ConfigValidationError( | |
| "data_cache.ttl_seconds must be a non-negative integer" | |
| ) | |
| # Validate max_size_mb | |
| if "max_size_mb" in cache: | |
| size = cache["max_size_mb"] | |
| if not isinstance(size, int) or size < 1: | |
| raise ConfigValidationError( | |
| "data_cache.max_size_mb must be a positive integer" | |
| ) | |
def validate_database_config(db_config: Dict[str, Any]) -> None:
    """
    Validate database configuration.

    Requires the ``type``, ``host``, ``database`` and ``username`` fields,
    restricts ``type`` to ``mysql`` or ``file``, requires ``password`` for
    MySQL, and checks that an optional ``port`` is an integer (or integer
    string) between 1 and 65535.

    Args:
        db_config: The database configuration

    Raises:
        ConfigValidationError: If the database configuration is invalid
    """
    if not isinstance(db_config, dict):
        raise ConfigValidationError("database configuration must be a dictionary")

    required_fields = ['type', 'host', 'database', 'username']
    missing_fields = [field for field in required_fields if field not in db_config]
    if missing_fields:
        raise ConfigValidationError(f"Missing required database fields: {', '.join(missing_fields)}")

    valid_types = ['mysql', 'file']
    if db_config['type'] not in valid_types:
        raise ConfigValidationError(f"Unsupported database type: {db_config['type']}. Must be one of: {', '.join(valid_types)}")

    # Validate MySQL-specific fields
    if db_config['type'] == 'mysql':
        if 'password' not in db_config:
            raise ConfigValidationError("MySQL database requires password")

    # Validate port if specified
    if 'port' in db_config:
        raw_port = db_config['port']
        # int() would turn True into 1 and silently truncate 3306.9 to 3306;
        # neither is a port the user meant. is_integer() is also False for
        # inf/NaN, which would otherwise make int() raise an uncaught
        # OverflowError/ValueError.
        if isinstance(raw_port, bool) or (
            isinstance(raw_port, float) and not raw_port.is_integer()
        ):
            raise ConfigValidationError("Database port must be a valid integer")
        try:
            port = int(raw_port)
        except (ValueError, TypeError) as e:
            raise ConfigValidationError("Database port must be a valid integer") from e
        if port < 1 or port > 65535:
            raise ConfigValidationError("Database port must be between 1 and 65535")
def validate_file_paths(config_data: Dict[str, Any], project_dir: str, config_file_dir: Optional[str] = None) -> None:
    """
    Validate that all file paths in the configuration are secure and exist.

    Paths are passed through ``validate_path_security``. Relative config
    paths other than ``task_dir`` and ``output_annotation_dir`` are checked
    with the validated ``task_dir`` as their base directory.

    - Security-checked only: ``task_dir``, ``output_annotation_dir``,
      ``site_dir``, ``custom_ds``
    - Security-checked and required to exist: every ``data_files`` entry,
      every ``batch_assignment.groups[*]`` instance file,
      ``data_directory`` (must also be a directory), ``base_css`` and a
      non-URL ``header_logo``

    Entries equal to ``None``, ``"null"`` or ``"default"`` are skipped.

    Args:
        config_data: The configuration data
        project_dir: The project directory
        config_file_dir: The directory containing the config file (used as a
            fallback location for ``base_css`` and ``header_logo``)

    Raises:
        ConfigSecurityError: If any file paths are not secure
        ConfigValidationError: If required files don't exist
    """
    # Values that mean "not configured"
    skip_values = [None, "null", "default"]

    task_dir = config_data.get('task_dir')
    if not task_dir:
        raise ConfigValidationError("task_dir is required in configuration")

    # Don't require task_dir to exist - it's often an output directory that
    # will be created. Only validate that it's a secure path.
    try:
        validated_task_dir = validate_path_security(task_dir, project_dir)
    except ConfigSecurityError as e:
        raise ConfigSecurityError(f"task_dir: {str(e)}") from e

    # Use task_dir as the base for resolving relative paths in the config
    base_dir = validated_task_dir

    def resolve(path: Any, label: str) -> str:
        """Security-check ``path`` against base_dir, prefixing errors with ``label``."""
        try:
            return validate_path_security(path, base_dir, project_dir)
        except ConfigSecurityError as e:
            raise ConfigSecurityError(f"{label}: {str(e)}") from e

    def require_asset(key: str, value: Any) -> None:
        """Require an asset file to exist under base_dir or next to the config file."""
        resolved = resolve(value, key)
        if os.path.exists(resolved):
            return
        # NOTE(review): this fallback only checks existence; the alternate
        # path is not passed through validate_path_security.
        if config_file_dir and os.path.exists(os.path.join(config_file_dir, value)):
            return
        raise ConfigValidationError(
            f"{key} file not found: {value} (resolved to: {resolved})"
        )

    # Validate data files ("or []" tolerates an explicit `data_files: null`)
    for i, data_file in enumerate(config_data.get('data_files') or []):
        if data_file in skip_values:
            continue

        # Handle dict entries with path + optional encoding
        if isinstance(data_file, dict):
            file_path = data_file.get("path")
            if not file_path:
                raise ConfigValidationError(f"Data file {i}: dict entry missing 'path' field")
            encoding = data_file.get("encoding")
            if encoding is not None:
                if not isinstance(encoding, str):
                    raise ConfigValidationError(
                        f"Data file {i}: 'encoding' must be a string, got {type(encoding).__name__}"
                    )
                try:
                    codecs.lookup(encoding)
                except LookupError as e:
                    raise ConfigValidationError(
                        f"Data file {i}: unknown encoding '{encoding}'"
                    ) from e
        else:
            file_path = data_file

        validated_path = resolve(file_path, f"Data file {i}")
        if not os.path.exists(validated_path):
            raise ConfigValidationError(f"Data file not found: {file_path} (resolved to: {validated_path})")

    # Validate batch assignment instance files
    batch_config = config_data.get('batch_assignment')
    if isinstance(batch_config, dict):
        for i, group in enumerate(batch_config.get('groups') or []):
            if not isinstance(group, dict):
                continue
            file_entry = group.get(
                'instances_file',
                group.get('items_file', group.get('instance_ids_file')),
            )
            if not file_entry:
                continue
            if isinstance(file_entry, dict):
                file_path = file_entry.get("path")
                # Mirror the data_files check instead of handing None to
                # validate_path_security.
                if not file_path:
                    raise ConfigValidationError(
                        f"batch_assignment.groups[{i}] file: dict entry missing 'path' field"
                    )
            else:
                file_path = file_entry

            validated_path = resolve(file_path, f"batch_assignment.groups[{i}] file")
            if not os.path.exists(validated_path):
                raise ConfigValidationError(
                    f"batch_assignment.groups[{i}] file not found: "
                    f"{file_path} (resolved to: {validated_path})"
                )

    # Validate data_directory if configured
    if 'data_directory' in config_data:
        data_directory = config_data['data_directory']
        if data_directory not in skip_values:
            validated_dir = resolve(data_directory, "data_directory")
            if not os.path.exists(validated_dir):
                raise ConfigValidationError(f"data_directory not found: {data_directory} (resolved to: {validated_dir})")
            if not os.path.isdir(validated_dir):
                raise ConfigValidationError(f"data_directory is not a directory: {data_directory} (resolved to: {validated_dir})")

    # Validate output_annotation_dir (checked against project_dir, like task_dir)
    if 'output_annotation_dir' in config_data:
        output_dir = config_data['output_annotation_dir']
        if output_dir not in skip_values:
            try:
                validate_path_security(output_dir, project_dir)
            except ConfigSecurityError as e:
                raise ConfigSecurityError(f"output_annotation_dir: {str(e)}") from e

    # Validate site_dir and custom_ds (security only, existence not required)
    for key in ('site_dir', 'custom_ds'):
        if key in config_data and config_data[key] not in skip_values:
            resolve(config_data[key], key)

    # Validate base_css
    if 'base_css' in config_data:
        base_css = config_data['base_css']
        if base_css not in skip_values:
            require_asset('base_css', base_css)

    # Validate header_logo
    if 'header_logo' in config_data:
        header_logo = config_data['header_logo']
        # Allow URLs to pass through without file validation
        if header_logo not in skip_values and not str(header_logo).startswith(("http://", "https://")):
            require_asset('header_logo', header_logo)
def validate_training_config(config_data: Dict[str, Any], project_dir: str, config_file_dir: Optional[str] = None) -> None:
    """
    Validate training configuration.

    Nothing beyond the type of ``training`` and ``training.enabled`` is
    checked unless ``training.enabled`` is true. When enabled, the data file
    path is security-checked and must exist, and the optional
    ``annotation_schemes``, ``passing_criteria``, ``feedback`` and
    ``failure_action`` settings are validated.

    Args:
        config_data: The configuration data
        project_dir: The project directory
        config_file_dir: The directory containing the config file; used as
            the base for ``training.data_file`` when given, otherwise
            ``project_dir`` is used

    Raises:
        ConfigValidationError: If training configuration is invalid
        ConfigSecurityError: If training data file path is not secure
    """
    if 'training' not in config_data:
        return  # Training is optional

    training_config = config_data['training']
    if not isinstance(training_config, dict):
        raise ConfigValidationError("training configuration must be a dictionary")

    # Validate enabled flag
    if 'enabled' in training_config:
        if not isinstance(training_config['enabled'], bool):
            raise ConfigValidationError("training.enabled must be a boolean")

    # If training is disabled or not specified, skip further validation
    if not training_config.get('enabled', False):
        return

    # Validate training data file
    if 'data_file' not in training_config:
        raise ConfigValidationError("training.data_file is required when training is enabled")
    data_file = training_config['data_file']
    if not isinstance(data_file, str):
        raise ConfigValidationError("training.data_file must be a string")

    # Validate training data file path security and existence
    base_dir = config_file_dir if config_file_dir else project_dir
    try:
        validated_path = validate_path_security(data_file, base_dir, project_dir)
    except ConfigSecurityError as e:
        raise ConfigSecurityError(f"training.data_file: {str(e)}") from e
    if not os.path.exists(validated_path):
        raise ConfigValidationError(f"Training data file not found: {data_file} (resolved to: {validated_path})")

    # Validate annotation schemes
    if 'annotation_schemes' in training_config:
        schemes = training_config['annotation_schemes']
        if not isinstance(schemes, list):
            raise ConfigValidationError("training.annotation_schemes must be a list")
        if not schemes:
            raise ConfigValidationError("training.annotation_schemes cannot be empty")
        for i, scheme in enumerate(schemes):
            if isinstance(scheme, str):
                # String reference to existing scheme - only require it to be non-blank
                if not scheme.strip():
                    raise ConfigValidationError(f"training.annotation_schemes[{i}] cannot be empty")
            elif isinstance(scheme, dict):
                # Full scheme dictionary - validate it
                validate_single_annotation_scheme(scheme, f"training.annotation_schemes[{i}]")
            else:
                raise ConfigValidationError(f"training.annotation_schemes[{i}] must be a string or dictionary")

    # Validate passing criteria
    if 'passing_criteria' in training_config:
        criteria = training_config['passing_criteria']
        if not isinstance(criteria, dict):
            raise ConfigValidationError("training.passing_criteria must be a dictionary")

        # bool is a subclass of int, so `true` would otherwise count as 1
        for key in ('min_correct', 'max_attempts'):
            if key in criteria:
                value = criteria[key]
                if isinstance(value, bool) or not isinstance(value, int) or value < 1:
                    raise ConfigValidationError(f"training.passing_criteria.{key} must be a positive integer")

        if 'require_all_correct' in criteria:
            if not isinstance(criteria['require_all_correct'], bool):
                raise ConfigValidationError("training.passing_criteria.require_all_correct must be a boolean")

    # Validate feedback settings
    if 'feedback' in training_config:
        feedback = training_config['feedback']
        if not isinstance(feedback, dict):
            raise ConfigValidationError("training.feedback must be a dictionary")
        for key in ('show_explanations', 'allow_retry'):
            if key in feedback and not isinstance(feedback[key], bool):
                raise ConfigValidationError(f"training.feedback.{key} must be a boolean")

    # Validate failure action
    if 'failure_action' in training_config:
        failure_action = training_config['failure_action']
        valid_actions = ['move_to_done', 'repeat_training']
        if failure_action not in valid_actions:
            raise ConfigValidationError(f"training.failure_action must be one of: {', '.join(valid_actions)}")
def validate_training_data_file(data_file_path: str, annotation_schemes: List[Any]) -> None:
    """
    Validate training data file format and consistency.

    The file must be UTF-8 JSON holding an object with a non-empty
    ``training_instances`` list. Each instance must be a dictionary with a
    string ``id``, a string ``text`` and a ``correct_answers`` dictionary
    whose keys all name a known annotation scheme; an optional
    ``explanation`` must be a string.

    Args:
        data_file_path: Path to the training data file
        annotation_schemes: Annotation schemes to validate against. Each
            entry is either a scheme dictionary with a ``name`` field or a
            plain string naming a scheme.

    Raises:
        ConfigValidationError: If training data is invalid
    """
    try:
        with open(data_file_path, 'r', encoding='utf-8') as f:
            training_data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise ConfigValidationError(f"Training data file is not valid JSON: {str(e)}") from e
    except FileNotFoundError as e:
        raise ConfigValidationError(f"Training data file not found: {data_file_path}") from e
    except OSError as e:
        # e.g. the path is a directory or is not readable
        raise ConfigValidationError(f"Training data file could not be read: {data_file_path} ({e})") from e

    if not isinstance(training_data, dict):
        raise ConfigValidationError("Training data must be a JSON object")
    if 'training_instances' not in training_data:
        raise ConfigValidationError("Training data must contain 'training_instances' field")

    training_instances = training_data['training_instances']
    if not isinstance(training_instances, list):
        raise ConfigValidationError("training_instances must be a list")
    if not training_instances:
        raise ConfigValidationError("training_instances cannot be empty")

    # Collect the known scheme names. Schemes may be given as plain string
    # references as well as full dictionaries.
    scheme_names = set()
    for idx, scheme in enumerate(annotation_schemes):
        if isinstance(scheme, str):
            scheme_names.add(scheme)
        elif isinstance(scheme, dict) and 'name' in scheme:
            scheme_names.add(scheme['name'])
        else:
            raise ConfigValidationError(
                f"Annotation scheme {idx} must be a scheme name or a dictionary with a 'name' field"
            )

    required_fields = ['id', 'text', 'correct_answers']
    for i, instance in enumerate(training_instances):
        if not isinstance(instance, dict):
            raise ConfigValidationError(f"Training instance {i} must be a dictionary")

        # Validate required fields
        missing_fields = [field for field in required_fields if field not in instance]
        if missing_fields:
            raise ConfigValidationError(f"Training instance {i} missing required fields: {', '.join(missing_fields)}")

        # Validate id
        if not isinstance(instance['id'], str):
            raise ConfigValidationError(f"Training instance {i}.id must be a string")

        # Validate text
        if not isinstance(instance['text'], str):
            raise ConfigValidationError(f"Training instance {i}.text must be a string")

        # Validate correct_answers
        correct_answers = instance['correct_answers']
        if not isinstance(correct_answers, dict):
            raise ConfigValidationError(f"Training instance {i}.correct_answers must be a dictionary")

        # Every answered scheme must be one of the known annotation schemes
        for scheme_name in correct_answers:
            if scheme_name not in scheme_names:
                raise ConfigValidationError(f"Training instance {i}.correct_answers contains unknown scheme: {scheme_name}")

        # Validate explanation if present
        if 'explanation' in instance:
            if not isinstance(instance['explanation'], str):
                raise ConfigValidationError(f"Training instance {i}.explanation must be a string")
def validate_batch_assignment_config(config_data: Dict[str, Any]) -> None:
    """
    Check the optional ``batch_assignment`` section of a configuration.

    ``batch_assignment`` describes explicit annotator cohorts for repeat-round
    studies. Every entry of ``groups`` names the annotators that may receive a
    fixed set of items; the items are listed inline or referenced through a
    separate data file. An ``annotator_key`` may also be given so that items
    carry their own annotator lists; only the key itself is checked here.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the batch assignment configuration is invalid
    """
    def _nonblank(value: Any) -> bool:
        # A string that still has content after stripping whitespace.
        return isinstance(value, str) and bool(value.strip())

    def _first_alias(mapping: Dict[str, Any], *names: str) -> Any:
        # Same result as chained dict.get() calls: the first alias that is
        # present wins (even when its value is None); otherwise None.
        for name in names:
            if name in mapping:
                return mapping[name]
        return None

    if 'batch_assignment' not in config_data:
        return

    section = config_data['batch_assignment']
    if not isinstance(section, dict):
        raise ConfigValidationError("batch_assignment must be a dictionary")

    key_field = section.get('annotator_key')
    if key_field is not None and not _nonblank(key_field):
        raise ConfigValidationError("batch_assignment.annotator_key must be a non-empty string")

    cohorts = section.get('groups', [])
    if cohorts is None:
        return
    if not isinstance(cohorts, list):
        raise ConfigValidationError("batch_assignment.groups must be a list")

    for idx, cohort in enumerate(cohorts):
        where = f"batch_assignment.groups[{idx}]"
        if not isinstance(cohort, dict):
            raise ConfigValidationError(f"{where} must be a dictionary")

        members = _first_alias(cohort, 'annotators', 'users')
        item_ids = _first_alias(cohort, 'instances', 'items', 'instance_ids')
        source_file = _first_alias(
            cohort, 'instances_file', 'items_file', 'instance_ids_file'
        )

        # Annotators: a non-empty list of non-blank strings.
        if not (isinstance(members, list) and members):
            raise ConfigValidationError(
                f"{where} must define non-empty annotators/users list"
            )
        if any(not _nonblank(member) for member in members):
            raise ConfigValidationError(
                f"{where}.annotators/users must contain non-empty strings"
            )

        # Items: at least one of an inline list or a file reference.
        if item_ids is None and source_file is None:
            raise ConfigValidationError(
                f"{where} must define either "
                "instances/items/instance_ids or instances_file/items_file/instance_ids_file"
            )

        if item_ids is not None:
            if not (isinstance(item_ids, list) and item_ids):
                raise ConfigValidationError(
                    f"{where} must define non-empty instances/items/instance_ids list"
                )
            if any(not _nonblank(item_id) for item_id in item_ids):
                raise ConfigValidationError(
                    f"{where}.instances/items/instance_ids must contain non-empty strings"
                )

        if source_file is None:
            continue

        # File reference: either a bare path or a mapping with path/encoding.
        if isinstance(source_file, str):
            if not source_file.strip():
                raise ConfigValidationError(f"{where} file path must be non-empty")
            continue

        if not isinstance(source_file, dict):
            raise ConfigValidationError(
                f"{where} file entry must be a path string or mapping"
            )

        if not _nonblank(source_file.get('path')):
            raise ConfigValidationError(
                f"{where} file entry must define a non-empty path"
            )
        declared_encoding = source_file.get('encoding')
        if not (declared_encoding is None or isinstance(declared_encoding, str)):
            raise ConfigValidationError(f"{where} file encoding must be a string")
def validate_category_assignment_config(config_data: Dict[str, Any]) -> None:
    """
    Validate category assignment configuration.

    This function validates the category_assignment configuration section which
    controls how users are assigned to annotation categories based on their
    training/prestudy performance.

    Numeric settings reject booleans (``bool`` is a subclass of ``int``, so a
    YAML ``true`` would otherwise pass as ``1``) and NaN (range checks are
    written as positive ``lo <= x <= hi`` tests, which are false for NaN).

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If category assignment configuration is invalid
    """
    def _is_number(value: Any) -> bool:
        # Real int/float only; True/False are not meaningful numeric settings.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def _is_int(value: Any) -> bool:
        return isinstance(value, int) and not isinstance(value, bool)

    if 'category_assignment' not in config_data:
        return  # Category assignment is optional

    cat_config = config_data['category_assignment']
    if not isinstance(cat_config, dict):
        raise ConfigValidationError("category_assignment must be a dictionary")

    # Validate enabled flag
    if 'enabled' in cat_config:
        if not isinstance(cat_config['enabled'], bool):
            raise ConfigValidationError("category_assignment.enabled must be a boolean")

    # If not enabled, skip further validation (the section defaults to enabled)
    if not cat_config.get('enabled', True):
        return

    # Validate category_key (optional, can also be in item_properties)
    if 'category_key' in cat_config:
        category_key = cat_config['category_key']
        if not isinstance(category_key, str) or not category_key.strip():
            raise ConfigValidationError("category_assignment.category_key must be a non-empty string")

    # Validate qualification settings
    if 'qualification' in cat_config:
        qual = cat_config['qualification']
        if not isinstance(qual, dict):
            raise ConfigValidationError("category_assignment.qualification must be a dictionary")

        # Validate source
        if 'source' in qual:
            valid_sources = ['training', 'prestudy', 'both']
            if qual['source'] not in valid_sources:
                raise ConfigValidationError(
                    f"category_assignment.qualification.source must be one of: {', '.join(valid_sources)}"
                )

        # Validate threshold
        if 'threshold' in qual:
            threshold = qual['threshold']
            if not _is_number(threshold) or not (0.0 <= threshold <= 1.0):
                raise ConfigValidationError(
                    "category_assignment.qualification.threshold must be a number between 0.0 and 1.0"
                )

        # Validate min_questions
        if 'min_questions' in qual:
            min_q = qual['min_questions']
            if not _is_int(min_q) or min_q < 1:
                raise ConfigValidationError(
                    "category_assignment.qualification.min_questions must be a positive integer"
                )

        # Validate combine_method (for combining prestudy and training scores)
        if 'combine_method' in qual:
            valid_methods = ['average', 'max', 'sum']
            if qual['combine_method'] not in valid_methods:
                raise ConfigValidationError(
                    f"category_assignment.qualification.combine_method must be one of: {', '.join(valid_methods)}"
                )

    # Validate fallback behavior
    if 'fallback' in cat_config:
        valid_fallbacks = ['uncategorized', 'random', 'none']
        if cat_config['fallback'] not in valid_fallbacks:
            raise ConfigValidationError(
                f"category_assignment.fallback must be one of: {', '.join(valid_fallbacks)}"
            )

    # Validate dynamic expertise settings
    if 'dynamic' in cat_config:
        dynamic = cat_config['dynamic']
        if not isinstance(dynamic, dict):
            raise ConfigValidationError("category_assignment.dynamic must be a dictionary")

        # Validate enabled flag
        if 'enabled' in dynamic:
            if not isinstance(dynamic['enabled'], bool):
                raise ConfigValidationError("category_assignment.dynamic.enabled must be a boolean")

        # If dynamic is not enabled, skip further validation (defaults to
        # disabled). This is the last section, so returning is safe.
        if not dynamic.get('enabled', False):
            return

        # Validate agreement_method
        if 'agreement_method' in dynamic:
            valid_methods = ['majority_vote', 'super_majority', 'unanimous']
            if dynamic['agreement_method'] not in valid_methods:
                raise ConfigValidationError(
                    f"category_assignment.dynamic.agreement_method must be one of: {', '.join(valid_methods)}"
                )

        # Validate min_annotations_for_consensus
        if 'min_annotations_for_consensus' in dynamic:
            min_ann = dynamic['min_annotations_for_consensus']
            if not _is_int(min_ann) or min_ann < 2:
                raise ConfigValidationError(
                    "category_assignment.dynamic.min_annotations_for_consensus must be an integer >= 2"
                )

        # Validate learning_rate
        if 'learning_rate' in dynamic:
            lr = dynamic['learning_rate']
            if not _is_number(lr) or not (0.0 < lr <= 1.0):
                raise ConfigValidationError(
                    "category_assignment.dynamic.learning_rate must be a number between 0.0 (exclusive) and 1.0"
                )

        # Validate update_interval_seconds
        if 'update_interval_seconds' in dynamic:
            interval = dynamic['update_interval_seconds']
            if not _is_number(interval) or not (interval >= 1):
                raise ConfigValidationError(
                    "category_assignment.dynamic.update_interval_seconds must be a number >= 1"
                )

        # Validate base_probability
        if 'base_probability' in dynamic:
            base_prob = dynamic['base_probability']
            if not _is_number(base_prob) or not (0.0 <= base_prob <= 1.0):
                raise ConfigValidationError(
                    "category_assignment.dynamic.base_probability must be a number between 0.0 and 1.0"
                )
def validate_diversity_config(config_data: Dict[str, Any]) -> None:
    """
    Validate diversity ordering configuration.

    This function validates the diversity_ordering section which controls
    embedding-based clustering for diverse item ordering.

    Integer and numeric settings reject booleans (``bool`` is a subclass of
    ``int``, so a YAML ``true`` would otherwise pass as ``1``), and the
    ``recluster_threshold`` range check is written so that NaN is rejected.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If diversity ordering configuration is invalid
    """
    def _is_int(value: Any) -> bool:
        # Real integers only; True/False must not count as 1/0.
        return isinstance(value, int) and not isinstance(value, bool)

    if 'diversity_ordering' not in config_data:
        return  # Diversity ordering is optional

    dc = config_data['diversity_ordering']
    if not isinstance(dc, dict):
        raise ConfigValidationError("diversity_ordering must be a dictionary")

    # Validate enabled flag
    if 'enabled' in dc:
        if not isinstance(dc['enabled'], bool):
            raise ConfigValidationError("diversity_ordering.enabled must be a boolean")

    # If not enabled, skip further validation (the section defaults to disabled)
    if not dc.get('enabled', False):
        return

    # Validate model_name
    if 'model_name' in dc:
        if not isinstance(dc['model_name'], str) or not dc['model_name'].strip():
            raise ConfigValidationError("diversity_ordering.model_name must be a non-empty string")

    # Validate num_clusters
    if 'num_clusters' in dc:
        num_clusters = dc['num_clusters']
        if not _is_int(num_clusters) or num_clusters < 2:
            raise ConfigValidationError("diversity_ordering.num_clusters must be an integer >= 2")

    # Validate items_per_cluster
    if 'items_per_cluster' in dc:
        items_per_cluster = dc['items_per_cluster']
        if not _is_int(items_per_cluster) or items_per_cluster < 1:
            raise ConfigValidationError("diversity_ordering.items_per_cluster must be a positive integer")

    # Validate auto_clusters
    if 'auto_clusters' in dc:
        if not isinstance(dc['auto_clusters'], bool):
            raise ConfigValidationError("diversity_ordering.auto_clusters must be a boolean")

    # Validate prefill_count
    if 'prefill_count' in dc:
        prefill_count = dc['prefill_count']
        if not _is_int(prefill_count) or prefill_count < 0:
            raise ConfigValidationError("diversity_ordering.prefill_count must be a non-negative integer")

    # Validate batch_size
    if 'batch_size' in dc:
        batch_size = dc['batch_size']
        if not _is_int(batch_size) or batch_size < 1:
            raise ConfigValidationError("diversity_ordering.batch_size must be a positive integer")

    # Validate recluster_threshold
    if 'recluster_threshold' in dc:
        recluster_threshold = dc['recluster_threshold']
        is_number = (
            isinstance(recluster_threshold, (int, float))
            and not isinstance(recluster_threshold, bool)
        )
        # Positive range test: NaN fails it, unlike "x < 0 or x > 1".
        if not is_number or not (0 <= recluster_threshold <= 1):
            raise ConfigValidationError(
                "diversity_ordering.recluster_threshold must be a number between 0 and 1"
            )

    # Validate preserve_visited
    if 'preserve_visited' in dc:
        if not isinstance(dc['preserve_visited'], bool):
            raise ConfigValidationError("diversity_ordering.preserve_visited must be a boolean")

    # Validate trigger_ai_prefetch
    if 'trigger_ai_prefetch' in dc:
        if not isinstance(dc['trigger_ai_prefetch'], bool):
            raise ConfigValidationError("diversity_ordering.trigger_ai_prefetch must be a boolean")

    # Validate cache_dir (null is allowed)
    if 'cache_dir' in dc:
        cache_dir = dc['cache_dir']
        if cache_dir is not None and (not isinstance(cache_dir, str) or not cache_dir.strip()):
            raise ConfigValidationError(
                "diversity_ordering.cache_dir must be a non-empty string or null"
            )
def validate_embedding_visualization_config(config_data: Dict[str, Any]) -> None:
    """
    Validate embedding visualization configuration.

    This function validates the embedding_visualization section which controls
    the admin dashboard 2D visualization of embeddings.

    Integer and numeric settings reject booleans (``bool`` is a subclass of
    ``int``, so a YAML ``true`` would otherwise pass as ``1``), and the
    ``umap.min_dist`` range check is written so that NaN is rejected.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If embedding visualization configuration is invalid
    """
    def _is_int(value: Any) -> bool:
        # Real integers only; True/False must not count as 1/0.
        return isinstance(value, int) and not isinstance(value, bool)

    if 'embedding_visualization' not in config_data:
        return  # Embedding visualization is optional

    ev = config_data['embedding_visualization']
    if not isinstance(ev, dict):
        raise ConfigValidationError("embedding_visualization must be a dictionary")

    # Validate enabled flag
    if 'enabled' in ev:
        if not isinstance(ev['enabled'], bool):
            raise ConfigValidationError("embedding_visualization.enabled must be a boolean")

    # If not enabled, skip further validation (the section defaults to enabled)
    if not ev.get('enabled', True):
        return

    # Validate sample_size
    if 'sample_size' in ev:
        sample_size = ev['sample_size']
        if not _is_int(sample_size) or sample_size < 1:
            raise ConfigValidationError(
                "embedding_visualization.sample_size must be a positive integer"
            )

    # Validate include_all_annotated
    if 'include_all_annotated' in ev:
        if not isinstance(ev['include_all_annotated'], bool):
            raise ConfigValidationError(
                "embedding_visualization.include_all_annotated must be a boolean"
            )

    # Validate embedding_model
    if 'embedding_model' in ev:
        if not isinstance(ev['embedding_model'], str) or not ev['embedding_model'].strip():
            raise ConfigValidationError(
                "embedding_visualization.embedding_model must be a non-empty string"
            )

    # Validate image_embedding_model
    if 'image_embedding_model' in ev:
        if not isinstance(ev['image_embedding_model'], str) or not ev['image_embedding_model'].strip():
            raise ConfigValidationError(
                "embedding_visualization.image_embedding_model must be a non-empty string"
            )

    # Validate UMAP configuration
    if 'umap' in ev:
        umap_config = ev['umap']
        if not isinstance(umap_config, dict):
            raise ConfigValidationError("embedding_visualization.umap must be a dictionary")

        # Validate n_neighbors
        if 'n_neighbors' in umap_config:
            n_neighbors = umap_config['n_neighbors']
            if not _is_int(n_neighbors) or n_neighbors < 2:
                raise ConfigValidationError(
                    "embedding_visualization.umap.n_neighbors must be an integer >= 2"
                )

        # Validate min_dist
        if 'min_dist' in umap_config:
            min_dist = umap_config['min_dist']
            is_number = (
                isinstance(min_dist, (int, float))
                and not isinstance(min_dist, bool)
            )
            # Positive range test: NaN fails it, unlike "x < 0 or x > 1".
            if not is_number or not (0 <= min_dist <= 1):
                raise ConfigValidationError(
                    "embedding_visualization.umap.min_dist must be a number between 0 and 1"
                )

        # Validate metric
        if 'metric' in umap_config:
            valid_metrics = ['cosine', 'euclidean', 'manhattan', 'correlation']
            if umap_config['metric'] not in valid_metrics:
                raise ConfigValidationError(
                    f"embedding_visualization.umap.metric must be one of: {valid_metrics}"
                )

    # Validate label_source
    if 'label_source' in ev:
        valid_sources = ['mace', 'majority']
        if ev['label_source'] not in valid_sources:
            raise ConfigValidationError(
                f"embedding_visualization.label_source must be one of: {valid_sources}"
            )
| def _merge_ai_config_file(config_data: Dict[str, Any], config_dir: str) -> Dict[str, Any]: | |
| """ | |
| Merge an external ai-config.yaml into the main config if specified. | |
| When ai_support.ai_config_file is set, loads that YAML file and merges its | |
| contents into the ai_support section. The external file provides endpoint-specific | |
| details (endpoint_type, model, api_key, base_url) while the inline ai_config | |
| provides defaults (temperature, max_tokens, include settings). | |
| Args: | |
| config_data: The parsed main configuration dictionary | |
| config_dir: Directory containing the main config file (for resolving relative paths) | |
| Returns: | |
| The config_data with external AI config merged in (modified in place and returned) | |
| """ | |
| ai_support = config_data.get("ai_support", {}) | |
| if not isinstance(ai_support, dict): | |
| return config_data | |
| ai_config_file = ai_support.get("ai_config_file") | |
| if not ai_config_file: | |
| # No external file specified - apply env var substitution to inline ai_config | |
| if "ai_config" in ai_support: | |
| from potato.data_sources.credentials import substitute_env_vars | |
| ai_support["ai_config"] = substitute_env_vars(ai_support["ai_config"]) | |
| config_data["ai_support"] = ai_support | |
| return config_data | |
| if not isinstance(ai_config_file, str): | |
| logger.warning("ai_support.ai_config_file must be a string. Ignoring.") | |
| return config_data | |
| # Resolve relative to config file directory | |
| ai_config_path = os.path.join(config_dir, ai_config_file) | |
| if not os.path.exists(ai_config_path): | |
| logger.warning( | |
| f"AI config file '{ai_config_file}' not found at {ai_config_path}. " | |
| f"AI support will be disabled. Create this file with your endpoint details." | |
| ) | |
| config_data["ai_support"]["enabled"] = False | |
| return config_data | |
| # Load external AI config | |
| try: | |
| with open(ai_config_path, 'r', encoding='utf-8') as f: | |
| external_config = yaml.safe_load(f) or {} | |
| except yaml.YAMLError as e: | |
| logger.warning(f"Invalid YAML in AI config file '{ai_config_file}': {e}. AI support will be disabled.") | |
| config_data["ai_support"]["enabled"] = False | |
| return config_data | |
| if not isinstance(external_config, dict): | |
| logger.warning(f"AI config file '{ai_config_file}' must contain a YAML dictionary. AI support will be disabled.") | |
| config_data["ai_support"]["enabled"] = False | |
| return config_data | |
| # Apply environment variable substitution to external config | |
| from potato.data_sources.credentials import substitute_env_vars | |
| external_config = substitute_env_vars(external_config) | |
| # Extract endpoint_type from external config (top-level key) | |
| if "endpoint_type" in external_config: | |
| ai_support["endpoint_type"] = external_config.pop("endpoint_type") | |
| # Merge remaining keys into ai_config (external takes precedence) | |
| ai_config = ai_support.get("ai_config", {}) | |
| if not isinstance(ai_config, dict): | |
| ai_config = {} | |
| ai_config.update(external_config) | |
| ai_support["ai_config"] = ai_config | |
| # Also apply env var substitution to the final merged ai_config | |
| ai_support["ai_config"] = substitute_env_vars(ai_support["ai_config"]) | |
| config_data["ai_support"] = ai_support | |
| logger.info(f"Loaded AI endpoint config from {ai_config_file}") | |
| return config_data | |
def load_and_validate_config(config_file: str, project_dir: str) -> Dict[str, Any]:
    """
    Load and validate a YAML configuration file with security checks.

    Steps, in order: validate the config path, parse the YAML, merge the
    optional external AI config, apply defaults for ``task_dir``/``site_dir``,
    resolve a relative ``task_dir`` against the config file's directory, then
    run the structure and file-path validators.

    Args:
        config_file: Path to the configuration file
        project_dir: The project directory

    Returns:
        The validated configuration dictionary

    Raises:
        ConfigSecurityError: If the configuration file is not secure
        ConfigValidationError: If the configuration is invalid (including an
            empty file or a YAML document that is not a mapping)
        FileNotFoundError: If the configuration file doesn't exist
    """
    # Validate the config file path itself
    try:
        validated_config_path = validate_path_security(config_file, project_dir)
    except ConfigSecurityError as e:
        raise ConfigSecurityError(f"Configuration file path: {str(e)}") from e

    if not os.path.exists(validated_config_path):
        raise FileNotFoundError(f"Configuration file not found: {config_file}")

    # Load and parse YAML
    try:
        with open(validated_config_path, 'r', encoding='utf-8') as file_p:
            config_data = yaml.safe_load(file_p)
    except yaml.YAMLError as e:
        raise ConfigValidationError(f"Invalid YAML format in {config_file}: {str(e)}") from e
    except UnicodeDecodeError as e:
        raise ConfigValidationError(f"Invalid file encoding in {config_file}: {str(e)}") from e
    except Exception as e:
        raise ConfigValidationError(f"Error reading configuration file {config_file}: {str(e)}") from e

    # safe_load returns None for an empty document and may return a list or a
    # scalar; everything below needs a mapping, so fail with a clear message
    # instead of an AttributeError/TypeError further down.
    if config_data is None:
        raise ConfigValidationError(f"Configuration file {config_file} is empty")
    if not isinstance(config_data, dict):
        raise ConfigValidationError(
            f"Configuration file {config_file} must contain a YAML mapping at the top level, "
            f"got {type(config_data).__name__}"
        )

    # Get the directory containing the config file for relative path resolution
    config_file_dir = os.path.dirname(validated_config_path)

    # Merge external AI config file if specified (before validation)
    config_data = _merge_ai_config_file(config_data, config_file_dir)

    # Apply default values for common configuration options
    if 'task_dir' not in config_data:
        config_data['task_dir'] = '.'
        logger.debug("task_dir not specified, defaulting to '.'")
    if 'site_dir' not in config_data:
        config_data['site_dir'] = 'default'
        logger.debug("site_dir not specified, defaulting to 'default'")

    # Resolve task_dir relative to config file directory if it's '.' or a relative path
    task_dir = config_data['task_dir']
    if not isinstance(task_dir, str):
        # e.g. "task_dir:" with no value parses to None
        raise ConfigValidationError("task_dir must be a string")
    if not os.path.isabs(task_dir):
        # Resolve relative to config file's directory ('.' is relative too)
        task_dir = os.path.normpath(os.path.join(config_file_dir, task_dir))
        config_data['task_dir'] = task_dir
        logger.debug(f"Resolved task_dir to: {task_dir}")

    # Validate the configuration structure
    validate_yaml_structure(config_data, project_dir, config_file_dir)

    # Validate file paths
    validate_file_paths(config_data, project_dir, config_file_dir)

    return config_data
def init_config(args):
    """
    Locate, load and validate the configuration, then populate the global ``config``.

    ``args.config_file`` may be either a path ending in ``.yaml`` or a project
    directory containing a ``configs/`` folder with one or more ``.yaml``
    files (the user is prompted on stdin when there are several).

    Besides ``config_file`` this reads ``verbose``, ``very_verbose``,
    ``customjs``, ``customjs_hostname``, ``persist_sessions``, ``debug`` and
    ``port`` from ``args``, plus ``debug_log`` / ``debug_phase`` when present.

    Side effects: updates the module-level ``config`` dictionary, prints
    status messages, and changes the process working directory to the
    project directory.

    Args:
        args: Parsed command-line arguments

    Raises:
        ConfigSecurityError: If the configuration file path is not secure
        ConfigValidationError: If the configuration cannot be located or is invalid
        FileNotFoundError: If the given ``.yaml`` file does not exist
    """
    global config
    project_dir = os.getcwd()  # get the current working dir as the default project_dir
    config_file = None
    config_file_dir = None
    is_yaml_path = args.config_file[-5:] == '.yaml'
    try:
        # if the .yaml config file is given, directly use it
        if is_yaml_path:
            if os.path.exists(args.config_file):
                print("INFO: when you run the server directly from a .yaml file, please make sure your config file is put in the annotation project folder")
                config_file = args.config_file
                # For direct YAML file usage, we'll determine the project_dir from the config file content
                # after loading it, not from the file path structure
            else:
                raise FileNotFoundError(f"Configuration file not found: {args.config_file}")
        # if the user gives a directory, check if config.yaml or configs/config.yaml exists
        elif os.path.isdir(args.config_file):
            project_dir = args.config_file if os.path.isabs(args.config_file) else os.path.join(project_dir, args.config_file)
            config_folder = os.path.join(args.config_file, 'configs')
            if not os.path.isdir(config_folder):
                raise ConfigValidationError(".yaml file must be put in the configs/ folder under the main project directory when you try to start the project with the project directory, otherwise please directly give the path of the .yaml file")

            # get all the config files
            yamlfiles = [it for it in os.listdir(config_folder) if it[-5:] == '.yaml']

            # if no yaml files found, quit the program
            if len(yamlfiles) == 0:
                raise ConfigValidationError(f"Configuration file not found under {config_folder}, please make sure .yaml file exists in the given directory, or please directly give the path of the .yaml file")
            # if only one yaml file found, directly use it
            elif len(yamlfiles) == 1:
                config_file = os.path.join(config_folder, yamlfiles[0])
                config_file_dir = config_folder
            # if multiple yaml files found, ask the user to choose which one to use
            else:
                while True:
                    # Valid choices are 0 .. len-1 (the menu below is zero-based)
                    print("multiple config files found, please select the one you want to use (number 0-%d)" % (len(yamlfiles) - 1))
                    for i, it in enumerate(yamlfiles):
                        print("[%d] %s" % (i, it))
                    input_id = input("number: ")
                    try:
                        choice = int(input_id)
                    except ValueError:
                        print("wrong input, please reselect")
                        continue
                    # Reject out-of-range values explicitly; a negative number
                    # would otherwise silently index from the end of the list.
                    if not 0 <= choice < len(yamlfiles):
                        print("wrong input, please reselect")
                        continue
                    config_file = os.path.join(config_folder, yamlfiles[choice])
                    config_file_dir = config_folder
                    break

        if not config_file:
            # Only reachable when the argument is neither a ``.yaml`` path nor
            # an existing directory, so no config folder was ever determined.
            raise ConfigValidationError(
                f"Configuration file not found: '{args.config_file}' is neither a .yaml file nor a project directory, "
                "please give the path of the .yaml file or of a project directory containing a configs/ folder"
            )

        # Load and validate the configuration
        # For direct config file usage, use current working directory as base for config file path resolution
        if is_yaml_path:
            # First, load the config without full validation to get the task_dir
            try:
                validated_config_path = validate_path_security(config_file, os.getcwd())
                with open(validated_config_path, 'r', encoding='utf-8') as file_p:
                    temp_config_data = yaml.safe_load(file_p)
            except Exception as e:
                raise ConfigValidationError(f"Error loading configuration file: {str(e)}") from e

            # An empty file parses to None and a list/scalar is not a config;
            # the key lookups below need a mapping.
            if not isinstance(temp_config_data, dict):
                raise ConfigValidationError(
                    f"Error loading configuration file: {config_file} must contain a YAML mapping at the top level"
                )

            # Get the config file's directory for resolving relative paths
            config_file_abs = os.path.abspath(config_file)
            config_file_dir = os.path.dirname(config_file_abs)

            # Resolve task_dir relative to config file directory if it's '.' or a relative path
            if 'task_dir' in temp_config_data:
                task_dir = temp_config_data['task_dir']
                if not isinstance(task_dir, str):
                    raise ConfigValidationError("task_dir must be a string")
                if not os.path.isabs(task_dir):
                    # Resolve relative to config file's directory
                    task_dir = os.path.normpath(os.path.join(config_file_dir, task_dir))
                    temp_config_data['task_dir'] = task_dir
                    logger.debug(f"Resolved task_dir to: {task_dir}")

            # Validate that config file is in task_dir (skip in test mode)
            skip_path_validation = os.environ.get('POTATO_SKIP_CONFIG_PATH_VALIDATION', '').lower() in ('1', 'true')
            if 'task_dir' in temp_config_data and not skip_path_validation:
                task_dir = temp_config_data['task_dir']
                task_dir_abs = os.path.abspath(task_dir)
                # Compare whole path components: a plain string-prefix test
                # would accept '/proj/task2/c.yaml' for task_dir '/proj/task'.
                try:
                    common_root = os.path.commonpath([config_file_abs, task_dir_abs])
                except ValueError:
                    # e.g. paths on different drives: cannot be nested
                    common_root = None
                inside_task_dir = (
                    common_root is not None
                    and os.path.normcase(common_root) == os.path.normcase(task_dir_abs)
                )
                if not inside_task_dir:
                    raise ConfigValidationError(f"Configuration file must be in the task_dir. Config file is at '{config_file_abs}' but task_dir is '{task_dir_abs}'")
                project_dir = task_dir

            # Now load and validate with the correct project_dir
            config_data = load_and_validate_config(config_file, os.getcwd())

            # Update config_data with resolved task_dir
            if 'task_dir' in temp_config_data:
                config_data['task_dir'] = temp_config_data['task_dir']
        else:
            config_data = load_and_validate_config(config_file, project_dir)

        config.update(config_data)

        # Only override config settings if command line arguments are explicitly provided
        config_updates = {
            "verbose": args.verbose,
            "very_verbose": args.very_verbose,
            # Store an ABSOLUTE path: the server chdir's into task_dir at startup,
            # so a relative path would be re-resolved against the wrong CWD later
            # (e.g. admin export doubled the project path). CWD is still the
            # original launch dir here (chdir happens further below).
            "__config_file__": os.path.abspath(args.config_file),
            "customjs": args.customjs,
            "customjs_hostname": args.customjs_hostname,
            "persist_sessions": args.persist_sessions,
        }

        # Only override debug if explicitly set to True via command line
        # or if config file doesn't have a debug setting
        if args.debug or "debug" not in config:
            config_updates["debug"] = args.debug

        # Add debug logging mode if specified
        if hasattr(args, 'debug_log') and args.debug_log:
            config_updates["debug_log"] = args.debug_log

        # Add debug phase if specified (requires --debug flag)
        if hasattr(args, 'debug_phase') and args.debug_phase:
            if not args.debug:
                print("⚠️ Warning: --debug-phase requires --debug flag. Enabling debug mode.")
                config_updates["debug"] = True
            config_updates["debug_phase"] = args.debug_phase

        config.update(config_updates)

        # Apply server config values (CLI args take precedence)
        if "server" in config:
            server_config = config["server"]
            # Apply port from server config if not specified via CLI
            if "port" in server_config and args.port is None:
                config["port"] = server_config["port"]
                logger.debug(f"Port set from config file: {server_config['port']}")
            # Apply host from server config
            if "host" in server_config:
                # Host can only be set via config (no CLI arg currently)
                config["host"] = server_config["host"]
                logger.debug(f"Host set from config file: {server_config['host']}")
            # Apply debug from server config if not specified via CLI
            if "debug" in server_config and not args.debug:
                config["debug"] = server_config["debug"]
                logger.debug(f"Debug mode set from config file: {server_config['debug']}")

        # update the current working dir for the server
        os.chdir(project_dir)
    except (ConfigSecurityError, ConfigValidationError, FileNotFoundError) as e:
        logger.error(f"Configuration error: {str(e)}")
        print(f"❌ Configuration error: {str(e)}")
        print("Please check your configuration file and try again.")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during configuration initialization: {str(e)}")
        print(f"❌ Unexpected error: {str(e)}")
        raise
def validate_active_learning_config(config_data: Dict[str, Any]) -> None:
    """
    Validate active learning configuration.

    The section is optional. When present it must be a dictionary, and the
    detailed checks only run when ``active_learning.enabled`` is true.
    Checks run in a fixed order and the first violation is reported.

    Args:
        config_data: The configuration data containing active_learning section

    Raises:
        ConfigValidationError: If the active learning configuration is invalid
    """
    if "active_learning" not in config_data:
        return  # Active learning is optional

    al_config = config_data["active_learning"]
    # An empty `active_learning:` key (None) or a scalar value would otherwise
    # crash with AttributeError on the first .get() instead of a config error.
    if not isinstance(al_config, dict):
        raise ConfigValidationError("active_learning must be a dictionary")

    def is_int(value: Any) -> bool:
        # bool is a subclass of int, so true/false must be excluded explicitly.
        return isinstance(value, int) and not isinstance(value, bool)

    def is_number(value: Any) -> bool:
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def is_fraction(value: Any) -> bool:
        # The chained comparison is False for NaN, so NaN is rejected as well.
        return is_number(value) and 0 <= value <= 1

    def check_bool(section: Dict[str, Any], key: str, path: str) -> None:
        if key in section and not isinstance(section[key], bool):
            raise ConfigValidationError(f"{path} must be a boolean")

    def check_dict(key: str) -> Optional[Dict[str, Any]]:
        # Returns the sub-section when present (after type-checking it), else None.
        if key not in al_config:
            return None
        if not isinstance(al_config[key], dict):
            raise ConfigValidationError(f"active_learning.{key} must be a dictionary")
        return al_config[key]

    def check_min_int(section: Dict[str, Any], key: str, minimum: int, message: str) -> None:
        if key in section and not (is_int(section[key]) and section[key] >= minimum):
            raise ConfigValidationError(message)

    def check_choice(key: str, choices: List[str]) -> None:
        if key in al_config and al_config[key] not in choices:
            raise ConfigValidationError(
                f"active_learning.{key} must be one of: {', '.join(choices)}"
            )

    def check_component(key: str) -> None:
        # Shared shape for the classifier / vectorizer sub-sections.
        component = check_dict(key)
        if component is None:
            return
        if "name" not in component:
            raise ConfigValidationError(f"active_learning.{key}.name is required")
        if not isinstance(component["name"], str):
            raise ConfigValidationError(f"active_learning.{key}.name must be a string")
        if "hyperparameters" in component and not isinstance(component["hyperparameters"], dict):
            raise ConfigValidationError(
                f"active_learning.{key}.hyperparameters must be a dictionary"
            )

    # Validate enabled flag
    check_bool(al_config, "enabled", "active_learning.enabled")
    if not al_config.get("enabled", False):
        return  # Skip validation if not enabled

    # Validate classifier and vectorizer configuration
    check_component("classifier")
    check_component("vectorizer")

    # Validate training parameters
    check_min_int(
        al_config, "min_annotations_per_instance", 1,
        "active_learning.min_annotations_per_instance must be a positive integer",
    )
    check_min_int(
        al_config, "min_instances_for_training", 2,
        "active_learning.min_instances_for_training must be an integer >= 2",
    )
    check_min_int(
        al_config, "max_instances_to_reorder", 1,
        "active_learning.max_instances_to_reorder must be a positive integer",
    )
    check_min_int(
        al_config, "update_frequency", 1,
        "active_learning.update_frequency must be a positive integer",
    )

    # Validate resolution strategy
    check_choice(
        "resolution_strategy",
        ["majority_vote", "random", "consensus", "weighted_average"],
    )

    # Validate random sample percent
    if "random_sample_percent" in al_config and not is_fraction(al_config["random_sample_percent"]):
        raise ConfigValidationError("active_learning.random_sample_percent must be between 0 and 1")

    # Validate schema names
    if "schema_names" in al_config:
        schema_names = al_config["schema_names"]
        if not isinstance(schema_names, list):
            raise ConfigValidationError("active_learning.schema_names must be a list")
        for schema in schema_names:
            if not isinstance(schema, str):
                raise ConfigValidationError("active_learning.schema_names must contain only strings")
            # Check for unsupported schema types
            if schema in ("text", "span"):
                raise ConfigValidationError(
                    f"Text and span annotation schemes are not supported for active learning: {schema}"
                )

    # Validate database configuration
    db_config = check_dict("database")
    if db_config is not None:
        check_bool(db_config, "enabled", "active_learning.database.enabled")

    # Validate model persistence configuration
    model_config = check_dict("model_persistence")
    if model_config is not None:
        check_bool(model_config, "enabled", "active_learning.model_persistence.enabled")
        check_min_int(
            model_config, "retention_count", 1,
            "active_learning.model_persistence.retention_count must be a positive integer",
        )

    # Validate LLM configuration
    llm_config = check_dict("llm")
    if llm_config is not None:
        check_bool(llm_config, "enabled", "active_learning.llm.enabled")
        for key in ("endpoint_url", "model_name"):
            if key in llm_config and not isinstance(llm_config[key], str):
                raise ConfigValidationError(f"active_learning.llm.{key} must be a string")

    # Validate query strategy
    check_choice("query_strategy", ["uncertainty", "diversity", "badge", "bald", "hybrid"])

    # Validate hybrid weights
    weights = check_dict("hybrid_weights")
    if weights is not None:
        # sum() over non-numeric values would raise a bare TypeError.
        if not all(is_number(weight) for weight in weights.values()):
            raise ConfigValidationError("active_learning.hybrid_weights values must be numbers")
        weight_sum = sum(weights.values())
        # Written as "not <=" so that a NaN sum is rejected too.
        if not abs(weight_sum - 1.0) <= 0.01:
            raise ConfigValidationError(
                f"active_learning.hybrid_weights must sum to 1.0 (got {weight_sum})"
            )

    # Validate cold-start strategy and confidence method (for LLM active learning)
    check_choice("cold_start_strategy", ["random", "llm"])
    check_choice("confidence_method", ["logprobs", "verbalized", "consistency"])

    # Validate classifier_params and vectorizer_params
    check_dict("classifier_params")
    check_dict("vectorizer_params")

    # Validate calibrate_probabilities
    check_bool(al_config, "calibrate_probabilities", "active_learning.calibrate_probabilities")

    # Validate BALD params
    bald_params = check_dict("bald_params")
    if bald_params is not None:
        check_min_int(
            bald_params, "n_estimators", 2,
            "active_learning.bald_params.n_estimators must be an integer >= 2",
        )

    # Validate ICL ensemble params
    check_bool(al_config, "use_icl_ensemble", "active_learning.use_icl_ensemble")
    check_dict("icl_ensemble_params")

    # Validate annotation routing
    check_bool(al_config, "annotation_routing", "active_learning.annotation_routing")
    thresholds = check_dict("routing_thresholds")
    if thresholds is not None:
        for key in ("auto_label_min_confidence", "show_suggestion_below"):
            if key in thresholds and not is_fraction(thresholds[key]):
                raise ConfigValidationError(
                    f"active_learning.routing_thresholds.{key} must be between 0 and 1"
                )

    # Warn (do not fail) about the optional sentence-transformers dependency
    vectorizer = al_config.get("vectorizer")
    wants_sentence_transformers = (
        al_config.get("vectorizer_name") == "sentence-transformers"
        or (isinstance(vectorizer, dict) and vectorizer.get("name") == "sentence-transformers")
    )
    if wants_sentence_transformers:
        try:
            import sentence_transformers  # noqa: F401
        except ImportError:
            logger.warning(
                "sentence-transformers vectorizer configured but package not installed. "
                "Install with: pip install sentence-transformers"
            )
def validate_ai_support_config(config_data: Dict[str, Any]) -> None:
    """
    Validate AI support configuration.

    The section is optional, and detailed checks only run when
    ``ai_support.enabled`` is true. When ``ai_config_file`` is given and
    ``endpoint_type`` is absent, the endpoint checks are skipped, but
    ``option_highlighting`` is still validated because it does not depend on
    the endpoint.

    Args:
        config_data: The configuration data containing ai_support section

    Raises:
        ConfigValidationError: If the AI support configuration is invalid
    """
    if "ai_support" not in config_data:
        return  # AI support is optional

    ai_config = config_data["ai_support"]
    # An empty `ai_support:` key (None) or a scalar would otherwise crash with
    # AttributeError on the .get() calls below.
    if not isinstance(ai_config, dict):
        raise ConfigValidationError("ai_support must be a dictionary")

    # Validate enabled flag
    if not isinstance(ai_config.get("enabled", False), bool):
        raise ConfigValidationError("ai_support.enabled must be a boolean")
    if not ai_config.get("enabled", False):
        return  # Skip validation if not enabled

    # Validate ai_config_file (optional, string path to external AI config)
    has_external_config = False
    if "ai_config_file" in ai_config:
        if not isinstance(ai_config["ai_config_file"], str):
            raise ConfigValidationError("ai_support.ai_config_file must be a string")
        has_external_config = True

    # When ai_config_file is set, the endpoint_type is expected to live in the
    # external file (which may be gitignored, e.g. when it holds API keys) and
    # is loaded at server start, so its absence here is not an error.
    if "endpoint_type" in ai_config:
        _validate_ai_support_endpoint(ai_config)
    elif not has_external_config:
        raise ConfigValidationError("ai_support.endpoint_type is required when ai_support is enabled")

    # Validate option_highlighting configuration (independent of the endpoint,
    # so it must not be skipped when the endpoint comes from an external file)
    if "option_highlighting" in ai_config:
        _validate_option_highlighting_config(ai_config["option_highlighting"])


def _validate_ai_support_endpoint(ai_config: Dict[str, Any]) -> None:
    """Validate ai_support.endpoint_type and the inline ai_support.ai_config section."""
    endpoint_type = ai_config["endpoint_type"]
    if not isinstance(endpoint_type, str):
        raise ConfigValidationError("ai_support.endpoint_type must be a string")
    valid_endpoint_types = ["openai", "anthropic", "huggingface", "ollama", "gemini", "vllm",
                            "yolo", "ollama_vision", "openai_vision", "anthropic_vision"]
    if endpoint_type not in valid_endpoint_types:
        raise ConfigValidationError(
            f"ai_support.endpoint_type must be one of: {', '.join(valid_endpoint_types)}"
        )

    if "ai_config" not in ai_config:
        return
    ai_endpoint_config = ai_config["ai_config"]
    if not isinstance(ai_endpoint_config, dict):
        raise ConfigValidationError("ai_support.ai_config must be a dictionary")

    # Validate model name
    if "model" in ai_endpoint_config:
        model = ai_endpoint_config["model"]
        if not isinstance(model, str) or not model.strip():
            raise ConfigValidationError("ai_support.ai_config.model must be a non-empty string")

    # Validate API key for cloud-based endpoints
    if endpoint_type in ["openai", "anthropic", "huggingface", "gemini"]:
        api_key = ai_endpoint_config.get("api_key", "")
        if not api_key or not isinstance(api_key, str):
            raise ConfigValidationError(
                f"ai_support.ai_config.api_key is required for {endpoint_type} endpoint"
            )

    # Validate base_url for VLLM (optional, but must be a string when given)
    if endpoint_type == "vllm":
        base_url = ai_endpoint_config.get("base_url", "")
        if base_url and not isinstance(base_url, str):
            raise ConfigValidationError("ai_support.ai_config.base_url must be a string")

    # Validate temperature. bool is a subclass of int and is excluded
    # explicitly; the chained comparison also rejects NaN.
    if "temperature" in ai_endpoint_config:
        temperature = ai_endpoint_config["temperature"]
        is_number = isinstance(temperature, (int, float)) and not isinstance(temperature, bool)
        if not (is_number and 0 <= temperature <= 2):
            raise ConfigValidationError("ai_support.ai_config.temperature must be between 0 and 2")

    # Validate max_tokens (true/false must not count as an integer)
    if "max_tokens" in ai_endpoint_config:
        max_tokens = ai_endpoint_config["max_tokens"]
        if isinstance(max_tokens, bool) or not isinstance(max_tokens, int) or max_tokens < 1:
            raise ConfigValidationError("ai_support.ai_config.max_tokens must be a positive integer")

    # Validate custom prompts
    for prompt_key in ["hint_prompt", "keyword_prompt"]:
        if prompt_key in ai_endpoint_config:
            prompt = ai_endpoint_config[prompt_key]
            if not isinstance(prompt, str):
                raise ConfigValidationError(f"ai_support.ai_config.{prompt_key} must be a string")
            if not prompt.strip():
                raise ConfigValidationError(f"ai_support.ai_config.{prompt_key} cannot be empty")
def validate_chat_support_config(config_data: Dict[str, Any]) -> None:
    """
    Validate chat support configuration for LLM annotator assistance.

    The section is optional, and detailed checks only run when
    ``chat_support.enabled`` is true.

    Args:
        config_data: The configuration data containing chat_support section

    Raises:
        ConfigValidationError: If the chat support configuration is invalid
    """
    if "chat_support" not in config_data:
        return  # Chat support is optional

    chat_config = config_data["chat_support"]
    # An empty `chat_support:` key (None) or a scalar would otherwise crash
    # with AttributeError on the .get() calls below.
    if not isinstance(chat_config, dict):
        raise ConfigValidationError("chat_support must be a dictionary")

    def is_int(value: Any) -> bool:
        # bool is a subclass of int, so true/false must be excluded explicitly.
        return isinstance(value, int) and not isinstance(value, bool)

    if not isinstance(chat_config.get("enabled", False), bool):
        raise ConfigValidationError("chat_support.enabled must be a boolean")
    if not chat_config.get("enabled", False):
        return  # Skip validation if not enabled

    # Validate endpoint type
    if "endpoint_type" not in chat_config:
        raise ConfigValidationError(
            "chat_support.endpoint_type is required when chat_support is enabled"
        )
    endpoint_type = chat_config["endpoint_type"]
    valid_endpoint_types = [
        "openai", "anthropic", "huggingface", "ollama", "gemini", "vllm", "openrouter",
    ]
    if endpoint_type not in valid_endpoint_types:
        raise ConfigValidationError(
            f"chat_support.endpoint_type must be one of: {', '.join(valid_endpoint_types)}"
        )

    # Validate ai_config section
    if "ai_config" in chat_config:
        ai_cfg = chat_config["ai_config"]
        if not isinstance(ai_cfg, dict):
            raise ConfigValidationError("chat_support.ai_config must be a dictionary")

        if "model" in ai_cfg:
            if not isinstance(ai_cfg["model"], str) or not ai_cfg["model"].strip():
                raise ConfigValidationError(
                    "chat_support.ai_config.model must be a non-empty string"
                )

        if "temperature" in ai_cfg:
            temp = ai_cfg["temperature"]
            is_number = isinstance(temp, (int, float)) and not isinstance(temp, bool)
            # The chained comparison is False for NaN, so NaN is rejected too.
            if not (is_number and 0 <= temp <= 2):
                raise ConfigValidationError(
                    "chat_support.ai_config.temperature must be between 0 and 2"
                )

        if "max_tokens" in ai_cfg:
            mt = ai_cfg["max_tokens"]
            if not is_int(mt) or mt < 1:
                raise ConfigValidationError(
                    "chat_support.ai_config.max_tokens must be a positive integer"
                )

        # Validate API key for cloud endpoints (only checked when an inline
        # ai_config section is present)
        if endpoint_type in ["openai", "anthropic", "huggingface", "gemini", "openrouter"]:
            api_key = ai_cfg.get("api_key", "")
            if not api_key or not isinstance(api_key, str):
                raise ConfigValidationError(
                    f"chat_support.ai_config.api_key is required for {endpoint_type} endpoint"
                )

    # Validate UI section
    if "ui" in chat_config:
        ui_cfg = chat_config["ui"]
        if not isinstance(ui_cfg, dict):
            raise ConfigValidationError("chat_support.ui must be a dictionary")

        if "sidebar_width" in ui_cfg:
            sw = ui_cfg["sidebar_width"]
            if not is_int(sw) or sw < 200 or sw > 800:
                raise ConfigValidationError(
                    "chat_support.ui.sidebar_width must be an integer between 200 and 800"
                )

        if "max_history_per_instance" in ui_cfg:
            mh = ui_cfg["max_history_per_instance"]
            if not is_int(mh) or mh < 1:
                raise ConfigValidationError(
                    "chat_support.ui.max_history_per_instance must be a positive integer"
                )
| def _validate_option_highlighting_config(oh_config: Dict[str, Any]) -> None: | |
| """ | |
| Validate option highlighting configuration. | |
| Args: | |
| oh_config: The option_highlighting configuration section | |
| Raises: | |
| ConfigValidationError: If the configuration is invalid | |
| """ | |
| if not isinstance(oh_config, dict): | |
| raise ConfigValidationError("ai_support.option_highlighting must be a dictionary") | |
| # Validate enabled flag | |
| if "enabled" in oh_config: | |
| if not isinstance(oh_config["enabled"], bool): | |
| raise ConfigValidationError("ai_support.option_highlighting.enabled must be a boolean") | |
| # Validate top_k (number of options to highlight) | |
| if "top_k" in oh_config: | |
| top_k = oh_config["top_k"] | |
| if not isinstance(top_k, int) or top_k < 1 or top_k > 10: | |
| raise ConfigValidationError("ai_support.option_highlighting.top_k must be an integer between 1 and 10") | |
| # Validate dim_opacity (opacity for non-highlighted options) | |
| if "dim_opacity" in oh_config: | |
| dim_opacity = oh_config["dim_opacity"] | |
| if not isinstance(dim_opacity, (int, float)) or dim_opacity < 0.1 or dim_opacity > 0.9: | |
| raise ConfigValidationError("ai_support.option_highlighting.dim_opacity must be a number between 0.1 and 0.9") | |
| # Validate auto_apply flag | |
| if "auto_apply" in oh_config: | |
| if not isinstance(oh_config["auto_apply"], bool): | |
| raise ConfigValidationError("ai_support.option_highlighting.auto_apply must be a boolean") | |
| # Validate schemas filter (list of schema names or null) | |
| if "schemas" in oh_config: | |
| schemas = oh_config["schemas"] | |
| if schemas is not None: | |
| if not isinstance(schemas, list): | |
| raise ConfigValidationError("ai_support.option_highlighting.schemas must be a list or null") | |
| for schema in schemas: | |
| if not isinstance(schema, str): | |
| raise ConfigValidationError("ai_support.option_highlighting.schemas must contain only strings") | |
| # Validate prefetch_count | |
| if "prefetch_count" in oh_config: | |
| prefetch_count = oh_config["prefetch_count"] | |
| if not isinstance(prefetch_count, int) or prefetch_count < 0 or prefetch_count > 100: | |
| raise ConfigValidationError("ai_support.option_highlighting.prefetch_count must be an integer between 0 and 100") | |
def parse_active_learning_config(config_data: Dict[str, Any]) -> 'ActiveLearningConfig':
    """
    Build an ActiveLearningConfig from the ``active_learning`` section.

    No type checking is done here; values are read as-is, with defaults
    filled in for missing keys. An unrecognised ``resolution_strategy``
    value silently falls back to majority vote.

    Args:
        config_data: The configuration data containing active_learning section

    Returns:
        ActiveLearningConfig: Parsed active learning configuration, or a
        default-constructed one when the section is absent

    Raises:
        ConfigValidationError: If the configuration is invalid
            (NOTE(review): nothing in this function raises it directly —
            confirm whether ActiveLearningConfig does)
    """
    # Imported lazily, inside the function, as in the original code.
    from potato.active_learning_manager import ActiveLearningConfig, ResolutionStrategy

    if "active_learning" not in config_data:
        return ActiveLearningConfig()  # Return default config

    section = config_data["active_learning"]

    def read_component(key, default_name):
        # Classifier and vectorizer share the same {name, hyperparameters} shape.
        if key not in section:
            return default_name, {}
        spec = section[key]
        return spec.get("name", default_name), spec.get("hyperparameters", {})

    classifier_name, classifier_kwargs = read_component(
        "classifier", "sklearn.linear_model.LogisticRegression"
    )
    vectorizer_name, vectorizer_kwargs = read_component(
        "vectorizer", "sklearn.feature_extraction.text.CountVectorizer"
    )

    # Map the configured string onto the enum; anything else keeps the default.
    resolution_strategy = ResolutionStrategy.MAJORITY_VOTE
    if "resolution_strategy" in section:
        requested = section["resolution_strategy"]
        for label, member_name in (
            ("majority_vote", "MAJORITY_VOTE"),
            ("random", "RANDOM"),
            ("consensus", "CONSENSUS"),
            ("weighted_average", "WEIGHTED_AVERAGE"),
        ):
            if requested == label:
                resolution_strategy = getattr(ResolutionStrategy, member_name)
                break

    # Database: the "enabled" flag is split off, everything else is passed through.
    database_enabled = False
    database_config = {}
    if "database" in section:
        db_section = section["database"]
        database_enabled = db_section.get("enabled", False)
        database_config = {
            name: value for name, value in db_section.items() if name != "enabled"
        }

    # Model persistence
    model_persistence_enabled = False
    model_save_directory = None
    model_retention_count = 2
    if "model_persistence" in section:
        persistence = section["model_persistence"]
        model_persistence_enabled = persistence.get("enabled", False)
        model_save_directory = persistence.get("save_directory")
        model_retention_count = persistence.get("retention_count", 2)

    # LLM: the section itself (including its "enabled" key) is handed over unchanged.
    llm_enabled = False
    llm_config = {}
    if "llm" in section:
        llm_config = section["llm"]
        llm_enabled = llm_config.get("enabled", False)

    return ActiveLearningConfig(
        enabled=section.get("enabled", False),
        classifier_name=classifier_name,
        classifier_kwargs=classifier_kwargs,
        vectorizer_name=vectorizer_name,
        vectorizer_kwargs=vectorizer_kwargs,
        min_annotations_per_instance=section.get("min_annotations_per_instance", 1),
        min_instances_for_training=section.get("min_instances_for_training", 10),
        max_instances_to_reorder=section.get("max_instances_to_reorder"),
        resolution_strategy=resolution_strategy,
        random_sample_percent=section.get("random_sample_percent", 0.2),
        update_frequency=section.get("update_frequency", 5),
        schema_names=section.get("schema_names", []),
        database_enabled=database_enabled,
        database_config=database_config,
        model_persistence_enabled=model_persistence_enabled,
        model_save_directory=model_save_directory,
        model_retention_count=model_retention_count,
        llm_enabled=llm_enabled,
        llm_config=llm_config,
    )
def validate_instance_display_config(config_data: Dict[str, Any]) -> None:
    """
    Check the optional ``instance_display`` section of a configuration.

    ``instance_display`` describes the content shown to annotators,
    independently of which annotations are collected, so images, video or
    audio can be displayed next to any annotation type.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the instance_display configuration is invalid
    """
    if "instance_display" not in config_data:
        return  # instance_display is optional (backwards compatible)

    section = config_data["instance_display"]
    if not isinstance(section, dict):
        raise ConfigValidationError("instance_display must be a dictionary")

    # The field list is mandatory, must be a list and must not be empty.
    if "fields" not in section:
        raise ConfigValidationError("instance_display must contain 'fields' list")
    entries = section["fields"]
    if not isinstance(entries, list):
        raise ConfigValidationError("instance_display.fields must be a list")
    if not entries:
        raise ConfigValidationError("instance_display.fields cannot be empty")

    # Keys of fields flagged as span targets (collected, but not read again here).
    span_target_keys = []

    # The display registry is the preferred source of valid types, so adding
    # a display type does not require editing this function. If the registry
    # cannot be loaded for any reason, a static list is used instead.
    try:
        from .displays import display_registry
        known_types = display_registry.get_supported_types()
    except Exception:
        known_types = [
            "text", "html", "image", "video", "audio", "dialogue", "pairwise",
            "pdf", "document", "spreadsheet", "code", "agent_trace", "eval_trace",
            "gallery", "conversation_tree", "interactive_chat", "web_agent_trace",
            "live_agent", "coding_trace", "live_coding_agent",
        ]

    # Display types that may be used as span annotation targets.
    spannable_types = [
        "text", "dialogue", "pdf", "document", "spreadsheet",
        "code", "agent_trace", "interactive_chat",
    ]

    for index, entry in enumerate(entries):
        where = f"instance_display.fields[{index}]"
        if not isinstance(entry, dict):
            raise ConfigValidationError(f"{where} must be a dictionary")

        # Required properties: key and type
        if "key" not in entry:
            raise ConfigValidationError(f"{where} missing required 'key' property")
        entry_key = entry["key"]
        if not (isinstance(entry_key, str) and entry_key.strip()):
            raise ConfigValidationError(f"{where}.key must be a non-empty string")

        if "type" not in entry:
            raise ConfigValidationError(f"{where} missing required 'type' property")
        entry_type = entry["type"]
        if entry_type not in known_types:
            raise ConfigValidationError(
                f"{where}.type '{entry_type}' is invalid. "
                f"Valid types are: {', '.join(known_types)}"
            )

        # Optional label
        if "label" in entry and not isinstance(entry["label"], str):
            raise ConfigValidationError(f"{where}.label must be a string")

        # Optional span_target flag: only some display types support it
        if entry.get("span_target"):
            if entry_type not in spannable_types:
                raise ConfigValidationError(
                    f"{where}.span_target is set but type '{entry_type}' "
                    f"does not support span annotation. Types that support span_target: {', '.join(spannable_types)}."
                )
            span_target_keys.append(entry_key)

        # Optional display_options, with type-specific checks delegated
        if "display_options" in entry:
            display_options = entry["display_options"]
            if not isinstance(display_options, dict):
                raise ConfigValidationError(f"{where}.display_options must be a dictionary")
            _validate_display_options(entry_type, display_options, where)

    # Optional layout block
    if "layout" in section:
        layout_cfg = section["layout"]
        if not isinstance(layout_cfg, dict):
            raise ConfigValidationError("instance_display.layout must be a dictionary")
        if "direction" in layout_cfg:
            allowed_directions = ["vertical", "horizontal"]
            if layout_cfg["direction"] not in allowed_directions:
                raise ConfigValidationError(
                    f"instance_display.layout.direction must be one of: {', '.join(allowed_directions)}"
                )
        if "gap" in layout_cfg and not isinstance(layout_cfg["gap"], str):
            raise ConfigValidationError(
                "instance_display.layout.gap must be a string (e.g., '20px', '1rem')"
            )

    # Optional section-level resizable flag
    if "resizable" in section and not isinstance(section["resizable"], bool):
        raise ConfigValidationError("instance_display.resizable must be a boolean (true/false)")

    # Deprecation check for annotation schemas used only for display
    # (behaviour lives in the helper; see its own documentation).
    _check_display_only_deprecation(config_data)
| def _validate_display_options(field_type: str, options: Dict[str, Any], path: str) -> None: | |
| """ | |
| Validate display options for a specific field type. | |
| Args: | |
| field_type: The display type | |
| options: The display options dictionary | |
| path: The config path for error messages | |
| Raises: | |
| ConfigValidationError: If options are invalid | |
| """ | |
| # Common option validation | |
| if "max_width" in options: | |
| max_width = options["max_width"] | |
| if not isinstance(max_width, (int, str)): | |
| raise ConfigValidationError(f"{path}.display_options.max_width must be an integer or string") | |
| if isinstance(max_width, int) and max_width < 1: | |
| raise ConfigValidationError(f"{path}.display_options.max_width must be positive") | |
| if "max_height" in options: | |
| max_height = options["max_height"] | |
| if not isinstance(max_height, (int, str)): | |
| raise ConfigValidationError(f"{path}.display_options.max_height must be an integer or string") | |
| if isinstance(max_height, int) and max_height < 1: | |
| raise ConfigValidationError(f"{path}.display_options.max_height must be positive") | |
| if "min_height" in options: | |
| min_height = options["min_height"] | |
| if not isinstance(min_height, (int, str)): | |
| raise ConfigValidationError(f"{path}.display_options.min_height must be an integer or string") | |
| if isinstance(min_height, int) and min_height < 1: | |
| raise ConfigValidationError(f"{path}.display_options.min_height must be positive") | |
| if "resizable" in options: | |
| if not isinstance(options["resizable"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.resizable must be a boolean") | |
| # Text-specific options | |
| if field_type in ["text", "html"]: | |
| if "collapsible" in options: | |
| if not isinstance(options["collapsible"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.collapsible must be a boolean") | |
| if "preserve_whitespace" in options: | |
| if not isinstance(options["preserve_whitespace"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.preserve_whitespace must be a boolean") | |
| # Image-specific options | |
| if field_type == "image": | |
| if "zoomable" in options: | |
| if not isinstance(options["zoomable"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.zoomable must be a boolean") | |
| if "object_fit" in options: | |
| valid_fits = ["contain", "cover", "fill", "none", "scale-down"] | |
| if options["object_fit"] not in valid_fits: | |
| raise ConfigValidationError( | |
| f"{path}.display_options.object_fit must be one of: {', '.join(valid_fits)}" | |
| ) | |
| # Video-specific options | |
| if field_type == "video": | |
| for bool_opt in ["controls", "autoplay", "loop", "muted"]: | |
| if bool_opt in options: | |
| if not isinstance(options[bool_opt], bool): | |
| raise ConfigValidationError(f"{path}.display_options.{bool_opt} must be a boolean") | |
| # Audio-specific options | |
| if field_type == "audio": | |
| if "controls" in options: | |
| if not isinstance(options["controls"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.controls must be a boolean") | |
| if "show_waveform" in options: | |
| if not isinstance(options["show_waveform"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.show_waveform must be a boolean") | |
| # Dialogue-specific options | |
| if field_type == "dialogue": | |
| if "alternating_shading" in options: | |
| if not isinstance(options["alternating_shading"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.alternating_shading must be a boolean") | |
| if "speaker_extraction" in options: | |
| if not isinstance(options["speaker_extraction"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.speaker_extraction must be a boolean") | |
| # Pairwise-specific options | |
| if field_type == "pairwise": | |
| if "cell_width" in options: | |
| cell_width = options["cell_width"] | |
| if not isinstance(cell_width, str): | |
| raise ConfigValidationError(f"{path}.display_options.cell_width must be a string (e.g., '50%')") | |
| # PDF-specific options | |
| if field_type == "pdf": | |
| if "view_mode" in options: | |
| valid_modes = ["scroll", "paginated", "side-by-side"] | |
| if options["view_mode"] not in valid_modes: | |
| raise ConfigValidationError( | |
| f"{path}.display_options.view_mode must be one of: {', '.join(valid_modes)}" | |
| ) | |
| if "text_layer" in options: | |
| if not isinstance(options["text_layer"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.text_layer must be a boolean") | |
| if "zoom" in options: | |
| zoom = options["zoom"] | |
| valid_zoom_modes = ["auto", "page-fit", "page-width"] | |
| if zoom not in valid_zoom_modes: | |
| try: | |
| float(zoom) | |
| except (TypeError, ValueError): | |
| raise ConfigValidationError( | |
| f"{path}.display_options.zoom must be one of {valid_zoom_modes} or a number" | |
| ) | |
| # Document-specific options | |
| if field_type == "document": | |
| if "collapsible" in options: | |
| if not isinstance(options["collapsible"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.collapsible must be a boolean") | |
| if "show_outline" in options: | |
| if not isinstance(options["show_outline"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.show_outline must be a boolean") | |
| if "style_theme" in options: | |
| valid_themes = ["default", "minimal", "print"] | |
| if options["style_theme"] not in valid_themes: | |
| raise ConfigValidationError( | |
| f"{path}.display_options.style_theme must be one of: {', '.join(valid_themes)}" | |
| ) | |
| # Spreadsheet-specific options | |
| if field_type == "spreadsheet": | |
| if "annotation_mode" in options: | |
| valid_modes = ["row", "cell", "range"] | |
| if options["annotation_mode"] not in valid_modes: | |
| raise ConfigValidationError( | |
| f"{path}.display_options.annotation_mode must be one of: {', '.join(valid_modes)}" | |
| ) | |
| for bool_opt in ["show_headers", "striped", "hoverable", "sortable", "selectable", "compact"]: | |
| if bool_opt in options: | |
| if not isinstance(options[bool_opt], bool): | |
| raise ConfigValidationError(f"{path}.display_options.{bool_opt} must be a boolean") | |
| # Code-specific options | |
| if field_type == "code": | |
| if "language" in options: | |
| if not isinstance(options["language"], (str, type(None))): | |
| raise ConfigValidationError(f"{path}.display_options.language must be a string or null") | |
| if "show_line_numbers" in options: | |
| if not isinstance(options["show_line_numbers"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.show_line_numbers must be a boolean") | |
| if "wrap_lines" in options: | |
| if not isinstance(options["wrap_lines"], bool): | |
| raise ConfigValidationError(f"{path}.display_options.wrap_lines must be a boolean") | |
| if "highlight_lines" in options: | |
| hl = options["highlight_lines"] | |
| if hl is not None and not isinstance(hl, list): | |
| raise ConfigValidationError(f"{path}.display_options.highlight_lines must be a list of line numbers or null") | |
| if "theme" in options: | |
| valid_themes = ["default", "dark"] | |
| if options["theme"] not in valid_themes: | |
| raise ConfigValidationError( | |
| f"{path}.display_options.theme must be one of: {', '.join(valid_themes)}" | |
| ) | |
def validate_format_handling_config(config_data: Dict[str, Any]) -> None:
    """
    Validate format_handling configuration for extended format support.

    The section is optional: a missing or null ``format_handling`` entry is
    accepted without further checks. When present it must be a dictionary;
    the ``enabled``, ``default_format``, ``pdf`` and ``spreadsheet`` keys are
    validated if they appear, and any other keys are ignored here.

    Args:
        config_data: The full configuration data

    Raises:
        ConfigValidationError: If the format_handling configuration is invalid
    """
    format_config = config_data.get('format_handling')
    if format_config is None:
        return  # format_handling is optional
    if not isinstance(format_config, dict):
        raise ConfigValidationError("format_handling must be a dictionary")

    # Validate enabled flag
    if "enabled" in format_config and not isinstance(format_config["enabled"], bool):
        raise ConfigValidationError("format_handling.enabled must be a boolean")

    # Validate default_format
    if "default_format" in format_config:
        # Kept as a list (not a set) so an unhashable value is simply
        # reported as invalid instead of raising TypeError.
        valid_defaults = ["auto", "pdf", "docx", "markdown", "spreadsheet", "code"]
        if format_config["default_format"] not in valid_defaults:
            raise ConfigValidationError(
                f"format_handling.default_format must be one of: {', '.join(valid_defaults)}"
            )

    # Validate PDF-specific options
    if "pdf" in format_config:
        pdf_opts = format_config["pdf"]
        if not isinstance(pdf_opts, dict):
            raise ConfigValidationError("format_handling.pdf must be a dictionary")
        if "extraction_mode" in pdf_opts:
            valid_modes = ["text", "ocr", "hybrid"]
            if pdf_opts["extraction_mode"] not in valid_modes:
                raise ConfigValidationError(
                    f"format_handling.pdf.extraction_mode must be one of: {', '.join(valid_modes)}"
                )
        if "cache_extracted" in pdf_opts and not isinstance(pdf_opts["cache_extracted"], bool):
            raise ConfigValidationError("format_handling.pdf.cache_extracted must be a boolean")

    # Validate spreadsheet-specific options
    if "spreadsheet" in format_config:
        ss_opts = format_config["spreadsheet"]
        if not isinstance(ss_opts, dict):
            raise ConfigValidationError("format_handling.spreadsheet must be a dictionary")
        if "annotation_mode" in ss_opts:
            valid_modes = ["row", "cell", "range"]
            if ss_opts["annotation_mode"] not in valid_modes:
                raise ConfigValidationError(
                    f"format_handling.spreadsheet.annotation_mode must be one of: {', '.join(valid_modes)}"
                )
        if "max_rows" in ss_opts:
            max_rows = ss_opts["max_rows"]
            # bool is a subclass of int, so `max_rows: true` would otherwise
            # be accepted as the integer 1.
            if isinstance(max_rows, bool) or not isinstance(max_rows, int) or max_rows < 1:
                raise ConfigValidationError("format_handling.spreadsheet.max_rows must be a positive integer")
def validate_layout_config(config_data: Dict[str, Any]) -> None:
    """
    Validate layout configuration for annotation form grid arrangement.

    The layout section configures how annotation forms are arranged in a grid,
    supports grouping schemas with collapsible headers, and provides responsive
    breakpoints for mobile/tablet displays.

    Sections are checked in a fixed order (grid, breakpoints, groups, order,
    styling, per-group background colors) and the first problem found is
    reported. Schema names referenced by groups are resolved against the
    top-level ``annotation_schemes`` list only.

    Args:
        config_data: The full configuration data

    Raises:
        ConfigValidationError: If the layout configuration is invalid
    """
    layout = config_data.get('layout')
    if layout is None:
        return  # layout is optional
    if not isinstance(layout, dict):
        raise ConfigValidationError("layout must be a dictionary")

    if 'grid' in layout:
        _validate_layout_grid(layout['grid'])

    if 'breakpoints' in layout:
        _validate_layout_breakpoints(layout['breakpoints'])

    if 'groups' in layout:
        # A null or non-list annotation_schemes contributes no known names
        # rather than crashing the iteration; only string names are kept
        # because group references are themselves required to be strings.
        schemes = config_data.get('annotation_schemes')
        if not isinstance(schemes, (list, tuple)):
            schemes = ()
        known_schemas = {
            scheme['name']
            for scheme in schemes
            if isinstance(scheme, dict) and isinstance(scheme.get('name'), str)
        }
        _validate_layout_groups(layout['groups'], known_schemas)

    if 'order' in layout:
        _validate_layout_order(layout['order'])

    if 'styling' in layout:
        _validate_layout_styling(layout['styling'])

    # Checked last so that styling errors keep precedence over per-group
    # background color errors; groups are already known to be a list of dicts.
    if 'groups' in layout:
        _validate_layout_group_backgrounds(layout['groups'])


def _validate_layout_grid(grid: Any) -> None:
    """Validate ``layout.grid``: columns, gap, row_gap and align_items."""
    if not isinstance(grid, dict):
        raise ConfigValidationError("layout.grid must be a dictionary")

    # Validate columns (1-6)
    if 'columns' in grid:
        columns = grid['columns']
        # bool is a subclass of int; reject it so `columns: true` is not read as 1.
        if isinstance(columns, bool) or not isinstance(columns, int) or not 1 <= columns <= 6:
            raise ConfigValidationError("layout.grid.columns must be an integer between 1 and 6")

    # Validate gap (CSS value)
    if 'gap' in grid:
        gap = grid['gap']
        if not isinstance(gap, str) or not gap.strip():
            raise ConfigValidationError("layout.grid.gap must be a non-empty CSS value string (e.g., '1rem', '16px')")

    # Validate row_gap (CSS value)
    if 'row_gap' in grid:
        row_gap = grid['row_gap']
        if not isinstance(row_gap, str) or not row_gap.strip():
            raise ConfigValidationError("layout.grid.row_gap must be a non-empty CSS value string")

    # Validate align_items
    if 'align_items' in grid:
        valid_alignments = ['start', 'center', 'end', 'stretch']
        if grid['align_items'] not in valid_alignments:
            raise ConfigValidationError(
                f"layout.grid.align_items must be one of: {', '.join(valid_alignments)}"
            )


def _validate_layout_breakpoints(breakpoints: Any) -> None:
    """Validate ``layout.breakpoints``: mobile/tablet must be non-negative ints."""
    if not isinstance(breakpoints, dict):
        raise ConfigValidationError("layout.breakpoints must be a dictionary")
    for bp_name in ['mobile', 'tablet']:
        if bp_name not in breakpoints:
            continue
        bp_value = breakpoints[bp_name]
        # bool is excluded explicitly because it would pass the int check.
        if isinstance(bp_value, bool) or not isinstance(bp_value, int) or bp_value < 0:
            raise ConfigValidationError(
                f"layout.breakpoints.{bp_name} must be a non-negative integer (pixel value)"
            )


def _validate_layout_groups(groups: Any, known_schemas: set) -> None:
    """Validate ``layout.groups``: ids, schema references and optional fields."""
    if not isinstance(groups, list):
        raise ConfigValidationError("layout.groups must be a list")

    seen_ids = set()
    for i, group in enumerate(groups):
        if not isinstance(group, dict):
            raise ConfigValidationError(f"layout.groups[{i}] must be a dictionary")

        # Validate required group fields
        if 'id' not in group:
            raise ConfigValidationError(f"layout.groups[{i}] missing required 'id' field")
        group_id = group['id']
        if not isinstance(group_id, str) or not group_id.strip():
            raise ConfigValidationError(f"layout.groups[{i}].id must be a non-empty string")
        if group_id in seen_ids:
            raise ConfigValidationError(f"layout.groups[{i}].id '{group_id}' is duplicate")
        seen_ids.add(group_id)

        # Validate schemas list
        if 'schemas' not in group:
            raise ConfigValidationError(f"layout.groups[{i}] missing required 'schemas' field")
        group_schemas = group['schemas']
        if not isinstance(group_schemas, list):
            raise ConfigValidationError(f"layout.groups[{i}].schemas must be a list")
        if not group_schemas:
            raise ConfigValidationError(f"layout.groups[{i}].schemas cannot be empty")

        # Validate each schema reference exists
        for j, schema_name in enumerate(group_schemas):
            if not isinstance(schema_name, str):
                raise ConfigValidationError(
                    f"layout.groups[{i}].schemas[{j}] must be a string"
                )
            if schema_name not in known_schemas:
                raise ConfigValidationError(
                    f"layout.groups[{i}].schemas references unknown schema: '{schema_name}'"
                )

        # Validate optional boolean fields
        for bool_key in ['collapsible', 'collapsed_default']:
            if bool_key in group and not isinstance(group[bool_key], bool):
                raise ConfigValidationError(f"layout.groups[{i}].{bool_key} must be a boolean")

        # Validate optional title and description
        for text_key in ['title', 'description']:
            if text_key in group and not isinstance(group[text_key], str):
                raise ConfigValidationError(f"layout.groups[{i}].{text_key} must be a string")


def _validate_layout_order(order: Any) -> None:
    """Validate ``layout.order``: a list whose entries are all strings."""
    if not isinstance(order, list):
        raise ConfigValidationError("layout.order must be a list")
    for i, schema_name in enumerate(order):
        if not isinstance(schema_name, str):
            raise ConfigValidationError(f"layout.order[{i}] must be a string")


def _validate_layout_styling(styling: Any) -> None:
    """Validate ``layout.styling``: alignment enums and CSS color/padding strings."""
    if not isinstance(styling, dict):
        raise ConfigValidationError("layout.styling must be a dictionary")

    # Validate align_items
    if 'align_items' in styling:
        valid_alignments = ['start', 'center', 'end', 'stretch']
        if styling['align_items'] not in valid_alignments:
            raise ConfigValidationError(
                f"layout.styling.align_items must be one of: {', '.join(valid_alignments)}"
            )

    # Validate content_align
    if 'content_align' in styling:
        valid_content_align = ['left', 'center', 'right']
        if styling['content_align'] not in valid_content_align:
            raise ConfigValidationError(
                f"layout.styling.content_align must be one of: {', '.join(valid_content_align)}"
            )

    # Validate background colors (CSS color values)
    for color_key in ['group_background_odd', 'group_background_even']:
        if color_key in styling:
            color = styling[color_key]
            if not isinstance(color, str) or not color.strip():
                raise ConfigValidationError(
                    f"layout.styling.{color_key} must be a non-empty CSS color value"
                )

    # Validate padding values (CSS padding)
    for padding_key in ['group_padding', 'form_padding']:
        if padding_key in styling:
            padding = styling[padding_key]
            if not isinstance(padding, str) or not padding.strip():
                raise ConfigValidationError(
                    f"layout.styling.{padding_key} must be a non-empty CSS padding value"
                )


def _validate_layout_group_backgrounds(groups: List[Any]) -> None:
    """Validate the optional ``background_color`` of each already-checked group."""
    for i, group in enumerate(groups):
        if 'background_color' in group:
            bg_color = group['background_color']
            if not isinstance(bg_color, str) or not bg_color.strip():
                raise ConfigValidationError(
                    f"layout.groups[{i}].background_color must be a non-empty CSS color value"
                )
def validate_adjudication_config(config_data: Dict[str, Any]) -> None:
    """
    Validate adjudication configuration.

    Nothing beyond the dictionary type check is validated unless
    ``adjudication.enabled`` is truthy. When enabled, ``adjudicator_users``
    must be a non-empty list and the numeric settings must fall in their
    documented ranges. Booleans and NaN are not accepted as numbers.

    Args:
        config_data: The full configuration data

    Raises:
        ConfigValidationError: If the adjudication configuration is invalid
    """
    adj_config = config_data.get('adjudication', {})
    if not isinstance(adj_config, dict):
        raise ConfigValidationError("adjudication must be a dictionary")
    if not adj_config.get('enabled', False):
        return

    # Require adjudicator_users
    users = adj_config.get('adjudicator_users', [])
    if not isinstance(users, list) or not users:
        raise ConfigValidationError(
            "adjudication.adjudicator_users must be a non-empty list of usernames"
        )

    # Validate numeric fields. bool is a subclass of int, so it is excluded
    # explicitly; range checks are written in the positive form so that NaN
    # (for which every comparison is False) is rejected as well.
    min_ann = adj_config.get('min_annotations', 2)
    if isinstance(min_ann, bool) or not isinstance(min_ann, int) or min_ann < 1:
        raise ConfigValidationError(
            "adjudication.min_annotations must be a positive integer"
        )

    threshold = adj_config.get('agreement_threshold', 0.75)
    if (
        isinstance(threshold, bool)
        or not isinstance(threshold, (int, float))
        or not 0 <= threshold <= 1
    ):
        raise ConfigValidationError(
            "adjudication.agreement_threshold must be a number between 0 and 1"
        )

    fast_warn = adj_config.get('fast_decision_warning_ms', 2000)
    if (
        isinstance(fast_warn, bool)
        or not isinstance(fast_warn, (int, float))
        or not fast_warn >= 0
    ):
        raise ConfigValidationError(
            "adjudication.fast_decision_warning_ms must be a non-negative number"
        )

    # Validate error_taxonomy
    taxonomy = adj_config.get('error_taxonomy')
    if taxonomy is not None:
        if not isinstance(taxonomy, list):
            raise ConfigValidationError(
                "adjudication.error_taxonomy must be a list of strings"
            )
        if not all(isinstance(item, str) for item in taxonomy):
            raise ConfigValidationError(
                "adjudication.error_taxonomy entries must be strings"
            )

    # Validate similarity config (only when it is a dict with enabled truthy;
    # any other shape is left unchecked here, as before).
    sim_config = adj_config.get('similarity', {})
    if isinstance(sim_config, dict) and sim_config.get('enabled', False):
        top_k = sim_config.get('top_k', 5)
        if isinstance(top_k, bool) or not isinstance(top_k, int) or not 1 <= top_k <= 20:
            raise ConfigValidationError(
                "adjudication.similarity.top_k must be an integer between 1 and 20"
            )
        model = sim_config.get('model', 'all-MiniLM-L6-v2')
        if not isinstance(model, str) or not model.strip():
            raise ConfigValidationError(
                "adjudication.similarity.model must be a non-empty string"
            )
| def _check_display_only_deprecation(config_data: Dict[str, Any]) -> None: | |
| """ | |
| Check for deprecated display-only pattern and log warning. | |
| Detects when image_annotation, video_annotation, or audio_annotation | |
| is used with min_annotations: 0 just to display content. | |
| Args: | |
| config_data: The configuration data | |
| """ | |
| # Get annotation schemes | |
| schemes = [] | |
| if "annotation_schemes" in config_data: | |
| schemes = config_data["annotation_schemes"] | |
| elif "phases" in config_data: | |
| phases = config_data["phases"] | |
| if isinstance(phases, list): | |
| for phase in phases: | |
| schemes.extend(phase.get("annotation_schemes", [])) | |
| elif isinstance(phases, dict): | |
| for phase_name, phase in phases.items(): | |
| if phase_name != "order" and isinstance(phase, dict): | |
| schemes.extend(phase.get("annotation_schemes", [])) | |
| for scheme in schemes: | |
| if not isinstance(scheme, dict): | |
| continue | |
| annotation_type = scheme.get("annotation_type") | |
| if annotation_type in ["image_annotation", "video_annotation", "audio_annotation"]: | |
| min_annotations = scheme.get("min_annotations", 1) | |
| if min_annotations == 0: | |
| logger.warning( | |
| f"Deprecation warning: Using {annotation_type} with min_annotations=0 " | |
| f"for display-only is deprecated. Use instance_display instead. " | |
| f"See docs/instance_display.md for migration guide." | |
| ) | |