""" Config module with enhanced security validation and error handling. """ import yaml import os import logging import re import codecs from pathlib import Path from typing import Dict, Any, List, Optional, Tuple from urllib.parse import urlparse import json config = {} def clear_config(): """Clear the global config dictionary. Used for testing to ensure clean state.""" global config config.clear() # Use centralized logging - avoid duplicate basicConfig logger = logging.getLogger(__name__) class ConfigValidationError(Exception): """Custom exception for configuration validation errors.""" pass class ConfigSecurityError(Exception): """Custom exception for configuration security violations.""" pass import difflib # ============================================================================ # Known config key schema (hierarchical) # Keys map to None (leaf), set (known sub-keys), or dict (nested schema). # Used by validate_unknown_keys() to warn about typos at all nesting levels. # ============================================================================ KNOWN_CONFIG_KEYS = { # === Core / required === "item_properties": { "id_key", "text_key", "category_key", "kwargs", }, "data_files": None, "task_dir": None, "output_annotation_dir": None, "output_annotation_format": None, "annotation_task_name": None, "task_description": None, "annotation_task_description": None, # === Data sources === "data_directory": None, "data_directory_encoding": None, "data_sources": None, "data_cache": {"enabled", "ttl_seconds", "max_size_mb"}, "watch_data_directory": None, "watch_poll_interval": None, "partial_loading": None, # === Annotation === "annotation_schemes": None, "phases": None, "output_annotation_format": None, # === Auth / login === "authentication": { "method", "providers", "user_identity_field", "database_url", "user_config_path", "auto_register", "allow_local_login", "allowed_domain", "allowed_domains", "allowed_org", }, "login": {"type", "url_argument", "auto_redirect_delay", "auto_redirect_on_completion"}, "user_config": {"allow_all_users", "users"}, "require_password": None, "require_no_password": None, "secret_key": None, # === Server === "server": {"port", "host", "debug"}, "port": None, "host": None, "customjs": None, "customjs_hostname": None, "site_dir": None, "site_file": None, "persist_sessions": None, "session_lifetime_days": None, "base_html_template": None, # === Quality control === "attention_checks": { "enabled", "items_file", "frequency", "probability", "min_response_time", "failure_handling", }, "gold_standards": { "enabled", "items_file", "mode", "frequency", "accuracy", "auto_promote", }, "gold_standards_file": None, "pre_annotation": { "enabled", "field", "highlight_low_confidence", "agreement_metrics", "predictions_file", "allow_modification", "show_confidence", }, "agreement_metrics": {"min_overlap", "refresh_interval", "enabled"}, "quality_control": None, # === AI === "ai_support": { "enabled", "endpoint_type", "ai_config_file", "ai_config", "option_highlighting", "features", "cache_config", }, "chat_support": { "enabled", "endpoint_type", "ai_config", "ui", }, # === Advanced features === "training": { "enabled", "data_file", "annotation_schemes", "passing_criteria", "feedback", "failure_action", }, "active_learning": { "enabled", "classifier", "vectorizer", "min_annotations_per_instance", "min_instances_for_training", "max_instances_to_reorder", "update_frequency", "resolution_strategy", "random_sample_percent", "schema_names", "database", "model_persistence", "llm", "query_strategy", "hybrid_weights", "cold_start_strategy", "confidence_method", "classifier_params", "vectorizer_params", "calibrate_probabilities", "bald_params", "use_icl_ensemble", "icl_ensemble_params", "annotation_routing", "routing_thresholds", }, "category_assignment": { "enabled", "category_key", "qualification", "fallback", "dynamic", }, "batch_assignment": { "groups", "annotator_key", }, "diversity_ordering": { "enabled", "model_name", "num_clusters", "items_per_cluster", "auto_clusters", "prefill_count", "batch_size", "recluster_threshold", "preserve_visited", "trigger_ai_prefetch", "cache_dir", }, "diversity_config": None, "embedding_visualization": { "enabled", "sample_size", "include_all_annotated", "embedding_model", "image_embedding_model", "umap", "label_source", }, "adjudication": { "enabled", "adjudicator_users", "min_annotations", "agreement_threshold", "fast_decision_warning_ms", "error_taxonomy", "similarity", "require_notes_on_override", "show_agreement_scores", "show_annotator_names", "output_subdir", "require_confidence", "show_all_items", "show_timing_data", }, "database": {"type", "host", "database", "username", "password", "port", "pool_size", "pool_timeout", "connection_string"}, "bws_config": { "tuple_size", "num_tuples", "seed", "min_item_appearances", "scoring", }, "ibws_config": { "tuple_size", "max_rounds", "seed", "scoring_method", "tuples_per_item_per_round", }, "mace": { "enabled", "min_annotations_per_item", "trigger_every_n", "num_restarts", "min_items", "num_iters", }, "icl_labeling": None, "llm_labeling": None, # === UI & layout === "ui": None, "ui_config": None, "layout": {"grid", "breakpoints", "groups", "order", "styling"}, "instance_display": {"fields", "layout", "resizable"}, "format_handling": {"enabled", "default_format", "pdf", "spreadsheet"}, "ui_language": { "html_lang", "html_dir", "next_button", "previous_button", "submit_button", "go_button", "retry_button", "logout", "labeled_badge", "in_progress_badge", "not_labeled_badge", "progress_label", "loading", "error_heading", "adjudicate", "codebook", "instructions_heading", "text_to_annotate", "video_to_annotate", "audio_to_annotate", "login_title", "login_subtitle_password", "login_subtitle_username", "sign_in_tab", "register_tab", "username_label", "password_label", "sign_in_button", "continue_button", "register_button", "forgot_password", "username_placeholder", "choose_username_placeholder", "create_password_placeholder", "sign_in_with", "or_divider", "powered_by", "cite_us", }, "base_css": None, "ui_debug": None, "hide_navbar": None, "task_layout": None, # === Content === "annotation_instructions": None, "annotation_codebook_url": None, "custom_footer_html": None, "header_file": None, "header_logo": None, # === Annotation features === "keyword_highlight_settings": None, "keyword_highlights_file": None, "highlight_linebreaks": None, "list_as_text": {"text_list_prefix_type", "horizontal", "alternating_shading"}, "jumping_to_id_disabled": None, "horizontal_key_bindings": None, "completion_code": None, "allow_phase_back_navigation": None, "require_fully_annotated": None, "export_include_phase_data": None, "export_annotation_format": None, "auto_export_interval": None, # === Media === "audio_annotation": { "waveform_cache_dir", "waveform_look_ahead", "waveform_cache_max_size", "client_fallback_max_duration", }, "spectrogram": None, "media_directory": None, "default_video_fps": None, # === External integrations === "mturk": None, "prolific": { "config_file_path", "token", "study_id", "max_concurrent_sessions", "workload_checker_period", "completion_code", "sandbox_mode", }, "webhooks": {"enabled", "endpoints"}, "trace_ingestion": {"enabled", "sources", "api_key", "notify_annotators"}, "judge_alignment": {"enabled", "ai_support", "schemas", "few_shot", "inline"}, # Judge Calibration: LLM-as-judge auto-labeling + blind human calibration. # Leaf sub-dicts (sampling/human/calibration/output) are validated by # validate_judge_calibration_config(); kept shallow here to avoid # unknown-key churn while the feature stabilizes. "judge_calibration": { "enabled", "prompt", "models", "k_samples", "max_items", "fraction", "sampling", "human", "schemas", "calibration", "output", "state_dir", }, "triage": {"enabled", "order", "default_priority", "show_badge", "signal_field", "invert_signal", "rules"}, "huggingface_backup": None, # === Debug / logging === "debug": None, "debug_phase": None, "server_debug": None, "verbose": None, "very_verbose": None, "debug_log": None, # === Agent === "live_agent": None, "live_coding_agent": None, "agent_proxy": None, # === Legacy / multi-task === "surveyflow": None, "prestudy": None, "automatic_assignment": None, # === Other === "random_seed": None, "max_annotations_per_user": None, # Deprecated alias of num_annotators_per_item (int form). Still accepted # for backwards compatibility; emits a warning when both are set. "max_annotations_per_item": None, # Canonical key for heterogeneous coverage. Accepts either: # int — same cap for every item (legacy behavior) # dict — { default, overlap_sample: {fraction, count, stratify_by, seed}, # adaptive: {enabled, disagreement_threshold, boost_to}, min } "num_annotators_per_item": None, "min_annotators_per_instance": None, # Per-annotator workload caps: # { default: int, by_user: {user_id: int}, by_user_role: {role: int} } "per_annotator_quota": None, # qda_mode sub-keys are deliberately leaf (None): validation stops at # memos/codebook and does NOT recurse into their sub-keys. This is # intentional forward-compat — parse_qda_mode_config() routes any # unrecognized qda_mode.* keys into `extras` so configs can declare # not-yet-shipped blocks (cases/queries/smart_codes/network/media_sync) # without tripping unknown-key warnings. The tradeoff: a typo like # qda_mode.memos.enabledd is silently accepted. Revisit (deepen the # schema) once those sub-blocks ship and their shapes are stable. "qda_mode": { "enabled": None, "memos": None, "codebook": None, # Sub-blocks reserved for later phases: # "cases", "queries", "smart_codes", "network", "media_sync" }, # Universal annotation UI feature toggles (not QDA-gated). `memos` # turns the memo sidebar on/off (default off in standard mode, on for # qda_mode/solo_mode); `visibility` is the default new-memo visibility. "annotation_ui": { "memos": None, "visibility": None, }, # Universal full-text search (FTS5). Read-only admin search is always # safe; `annotator_claim` opt-in is governed by a startup # compatibility guard (see validate_search_assignment_compat). "search": { "enabled": None, "backend": None, "max_instances": None, "annotator_claim": None, }, # Universal codebook. `mode` (fixed|extensible|open) governs whether # annotators may add codes on the fly; resolved via # get_codebook_mode() (defaults: qda/solo -> open, standard -> fixed; # a crowd backend force-locks fixed). Per-scheme opt-in is the # scheme-level `codebook: true` key. "codebook": { "enabled": None, "mode": None, }, # Top-level convenience scalar mirroring codebook.mode. "codebook_mode": None, # In-vivo coding (D): single key that, with text selected in a # codebook-backed span scheme, opens the "code from selection" # composer. Default 'i'; only meaningful when a codebook span # scheme exists. (Schema value is None = scalar/any-value key; the # 'i' default lives in the defaults map, not here.) "codebook_invivo_key": None, # Universal cases: group instances into units of analysis. `key` # names the item-data field to group on; `auto_detect` lets QDA # scan participant_id/respondent_id/case_id; `attributes` lifts # item fields onto the case for crosstabs. "cases": { "enabled": None, "key": None, "auto_detect": None, "attributes": None, }, "solo_mode": { "enabled": None, "labeling_models": None, "revision_models": None, "embedding": None, "uncertainty": None, "thresholds": None, "instance_selection": None, "batches": None, "prompt_optimization": None, "edge_case_rules": None, "labeling_functions": None, "confidence_routing": None, "confusion_analysis": None, "state_dir": None, "refinement_loop": { "enabled", "trigger_interval", "min_improvement", "max_cycles", "patience", "auto_apply_suggestions", "refinement_strategy", "validation_split_ratio", "eval_sample_size", "num_candidates", "min_val_size", "max_consecutive_failures", "dry_run", "require_approval", "min_val_improvement", "eval_temperature", "prefer_consistent_disagreements", }, }, "admin_api_key": None, "alert_time_each_instance": None, "assignment_strategy": None, "reclaim_stale_assignments": None, "instance_reclaim": None, "max_session_seconds": None, "env_substitution": None, # === Internal (set by system, not user) === "config_file": None, "__config_file__": None, "_bws_pool_items": None, } def validate_unknown_keys(config_data, schema=None, path=""): """Recursively warn about unrecognized config keys and suggest corrections. Args: config_data: The config dict (or sub-dict) to validate. schema: The known-keys schema for this level (defaults to KNOWN_CONFIG_KEYS). path: Dot-separated path prefix for nested key reporting (e.g., "training"). """ if schema is None: schema = KNOWN_CONFIG_KEYS if not isinstance(config_data, dict): return known_keys = set(schema.keys()) if isinstance(schema, dict) else schema unknown_keys = set(config_data.keys()) - known_keys for key in sorted(unknown_keys): full_key = f"{path}.{key}" if path else key matches = difflib.get_close_matches(key, known_keys, n=3, cutoff=0.6) if matches: suggestions = ", ".join(f"'{m}'" for m in matches) logger.warning( "Unrecognized config key '%s'. Did you mean: %s?", full_key, suggestions ) else: logger.warning( "Unrecognized config key '%s'. This key will be ignored.", full_key ) # Recurse into nested dicts that have sub-key schemas if isinstance(schema, dict): for key, sub_schema in schema.items(): if sub_schema is not None and key in config_data: value = config_data[key] if isinstance(value, dict): child_path = f"{path}.{key}" if path else key if isinstance(sub_schema, dict): validate_unknown_keys(value, sub_schema, child_path) elif isinstance(sub_schema, set): validate_unknown_keys( value, {k: None for k in sub_schema}, child_path ) def validate_path_security(path: str, base_dir: str, project_dir: str = None) -> str: """ Validate that a path is secure and contained within the base directory. Args: path: The path to validate base_dir: The base directory that should contain the path project_dir: The project directory for final security check (if different from base_dir) Returns: The normalized absolute path if valid Raises: ConfigSecurityError: If the path is not secure """ # Check for encoded traversal patterns before normalization if '....' in path or '..%2F' in path or '..%5C' in path: raise ConfigSecurityError(f"Encoded path traversal detected in '{path}'. Encoded traversal patterns are not allowed for security reasons.") # Normalize the path normalized_path = os.path.normpath(path) # Check for malicious path traversal attempts # Allow legitimate relative paths like "../data/file.json" but block excessive traversal path_parts = normalized_path.split(os.sep) if path_parts.count('..') > 2: # Allow up to 2 levels of ".." for legitimate relative paths raise ConfigSecurityError(f"Excessive path traversal detected in '{path}'. Too many '..' components for security reasons.") # Check for absolute paths that might escape the project directory if os.path.isabs(normalized_path): # Only allow absolute paths that are within the base directory try: real_path = os.path.realpath(normalized_path) real_base = os.path.realpath(base_dir) if not (real_path == real_base or real_path.startswith(real_base + os.sep)): raise ConfigSecurityError(f"Path '{path}' resolves to '{real_path}' which is outside the project directory '{real_base}'") except (OSError, ValueError) as e: raise ConfigSecurityError(f"Invalid path '{path}': {str(e)}") # Resolve relative paths against base directory if not os.path.isabs(normalized_path): resolved_path = os.path.join(base_dir, normalized_path) normalized_path = os.path.normpath(resolved_path) # Final security check - ensure the resolved path is within the project directory try: real_path = os.path.realpath(normalized_path) # Use project_dir for final check if provided, otherwise use base_dir check_dir = project_dir if project_dir else base_dir real_check_dir = os.path.realpath(check_dir) if not (real_path == real_check_dir or real_path.startswith(real_check_dir + os.sep)): raise ConfigSecurityError(f"Path '{path}' resolves to '{real_path}' which is outside the project directory '{real_check_dir}'") except (OSError, ValueError) as e: raise ConfigSecurityError(f"Invalid path '{path}': {str(e)}") return normalized_path # Optional field type specifications for validation. # Maps config key -> (expected_type, human description, allow_negative). # Only fields that are commonly misconfigured and cause silent failures. _OPTIONAL_INT_FIELDS = { "alert_time_each_instance": ("seconds to alert per instance", False), "max_annotations_per_item": ("max annotations per item", True), # -1 = unlimited "max_annotations_per_user": ("max annotations per user", True), "min_annotators_per_instance": ("minimum annotators per instance", False), "random_seed": ("random seed", True), "max_session_seconds": ("max session duration in seconds", False), } # num_annotators_per_item validated separately — it may be int OR dict. _OPTIONAL_BOOL_FIELDS = { "highlight_linebreaks": "whether to highlight linebreaks", "jumping_to_id_disabled": "whether jumping to ID is disabled", "require_fully_annotated": "whether full annotation is required", "require_password": "whether password is required", "require_no_password": "whether no-password mode is enabled", "customjs": "whether custom JS is enabled", "watch_data_directory": "whether to watch data directory for changes", "persist_sessions": "whether to persist sessions across restarts", } _VALID_ASSIGNMENT_STRATEGIES = [ "random", "fixed_order", "active_learning", "llm_confidence", "max_diversity", "least_annotated", "category_based", "diversity_clustering", "batch", "priority", ] def validate_num_annotators_per_item(value: Any) -> None: """ Validate the shape of ``num_annotators_per_item``. Accepts either an int (legacy form) or a dict with optional keys ``default``, ``overlap_sample``, ``adaptive``, and ``min``. """ if value is None: return if isinstance(value, bool): raise ConfigValidationError( "'num_annotators_per_item' must be an integer or a structured mapping, " f"got bool: {value!r}" ) if isinstance(value, int): if value < 0: raise ConfigValidationError( "'num_annotators_per_item' as integer must be non-negative; " "use 0 or omit the key for unlimited (legacy used -1)." ) return if not isinstance(value, dict): raise ConfigValidationError( "'num_annotators_per_item' must be an integer or a mapping, " f"got {type(value).__name__}: {value!r}" ) allowed = {"default", "overlap_sample", "adaptive", "min"} unknown = set(value) - allowed if unknown: raise ConfigValidationError( f"Unknown keys in num_annotators_per_item: {sorted(unknown)}. " f"Allowed: {sorted(allowed)}" ) default = value.get("default", 1) if not isinstance(default, int) or isinstance(default, bool) or default < 1: raise ConfigValidationError( f"num_annotators_per_item.default must be a positive integer, got {default!r}" ) minimum = value.get("min") if minimum is not None: if not isinstance(minimum, int) or isinstance(minimum, bool) or minimum < 1: raise ConfigValidationError( f"num_annotators_per_item.min must be a positive integer, got {minimum!r}" ) if minimum > default: raise ConfigValidationError( "num_annotators_per_item.min cannot exceed num_annotators_per_item.default" ) overlap = value.get("overlap_sample") if overlap is not None: if not isinstance(overlap, dict): raise ConfigValidationError( "num_annotators_per_item.overlap_sample must be a mapping" ) unknown = set(overlap) - {"fraction", "count", "stratify_by", "seed"} if unknown: raise ConfigValidationError( f"Unknown keys in overlap_sample: {sorted(unknown)}" ) frac = overlap.get("fraction") if not isinstance(frac, (int, float)) or isinstance(frac, bool) or not (0 < frac <= 1): raise ConfigValidationError( f"overlap_sample.fraction must be in (0, 1], got {frac!r}" ) count = overlap.get("count") if not isinstance(count, int) or isinstance(count, bool) or count < 2: raise ConfigValidationError( f"overlap_sample.count must be an integer >= 2, got {count!r}" ) if count <= default: raise ConfigValidationError( "overlap_sample.count must be greater than num_annotators_per_item.default " f"({count} <= {default})" ) stratify_by = overlap.get("stratify_by") if stratify_by is not None and not isinstance(stratify_by, str): raise ConfigValidationError( f"overlap_sample.stratify_by must be a string or omitted, got {stratify_by!r}" ) seed = overlap.get("seed") if seed is not None and (not isinstance(seed, int) or isinstance(seed, bool)): raise ConfigValidationError( f"overlap_sample.seed must be an integer, got {seed!r}" ) adaptive = value.get("adaptive") if adaptive is not None: if not isinstance(adaptive, dict): raise ConfigValidationError( "num_annotators_per_item.adaptive must be a mapping" ) unknown = set(adaptive) - {"enabled", "disagreement_threshold", "boost_to"} if unknown: raise ConfigValidationError( f"Unknown keys in adaptive: {sorted(unknown)}" ) if "enabled" in adaptive and not isinstance(adaptive["enabled"], bool): raise ConfigValidationError( f"adaptive.enabled must be a boolean, got {adaptive['enabled']!r}" ) thr = adaptive.get("disagreement_threshold") if thr is not None and (not isinstance(thr, (int, float)) or isinstance(thr, bool) or not (0 <= thr <= 1)): raise ConfigValidationError( f"adaptive.disagreement_threshold must be in [0, 1], got {thr!r}" ) boost = adaptive.get("boost_to") if boost is not None: if not isinstance(boost, int) or isinstance(boost, bool) or boost < 2: raise ConfigValidationError( f"adaptive.boost_to must be an integer >= 2, got {boost!r}" ) if boost <= default: raise ConfigValidationError( f"adaptive.boost_to must exceed default ({boost} <= {default})" ) def validate_per_annotator_quota(value: Any) -> None: """Validate the shape of ``per_annotator_quota``.""" if value is None: return if not isinstance(value, dict): raise ConfigValidationError( "'per_annotator_quota' must be a mapping, " f"got {type(value).__name__}: {value!r}" ) allowed = {"default", "by_user", "by_user_role"} unknown = set(value) - allowed if unknown: raise ConfigValidationError( f"Unknown keys in per_annotator_quota: {sorted(unknown)}. Allowed: {sorted(allowed)}" ) default = value.get("default") if default is not None and (not isinstance(default, int) or isinstance(default, bool) or default < 0): raise ConfigValidationError( f"per_annotator_quota.default must be a non-negative integer, got {default!r}" ) for key in ("by_user", "by_user_role"): mapping = value.get(key) if mapping is None: continue if not isinstance(mapping, dict): raise ConfigValidationError( f"per_annotator_quota.{key} must be a mapping of name -> integer" ) for k, v in mapping.items(): if not isinstance(k, str) or not k: raise ConfigValidationError( f"per_annotator_quota.{key} keys must be non-empty strings, got {k!r}" ) if not isinstance(v, int) or isinstance(v, bool) or v < 0: raise ConfigValidationError( f"per_annotator_quota.{key}[{k!r}] must be a non-negative integer, got {v!r}" ) def resolve_num_annotators_per_item(config_data: Dict[str, Any]) -> int: """ Resolve the *default* cap (used as ``ItemStateManager.max_annotations_per_item``). Resolution order: 1. num_annotators_per_item (int form) → that value 2. num_annotators_per_item.default → that value 3. max_annotations_per_item (legacy) → that value 4. otherwise → -1 (unlimited) """ val = config_data.get("num_annotators_per_item") if isinstance(val, int) and not isinstance(val, bool): return val if isinstance(val, dict) and val.get("default") is not None: return int(val["default"]) legacy = config_data.get("max_annotations_per_item") if isinstance(legacy, int) and not isinstance(legacy, bool): return legacy return -1 def validate_optional_field_types(config_data: Dict[str, Any]) -> None: """ Validate types for commonly misconfigured optional fields. Catches issues like string values for integer fields (e.g., alert_time_each_instance: "30") or wrong types for booleans, which would silently produce incorrect behavior at runtime. Args: config_data: The parsed configuration dictionary Raises: ConfigValidationError: If a field has the wrong type """ # Validate integer fields for field, (desc, allow_negative) in _OPTIONAL_INT_FIELDS.items(): if field in config_data: val = config_data[field] if not isinstance(val, int) or isinstance(val, bool): raise ConfigValidationError( f"'{field}' must be an integer ({desc}), " f"got {type(val).__name__}: {val!r}" ) if not allow_negative and val < 0: raise ConfigValidationError( f"'{field}' must be a non-negative integer ({desc}), got {val}" ) # Validate boolean fields (None/null is allowed as "not set") for field, desc in _OPTIONAL_BOOL_FIELDS.items(): if field in config_data: val = config_data[field] if val is not None and not isinstance(val, bool): raise ConfigValidationError( f"'{field}' must be a boolean ({desc}), " f"got {type(val).__name__}: {val!r}" ) # Validate num_annotators_per_item (int OR structured dict) if 'num_annotators_per_item' in config_data: validate_num_annotators_per_item(config_data['num_annotators_per_item']) # Validate per_annotator_quota structured dict if 'per_annotator_quota' in config_data: validate_per_annotator_quota(config_data['per_annotator_quota']) # Emit a deprecation warning if max_annotations_per_item is set alongside # num_annotators_per_item; reject silent inconsistencies (both set to # conflicting values). if 'max_annotations_per_item' in config_data and 'num_annotators_per_item' in config_data: legacy = config_data['max_annotations_per_item'] canonical = config_data['num_annotators_per_item'] canonical_int = canonical if isinstance(canonical, int) else canonical.get('default') if canonical_int is not None and legacy != canonical_int: raise ConfigValidationError( "'max_annotations_per_item' and 'num_annotators_per_item' are both " f"set with conflicting values ({legacy} vs {canonical_int}). " "Drop 'max_annotations_per_item' — 'num_annotators_per_item' is the canonical key." ) import warnings as _w _w.warn( "'max_annotations_per_item' is deprecated; use 'num_annotators_per_item' " "instead. Setting both is redundant.", DeprecationWarning, stacklevel=2, ) # Validate assignment_strategy enum if 'assignment_strategy' in config_data: strat = config_data['assignment_strategy'] # Can be a string or a dict with a 'name' key strat_name = strat if isinstance(strat, dict): strat_name = strat.get('name', '') if isinstance(strat_name, str) and strat_name.lower() not in _VALID_ASSIGNMENT_STRATEGIES: raise ConfigValidationError( f"'assignment_strategy' value '{strat_name}' is not recognized. " f"Valid strategies: {', '.join(_VALID_ASSIGNMENT_STRATEGIES)}" ) def validate_judge_calibration_config(config_data: Dict[str, Any]) -> None: """Validate the ``judge_calibration`` block when enabled. Delegates to the typed config's ``validate()`` (so the rules live in one place) and additionally cross-checks that referenced schema names exist in ``annotation_schemes``. Raises ConfigValidationError on hard errors. """ jc = config_data.get("judge_calibration") if not isinstance(jc, dict) or not jc.get("enabled"): return from potato.judge_calibration.config import parse_judge_calibration_config cfg = parse_judge_calibration_config(config_data) errors = cfg.validate() # Cross-check schema references against declared annotation_schemes. declared = { s.get("name") for s in (config_data.get("annotation_schemes") or []) if isinstance(s, dict) } for name in cfg.schemas: if name not in declared: errors.append( f"judge_calibration.schemas references unknown scheme '{name}' " f"(declared: {sorted(n for n in declared if n)})" ) if errors: raise ConfigValidationError( "Invalid judge_calibration configuration:\n - " + "\n - ".join(errors) ) def validate_yaml_structure(config_data: Dict[str, Any], project_dir: str = None, config_file_dir: str = None) -> None: """ Validate the structure and content of the YAML configuration. Args: config_data: The parsed YAML configuration project_dir: The project directory config_file_dir: The directory containing the config file Raises: ConfigValidationError: If the configuration is invalid """ if not isinstance(config_data, dict): raise ConfigValidationError("Configuration must be a YAML object (dictionary)") # Required fields validation. NOTE: 'data_files' is intentionally NOT here — # it is one of three mutually-acceptable data sources (data_files / # data_directory / data_sources), enforced by the dedicated check below. # Listing it here unconditionally made data_directory- and data_sources-only # configs fail validation before that smarter check could run (F-038). required_fields = [ 'item_properties', 'task_dir', 'output_annotation_dir', 'annotation_task_name', ] missing_fields = [field for field in required_fields if field not in config_data] if missing_fields: raise ConfigValidationError(f"Missing required configuration fields: {', '.join(missing_fields)}") # Validate item_properties item_properties = config_data.get('item_properties', {}) if not isinstance(item_properties, dict): raise ConfigValidationError("item_properties must be a dictionary") required_item_props = ['id_key', 'text_key'] missing_item_props = [prop for prop in required_item_props if prop not in item_properties] if missing_item_props: raise ConfigValidationError(f"Missing required item_properties: {', '.join(missing_item_props)}") # Validate optional category_key (for category-based assignment) if 'category_key' in item_properties: category_key = item_properties['category_key'] if not isinstance(category_key, str) or not category_key.strip(): raise ConfigValidationError("item_properties.category_key must be a non-empty string") # Validate data_files (required unless data_directory or data_sources is provided) data_files = config_data.get('data_files', []) data_directory = config_data.get('data_directory') data_sources = config_data.get('data_sources') if not isinstance(data_files, list): raise ConfigValidationError("data_files must be a list") # data_files can be empty if data_directory or data_sources is configured if not data_files and not data_directory and not data_sources: raise ConfigValidationError( "At least one data source must be configured: " "'data_files', 'data_directory', or 'data_sources'" ) # Validate data_sources configuration if present if data_sources: validate_data_sources_config(config_data) # Validate server config if present validate_server_config(config_data) # Validate authentication config if present validate_authentication_config(config_data) # Validate data_directory config if present validate_data_directory_config(config_data) # Validate annotation schemes validate_annotation_schemes(config_data) # Validate training configuration if present validate_training_config(config_data, project_dir, config_file_dir) # Validate database configuration if present if 'database' in config_data: validate_database_config(config_data['database']) # Validate active learning configuration if present validate_active_learning_config(config_data) # Validate AI support configuration if present validate_ai_support_config(config_data) # Validate chat support configuration if present validate_chat_support_config(config_data) # Validate category assignment configuration if present validate_category_assignment_config(config_data) # Validate batch assignment configuration if present validate_batch_assignment_config(config_data) # Validate diversity ordering configuration if present validate_diversity_config(config_data) # Validate embedding visualization configuration if present validate_embedding_visualization_config(config_data) # Validate adjudication configuration if present if 'adjudication' in config_data: validate_adjudication_config(config_data) # Validate quality control configuration if present validate_quality_control_config(config_data) # Validate assignment reclaim configuration if present validate_instance_reclaim_config(config_data) # Validate instance display configuration if present validate_instance_display_config(config_data) # Validate format_handling configuration if present validate_format_handling_config(config_data) # Validate layout configuration if present validate_layout_config(config_data) # Validate BWS configuration if present if 'bws_config' in config_data: _validate_bws_config(config_data) # Validate IBWS configuration if present if 'ibws_config' in config_data: _validate_ibws_config(config_data) # Validate MACE configuration if present if 'mace' in config_data: _validate_mace_config(config_data) # Validate types for commonly misconfigured optional fields validate_optional_field_types(config_data) # Fail loud if annotator search-and-claim is combined with an # assignment design it would corrupt via self-selection. validate_search_assignment_compat(config_data) # Validate codebook_mode (and apply the crowd force-lock). validate_codebook_config(config_data) # Validate judge_calibration configuration if present validate_judge_calibration_config(config_data) # Warn about unrecognized keys at all nesting levels validate_unknown_keys(config_data) # Assignment strategies whose sampling/ordering self-selection breaks. _CLAIM_INCOMPATIBLE_STRATEGIES = { "random", "diversity_clustering", "max_diversity", "active_learning", "llm_confidence", "least_annotated", "category_based", "batch", } def validate_search_assignment_compat(config_data: Dict[str, Any]) -> None: """Hard-fail when ``search.annotator_claim`` is combined with a feature whose integrity depends on the platform — not the annotator — choosing the next item. Read-only admin search is unaffected. Solo/QDA mode (single coder over the whole corpus) is always allowed. """ search = config_data.get("search") if not isinstance(search, dict) or not search.get("annotator_claim"): return # Single-coder modes have no sampling/overlap invariant to protect. if (config_data.get("qda_mode") or {}).get("enabled") or \ (config_data.get("solo_mode") or {}).get("enabled"): return conflicts = [] strat = config_data.get("assignment_strategy") if isinstance(strat, dict): strat = strat.get("name") if strat and str(strat).lower() in _CLAIM_INCOMPATIBLE_STRATEGIES: conflicts.append( f"assignment_strategy: {strat} (self-selection breaks " f"sampling/ordering)") for k in ("max_annotations_per_item", "num_annotators_per_item", "min_annotators_per_instance"): raw = config_data.get(k, -1) # num_annotators_per_item may now be a dict — extract default + overlap_sample.count candidates = [] if isinstance(raw, dict): if raw.get("default") is not None: candidates.append(raw["default"]) overlap = raw.get("overlap_sample") or {} if overlap.get("count") is not None: candidates.append(overlap["count"]) else: candidates.append(raw) for cand in candidates: try: if int(cand) > 1: conflicts.append( f"{k}: {config_data[k]} (inter-annotator overlap " f"cannot be guaranteed under self-selection)") break except (TypeError, ValueError): continue if (config_data.get("attention_checks") or {}).get("enabled"): conflicts.append("attention_checks.enabled (annotators could " "locate/avoid QC items)") if (config_data.get("gold_standards") or {}).get("enabled"): conflicts.append("gold_standards.enabled (annotators could " "locate/avoid gold items)") if (config_data.get("icl_labeling") or {}).get("enabled"): conflicts.append("icl_labeling.enabled (blind LLM-verification " "tasks must not be findable)") if (config_data.get("adjudication") or {}).get("enabled"): conflicts.append("adjudication.enabled (the adjudication queue " "is curated)") login_type = (config_data.get("login") or {}).get("type") crowd = ( "mturk" in config_data or "prolific" in config_data or login_type in ("mturk", "prolific") ) if crowd: conflicts.append("crowdsourcing backend (HIT = the assigned " "unit; self-selection breaks payment/coverage)") if conflicts: raise ConfigValidationError( "search.annotator_claim: true is incompatible with this " "configuration:\n - " + "\n - ".join(conflicts) + "\n\nAnnotator search-and-claim is only supported with " "solo_mode/qda_mode, or fixed_order assignment without " "overlap, quality-control injection, ICL verification, " "adjudication, or a crowdsourcing backend. Use read-only " "admin search (no annotator_claim) for those designs." ) _CODEBOOK_MODES = ("fixed", "extensible", "open") def _crowd_backend(config_data: Dict[str, Any]) -> bool: login_type = (config_data.get("login") or {}).get("type") return ( "mturk" in config_data or "prolific" in config_data or login_type in ("mturk", "prolific") ) def get_codebook_mode(config_data: Dict[str, Any]) -> str: """Resolve the effective codebook mode. Precedence: explicit ``codebook_mode`` / ``codebook.mode`` if set; else ``open`` when solo/QDA mode is enabled; else ``fixed``. A crowd backend force-locks ``fixed`` regardless of the request (annotators on a paid HIT must not reshape the shared codebook). """ raw = config_data.get("codebook_mode") if raw is None: raw = (config_data.get("codebook") or {}).get("mode") if raw is None: single = ( (config_data.get("qda_mode") or {}).get("enabled") or (config_data.get("solo_mode") or {}).get("enabled") ) mode = "open" if single else "fixed" else: mode = str(raw).strip().lower() if _crowd_backend(config_data): return "fixed" return mode def validate_codebook_config(config_data: Dict[str, Any]) -> None: """Reject an invalid ``codebook_mode`` value, and warn when a crowd backend overrides a requested non-fixed mode.""" raw = config_data.get("codebook_mode") if raw is None: raw = (config_data.get("codebook") or {}).get("mode") if raw is None: return mode = str(raw).strip().lower() if mode not in _CODEBOOK_MODES: raise ConfigValidationError( f"codebook_mode must be one of {', '.join(_CODEBOOK_MODES)}; " f"got {raw!r}." ) if mode != "fixed" and _crowd_backend(config_data): logging.warning( "codebook_mode=%s requested with a crowdsourcing backend; " "force-locking to 'fixed' (paid annotators must not reshape " "the shared codebook).", mode) def validate_annotation_schemes(config_data: Dict[str, Any]) -> None: """ Validate annotation schemes configuration. Args: config_data: The configuration data Raises: ConfigValidationError: If annotation schemes are invalid """ has_top_level = 'annotation_schemes' in config_data has_phases = 'phases' in config_data and config_data['phases'] # Check for conflicting annotation_schemes locations if has_top_level and has_phases: # Check if any phase also has annotation_schemes phases = config_data['phases'] phases_with_schemes = [] if isinstance(phases, list): phases_with_schemes = [ phase.get('name', f'phase[{i}]') for i, phase in enumerate(phases) if 'annotation_schemes' in phase ] elif isinstance(phases, dict): phases_with_schemes = [ name for name, phase in phases.items() if name != 'order' and isinstance(phase, dict) and 'annotation_schemes' in phase ] if phases_with_schemes: raise ConfigValidationError( f"Configuration has both top-level 'annotation_schemes' and phase-level " f"'annotation_schemes' in: {', '.join(phases_with_schemes)}. " f"Use only one location to avoid confusion." ) # Check for annotation schemes in different formats if has_top_level: schemes = config_data['annotation_schemes'] if not isinstance(schemes, list): raise ConfigValidationError("annotation_schemes must be a list") if not schemes: raise ConfigValidationError("annotation_schemes cannot be empty") for i, scheme in enumerate(schemes): validate_single_annotation_scheme(scheme, f"annotation_schemes[{i}]") elif 'phases' in config_data and config_data['phases']: phases = config_data['phases'] if isinstance(phases, list): for i, phase in enumerate(phases): phase_id = phase.get('name', f'phase[{i}]') # Phases can have annotation_schemes, file, type, instrument, or instruments if 'annotation_schemes' in phase: schemes = phase['annotation_schemes'] if not isinstance(schemes, list): raise ConfigValidationError(f"Phase {phase_id} annotation_schemes must be a list") if not schemes: raise ConfigValidationError(f"Phase {phase_id} annotation_schemes cannot be empty") for j, scheme in enumerate(schemes): validate_single_annotation_scheme(scheme, f"phases[{i}].annotation_schemes[{j}]") elif 'file' in phase or 'type' in phase or 'instrument' in phase or 'instruments' in phase: # Legacy format or instrument-based - validated at runtime _validate_phase_instruments(phase, phase_id) else: raise ConfigValidationError( f"Phase {phase_id} requires 'annotation_schemes', 'file', 'type', " f"'instrument', or 'instruments'" ) else: # Dictionary format for phase_name, phase in phases.items(): if phase_name == 'order': continue # Phases can have annotation_schemes, file, type, instrument, or instruments if 'annotation_schemes' in phase: schemes = phase['annotation_schemes'] if not isinstance(schemes, list): raise ConfigValidationError(f"Phase {phase_name} annotation_schemes must be a list") if not schemes: raise ConfigValidationError(f"Phase {phase_name} annotation_schemes cannot be empty") for j, scheme in enumerate(schemes): validate_single_annotation_scheme(scheme, f"phases.{phase_name}.annotation_schemes[{j}]") elif 'file' in phase or 'type' in phase or 'instrument' in phase or 'instruments' in phase: # Legacy format or instrument-based - validated at runtime _validate_phase_instruments(phase, phase_name) else: raise ConfigValidationError( f"Phase {phase_name} requires 'annotation_schemes', 'file', 'type', " f"'instrument', or 'instruments'" ) else: raise ConfigValidationError("Config must have either 'annotation_schemes' (top-level) or 'phases' with annotation_schemes") # Validate keyword_highlight is not enabled for image-based tasks _validate_keyword_highlight_for_images(config_data) # Validate display_logic cross-references (schema references and circular dependencies) all_schemes = _collect_all_annotation_schemes(config_data) if all_schemes: validate_display_logic_references(all_schemes) def _collect_all_annotation_schemes(config_data: Dict[str, Any]) -> List[Dict[str, Any]]: """ Collect all annotation schemes from config, whether top-level or in phases. Args: config_data: The configuration data Returns: List of all annotation scheme dictionaries """ schemes = [] if 'annotation_schemes' in config_data: schemes.extend(config_data['annotation_schemes']) elif 'phases' in config_data: phases = config_data['phases'] if isinstance(phases, list): for phase in phases: if 'annotation_schemes' in phase: schemes.extend(phase['annotation_schemes']) elif isinstance(phases, dict): for phase_name, phase in phases.items(): if phase_name != 'order' and isinstance(phase, dict): if 'annotation_schemes' in phase: schemes.extend(phase['annotation_schemes']) return schemes def _validate_keyword_highlight_for_images(config_data: Dict[str, Any]) -> None: """ Validate that keyword_highlight is not enabled for image-based tasks. Keyword highlighting highlights text in the instance content, which doesn't make sense for images. This validation catches configuration errors early. Args: config_data: The configuration data Raises: ConfigValidationError: If keyword_highlight is enabled for an image task """ # Check if the text_key suggests this is an image-based task text_key = config_data.get('item_properties', {}).get('text_key', 'text') image_indicators = ['image', 'img', 'photo', 'picture', 'url'] is_likely_image_task = any(indicator in text_key.lower() for indicator in image_indicators) if not is_likely_image_task: return # Not an image task, no need to check # Get all annotation schemes schemes = [] if 'annotation_schemes' in config_data: schemes = config_data['annotation_schemes'] elif 'phases' in config_data: phases = config_data['phases'] if isinstance(phases, list): for phase in phases: schemes.extend(phase.get('annotation_schemes', [])) elif isinstance(phases, dict): for phase_name, phase in phases.items(): if phase_name != 'order' and isinstance(phase, dict): schemes.extend(phase.get('annotation_schemes', [])) # Check each scheme for keyword_highlight for i, scheme in enumerate(schemes): if not isinstance(scheme, dict): continue ai_support = scheme.get('ai_support', {}) if not isinstance(ai_support, dict): continue features = ai_support.get('features', {}) if not isinstance(features, dict): continue keyword_highlight = features.get('keyword_highlight', False) if keyword_highlight: scheme_name = scheme.get('name', f'scheme[{i}]') raise ConfigValidationError( f"annotation_schemes.{scheme_name}.ai_support.features.keyword_highlight is enabled, " f"but item_properties.text_key='{text_key}' suggests this is an image-based task. " f"Keyword highlighting only works with text content, not images. " f"Set keyword_highlight: false or remove it from the ai_support features." ) def _validate_bws_config(config_data: Dict[str, Any]) -> None: """ Validate Best-Worst Scaling configuration. Args: config_data: The configuration data Raises: ConfigValidationError: If the BWS config is invalid """ bws = config_data['bws_config'] if not isinstance(bws, dict): raise ConfigValidationError("bws_config must be a dictionary") if 'tuple_size' in bws: if not isinstance(bws['tuple_size'], int) or bws['tuple_size'] < 2: raise ConfigValidationError("bws_config.tuple_size must be an integer >= 2") if 'seed' in bws: if not isinstance(bws['seed'], int): raise ConfigValidationError("bws_config.seed must be an integer") if 'num_tuples' in bws and bws['num_tuples'] is not None: if not isinstance(bws['num_tuples'], int) or bws['num_tuples'] < 1: raise ConfigValidationError("bws_config.num_tuples must be a positive integer or null") if 'min_item_appearances' in bws and bws['min_item_appearances'] is not None: if not isinstance(bws['min_item_appearances'], int) or bws['min_item_appearances'] < 1: raise ConfigValidationError("bws_config.min_item_appearances must be a positive integer or null") # Validate scoring config if present scoring = bws.get('scoring', {}) if scoring: if not isinstance(scoring, dict): raise ConfigValidationError("bws_config.scoring must be a dictionary") valid_methods = ['counting', 'bradley_terry', 'plackett_luce'] method = scoring.get('method', 'counting') if method not in valid_methods: raise ConfigValidationError(f"bws_config.scoring.method must be one of: {valid_methods}") def _validate_ibws_config(config_data: Dict[str, Any]) -> None: """ Validate Iterative Best-Worst Scaling configuration. Args: config_data: The configuration data Raises: ConfigValidationError: If the IBWS config is invalid """ # Mutual exclusivity with bws_config if 'bws_config' in config_data: raise ConfigValidationError( "ibws_config and bws_config are mutually exclusive. " "Use ibws_config for iterative BWS or bws_config for standard BWS." ) ibws = config_data['ibws_config'] if not isinstance(ibws, dict): raise ConfigValidationError("ibws_config must be a dictionary") # Require at least one BWS annotation scheme schemes = config_data.get('annotation_schemes', []) has_bws_scheme = any(s.get('annotation_type') == 'bws' for s in schemes) if not has_bws_scheme: raise ConfigValidationError( "ibws_config requires at least one annotation scheme with annotation_type: bws" ) # tuple_size if 'tuple_size' in ibws: if not isinstance(ibws['tuple_size'], int) or ibws['tuple_size'] < 2: raise ConfigValidationError("ibws_config.tuple_size must be an integer >= 2") # max_rounds if 'max_rounds' in ibws and ibws['max_rounds'] is not None: if not isinstance(ibws['max_rounds'], int) or ibws['max_rounds'] < 1: raise ConfigValidationError("ibws_config.max_rounds must be a positive integer or null") # seed if 'seed' in ibws: if not isinstance(ibws['seed'], int): raise ConfigValidationError("ibws_config.seed must be an integer") # scoring_method valid_methods = ['counting', 'bradley_terry', 'plackett_luce'] if 'scoring_method' in ibws: if ibws['scoring_method'] not in valid_methods: raise ConfigValidationError( f"ibws_config.scoring_method must be one of: {valid_methods}" ) # tuples_per_item_per_round if 'tuples_per_item_per_round' in ibws: val = ibws['tuples_per_item_per_round'] if not isinstance(val, int) or val < 1: raise ConfigValidationError( "ibws_config.tuples_per_item_per_round must be a positive integer" ) def _validate_mace_config(config_data: Dict[str, Any]) -> None: """ Validate MACE competence estimation configuration. Args: config_data: The configuration data Raises: ConfigValidationError: If the MACE config is invalid """ mace = config_data.get('mace', {}) if not isinstance(mace, dict): raise ConfigValidationError("mace must be a dictionary") if not mace.get('enabled', False): return # Not enabled, skip validation # Validate numeric parameters min_annots = mace.get('min_annotations_per_item', 3) if not isinstance(min_annots, int) or min_annots < 2: raise ConfigValidationError( "mace.min_annotations_per_item must be an integer >= 2" ) trigger_n = mace.get('trigger_every_n', 10) if not isinstance(trigger_n, int) or trigger_n < 1: raise ConfigValidationError( "mace.trigger_every_n must be an integer >= 1" ) num_restarts = mace.get('num_restarts', 10) if not isinstance(num_restarts, int) or num_restarts < 1: raise ConfigValidationError( "mace.num_restarts must be an integer >= 1" ) # Warn if no categorical schemas are defined categorical_types = {'radio', 'likert', 'select', 'multiselect'} schemes = config_data.get('annotation_schemes', []) has_categorical = any( s.get('annotation_type', '') in categorical_types for s in schemes if isinstance(s, dict) ) if not has_categorical: logger.warning( "MACE is enabled but no categorical annotation schemes " "(radio, likert, select, multiselect) are defined. " "MACE will have no data to process." ) def _validate_phase_instruments(phase: Dict[str, Any], phase_name: str) -> None: """ Validate instrument references in a phase configuration. Args: phase: The phase configuration phase_name: Name of the phase for error messages Raises: ConfigValidationError: If instrument references are invalid """ # Validate single instrument reference if 'instrument' in phase: inst_id = phase['instrument'] if not isinstance(inst_id, str): raise ConfigValidationError( f"Phase {phase_name}: 'instrument' must be a string" ) try: from potato.survey_instruments import get_registry registry = get_registry() if inst_id not in registry['instruments']: available = sorted(registry['instruments'].keys())[:10] raise ConfigValidationError( f"Phase {phase_name}: Unknown instrument '{inst_id}'. " f"Available instruments: {available}..." ) except ImportError: # survey_instruments module not available - skip validation pass # Validate multiple instruments if 'instruments' in phase: inst_list = phase['instruments'] if not isinstance(inst_list, list): raise ConfigValidationError( f"Phase {phase_name}: 'instruments' must be a list" ) try: from potato.survey_instruments import get_registry registry = get_registry() for inst_id in inst_list: if not isinstance(inst_id, str): raise ConfigValidationError( f"Phase {phase_name}: All items in 'instruments' must be strings" ) if inst_id not in registry['instruments']: available = sorted(registry['instruments'].keys())[:10] raise ConfigValidationError( f"Phase {phase_name}: Unknown instrument '{inst_id}'. " f"Available instruments: {available}..." ) except ImportError: # survey_instruments module not available - skip validation pass def validate_single_annotation_scheme(scheme: Dict[str, Any], path: str) -> None: """ Validate a single annotation scheme. Args: scheme: The annotation scheme to validate path: The path in the config for error reporting Raises: ConfigValidationError: If the scheme is invalid """ if not isinstance(scheme, dict): raise ConfigValidationError(f"{path} must be a dictionary") required_fields = ['annotation_type', 'name', 'description'] missing_fields = [field for field in required_fields if field not in scheme] if missing_fields: raise ConfigValidationError(f"{path} missing required fields: {', '.join(missing_fields)}") # Validate annotation_type against the schema registry (single source of truth) from potato.server_utils.schemas.registry import schema_registry valid_types = schema_registry.get_supported_types() if scheme['annotation_type'] not in valid_types: raise ConfigValidationError(f"{path}.annotation_type must be one of: {', '.join(sorted(valid_types))}") # Registry-driven required field check: validate fields that are unconditionally # required for this type. Types with alternative forms (e.g., likert accepts either # 'labels' OR 'min_label'+'max_label'+'size') have deeper validation in the # type-specific blocks below. This check catches missing fields for types that # don't have explicit type-specific validation blocks. annotation_type = scheme['annotation_type'] _types_with_explicit_validation = { 'radio', 'multiselect', 'select', 'likert', 'slider', 'span', 'multirate', 'image_annotation', 'audio_annotation', 'video_annotation', 'tiered_annotation', 'pairwise', 'bws', 'soft_label', 'confidence', 'constant_sum', 'semantic_differential', 'ranking', 'range_slider', 'hierarchical_multiselect', 'vas', 'rubric_eval', 'error_span', 'card_sort', 'conjoint', } if annotation_type not in _types_with_explicit_validation: schema_def = schema_registry.get(annotation_type) if schema_def and schema_def.required_fields: # 'name' and 'description' are already checked above extra_required = [f for f in schema_def.required_fields if f not in ('name', 'description')] missing = [f for f in extra_required if f not in scheme] if missing: raise ConfigValidationError( f"{path} (type '{annotation_type}') missing required field(s): " f"{', '.join(missing)}" ) # Type-specific validation (deep structural checks beyond registry required_fields) if annotation_type in ['radio', 'multiselect', 'select']: if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing 'labels' field for {annotation_type} annotation type") if not isinstance(scheme['labels'], list): raise ConfigValidationError(f"{path}.labels must be a list") if not scheme['labels']: raise ConfigValidationError(f"{path}.labels cannot be empty") elif annotation_type == 'likert': # Likert can use labels (falls back to radio) or min_label/max_label/size if 'labels' not in scheme: required_likert_fields = ['min_label', 'max_label', 'size'] missing_likert_fields = [field for field in required_likert_fields if field not in scheme] if missing_likert_fields: raise ConfigValidationError(f"{path} missing required fields for likert: {', '.join(missing_likert_fields)}") if not isinstance(scheme['size'], int) or scheme['size'] < 2: raise ConfigValidationError(f"{path}.size must be an integer >= 2") elif annotation_type == 'slider': # Slider can use labels (falls back to radio) or min_value/max_value if 'labels' not in scheme: required_slider_fields = ['min_value', 'max_value', 'starting_value'] missing_slider_fields = [field for field in required_slider_fields if field not in scheme] if missing_slider_fields: raise ConfigValidationError(f"{path} missing required fields for slider: {', '.join(missing_slider_fields)}") if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") if scheme['min_value'] >= scheme['max_value']: raise ConfigValidationError(f"{path}.min_value must be less than max_value") elif annotation_type == 'span': if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing 'labels' field for span annotation type") if not isinstance(scheme['labels'], list): raise ConfigValidationError(f"{path}.labels must be a list") if not scheme['labels']: raise ConfigValidationError(f"{path}.labels cannot be empty") elif annotation_type == 'multirate': # multirate requires 'labels' always, and either 'options' or 'options_from_data' if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing required field for multirate: labels") has_options = 'options' in scheme has_options_from_data = 'options_from_data' in scheme if not has_options and not has_options_from_data: raise ConfigValidationError(f"{path} must have either 'options' or 'options_from_data' for multirate") if has_options: if not isinstance(scheme['options'], list): raise ConfigValidationError(f"{path}.options must be a list") if not scheme['options']: raise ConfigValidationError(f"{path}.options cannot be empty") if has_options_from_data: if not isinstance(scheme['options_from_data'], str) or not scheme['options_from_data'].strip(): raise ConfigValidationError(f"{path}.options_from_data must be a non-empty string (instance data field name)") if not isinstance(scheme['labels'], list): raise ConfigValidationError(f"{path}.labels must be a list") if not scheme['labels']: raise ConfigValidationError(f"{path}.labels cannot be empty") elif annotation_type == 'image_annotation': # Image annotation requires tools and labels if 'tools' not in scheme: raise ConfigValidationError(f"{path} missing 'tools' field for image_annotation type") if not isinstance(scheme['tools'], list): raise ConfigValidationError(f"{path}.tools must be a list") if not scheme['tools']: raise ConfigValidationError(f"{path}.tools cannot be empty") # Validate tools valid_tools = ['bbox', 'polygon', 'freeform', 'landmark', 'fill', 'eraser', 'brush'] invalid_tools = [t for t in scheme['tools'] if t not in valid_tools] if invalid_tools: raise ConfigValidationError(f"{path}.tools contains invalid values: {invalid_tools}. Valid tools are: {valid_tools}") if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing 'labels' field for image_annotation type") if not isinstance(scheme['labels'], list): raise ConfigValidationError(f"{path}.labels must be a list") if not scheme['labels']: raise ConfigValidationError(f"{path}.labels cannot be empty") # Validate optional numeric fields if 'min_annotations' in scheme: if not isinstance(scheme['min_annotations'], int) or scheme['min_annotations'] < 0: raise ConfigValidationError(f"{path}.min_annotations must be a non-negative integer") if 'max_annotations' in scheme and scheme['max_annotations'] is not None: if not isinstance(scheme['max_annotations'], int) or scheme['max_annotations'] < 1: raise ConfigValidationError(f"{path}.max_annotations must be a positive integer or null") elif annotation_type == 'audio_annotation': # Validate mode valid_modes = ['label', 'questions', 'both'] mode = scheme.get('mode', 'label') if mode not in valid_modes: raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") # Validate labels for label/both modes if mode in ['label', 'both']: if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing 'labels' field for audio_annotation mode '{mode}'") if not isinstance(scheme['labels'], list): raise ConfigValidationError(f"{path}.labels must be a list") if not scheme['labels']: raise ConfigValidationError(f"{path}.labels cannot be empty for mode '{mode}'") # Validate segment_schemes for questions/both modes if mode in ['questions', 'both']: if 'segment_schemes' not in scheme: raise ConfigValidationError(f"{path} missing 'segment_schemes' field for audio_annotation mode '{mode}'") if not isinstance(scheme['segment_schemes'], list): raise ConfigValidationError(f"{path}.segment_schemes must be a list") if not scheme['segment_schemes']: raise ConfigValidationError(f"{path}.segment_schemes cannot be empty for mode '{mode}'") # Validate optional numeric fields if 'min_segments' in scheme: if not isinstance(scheme['min_segments'], int) or scheme['min_segments'] < 0: raise ConfigValidationError(f"{path}.min_segments must be a non-negative integer") if 'max_segments' in scheme and scheme['max_segments'] is not None: if not isinstance(scheme['max_segments'], int) or scheme['max_segments'] < 1: raise ConfigValidationError(f"{path}.max_segments must be a positive integer or null") elif annotation_type == 'video_annotation': # Validate mode valid_modes = ['segment', 'frame', 'keyframe', 'tracking', 'combined'] mode = scheme.get('mode', 'segment') if mode not in valid_modes: raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") # Validate labels for segment/frame/keyframe/combined modes if mode in ['segment', 'frame', 'keyframe', 'combined']: if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing 'labels' field for video_annotation mode '{mode}'") if not isinstance(scheme['labels'], list): raise ConfigValidationError(f"{path}.labels must be a list") if not scheme['labels']: raise ConfigValidationError(f"{path}.labels cannot be empty for mode '{mode}'") # Validate optional numeric fields if 'min_segments' in scheme: if not isinstance(scheme['min_segments'], int) or scheme['min_segments'] < 0: raise ConfigValidationError(f"{path}.min_segments must be a non-negative integer") if 'max_segments' in scheme and scheme['max_segments'] is not None: if not isinstance(scheme['max_segments'], int) or scheme['max_segments'] < 1: raise ConfigValidationError(f"{path}.max_segments must be a positive integer or null") if 'timeline_height' in scheme: if not isinstance(scheme['timeline_height'], int) or scheme['timeline_height'] < 30: raise ConfigValidationError(f"{path}.timeline_height must be an integer >= 30") if 'video_fps' in scheme: if not isinstance(scheme['video_fps'], (int, float)) or scheme['video_fps'] <= 0: raise ConfigValidationError(f"{path}.video_fps must be a positive number") elif annotation_type == 'tiered_annotation': # Validate required fields if 'tiers' not in scheme: raise ConfigValidationError(f"{path} missing 'tiers' field for tiered_annotation") if not isinstance(scheme['tiers'], list): raise ConfigValidationError(f"{path}.tiers must be a list") if not scheme['tiers']: raise ConfigValidationError(f"{path}.tiers cannot be empty") if 'source_field' not in scheme: raise ConfigValidationError(f"{path} missing 'source_field' field for tiered_annotation") # Validate media_type media_type = scheme.get('media_type', 'audio') if media_type not in ['audio', 'video']: raise ConfigValidationError(f"{path}.media_type must be 'audio' or 'video'") # Validate tiers tier_names = set() valid_tier_types = ['independent', 'dependent'] valid_constraint_types = ['time_subdivision', 'included_in', 'symbolic_association', 'symbolic_subdivision', 'none'] for i, tier in enumerate(scheme['tiers']): tier_path = f"{path}.tiers[{i}]" if not isinstance(tier, dict): raise ConfigValidationError(f"{tier_path} must be a dictionary") if 'name' not in tier: raise ConfigValidationError(f"{tier_path} missing 'name' field") tier_name = tier['name'] if tier_name in tier_names: raise ConfigValidationError(f"{tier_path} duplicate tier name: '{tier_name}'") tier_names.add(tier_name) # Validate tier_type tier_type = tier.get('tier_type', 'independent') if tier_type not in valid_tier_types: raise ConfigValidationError(f"{tier_path}.tier_type must be one of: {valid_tier_types}") # Validate dependent tier requirements if tier_type == 'dependent': if 'parent_tier' not in tier: raise ConfigValidationError(f"{tier_path} dependent tier must have 'parent_tier'") # Validate constraint_type constraint_type = tier.get('constraint_type', 'none') if constraint_type not in valid_constraint_types: raise ConfigValidationError(f"{tier_path}.constraint_type must be one of: {valid_constraint_types}") # Validate parent_tier references (second pass) for i, tier in enumerate(scheme['tiers']): parent = tier.get('parent_tier') if parent and parent not in tier_names: raise ConfigValidationError(f"{path}.tiers[{i}] references unknown parent_tier: '{parent}'") if parent and parent == tier['name']: raise ConfigValidationError(f"{path}.tiers[{i}] cannot be its own parent") # Validate optional numeric fields if 'tier_height' in scheme: if not isinstance(scheme['tier_height'], int) or scheme['tier_height'] < 20: raise ConfigValidationError(f"{path}.tier_height must be an integer >= 20") elif annotation_type == 'pairwise': # Validate mode valid_modes = ['binary', 'scale', 'multi_dimension'] mode = scheme.get('mode', 'binary') if mode not in valid_modes: raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") # Validate labels if provided if 'labels' in scheme: if not isinstance(scheme['labels'], list): raise ConfigValidationError(f"{path}.labels must be a list") if len(scheme['labels']) < 2: raise ConfigValidationError(f"{path}.labels must have at least 2 items (for A and B)") # Validate scale configuration for scale mode if mode == 'scale': scale = scheme.get('scale', {}) if not isinstance(scale, dict): raise ConfigValidationError(f"{path}.scale must be a dictionary") # Validate min/max values min_val = scale.get('min', -3) max_val = scale.get('max', 3) if not isinstance(min_val, (int, float)) or not isinstance(max_val, (int, float)): raise ConfigValidationError(f"{path}.scale.min and scale.max must be numbers") if min_val >= max_val: raise ConfigValidationError(f"{path}.scale.min must be less than scale.max") # Validate step step = scale.get('step', 1) if not isinstance(step, (int, float)) or step <= 0: raise ConfigValidationError(f"{path}.scale.step must be a positive number") # Validate scale labels if provided if 'labels' in scale: scale_labels = scale['labels'] if not isinstance(scale_labels, dict): raise ConfigValidationError(f"{path}.scale.labels must be a dictionary") # Validate multi_dimension mode if mode == 'multi_dimension': dimensions = scheme.get('dimensions', []) if not isinstance(dimensions, list) or not dimensions: raise ConfigValidationError(f"{path}.dimensions must be a non-empty list for multi_dimension mode") for i, dim in enumerate(dimensions): if not isinstance(dim, dict): raise ConfigValidationError(f"{path}.dimensions[{i}] must be a dictionary") if 'name' not in dim: raise ConfigValidationError(f"{path}.dimensions[{i}] must have a 'name' field") elif annotation_type == 'bws': # Validate tuple_size if 'tuple_size' in scheme: if not isinstance(scheme['tuple_size'], int) or scheme['tuple_size'] < 2: raise ConfigValidationError(f"{path}.tuple_size must be an integer >= 2") elif annotation_type == 'soft_label': if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing 'labels' field for soft_label annotation type") if not isinstance(scheme['labels'], list) or not scheme['labels']: raise ConfigValidationError(f"{path}.labels must be a non-empty list") if 'total' in scheme: if not isinstance(scheme['total'], int) or scheme['total'] < 1: raise ConfigValidationError(f"{path}.total must be a positive integer") elif annotation_type == 'confidence': if 'scale_type' in scheme: if scheme['scale_type'] not in ['likert', 'slider']: raise ConfigValidationError(f"{path}.scale_type must be 'likert' or 'slider'") if 'scale_points' in scheme: if not isinstance(scheme['scale_points'], int) or scheme['scale_points'] < 2: raise ConfigValidationError(f"{path}.scale_points must be an integer >= 2") elif annotation_type == 'constant_sum': if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing 'labels' field for constant_sum annotation type") if not isinstance(scheme['labels'], list) or not scheme['labels']: raise ConfigValidationError(f"{path}.labels must be a non-empty list") if 'total_points' in scheme: if not isinstance(scheme['total_points'], int) or scheme['total_points'] < 1: raise ConfigValidationError(f"{path}.total_points must be a positive integer") elif annotation_type == 'semantic_differential': if 'pairs' not in scheme: raise ConfigValidationError(f"{path} missing 'pairs' field for semantic_differential annotation type") if not isinstance(scheme['pairs'], list) or not scheme['pairs']: raise ConfigValidationError(f"{path}.pairs must be a non-empty list") for i, pair in enumerate(scheme['pairs']): if not isinstance(pair, list) or len(pair) != 2: raise ConfigValidationError(f"{path}.pairs[{i}] must be a list of exactly two strings") elif annotation_type == 'ranking': if 'labels' not in scheme: raise ConfigValidationError(f"{path} missing 'labels' field for ranking annotation type") if not isinstance(scheme['labels'], list) or not scheme['labels']: raise ConfigValidationError(f"{path}.labels must be a non-empty list") elif annotation_type == 'range_slider': if 'min_value' in scheme and 'max_value' in scheme: if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") if scheme['min_value'] >= scheme['max_value']: raise ConfigValidationError(f"{path}.min_value must be less than max_value") elif annotation_type == 'hierarchical_multiselect': if 'taxonomy' not in scheme: raise ConfigValidationError(f"{path} missing 'taxonomy' field for hierarchical_multiselect annotation type") if not isinstance(scheme['taxonomy'], dict) or not scheme['taxonomy']: raise ConfigValidationError(f"{path}.taxonomy must be a non-empty dictionary") elif annotation_type == 'vas': if 'min_value' in scheme and 'max_value' in scheme: if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") if scheme['min_value'] >= scheme['max_value']: raise ConfigValidationError(f"{path}.min_value must be less than max_value") elif annotation_type == 'rubric_eval': if 'criteria' not in scheme: raise ConfigValidationError(f"{path} missing 'criteria' field for rubric_eval annotation type") if not isinstance(scheme['criteria'], list) or not scheme['criteria']: raise ConfigValidationError(f"{path}.criteria must be a non-empty list") for i, crit in enumerate(scheme['criteria']): if not isinstance(crit, dict) or 'name' not in crit: raise ConfigValidationError(f"{path}.criteria[{i}] must be a dict with 'name'") if 'scale_points' in scheme: if not isinstance(scheme['scale_points'], int) or scheme['scale_points'] < 2: raise ConfigValidationError(f"{path}.scale_points must be an integer >= 2") elif annotation_type == 'error_span': if 'error_types' not in scheme: raise ConfigValidationError(f"{path} missing 'error_types' field for error_span annotation type") if not isinstance(scheme['error_types'], list) or not scheme['error_types']: raise ConfigValidationError(f"{path}.error_types must be a non-empty list") for i, et in enumerate(scheme['error_types']): if not isinstance(et, dict) or 'name' not in et: raise ConfigValidationError(f"{path}.error_types[{i}] must be a dict with 'name'") elif annotation_type == 'card_sort': mode = scheme.get('mode', 'closed') if mode not in ['open', 'closed']: raise ConfigValidationError(f"{path}.mode must be 'open' or 'closed'") if mode == 'closed': if 'groups' not in scheme: raise ConfigValidationError(f"{path} missing 'groups' field for card_sort in closed mode") if not isinstance(scheme['groups'], list) or not scheme['groups']: raise ConfigValidationError(f"{path}.groups must be a non-empty list for closed mode") elif annotation_type == 'conjoint': if 'attributes' not in scheme and 'profiles_field' not in scheme: raise ConfigValidationError(f"{path} requires 'attributes' or 'profiles_field' for conjoint annotation type") if 'attributes' in scheme: if not isinstance(scheme['attributes'], list) or not scheme['attributes']: raise ConfigValidationError(f"{path}.attributes must be a non-empty list") for i, attr in enumerate(scheme['attributes']): if not isinstance(attr, dict) or 'name' not in attr: raise ConfigValidationError(f"{path}.attributes[{i}] must be a dict with 'name'") if 'profiles_per_set' in scheme: if not isinstance(scheme['profiles_per_set'], int) or scheme['profiles_per_set'] < 2: raise ConfigValidationError(f"{path}.profiles_per_set must be an integer >= 2") # Validate display_logic if present if 'display_logic' in scheme: validate_display_logic_structure(scheme['display_logic'], path) def validate_display_logic_structure(display_logic: Dict[str, Any], path: str) -> None: """ Validate the structure of a display_logic configuration block. This validates the syntax and structure of a single display_logic block. Cross-schema validation (checking referenced schemas exist) is done separately in validate_display_logic_references(). Args: display_logic: The display_logic configuration path: Path in the config for error reporting Raises: ConfigValidationError: If the display_logic is invalid """ from potato.server_utils.display_logic import SUPPORTED_OPERATORS if not isinstance(display_logic, dict): raise ConfigValidationError(f"{path}.display_logic must be a dictionary") # Must have show_when if 'show_when' not in display_logic: raise ConfigValidationError(f"{path}.display_logic must have 'show_when' field") show_when = display_logic['show_when'] if not isinstance(show_when, list): raise ConfigValidationError(f"{path}.display_logic.show_when must be a list of conditions") if len(show_when) == 0: raise ConfigValidationError(f"{path}.display_logic.show_when must have at least one condition") # Validate each condition for i, condition in enumerate(show_when): cond_path = f"{path}.display_logic.show_when[{i}]" if not isinstance(condition, dict): raise ConfigValidationError(f"{cond_path} must be a dictionary") # Required fields if 'schema' not in condition: raise ConfigValidationError(f"{cond_path} missing required 'schema' field") if 'operator' not in condition: raise ConfigValidationError(f"{cond_path} missing required 'operator' field") operator = condition['operator'] if operator not in SUPPORTED_OPERATORS: raise ConfigValidationError( f"{cond_path}.operator '{operator}' is not supported. " f"Valid operators: {list(SUPPORTED_OPERATORS.keys())}" ) # Validate operator-specific value requirements value = condition.get('value') # Operators that don't need a value if operator in ('empty', 'not_empty'): pass # No value required # Range operators need [min, max] elif operator in ('in_range', 'not_in_range', 'length_in_range'): if not isinstance(value, (list, tuple)): raise ConfigValidationError( f"{cond_path}: operator '{operator}' requires a range value as [min, max]" ) if len(value) != 2: raise ConfigValidationError( f"{cond_path}: range value must have exactly 2 elements [min, max]" ) try: min_val, max_val = float(value[0]), float(value[1]) if min_val > max_val: raise ConfigValidationError( f"{cond_path}: range min ({min_val}) is greater than max ({max_val})" ) except (ValueError, TypeError): raise ConfigValidationError(f"{cond_path}: range values must be numeric") # Numeric operators need numeric values elif operator in ('gt', 'gte', 'lt', 'lte', 'length_gt', 'length_lt'): if value is None: raise ConfigValidationError(f"{cond_path}: operator '{operator}' requires a value") try: float(value) except (ValueError, TypeError): raise ConfigValidationError( f"{cond_path}: operator '{operator}' requires a numeric value" ) # Regex operator needs a valid pattern elif operator == 'matches': if value is None: raise ConfigValidationError(f"{cond_path}: operator 'matches' requires a regex pattern") try: import re re.compile(value) except re.error as e: raise ConfigValidationError(f"{cond_path}: invalid regex pattern '{value}': {e}") # Other operators just need a non-None value elif value is None: raise ConfigValidationError(f"{cond_path}: operator '{operator}' requires a value") # Validate logic field if present logic = display_logic.get('logic', 'all') if logic not in ('all', 'any'): raise ConfigValidationError( f"{path}.display_logic.logic must be 'all' or 'any', got '{logic}'" ) def validate_display_logic_references(annotation_schemes: List[Dict[str, Any]]) -> None: """ Validate that all display_logic references point to existing schemas and check for circular dependencies. This is called after all annotation schemes have been validated individually. Args: annotation_schemes: List of annotation scheme configurations Raises: ConfigValidationError: If there are invalid references or circular dependencies """ from potato.server_utils.display_logic import validate_display_logic_config # Use the DisplayLogicValidator for comprehensive validation is_valid, errors = validate_display_logic_config(annotation_schemes) if not is_valid: # Format errors nicely error_msg = "Display logic validation errors:\n" + "\n".join(f" - {e}" for e in errors) raise ConfigValidationError(error_msg) def validate_server_config(config_data: Dict[str, Any]) -> None: """ Validate server configuration section. The server section allows specifying server settings in the YAML config instead of via command-line flags. CLI flags take precedence over config values. Supported options: - port: Port number to run on (1-65535) - host: Host address to bind to (default: localhost) - debug: Enable Flask debug mode (default: false) Args: config_data: The configuration data Raises: ConfigValidationError: If the server configuration is invalid """ if "server" not in config_data: return # server section is optional server_config = config_data["server"] if not isinstance(server_config, dict): raise ConfigValidationError("server configuration must be a dictionary") # Validate port if "port" in server_config: port = server_config["port"] if not isinstance(port, int): raise ConfigValidationError("server.port must be an integer") if port < 1 or port > 65535: raise ConfigValidationError("server.port must be between 1 and 65535") # Validate host if "host" in server_config: host = server_config["host"] if not isinstance(host, str): raise ConfigValidationError("server.host must be a string") if not host.strip(): raise ConfigValidationError("server.host cannot be empty") # Validate debug if "debug" in server_config: if not isinstance(server_config["debug"], bool): raise ConfigValidationError("server.debug must be a boolean") def validate_authentication_config(config_data: Dict[str, Any]) -> None: """ Validate authentication configuration section. Validates OAuth/OIDC provider settings, required fields, and warns about common misconfigurations. Args: config_data: The configuration data Raises: ConfigValidationError: If the authentication configuration is invalid """ if "authentication" not in config_data: return # authentication section is optional auth_config = config_data["authentication"] if not isinstance(auth_config, dict): raise ConfigValidationError("authentication configuration must be a dictionary") method = auth_config.get("method", "in_memory") valid_methods = ["in_memory", "database", "clerk", "oauth"] if method not in valid_methods: raise ConfigValidationError( f"authentication.method must be one of: {', '.join(valid_methods)}. " f"Got: '{method}'" ) # OAuth-specific validation if method == "oauth": # providers is required providers = auth_config.get("providers") if not providers or not isinstance(providers, dict): raise ConfigValidationError( "authentication.providers is required when method is 'oauth' " "and must be a dictionary with at least one provider" ) if len(providers) == 0: raise ConfigValidationError( "authentication.providers must contain at least one provider" ) # Validate each provider for name, pconfig in providers.items(): if not isinstance(pconfig, dict): raise ConfigValidationError( f"authentication.providers.{name} must be a dictionary" ) # client_id and client_secret are required if "client_id" not in pconfig: raise ConfigValidationError( f"authentication.providers.{name}.client_id is required" ) if "client_secret" not in pconfig: raise ConfigValidationError( f"authentication.providers.{name}.client_secret is required" ) # Generic OIDC requires discovery_url if name not in ("google", "github") and "discovery_url" not in pconfig: raise ConfigValidationError( f"authentication.providers.{name} requires 'discovery_url' " f"for OIDC providers (only 'google' and 'github' have built-in URLs)" ) # Validate optional fields if "allowed_domain" in pconfig: domain = pconfig["allowed_domain"] if not isinstance(domain, str) or not domain.strip(): raise ConfigValidationError( f"authentication.providers.{name}.allowed_domain must be a non-empty string" ) if "allowed_org" in pconfig: org = pconfig["allowed_org"] if not isinstance(org, str) or not org.strip(): raise ConfigValidationError( f"authentication.providers.{name}.allowed_org must be a non-empty string" ) if "scopes" in pconfig: scopes = pconfig["scopes"] if not isinstance(scopes, list): raise ConfigValidationError( f"authentication.providers.{name}.scopes must be a list" ) # Validate user_identity_field identity_field = auth_config.get("user_identity_field", "email") valid_fields = ["email", "username", "sub", "name"] if identity_field not in valid_fields: raise ConfigValidationError( f"authentication.user_identity_field must be one of: " f"{', '.join(valid_fields)}. Got: '{identity_field}'" ) # Warn if secret_key is not set (OAuth needs stable sessions) if "secret_key" not in config_data: import os if not os.environ.get("POTATO_SECRET_KEY"): logger.warning( "OAuth is configured but no 'secret_key' is set in config " "and POTATO_SECRET_KEY environment variable is not set. " "Sessions will be lost on server restart. " "Set 'secret_key' in config or POTATO_SECRET_KEY env var." ) # Database-specific validation if method == "database": db_url = auth_config.get("database_url") if db_url: if not (db_url.startswith("sqlite:///") or db_url.startswith("postgresql://")): raise ConfigValidationError( "authentication.database_url must start with 'sqlite:///' or 'postgresql://'. " f"Got: '{db_url}'" ) # Mutual exclusivity: database backend and user_config_path if "user_config_path" in auth_config: raise ConfigValidationError( "authentication.user_config_path cannot be used with method 'database'. " "The database backend handles its own user persistence." ) def validate_quality_control_config(config_data: Dict[str, Any]) -> None: """ Validate quality control configuration (attention checks, gold standards, pre-annotation). Args: config_data: The configuration data Raises: ConfigValidationError: If the configuration is invalid """ # Validate attention checks config if "attention_checks" in config_data: attn_config = config_data["attention_checks"] if not isinstance(attn_config, dict): raise ConfigValidationError("attention_checks must be a dictionary") if attn_config.get("enabled", False): # Validate items_file is specified if "items_file" not in attn_config: raise ConfigValidationError("attention_checks.items_file is required when enabled") if not isinstance(attn_config["items_file"], str): raise ConfigValidationError("attention_checks.items_file must be a string path") # Validate frequency or probability (one should be set) has_frequency = "frequency" in attn_config has_probability = "probability" in attn_config if has_frequency and has_probability: raise ConfigValidationError("attention_checks: specify either 'frequency' or 'probability', not both") if has_frequency: freq = attn_config["frequency"] if not isinstance(freq, int) or freq < 1: raise ConfigValidationError("attention_checks.frequency must be a positive integer") if has_probability: prob = attn_config["probability"] if not isinstance(prob, (int, float)) or prob < 0 or prob > 1: raise ConfigValidationError("attention_checks.probability must be a number between 0 and 1") # Validate min_response_time if "min_response_time" in attn_config: min_time = attn_config["min_response_time"] if not isinstance(min_time, (int, float)) or min_time < 0: raise ConfigValidationError("attention_checks.min_response_time must be a non-negative number") # Validate failure_handling if "failure_handling" in attn_config: failure_config = attn_config["failure_handling"] if not isinstance(failure_config, dict): raise ConfigValidationError("attention_checks.failure_handling must be a dictionary") if "warn_threshold" in failure_config: warn = failure_config["warn_threshold"] if not isinstance(warn, int) or warn < 1: raise ConfigValidationError("attention_checks.failure_handling.warn_threshold must be a positive integer") if "block_threshold" in failure_config: block = failure_config["block_threshold"] if not isinstance(block, int) or block < 1: raise ConfigValidationError("attention_checks.failure_handling.block_threshold must be a positive integer") # Ensure block > warn warn = failure_config.get("warn_threshold", 2) if block <= warn: raise ConfigValidationError("attention_checks.failure_handling.block_threshold must be greater than warn_threshold") # Validate gold standards config if "gold_standards" in config_data: gold_config = config_data["gold_standards"] if not isinstance(gold_config, dict): raise ConfigValidationError("gold_standards must be a dictionary") if gold_config.get("enabled", False): # Validate items_file is specified if "items_file" not in gold_config: raise ConfigValidationError("gold_standards.items_file is required when enabled") if not isinstance(gold_config["items_file"], str): raise ConfigValidationError("gold_standards.items_file must be a string path") # Validate mode if "mode" in gold_config: valid_modes = ["training", "mixed", "separate"] if gold_config["mode"] not in valid_modes: raise ConfigValidationError(f"gold_standards.mode must be one of: {', '.join(valid_modes)}") # Validate frequency if "frequency" in gold_config: freq = gold_config["frequency"] if not isinstance(freq, int) or freq < 1: raise ConfigValidationError("gold_standards.frequency must be a positive integer") # Validate accuracy config if "accuracy" in gold_config: accuracy_config = gold_config["accuracy"] if not isinstance(accuracy_config, dict): raise ConfigValidationError("gold_standards.accuracy must be a dictionary") if "min_threshold" in accuracy_config: threshold = accuracy_config["min_threshold"] if not isinstance(threshold, (int, float)) or threshold < 0 or threshold > 1: raise ConfigValidationError("gold_standards.accuracy.min_threshold must be between 0 and 1") if "evaluation_count" in accuracy_config: count = accuracy_config["evaluation_count"] if not isinstance(count, int) or count < 1: raise ConfigValidationError("gold_standards.accuracy.evaluation_count must be a positive integer") # Validate auto_promote config if "auto_promote" in gold_config: auto_promote = gold_config["auto_promote"] if not isinstance(auto_promote, dict): raise ConfigValidationError("gold_standards.auto_promote must be a dictionary") if "min_annotators" in auto_promote: min_ann = auto_promote["min_annotators"] if not isinstance(min_ann, int) or min_ann < 2: raise ConfigValidationError("gold_standards.auto_promote.min_annotators must be an integer >= 2") if "agreement_threshold" in auto_promote: threshold = auto_promote["agreement_threshold"] if not isinstance(threshold, (int, float)) or threshold < 0.5 or threshold > 1.0: raise ConfigValidationError("gold_standards.auto_promote.agreement_threshold must be between 0.5 and 1.0") # Validate pre-annotation config if "pre_annotation" in config_data: pre_config = config_data["pre_annotation"] if not isinstance(pre_config, dict): raise ConfigValidationError("pre_annotation must be a dictionary") if pre_config.get("enabled", False): # Validate field name if "field" in pre_config: if not isinstance(pre_config["field"], str) or not pre_config["field"].strip(): raise ConfigValidationError("pre_annotation.field must be a non-empty string") # Validate highlight_low_confidence threshold if "highlight_low_confidence" in pre_config: threshold = pre_config["highlight_low_confidence"] if not isinstance(threshold, (int, float)) or threshold < 0 or threshold > 1: raise ConfigValidationError("pre_annotation.highlight_low_confidence must be between 0 and 1") # Validate agreement metrics config if "agreement_metrics" in config_data: agreement_config = config_data["agreement_metrics"] if not isinstance(agreement_config, dict): raise ConfigValidationError("agreement_metrics must be a dictionary") if "min_overlap" in agreement_config: overlap = agreement_config["min_overlap"] if not isinstance(overlap, int) or overlap < 2: raise ConfigValidationError("agreement_metrics.min_overlap must be an integer >= 2") if "refresh_interval" in agreement_config: interval = agreement_config["refresh_interval"] if not isinstance(interval, int) or interval < 10: raise ConfigValidationError("agreement_metrics.refresh_interval must be an integer >= 10 seconds") def validate_instance_reclaim_config(config_data: Dict[str, Any]) -> None: """Validate abandoned assignment reclaim configuration.""" if "instance_reclaim" not in config_data: return reclaim_config = config_data["instance_reclaim"] if not isinstance(reclaim_config, dict): raise ConfigValidationError("instance_reclaim must be a dictionary") def validate_bool(section: Dict[str, Any], path: str) -> None: if "preserve_completed_annotations" in section and not isinstance(section["preserve_completed_annotations"], bool): raise ConfigValidationError(f"{path}.preserve_completed_annotations must be a boolean") def validate_section(section_name: str) -> None: if section_name not in reclaim_config: return section = reclaim_config[section_name] if not isinstance(section, dict): raise ConfigValidationError(f"instance_reclaim.{section_name} must be a dictionary") validate_bool(section, f"instance_reclaim.{section_name}") if "enabled" in reclaim_config and not isinstance(reclaim_config["enabled"], bool): raise ConfigValidationError("instance_reclaim.enabled must be a boolean") if "timeout_hours" in reclaim_config: timeout = reclaim_config["timeout_hours"] if not isinstance(timeout, (int, float)) or timeout <= 0: raise ConfigValidationError("instance_reclaim.timeout_hours must be a positive number") validate_bool(reclaim_config, "instance_reclaim") for section_name in ("stale", "manual", "quality_control", "prolific"): validate_section(section_name) prolific = reclaim_config.get("prolific") if isinstance(prolific, dict) and "status_policies" in prolific: status_policies = prolific["status_policies"] if not isinstance(status_policies, dict): raise ConfigValidationError("instance_reclaim.prolific.status_policies must be a dictionary") valid_statuses = {"RETURNED", "TIMED-OUT", "REJECTED"} for status, section in status_policies.items(): if status not in valid_statuses: raise ConfigValidationError( "instance_reclaim.prolific.status_policies keys must be one of: RETURNED, TIMED-OUT, REJECTED" ) if not isinstance(section, dict): raise ConfigValidationError( f"instance_reclaim.prolific.status_policies.{status} must be a dictionary" ) validate_bool(section, f"instance_reclaim.prolific.status_policies.{status}") def validate_data_directory_config(config_data: Dict[str, Any]) -> None: """ Validate data_directory configuration. This function validates the directory watching configuration options: - data_directory: Path to the directory containing data files - watch_data_directory: Whether to watch for changes (default: False) - watch_poll_interval: Seconds between scans (default: 5.0) Args: config_data: The configuration data Raises: ConfigValidationError: If the configuration is invalid """ if "data_directory" not in config_data: return # data_directory is optional data_directory = config_data["data_directory"] # Validate data_directory is a string if not isinstance(data_directory, str): raise ConfigValidationError("data_directory must be a string path") if not data_directory.strip(): raise ConfigValidationError("data_directory cannot be empty") # Validate watch_data_directory if present if "watch_data_directory" in config_data: watch_enabled = config_data["watch_data_directory"] if not isinstance(watch_enabled, bool): raise ConfigValidationError("watch_data_directory must be a boolean (true/false)") # Validate watch_poll_interval if present if "watch_poll_interval" in config_data: interval = config_data["watch_poll_interval"] if not isinstance(interval, (int, float)): raise ConfigValidationError("watch_poll_interval must be a number") if interval < 1.0: raise ConfigValidationError("watch_poll_interval must be at least 1.0 seconds") if interval > 3600: raise ConfigValidationError("watch_poll_interval cannot exceed 3600 seconds (1 hour)") def validate_data_sources_config(config_data: Dict[str, Any]) -> None: """ Validate data_sources configuration for extended data loading. This function validates the configuration for loading data from various sources including URLs, cloud storage, and databases. Args: config_data: The configuration data Raises: ConfigValidationError: If the configuration is invalid """ data_sources = config_data.get("data_sources") if not data_sources: return # Empty or missing is fine - data_files can be used instead if not isinstance(data_sources, list): raise ConfigValidationError("data_sources must be a list") # Valid source types valid_types = [ "file", "url", "google_drive", "dropbox", "s3", "huggingface", "google_sheets", "database" ] for i, source in enumerate(data_sources): if not isinstance(source, dict): raise ConfigValidationError( f"data_sources[{i}] must be a dictionary" ) source_type = source.get("type") if not source_type: raise ConfigValidationError( f"data_sources[{i}] is missing required 'type' field" ) if source_type not in valid_types: raise ConfigValidationError( f"data_sources[{i}] has invalid type '{source_type}'. " f"Valid types: {', '.join(valid_types)}" ) # Type-specific validation _validate_data_source_by_type(source, source_type, i) # Validate partial_loading configuration if present _validate_partial_loading_config(config_data) # Validate data_cache configuration if present _validate_data_cache_config(config_data) def _validate_data_source_by_type(source: Dict, source_type: str, index: int) -> None: """Validate source-specific configuration.""" prefix = f"data_sources[{index}]" if source_type == "file": if not source.get("path"): raise ConfigValidationError(f"{prefix} (type=file) requires 'path'") elif source_type == "url": url = source.get("url") if not url: raise ConfigValidationError(f"{prefix} (type=url) requires 'url'") if not isinstance(url, str): raise ConfigValidationError(f"{prefix}.url must be a string") # Basic URL format check if not (url.startswith("http://") or url.startswith("https://")): raise ConfigValidationError( f"{prefix}.url must start with http:// or https://" ) elif source_type == "google_drive": if not source.get("url") and not source.get("file_id"): raise ConfigValidationError( f"{prefix} (type=google_drive) requires 'url' or 'file_id'" ) elif source_type == "dropbox": if not source.get("url") and not source.get("path"): raise ConfigValidationError( f"{prefix} (type=dropbox) requires 'url' or 'path'" ) # If path is provided, access_token is required if source.get("path") and not source.get("access_token"): raise ConfigValidationError( f"{prefix} (type=dropbox) requires 'access_token' when using 'path'" ) elif source_type == "s3": if not source.get("bucket"): raise ConfigValidationError(f"{prefix} (type=s3) requires 'bucket'") if not source.get("key"): raise ConfigValidationError(f"{prefix} (type=s3) requires 'key'") elif source_type == "huggingface": if not source.get("dataset"): raise ConfigValidationError( f"{prefix} (type=huggingface) requires 'dataset'" ) elif source_type == "google_sheets": if not source.get("spreadsheet_id"): raise ConfigValidationError( f"{prefix} (type=google_sheets) requires 'spreadsheet_id'" ) if not source.get("credentials_file"): raise ConfigValidationError( f"{prefix} (type=google_sheets) requires 'credentials_file'" ) elif source_type == "database": # Must have connection_string OR dialect+database if not source.get("connection_string"): if not source.get("dialect"): raise ConfigValidationError( f"{prefix} (type=database) requires 'connection_string' or 'dialect'" ) if not source.get("database") and source.get("dialect") != "sqlite": raise ConfigValidationError( f"{prefix} (type=database) requires 'database' when not using sqlite" ) # Must have query OR table if not source.get("query") and not source.get("table"): raise ConfigValidationError( f"{prefix} (type=database) requires 'query' or 'table'" ) def _validate_partial_loading_config(config_data: Dict[str, Any]) -> None: """Validate partial_loading configuration.""" partial = config_data.get("partial_loading") if not partial: return if not isinstance(partial, dict): raise ConfigValidationError("partial_loading must be a dictionary") # Validate enabled if "enabled" in partial and not isinstance(partial["enabled"], bool): raise ConfigValidationError("partial_loading.enabled must be a boolean") # Validate initial_count if "initial_count" in partial: count = partial["initial_count"] if not isinstance(count, int) or count < 1: raise ConfigValidationError( "partial_loading.initial_count must be a positive integer" ) # Validate batch_size if "batch_size" in partial: size = partial["batch_size"] if not isinstance(size, int) or size < 1: raise ConfigValidationError( "partial_loading.batch_size must be a positive integer" ) # Validate auto_load_threshold if "auto_load_threshold" in partial: threshold = partial["auto_load_threshold"] if not isinstance(threshold, (int, float)) or not (0 <= threshold <= 1): raise ConfigValidationError( "partial_loading.auto_load_threshold must be between 0.0 and 1.0" ) def _validate_data_cache_config(config_data: Dict[str, Any]) -> None: """Validate data_cache configuration.""" cache = config_data.get("data_cache") if not cache: return if not isinstance(cache, dict): raise ConfigValidationError("data_cache must be a dictionary") # Validate ttl_seconds if "ttl_seconds" in cache: ttl = cache["ttl_seconds"] if not isinstance(ttl, int) or ttl < 0: raise ConfigValidationError( "data_cache.ttl_seconds must be a non-negative integer" ) # Validate max_size_mb if "max_size_mb" in cache: size = cache["max_size_mb"] if not isinstance(size, int) or size < 1: raise ConfigValidationError( "data_cache.max_size_mb must be a positive integer" ) def validate_database_config(db_config: Dict[str, Any]) -> None: """ Validate database configuration. Args: db_config: The database configuration Raises: ConfigValidationError: If the database configuration is invalid """ if not isinstance(db_config, dict): raise ConfigValidationError("database configuration must be a dictionary") required_fields = ['type', 'host', 'database', 'username'] missing_fields = [field for field in required_fields if field not in db_config] if missing_fields: raise ConfigValidationError(f"Missing required database fields: {', '.join(missing_fields)}") valid_types = ['mysql', 'file'] if db_config['type'] not in valid_types: raise ConfigValidationError(f"Unsupported database type: {db_config['type']}. Must be one of: {', '.join(valid_types)}") # Validate MySQL-specific fields if db_config['type'] == 'mysql': if 'password' not in db_config: raise ConfigValidationError("MySQL database requires password") # Validate port if specified if 'port' in db_config: try: port = int(db_config['port']) if port < 1 or port > 65535: raise ConfigValidationError("Database port must be between 1 and 65535") except (ValueError, TypeError): raise ConfigValidationError("Database port must be a valid integer") def validate_file_paths(config_data: Dict[str, Any], project_dir: str, config_file_dir: str = None) -> None: """ Validate that all file paths in the configuration are secure and exist. Args: config_data: The configuration data project_dir: The project directory config_file_dir: The directory containing the config file (for relative path resolution) Raises: ConfigSecurityError: If any file paths are not secure ConfigValidationError: If required files don't exist """ # Get the task_dir from config task_dir = config_data.get('task_dir') if not task_dir: raise ConfigValidationError("task_dir is required in configuration") # Validate task_dir exists and is secure try: validated_task_dir = validate_path_security(task_dir, project_dir) # Don't require task_dir to exist - it's often an output directory that will be created # Only validate that it's a valid path except ConfigSecurityError as e: raise ConfigSecurityError(f"task_dir: {str(e)}") # Use task_dir as the base for resolving relative paths in the config base_dir = validated_task_dir # Validate data files data_files = config_data.get('data_files', []) for i, data_file in enumerate(data_files): # Skip validation for special values if data_file in [None, "null", "default"]: continue # Handle dict entries with path + optional encoding if isinstance(data_file, dict): file_path = data_file.get("path") if not file_path: raise ConfigValidationError(f"Data file {i}: dict entry missing 'path' field") # Validate encoding if specified encoding = data_file.get("encoding") if encoding is not None: if not isinstance(encoding, str): raise ConfigValidationError( f"Data file {i}: 'encoding' must be a string, got {type(encoding).__name__}" ) try: codecs.lookup(encoding) except LookupError: raise ConfigValidationError( f"Data file {i}: unknown encoding '{encoding}'" ) else: file_path = data_file try: validated_path = validate_path_security(file_path, base_dir, project_dir) if not os.path.exists(validated_path): raise ConfigValidationError(f"Data file not found: {file_path} (resolved to: {validated_path})") except ConfigSecurityError as e: raise ConfigSecurityError(f"Data file {i}: {str(e)}") # Validate batch assignment instance files batch_config = config_data.get('batch_assignment') if isinstance(batch_config, dict): for i, group in enumerate(batch_config.get('groups') or []): if not isinstance(group, dict): continue file_entry = group.get( 'instances_file', group.get('items_file', group.get('instance_ids_file')), ) if not file_entry: continue if isinstance(file_entry, dict): file_path = file_entry.get("path") else: file_path = file_entry try: validated_path = validate_path_security(file_path, base_dir, project_dir) if not os.path.exists(validated_path): raise ConfigValidationError( f"batch_assignment.groups[{i}] file not found: " f"{file_path} (resolved to: {validated_path})" ) except ConfigSecurityError as e: raise ConfigSecurityError( f"batch_assignment.groups[{i}] file: {str(e)}" ) # Validate data_directory if configured if 'data_directory' in config_data: data_directory = config_data['data_directory'] # Skip validation for special values if data_directory not in [None, "null", "default"]: try: validated_dir = validate_path_security(data_directory, base_dir, project_dir) if not os.path.exists(validated_dir): raise ConfigValidationError(f"data_directory not found: {data_directory} (resolved to: {validated_dir})") if not os.path.isdir(validated_dir): raise ConfigValidationError(f"data_directory is not a directory: {data_directory} (resolved to: {validated_dir})") except ConfigSecurityError as e: raise ConfigSecurityError(f"data_directory: {str(e)}") # Validate output_annotation_dir if 'output_annotation_dir' in config_data: output_dir = config_data['output_annotation_dir'] # Skip validation for special values if output_dir not in [None, "null", "default"]: try: validate_path_security(output_dir, project_dir) except ConfigSecurityError as e: raise ConfigSecurityError(f"output_annotation_dir: {str(e)}") # Validate site_dir if 'site_dir' in config_data: site_dir = config_data['site_dir'] # Skip validation for special values if site_dir not in [None, "null", "default"]: try: validate_path_security(site_dir, base_dir, project_dir) except ConfigSecurityError as e: raise ConfigSecurityError(f"site_dir: {str(e)}") # Validate custom_ds if 'custom_ds' in config_data: custom_ds = config_data['custom_ds'] # Skip validation for special values if custom_ds not in [None, "null", "default"]: try: validate_path_security(custom_ds, base_dir, project_dir) except ConfigSecurityError as e: raise ConfigSecurityError(f"custom_ds: {str(e)}") # Validate base_css if 'base_css' in config_data: base_css = config_data['base_css'] if base_css not in [None, "null", "default"]: try: validated_css = validate_path_security(base_css, base_dir, project_dir) if not os.path.exists(validated_css): # Try resolving relative to config file directory if config_file_dir: alt_path = os.path.join(config_file_dir, base_css) if not os.path.exists(alt_path): raise ConfigValidationError( f"base_css file not found: {base_css} (resolved to: {validated_css})" ) else: raise ConfigValidationError( f"base_css file not found: {base_css} (resolved to: {validated_css})" ) except ConfigSecurityError as e: raise ConfigSecurityError(f"base_css: {str(e)}") # Validate header_logo if 'header_logo' in config_data: header_logo = config_data['header_logo'] if header_logo not in [None, "null", "default"]: # Allow URLs to pass through without file validation if not str(header_logo).startswith(("http://", "https://")): try: validated_logo = validate_path_security(header_logo, base_dir, project_dir) if not os.path.exists(validated_logo): # Try resolving relative to config file directory if config_file_dir: alt_path = os.path.join(config_file_dir, header_logo) if not os.path.exists(alt_path): raise ConfigValidationError( f"header_logo file not found: {header_logo} (resolved to: {validated_logo})" ) else: raise ConfigValidationError( f"header_logo file not found: {header_logo} (resolved to: {validated_logo})" ) except ConfigSecurityError as e: raise ConfigSecurityError(f"header_logo: {str(e)}") def validate_training_config(config_data: Dict[str, Any], project_dir: str, config_file_dir: str = None) -> None: """ Validate training configuration. Args: config_data: The configuration data project_dir: The project directory config_file_dir: The directory containing the config file Raises: ConfigValidationError: If training configuration is invalid ConfigSecurityError: If training data file path is not secure """ if 'training' not in config_data: return # Training is optional training_config = config_data['training'] if not isinstance(training_config, dict): raise ConfigValidationError("training configuration must be a dictionary") # Validate enabled flag if 'enabled' in training_config: if not isinstance(training_config['enabled'], bool): raise ConfigValidationError("training.enabled must be a boolean") # If training is disabled or not specified, skip further validation if not training_config.get('enabled', False): return # Validate training data file if 'data_file' not in training_config: raise ConfigValidationError("training.data_file is required when training is enabled") data_file = training_config['data_file'] if not isinstance(data_file, str): raise ConfigValidationError("training.data_file must be a string") # Validate training data file path security and existence try: base_dir = config_file_dir if config_file_dir else project_dir validated_path = validate_path_security(data_file, base_dir, project_dir) if not os.path.exists(validated_path): raise ConfigValidationError(f"Training data file not found: {data_file} (resolved to: {validated_path})") except ConfigSecurityError as e: raise ConfigSecurityError(f"training.data_file: {str(e)}") # Validate annotation schemes if 'annotation_schemes' in training_config: schemes = training_config['annotation_schemes'] if not isinstance(schemes, list): raise ConfigValidationError("training.annotation_schemes must be a list") if not schemes: raise ConfigValidationError("training.annotation_schemes cannot be empty") for i, scheme in enumerate(schemes): if isinstance(scheme, str): # String reference to existing scheme - validate it's a valid string if not scheme.strip(): raise ConfigValidationError(f"training.annotation_schemes[{i}] cannot be empty") elif isinstance(scheme, dict): # Full scheme dictionary - validate it validate_single_annotation_scheme(scheme, f"training.annotation_schemes[{i}]") else: raise ConfigValidationError(f"training.annotation_schemes[{i}] must be a string or dictionary") # Validate passing criteria if 'passing_criteria' in training_config: criteria = training_config['passing_criteria'] if not isinstance(criteria, dict): raise ConfigValidationError("training.passing_criteria must be a dictionary") # Validate min_correct if 'min_correct' in criteria: min_correct = criteria['min_correct'] if not isinstance(min_correct, int) or min_correct < 1: raise ConfigValidationError("training.passing_criteria.min_correct must be a positive integer") # Validate max_attempts if 'max_attempts' in criteria: max_attempts = criteria['max_attempts'] if not isinstance(max_attempts, int) or max_attempts < 1: raise ConfigValidationError("training.passing_criteria.max_attempts must be a positive integer") # Validate require_all_correct if 'require_all_correct' in criteria: if not isinstance(criteria['require_all_correct'], bool): raise ConfigValidationError("training.passing_criteria.require_all_correct must be a boolean") # Validate feedback settings if 'feedback' in training_config: feedback = training_config['feedback'] if not isinstance(feedback, dict): raise ConfigValidationError("training.feedback must be a dictionary") # Validate show_explanations if 'show_explanations' in feedback: if not isinstance(feedback['show_explanations'], bool): raise ConfigValidationError("training.feedback.show_explanations must be a boolean") # Validate allow_retry if 'allow_retry' in feedback: if not isinstance(feedback['allow_retry'], bool): raise ConfigValidationError("training.feedback.allow_retry must be a boolean") # Validate failure action if 'failure_action' in training_config: failure_action = training_config['failure_action'] valid_actions = ['move_to_done', 'repeat_training'] if failure_action not in valid_actions: raise ConfigValidationError(f"training.failure_action must be one of: {', '.join(valid_actions)}") def validate_training_data_file(data_file_path: str, annotation_schemes: List[Dict[str, Any]]) -> None: """ Validate training data file format and consistency. Args: data_file_path: Path to the training data file annotation_schemes: List of annotation schemes to validate against Raises: ConfigValidationError: If training data is invalid """ try: with open(data_file_path, 'r', encoding='utf-8') as f: training_data = json.load(f) except (json.JSONDecodeError, UnicodeDecodeError) as e: raise ConfigValidationError(f"Training data file is not valid JSON: {str(e)}") except FileNotFoundError: raise ConfigValidationError(f"Training data file not found: {data_file_path}") if not isinstance(training_data, dict): raise ConfigValidationError("Training data must be a JSON object") if 'training_instances' not in training_data: raise ConfigValidationError("Training data must contain 'training_instances' field") training_instances = training_data['training_instances'] if not isinstance(training_instances, list): raise ConfigValidationError("training_instances must be a list") if not training_instances: raise ConfigValidationError("training_instances cannot be empty") # Create a mapping of scheme names for validation scheme_names = {scheme['name'] for scheme in annotation_schemes} for i, instance in enumerate(training_instances): if not isinstance(instance, dict): raise ConfigValidationError(f"Training instance {i} must be a dictionary") # Validate required fields required_fields = ['id', 'text', 'correct_answers'] missing_fields = [field for field in required_fields if field not in instance] if missing_fields: raise ConfigValidationError(f"Training instance {i} missing required fields: {', '.join(missing_fields)}") # Validate id if not isinstance(instance['id'], str): raise ConfigValidationError(f"Training instance {i}.id must be a string") # Validate text if not isinstance(instance['text'], str): raise ConfigValidationError(f"Training instance {i}.text must be a string") # Validate correct_answers correct_answers = instance['correct_answers'] if not isinstance(correct_answers, dict): raise ConfigValidationError(f"Training instance {i}.correct_answers must be a dictionary") # Validate that all correct_answers correspond to annotation schemes for scheme_name, answer in correct_answers.items(): if scheme_name not in scheme_names: raise ConfigValidationError(f"Training instance {i}.correct_answers contains unknown scheme: {scheme_name}") # Validate explanation if present if 'explanation' in instance: if not isinstance(instance['explanation'], str): raise ConfigValidationError(f"Training instance {i}.explanation must be a string") def validate_batch_assignment_config(config_data: Dict[str, Any]) -> None: """ Validate batch assignment configuration. ``batch_assignment`` supports explicit annotator cohorts for repeat-round studies. Each group defines annotators allowed to receive a fixed item set, either inline or through a separate supported data file. Items may also carry annotator lists via ``annotator_key``; that field is validated at assignment time because data files load later. """ if 'batch_assignment' not in config_data: return batch_config = config_data['batch_assignment'] if not isinstance(batch_config, dict): raise ConfigValidationError("batch_assignment must be a dictionary") annotator_key = batch_config.get('annotator_key') if annotator_key is not None and ( not isinstance(annotator_key, str) or not annotator_key.strip() ): raise ConfigValidationError("batch_assignment.annotator_key must be a non-empty string") groups = batch_config.get('groups', []) if groups is None: return if not isinstance(groups, list): raise ConfigValidationError("batch_assignment.groups must be a list") for idx, group in enumerate(groups): if not isinstance(group, dict): raise ConfigValidationError(f"batch_assignment.groups[{idx}] must be a dictionary") users = group.get('annotators', group.get('users')) instances = group.get('instances', group.get('items', group.get('instance_ids'))) file_entry = group.get( 'instances_file', group.get('items_file', group.get('instance_ids_file')), ) if not isinstance(users, list) or not users: raise ConfigValidationError( f"batch_assignment.groups[{idx}] must define non-empty annotators/users list" ) if not all(isinstance(user, str) and user.strip() for user in users): raise ConfigValidationError( f"batch_assignment.groups[{idx}].annotators/users must contain non-empty strings" ) has_instances = instances is not None has_file = file_entry is not None if not has_instances and not has_file: raise ConfigValidationError( f"batch_assignment.groups[{idx}] must define either " "instances/items/instance_ids or instances_file/items_file/instance_ids_file" ) if has_instances and (not isinstance(instances, list) or not instances): raise ConfigValidationError( f"batch_assignment.groups[{idx}] must define non-empty instances/items/instance_ids list" ) if has_instances and not all(isinstance(instance, str) and instance.strip() for instance in instances): raise ConfigValidationError( f"batch_assignment.groups[{idx}].instances/items/instance_ids must contain non-empty strings" ) if has_file: if isinstance(file_entry, str): if not file_entry.strip(): raise ConfigValidationError( f"batch_assignment.groups[{idx}] file path must be non-empty" ) elif isinstance(file_entry, dict): path = file_entry.get('path') if not isinstance(path, str) or not path.strip(): raise ConfigValidationError( f"batch_assignment.groups[{idx}] file entry must define a non-empty path" ) encoding = file_entry.get('encoding') if encoding is not None and not isinstance(encoding, str): raise ConfigValidationError( f"batch_assignment.groups[{idx}] file encoding must be a string" ) else: raise ConfigValidationError( f"batch_assignment.groups[{idx}] file entry must be a path string or mapping" ) def validate_category_assignment_config(config_data: Dict[str, Any]) -> None: """ Validate category assignment configuration. This function validates the category_assignment configuration section which controls how users are assigned to annotation categories based on their training/prestudy performance. Args: config_data: The configuration data Raises: ConfigValidationError: If category assignment configuration is invalid """ if 'category_assignment' not in config_data: return # Category assignment is optional cat_config = config_data['category_assignment'] if not isinstance(cat_config, dict): raise ConfigValidationError("category_assignment must be a dictionary") # Validate enabled flag if 'enabled' in cat_config: if not isinstance(cat_config['enabled'], bool): raise ConfigValidationError("category_assignment.enabled must be a boolean") # If not enabled, skip further validation if not cat_config.get('enabled', True): return # Validate category_key (optional, can also be in item_properties) if 'category_key' in cat_config: if not isinstance(cat_config['category_key'], str) or not cat_config['category_key'].strip(): raise ConfigValidationError("category_assignment.category_key must be a non-empty string") # Validate qualification settings if 'qualification' in cat_config: qual = cat_config['qualification'] if not isinstance(qual, dict): raise ConfigValidationError("category_assignment.qualification must be a dictionary") # Validate source if 'source' in qual: valid_sources = ['training', 'prestudy', 'both'] if qual['source'] not in valid_sources: raise ConfigValidationError( f"category_assignment.qualification.source must be one of: {', '.join(valid_sources)}" ) # Validate threshold if 'threshold' in qual: threshold = qual['threshold'] if not isinstance(threshold, (int, float)) or threshold < 0.0 or threshold > 1.0: raise ConfigValidationError( "category_assignment.qualification.threshold must be a number between 0.0 and 1.0" ) # Validate min_questions if 'min_questions' in qual: min_q = qual['min_questions'] if not isinstance(min_q, int) or min_q < 1: raise ConfigValidationError( "category_assignment.qualification.min_questions must be a positive integer" ) # Validate combine_method (for combining prestudy and training scores) if 'combine_method' in qual: valid_methods = ['average', 'max', 'sum'] if qual['combine_method'] not in valid_methods: raise ConfigValidationError( f"category_assignment.qualification.combine_method must be one of: {', '.join(valid_methods)}" ) # Validate fallback behavior if 'fallback' in cat_config: valid_fallbacks = ['uncategorized', 'random', 'none'] if cat_config['fallback'] not in valid_fallbacks: raise ConfigValidationError( f"category_assignment.fallback must be one of: {', '.join(valid_fallbacks)}" ) # Validate dynamic expertise settings if 'dynamic' in cat_config: dynamic = cat_config['dynamic'] if not isinstance(dynamic, dict): raise ConfigValidationError("category_assignment.dynamic must be a dictionary") # Validate enabled flag if 'enabled' in dynamic: if not isinstance(dynamic['enabled'], bool): raise ConfigValidationError("category_assignment.dynamic.enabled must be a boolean") # If dynamic is not enabled, skip further validation if not dynamic.get('enabled', False): return # Validate agreement_method if 'agreement_method' in dynamic: valid_methods = ['majority_vote', 'super_majority', 'unanimous'] if dynamic['agreement_method'] not in valid_methods: raise ConfigValidationError( f"category_assignment.dynamic.agreement_method must be one of: {', '.join(valid_methods)}" ) # Validate min_annotations_for_consensus if 'min_annotations_for_consensus' in dynamic: min_ann = dynamic['min_annotations_for_consensus'] if not isinstance(min_ann, int) or min_ann < 2: raise ConfigValidationError( "category_assignment.dynamic.min_annotations_for_consensus must be an integer >= 2" ) # Validate learning_rate if 'learning_rate' in dynamic: lr = dynamic['learning_rate'] if not isinstance(lr, (int, float)) or lr <= 0.0 or lr > 1.0: raise ConfigValidationError( "category_assignment.dynamic.learning_rate must be a number between 0.0 (exclusive) and 1.0" ) # Validate update_interval_seconds if 'update_interval_seconds' in dynamic: interval = dynamic['update_interval_seconds'] if not isinstance(interval, (int, float)) or interval < 1: raise ConfigValidationError( "category_assignment.dynamic.update_interval_seconds must be a number >= 1" ) # Validate base_probability if 'base_probability' in dynamic: base_prob = dynamic['base_probability'] if not isinstance(base_prob, (int, float)) or base_prob < 0.0 or base_prob > 1.0: raise ConfigValidationError( "category_assignment.dynamic.base_probability must be a number between 0.0 and 1.0" ) def validate_diversity_config(config_data: Dict[str, Any]) -> None: """ Validate diversity ordering configuration. This function validates the diversity_ordering section which controls embedding-based clustering for diverse item ordering. Args: config_data: The configuration data Raises: ConfigValidationError: If diversity ordering configuration is invalid """ if 'diversity_ordering' not in config_data: return # Diversity ordering is optional dc = config_data['diversity_ordering'] if not isinstance(dc, dict): raise ConfigValidationError("diversity_ordering must be a dictionary") # Validate enabled flag if 'enabled' in dc: if not isinstance(dc['enabled'], bool): raise ConfigValidationError("diversity_ordering.enabled must be a boolean") # If not enabled, skip further validation if not dc.get('enabled', False): return # Validate model_name if 'model_name' in dc: if not isinstance(dc['model_name'], str) or not dc['model_name'].strip(): raise ConfigValidationError("diversity_ordering.model_name must be a non-empty string") # Validate num_clusters if 'num_clusters' in dc: num_clusters = dc['num_clusters'] if not isinstance(num_clusters, int) or num_clusters < 2: raise ConfigValidationError("diversity_ordering.num_clusters must be an integer >= 2") # Validate items_per_cluster if 'items_per_cluster' in dc: items_per_cluster = dc['items_per_cluster'] if not isinstance(items_per_cluster, int) or items_per_cluster < 1: raise ConfigValidationError("diversity_ordering.items_per_cluster must be a positive integer") # Validate auto_clusters if 'auto_clusters' in dc: if not isinstance(dc['auto_clusters'], bool): raise ConfigValidationError("diversity_ordering.auto_clusters must be a boolean") # Validate prefill_count if 'prefill_count' in dc: prefill_count = dc['prefill_count'] if not isinstance(prefill_count, int) or prefill_count < 0: raise ConfigValidationError("diversity_ordering.prefill_count must be a non-negative integer") # Validate batch_size if 'batch_size' in dc: batch_size = dc['batch_size'] if not isinstance(batch_size, int) or batch_size < 1: raise ConfigValidationError("diversity_ordering.batch_size must be a positive integer") # Validate recluster_threshold if 'recluster_threshold' in dc: recluster_threshold = dc['recluster_threshold'] if not isinstance(recluster_threshold, (int, float)) or recluster_threshold < 0 or recluster_threshold > 1: raise ConfigValidationError( "diversity_ordering.recluster_threshold must be a number between 0 and 1" ) # Validate preserve_visited if 'preserve_visited' in dc: if not isinstance(dc['preserve_visited'], bool): raise ConfigValidationError("diversity_ordering.preserve_visited must be a boolean") # Validate trigger_ai_prefetch if 'trigger_ai_prefetch' in dc: if not isinstance(dc['trigger_ai_prefetch'], bool): raise ConfigValidationError("diversity_ordering.trigger_ai_prefetch must be a boolean") # Validate cache_dir if 'cache_dir' in dc: cache_dir = dc['cache_dir'] if cache_dir is not None and (not isinstance(cache_dir, str) or not cache_dir.strip()): raise ConfigValidationError( "diversity_ordering.cache_dir must be a non-empty string or null" ) def validate_embedding_visualization_config(config_data: Dict[str, Any]) -> None: """ Validate embedding visualization configuration. This function validates the embedding_visualization section which controls the admin dashboard 2D visualization of embeddings. Args: config_data: The configuration data Raises: ConfigValidationError: If embedding visualization configuration is invalid """ if 'embedding_visualization' not in config_data: return # Embedding visualization is optional ev = config_data['embedding_visualization'] if not isinstance(ev, dict): raise ConfigValidationError("embedding_visualization must be a dictionary") # Validate enabled flag if 'enabled' in ev: if not isinstance(ev['enabled'], bool): raise ConfigValidationError("embedding_visualization.enabled must be a boolean") # If not enabled, skip further validation if not ev.get('enabled', True): return # Validate sample_size if 'sample_size' in ev: sample_size = ev['sample_size'] if not isinstance(sample_size, int) or sample_size < 1: raise ConfigValidationError( "embedding_visualization.sample_size must be a positive integer" ) # Validate include_all_annotated if 'include_all_annotated' in ev: if not isinstance(ev['include_all_annotated'], bool): raise ConfigValidationError( "embedding_visualization.include_all_annotated must be a boolean" ) # Validate embedding_model if 'embedding_model' in ev: if not isinstance(ev['embedding_model'], str) or not ev['embedding_model'].strip(): raise ConfigValidationError( "embedding_visualization.embedding_model must be a non-empty string" ) # Validate image_embedding_model if 'image_embedding_model' in ev: if not isinstance(ev['image_embedding_model'], str) or not ev['image_embedding_model'].strip(): raise ConfigValidationError( "embedding_visualization.image_embedding_model must be a non-empty string" ) # Validate UMAP configuration if 'umap' in ev: umap_config = ev['umap'] if not isinstance(umap_config, dict): raise ConfigValidationError("embedding_visualization.umap must be a dictionary") # Validate n_neighbors if 'n_neighbors' in umap_config: n_neighbors = umap_config['n_neighbors'] if not isinstance(n_neighbors, int) or n_neighbors < 2: raise ConfigValidationError( "embedding_visualization.umap.n_neighbors must be an integer >= 2" ) # Validate min_dist if 'min_dist' in umap_config: min_dist = umap_config['min_dist'] if not isinstance(min_dist, (int, float)) or min_dist < 0 or min_dist > 1: raise ConfigValidationError( "embedding_visualization.umap.min_dist must be a number between 0 and 1" ) # Validate metric if 'metric' in umap_config: valid_metrics = ['cosine', 'euclidean', 'manhattan', 'correlation'] if umap_config['metric'] not in valid_metrics: raise ConfigValidationError( f"embedding_visualization.umap.metric must be one of: {valid_metrics}" ) # Validate label_source if 'label_source' in ev: valid_sources = ['mace', 'majority'] if ev['label_source'] not in valid_sources: raise ConfigValidationError( f"embedding_visualization.label_source must be one of: {valid_sources}" ) def _merge_ai_config_file(config_data: Dict[str, Any], config_dir: str) -> Dict[str, Any]: """ Merge an external ai-config.yaml into the main config if specified. When ai_support.ai_config_file is set, loads that YAML file and merges its contents into the ai_support section. The external file provides endpoint-specific details (endpoint_type, model, api_key, base_url) while the inline ai_config provides defaults (temperature, max_tokens, include settings). Args: config_data: The parsed main configuration dictionary config_dir: Directory containing the main config file (for resolving relative paths) Returns: The config_data with external AI config merged in (modified in place and returned) """ ai_support = config_data.get("ai_support", {}) if not isinstance(ai_support, dict): return config_data ai_config_file = ai_support.get("ai_config_file") if not ai_config_file: # No external file specified - apply env var substitution to inline ai_config if "ai_config" in ai_support: from potato.data_sources.credentials import substitute_env_vars ai_support["ai_config"] = substitute_env_vars(ai_support["ai_config"]) config_data["ai_support"] = ai_support return config_data if not isinstance(ai_config_file, str): logger.warning("ai_support.ai_config_file must be a string. Ignoring.") return config_data # Resolve relative to config file directory ai_config_path = os.path.join(config_dir, ai_config_file) if not os.path.exists(ai_config_path): logger.warning( f"AI config file '{ai_config_file}' not found at {ai_config_path}. " f"AI support will be disabled. Create this file with your endpoint details." ) config_data["ai_support"]["enabled"] = False return config_data # Load external AI config try: with open(ai_config_path, 'r', encoding='utf-8') as f: external_config = yaml.safe_load(f) or {} except yaml.YAMLError as e: logger.warning(f"Invalid YAML in AI config file '{ai_config_file}': {e}. AI support will be disabled.") config_data["ai_support"]["enabled"] = False return config_data if not isinstance(external_config, dict): logger.warning(f"AI config file '{ai_config_file}' must contain a YAML dictionary. AI support will be disabled.") config_data["ai_support"]["enabled"] = False return config_data # Apply environment variable substitution to external config from potato.data_sources.credentials import substitute_env_vars external_config = substitute_env_vars(external_config) # Extract endpoint_type from external config (top-level key) if "endpoint_type" in external_config: ai_support["endpoint_type"] = external_config.pop("endpoint_type") # Merge remaining keys into ai_config (external takes precedence) ai_config = ai_support.get("ai_config", {}) if not isinstance(ai_config, dict): ai_config = {} ai_config.update(external_config) ai_support["ai_config"] = ai_config # Also apply env var substitution to the final merged ai_config ai_support["ai_config"] = substitute_env_vars(ai_support["ai_config"]) config_data["ai_support"] = ai_support logger.info(f"Loaded AI endpoint config from {ai_config_file}") return config_data def load_and_validate_config(config_file: str, project_dir: str) -> Dict[str, Any]: """ Load and validate a YAML configuration file with security checks. Args: config_file: Path to the configuration file project_dir: The project directory Returns: The validated configuration dictionary Raises: ConfigSecurityError: If the configuration file is not secure ConfigValidationError: If the configuration is invalid FileNotFoundError: If the configuration file doesn't exist """ # Validate the config file path itself try: validated_config_path = validate_path_security(config_file, project_dir) except ConfigSecurityError as e: raise ConfigSecurityError(f"Configuration file path: {str(e)}") if not os.path.exists(validated_config_path): raise FileNotFoundError(f"Configuration file not found: {config_file}") # Load and parse YAML try: with open(validated_config_path, 'r', encoding='utf-8') as file_p: config_data = yaml.safe_load(file_p) except yaml.YAMLError as e: raise ConfigValidationError(f"Invalid YAML format in {config_file}: {str(e)}") except UnicodeDecodeError as e: raise ConfigValidationError(f"Invalid file encoding in {config_file}: {str(e)}") except Exception as e: raise ConfigValidationError(f"Error reading configuration file {config_file}: {str(e)}") # Get the directory containing the config file for relative path resolution config_file_dir = os.path.dirname(validated_config_path) # Merge external AI config file if specified (before validation) config_data = _merge_ai_config_file(config_data, config_file_dir) # Apply default values for common configuration options if 'task_dir' not in config_data: config_data['task_dir'] = '.' logger.debug("task_dir not specified, defaulting to '.'") if 'site_dir' not in config_data: config_data['site_dir'] = 'default' logger.debug("site_dir not specified, defaulting to 'default'") # Resolve task_dir relative to config file directory if it's '.' or a relative path if 'task_dir' in config_data: task_dir = config_data['task_dir'] if task_dir == '.' or not os.path.isabs(task_dir): # Resolve relative to config file's directory task_dir = os.path.normpath(os.path.join(config_file_dir, task_dir)) config_data['task_dir'] = task_dir logger.debug(f"Resolved task_dir to: {task_dir}") # Validate the configuration structure validate_yaml_structure(config_data, project_dir, config_file_dir) # Validate file paths validate_file_paths(config_data, project_dir, config_file_dir) return config_data def init_config(args): global config project_dir = os.getcwd() #get the current working dir as the default project_dir config_file = None config_file_dir = None try: # if the .yaml config file is given, directly use it if args.config_file[-5:] == '.yaml': if os.path.exists(args.config_file): print("INFO: when you run the server directly from a .yaml file, please make sure your config file is put in the annotation project folder") config_file = args.config_file # For direct YAML file usage, we'll determine the project_dir from the config file content # after loading it, not from the file path structure else: raise FileNotFoundError(f"Configuration file not found: {args.config_file}") # if the user gives a directory, check if config.yaml or configs/config.yaml exists elif os.path.isdir(args.config_file): project_dir = args.config_file if os.path.isabs(args.config_file) else os.path.join(project_dir, args.config_file) config_folder = os.path.join(args.config_file, 'configs') if not os.path.isdir(config_folder): raise ConfigValidationError(".yaml file must be put in the configs/ folder under the main project directory when you try to start the project with the project directory, otherwise please directly give the path of the .yaml file") #get all the config files yamlfiles = [it for it in os.listdir(config_folder) if it[-5:] == '.yaml'] # if no yaml files found, quit the program if len(yamlfiles) == 0: raise ConfigValidationError(f"Configuration file not found under {config_folder}, please make sure .yaml file exists in the given directory, or please directly give the path of the .yaml file") # if only one yaml file found, directly use it elif len(yamlfiles) == 1: config_file = os.path.join(config_folder, yamlfiles[0]) config_file_dir = config_folder # if multiple yaml files found, ask the user to choose which one to use else: while True: print("multiple config files found, please select the one you want to use (number 0-%d)"%len(yamlfiles)) for i,it in enumerate(yamlfiles): print("[%d] %s"%(i, it)) input_id = input("number: ") try: config_file = os.path.join(config_folder, yamlfiles[int(input_id)]) config_file_dir = config_folder break except Exception: print("wrong input, please reselect") if not config_file: raise ConfigValidationError(f"Configuration file not found under {config_folder}, please make sure .yaml file exists in the given directory, or please directly give the path of the .yaml file") # Load and validate the configuration # For direct config file usage, use current working directory as base for config file path resolution if args.config_file[-5:] == '.yaml': # First, load the config without full validation to get the task_dir try: validated_config_path = validate_path_security(config_file, os.getcwd()) with open(validated_config_path, 'r', encoding='utf-8') as file_p: temp_config_data = yaml.safe_load(file_p) except Exception as e: raise ConfigValidationError(f"Error loading configuration file: {str(e)}") # Get the config file's directory for resolving relative paths config_file_abs = os.path.abspath(config_file) config_file_dir = os.path.dirname(config_file_abs) # Resolve task_dir relative to config file directory if it's '.' or a relative path if 'task_dir' in temp_config_data: task_dir = temp_config_data['task_dir'] if task_dir == '.' or not os.path.isabs(task_dir): # Resolve relative to config file's directory task_dir = os.path.normpath(os.path.join(config_file_dir, task_dir)) temp_config_data['task_dir'] = task_dir logger.debug(f"Resolved task_dir to: {task_dir}") # Validate that config file is in task_dir (skip in test mode) skip_path_validation = os.environ.get('POTATO_SKIP_CONFIG_PATH_VALIDATION', '').lower() in ('1', 'true') if 'task_dir' in temp_config_data and not skip_path_validation: task_dir = temp_config_data['task_dir'] task_dir_abs = os.path.abspath(task_dir) if not config_file_abs.startswith(task_dir_abs): raise ConfigValidationError(f"Configuration file must be in the task_dir. Config file is at '{config_file_abs}' but task_dir is '{task_dir_abs}'") project_dir = task_dir # Now load and validate with the correct project_dir config_data = load_and_validate_config(config_file, os.getcwd()) # Update config_data with resolved task_dir if 'task_dir' in temp_config_data: config_data['task_dir'] = temp_config_data['task_dir'] else: config_data = load_and_validate_config(config_file, project_dir) config.update(config_data) # Only override config settings if command line arguments are explicitly provided config_updates = { "verbose": args.verbose, "very_verbose": args.very_verbose, # Store an ABSOLUTE path: the server chdir's into task_dir at startup, # so a relative path would be re-resolved against the wrong CWD later # (e.g. admin export doubled the project path). CWD is still the # original launch dir here (chdir happens further below). "__config_file__": os.path.abspath(args.config_file), "customjs": args.customjs, "customjs_hostname": args.customjs_hostname, "persist_sessions": args.persist_sessions, } # Only override debug if explicitly set to True via command line # or if config file doesn't have a debug setting if args.debug or "debug" not in config: config_updates["debug"] = args.debug # Add debug logging mode if specified if hasattr(args, 'debug_log') and args.debug_log: config_updates["debug_log"] = args.debug_log # Add debug phase if specified (requires --debug flag) if hasattr(args, 'debug_phase') and args.debug_phase: if not args.debug: print("⚠️ Warning: --debug-phase requires --debug flag. Enabling debug mode.") config_updates["debug"] = True config_updates["debug_phase"] = args.debug_phase config.update(config_updates) # Apply server config values (CLI args take precedence) if "server" in config: server_config = config["server"] # Apply port from server config if not specified via CLI if "port" in server_config and args.port is None: config["port"] = server_config["port"] logger.debug(f"Port set from config file: {server_config['port']}") # Apply host from server config if "host" in server_config: # Host can only be set via config (no CLI arg currently) config["host"] = server_config["host"] logger.debug(f"Host set from config file: {server_config['host']}") # Apply debug from server config if not specified via CLI if "debug" in server_config and not args.debug: config["debug"] = server_config["debug"] logger.debug(f"Debug mode set from config file: {server_config['debug']}") # update the current working dir for the server os.chdir(project_dir) except (ConfigSecurityError, ConfigValidationError, FileNotFoundError) as e: logger.error(f"Configuration error: {str(e)}") print(f"❌ Configuration error: {str(e)}") print("Please check your configuration file and try again.") raise except Exception as e: logger.error(f"Unexpected error during configuration initialization: {str(e)}") print(f"❌ Unexpected error: {str(e)}") raise def validate_active_learning_config(config_data: Dict[str, Any]) -> None: """ Validate active learning configuration. Args: config_data: The configuration data containing active_learning section Raises: ConfigValidationError: If the active learning configuration is invalid """ if "active_learning" not in config_data: return # Active learning is optional al_config = config_data["active_learning"] # Validate enabled flag if not isinstance(al_config.get("enabled", False), bool): raise ConfigValidationError("active_learning.enabled must be a boolean") if not al_config.get("enabled", False): return # Skip validation if not enabled # Validate classifier configuration if "classifier" in al_config: classifier_config = al_config["classifier"] if not isinstance(classifier_config, dict): raise ConfigValidationError("active_learning.classifier must be a dictionary") if "name" not in classifier_config: raise ConfigValidationError("active_learning.classifier.name is required") if not isinstance(classifier_config["name"], str): raise ConfigValidationError("active_learning.classifier.name must be a string") # Validate hyperparameters if present if "hyperparameters" in classifier_config: if not isinstance(classifier_config["hyperparameters"], dict): raise ConfigValidationError("active_learning.classifier.hyperparameters must be a dictionary") # Validate vectorizer configuration if "vectorizer" in al_config: vectorizer_config = al_config["vectorizer"] if not isinstance(vectorizer_config, dict): raise ConfigValidationError("active_learning.vectorizer must be a dictionary") if "name" not in vectorizer_config: raise ConfigValidationError("active_learning.vectorizer.name is required") if not isinstance(vectorizer_config["name"], str): raise ConfigValidationError("active_learning.vectorizer.name must be a string") # Validate hyperparameters if present if "hyperparameters" in vectorizer_config: if not isinstance(vectorizer_config["hyperparameters"], dict): raise ConfigValidationError("active_learning.vectorizer.hyperparameters must be a dictionary") # Validate training parameters if "min_annotations_per_instance" in al_config: min_ann = al_config["min_annotations_per_instance"] if not isinstance(min_ann, int) or min_ann < 1: raise ConfigValidationError("active_learning.min_annotations_per_instance must be a positive integer") if "min_instances_for_training" in al_config: min_inst = al_config["min_instances_for_training"] if not isinstance(min_inst, int) or min_inst < 2: raise ConfigValidationError("active_learning.min_instances_for_training must be an integer >= 2") if "max_instances_to_reorder" in al_config: max_inst = al_config["max_instances_to_reorder"] if not isinstance(max_inst, int) or max_inst < 1: raise ConfigValidationError("active_learning.max_instances_to_reorder must be a positive integer") if "update_frequency" in al_config: update_freq = al_config["update_frequency"] if not isinstance(update_freq, int) or update_freq < 1: raise ConfigValidationError("active_learning.update_frequency must be a positive integer") # Validate resolution strategy if "resolution_strategy" in al_config: strategy = al_config["resolution_strategy"] valid_strategies = ["majority_vote", "random", "consensus", "weighted_average"] if strategy not in valid_strategies: raise ConfigValidationError(f"active_learning.resolution_strategy must be one of: {', '.join(valid_strategies)}") # Validate random sample percent if "random_sample_percent" in al_config: random_pct = al_config["random_sample_percent"] if not isinstance(random_pct, (int, float)) or random_pct < 0 or random_pct > 1: raise ConfigValidationError("active_learning.random_sample_percent must be between 0 and 1") # Validate schema names if "schema_names" in al_config: schema_names = al_config["schema_names"] if not isinstance(schema_names, list): raise ConfigValidationError("active_learning.schema_names must be a list") for schema in schema_names: if not isinstance(schema, str): raise ConfigValidationError("active_learning.schema_names must contain only strings") # Check for unsupported schema types if schema in ["text", "span"]: raise ConfigValidationError(f"Text and span annotation schemes are not supported for active learning: {schema}") # Validate database configuration if "database" in al_config: db_config = al_config["database"] if not isinstance(db_config, dict): raise ConfigValidationError("active_learning.database must be a dictionary") if "enabled" in db_config and not isinstance(db_config["enabled"], bool): raise ConfigValidationError("active_learning.database.enabled must be a boolean") # Validate model persistence configuration if "model_persistence" in al_config: model_config = al_config["model_persistence"] if not isinstance(model_config, dict): raise ConfigValidationError("active_learning.model_persistence must be a dictionary") if "enabled" in model_config and not isinstance(model_config["enabled"], bool): raise ConfigValidationError("active_learning.model_persistence.enabled must be a boolean") if "retention_count" in model_config: retention = model_config["retention_count"] if not isinstance(retention, int) or retention < 1: raise ConfigValidationError("active_learning.model_persistence.retention_count must be a positive integer") # Validate LLM configuration if "llm" in al_config: llm_config = al_config["llm"] if not isinstance(llm_config, dict): raise ConfigValidationError("active_learning.llm must be a dictionary") if "enabled" in llm_config and not isinstance(llm_config["enabled"], bool): raise ConfigValidationError("active_learning.llm.enabled must be a boolean") if "endpoint_url" in llm_config and not isinstance(llm_config["endpoint_url"], str): raise ConfigValidationError("active_learning.llm.endpoint_url must be a string") if "model_name" in llm_config and not isinstance(llm_config["model_name"], str): raise ConfigValidationError("active_learning.llm.model_name must be a string") # Validate query strategy if "query_strategy" in al_config: strategy = al_config["query_strategy"] valid_strategies = ["uncertainty", "diversity", "badge", "bald", "hybrid"] if strategy not in valid_strategies: raise ConfigValidationError( f"active_learning.query_strategy must be one of: {', '.join(valid_strategies)}" ) # Validate hybrid weights if "hybrid_weights" in al_config: weights = al_config["hybrid_weights"] if not isinstance(weights, dict): raise ConfigValidationError("active_learning.hybrid_weights must be a dictionary") weight_sum = sum(weights.values()) if abs(weight_sum - 1.0) > 0.01: raise ConfigValidationError( f"active_learning.hybrid_weights must sum to 1.0 (got {weight_sum})" ) # Validate cold-start strategy if "cold_start_strategy" in al_config: cs = al_config["cold_start_strategy"] if cs not in ["random", "llm"]: raise ConfigValidationError( "active_learning.cold_start_strategy must be one of: random, llm" ) # Validate confidence method (for LLM active learning) if "confidence_method" in al_config: cm = al_config["confidence_method"] if cm not in ["logprobs", "verbalized", "consistency"]: raise ConfigValidationError( "active_learning.confidence_method must be one of: logprobs, verbalized, consistency" ) # Validate classifier_params and vectorizer_params if "classifier_params" in al_config: if not isinstance(al_config["classifier_params"], dict): raise ConfigValidationError("active_learning.classifier_params must be a dictionary") if "vectorizer_params" in al_config: if not isinstance(al_config["vectorizer_params"], dict): raise ConfigValidationError("active_learning.vectorizer_params must be a dictionary") # Validate calibrate_probabilities if "calibrate_probabilities" in al_config: if not isinstance(al_config["calibrate_probabilities"], bool): raise ConfigValidationError("active_learning.calibrate_probabilities must be a boolean") # Validate BALD params if "bald_params" in al_config: bp = al_config["bald_params"] if not isinstance(bp, dict): raise ConfigValidationError("active_learning.bald_params must be a dictionary") if "n_estimators" in bp: if not isinstance(bp["n_estimators"], int) or bp["n_estimators"] < 2: raise ConfigValidationError("active_learning.bald_params.n_estimators must be an integer >= 2") # Validate ICL ensemble params if "use_icl_ensemble" in al_config: if not isinstance(al_config["use_icl_ensemble"], bool): raise ConfigValidationError("active_learning.use_icl_ensemble must be a boolean") if "icl_ensemble_params" in al_config: if not isinstance(al_config["icl_ensemble_params"], dict): raise ConfigValidationError("active_learning.icl_ensemble_params must be a dictionary") # Validate annotation routing if "annotation_routing" in al_config: if not isinstance(al_config["annotation_routing"], bool): raise ConfigValidationError("active_learning.annotation_routing must be a boolean") if "routing_thresholds" in al_config: rt = al_config["routing_thresholds"] if not isinstance(rt, dict): raise ConfigValidationError("active_learning.routing_thresholds must be a dictionary") for key in ["auto_label_min_confidence", "show_suggestion_below"]: if key in rt: val = rt[key] if not isinstance(val, (int, float)) or val < 0 or val > 1: raise ConfigValidationError( f"active_learning.routing_thresholds.{key} must be between 0 and 1" ) # Warn about sentence-transformers dependency if al_config.get("vectorizer_name") == "sentence-transformers" or \ (isinstance(al_config.get("vectorizer"), dict) and al_config["vectorizer"].get("name") == "sentence-transformers"): try: import sentence_transformers # noqa: F401 except ImportError: logger.warning( "sentence-transformers vectorizer configured but package not installed. " "Install with: pip install sentence-transformers" ) def validate_ai_support_config(config_data: Dict[str, Any]) -> None: """ Validate AI support configuration. Args: config_data: The configuration data containing ai_support section Raises: ConfigValidationError: If the AI support configuration is invalid """ if "ai_support" not in config_data: return # AI support is optional ai_config = config_data["ai_support"] # Validate enabled flag if not isinstance(ai_config.get("enabled", False), bool): raise ConfigValidationError("ai_support.enabled must be a boolean") if not ai_config.get("enabled", False): return # Skip validation if not enabled # Validate ai_config_file (optional, string path to external AI config) has_external_config = False if "ai_config_file" in ai_config: if not isinstance(ai_config["ai_config_file"], str): raise ConfigValidationError("ai_support.ai_config_file must be a string") has_external_config = True # Validate endpoint type. When ai_config_file is set, the endpoint_type is # expected to live in the external file (which may be gitignored, e.g. when # it holds API keys) and is loaded at server start. if "endpoint_type" not in ai_config: if has_external_config: return # External file provides endpoint_type + model + credentials raise ConfigValidationError("ai_support.endpoint_type is required when ai_support is enabled") endpoint_type = ai_config["endpoint_type"] if not isinstance(endpoint_type, str): raise ConfigValidationError("ai_support.endpoint_type must be a string") valid_endpoint_types = ["openai", "anthropic", "huggingface", "ollama", "gemini", "vllm", "yolo", "ollama_vision", "openai_vision", "anthropic_vision"] if endpoint_type not in valid_endpoint_types: raise ConfigValidationError(f"ai_support.endpoint_type must be one of: {', '.join(valid_endpoint_types)}") # Validate ai_config section if "ai_config" in ai_config: ai_endpoint_config = ai_config["ai_config"] if not isinstance(ai_endpoint_config, dict): raise ConfigValidationError("ai_support.ai_config must be a dictionary") # Validate model name if "model" in ai_endpoint_config: model = ai_endpoint_config["model"] if not isinstance(model, str) or not model.strip(): raise ConfigValidationError("ai_support.ai_config.model must be a non-empty string") # Validate API key for cloud-based endpoints if endpoint_type in ["openai", "anthropic", "huggingface", "gemini"]: api_key = ai_endpoint_config.get("api_key", "") if not api_key or not isinstance(api_key, str): raise ConfigValidationError(f"ai_support.ai_config.api_key is required for {endpoint_type} endpoint") # Validate base_url for VLLM if endpoint_type == "vllm": base_url = ai_endpoint_config.get("base_url", "") if base_url and not isinstance(base_url, str): raise ConfigValidationError("ai_support.ai_config.base_url must be a string") # Validate temperature if "temperature" in ai_endpoint_config: temperature = ai_endpoint_config["temperature"] if not isinstance(temperature, (int, float)) or temperature < 0 or temperature > 2: raise ConfigValidationError("ai_support.ai_config.temperature must be between 0 and 2") # Validate max_tokens if "max_tokens" in ai_endpoint_config: max_tokens = ai_endpoint_config["max_tokens"] if not isinstance(max_tokens, int) or max_tokens < 1: raise ConfigValidationError("ai_support.ai_config.max_tokens must be a positive integer") # Validate custom prompts for prompt_key in ["hint_prompt", "keyword_prompt"]: if prompt_key in ai_endpoint_config: prompt = ai_endpoint_config[prompt_key] if not isinstance(prompt, str): raise ConfigValidationError(f"ai_support.ai_config.{prompt_key} must be a string") if not prompt.strip(): raise ConfigValidationError(f"ai_support.ai_config.{prompt_key} cannot be empty") # Validate option_highlighting configuration if "option_highlighting" in ai_config: _validate_option_highlighting_config(ai_config["option_highlighting"]) def validate_chat_support_config(config_data: Dict[str, Any]) -> None: """ Validate chat support configuration for LLM annotator assistance. Args: config_data: The configuration data containing chat_support section Raises: ConfigValidationError: If the chat support configuration is invalid """ if "chat_support" not in config_data: return # Chat support is optional chat_config = config_data["chat_support"] if not isinstance(chat_config.get("enabled", False), bool): raise ConfigValidationError("chat_support.enabled must be a boolean") if not chat_config.get("enabled", False): return # Skip validation if not enabled # Validate endpoint type if "endpoint_type" not in chat_config: raise ConfigValidationError( "chat_support.endpoint_type is required when chat_support is enabled" ) endpoint_type = chat_config["endpoint_type"] valid_endpoint_types = [ "openai", "anthropic", "huggingface", "ollama", "gemini", "vllm", "openrouter", ] if endpoint_type not in valid_endpoint_types: raise ConfigValidationError( f"chat_support.endpoint_type must be one of: {', '.join(valid_endpoint_types)}" ) # Validate ai_config section if "ai_config" in chat_config: ai_cfg = chat_config["ai_config"] if not isinstance(ai_cfg, dict): raise ConfigValidationError("chat_support.ai_config must be a dictionary") if "model" in ai_cfg: if not isinstance(ai_cfg["model"], str) or not ai_cfg["model"].strip(): raise ConfigValidationError( "chat_support.ai_config.model must be a non-empty string" ) if "temperature" in ai_cfg: temp = ai_cfg["temperature"] if not isinstance(temp, (int, float)) or temp < 0 or temp > 2: raise ConfigValidationError( "chat_support.ai_config.temperature must be between 0 and 2" ) if "max_tokens" in ai_cfg: mt = ai_cfg["max_tokens"] if not isinstance(mt, int) or mt < 1: raise ConfigValidationError( "chat_support.ai_config.max_tokens must be a positive integer" ) # Validate API key for cloud endpoints if endpoint_type in ["openai", "anthropic", "huggingface", "gemini", "openrouter"]: api_key = ai_cfg.get("api_key", "") if not api_key or not isinstance(api_key, str): raise ConfigValidationError( f"chat_support.ai_config.api_key is required for {endpoint_type} endpoint" ) # Validate UI section if "ui" in chat_config: ui_cfg = chat_config["ui"] if not isinstance(ui_cfg, dict): raise ConfigValidationError("chat_support.ui must be a dictionary") if "sidebar_width" in ui_cfg: sw = ui_cfg["sidebar_width"] if not isinstance(sw, int) or sw < 200 or sw > 800: raise ConfigValidationError( "chat_support.ui.sidebar_width must be an integer between 200 and 800" ) if "max_history_per_instance" in ui_cfg: mh = ui_cfg["max_history_per_instance"] if not isinstance(mh, int) or mh < 1: raise ConfigValidationError( "chat_support.ui.max_history_per_instance must be a positive integer" ) def _validate_option_highlighting_config(oh_config: Dict[str, Any]) -> None: """ Validate option highlighting configuration. Args: oh_config: The option_highlighting configuration section Raises: ConfigValidationError: If the configuration is invalid """ if not isinstance(oh_config, dict): raise ConfigValidationError("ai_support.option_highlighting must be a dictionary") # Validate enabled flag if "enabled" in oh_config: if not isinstance(oh_config["enabled"], bool): raise ConfigValidationError("ai_support.option_highlighting.enabled must be a boolean") # Validate top_k (number of options to highlight) if "top_k" in oh_config: top_k = oh_config["top_k"] if not isinstance(top_k, int) or top_k < 1 or top_k > 10: raise ConfigValidationError("ai_support.option_highlighting.top_k must be an integer between 1 and 10") # Validate dim_opacity (opacity for non-highlighted options) if "dim_opacity" in oh_config: dim_opacity = oh_config["dim_opacity"] if not isinstance(dim_opacity, (int, float)) or dim_opacity < 0.1 or dim_opacity > 0.9: raise ConfigValidationError("ai_support.option_highlighting.dim_opacity must be a number between 0.1 and 0.9") # Validate auto_apply flag if "auto_apply" in oh_config: if not isinstance(oh_config["auto_apply"], bool): raise ConfigValidationError("ai_support.option_highlighting.auto_apply must be a boolean") # Validate schemas filter (list of schema names or null) if "schemas" in oh_config: schemas = oh_config["schemas"] if schemas is not None: if not isinstance(schemas, list): raise ConfigValidationError("ai_support.option_highlighting.schemas must be a list or null") for schema in schemas: if not isinstance(schema, str): raise ConfigValidationError("ai_support.option_highlighting.schemas must contain only strings") # Validate prefetch_count if "prefetch_count" in oh_config: prefetch_count = oh_config["prefetch_count"] if not isinstance(prefetch_count, int) or prefetch_count < 0 or prefetch_count > 100: raise ConfigValidationError("ai_support.option_highlighting.prefetch_count must be an integer between 0 and 100") def parse_active_learning_config(config_data: Dict[str, Any]) -> 'ActiveLearningConfig': """ Parse active learning configuration from YAML data. Args: config_data: The configuration data containing active_learning section Returns: ActiveLearningConfig: Parsed active learning configuration Raises: ConfigValidationError: If the configuration is invalid """ from potato.active_learning_manager import ActiveLearningConfig, ResolutionStrategy if "active_learning" not in config_data: return ActiveLearningConfig() # Return default config al_config = config_data["active_learning"] # Parse classifier configuration classifier_name = "sklearn.linear_model.LogisticRegression" classifier_kwargs = {} if "classifier" in al_config: classifier_config = al_config["classifier"] classifier_name = classifier_config.get("name", classifier_name) classifier_kwargs = classifier_config.get("hyperparameters", {}) # Parse vectorizer configuration vectorizer_name = "sklearn.feature_extraction.text.CountVectorizer" vectorizer_kwargs = {} if "vectorizer" in al_config: vectorizer_config = al_config["vectorizer"] vectorizer_name = vectorizer_config.get("name", vectorizer_name) vectorizer_kwargs = vectorizer_config.get("hyperparameters", {}) # Parse resolution strategy resolution_strategy = ResolutionStrategy.MAJORITY_VOTE if "resolution_strategy" in al_config: strategy_str = al_config["resolution_strategy"] if strategy_str == "majority_vote": resolution_strategy = ResolutionStrategy.MAJORITY_VOTE elif strategy_str == "random": resolution_strategy = ResolutionStrategy.RANDOM elif strategy_str == "consensus": resolution_strategy = ResolutionStrategy.CONSENSUS elif strategy_str == "weighted_average": resolution_strategy = ResolutionStrategy.WEIGHTED_AVERAGE # Parse other parameters min_annotations_per_instance = al_config.get("min_annotations_per_instance", 1) min_instances_for_training = al_config.get("min_instances_for_training", 10) max_instances_to_reorder = al_config.get("max_instances_to_reorder") random_sample_percent = al_config.get("random_sample_percent", 0.2) update_frequency = al_config.get("update_frequency", 5) schema_names = al_config.get("schema_names", []) # Parse database configuration database_enabled = False database_config = {} if "database" in al_config: db_config = al_config["database"] database_enabled = db_config.get("enabled", False) database_config = {k: v for k, v in db_config.items() if k != "enabled"} # Parse model persistence configuration model_persistence_enabled = False model_save_directory = None model_retention_count = 2 if "model_persistence" in al_config: model_config = al_config["model_persistence"] model_persistence_enabled = model_config.get("enabled", False) model_save_directory = model_config.get("save_directory") model_retention_count = model_config.get("retention_count", 2) # Parse LLM configuration llm_enabled = False llm_config = {} if "llm" in al_config: llm_config = al_config["llm"] llm_enabled = llm_config.get("enabled", False) return ActiveLearningConfig( enabled=al_config.get("enabled", False), classifier_name=classifier_name, classifier_kwargs=classifier_kwargs, vectorizer_name=vectorizer_name, vectorizer_kwargs=vectorizer_kwargs, min_annotations_per_instance=min_annotations_per_instance, min_instances_for_training=min_instances_for_training, max_instances_to_reorder=max_instances_to_reorder, resolution_strategy=resolution_strategy, random_sample_percent=random_sample_percent, update_frequency=update_frequency, schema_names=schema_names, database_enabled=database_enabled, database_config=database_config, model_persistence_enabled=model_persistence_enabled, model_save_directory=model_save_directory, model_retention_count=model_retention_count, llm_enabled=llm_enabled, llm_config=llm_config ) def validate_instance_display_config(config_data: Dict[str, Any]) -> None: """ Validate instance_display configuration. The instance_display section defines what content to show annotators, separate from what annotations to collect. This allows displaying images/videos/audio alongside any annotation type. Args: config_data: The configuration data Raises: ConfigValidationError: If the instance_display configuration is invalid """ if "instance_display" not in config_data: return # instance_display is optional (backwards compatible) display_config = config_data["instance_display"] if not isinstance(display_config, dict): raise ConfigValidationError("instance_display must be a dictionary") # Validate fields if "fields" not in display_config: raise ConfigValidationError("instance_display must contain 'fields' list") fields = display_config["fields"] if not isinstance(fields, list): raise ConfigValidationError("instance_display.fields must be a list") if not fields: raise ConfigValidationError("instance_display.fields cannot be empty") # Track span targets for validation span_targets = [] # Valid display types — sourced from the display registry (single source # of truth) so new display types don't require editing this list. Falls # back to a static list if the registry can't be imported. try: from .displays import display_registry valid_display_types = display_registry.get_supported_types() except Exception: valid_display_types = [ "text", "html", "image", "video", "audio", "dialogue", "pairwise", "pdf", "document", "spreadsheet", "code", "agent_trace", "eval_trace", "gallery", "conversation_tree", "interactive_chat", "web_agent_trace", "live_agent", "coding_trace", "live_coding_agent", ] for i, field in enumerate(fields): if not isinstance(field, dict): raise ConfigValidationError(f"instance_display.fields[{i}] must be a dictionary") # Validate required field properties if "key" not in field: raise ConfigValidationError(f"instance_display.fields[{i}] missing required 'key' property") key = field["key"] if not isinstance(key, str) or not key.strip(): raise ConfigValidationError(f"instance_display.fields[{i}].key must be a non-empty string") if "type" not in field: raise ConfigValidationError(f"instance_display.fields[{i}] missing required 'type' property") field_type = field["type"] if field_type not in valid_display_types: raise ConfigValidationError( f"instance_display.fields[{i}].type '{field_type}' is invalid. " f"Valid types are: {', '.join(valid_display_types)}" ) # Validate label if present if "label" in field: if not isinstance(field["label"], str): raise ConfigValidationError(f"instance_display.fields[{i}].label must be a string") # Validate span_target if field.get("span_target"): # Types that support span annotation targets span_target_types = ["text", "dialogue", "pdf", "document", "spreadsheet", "code", "agent_trace", "interactive_chat"] if field_type not in span_target_types: raise ConfigValidationError( f"instance_display.fields[{i}].span_target is set but type '{field_type}' " f"does not support span annotation. Types that support span_target: {', '.join(span_target_types)}." ) span_targets.append(key) # Validate display_options if present if "display_options" in field: options = field["display_options"] if not isinstance(options, dict): raise ConfigValidationError(f"instance_display.fields[{i}].display_options must be a dictionary") # Type-specific option validation _validate_display_options(field_type, options, f"instance_display.fields[{i}]") # Validate layout if present if "layout" in display_config: layout = display_config["layout"] if not isinstance(layout, dict): raise ConfigValidationError("instance_display.layout must be a dictionary") if "direction" in layout: valid_directions = ["vertical", "horizontal"] if layout["direction"] not in valid_directions: raise ConfigValidationError( f"instance_display.layout.direction must be one of: {', '.join(valid_directions)}" ) if "gap" in layout: gap = layout["gap"] if not isinstance(gap, str): raise ConfigValidationError("instance_display.layout.gap must be a string (e.g., '20px', '1rem')") # Validate resizable option (defaults to True) if "resizable" in display_config: if not isinstance(display_config["resizable"], bool): raise ConfigValidationError("instance_display.resizable must be a boolean (true/false)") # Check for deprecation warning: using annotation schemas for display-only _check_display_only_deprecation(config_data) def _validate_display_options(field_type: str, options: Dict[str, Any], path: str) -> None: """ Validate display options for a specific field type. Args: field_type: The display type options: The display options dictionary path: The config path for error messages Raises: ConfigValidationError: If options are invalid """ # Common option validation if "max_width" in options: max_width = options["max_width"] if not isinstance(max_width, (int, str)): raise ConfigValidationError(f"{path}.display_options.max_width must be an integer or string") if isinstance(max_width, int) and max_width < 1: raise ConfigValidationError(f"{path}.display_options.max_width must be positive") if "max_height" in options: max_height = options["max_height"] if not isinstance(max_height, (int, str)): raise ConfigValidationError(f"{path}.display_options.max_height must be an integer or string") if isinstance(max_height, int) and max_height < 1: raise ConfigValidationError(f"{path}.display_options.max_height must be positive") if "min_height" in options: min_height = options["min_height"] if not isinstance(min_height, (int, str)): raise ConfigValidationError(f"{path}.display_options.min_height must be an integer or string") if isinstance(min_height, int) and min_height < 1: raise ConfigValidationError(f"{path}.display_options.min_height must be positive") if "resizable" in options: if not isinstance(options["resizable"], bool): raise ConfigValidationError(f"{path}.display_options.resizable must be a boolean") # Text-specific options if field_type in ["text", "html"]: if "collapsible" in options: if not isinstance(options["collapsible"], bool): raise ConfigValidationError(f"{path}.display_options.collapsible must be a boolean") if "preserve_whitespace" in options: if not isinstance(options["preserve_whitespace"], bool): raise ConfigValidationError(f"{path}.display_options.preserve_whitespace must be a boolean") # Image-specific options if field_type == "image": if "zoomable" in options: if not isinstance(options["zoomable"], bool): raise ConfigValidationError(f"{path}.display_options.zoomable must be a boolean") if "object_fit" in options: valid_fits = ["contain", "cover", "fill", "none", "scale-down"] if options["object_fit"] not in valid_fits: raise ConfigValidationError( f"{path}.display_options.object_fit must be one of: {', '.join(valid_fits)}" ) # Video-specific options if field_type == "video": for bool_opt in ["controls", "autoplay", "loop", "muted"]: if bool_opt in options: if not isinstance(options[bool_opt], bool): raise ConfigValidationError(f"{path}.display_options.{bool_opt} must be a boolean") # Audio-specific options if field_type == "audio": if "controls" in options: if not isinstance(options["controls"], bool): raise ConfigValidationError(f"{path}.display_options.controls must be a boolean") if "show_waveform" in options: if not isinstance(options["show_waveform"], bool): raise ConfigValidationError(f"{path}.display_options.show_waveform must be a boolean") # Dialogue-specific options if field_type == "dialogue": if "alternating_shading" in options: if not isinstance(options["alternating_shading"], bool): raise ConfigValidationError(f"{path}.display_options.alternating_shading must be a boolean") if "speaker_extraction" in options: if not isinstance(options["speaker_extraction"], bool): raise ConfigValidationError(f"{path}.display_options.speaker_extraction must be a boolean") # Pairwise-specific options if field_type == "pairwise": if "cell_width" in options: cell_width = options["cell_width"] if not isinstance(cell_width, str): raise ConfigValidationError(f"{path}.display_options.cell_width must be a string (e.g., '50%')") # PDF-specific options if field_type == "pdf": if "view_mode" in options: valid_modes = ["scroll", "paginated", "side-by-side"] if options["view_mode"] not in valid_modes: raise ConfigValidationError( f"{path}.display_options.view_mode must be one of: {', '.join(valid_modes)}" ) if "text_layer" in options: if not isinstance(options["text_layer"], bool): raise ConfigValidationError(f"{path}.display_options.text_layer must be a boolean") if "zoom" in options: zoom = options["zoom"] valid_zoom_modes = ["auto", "page-fit", "page-width"] if zoom not in valid_zoom_modes: try: float(zoom) except (TypeError, ValueError): raise ConfigValidationError( f"{path}.display_options.zoom must be one of {valid_zoom_modes} or a number" ) # Document-specific options if field_type == "document": if "collapsible" in options: if not isinstance(options["collapsible"], bool): raise ConfigValidationError(f"{path}.display_options.collapsible must be a boolean") if "show_outline" in options: if not isinstance(options["show_outline"], bool): raise ConfigValidationError(f"{path}.display_options.show_outline must be a boolean") if "style_theme" in options: valid_themes = ["default", "minimal", "print"] if options["style_theme"] not in valid_themes: raise ConfigValidationError( f"{path}.display_options.style_theme must be one of: {', '.join(valid_themes)}" ) # Spreadsheet-specific options if field_type == "spreadsheet": if "annotation_mode" in options: valid_modes = ["row", "cell", "range"] if options["annotation_mode"] not in valid_modes: raise ConfigValidationError( f"{path}.display_options.annotation_mode must be one of: {', '.join(valid_modes)}" ) for bool_opt in ["show_headers", "striped", "hoverable", "sortable", "selectable", "compact"]: if bool_opt in options: if not isinstance(options[bool_opt], bool): raise ConfigValidationError(f"{path}.display_options.{bool_opt} must be a boolean") # Code-specific options if field_type == "code": if "language" in options: if not isinstance(options["language"], (str, type(None))): raise ConfigValidationError(f"{path}.display_options.language must be a string or null") if "show_line_numbers" in options: if not isinstance(options["show_line_numbers"], bool): raise ConfigValidationError(f"{path}.display_options.show_line_numbers must be a boolean") if "wrap_lines" in options: if not isinstance(options["wrap_lines"], bool): raise ConfigValidationError(f"{path}.display_options.wrap_lines must be a boolean") if "highlight_lines" in options: hl = options["highlight_lines"] if hl is not None and not isinstance(hl, list): raise ConfigValidationError(f"{path}.display_options.highlight_lines must be a list of line numbers or null") if "theme" in options: valid_themes = ["default", "dark"] if options["theme"] not in valid_themes: raise ConfigValidationError( f"{path}.display_options.theme must be one of: {', '.join(valid_themes)}" ) def validate_format_handling_config(config_data: Dict[str, Any]) -> None: """ Validate format_handling configuration for extended format support. Args: config_data: The full configuration data Raises: ConfigValidationError: If the format_handling configuration is invalid """ format_config = config_data.get('format_handling') if format_config is None: return if not isinstance(format_config, dict): raise ConfigValidationError("format_handling must be a dictionary") # Validate enabled flag if "enabled" in format_config: if not isinstance(format_config["enabled"], bool): raise ConfigValidationError("format_handling.enabled must be a boolean") # Validate default_format if "default_format" in format_config: default = format_config["default_format"] valid_defaults = ["auto", "pdf", "docx", "markdown", "spreadsheet", "code"] if default not in valid_defaults: raise ConfigValidationError( f"format_handling.default_format must be one of: {', '.join(valid_defaults)}" ) # Validate PDF-specific options if "pdf" in format_config: pdf_opts = format_config["pdf"] if not isinstance(pdf_opts, dict): raise ConfigValidationError("format_handling.pdf must be a dictionary") if "extraction_mode" in pdf_opts: valid_modes = ["text", "ocr", "hybrid"] if pdf_opts["extraction_mode"] not in valid_modes: raise ConfigValidationError( f"format_handling.pdf.extraction_mode must be one of: {', '.join(valid_modes)}" ) if "cache_extracted" in pdf_opts: if not isinstance(pdf_opts["cache_extracted"], bool): raise ConfigValidationError("format_handling.pdf.cache_extracted must be a boolean") # Validate spreadsheet-specific options if "spreadsheet" in format_config: ss_opts = format_config["spreadsheet"] if not isinstance(ss_opts, dict): raise ConfigValidationError("format_handling.spreadsheet must be a dictionary") if "annotation_mode" in ss_opts: valid_modes = ["row", "cell", "range"] if ss_opts["annotation_mode"] not in valid_modes: raise ConfigValidationError( f"format_handling.spreadsheet.annotation_mode must be one of: {', '.join(valid_modes)}" ) if "max_rows" in ss_opts: max_rows = ss_opts["max_rows"] if not isinstance(max_rows, int) or max_rows < 1: raise ConfigValidationError("format_handling.spreadsheet.max_rows must be a positive integer") def validate_layout_config(config_data: Dict[str, Any]) -> None: """ Validate layout configuration for annotation form grid arrangement. The layout section configures how annotation forms are arranged in a grid, supports grouping schemas with collapsible headers, and provides responsive breakpoints for mobile/tablet displays. Args: config_data: The full configuration data Raises: ConfigValidationError: If the layout configuration is invalid """ layout = config_data.get('layout') if layout is None: return # layout is optional if not isinstance(layout, dict): raise ConfigValidationError("layout must be a dictionary") # Validate grid configuration if 'grid' in layout: grid = layout['grid'] if not isinstance(grid, dict): raise ConfigValidationError("layout.grid must be a dictionary") # Validate columns (1-6) if 'columns' in grid: columns = grid['columns'] if not isinstance(columns, int) or columns < 1 or columns > 6: raise ConfigValidationError("layout.grid.columns must be an integer between 1 and 6") # Validate gap (CSS value) if 'gap' in grid: gap = grid['gap'] if not isinstance(gap, str) or not gap.strip(): raise ConfigValidationError("layout.grid.gap must be a non-empty CSS value string (e.g., '1rem', '16px')") # Validate row_gap (CSS value) if 'row_gap' in grid: row_gap = grid['row_gap'] if not isinstance(row_gap, str) or not row_gap.strip(): raise ConfigValidationError("layout.grid.row_gap must be a non-empty CSS value string") # Validate align_items if 'align_items' in grid: valid_alignments = ['start', 'center', 'end', 'stretch'] if grid['align_items'] not in valid_alignments: raise ConfigValidationError( f"layout.grid.align_items must be one of: {', '.join(valid_alignments)}" ) # Validate breakpoints if 'breakpoints' in layout: breakpoints = layout['breakpoints'] if not isinstance(breakpoints, dict): raise ConfigValidationError("layout.breakpoints must be a dictionary") for bp_name in ['mobile', 'tablet']: if bp_name in breakpoints: bp_value = breakpoints[bp_name] if not isinstance(bp_value, int) or bp_value < 0: raise ConfigValidationError( f"layout.breakpoints.{bp_name} must be a non-negative integer (pixel value)" ) # Validate groups if 'groups' in layout: groups = layout['groups'] if not isinstance(groups, list): raise ConfigValidationError("layout.groups must be a list") # Collect all schema names for validation all_schemas = set() schemes = config_data.get('annotation_schemes', []) for scheme in schemes: if isinstance(scheme, dict) and 'name' in scheme: all_schemas.add(scheme['name']) group_ids = set() for i, group in enumerate(groups): if not isinstance(group, dict): raise ConfigValidationError(f"layout.groups[{i}] must be a dictionary") # Validate required group fields if 'id' not in group: raise ConfigValidationError(f"layout.groups[{i}] missing required 'id' field") group_id = group['id'] if not isinstance(group_id, str) or not group_id.strip(): raise ConfigValidationError(f"layout.groups[{i}].id must be a non-empty string") if group_id in group_ids: raise ConfigValidationError(f"layout.groups[{i}].id '{group_id}' is duplicate") group_ids.add(group_id) # Validate schemas list if 'schemas' not in group: raise ConfigValidationError(f"layout.groups[{i}] missing required 'schemas' field") group_schemas = group['schemas'] if not isinstance(group_schemas, list): raise ConfigValidationError(f"layout.groups[{i}].schemas must be a list") if not group_schemas: raise ConfigValidationError(f"layout.groups[{i}].schemas cannot be empty") # Validate each schema reference exists for j, schema_name in enumerate(group_schemas): if not isinstance(schema_name, str): raise ConfigValidationError( f"layout.groups[{i}].schemas[{j}] must be a string" ) if schema_name not in all_schemas: raise ConfigValidationError( f"layout.groups[{i}].schemas references unknown schema: '{schema_name}'" ) # Validate optional boolean fields if 'collapsible' in group: if not isinstance(group['collapsible'], bool): raise ConfigValidationError(f"layout.groups[{i}].collapsible must be a boolean") if 'collapsed_default' in group: if not isinstance(group['collapsed_default'], bool): raise ConfigValidationError(f"layout.groups[{i}].collapsed_default must be a boolean") # Validate optional title if 'title' in group: if not isinstance(group['title'], str): raise ConfigValidationError(f"layout.groups[{i}].title must be a string") # Validate optional description if 'description' in group: if not isinstance(group['description'], str): raise ConfigValidationError(f"layout.groups[{i}].description must be a string") # Validate order if 'order' in layout: order = layout['order'] if not isinstance(order, list): raise ConfigValidationError("layout.order must be a list") for i, schema_name in enumerate(order): if not isinstance(schema_name, str): raise ConfigValidationError(f"layout.order[{i}] must be a string") # Validate styling (advanced options) if 'styling' in layout: styling = layout['styling'] if not isinstance(styling, dict): raise ConfigValidationError("layout.styling must be a dictionary") # Validate align_items if 'align_items' in styling: valid_alignments = ['start', 'center', 'end', 'stretch'] if styling['align_items'] not in valid_alignments: raise ConfigValidationError( f"layout.styling.align_items must be one of: {', '.join(valid_alignments)}" ) # Validate content_align if 'content_align' in styling: valid_content_align = ['left', 'center', 'right'] if styling['content_align'] not in valid_content_align: raise ConfigValidationError( f"layout.styling.content_align must be one of: {', '.join(valid_content_align)}" ) # Validate background colors (CSS color values) for color_key in ['group_background_odd', 'group_background_even']: if color_key in styling: color = styling[color_key] if not isinstance(color, str) or not color.strip(): raise ConfigValidationError( f"layout.styling.{color_key} must be a non-empty CSS color value" ) # Validate padding values (CSS padding) for padding_key in ['group_padding', 'form_padding']: if padding_key in styling: padding = styling[padding_key] if not isinstance(padding, str) or not padding.strip(): raise ConfigValidationError( f"layout.styling.{padding_key} must be a non-empty CSS padding value" ) # Validate per-group background_color if present if 'groups' in layout: for i, group in enumerate(layout['groups']): if 'background_color' in group: bg_color = group['background_color'] if not isinstance(bg_color, str) or not bg_color.strip(): raise ConfigValidationError( f"layout.groups[{i}].background_color must be a non-empty CSS color value" ) def validate_adjudication_config(config_data: Dict[str, Any]) -> None: """ Validate adjudication configuration. Args: config_data: The full configuration data Raises: ConfigValidationError: If the adjudication configuration is invalid """ adj_config = config_data.get('adjudication', {}) if not isinstance(adj_config, dict): raise ConfigValidationError("adjudication must be a dictionary") if not adj_config.get('enabled', False): return # Require adjudicator_users users = adj_config.get('adjudicator_users', []) if not isinstance(users, list) or len(users) == 0: raise ConfigValidationError( "adjudication.adjudicator_users must be a non-empty list of usernames" ) # Validate numeric fields min_ann = adj_config.get('min_annotations', 2) if not isinstance(min_ann, int) or min_ann < 1: raise ConfigValidationError( "adjudication.min_annotations must be a positive integer" ) threshold = adj_config.get('agreement_threshold', 0.75) if not isinstance(threshold, (int, float)) or threshold < 0 or threshold > 1: raise ConfigValidationError( "adjudication.agreement_threshold must be a number between 0 and 1" ) fast_warn = adj_config.get('fast_decision_warning_ms', 2000) if not isinstance(fast_warn, (int, float)) or fast_warn < 0: raise ConfigValidationError( "adjudication.fast_decision_warning_ms must be a non-negative number" ) # Validate error_taxonomy taxonomy = adj_config.get('error_taxonomy') if taxonomy is not None: if not isinstance(taxonomy, list): raise ConfigValidationError( "adjudication.error_taxonomy must be a list of strings" ) for item in taxonomy: if not isinstance(item, str): raise ConfigValidationError( "adjudication.error_taxonomy entries must be strings" ) # Validate similarity config sim_config = adj_config.get('similarity', {}) if isinstance(sim_config, dict) and sim_config.get('enabled', False): top_k = sim_config.get('top_k', 5) if not isinstance(top_k, int) or top_k < 1 or top_k > 20: raise ConfigValidationError( "adjudication.similarity.top_k must be an integer between 1 and 20" ) model = sim_config.get('model', 'all-MiniLM-L6-v2') if not isinstance(model, str) or not model.strip(): raise ConfigValidationError( "adjudication.similarity.model must be a non-empty string" ) def _check_display_only_deprecation(config_data: Dict[str, Any]) -> None: """ Check for deprecated display-only pattern and log warning. Detects when image_annotation, video_annotation, or audio_annotation is used with min_annotations: 0 just to display content. Args: config_data: The configuration data """ # Get annotation schemes schemes = [] if "annotation_schemes" in config_data: schemes = config_data["annotation_schemes"] elif "phases" in config_data: phases = config_data["phases"] if isinstance(phases, list): for phase in phases: schemes.extend(phase.get("annotation_schemes", [])) elif isinstance(phases, dict): for phase_name, phase in phases.items(): if phase_name != "order" and isinstance(phase, dict): schemes.extend(phase.get("annotation_schemes", [])) for scheme in schemes: if not isinstance(scheme, dict): continue annotation_type = scheme.get("annotation_type") if annotation_type in ["image_annotation", "video_annotation", "audio_annotation"]: min_annotations = scheme.get("min_annotations", 1) if min_annotations == 0: logger.warning( f"Deprecation warning: Using {annotation_type} with min_annotations=0 " f"for display-only is deprecated. Use instance_display instead. " f"See docs/instance_display.md for migration guide." )