| """ |
| Config module with enhanced security validation and error handling. |
| """ |
|
|
| import yaml |
| import os |
| import logging |
| import re |
| import codecs |
| from pathlib import Path |
| from typing import Dict, Any, List, Optional, Tuple |
| from urllib.parse import urlparse |
| import json |
|
|
# Module-level configuration dictionary. clear_config() empties it in place.
config = {}
|
|
|
|
def clear_config():
    """Empty the module-level ``config`` dictionary in place.

    Meant for tests that need a clean configuration state. The dictionary
    object is kept (only its contents are removed), so every existing
    reference to it observes the emptied mapping.
    """
    # Mutating the existing object needs no ``global`` declaration because
    # the name itself is never rebound.
    config.clear()
|
|
|
|
| |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class ConfigValidationError(Exception):
    """Signals that a configuration value or structure failed validation."""
|
|
|
|
class ConfigSecurityError(Exception):
    """Signals that a configuration value violates a security rule."""
|
|
|
|
| import difflib |
|
|
| |
| |
| |
| |
| |
# Schema of recognized configuration keys, consumed by validate_unknown_keys().
#
# Each top-level key maps to one of:
#   * None  - the key is recognized; whatever it contains is not checked.
#   * a set - the names allowed one level below the key.
#   * a dict - a nested schema whose values follow these same three rules.
#
# Every key appears exactly once ("output_annotation_format" used to be
# listed twice with the same value).
KNOWN_CONFIG_KEYS = {
    # --- Core task settings ---
    "item_properties": {
        "id_key", "text_key", "category_key", "kwargs",
    },
    "data_files": None,
    "task_dir": None,
    "output_annotation_dir": None,
    "output_annotation_format": None,
    "annotation_task_name": None,
    "task_description": None,
    "annotation_task_description": None,

    # --- Data loading ---
    "data_directory": None,
    "data_directory_encoding": None,
    "data_sources": None,
    "data_cache": {"enabled", "ttl_seconds", "max_size_mb"},
    "watch_data_directory": None,
    "watch_poll_interval": None,
    "partial_loading": None,

    # --- Annotation schemes and phases ---
    "annotation_schemes": None,
    "phases": None,

    # --- Authentication and login ---
    "authentication": {
        "method", "providers", "user_identity_field", "database_url",
        "user_config_path", "auto_register", "allow_local_login",
        "allowed_domain", "allowed_domains", "allowed_org",
    },
    "login": {"type", "url_argument", "auto_redirect_delay", "auto_redirect_on_completion"},
    "user_config": {"allow_all_users", "users"},
    "require_password": None,
    "require_no_password": None,
    "secret_key": None,

    # --- Server ---
    "server": {"port", "host", "debug"},
    "port": None,
    "host": None,
    "customjs": None,
    "customjs_hostname": None,
    "site_dir": None,
    "site_file": None,
    "persist_sessions": None,
    "session_lifetime_days": None,
    "base_html_template": None,

    # --- Quality control ---
    "attention_checks": {
        "enabled", "items_file", "frequency", "probability",
        "min_response_time", "failure_handling",
    },
    "gold_standards": {
        "enabled", "items_file", "mode", "frequency",
        "accuracy", "auto_promote",
    },
    "gold_standards_file": None,
    "pre_annotation": {
        "enabled", "field", "highlight_low_confidence",
        "agreement_metrics", "predictions_file",
        "allow_modification", "show_confidence",
    },
    "agreement_metrics": {"min_overlap", "refresh_interval", "enabled"},
    "quality_control": None,

    # --- AI / chat support ---
    "ai_support": {
        "enabled", "endpoint_type", "ai_config_file", "ai_config",
        "option_highlighting", "features", "cache_config",
    },
    "chat_support": {
        "enabled", "endpoint_type", "ai_config", "ui",
    },

    # --- Training, assignment, and analysis features ---
    "training": {
        "enabled", "data_file", "annotation_schemes",
        "passing_criteria", "feedback", "failure_action",
    },
    "active_learning": {
        "enabled", "classifier", "vectorizer",
        "min_annotations_per_instance", "min_instances_for_training",
        "max_instances_to_reorder", "update_frequency",
        "resolution_strategy", "random_sample_percent", "schema_names",
        "database", "model_persistence", "llm", "query_strategy",
        "hybrid_weights", "cold_start_strategy", "confidence_method",
        "classifier_params", "vectorizer_params", "calibrate_probabilities",
        "bald_params", "use_icl_ensemble", "icl_ensemble_params",
        "annotation_routing", "routing_thresholds",
    },
    "category_assignment": {
        "enabled", "category_key", "qualification", "fallback", "dynamic",
    },
    "batch_assignment": {
        "groups", "annotator_key",
    },
    "diversity_ordering": {
        "enabled", "model_name", "num_clusters", "items_per_cluster",
        "auto_clusters", "prefill_count", "batch_size",
        "recluster_threshold", "preserve_visited",
        "trigger_ai_prefetch", "cache_dir",
    },
    "diversity_config": None,
    "embedding_visualization": {
        "enabled", "sample_size", "include_all_annotated",
        "embedding_model", "image_embedding_model", "umap", "label_source",
    },
    "adjudication": {
        "enabled", "adjudicator_users", "min_annotations",
        "agreement_threshold", "fast_decision_warning_ms",
        "error_taxonomy", "similarity",
        "require_notes_on_override", "show_agreement_scores",
        "show_annotator_names",
        "output_subdir", "require_confidence",
        "show_all_items", "show_timing_data",
    },
    "database": {"type", "host", "database", "username", "password", "port",
                 "pool_size", "pool_timeout", "connection_string"},
    "bws_config": {
        "tuple_size", "num_tuples", "seed", "min_item_appearances", "scoring",
    },
    "ibws_config": {
        "tuple_size", "max_rounds", "seed", "scoring_method",
        "tuples_per_item_per_round",
    },
    "mace": {
        "enabled", "min_annotations_per_item", "trigger_every_n", "num_restarts",
        "min_items", "num_iters",
    },
    "icl_labeling": None,
    "llm_labeling": None,

    # --- UI ---
    "ui": None,
    "ui_config": None,
    "layout": {"grid", "breakpoints", "groups", "order", "styling"},
    "instance_display": {"fields", "layout", "resizable"},
    "format_handling": {"enabled", "default_format", "pdf", "spreadsheet"},
    "ui_language": {
        "html_lang", "html_dir",
        "next_button", "previous_button", "submit_button", "go_button",
        "retry_button", "logout",
        "labeled_badge", "in_progress_badge", "not_labeled_badge",
        "progress_label", "loading", "error_heading",
        "adjudicate", "codebook", "instructions_heading",
        "text_to_annotate", "video_to_annotate", "audio_to_annotate",
        "login_title", "login_subtitle_password", "login_subtitle_username",
        "sign_in_tab", "register_tab",
        "username_label", "password_label",
        "sign_in_button", "continue_button", "register_button",
        "forgot_password", "username_placeholder",
        "choose_username_placeholder", "create_password_placeholder",
        "sign_in_with", "or_divider",
        "powered_by", "cite_us",
    },
    "base_css": None,
    "ui_debug": None,
    "hide_navbar": None,
    "task_layout": None,

    # --- Instructions and page chrome ---
    "annotation_instructions": None,
    "annotation_codebook_url": None,
    "custom_footer_html": None,
    "header_file": None,
    "header_logo": None,

    # --- Display, navigation, and export options ---
    "keyword_highlight_settings": None,
    "keyword_highlights_file": None,
    "highlight_linebreaks": None,
    "list_as_text": {"text_list_prefix_type", "horizontal", "alternating_shading"},
    "jumping_to_id_disabled": None,
    "horizontal_key_bindings": None,
    "completion_code": None,
    "allow_phase_back_navigation": None,
    "require_fully_annotated": None,
    "export_include_phase_data": None,
    "export_annotation_format": None,
    "auto_export_interval": None,

    # --- Audio / media ---
    "audio_annotation": {
        "waveform_cache_dir", "waveform_look_ahead", "waveform_cache_max_size",
        "client_fallback_max_duration",
    },
    "spectrogram": None,
    "media_directory": None,
    "default_video_fps": None,

    # --- Crowdsourcing and external integrations ---
    "mturk": None,
    "prolific": {
        "config_file_path", "token", "study_id",
        "max_concurrent_sessions", "workload_checker_period",
        "completion_code", "sandbox_mode",
    },
    "webhooks": {"enabled", "endpoints"},
    "trace_ingestion": {"enabled", "sources", "api_key", "notify_annotators"},
    "judge_alignment": {"enabled", "ai_support", "schemas", "few_shot", "inline"},
    "judge_calibration": {
        "enabled", "prompt", "models", "k_samples", "max_items", "fraction",
        "sampling", "human", "schemas", "calibration", "output", "state_dir",
    },
    "triage": {"enabled", "order", "default_priority", "show_badge",
               "signal_field", "invert_signal", "rules"},
    "huggingface_backup": None,

    # --- Debugging ---
    "debug": None,
    "debug_phase": None,
    "server_debug": None,
    "verbose": None,
    "very_verbose": None,
    "debug_log": None,

    # --- Agents ---
    "live_agent": None,
    "live_coding_agent": None,
    "agent_proxy": None,

    # --- Survey flow / pre-study ---
    "surveyflow": None,
    "prestudy": None,
    "automatic_assignment": None,

    # --- Seeds, annotation caps, and quotas ---
    "random_seed": None,
    "max_annotations_per_user": None,
    "max_annotations_per_item": None,
    "num_annotators_per_item": None,
    "min_annotators_per_instance": None,
    "per_annotator_quota": None,

    # --- QDA mode, search, and codebook ---
    "qda_mode": {
        "enabled": None,
        "memos": None,
        "codebook": None,
    },
    "annotation_ui": {
        "memos": None,
        "visibility": None,
    },
    "search": {
        "enabled": None,
        "backend": None,
        "max_instances": None,
        "annotator_claim": None,
    },
    "codebook": {
        "enabled": None,
        "mode": None,
    },
    "codebook_mode": None,
    "codebook_invivo_key": None,

    # --- Cases and solo mode ---
    "cases": {
        "enabled": None,
        "key": None,
        "auto_detect": None,
        "attributes": None,
    },
    "solo_mode": {
        "enabled": None,
        "labeling_models": None,
        "revision_models": None,
        "embedding": None,
        "uncertainty": None,
        "thresholds": None,
        "instance_selection": None,
        "batches": None,
        "prompt_optimization": None,
        "edge_case_rules": None,
        "labeling_functions": None,
        "confidence_routing": None,
        "confusion_analysis": None,
        "state_dir": None,
        "refinement_loop": {
            "enabled",
            "trigger_interval",
            "min_improvement",
            "max_cycles",
            "patience",
            "auto_apply_suggestions",
            "refinement_strategy",
            "validation_split_ratio",
            "eval_sample_size",
            "num_candidates",
            "min_val_size",
            "max_consecutive_failures",
            "dry_run",
            "require_approval",
            "min_val_improvement",
            "eval_temperature",
            "prefer_consistent_disagreements",
        },
    },

    # --- Administration, assignment, and session limits ---
    "admin_api_key": None,
    "alert_time_each_instance": None,
    "assignment_strategy": None,
    "reclaim_stale_assignments": None,
    "instance_reclaim": None,
    "max_session_seconds": None,
    "env_substitution": None,

    # --- Internal / bookkeeping keys ---
    "config_file": None,
    "__config_file__": None,
    "_bws_pool_items": None,
}
|
|
|
|
def validate_unknown_keys(config_data, schema=None, path=""):
    """Recursively warn about unrecognized config keys and suggest corrections.

    Nothing is raised or returned: each unknown key produces exactly one
    ``logger.warning``. When ``difflib`` finds similar known names (up to
    three, similarity cutoff 0.6) they are offered as suggestions.

    Args:
        config_data: The config dict (or sub-dict) to validate. Any non-dict
            value is silently accepted.
        schema: The known-keys schema for this level (defaults to
            KNOWN_CONFIG_KEYS). Either a dict mapping each known key to its
            sub-schema (``None``, a set of names, or a nested dict) or a plain
            collection of known key names.
        path: Dot-separated path prefix for nested key reporting
            (e.g., "training").
    """
    if schema is None:
        schema = KNOWN_CONFIG_KEYS

    if not isinstance(config_data, dict):
        return

    # set() works for dict schemas (iterates keys) and for any other
    # collection of names alike.
    known_keys = set(schema)

    # YAML mappings may carry non-string keys (e.g. ``1: foo``). Ordering by
    # the string form keeps sorted() from raising TypeError on mixed types.
    unknown_keys = sorted(set(config_data) - known_keys, key=str)

    # difflib compares character sequences, so only string names can be
    # offered as suggestions.
    candidates = [name for name in known_keys if isinstance(name, str)]

    for key in unknown_keys:
        full_key = f"{path}.{key}" if path else key
        matches = difflib.get_close_matches(str(key), candidates, n=3, cutoff=0.6)
        if matches:
            suggestions = ", ".join(f"'{m}'" for m in matches)
            logger.warning(
                "Unrecognized config key '%s'. Did you mean: %s?",
                full_key, suggestions
            )
        else:
            logger.warning(
                "Unrecognized config key '%s'. This key will be ignored.",
                full_key
            )

    # Only a dict schema carries sub-schemas to descend into.
    if not isinstance(schema, dict):
        return

    for key, sub_schema in schema.items():
        # A None sub-schema means "known key, contents not checked".
        if sub_schema is None or key not in config_data:
            continue
        value = config_data[key]
        if not isinstance(value, dict):
            continue
        if isinstance(sub_schema, (dict, set)):
            child_path = f"{path}.{key}" if path else key
            # A set schema lists leaf names only, so the recursive call
            # checks that single level and stops there.
            validate_unknown_keys(value, sub_schema, child_path)
|
|
|
|
def validate_path_security(path: str, base_dir: str, project_dir: Optional[str] = None) -> str:
    """
    Validate that a path is secure and contained within the base directory.

    Checks, in order:
      1. The raw string must not contain ``....`` or a percent-encoded
         traversal sequence (``..%2F`` / ``..%5C``, matched in any letter case).
      2. After normalization the path may contain at most two ``..`` components.
      3. An absolute path must resolve (symlinks followed) inside ``base_dir``.
      4. A relative path is joined onto ``base_dir``.
      5. The resulting path must resolve inside ``project_dir`` when one is
         given, otherwise inside ``base_dir``.

    Args:
        path: The path to validate
        base_dir: The base directory that should contain the path
        project_dir: The project directory for final security check (if different from base_dir)

    Returns:
        The normalized path. An absolute input is returned normalized; a
        relative input is returned joined onto ``base_dir`` and normalized.
        Symlinks are resolved for the containment checks only, not in the
        returned value.

    Raises:
        ConfigSecurityError: If the path is not secure
    """
    # Percent-escapes are case-insensitive ("%2F" == "%2f"), so the encoded
    # forms are matched against a lower-cased copy of the input.
    if '....' in path or any(token in path.lower() for token in ('..%2f', '..%5c')):
        raise ConfigSecurityError(f"Encoded path traversal detected in '{path}'. Encoded traversal patterns are not allowed for security reasons.")

    normalized_path = os.path.normpath(path)

    # normpath has already collapsed interior "dir/.." pairs, so any ".."
    # still present climbs above the starting directory.
    if normalized_path.split(os.sep).count('..') > 2:
        raise ConfigSecurityError(f"Excessive path traversal detected in '{path}'. Too many '..' components for security reasons.")

    def _resolve(target: str) -> str:
        """Resolve symlinks, reporting OS-level failures as security errors."""
        try:
            return os.path.realpath(target)
        except (OSError, ValueError) as e:
            raise ConfigSecurityError(f"Invalid path '{path}': {str(e)}") from e

    def _require_inside(real_path: str, real_root: str) -> None:
        """Raise unless real_path is real_root itself or lies beneath it."""
        # Appending os.sep prevents "/base_evil" from matching root "/base".
        if not (real_path == real_root or real_path.startswith(real_root + os.sep)):
            raise ConfigSecurityError(f"Path '{path}' resolves to '{real_path}' which is outside the project directory '{real_root}'")

    if os.path.isabs(normalized_path):
        # Absolute paths must already live under base_dir.
        _require_inside(_resolve(normalized_path), _resolve(base_dir))
    else:
        # Relative paths are interpreted relative to base_dir.
        normalized_path = os.path.normpath(os.path.join(base_dir, normalized_path))

    # Final containment check against the project directory when supplied.
    check_dir = project_dir if project_dir else base_dir
    _require_inside(_resolve(normalized_path), _resolve(check_dir))

    return normalized_path
|
|
|
|
| |
| |
| |
# Optional top-level fields that must be integers when present.
# Maps field name -> (human-readable description, allow_negative);
# validate_optional_field_types() rejects negative values when the flag is False.
_OPTIONAL_INT_FIELDS = {
    "alert_time_each_instance": ("seconds to alert per instance", False),
    "max_annotations_per_item": ("max annotations per item", True),
    "max_annotations_per_user": ("max annotations per user", True),
    "min_annotators_per_instance": ("minimum annotators per instance", False),
    "random_seed": ("random seed", True),
    "max_session_seconds": ("max session duration in seconds", False),
}


# Optional top-level fields that must be booleans (or null) when present.
# Maps field name -> human-readable description used in error messages.
_OPTIONAL_BOOL_FIELDS = {
    "highlight_linebreaks": "whether to highlight linebreaks",
    "jumping_to_id_disabled": "whether jumping to ID is disabled",
    "require_fully_annotated": "whether full annotation is required",
    "require_password": "whether password is required",
    "require_no_password": "whether no-password mode is enabled",
    "customjs": "whether custom JS is enabled",
    "watch_data_directory": "whether to watch data directory for changes",
    "persist_sessions": "whether to persist sessions across restarts",
}


# Accepted names for 'assignment_strategy'; validate_optional_field_types()
# lower-cases the configured value before looking it up here.
_VALID_ASSIGNMENT_STRATEGIES = [
    "random", "fixed_order", "active_learning", "llm_confidence",
    "max_diversity", "least_annotated", "category_based", "diversity_clustering",
    "batch", "priority",
]
|
|
|
|
def validate_num_annotators_per_item(value: Any) -> None:
    """
    Check that ``num_annotators_per_item`` has an accepted shape.

    Accepted forms:
      * ``None`` - nothing to check.
      * a non-negative integer (``bool`` is rejected explicitly).
      * a mapping restricted to the keys ``default``, ``overlap_sample``,
        ``adaptive`` and ``min``.

    Mapping rules (``default`` counts as 1 when omitted):
      * ``default`` - integer >= 1.
      * ``min`` - integer >= 1 and not larger than ``default``.
      * ``overlap_sample`` - mapping with keys ``fraction`` (number in (0, 1]),
        ``count`` (integer >= 2 and greater than ``default``), and optional
        ``stratify_by`` (string) / ``seed`` (integer). ``fraction`` and
        ``count`` must both be present.
      * ``adaptive`` - mapping with optional ``enabled`` (bool),
        ``disagreement_threshold`` (number in [0, 1]) and ``boost_to``
        (integer >= 2 and greater than ``default``).

    Raises:
        ConfigValidationError: on the first rule that is violated.
    """
    def strict_int(candidate: Any) -> bool:
        # bool is a subclass of int, so it has to be excluded by hand.
        return isinstance(candidate, int) and not isinstance(candidate, bool)

    def strict_number(candidate: Any) -> bool:
        return isinstance(candidate, (int, float)) and not isinstance(candidate, bool)

    if value is None:
        return

    # --- scalar forms ---------------------------------------------------
    if isinstance(value, bool):
        raise ConfigValidationError(
            "'num_annotators_per_item' must be an integer or a structured mapping, "
            f"got bool: {value!r}"
        )
    if isinstance(value, int):
        if value >= 0:
            return
        raise ConfigValidationError(
            "'num_annotators_per_item' as integer must be non-negative; "
            "use 0 or omit the key for unlimited (legacy used -1)."
        )
    if not isinstance(value, dict):
        raise ConfigValidationError(
            "'num_annotators_per_item' must be an integer or a mapping, "
            f"got {type(value).__name__}: {value!r}"
        )

    # --- mapping form: top-level keys -------------------------------------
    permitted = {"default", "overlap_sample", "adaptive", "min"}
    unexpected = set(value) - permitted
    if unexpected:
        raise ConfigValidationError(
            f"Unknown keys in num_annotators_per_item: {sorted(unexpected)}. "
            f"Allowed: {sorted(permitted)}"
        )

    default = value.get("default", 1)
    if not (strict_int(default) and default >= 1):
        raise ConfigValidationError(
            f"num_annotators_per_item.default must be a positive integer, got {default!r}"
        )

    minimum = value.get("min")
    if minimum is not None:
        if not (strict_int(minimum) and minimum >= 1):
            raise ConfigValidationError(
                f"num_annotators_per_item.min must be a positive integer, got {minimum!r}"
            )
        if minimum > default:
            raise ConfigValidationError(
                "num_annotators_per_item.min cannot exceed num_annotators_per_item.default"
            )

    # --- mapping form: overlap_sample -------------------------------------
    overlap = value.get("overlap_sample")
    if overlap is not None:
        if not isinstance(overlap, dict):
            raise ConfigValidationError(
                "num_annotators_per_item.overlap_sample must be a mapping"
            )
        unexpected = set(overlap) - {"fraction", "count", "stratify_by", "seed"}
        if unexpected:
            raise ConfigValidationError(
                f"Unknown keys in overlap_sample: {sorted(unexpected)}"
            )

        # A missing fraction/count is None and fails the type test below.
        fraction = overlap.get("fraction")
        if not (strict_number(fraction) and 0 < fraction <= 1):
            raise ConfigValidationError(
                f"overlap_sample.fraction must be in (0, 1], got {fraction!r}"
            )

        count = overlap.get("count")
        if not (strict_int(count) and count >= 2):
            raise ConfigValidationError(
                f"overlap_sample.count must be an integer >= 2, got {count!r}"
            )
        if count <= default:
            raise ConfigValidationError(
                "overlap_sample.count must be greater than num_annotators_per_item.default "
                f"({count} <= {default})"
            )

        stratify_by = overlap.get("stratify_by")
        if not (stratify_by is None or isinstance(stratify_by, str)):
            raise ConfigValidationError(
                f"overlap_sample.stratify_by must be a string or omitted, got {stratify_by!r}"
            )

        seed = overlap.get("seed")
        if not (seed is None or strict_int(seed)):
            raise ConfigValidationError(
                f"overlap_sample.seed must be an integer, got {seed!r}"
            )

    # --- mapping form: adaptive -------------------------------------------
    adaptive = value.get("adaptive")
    if adaptive is None:
        return
    if not isinstance(adaptive, dict):
        raise ConfigValidationError(
            "num_annotators_per_item.adaptive must be a mapping"
        )
    unexpected = set(adaptive) - {"enabled", "disagreement_threshold", "boost_to"}
    if unexpected:
        raise ConfigValidationError(
            f"Unknown keys in adaptive: {sorted(unexpected)}"
        )
    if "enabled" in adaptive and not isinstance(adaptive["enabled"], bool):
        raise ConfigValidationError(
            f"adaptive.enabled must be a boolean, got {adaptive['enabled']!r}"
        )

    threshold = adaptive.get("disagreement_threshold")
    if threshold is not None and not (strict_number(threshold) and 0 <= threshold <= 1):
        raise ConfigValidationError(
            f"adaptive.disagreement_threshold must be in [0, 1], got {threshold!r}"
        )

    boost = adaptive.get("boost_to")
    if boost is not None:
        if not (strict_int(boost) and boost >= 2):
            raise ConfigValidationError(
                f"adaptive.boost_to must be an integer >= 2, got {boost!r}"
            )
        if boost <= default:
            raise ConfigValidationError(
                f"adaptive.boost_to must exceed default ({boost} <= {default})"
            )
|
|
|
|
def validate_per_annotator_quota(value: Any) -> None:
    """Check that ``per_annotator_quota`` has an accepted shape.

    ``None`` is accepted as-is. Otherwise the value must be a mapping limited
    to the keys ``default``, ``by_user`` and ``by_user_role``:

      * ``default`` - a non-negative integer (``bool`` rejected) or omitted.
      * ``by_user`` / ``by_user_role`` - mappings from a non-empty string name
        to a non-negative integer.

    Raises:
        ConfigValidationError: on the first rule that is violated.
    """
    if value is None:
        return
    if not isinstance(value, dict):
        raise ConfigValidationError(
            "'per_annotator_quota' must be a mapping, "
            f"got {type(value).__name__}: {value!r}"
        )

    def is_quota(candidate: Any) -> bool:
        # bool is an int subclass and must not pass as a quota.
        return (
            isinstance(candidate, int)
            and not isinstance(candidate, bool)
            and candidate >= 0
        )

    permitted = {"default", "by_user", "by_user_role"}
    unexpected = set(value) - permitted
    if unexpected:
        raise ConfigValidationError(
            f"Unknown keys in per_annotator_quota: {sorted(unexpected)}. Allowed: {sorted(permitted)}"
        )

    default = value.get("default")
    if not (default is None or is_quota(default)):
        raise ConfigValidationError(
            f"per_annotator_quota.default must be a non-negative integer, got {default!r}"
        )

    for section in ("by_user", "by_user_role"):
        table = value.get(section)
        if table is None:
            continue
        if not isinstance(table, dict):
            raise ConfigValidationError(
                f"per_annotator_quota.{section} must be a mapping of name -> integer"
            )
        for name, quota in table.items():
            if not (isinstance(name, str) and name):
                raise ConfigValidationError(
                    f"per_annotator_quota.{section} keys must be non-empty strings, got {name!r}"
                )
            if not is_quota(quota):
                raise ConfigValidationError(
                    f"per_annotator_quota.{section}[{name!r}] must be a non-negative integer, got {quota!r}"
                )
|
|
|
|
def resolve_num_annotators_per_item(config_data: Dict[str, Any]) -> int:
    """
    Work out the *default* per-item annotation cap from the config.

    The first source that yields a value wins:
      1. ``num_annotators_per_item`` given as an integer     → that integer
      2. ``num_annotators_per_item`` mapping with ``default`` → ``int(default)``
      3. legacy ``max_annotations_per_item`` integer          → that integer
      4. none of the above                                     → -1 (unlimited)

    Booleans are never treated as integers in steps 1 and 3.
    """
    def plain_int(candidate: Any) -> bool:
        return isinstance(candidate, int) and not isinstance(candidate, bool)

    canonical = config_data.get("num_annotators_per_item")
    if plain_int(canonical):
        return canonical

    if isinstance(canonical, dict):
        default = canonical.get("default")
        if default is not None:
            return int(default)

    # Fall back to the legacy key, then to "unlimited".
    fallback = config_data.get("max_annotations_per_item")
    return fallback if plain_int(fallback) else -1
|
|
|
|
def validate_optional_field_types(config_data: Dict[str, Any]) -> None:
    """
    Validate types for commonly misconfigured optional fields.

    Catches issues like string values for integer fields (e.g., alert_time_each_instance: "30")
    or wrong types for booleans, which would silently produce incorrect behavior at runtime.

    Checks performed, in order:
      1. Fields listed in ``_OPTIONAL_INT_FIELDS`` must be integers (``bool``
         rejected) and non-negative unless the field allows negatives.
      2. Fields listed in ``_OPTIONAL_BOOL_FIELDS`` must be booleans or null.
      3. ``num_annotators_per_item`` and ``per_annotator_quota`` are handed to
         their dedicated validators.
      4. If both ``max_annotations_per_item`` and ``num_annotators_per_item``
         are present, conflicting values are an error; otherwise a
         ``DeprecationWarning`` is emitted for the legacy key.
      5. ``assignment_strategy`` (a string, or a mapping with a ``name``) must
         name one of ``_VALID_ASSIGNMENT_STRATEGIES`` (case-insensitive).

    Args:
        config_data: The parsed configuration dictionary

    Raises:
        ConfigValidationError: If a field has the wrong type
    """
    # 1. Integer fields
    for field, (desc, allow_negative) in _OPTIONAL_INT_FIELDS.items():
        if field not in config_data:
            continue
        val = config_data[field]
        # bool is a subclass of int, so it is excluded explicitly.
        if not isinstance(val, int) or isinstance(val, bool):
            raise ConfigValidationError(
                f"'{field}' must be an integer ({desc}), "
                f"got {type(val).__name__}: {val!r}"
            )
        if not allow_negative and val < 0:
            raise ConfigValidationError(
                f"'{field}' must be a non-negative integer ({desc}), got {val}"
            )

    # 2. Boolean fields (an explicit null is tolerated)
    for field, desc in _OPTIONAL_BOOL_FIELDS.items():
        if field not in config_data:
            continue
        val = config_data[field]
        if val is not None and not isinstance(val, bool):
            raise ConfigValidationError(
                f"'{field}' must be a boolean ({desc}), "
                f"got {type(val).__name__}: {val!r}"
            )

    # 3. Structured fields with dedicated validators
    if 'num_annotators_per_item' in config_data:
        validate_num_annotators_per_item(config_data['num_annotators_per_item'])

    if 'per_annotator_quota' in config_data:
        validate_per_annotator_quota(config_data['per_annotator_quota'])

    # 4. Legacy vs. canonical per-item cap
    if 'max_annotations_per_item' in config_data and 'num_annotators_per_item' in config_data:
        legacy = config_data['max_annotations_per_item']
        canonical = config_data['num_annotators_per_item']
        if isinstance(canonical, int):
            canonical_int = canonical
        elif isinstance(canonical, dict):
            canonical_int = canonical.get('default')
        else:
            # The key may be present with a null value (the dedicated validator
            # accepts None); there is then nothing to compare against, and
            # calling .get() on it would raise AttributeError.
            canonical_int = None
        if canonical_int is not None and legacy != canonical_int:
            raise ConfigValidationError(
                "'max_annotations_per_item' and 'num_annotators_per_item' are both "
                f"set with conflicting values ({legacy} vs {canonical_int}). "
                "Drop 'max_annotations_per_item' — 'num_annotators_per_item' is the canonical key."
            )
        import warnings as _w
        _w.warn(
            "'max_annotations_per_item' is deprecated; use 'num_annotators_per_item' "
            "instead. Setting both is redundant.",
            DeprecationWarning,
            stacklevel=2,
        )

    # 5. Assignment strategy name
    if 'assignment_strategy' in config_data:
        strat = config_data['assignment_strategy']
        # The strategy may be a bare name or a mapping carrying a 'name' entry.
        strat_name = strat
        if isinstance(strat, dict):
            strat_name = strat.get('name', '')
        if isinstance(strat_name, str) and strat_name.lower() not in _VALID_ASSIGNMENT_STRATEGIES:
            raise ConfigValidationError(
                f"'assignment_strategy' value '{strat_name}' is not recognized. "
                f"Valid strategies: {', '.join(_VALID_ASSIGNMENT_STRATEGIES)}"
            )
|
|
|
|
def validate_judge_calibration_config(config_data: Dict[str, Any]) -> None:
    """Validate the ``judge_calibration`` block when it is enabled.

    Does nothing unless ``judge_calibration`` is a mapping with a truthy
    ``enabled`` entry. Otherwise the block is parsed into its typed config,
    whose own ``validate()`` supplies the list of rule violations, and every
    name in the parsed ``schemas`` is cross-checked against the ``name``
    entries declared in ``annotation_schemes``.

    Raises:
        ConfigValidationError: if any violation was collected; the message
            lists all of them.
    """
    section = config_data.get("judge_calibration")
    if not (isinstance(section, dict) and section.get("enabled")):
        return

    # Imported only once the feature is known to be enabled.
    from potato.judge_calibration.config import parse_judge_calibration_config

    parsed = parse_judge_calibration_config(config_data)
    problems = parsed.validate()

    # Names of the annotation schemes declared in the config (non-dict
    # entries are skipped).
    declared = set()
    for scheme in config_data.get("annotation_schemes") or []:
        if isinstance(scheme, dict):
            declared.add(scheme.get("name"))

    for name in parsed.schemas:
        if name in declared:
            continue
        problems.append(
            f"judge_calibration.schemas references unknown scheme '{name}' "
            f"(declared: {sorted(n for n in declared if n)})"
        )

    if not problems:
        return
    raise ConfigValidationError(
        "Invalid judge_calibration configuration:\n - " + "\n - ".join(problems)
    )
|
|
|
|
def validate_yaml_structure(config_data: Dict[str, Any], project_dir: str = None, config_file_dir: str = None) -> None:
    """
    Validate the structure and content of the parsed YAML configuration.

    Structural checks run first (required top-level fields, ``item_properties``
    contents, presence of at least one data source). The feature-specific
    validators then run in a fixed order, and unknown-key warnings are emitted
    last.

    Args:
        config_data: The parsed YAML configuration
        project_dir: The project directory (forwarded to the training validator)
        config_file_dir: The directory containing the config file (forwarded
            to the training validator)

    Raises:
        ConfigValidationError: If the configuration is invalid
    """
    if not isinstance(config_data, dict):
        raise ConfigValidationError("Configuration must be a YAML object (dictionary)")

    # --- Required top-level fields -----------------------------------------
    absent = [
        name
        for name in ('item_properties', 'task_dir', 'output_annotation_dir', 'annotation_task_name')
        if name not in config_data
    ]
    if absent:
        raise ConfigValidationError(f"Missing required configuration fields: {', '.join(absent)}")

    # --- item_properties -----------------------------------------------------
    # Presence was established just above, so direct indexing is safe.
    item_props = config_data['item_properties']
    if not isinstance(item_props, dict):
        raise ConfigValidationError("item_properties must be a dictionary")

    absent_props = [name for name in ('id_key', 'text_key') if name not in item_props]
    if absent_props:
        raise ConfigValidationError(f"Missing required item_properties: {', '.join(absent_props)}")

    if 'category_key' in item_props:
        category_key = item_props['category_key']
        if not (isinstance(category_key, str) and category_key.strip()):
            raise ConfigValidationError("item_properties.category_key must be a non-empty string")

    # --- Data sources ----------------------------------------------------------
    data_files = config_data.get('data_files', [])
    data_directory = config_data.get('data_directory')
    data_sources = config_data.get('data_sources')

    if not isinstance(data_files, list):
        raise ConfigValidationError("data_files must be a list")

    if not (data_files or data_directory or data_sources):
        raise ConfigValidationError(
            "At least one data source must be configured: "
            "'data_files', 'data_directory', or 'data_sources'"
        )

    if data_sources:
        validate_data_sources_config(config_data)

    # --- Feature validators (order is significant: the first failure wins) ----
    validate_server_config(config_data)
    validate_authentication_config(config_data)
    validate_data_directory_config(config_data)
    validate_annotation_schemes(config_data)
    validate_training_config(config_data, project_dir, config_file_dir)

    if 'database' in config_data:
        validate_database_config(config_data['database'])

    validate_active_learning_config(config_data)
    validate_ai_support_config(config_data)
    validate_chat_support_config(config_data)
    validate_category_assignment_config(config_data)
    validate_batch_assignment_config(config_data)
    validate_diversity_config(config_data)
    validate_embedding_visualization_config(config_data)

    if 'adjudication' in config_data:
        validate_adjudication_config(config_data)

    validate_quality_control_config(config_data)
    validate_instance_reclaim_config(config_data)
    validate_instance_display_config(config_data)
    validate_format_handling_config(config_data)
    validate_layout_config(config_data)

    # Sections validated only when present.
    if 'bws_config' in config_data:
        _validate_bws_config(config_data)
    if 'ibws_config' in config_data:
        _validate_ibws_config(config_data)
    if 'mace' in config_data:
        _validate_mace_config(config_data)

    validate_optional_field_types(config_data)
    validate_search_assignment_compat(config_data)
    validate_codebook_config(config_data)
    validate_judge_calibration_config(config_data)

    # Runs last; it only logs warnings for unrecognized keys.
    validate_unknown_keys(config_data)
|
|
|
|
| |
| _CLAIM_INCOMPATIBLE_STRATEGIES = { |
| "random", "diversity_clustering", "max_diversity", |
| "active_learning", "llm_confidence", "least_annotated", |
| "category_based", "batch", |
| } |
|
|
|
|
def validate_search_assignment_compat(config_data: Dict[str, Any]) -> None:
    """Reject ``search.annotator_claim`` alongside platform-driven features.

    Annotator search-and-claim lets the annotator pick the next item, so it
    is refused when combined with settings that rely on the platform making
    that choice. All conflicts are collected and reported in one error.
    Configs without ``search.annotator_claim`` (e.g. read-only admin search)
    are not checked, and solo/QDA mode is always allowed.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If annotator claim is enabled together with
            at least one incompatible setting
    """
    search_cfg = config_data.get("search")
    if not (isinstance(search_cfg, dict) and search_cfg.get("annotator_claim")):
        return

    def section_enabled(section: str) -> Any:
        # A missing or null section counts as "not enabled".
        return (config_data.get(section) or {}).get("enabled")

    # A single coder over the whole corpus has nothing to conflict with.
    if section_enabled("qda_mode") or section_enabled("solo_mode"):
        return

    problems = []

    # 1) Assignment strategy, given either as a plain name or {"name": ...}.
    strategy = config_data.get("assignment_strategy")
    if isinstance(strategy, dict):
        strategy = strategy.get("name")
    if strategy and str(strategy).lower() in _CLAIM_INCOMPATIBLE_STRATEGIES:
        problems.append(
            f"assignment_strategy: {strategy} (self-selection breaks "
            f"sampling/ordering)")

    # 2) Overlap settings: any value above 1 means several annotators per item.
    overlap_keys = (
        "max_annotations_per_item",
        "num_annotators_per_item",
        "min_annotators_per_instance",
    )
    for key in overlap_keys:
        setting = config_data.get(key, -1)
        if isinstance(setting, dict):
            # Dict form: inspect "default" and "overlap_sample.count".
            values = []
            default_value = setting.get("default")
            if default_value is not None:
                values.append(default_value)
            sample_count = (setting.get("overlap_sample") or {}).get("count")
            if sample_count is not None:
                values.append(sample_count)
        else:
            values = [setting]

        for value in values:
            try:
                exceeds_one = int(value) > 1
            except (TypeError, ValueError):
                # Non-numeric values are ignored here.
                continue
            if exceeds_one:
                problems.append(
                    f"{key}: {config_data[key]} (inter-annotator overlap "
                    f"cannot be guaranteed under self-selection)")
                break  # one report per key is enough

    # 3) Quality-control / curated-queue features.
    toggle_messages = (
        ("attention_checks",
         "attention_checks.enabled (annotators could "
         "locate/avoid QC items)"),
        ("gold_standards",
         "gold_standards.enabled (annotators could "
         "locate/avoid gold items)"),
        ("icl_labeling",
         "icl_labeling.enabled (blind LLM-verification "
         "tasks must not be findable)"),
        ("adjudication",
         "adjudication.enabled (the adjudication queue "
         "is curated)"),
    )
    for section, message in toggle_messages:
        if section_enabled(section):
            problems.append(message)

    # 4) Crowdsourcing backend, detected by section presence or login type.
    login_type = (config_data.get("login") or {}).get("type")
    uses_crowd = (
        "mturk" in config_data
        or "prolific" in config_data
        or login_type in ("mturk", "prolific")
    )
    if uses_crowd:
        problems.append("crowdsourcing backend (HIT = the assigned "
                        "unit; self-selection breaks payment/coverage)")

    if not problems:
        return

    raise ConfigValidationError(
        "search.annotator_claim: true is incompatible with this "
        "configuration:\n - " + "\n - ".join(problems) +
        "\n\nAnnotator search-and-claim is only supported with "
        "solo_mode/qda_mode, or fixed_order assignment without "
        "overlap, quality-control injection, ICL verification, "
        "adjudication, or a crowdsourcing backend. Use read-only "
        "admin search (no annotator_claim) for those designs."
    )
|
|
|
|
| _CODEBOOK_MODES = ("fixed", "extensible", "open") |
|
|
|
|
| def _crowd_backend(config_data: Dict[str, Any]) -> bool: |
| login_type = (config_data.get("login") or {}).get("type") |
| return ( |
| "mturk" in config_data or "prolific" in config_data |
| or login_type in ("mturk", "prolific") |
| ) |
|
|
|
|
def get_codebook_mode(config_data: Dict[str, Any]) -> str:
    """Work out which codebook mode is actually in effect.

    Resolution order:

    1. An explicit ``codebook_mode`` (or, failing that, ``codebook.mode``),
       stripped and lower-cased. The value is not validated here.
    2. Otherwise ``open`` when solo or QDA mode is enabled.
    3. Otherwise ``fixed``.

    Whatever was resolved, a crowdsourcing backend always yields ``fixed``.

    Args:
        config_data: The configuration data

    Returns:
        The effective codebook mode string
    """
    requested = config_data.get("codebook_mode")
    if requested is None:
        requested = (config_data.get("codebook") or {}).get("mode")

    if requested is not None:
        resolved = str(requested).strip().lower()
    elif ((config_data.get("qda_mode") or {}).get("enabled")
          or (config_data.get("solo_mode") or {}).get("enabled")):
        # Single-coder modes default to an open codebook.
        resolved = "open"
    else:
        resolved = "fixed"

    # The crowd lock is applied last so it overrides explicit requests too.
    return "fixed" if _crowd_backend(config_data) else resolved
|
|
|
|
def validate_codebook_config(config_data: Dict[str, Any]) -> None:
    """Validate the requested codebook mode, if one was requested.

    The mode is read from ``codebook_mode`` or, when that is absent, from
    ``codebook.mode``. Nothing is checked when neither is set. A valid
    non-``fixed`` mode combined with a crowdsourcing backend is not an
    error; it only produces a warning that the mode is force-locked.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the requested mode is not one of
            ``_CODEBOOK_MODES`` (compared after strip + lower-case)
    """
    raw = config_data.get("codebook_mode")
    if raw is None:
        raw = (config_data.get("codebook") or {}).get("mode")
    if raw is None:
        return

    mode = str(raw).strip().lower()
    if mode not in _CODEBOOK_MODES:
        raise ConfigValidationError(
            f"codebook_mode must be one of {', '.join(_CODEBOOK_MODES)}; "
            f"got {raw!r}."
        )
    if mode != "fixed" and _crowd_backend(config_data):
        # Use the module-level logger (not the root logger) so the record is
        # attributed to this module, like the other warnings in this file.
        logger.warning(
            "codebook_mode=%s requested with a crowdsourcing backend; "
            "force-locking to 'fixed' (paid annotators must not reshape "
            "the shared codebook).", mode)
|
|
|
|
def _validate_phase_schemes_or_instruments(phase: Any, phase_label: Any,
                                           path_prefix: str) -> None:
    """Validate one phase entry of a phases-based configuration.

    A phase must either carry its own non-empty ``annotation_schemes`` list
    or be described by at least one of ``file`` / ``type`` / ``instrument`` /
    ``instruments``.

    Args:
        phase: The phase configuration (expected to be a dictionary)
        phase_label: Name used for the phase in error messages
        path_prefix: Config path of the phase, used to build scheme paths

    Raises:
        ConfigValidationError: If the phase is malformed
    """
    if not isinstance(phase, dict):
        # Reject up front rather than failing later with AttributeError/TypeError.
        raise ConfigValidationError(f"Phase {phase_label} must be a dictionary")

    if 'annotation_schemes' in phase:
        schemes = phase['annotation_schemes']
        if not isinstance(schemes, list):
            raise ConfigValidationError(f"Phase {phase_label} annotation_schemes must be a list")
        if not schemes:
            raise ConfigValidationError(f"Phase {phase_label} annotation_schemes cannot be empty")

        for j, scheme in enumerate(schemes):
            validate_single_annotation_scheme(scheme, f"{path_prefix}.annotation_schemes[{j}]")
    elif 'file' in phase or 'type' in phase or 'instrument' in phase or 'instruments' in phase:
        _validate_phase_instruments(phase, phase_label)
    else:
        raise ConfigValidationError(
            f"Phase {phase_label} requires 'annotation_schemes', 'file', 'type', "
            f"'instrument', or 'instruments'"
        )


def validate_annotation_schemes(config_data: Dict[str, Any]) -> None:
    """
    Validate annotation schemes configuration.

    Schemes may be declared at the top level (``annotation_schemes``) or
    inside ``phases`` (a list of phase dicts, or a dict of phase name ->
    phase dict in which the ``order`` key is skipped), but not in both
    places. When top-level schemes are present, only those are validated;
    phases are merely checked for conflicting scheme declarations.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If annotation schemes are invalid
    """
    has_top_level = 'annotation_schemes' in config_data
    phases = config_data.get('phases')
    has_phases = bool(phases)

    # Schemes must live in exactly one place.
    if has_top_level and has_phases:
        phases_with_schemes = []
        if isinstance(phases, list):
            phases_with_schemes = [
                phase.get('name', f'phase[{i}]')
                for i, phase in enumerate(phases)
                if isinstance(phase, dict) and 'annotation_schemes' in phase
            ]
        elif isinstance(phases, dict):
            phases_with_schemes = [
                name for name, phase in phases.items()
                if name != 'order' and isinstance(phase, dict) and 'annotation_schemes' in phase
            ]

        if phases_with_schemes:
            # str() keeps the join safe for non-string phase names.
            offenders = ', '.join(str(name) for name in phases_with_schemes)
            raise ConfigValidationError(
                f"Configuration has both top-level 'annotation_schemes' and phase-level "
                f"'annotation_schemes' in: {offenders}. "
                f"Use only one location to avoid confusion."
            )

    if has_top_level:
        schemes = config_data['annotation_schemes']
        if not isinstance(schemes, list):
            raise ConfigValidationError("annotation_schemes must be a list")
        if not schemes:
            raise ConfigValidationError("annotation_schemes cannot be empty")

        for i, scheme in enumerate(schemes):
            validate_single_annotation_scheme(scheme, f"annotation_schemes[{i}]")

    elif has_phases:
        if isinstance(phases, list):
            for i, phase in enumerate(phases):
                fallback_label = f'phase[{i}]'
                phase_id = phase.get('name', fallback_label) if isinstance(phase, dict) else fallback_label
                _validate_phase_schemes_or_instruments(phase, phase_id, f"phases[{i}]")
        elif isinstance(phases, dict):
            for phase_name, phase in phases.items():
                if phase_name == 'order':
                    continue
                _validate_phase_schemes_or_instruments(phase, phase_name, f"phases.{phase_name}")
        else:
            raise ConfigValidationError("phases must be a list or a dictionary")
    else:
        raise ConfigValidationError("Config must have either 'annotation_schemes' (top-level) or 'phases' with annotation_schemes")

    # Cross-cutting checks that look at the configuration as a whole.
    _validate_keyword_highlight_for_images(config_data)

    all_schemes = _collect_all_annotation_schemes(config_data)
    if all_schemes:
        validate_display_logic_references(all_schemes)
|
|
|
|
| def _collect_all_annotation_schemes(config_data: Dict[str, Any]) -> List[Dict[str, Any]]: |
| """ |
| Collect all annotation schemes from config, whether top-level or in phases. |
| |
| Args: |
| config_data: The configuration data |
| |
| Returns: |
| List of all annotation scheme dictionaries |
| """ |
| schemes = [] |
|
|
| if 'annotation_schemes' in config_data: |
| schemes.extend(config_data['annotation_schemes']) |
| elif 'phases' in config_data: |
| phases = config_data['phases'] |
| if isinstance(phases, list): |
| for phase in phases: |
| if 'annotation_schemes' in phase: |
| schemes.extend(phase['annotation_schemes']) |
| elif isinstance(phases, dict): |
| for phase_name, phase in phases.items(): |
| if phase_name != 'order' and isinstance(phase, dict): |
| if 'annotation_schemes' in phase: |
| schemes.extend(phase['annotation_schemes']) |
|
|
| return schemes |
|
|
|
|
| def _validate_keyword_highlight_for_images(config_data: Dict[str, Any]) -> None: |
| """ |
| Validate that keyword_highlight is not enabled for image-based tasks. |
| |
| Keyword highlighting highlights text in the instance content, which doesn't |
| make sense for images. This validation catches configuration errors early. |
| |
| Args: |
| config_data: The configuration data |
| |
| Raises: |
| ConfigValidationError: If keyword_highlight is enabled for an image task |
| """ |
| |
| text_key = config_data.get('item_properties', {}).get('text_key', 'text') |
| image_indicators = ['image', 'img', 'photo', 'picture', 'url'] |
| is_likely_image_task = any(indicator in text_key.lower() for indicator in image_indicators) |
|
|
| if not is_likely_image_task: |
| return |
|
|
| |
| schemes = [] |
| if 'annotation_schemes' in config_data: |
| schemes = config_data['annotation_schemes'] |
| elif 'phases' in config_data: |
| phases = config_data['phases'] |
| if isinstance(phases, list): |
| for phase in phases: |
| schemes.extend(phase.get('annotation_schemes', [])) |
| elif isinstance(phases, dict): |
| for phase_name, phase in phases.items(): |
| if phase_name != 'order' and isinstance(phase, dict): |
| schemes.extend(phase.get('annotation_schemes', [])) |
|
|
| |
| for i, scheme in enumerate(schemes): |
| if not isinstance(scheme, dict): |
| continue |
| ai_support = scheme.get('ai_support', {}) |
| if not isinstance(ai_support, dict): |
| continue |
| features = ai_support.get('features', {}) |
| if not isinstance(features, dict): |
| continue |
|
|
| keyword_highlight = features.get('keyword_highlight', False) |
| if keyword_highlight: |
| scheme_name = scheme.get('name', f'scheme[{i}]') |
| raise ConfigValidationError( |
| f"annotation_schemes.{scheme_name}.ai_support.features.keyword_highlight is enabled, " |
| f"but item_properties.text_key='{text_key}' suggests this is an image-based task. " |
| f"Keyword highlighting only works with text content, not images. " |
| f"Set keyword_highlight: false or remove it from the ai_support features." |
| ) |
|
|
|
|
| def _validate_bws_config(config_data: Dict[str, Any]) -> None: |
| """ |
| Validate Best-Worst Scaling configuration. |
| |
| Args: |
| config_data: The configuration data |
| |
| Raises: |
| ConfigValidationError: If the BWS config is invalid |
| """ |
| bws = config_data['bws_config'] |
| if not isinstance(bws, dict): |
| raise ConfigValidationError("bws_config must be a dictionary") |
|
|
| if 'tuple_size' in bws: |
| if not isinstance(bws['tuple_size'], int) or bws['tuple_size'] < 2: |
| raise ConfigValidationError("bws_config.tuple_size must be an integer >= 2") |
|
|
| if 'seed' in bws: |
| if not isinstance(bws['seed'], int): |
| raise ConfigValidationError("bws_config.seed must be an integer") |
|
|
| if 'num_tuples' in bws and bws['num_tuples'] is not None: |
| if not isinstance(bws['num_tuples'], int) or bws['num_tuples'] < 1: |
| raise ConfigValidationError("bws_config.num_tuples must be a positive integer or null") |
|
|
| if 'min_item_appearances' in bws and bws['min_item_appearances'] is not None: |
| if not isinstance(bws['min_item_appearances'], int) or bws['min_item_appearances'] < 1: |
| raise ConfigValidationError("bws_config.min_item_appearances must be a positive integer or null") |
|
|
| |
| scoring = bws.get('scoring', {}) |
| if scoring: |
| if not isinstance(scoring, dict): |
| raise ConfigValidationError("bws_config.scoring must be a dictionary") |
| valid_methods = ['counting', 'bradley_terry', 'plackett_luce'] |
| method = scoring.get('method', 'counting') |
| if method not in valid_methods: |
| raise ConfigValidationError(f"bws_config.scoring.method must be one of: {valid_methods}") |
|
|
|
|
| def _validate_ibws_config(config_data: Dict[str, Any]) -> None: |
| """ |
| Validate Iterative Best-Worst Scaling configuration. |
| |
| Args: |
| config_data: The configuration data |
| |
| Raises: |
| ConfigValidationError: If the IBWS config is invalid |
| """ |
| |
| if 'bws_config' in config_data: |
| raise ConfigValidationError( |
| "ibws_config and bws_config are mutually exclusive. " |
| "Use ibws_config for iterative BWS or bws_config for standard BWS." |
| ) |
|
|
| ibws = config_data['ibws_config'] |
| if not isinstance(ibws, dict): |
| raise ConfigValidationError("ibws_config must be a dictionary") |
|
|
| |
| schemes = config_data.get('annotation_schemes', []) |
| has_bws_scheme = any(s.get('annotation_type') == 'bws' for s in schemes) |
| if not has_bws_scheme: |
| raise ConfigValidationError( |
| "ibws_config requires at least one annotation scheme with annotation_type: bws" |
| ) |
|
|
| |
| if 'tuple_size' in ibws: |
| if not isinstance(ibws['tuple_size'], int) or ibws['tuple_size'] < 2: |
| raise ConfigValidationError("ibws_config.tuple_size must be an integer >= 2") |
|
|
| |
| if 'max_rounds' in ibws and ibws['max_rounds'] is not None: |
| if not isinstance(ibws['max_rounds'], int) or ibws['max_rounds'] < 1: |
| raise ConfigValidationError("ibws_config.max_rounds must be a positive integer or null") |
|
|
| |
| if 'seed' in ibws: |
| if not isinstance(ibws['seed'], int): |
| raise ConfigValidationError("ibws_config.seed must be an integer") |
|
|
| |
| valid_methods = ['counting', 'bradley_terry', 'plackett_luce'] |
| if 'scoring_method' in ibws: |
| if ibws['scoring_method'] not in valid_methods: |
| raise ConfigValidationError( |
| f"ibws_config.scoring_method must be one of: {valid_methods}" |
| ) |
|
|
| |
| if 'tuples_per_item_per_round' in ibws: |
| val = ibws['tuples_per_item_per_round'] |
| if not isinstance(val, int) or val < 1: |
| raise ConfigValidationError( |
| "ibws_config.tuples_per_item_per_round must be a positive integer" |
| ) |
|
|
|
|
| def _validate_mace_config(config_data: Dict[str, Any]) -> None: |
| """ |
| Validate MACE competence estimation configuration. |
| |
| Args: |
| config_data: The configuration data |
| |
| Raises: |
| ConfigValidationError: If the MACE config is invalid |
| """ |
| mace = config_data.get('mace', {}) |
| if not isinstance(mace, dict): |
| raise ConfigValidationError("mace must be a dictionary") |
|
|
| if not mace.get('enabled', False): |
| return |
|
|
| |
| min_annots = mace.get('min_annotations_per_item', 3) |
| if not isinstance(min_annots, int) or min_annots < 2: |
| raise ConfigValidationError( |
| "mace.min_annotations_per_item must be an integer >= 2" |
| ) |
|
|
| trigger_n = mace.get('trigger_every_n', 10) |
| if not isinstance(trigger_n, int) or trigger_n < 1: |
| raise ConfigValidationError( |
| "mace.trigger_every_n must be an integer >= 1" |
| ) |
|
|
| num_restarts = mace.get('num_restarts', 10) |
| if not isinstance(num_restarts, int) or num_restarts < 1: |
| raise ConfigValidationError( |
| "mace.num_restarts must be an integer >= 1" |
| ) |
|
|
| |
| categorical_types = {'radio', 'likert', 'select', 'multiselect'} |
| schemes = config_data.get('annotation_schemes', []) |
| has_categorical = any( |
| s.get('annotation_type', '') in categorical_types |
| for s in schemes if isinstance(s, dict) |
| ) |
| if not has_categorical: |
| logger.warning( |
| "MACE is enabled but no categorical annotation schemes " |
| "(radio, likert, select, multiselect) are defined. " |
| "MACE will have no data to process." |
| ) |
|
|
|
|
| def _validate_phase_instruments(phase: Dict[str, Any], phase_name: str) -> None: |
| """ |
| Validate instrument references in a phase configuration. |
| |
| Args: |
| phase: The phase configuration |
| phase_name: Name of the phase for error messages |
| |
| Raises: |
| ConfigValidationError: If instrument references are invalid |
| """ |
| |
| if 'instrument' in phase: |
| inst_id = phase['instrument'] |
| if not isinstance(inst_id, str): |
| raise ConfigValidationError( |
| f"Phase {phase_name}: 'instrument' must be a string" |
| ) |
| try: |
| from potato.survey_instruments import get_registry |
| registry = get_registry() |
| if inst_id not in registry['instruments']: |
| available = sorted(registry['instruments'].keys())[:10] |
| raise ConfigValidationError( |
| f"Phase {phase_name}: Unknown instrument '{inst_id}'. " |
| f"Available instruments: {available}..." |
| ) |
| except ImportError: |
| |
| pass |
|
|
| |
| if 'instruments' in phase: |
| inst_list = phase['instruments'] |
| if not isinstance(inst_list, list): |
| raise ConfigValidationError( |
| f"Phase {phase_name}: 'instruments' must be a list" |
| ) |
| try: |
| from potato.survey_instruments import get_registry |
| registry = get_registry() |
| for inst_id in inst_list: |
| if not isinstance(inst_id, str): |
| raise ConfigValidationError( |
| f"Phase {phase_name}: All items in 'instruments' must be strings" |
| ) |
| if inst_id not in registry['instruments']: |
| available = sorted(registry['instruments'].keys())[:10] |
| raise ConfigValidationError( |
| f"Phase {phase_name}: Unknown instrument '{inst_id}'. " |
| f"Available instruments: {available}..." |
| ) |
| except ImportError: |
| |
| pass |
|
|
|
|
| def validate_single_annotation_scheme(scheme: Dict[str, Any], path: str) -> None: |
| """ |
| Validate a single annotation scheme. |
| |
| Args: |
| scheme: The annotation scheme to validate |
| path: The path in the config for error reporting |
| |
| Raises: |
| ConfigValidationError: If the scheme is invalid |
| """ |
| if not isinstance(scheme, dict): |
| raise ConfigValidationError(f"{path} must be a dictionary") |
|
|
| required_fields = ['annotation_type', 'name', 'description'] |
| missing_fields = [field for field in required_fields if field not in scheme] |
| if missing_fields: |
| raise ConfigValidationError(f"{path} missing required fields: {', '.join(missing_fields)}") |
|
|
| |
| from potato.server_utils.schemas.registry import schema_registry |
| valid_types = schema_registry.get_supported_types() |
| if scheme['annotation_type'] not in valid_types: |
| raise ConfigValidationError(f"{path}.annotation_type must be one of: {', '.join(sorted(valid_types))}") |
|
|
| |
| |
| |
| |
| |
| annotation_type = scheme['annotation_type'] |
| _types_with_explicit_validation = { |
| 'radio', 'multiselect', 'select', 'likert', 'slider', 'span', 'multirate', |
| 'image_annotation', 'audio_annotation', 'video_annotation', 'tiered_annotation', |
| 'pairwise', 'bws', 'soft_label', 'confidence', 'constant_sum', |
| 'semantic_differential', 'ranking', 'range_slider', 'hierarchical_multiselect', |
| 'vas', 'rubric_eval', 'error_span', 'card_sort', 'conjoint', |
| } |
| if annotation_type not in _types_with_explicit_validation: |
| schema_def = schema_registry.get(annotation_type) |
| if schema_def and schema_def.required_fields: |
| |
| extra_required = [f for f in schema_def.required_fields |
| if f not in ('name', 'description')] |
| missing = [f for f in extra_required if f not in scheme] |
| if missing: |
| raise ConfigValidationError( |
| f"{path} (type '{annotation_type}') missing required field(s): " |
| f"{', '.join(missing)}" |
| ) |
|
|
| |
| if annotation_type in ['radio', 'multiselect', 'select']: |
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'labels' field for {annotation_type} annotation type") |
| if not isinstance(scheme['labels'], list): |
| raise ConfigValidationError(f"{path}.labels must be a list") |
| if not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels cannot be empty") |
|
|
| elif annotation_type == 'likert': |
| |
| if 'labels' not in scheme: |
| required_likert_fields = ['min_label', 'max_label', 'size'] |
| missing_likert_fields = [field for field in required_likert_fields if field not in scheme] |
| if missing_likert_fields: |
| raise ConfigValidationError(f"{path} missing required fields for likert: {', '.join(missing_likert_fields)}") |
|
|
| if not isinstance(scheme['size'], int) or scheme['size'] < 2: |
| raise ConfigValidationError(f"{path}.size must be an integer >= 2") |
|
|
| elif annotation_type == 'slider': |
| |
| if 'labels' not in scheme: |
| required_slider_fields = ['min_value', 'max_value', 'starting_value'] |
| missing_slider_fields = [field for field in required_slider_fields if field not in scheme] |
| if missing_slider_fields: |
| raise ConfigValidationError(f"{path} missing required fields for slider: {', '.join(missing_slider_fields)}") |
|
|
| if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): |
| raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") |
| if scheme['min_value'] >= scheme['max_value']: |
| raise ConfigValidationError(f"{path}.min_value must be less than max_value") |
|
|
| elif annotation_type == 'span': |
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'labels' field for span annotation type") |
| if not isinstance(scheme['labels'], list): |
| raise ConfigValidationError(f"{path}.labels must be a list") |
| if not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels cannot be empty") |
|
|
| elif annotation_type == 'multirate': |
| |
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing required field for multirate: labels") |
|
|
| has_options = 'options' in scheme |
| has_options_from_data = 'options_from_data' in scheme |
|
|
| if not has_options and not has_options_from_data: |
| raise ConfigValidationError(f"{path} must have either 'options' or 'options_from_data' for multirate") |
|
|
| if has_options: |
| if not isinstance(scheme['options'], list): |
| raise ConfigValidationError(f"{path}.options must be a list") |
| if not scheme['options']: |
| raise ConfigValidationError(f"{path}.options cannot be empty") |
|
|
| if has_options_from_data: |
| if not isinstance(scheme['options_from_data'], str) or not scheme['options_from_data'].strip(): |
| raise ConfigValidationError(f"{path}.options_from_data must be a non-empty string (instance data field name)") |
|
|
| if not isinstance(scheme['labels'], list): |
| raise ConfigValidationError(f"{path}.labels must be a list") |
| if not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels cannot be empty") |
|
|
| elif annotation_type == 'image_annotation': |
| |
| if 'tools' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'tools' field for image_annotation type") |
| if not isinstance(scheme['tools'], list): |
| raise ConfigValidationError(f"{path}.tools must be a list") |
| if not scheme['tools']: |
| raise ConfigValidationError(f"{path}.tools cannot be empty") |
|
|
| |
| valid_tools = ['bbox', 'polygon', 'freeform', 'landmark', 'fill', 'eraser', 'brush'] |
| invalid_tools = [t for t in scheme['tools'] if t not in valid_tools] |
| if invalid_tools: |
| raise ConfigValidationError(f"{path}.tools contains invalid values: {invalid_tools}. Valid tools are: {valid_tools}") |
|
|
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'labels' field for image_annotation type") |
| if not isinstance(scheme['labels'], list): |
| raise ConfigValidationError(f"{path}.labels must be a list") |
| if not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels cannot be empty") |
|
|
| |
| if 'min_annotations' in scheme: |
| if not isinstance(scheme['min_annotations'], int) or scheme['min_annotations'] < 0: |
| raise ConfigValidationError(f"{path}.min_annotations must be a non-negative integer") |
|
|
| if 'max_annotations' in scheme and scheme['max_annotations'] is not None: |
| if not isinstance(scheme['max_annotations'], int) or scheme['max_annotations'] < 1: |
| raise ConfigValidationError(f"{path}.max_annotations must be a positive integer or null") |
|
|
| elif annotation_type == 'audio_annotation': |
| |
| valid_modes = ['label', 'questions', 'both'] |
| mode = scheme.get('mode', 'label') |
| if mode not in valid_modes: |
| raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") |
|
|
| |
| if mode in ['label', 'both']: |
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'labels' field for audio_annotation mode '{mode}'") |
| if not isinstance(scheme['labels'], list): |
| raise ConfigValidationError(f"{path}.labels must be a list") |
| if not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels cannot be empty for mode '{mode}'") |
|
|
| |
| if mode in ['questions', 'both']: |
| if 'segment_schemes' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'segment_schemes' field for audio_annotation mode '{mode}'") |
| if not isinstance(scheme['segment_schemes'], list): |
| raise ConfigValidationError(f"{path}.segment_schemes must be a list") |
| if not scheme['segment_schemes']: |
| raise ConfigValidationError(f"{path}.segment_schemes cannot be empty for mode '{mode}'") |
|
|
| |
| if 'min_segments' in scheme: |
| if not isinstance(scheme['min_segments'], int) or scheme['min_segments'] < 0: |
| raise ConfigValidationError(f"{path}.min_segments must be a non-negative integer") |
|
|
| if 'max_segments' in scheme and scheme['max_segments'] is not None: |
| if not isinstance(scheme['max_segments'], int) or scheme['max_segments'] < 1: |
| raise ConfigValidationError(f"{path}.max_segments must be a positive integer or null") |
|
|
| elif annotation_type == 'video_annotation': |
| |
| valid_modes = ['segment', 'frame', 'keyframe', 'tracking', 'combined'] |
| mode = scheme.get('mode', 'segment') |
| if mode not in valid_modes: |
| raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") |
|
|
| |
| if mode in ['segment', 'frame', 'keyframe', 'combined']: |
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'labels' field for video_annotation mode '{mode}'") |
| if not isinstance(scheme['labels'], list): |
| raise ConfigValidationError(f"{path}.labels must be a list") |
| if not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels cannot be empty for mode '{mode}'") |
|
|
| |
| if 'min_segments' in scheme: |
| if not isinstance(scheme['min_segments'], int) or scheme['min_segments'] < 0: |
| raise ConfigValidationError(f"{path}.min_segments must be a non-negative integer") |
|
|
| if 'max_segments' in scheme and scheme['max_segments'] is not None: |
| if not isinstance(scheme['max_segments'], int) or scheme['max_segments'] < 1: |
| raise ConfigValidationError(f"{path}.max_segments must be a positive integer or null") |
|
|
| if 'timeline_height' in scheme: |
| if not isinstance(scheme['timeline_height'], int) or scheme['timeline_height'] < 30: |
| raise ConfigValidationError(f"{path}.timeline_height must be an integer >= 30") |
|
|
| if 'video_fps' in scheme: |
| if not isinstance(scheme['video_fps'], (int, float)) or scheme['video_fps'] <= 0: |
| raise ConfigValidationError(f"{path}.video_fps must be a positive number") |
|
|
| elif annotation_type == 'tiered_annotation': |
| |
| if 'tiers' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'tiers' field for tiered_annotation") |
| if not isinstance(scheme['tiers'], list): |
| raise ConfigValidationError(f"{path}.tiers must be a list") |
| if not scheme['tiers']: |
| raise ConfigValidationError(f"{path}.tiers cannot be empty") |
|
|
| if 'source_field' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'source_field' field for tiered_annotation") |
|
|
| |
| media_type = scheme.get('media_type', 'audio') |
| if media_type not in ['audio', 'video']: |
| raise ConfigValidationError(f"{path}.media_type must be 'audio' or 'video'") |
|
|
| |
| tier_names = set() |
| valid_tier_types = ['independent', 'dependent'] |
| valid_constraint_types = ['time_subdivision', 'included_in', 'symbolic_association', 'symbolic_subdivision', 'none'] |
|
|
| for i, tier in enumerate(scheme['tiers']): |
| tier_path = f"{path}.tiers[{i}]" |
|
|
| if not isinstance(tier, dict): |
| raise ConfigValidationError(f"{tier_path} must be a dictionary") |
|
|
| if 'name' not in tier: |
| raise ConfigValidationError(f"{tier_path} missing 'name' field") |
|
|
| tier_name = tier['name'] |
| if tier_name in tier_names: |
| raise ConfigValidationError(f"{tier_path} duplicate tier name: '{tier_name}'") |
| tier_names.add(tier_name) |
|
|
| |
| tier_type = tier.get('tier_type', 'independent') |
| if tier_type not in valid_tier_types: |
| raise ConfigValidationError(f"{tier_path}.tier_type must be one of: {valid_tier_types}") |
|
|
| |
| if tier_type == 'dependent': |
| if 'parent_tier' not in tier: |
| raise ConfigValidationError(f"{tier_path} dependent tier must have 'parent_tier'") |
|
|
| |
| constraint_type = tier.get('constraint_type', 'none') |
| if constraint_type not in valid_constraint_types: |
| raise ConfigValidationError(f"{tier_path}.constraint_type must be one of: {valid_constraint_types}") |
|
|
| |
| for i, tier in enumerate(scheme['tiers']): |
| parent = tier.get('parent_tier') |
| if parent and parent not in tier_names: |
| raise ConfigValidationError(f"{path}.tiers[{i}] references unknown parent_tier: '{parent}'") |
| if parent and parent == tier['name']: |
| raise ConfigValidationError(f"{path}.tiers[{i}] cannot be its own parent") |
|
|
| |
| if 'tier_height' in scheme: |
| if not isinstance(scheme['tier_height'], int) or scheme['tier_height'] < 20: |
| raise ConfigValidationError(f"{path}.tier_height must be an integer >= 20") |
|
|
| elif annotation_type == 'pairwise': |
| |
| valid_modes = ['binary', 'scale', 'multi_dimension'] |
| mode = scheme.get('mode', 'binary') |
| if mode not in valid_modes: |
| raise ConfigValidationError(f"{path}.mode must be one of: {valid_modes}") |
|
|
| |
| if 'labels' in scheme: |
| if not isinstance(scheme['labels'], list): |
| raise ConfigValidationError(f"{path}.labels must be a list") |
| if len(scheme['labels']) < 2: |
| raise ConfigValidationError(f"{path}.labels must have at least 2 items (for A and B)") |
|
|
| |
| if mode == 'scale': |
| scale = scheme.get('scale', {}) |
| if not isinstance(scale, dict): |
| raise ConfigValidationError(f"{path}.scale must be a dictionary") |
|
|
| |
| min_val = scale.get('min', -3) |
| max_val = scale.get('max', 3) |
| if not isinstance(min_val, (int, float)) or not isinstance(max_val, (int, float)): |
| raise ConfigValidationError(f"{path}.scale.min and scale.max must be numbers") |
| if min_val >= max_val: |
| raise ConfigValidationError(f"{path}.scale.min must be less than scale.max") |
|
|
| |
| step = scale.get('step', 1) |
| if not isinstance(step, (int, float)) or step <= 0: |
| raise ConfigValidationError(f"{path}.scale.step must be a positive number") |
|
|
| |
| if 'labels' in scale: |
| scale_labels = scale['labels'] |
| if not isinstance(scale_labels, dict): |
| raise ConfigValidationError(f"{path}.scale.labels must be a dictionary") |
|
|
| |
| if mode == 'multi_dimension': |
| dimensions = scheme.get('dimensions', []) |
| if not isinstance(dimensions, list) or not dimensions: |
| raise ConfigValidationError(f"{path}.dimensions must be a non-empty list for multi_dimension mode") |
| for i, dim in enumerate(dimensions): |
| if not isinstance(dim, dict): |
| raise ConfigValidationError(f"{path}.dimensions[{i}] must be a dictionary") |
| if 'name' not in dim: |
| raise ConfigValidationError(f"{path}.dimensions[{i}] must have a 'name' field") |
|
|
| elif annotation_type == 'bws': |
| |
| if 'tuple_size' in scheme: |
| if not isinstance(scheme['tuple_size'], int) or scheme['tuple_size'] < 2: |
| raise ConfigValidationError(f"{path}.tuple_size must be an integer >= 2") |
|
|
| elif annotation_type == 'soft_label': |
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'labels' field for soft_label annotation type") |
| if not isinstance(scheme['labels'], list) or not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels must be a non-empty list") |
| if 'total' in scheme: |
| if not isinstance(scheme['total'], int) or scheme['total'] < 1: |
| raise ConfigValidationError(f"{path}.total must be a positive integer") |
|
|
| elif annotation_type == 'confidence': |
| if 'scale_type' in scheme: |
| if scheme['scale_type'] not in ['likert', 'slider']: |
| raise ConfigValidationError(f"{path}.scale_type must be 'likert' or 'slider'") |
| if 'scale_points' in scheme: |
| if not isinstance(scheme['scale_points'], int) or scheme['scale_points'] < 2: |
| raise ConfigValidationError(f"{path}.scale_points must be an integer >= 2") |
|
|
| elif annotation_type == 'constant_sum': |
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'labels' field for constant_sum annotation type") |
| if not isinstance(scheme['labels'], list) or not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels must be a non-empty list") |
| if 'total_points' in scheme: |
| if not isinstance(scheme['total_points'], int) or scheme['total_points'] < 1: |
| raise ConfigValidationError(f"{path}.total_points must be a positive integer") |
|
|
| elif annotation_type == 'semantic_differential': |
| if 'pairs' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'pairs' field for semantic_differential annotation type") |
| if not isinstance(scheme['pairs'], list) or not scheme['pairs']: |
| raise ConfigValidationError(f"{path}.pairs must be a non-empty list") |
| for i, pair in enumerate(scheme['pairs']): |
| if not isinstance(pair, list) or len(pair) != 2: |
| raise ConfigValidationError(f"{path}.pairs[{i}] must be a list of exactly two strings") |
|
|
| elif annotation_type == 'ranking': |
| if 'labels' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'labels' field for ranking annotation type") |
| if not isinstance(scheme['labels'], list) or not scheme['labels']: |
| raise ConfigValidationError(f"{path}.labels must be a non-empty list") |
|
|
| elif annotation_type == 'range_slider': |
| if 'min_value' in scheme and 'max_value' in scheme: |
| if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): |
| raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") |
| if scheme['min_value'] >= scheme['max_value']: |
| raise ConfigValidationError(f"{path}.min_value must be less than max_value") |
|
|
| elif annotation_type == 'hierarchical_multiselect': |
| if 'taxonomy' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'taxonomy' field for hierarchical_multiselect annotation type") |
| if not isinstance(scheme['taxonomy'], dict) or not scheme['taxonomy']: |
| raise ConfigValidationError(f"{path}.taxonomy must be a non-empty dictionary") |
|
|
| elif annotation_type == 'vas': |
| if 'min_value' in scheme and 'max_value' in scheme: |
| if not isinstance(scheme['min_value'], (int, float)) or not isinstance(scheme['max_value'], (int, float)): |
| raise ConfigValidationError(f"{path}.min_value and max_value must be numbers") |
| if scheme['min_value'] >= scheme['max_value']: |
| raise ConfigValidationError(f"{path}.min_value must be less than max_value") |
|
|
| elif annotation_type == 'rubric_eval': |
| if 'criteria' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'criteria' field for rubric_eval annotation type") |
| if not isinstance(scheme['criteria'], list) or not scheme['criteria']: |
| raise ConfigValidationError(f"{path}.criteria must be a non-empty list") |
| for i, crit in enumerate(scheme['criteria']): |
| if not isinstance(crit, dict) or 'name' not in crit: |
| raise ConfigValidationError(f"{path}.criteria[{i}] must be a dict with 'name'") |
| if 'scale_points' in scheme: |
| if not isinstance(scheme['scale_points'], int) or scheme['scale_points'] < 2: |
| raise ConfigValidationError(f"{path}.scale_points must be an integer >= 2") |
|
|
| elif annotation_type == 'error_span': |
| if 'error_types' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'error_types' field for error_span annotation type") |
| if not isinstance(scheme['error_types'], list) or not scheme['error_types']: |
| raise ConfigValidationError(f"{path}.error_types must be a non-empty list") |
| for i, et in enumerate(scheme['error_types']): |
| if not isinstance(et, dict) or 'name' not in et: |
| raise ConfigValidationError(f"{path}.error_types[{i}] must be a dict with 'name'") |
|
|
| elif annotation_type == 'card_sort': |
| mode = scheme.get('mode', 'closed') |
| if mode not in ['open', 'closed']: |
| raise ConfigValidationError(f"{path}.mode must be 'open' or 'closed'") |
| if mode == 'closed': |
| if 'groups' not in scheme: |
| raise ConfigValidationError(f"{path} missing 'groups' field for card_sort in closed mode") |
| if not isinstance(scheme['groups'], list) or not scheme['groups']: |
| raise ConfigValidationError(f"{path}.groups must be a non-empty list for closed mode") |
|
|
| elif annotation_type == 'conjoint': |
| if 'attributes' not in scheme and 'profiles_field' not in scheme: |
| raise ConfigValidationError(f"{path} requires 'attributes' or 'profiles_field' for conjoint annotation type") |
| if 'attributes' in scheme: |
| if not isinstance(scheme['attributes'], list) or not scheme['attributes']: |
| raise ConfigValidationError(f"{path}.attributes must be a non-empty list") |
| for i, attr in enumerate(scheme['attributes']): |
| if not isinstance(attr, dict) or 'name' not in attr: |
| raise ConfigValidationError(f"{path}.attributes[{i}] must be a dict with 'name'") |
| if 'profiles_per_set' in scheme: |
| if not isinstance(scheme['profiles_per_set'], int) or scheme['profiles_per_set'] < 2: |
| raise ConfigValidationError(f"{path}.profiles_per_set must be an integer >= 2") |
|
|
| |
| if 'display_logic' in scheme: |
| validate_display_logic_structure(scheme['display_logic'], path) |
|
|
|
|
def validate_display_logic_structure(display_logic: Dict[str, Any], path: str) -> None:
    """
    Validate the structure of a display_logic configuration block.

    This validates the syntax and structure of a single display_logic block.
    Cross-schema validation (checking referenced schemas exist) is done separately
    in validate_display_logic_references().

    Rules enforced:
        - ``display_logic`` is a dictionary with a non-empty ``show_when`` list.
        - Every condition is a dictionary with ``schema`` and ``operator`` keys,
          and ``operator`` is one of SUPPORTED_OPERATORS.
        - ``empty`` / ``not_empty`` need no value.
        - Range operators need a two-element ``[min, max]`` of numeric values
          with ``min <= max``.
        - Comparison and length operators need a numeric value.
        - ``matches`` needs a compilable regex pattern.
        - Every other operator needs a non-None value.
        - Optional ``logic`` must be ``'all'`` or ``'any'`` (default ``'all'``).

    Args:
        display_logic: The display_logic configuration
        path: Path in the config for error reporting

    Raises:
        ConfigValidationError: If the display_logic is invalid
    """
    # Imported at call time rather than at module import time.
    from potato.server_utils.display_logic import SUPPORTED_OPERATORS

    if not isinstance(display_logic, dict):
        raise ConfigValidationError(f"{path}.display_logic must be a dictionary")

    if 'show_when' not in display_logic:
        raise ConfigValidationError(f"{path}.display_logic must have 'show_when' field")

    show_when = display_logic['show_when']
    if not isinstance(show_when, list):
        raise ConfigValidationError(f"{path}.display_logic.show_when must be a list of conditions")

    if not show_when:
        raise ConfigValidationError(f"{path}.display_logic.show_when must have at least one condition")

    for i, condition in enumerate(show_when):
        cond_path = f"{path}.display_logic.show_when[{i}]"

        if not isinstance(condition, dict):
            raise ConfigValidationError(f"{cond_path} must be a dictionary")

        if 'schema' not in condition:
            raise ConfigValidationError(f"{cond_path} missing required 'schema' field")

        if 'operator' not in condition:
            raise ConfigValidationError(f"{cond_path} missing required 'operator' field")

        operator = condition['operator']
        # The operator is compared against string names below. The isinstance
        # guard also stops an unhashable YAML value (e.g. a list) from raising
        # TypeError inside the membership test.
        # NOTE(review): assumes SUPPORTED_OPERATORS is keyed by operator-name strings.
        if not isinstance(operator, str) or operator not in SUPPORTED_OPERATORS:
            raise ConfigValidationError(
                f"{cond_path}.operator '{operator}' is not supported. "
                f"Valid operators: {list(SUPPORTED_OPERATORS.keys())}"
            )

        # 'value' is optional at this point; each operator family decides
        # below whether it is required and what shape it must have.
        value = condition.get('value')

        if operator in ('empty', 'not_empty'):
            # These operators take no operand.
            pass

        elif operator in ('in_range', 'not_in_range', 'length_in_range'):
            if not isinstance(value, (list, tuple)):
                raise ConfigValidationError(
                    f"{cond_path}: operator '{operator}' requires a range value as [min, max]"
                )
            if len(value) != 2:
                raise ConfigValidationError(
                    f"{cond_path}: range value must have exactly 2 elements [min, max]"
                )
            # Only the conversion can raise ValueError/TypeError, so only it
            # lives inside the try block.
            try:
                min_val, max_val = float(value[0]), float(value[1])
            except (ValueError, TypeError) as exc:
                raise ConfigValidationError(f"{cond_path}: range values must be numeric") from exc
            if min_val > max_val:
                raise ConfigValidationError(
                    f"{cond_path}: range min ({min_val}) is greater than max ({max_val})"
                )

        elif operator in ('gt', 'gte', 'lt', 'lte', 'length_gt', 'length_lt'):
            if value is None:
                raise ConfigValidationError(f"{cond_path}: operator '{operator}' requires a value")
            try:
                float(value)
            except (ValueError, TypeError) as exc:
                raise ConfigValidationError(
                    f"{cond_path}: operator '{operator}' requires a numeric value"
                ) from exc

        elif operator == 'matches':
            if value is None:
                raise ConfigValidationError(f"{cond_path}: operator 'matches' requires a regex pattern")
            try:
                re.compile(value)
            except re.error as e:
                raise ConfigValidationError(f"{cond_path}: invalid regex pattern '{value}': {e}") from e
            except TypeError as e:
                # re.compile raises TypeError (not re.error) for values that are
                # not patterns at all, e.g. a number or a list from the YAML.
                raise ConfigValidationError(
                    f"{cond_path}: operator 'matches' requires a string regex pattern, "
                    f"got {type(value).__name__}"
                ) from e

        elif value is None:
            raise ConfigValidationError(f"{cond_path}: operator '{operator}' requires a value")

    logic = display_logic.get('logic', 'all')
    if logic not in ('all', 'any'):
        raise ConfigValidationError(
            f"{path}.display_logic.logic must be 'all' or 'any', got '{logic}'"
        )
|
|
|
|
def validate_display_logic_references(annotation_schemes: List[Dict[str, Any]]) -> None:
    """
    Run the cross-schema display_logic check over all annotation schemes.

    Delegates to ``validate_display_logic_config`` and converts a failed
    result into a single ConfigValidationError that lists every reported
    problem on its own line. Intended to run after each scheme has been
    validated individually.

    Args:
        annotation_schemes: List of annotation scheme configurations

    Raises:
        ConfigValidationError: If the delegate reports the configuration as invalid
    """
    from potato.server_utils.display_logic import validate_display_logic_config

    ok, problems = validate_display_logic_config(annotation_schemes)
    if ok:
        return

    # One bullet per reported problem, under a common heading.
    bullet_lines = [f" - {problem}" for problem in problems]
    raise ConfigValidationError(
        "Display logic validation errors:\n" + "\n".join(bullet_lines)
    )
|
|
|
|
def validate_server_config(config_data: Dict[str, Any]) -> None:
    """
    Validate server configuration section.

    The server section allows specifying server settings in the YAML config
    instead of via command-line flags. CLI flags take precedence over config values.

    Supported options:
    - port: Port number to run on (1-65535)
    - host: Host address to bind to (default: localhost)
    - debug: Enable Flask debug mode (default: false)

    A missing ``server`` section is valid and skips all checks.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the server configuration is invalid
    """
    if "server" not in config_data:
        return

    server_config = config_data["server"]
    if not isinstance(server_config, dict):
        raise ConfigValidationError("server configuration must be a dictionary")

    if "port" in server_config:
        port = server_config["port"]
        # bool is a subclass of int, so without the explicit bool check a YAML
        # value such as `port: true` would be accepted as port 1.
        if isinstance(port, bool) or not isinstance(port, int):
            raise ConfigValidationError("server.port must be an integer")
        if not 1 <= port <= 65535:
            raise ConfigValidationError("server.port must be between 1 and 65535")

    if "host" in server_config:
        host = server_config["host"]
        if not isinstance(host, str):
            raise ConfigValidationError("server.host must be a string")
        if not host.strip():
            raise ConfigValidationError("server.host cannot be empty")

    if "debug" in server_config and not isinstance(server_config["debug"], bool):
        raise ConfigValidationError("server.debug must be a boolean")
|
|
|
|
def validate_authentication_config(config_data: Dict[str, Any]) -> None:
    """
    Validate authentication configuration section.

    Validates OAuth/OIDC provider settings, required fields, and
    warns about common misconfigurations.

    Checks performed:
    - ``authentication`` must be a dictionary and ``method`` (default
      ``in_memory``) must be one of in_memory, database, clerk, oauth.
    - ``oauth``: ``providers`` must be a non-empty dictionary. Each provider
      must be a dictionary with ``client_id`` and ``client_secret``; providers
      other than ``google``/``github`` also need ``discovery_url``. Optional
      ``allowed_domain`` / ``allowed_org`` must be non-empty strings and
      ``scopes`` must be a list. ``user_identity_field`` (default ``email``)
      must be one of email, username, sub, name. A warning is logged when
      neither ``secret_key`` nor the POTATO_SECRET_KEY environment variable is set.
    - ``database``: a non-empty ``database_url`` must be a string starting with
      ``sqlite:///`` or ``postgresql://``; ``user_config_path`` is rejected.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the authentication configuration is invalid
    """
    if "authentication" not in config_data:
        return

    auth_config = config_data["authentication"]
    if not isinstance(auth_config, dict):
        raise ConfigValidationError("authentication configuration must be a dictionary")

    method = auth_config.get("method", "in_memory")
    valid_methods = ["in_memory", "database", "clerk", "oauth"]
    if method not in valid_methods:
        raise ConfigValidationError(
            f"authentication.method must be one of: {', '.join(valid_methods)}. "
            f"Got: '{method}'"
        )

    if method == "oauth":
        providers = auth_config.get("providers")
        # `not providers` already covers None and the empty dictionary, so no
        # separate length check is needed.
        if not providers or not isinstance(providers, dict):
            raise ConfigValidationError(
                "authentication.providers is required when method is 'oauth' "
                "and must be a dictionary with at least one provider"
            )

        for name, pconfig in providers.items():
            if not isinstance(pconfig, dict):
                raise ConfigValidationError(
                    f"authentication.providers.{name} must be a dictionary"
                )

            if "client_id" not in pconfig:
                raise ConfigValidationError(
                    f"authentication.providers.{name}.client_id is required"
                )
            if "client_secret" not in pconfig:
                raise ConfigValidationError(
                    f"authentication.providers.{name}.client_secret is required"
                )

            # Only google and github have built-in endpoint URLs; every other
            # provider must supply its OIDC discovery document.
            if name not in ("google", "github") and "discovery_url" not in pconfig:
                raise ConfigValidationError(
                    f"authentication.providers.{name} requires 'discovery_url' "
                    f"for OIDC providers (only 'google' and 'github' have built-in URLs)"
                )

            if "allowed_domain" in pconfig:
                domain = pconfig["allowed_domain"]
                if not isinstance(domain, str) or not domain.strip():
                    raise ConfigValidationError(
                        f"authentication.providers.{name}.allowed_domain must be a non-empty string"
                    )

            if "allowed_org" in pconfig:
                org = pconfig["allowed_org"]
                if not isinstance(org, str) or not org.strip():
                    raise ConfigValidationError(
                        f"authentication.providers.{name}.allowed_org must be a non-empty string"
                    )

            if "scopes" in pconfig and not isinstance(pconfig["scopes"], list):
                raise ConfigValidationError(
                    f"authentication.providers.{name}.scopes must be a list"
                )

        identity_field = auth_config.get("user_identity_field", "email")
        valid_fields = ["email", "username", "sub", "name"]
        if identity_field not in valid_fields:
            raise ConfigValidationError(
                f"authentication.user_identity_field must be one of: "
                f"{', '.join(valid_fields)}. Got: '{identity_field}'"
            )

        # Not an error: the server can still start, but the operator should know.
        if "secret_key" not in config_data and not os.environ.get("POTATO_SECRET_KEY"):
            logger.warning(
                "OAuth is configured but no 'secret_key' is set in config "
                "and POTATO_SECRET_KEY environment variable is not set. "
                "Sessions will be lost on server restart. "
                "Set 'secret_key' in config or POTATO_SECRET_KEY env var."
            )

    if method == "database":
        db_url = auth_config.get("database_url")
        if db_url:
            # A non-string value (e.g. a number from the YAML) has no
            # .startswith and would otherwise crash with AttributeError.
            if not isinstance(db_url, str):
                raise ConfigValidationError(
                    "authentication.database_url must be a string. "
                    f"Got: '{db_url}'"
                )
            if not db_url.startswith(("sqlite:///", "postgresql://")):
                raise ConfigValidationError(
                    "authentication.database_url must start with 'sqlite:///' or 'postgresql://'. "
                    f"Got: '{db_url}'"
                )

        if "user_config_path" in auth_config:
            raise ConfigValidationError(
                "authentication.user_config_path cannot be used with method 'database'. "
                "The database backend handles its own user persistence."
            )
|
|
|
|
def validate_quality_control_config(config_data: Dict[str, Any]) -> None:
    """
    Validate quality control configuration (attention checks, gold standards, pre-annotation).

    Four optional top-level sections are checked, in this order:
    ``attention_checks``, ``gold_standards``, ``pre_annotation`` and
    ``agreement_metrics``. Each must be a dictionary when present. The detailed
    checks for the first three only run when the section has ``enabled`` set
    to a truthy value; ``agreement_metrics`` is always checked.

    Integer and numeric settings reject booleans: ``bool`` is a subclass of
    ``int`` in Python, so a YAML value like ``frequency: true`` would otherwise
    pass as the number 1.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the configuration is invalid
    """

    def is_int(value: Any) -> bool:
        """Return True for real integers, excluding True/False."""
        return isinstance(value, int) and not isinstance(value, bool)

    def is_number(value: Any) -> bool:
        """Return True for ints and floats, excluding True/False."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    # ----- attention_checks -----
    if "attention_checks" in config_data:
        attn_config = config_data["attention_checks"]
        if not isinstance(attn_config, dict):
            raise ConfigValidationError("attention_checks must be a dictionary")

        if attn_config.get("enabled", False):
            if "items_file" not in attn_config:
                raise ConfigValidationError("attention_checks.items_file is required when enabled")
            if not isinstance(attn_config["items_file"], str):
                raise ConfigValidationError("attention_checks.items_file must be a string path")

            # frequency and probability are mutually exclusive.
            has_frequency = "frequency" in attn_config
            has_probability = "probability" in attn_config
            if has_frequency and has_probability:
                raise ConfigValidationError("attention_checks: specify either 'frequency' or 'probability', not both")

            if has_frequency:
                freq = attn_config["frequency"]
                if not is_int(freq) or freq < 1:
                    raise ConfigValidationError("attention_checks.frequency must be a positive integer")

            if has_probability:
                prob = attn_config["probability"]
                if not is_number(prob) or prob < 0 or prob > 1:
                    raise ConfigValidationError("attention_checks.probability must be a number between 0 and 1")

            if "min_response_time" in attn_config:
                min_time = attn_config["min_response_time"]
                if not is_number(min_time) or min_time < 0:
                    raise ConfigValidationError("attention_checks.min_response_time must be a non-negative number")

            if "failure_handling" in attn_config:
                failure_config = attn_config["failure_handling"]
                if not isinstance(failure_config, dict):
                    raise ConfigValidationError("attention_checks.failure_handling must be a dictionary")

                if "warn_threshold" in failure_config:
                    warn = failure_config["warn_threshold"]
                    if not is_int(warn) or warn < 1:
                        raise ConfigValidationError("attention_checks.failure_handling.warn_threshold must be a positive integer")

                if "block_threshold" in failure_config:
                    block = failure_config["block_threshold"]
                    if not is_int(block) or block < 1:
                        raise ConfigValidationError("attention_checks.failure_handling.block_threshold must be a positive integer")

                    # Compared against the configured warn_threshold, or 2
                    # when none is given.
                    warn = failure_config.get("warn_threshold", 2)
                    if block <= warn:
                        raise ConfigValidationError("attention_checks.failure_handling.block_threshold must be greater than warn_threshold")

    # ----- gold_standards -----
    if "gold_standards" in config_data:
        gold_config = config_data["gold_standards"]
        if not isinstance(gold_config, dict):
            raise ConfigValidationError("gold_standards must be a dictionary")

        if gold_config.get("enabled", False):
            if "items_file" not in gold_config:
                raise ConfigValidationError("gold_standards.items_file is required when enabled")
            if not isinstance(gold_config["items_file"], str):
                raise ConfigValidationError("gold_standards.items_file must be a string path")

            if "mode" in gold_config:
                valid_modes = ["training", "mixed", "separate"]
                if gold_config["mode"] not in valid_modes:
                    raise ConfigValidationError(f"gold_standards.mode must be one of: {', '.join(valid_modes)}")

            if "frequency" in gold_config:
                freq = gold_config["frequency"]
                if not is_int(freq) or freq < 1:
                    raise ConfigValidationError("gold_standards.frequency must be a positive integer")

            if "accuracy" in gold_config:
                accuracy_config = gold_config["accuracy"]
                if not isinstance(accuracy_config, dict):
                    raise ConfigValidationError("gold_standards.accuracy must be a dictionary")

                if "min_threshold" in accuracy_config:
                    threshold = accuracy_config["min_threshold"]
                    if not is_number(threshold) or threshold < 0 or threshold > 1:
                        raise ConfigValidationError("gold_standards.accuracy.min_threshold must be between 0 and 1")

                if "evaluation_count" in accuracy_config:
                    count = accuracy_config["evaluation_count"]
                    if not is_int(count) or count < 1:
                        raise ConfigValidationError("gold_standards.accuracy.evaluation_count must be a positive integer")

            if "auto_promote" in gold_config:
                auto_promote = gold_config["auto_promote"]
                if not isinstance(auto_promote, dict):
                    raise ConfigValidationError("gold_standards.auto_promote must be a dictionary")

                if "min_annotators" in auto_promote:
                    min_ann = auto_promote["min_annotators"]
                    if not is_int(min_ann) or min_ann < 2:
                        raise ConfigValidationError("gold_standards.auto_promote.min_annotators must be an integer >= 2")

                if "agreement_threshold" in auto_promote:
                    threshold = auto_promote["agreement_threshold"]
                    if not is_number(threshold) or threshold < 0.5 or threshold > 1.0:
                        raise ConfigValidationError("gold_standards.auto_promote.agreement_threshold must be between 0.5 and 1.0")

    # ----- pre_annotation -----
    if "pre_annotation" in config_data:
        pre_config = config_data["pre_annotation"]
        if not isinstance(pre_config, dict):
            raise ConfigValidationError("pre_annotation must be a dictionary")

        if pre_config.get("enabled", False):
            if "field" in pre_config:
                if not isinstance(pre_config["field"], str) or not pre_config["field"].strip():
                    raise ConfigValidationError("pre_annotation.field must be a non-empty string")

            if "highlight_low_confidence" in pre_config:
                threshold = pre_config["highlight_low_confidence"]
                if not is_number(threshold) or threshold < 0 or threshold > 1:
                    raise ConfigValidationError("pre_annotation.highlight_low_confidence must be between 0 and 1")

    # ----- agreement_metrics (no 'enabled' gate) -----
    if "agreement_metrics" in config_data:
        agreement_config = config_data["agreement_metrics"]
        if not isinstance(agreement_config, dict):
            raise ConfigValidationError("agreement_metrics must be a dictionary")

        if "min_overlap" in agreement_config:
            overlap = agreement_config["min_overlap"]
            if not is_int(overlap) or overlap < 2:
                raise ConfigValidationError("agreement_metrics.min_overlap must be an integer >= 2")

        if "refresh_interval" in agreement_config:
            interval = agreement_config["refresh_interval"]
            if not is_int(interval) or interval < 10:
                raise ConfigValidationError("agreement_metrics.refresh_interval must be an integer >= 10 seconds")
|
|
|
|
def validate_instance_reclaim_config(config_data: Dict[str, Any]) -> None:
    """
    Validate the ``instance_reclaim`` section (abandoned assignment reclaim).

    Checks, in order: the section is a dictionary; ``enabled`` is a boolean;
    ``timeout_hours`` is a positive number; ``preserve_completed_annotations``
    is a boolean at the top level and inside each of the ``stale``, ``manual``,
    ``quality_control`` and ``prolific`` sub-sections (which must themselves be
    dictionaries); and ``prolific.status_policies`` is a dictionary keyed by
    RETURNED, TIMED-OUT or REJECTED whose values are dictionaries subject to the
    same ``preserve_completed_annotations`` check.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the instance_reclaim configuration is invalid
    """
    if "instance_reclaim" not in config_data:
        return

    reclaim = config_data["instance_reclaim"]
    if not isinstance(reclaim, dict):
        raise ConfigValidationError("instance_reclaim must be a dictionary")

    def check_preserve_flag(mapping: Dict[str, Any], where: str) -> None:
        # The flag is optional, but must be a real boolean when given.
        if "preserve_completed_annotations" not in mapping:
            return
        if not isinstance(mapping["preserve_completed_annotations"], bool):
            raise ConfigValidationError(f"{where}.preserve_completed_annotations must be a boolean")

    # An absent key falls back to a boolean default, so only a present
    # non-boolean value fails.
    if not isinstance(reclaim.get("enabled", False), bool):
        raise ConfigValidationError("instance_reclaim.enabled must be a boolean")

    if "timeout_hours" in reclaim:
        hours = reclaim["timeout_hours"]
        if not isinstance(hours, (int, float)) or hours <= 0:
            raise ConfigValidationError("instance_reclaim.timeout_hours must be a positive number")

    check_preserve_flag(reclaim, "instance_reclaim")

    for sub_name in ("stale", "manual", "quality_control", "prolific"):
        if sub_name not in reclaim:
            continue
        sub_section = reclaim[sub_name]
        if not isinstance(sub_section, dict):
            raise ConfigValidationError(f"instance_reclaim.{sub_name} must be a dictionary")
        check_preserve_flag(sub_section, f"instance_reclaim.{sub_name}")

    prolific_section = reclaim.get("prolific")
    if not isinstance(prolific_section, dict) or "status_policies" not in prolific_section:
        return

    policies = prolific_section["status_policies"]
    if not isinstance(policies, dict):
        raise ConfigValidationError("instance_reclaim.prolific.status_policies must be a dictionary")

    allowed_statuses = {"RETURNED", "TIMED-OUT", "REJECTED"}
    for status, policy in policies.items():
        if status not in allowed_statuses:
            raise ConfigValidationError(
                "instance_reclaim.prolific.status_policies keys must be one of: RETURNED, TIMED-OUT, REJECTED"
            )
        if not isinstance(policy, dict):
            raise ConfigValidationError(
                f"instance_reclaim.prolific.status_policies.{status} must be a dictionary"
            )
        check_preserve_flag(policy, f"instance_reclaim.prolific.status_policies.{status}")
|
|
|
|
def validate_data_directory_config(config_data: Dict[str, Any]) -> None:
    """
    Validate the directory-watching options of the configuration.

    Options checked:
    - data_directory: must be a non-blank string path
    - watch_data_directory: must be a boolean when present
    - watch_poll_interval: must be a number from 1.0 to 3600 seconds when present

    The two watch options are only examined when ``data_directory`` itself is
    present; without it the function returns immediately.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the configuration is invalid
    """
    if "data_directory" not in config_data:
        return

    directory = config_data["data_directory"]
    if not isinstance(directory, str):
        raise ConfigValidationError("data_directory must be a string path")
    if directory.strip() == "":
        raise ConfigValidationError("data_directory cannot be empty")

    # An absent key falls back to a boolean default, so only a present
    # non-boolean value fails.
    if not isinstance(config_data.get("watch_data_directory", False), bool):
        raise ConfigValidationError("watch_data_directory must be a boolean (true/false)")

    if "watch_poll_interval" not in config_data:
        return

    poll_seconds = config_data["watch_poll_interval"]
    if not isinstance(poll_seconds, (int, float)):
        raise ConfigValidationError("watch_poll_interval must be a number")
    if poll_seconds < 1.0:
        raise ConfigValidationError("watch_poll_interval must be at least 1.0 seconds")
    if poll_seconds > 3600:
        raise ConfigValidationError("watch_poll_interval cannot exceed 3600 seconds (1 hour)")
|
|
|
|
def validate_data_sources_config(config_data: Dict[str, Any]) -> None:
    """
    Validate the ``data_sources`` list used for extended data loading.

    Every entry must be a dictionary with a ``type`` drawn from the supported
    source types; the type-specific fields are then checked by
    ``_validate_data_source_by_type``. After all entries pass, the
    ``partial_loading`` and ``data_cache`` sections are validated as well.

    Note that a missing or empty ``data_sources`` returns immediately, so in
    that case ``partial_loading`` and ``data_cache`` are not checked here.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the configuration is invalid
    """
    sources = config_data.get("data_sources")
    if not sources:
        return
    if not isinstance(sources, list):
        raise ConfigValidationError("data_sources must be a list")

    supported_types = (
        "file", "url", "google_drive", "dropbox",
        "s3", "huggingface", "google_sheets", "database",
    )

    for position, entry in enumerate(sources):
        if not isinstance(entry, dict):
            raise ConfigValidationError(
                f"data_sources[{position}] must be a dictionary"
            )

        kind = entry.get("type")
        if not kind:
            raise ConfigValidationError(
                f"data_sources[{position}] is missing required 'type' field"
            )
        if kind not in supported_types:
            raise ConfigValidationError(
                f"data_sources[{position}] has invalid type '{kind}'. "
                f"Valid types: {', '.join(supported_types)}"
            )

        # Hand over to the per-type field checks.
        _validate_data_source_by_type(entry, kind, position)

    _validate_partial_loading_config(config_data)
    _validate_data_cache_config(config_data)
|
|
|
|
| def _validate_data_source_by_type(source: Dict, source_type: str, index: int) -> None: |
| """Validate source-specific configuration.""" |
| prefix = f"data_sources[{index}]" |
|
|
| if source_type == "file": |
| if not source.get("path"): |
| raise ConfigValidationError(f"{prefix} (type=file) requires 'path'") |
|
|
| elif source_type == "url": |
| url = source.get("url") |
| if not url: |
| raise ConfigValidationError(f"{prefix} (type=url) requires 'url'") |
| if not isinstance(url, str): |
| raise ConfigValidationError(f"{prefix}.url must be a string") |
| |
| if not (url.startswith("http://") or url.startswith("https://")): |
| raise ConfigValidationError( |
| f"{prefix}.url must start with http:// or https://" |
| ) |
|
|
| elif source_type == "google_drive": |
| if not source.get("url") and not source.get("file_id"): |
| raise ConfigValidationError( |
| f"{prefix} (type=google_drive) requires 'url' or 'file_id'" |
| ) |
|
|
| elif source_type == "dropbox": |
| if not source.get("url") and not source.get("path"): |
| raise ConfigValidationError( |
| f"{prefix} (type=dropbox) requires 'url' or 'path'" |
| ) |
| |
| if source.get("path") and not source.get("access_token"): |
| raise ConfigValidationError( |
| f"{prefix} (type=dropbox) requires 'access_token' when using 'path'" |
| ) |
|
|
| elif source_type == "s3": |
| if not source.get("bucket"): |
| raise ConfigValidationError(f"{prefix} (type=s3) requires 'bucket'") |
| if not source.get("key"): |
| raise ConfigValidationError(f"{prefix} (type=s3) requires 'key'") |
|
|
| elif source_type == "huggingface": |
| if not source.get("dataset"): |
| raise ConfigValidationError( |
| f"{prefix} (type=huggingface) requires 'dataset'" |
| ) |
|
|
| elif source_type == "google_sheets": |
| if not source.get("spreadsheet_id"): |
| raise ConfigValidationError( |
| f"{prefix} (type=google_sheets) requires 'spreadsheet_id'" |
| ) |
| if not source.get("credentials_file"): |
| raise ConfigValidationError( |
| f"{prefix} (type=google_sheets) requires 'credentials_file'" |
| ) |
|
|
| elif source_type == "database": |
| |
| if not source.get("connection_string"): |
| if not source.get("dialect"): |
| raise ConfigValidationError( |
| f"{prefix} (type=database) requires 'connection_string' or 'dialect'" |
| ) |
| if not source.get("database") and source.get("dialect") != "sqlite": |
| raise ConfigValidationError( |
| f"{prefix} (type=database) requires 'database' when not using sqlite" |
| ) |
| |
| if not source.get("query") and not source.get("table"): |
| raise ConfigValidationError( |
| f"{prefix} (type=database) requires 'query' or 'table'" |
| ) |
|
|
|
|
| def _validate_partial_loading_config(config_data: Dict[str, Any]) -> None: |
| """Validate partial_loading configuration.""" |
| partial = config_data.get("partial_loading") |
| if not partial: |
| return |
|
|
| if not isinstance(partial, dict): |
| raise ConfigValidationError("partial_loading must be a dictionary") |
|
|
| |
| if "enabled" in partial and not isinstance(partial["enabled"], bool): |
| raise ConfigValidationError("partial_loading.enabled must be a boolean") |
|
|
| |
| if "initial_count" in partial: |
| count = partial["initial_count"] |
| if not isinstance(count, int) or count < 1: |
| raise ConfigValidationError( |
| "partial_loading.initial_count must be a positive integer" |
| ) |
|
|
| |
| if "batch_size" in partial: |
| size = partial["batch_size"] |
| if not isinstance(size, int) or size < 1: |
| raise ConfigValidationError( |
| "partial_loading.batch_size must be a positive integer" |
| ) |
|
|
| |
| if "auto_load_threshold" in partial: |
| threshold = partial["auto_load_threshold"] |
| if not isinstance(threshold, (int, float)) or not (0 <= threshold <= 1): |
| raise ConfigValidationError( |
| "partial_loading.auto_load_threshold must be between 0.0 and 1.0" |
| ) |
|
|
|
|
| def _validate_data_cache_config(config_data: Dict[str, Any]) -> None: |
| """Validate data_cache configuration.""" |
| cache = config_data.get("data_cache") |
| if not cache: |
| return |
|
|
| if not isinstance(cache, dict): |
| raise ConfigValidationError("data_cache must be a dictionary") |
|
|
| |
| if "ttl_seconds" in cache: |
| ttl = cache["ttl_seconds"] |
| if not isinstance(ttl, int) or ttl < 0: |
| raise ConfigValidationError( |
| "data_cache.ttl_seconds must be a non-negative integer" |
| ) |
|
|
| |
| if "max_size_mb" in cache: |
| size = cache["max_size_mb"] |
| if not isinstance(size, int) or size < 1: |
| raise ConfigValidationError( |
| "data_cache.max_size_mb must be a positive integer" |
| ) |
|
|
|
|
def validate_database_config(db_config: Dict[str, Any]) -> None:
    """
    Validate database configuration.

    The fields ``type``, ``host``, ``database`` and ``username`` are required
    for every database type. ``type`` must be ``mysql`` or ``file``; ``mysql``
    additionally requires ``password``. An optional ``port`` must be an
    integer (or a value ``int()`` converts without loss) in 1..65535.

    Args:
        db_config: The database configuration

    Raises:
        ConfigValidationError: If the database configuration is invalid
    """
    if not isinstance(db_config, dict):
        raise ConfigValidationError("database configuration must be a dictionary")

    required_fields = ['type', 'host', 'database', 'username']
    missing_fields = [field for field in required_fields if field not in db_config]
    if missing_fields:
        raise ConfigValidationError(f"Missing required database fields: {', '.join(missing_fields)}")

    valid_types = ['mysql', 'file']
    if db_config['type'] not in valid_types:
        raise ConfigValidationError(f"Unsupported database type: {db_config['type']}. Must be one of: {', '.join(valid_types)}")

    if db_config['type'] == 'mysql' and 'password' not in db_config:
        raise ConfigValidationError("MySQL database requires password")

    if 'port' in db_config:
        raw_port = db_config['port']

        # int(True) == 1, so a boolean would otherwise pass as port 1.
        if isinstance(raw_port, bool):
            raise ConfigValidationError("Database port must be a valid integer")

        try:
            port = int(raw_port)
        except (ValueError, TypeError, OverflowError) as exc:
            # OverflowError: int() of an infinite float (YAML ``.inf``).
            raise ConfigValidationError("Database port must be a valid integer") from exc

        # int() truncates floats; a fractional port is a mistake, not port N.
        if isinstance(raw_port, float) and raw_port != port:
            raise ConfigValidationError("Database port must be a valid integer")

        if port < 1 or port > 65535:
            raise ConfigValidationError("Database port must be between 1 and 65535")
|
|
|
|
def validate_file_paths(config_data: Dict[str, Any], project_dir: str, config_file_dir: Optional[str] = None) -> None:
    """
    Validate that all file paths in the configuration are secure and exist.

    ``task_dir`` is checked against ``project_dir`` first; the validated task
    directory is then passed as the base directory when checking
    ``data_files``, ``batch_assignment`` group files, ``data_directory``,
    ``site_dir``, ``custom_ds``, ``base_css`` and ``header_logo``.
    ``output_annotation_dir`` is checked against ``project_dir`` directly.

    Existence is required for data files, batch group files,
    ``data_directory`` (which must also be a directory), ``base_css`` and
    ``header_logo``. The other paths only go through the security check.
    Values of ``None``, ``"null"`` and ``"default"`` are treated as "not set".

    Args:
        config_data: The configuration data
        project_dir: The project directory
        config_file_dir: The directory containing the config file; used as a
            fallback location when ``base_css`` / ``header_logo`` are not
            found under the task directory

    Raises:
        ConfigSecurityError: If any file paths are not secure
        ConfigValidationError: If required files don't exist
    """
    # Kept as a list (not a set): entries compared against it may be dicts,
    # which are unhashable.
    unset_values = [None, "null", "default"]

    task_dir = config_data.get('task_dir')
    if not task_dir:
        raise ConfigValidationError("task_dir is required in configuration")

    try:
        validated_task_dir = validate_path_security(task_dir, project_dir)
    except ConfigSecurityError as e:
        raise ConfigSecurityError(f"task_dir: {str(e)}") from e

    # All remaining relative paths are checked with the task dir as base.
    base_dir = validated_task_dir

    # ``data_files: null`` in YAML yields None; ``or []`` avoids iterating
    # over None (same pattern as the batch_assignment groups below).
    data_files = config_data.get('data_files') or []
    for i, data_file in enumerate(data_files):
        if data_file in unset_values:
            continue

        # An entry is either a plain path or a mapping with 'path' and an
        # optional 'encoding'.
        if isinstance(data_file, dict):
            file_path = data_file.get("path")
            if not file_path:
                raise ConfigValidationError(f"Data file {i}: dict entry missing 'path' field")

            encoding = data_file.get("encoding")
            if encoding is not None:
                if not isinstance(encoding, str):
                    raise ConfigValidationError(
                        f"Data file {i}: 'encoding' must be a string, got {type(encoding).__name__}"
                    )
                try:
                    codecs.lookup(encoding)
                except LookupError as e:
                    raise ConfigValidationError(
                        f"Data file {i}: unknown encoding '{encoding}'"
                    ) from e
        else:
            file_path = data_file

        try:
            validated_path = validate_path_security(file_path, base_dir, project_dir)
            if not os.path.exists(validated_path):
                raise ConfigValidationError(f"Data file not found: {file_path} (resolved to: {validated_path})")
        except ConfigSecurityError as e:
            raise ConfigSecurityError(f"Data file {i}: {str(e)}") from e

    # Files referenced by batch_assignment groups.
    batch_config = config_data.get('batch_assignment')
    if isinstance(batch_config, dict):
        for i, group in enumerate(batch_config.get('groups') or []):
            if not isinstance(group, dict):
                continue
            file_entry = group.get(
                'instances_file',
                group.get('items_file', group.get('instance_ids_file')),
            )
            if not file_entry:
                continue
            if isinstance(file_entry, dict):
                file_path = file_entry.get("path")
            else:
                file_path = file_entry

            try:
                validated_path = validate_path_security(file_path, base_dir, project_dir)
                if not os.path.exists(validated_path):
                    raise ConfigValidationError(
                        f"batch_assignment.groups[{i}] file not found: "
                        f"{file_path} (resolved to: {validated_path})"
                    )
            except ConfigSecurityError as e:
                raise ConfigSecurityError(
                    f"batch_assignment.groups[{i}] file: {str(e)}"
                ) from e

    # data_directory must exist and be a directory.
    if 'data_directory' in config_data:
        data_directory = config_data['data_directory']
        if data_directory not in unset_values:
            try:
                validated_dir = validate_path_security(data_directory, base_dir, project_dir)
                if not os.path.exists(validated_dir):
                    raise ConfigValidationError(f"data_directory not found: {data_directory} (resolved to: {validated_dir})")
                if not os.path.isdir(validated_dir):
                    raise ConfigValidationError(f"data_directory is not a directory: {data_directory} (resolved to: {validated_dir})")
            except ConfigSecurityError as e:
                raise ConfigSecurityError(f"data_directory: {str(e)}") from e

    # The output directory is only security-checked (against project_dir).
    if 'output_annotation_dir' in config_data:
        output_dir = config_data['output_annotation_dir']
        if output_dir not in unset_values:
            try:
                validate_path_security(output_dir, project_dir)
            except ConfigSecurityError as e:
                raise ConfigSecurityError(f"output_annotation_dir: {str(e)}") from e

    # site_dir and custom_ds are security-checked only; existence is not
    # required here.
    if 'site_dir' in config_data:
        site_dir = config_data['site_dir']
        if site_dir not in unset_values:
            try:
                validate_path_security(site_dir, base_dir, project_dir)
            except ConfigSecurityError as e:
                raise ConfigSecurityError(f"site_dir: {str(e)}") from e

    if 'custom_ds' in config_data:
        custom_ds = config_data['custom_ds']
        if custom_ds not in unset_values:
            try:
                validate_path_security(custom_ds, base_dir, project_dir)
            except ConfigSecurityError as e:
                raise ConfigSecurityError(f"custom_ds: {str(e)}") from e

    # base_css must exist under the task dir or, failing that, next to the
    # config file.
    if 'base_css' in config_data:
        base_css = config_data['base_css']
        if base_css not in unset_values:
            try:
                validated_css = validate_path_security(base_css, base_dir, project_dir)
                if not os.path.exists(validated_css):
                    if config_file_dir:
                        alt_path = os.path.join(config_file_dir, base_css)
                        if not os.path.exists(alt_path):
                            raise ConfigValidationError(
                                f"base_css file not found: {base_css} (resolved to: {validated_css})"
                            )
                    else:
                        raise ConfigValidationError(
                            f"base_css file not found: {base_css} (resolved to: {validated_css})"
                        )
            except ConfigSecurityError as e:
                raise ConfigSecurityError(f"base_css: {str(e)}") from e

    # header_logo may be a remote URL, which is not checked on disk.
    if 'header_logo' in config_data:
        header_logo = config_data['header_logo']
        if header_logo not in unset_values:
            if not str(header_logo).startswith(("http://", "https://")):
                try:
                    validated_logo = validate_path_security(header_logo, base_dir, project_dir)
                    if not os.path.exists(validated_logo):
                        if config_file_dir:
                            alt_path = os.path.join(config_file_dir, header_logo)
                            if not os.path.exists(alt_path):
                                raise ConfigValidationError(
                                    f"header_logo file not found: {header_logo} (resolved to: {validated_logo})"
                                )
                        else:
                            raise ConfigValidationError(
                                f"header_logo file not found: {header_logo} (resolved to: {validated_logo})"
                            )
                except ConfigSecurityError as e:
                    raise ConfigSecurityError(f"header_logo: {str(e)}") from e
|
|
|
|
def validate_training_config(config_data: Dict[str, Any], project_dir: str, config_file_dir: Optional[str] = None) -> None:
    """
    Validate training configuration.

    Nothing is checked when the ``training`` section is absent. When it is
    present but not enabled (``enabled`` missing or false), only the section
    type and the type of ``enabled`` are checked.

    Args:
        config_data: The configuration data
        project_dir: The project directory
        config_file_dir: The directory containing the config file; used as
            the base directory for ``training.data_file`` when given,
            otherwise ``project_dir`` is used

    Raises:
        ConfigValidationError: If training configuration is invalid
        ConfigSecurityError: If training data file path is not secure
    """
    if 'training' not in config_data:
        return

    training_config = config_data['training']
    if not isinstance(training_config, dict):
        raise ConfigValidationError("training configuration must be a dictionary")

    if 'enabled' in training_config:
        if not isinstance(training_config['enabled'], bool):
            raise ConfigValidationError("training.enabled must be a boolean")

    # The remaining settings only matter when training is switched on.
    if not training_config.get('enabled', False):
        return

    if 'data_file' not in training_config:
        raise ConfigValidationError("training.data_file is required when training is enabled")

    data_file = training_config['data_file']
    if not isinstance(data_file, str):
        raise ConfigValidationError("training.data_file must be a string")

    try:
        base_dir = config_file_dir if config_file_dir else project_dir
        validated_path = validate_path_security(data_file, base_dir, project_dir)
        if not os.path.exists(validated_path):
            raise ConfigValidationError(f"Training data file not found: {data_file} (resolved to: {validated_path})")
    except ConfigSecurityError as e:
        raise ConfigSecurityError(f"training.data_file: {str(e)}") from e

    if 'annotation_schemes' in training_config:
        schemes = training_config['annotation_schemes']
        if not isinstance(schemes, list):
            raise ConfigValidationError("training.annotation_schemes must be a list")
        if not schemes:
            raise ConfigValidationError("training.annotation_schemes cannot be empty")

        for i, scheme in enumerate(schemes):
            if isinstance(scheme, str):
                # A string entry only has to be non-blank.
                if not scheme.strip():
                    raise ConfigValidationError(f"training.annotation_schemes[{i}] cannot be empty")
            elif isinstance(scheme, dict):
                # A dict entry is validated as a full scheme definition.
                validate_single_annotation_scheme(scheme, f"training.annotation_schemes[{i}]")
            else:
                raise ConfigValidationError(f"training.annotation_schemes[{i}] must be a string or dictionary")

    if 'passing_criteria' in training_config:
        criteria = training_config['passing_criteria']
        if not isinstance(criteria, dict):
            raise ConfigValidationError("training.passing_criteria must be a dictionary")

        # bool is a subclass of int; exclude it so ``true`` is not accepted
        # as the count 1.
        if 'min_correct' in criteria:
            min_correct = criteria['min_correct']
            if isinstance(min_correct, bool) or not isinstance(min_correct, int) or min_correct < 1:
                raise ConfigValidationError("training.passing_criteria.min_correct must be a positive integer")

        if 'max_attempts' in criteria:
            max_attempts = criteria['max_attempts']
            if isinstance(max_attempts, bool) or not isinstance(max_attempts, int) or max_attempts < 1:
                raise ConfigValidationError("training.passing_criteria.max_attempts must be a positive integer")

        if 'require_all_correct' in criteria:
            if not isinstance(criteria['require_all_correct'], bool):
                raise ConfigValidationError("training.passing_criteria.require_all_correct must be a boolean")

    if 'feedback' in training_config:
        feedback = training_config['feedback']
        if not isinstance(feedback, dict):
            raise ConfigValidationError("training.feedback must be a dictionary")

        if 'show_explanations' in feedback:
            if not isinstance(feedback['show_explanations'], bool):
                raise ConfigValidationError("training.feedback.show_explanations must be a boolean")

        if 'allow_retry' in feedback:
            if not isinstance(feedback['allow_retry'], bool):
                raise ConfigValidationError("training.feedback.allow_retry must be a boolean")

    if 'failure_action' in training_config:
        failure_action = training_config['failure_action']
        valid_actions = ['move_to_done', 'repeat_training']
        if failure_action not in valid_actions:
            raise ConfigValidationError(f"training.failure_action must be one of: {', '.join(valid_actions)}")
|
|
|
|
def validate_training_data_file(data_file_path: str, annotation_schemes: List[Any]) -> None:
    """
    Validate training data file format and consistency.

    The file must be a UTF-8 JSON object with a non-empty
    ``training_instances`` list. Each instance must be a dictionary with a
    string ``id``, a string ``text`` and a ``correct_answers`` dictionary
    whose keys all name a known annotation scheme; an optional
    ``explanation`` must be a string.

    Args:
        data_file_path: Path to the training data file
        annotation_schemes: Annotation schemes to validate against. Each
            entry is either a scheme dictionary with a ``name`` key or a
            plain string, which is taken to be the scheme name itself.

    Raises:
        ConfigValidationError: If training data is invalid
    """
    try:
        with open(data_file_path, 'r', encoding='utf-8') as f:
            training_data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise ConfigValidationError(f"Training data file is not valid JSON: {str(e)}") from e
    except FileNotFoundError as e:
        raise ConfigValidationError(f"Training data file not found: {data_file_path}") from e
    except OSError as e:
        # e.g. permission denied or the path is a directory.
        raise ConfigValidationError(f"Unable to read training data file {data_file_path}: {str(e)}") from e

    if not isinstance(training_data, dict):
        raise ConfigValidationError("Training data must be a JSON object")

    if 'training_instances' not in training_data:
        raise ConfigValidationError("Training data must contain 'training_instances' field")

    training_instances = training_data['training_instances']
    if not isinstance(training_instances, list):
        raise ConfigValidationError("training_instances must be a list")

    if not training_instances:
        raise ConfigValidationError("training_instances cannot be empty")

    # training.annotation_schemes may hold plain strings as well as scheme
    # dictionaries; indexing a string with ['name'] would raise TypeError.
    scheme_names = {
        scheme if isinstance(scheme, str) else scheme['name']
        for scheme in annotation_schemes
    }

    for i, instance in enumerate(training_instances):
        if not isinstance(instance, dict):
            raise ConfigValidationError(f"Training instance {i} must be a dictionary")

        required_fields = ['id', 'text', 'correct_answers']
        missing_fields = [field for field in required_fields if field not in instance]
        if missing_fields:
            raise ConfigValidationError(f"Training instance {i} missing required fields: {', '.join(missing_fields)}")

        if not isinstance(instance['id'], str):
            raise ConfigValidationError(f"Training instance {i}.id must be a string")

        if not isinstance(instance['text'], str):
            raise ConfigValidationError(f"Training instance {i}.text must be a string")

        correct_answers = instance['correct_answers']
        if not isinstance(correct_answers, dict):
            raise ConfigValidationError(f"Training instance {i}.correct_answers must be a dictionary")

        # Only the scheme names are checked; answer values are not inspected.
        for scheme_name in correct_answers:
            if scheme_name not in scheme_names:
                raise ConfigValidationError(f"Training instance {i}.correct_answers contains unknown scheme: {scheme_name}")

        if 'explanation' in instance:
            if not isinstance(instance['explanation'], str):
                raise ConfigValidationError(f"Training instance {i}.explanation must be a string")
|
|
|
|
def validate_batch_assignment_config(config_data: Dict[str, Any]) -> None:
    """
    Validate batch assignment configuration.

    ``batch_assignment`` describes explicit annotator cohorts. Every group
    needs a non-empty list of annotator names (``annotators`` or ``users``)
    and an item set given inline (``instances`` / ``items`` /
    ``instance_ids``), through a file entry (``instances_file`` /
    ``items_file`` / ``instance_ids_file``), or both. A file entry is either
    a non-blank path string or a mapping with a non-blank ``path`` and an
    optional string ``encoding``. ``annotator_key``, when set, must be a
    non-blank string; its use against item data is not checked here.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the batch assignment configuration is invalid
    """

    def _first_present(mapping, *names):
        # Same result as nested dict.get() fallbacks: the first key that is
        # present wins, even when its value is None.
        for name in names:
            if name in mapping:
                return mapping[name]
        return None

    def _all_nonblank_strings(values):
        return all(isinstance(value, str) and value.strip() for value in values)

    if 'batch_assignment' not in config_data:
        return

    batch_config = config_data['batch_assignment']
    if not isinstance(batch_config, dict):
        raise ConfigValidationError("batch_assignment must be a dictionary")

    annotator_key = batch_config.get('annotator_key')
    if annotator_key is not None:
        if not (isinstance(annotator_key, str) and annotator_key.strip()):
            raise ConfigValidationError("batch_assignment.annotator_key must be a non-empty string")

    groups = batch_config.get('groups', [])
    if groups is None:
        # An explicit null means "no groups"; nothing more to check.
        return
    if not isinstance(groups, list):
        raise ConfigValidationError("batch_assignment.groups must be a list")

    for idx, group in enumerate(groups):
        if not isinstance(group, dict):
            raise ConfigValidationError(f"batch_assignment.groups[{idx}] must be a dictionary")

        users = _first_present(group, 'annotators', 'users')
        instances = _first_present(group, 'instances', 'items', 'instance_ids')
        file_entry = _first_present(group, 'instances_file', 'items_file', 'instance_ids_file')

        # Annotator list.
        if not (isinstance(users, list) and users):
            raise ConfigValidationError(
                f"batch_assignment.groups[{idx}] must define non-empty annotators/users list"
            )
        if not _all_nonblank_strings(users):
            raise ConfigValidationError(
                f"batch_assignment.groups[{idx}].annotators/users must contain non-empty strings"
            )

        # Inline item list.
        if instances is None:
            if file_entry is None:
                raise ConfigValidationError(
                    f"batch_assignment.groups[{idx}] must define either "
                    "instances/items/instance_ids or instances_file/items_file/instance_ids_file"
                )
        else:
            if not (isinstance(instances, list) and instances):
                raise ConfigValidationError(
                    f"batch_assignment.groups[{idx}] must define non-empty instances/items/instance_ids list"
                )
            if not _all_nonblank_strings(instances):
                raise ConfigValidationError(
                    f"batch_assignment.groups[{idx}].instances/items/instance_ids must contain non-empty strings"
                )

        # File entry.
        if file_entry is None:
            continue

        if isinstance(file_entry, dict):
            path = file_entry.get('path')
            if not (isinstance(path, str) and path.strip()):
                raise ConfigValidationError(
                    f"batch_assignment.groups[{idx}] file entry must define a non-empty path"
                )
            encoding = file_entry.get('encoding')
            if not (encoding is None or isinstance(encoding, str)):
                raise ConfigValidationError(
                    f"batch_assignment.groups[{idx}] file encoding must be a string"
                )
        elif isinstance(file_entry, str):
            if not file_entry.strip():
                raise ConfigValidationError(
                    f"batch_assignment.groups[{idx}] file path must be non-empty"
                )
        else:
            raise ConfigValidationError(
                f"batch_assignment.groups[{idx}] file entry must be a path string or mapping"
            )
|
|
|
|
def validate_category_assignment_config(config_data: Dict[str, Any]) -> None:
    """
    Validate category assignment configuration.

    Nothing is checked when the ``category_assignment`` section is absent.
    When ``enabled`` is explicitly false, only the section type and the type
    of ``enabled`` are checked (the section counts as enabled by default).
    The ``dynamic`` sub-section is only validated in detail when its own
    ``enabled`` flag is true.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If category assignment configuration is invalid
    """

    def _is_number(value: Any) -> bool:
        # bool is a subclass of int and must not count as a number here.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def _is_integer(value: Any) -> bool:
        return isinstance(value, int) and not isinstance(value, bool)

    if 'category_assignment' not in config_data:
        return

    cat_config = config_data['category_assignment']
    if not isinstance(cat_config, dict):
        raise ConfigValidationError("category_assignment must be a dictionary")

    if 'enabled' in cat_config:
        if not isinstance(cat_config['enabled'], bool):
            raise ConfigValidationError("category_assignment.enabled must be a boolean")

    if not cat_config.get('enabled', True):
        return

    if 'category_key' in cat_config:
        if not isinstance(cat_config['category_key'], str) or not cat_config['category_key'].strip():
            raise ConfigValidationError("category_assignment.category_key must be a non-empty string")

    if 'qualification' in cat_config:
        qual = cat_config['qualification']
        if not isinstance(qual, dict):
            raise ConfigValidationError("category_assignment.qualification must be a dictionary")

        if 'source' in qual:
            valid_sources = ['training', 'prestudy', 'both']
            if qual['source'] not in valid_sources:
                raise ConfigValidationError(
                    f"category_assignment.qualification.source must be one of: {', '.join(valid_sources)}"
                )

        if 'threshold' in qual:
            threshold = qual['threshold']
            # Range checks use chained comparisons, which are False for NaN,
            # so NaN (YAML ``.nan``) is rejected instead of slipping through
            # a ``x < lo or x > hi`` test.
            if not _is_number(threshold) or not (0.0 <= threshold <= 1.0):
                raise ConfigValidationError(
                    "category_assignment.qualification.threshold must be a number between 0.0 and 1.0"
                )

        if 'min_questions' in qual:
            min_q = qual['min_questions']
            if not _is_integer(min_q) or min_q < 1:
                raise ConfigValidationError(
                    "category_assignment.qualification.min_questions must be a positive integer"
                )

        if 'combine_method' in qual:
            valid_methods = ['average', 'max', 'sum']
            if qual['combine_method'] not in valid_methods:
                raise ConfigValidationError(
                    f"category_assignment.qualification.combine_method must be one of: {', '.join(valid_methods)}"
                )

    if 'fallback' in cat_config:
        valid_fallbacks = ['uncategorized', 'random', 'none']
        if cat_config['fallback'] not in valid_fallbacks:
            raise ConfigValidationError(
                f"category_assignment.fallback must be one of: {', '.join(valid_fallbacks)}"
            )

    if 'dynamic' in cat_config:
        dynamic = cat_config['dynamic']
        if not isinstance(dynamic, dict):
            raise ConfigValidationError("category_assignment.dynamic must be a dictionary")

        if 'enabled' in dynamic:
            if not isinstance(dynamic['enabled'], bool):
                raise ConfigValidationError("category_assignment.dynamic.enabled must be a boolean")

        # Dynamic settings are only checked when the feature is switched on.
        if not dynamic.get('enabled', False):
            return

        if 'agreement_method' in dynamic:
            valid_methods = ['majority_vote', 'super_majority', 'unanimous']
            if dynamic['agreement_method'] not in valid_methods:
                raise ConfigValidationError(
                    f"category_assignment.dynamic.agreement_method must be one of: {', '.join(valid_methods)}"
                )

        if 'min_annotations_for_consensus' in dynamic:
            min_ann = dynamic['min_annotations_for_consensus']
            if not _is_integer(min_ann) or min_ann < 2:
                raise ConfigValidationError(
                    "category_assignment.dynamic.min_annotations_for_consensus must be an integer >= 2"
                )

        if 'learning_rate' in dynamic:
            lr = dynamic['learning_rate']
            if not _is_number(lr) or not (0.0 < lr <= 1.0):
                raise ConfigValidationError(
                    "category_assignment.dynamic.learning_rate must be a number between 0.0 (exclusive) and 1.0"
                )

        if 'update_interval_seconds' in dynamic:
            interval = dynamic['update_interval_seconds']
            if not _is_number(interval) or not (interval >= 1):
                raise ConfigValidationError(
                    "category_assignment.dynamic.update_interval_seconds must be a number >= 1"
                )

        if 'base_probability' in dynamic:
            base_prob = dynamic['base_probability']
            if not _is_number(base_prob) or not (0.0 <= base_prob <= 1.0):
                raise ConfigValidationError(
                    "category_assignment.dynamic.base_probability must be a number between 0.0 and 1.0"
                )
|
|
|
|
def validate_diversity_config(config_data: Dict[str, Any]) -> None:
    """
    Validate diversity ordering configuration.

    Nothing is checked when the ``diversity_ordering`` section is absent.
    Unless ``enabled`` is true, only the section type and the type of
    ``enabled`` are checked (the section counts as disabled by default).

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If diversity ordering configuration is invalid
    """

    def _is_integer(value: Any) -> bool:
        # bool is a subclass of int and must not count as an integer here.
        return isinstance(value, int) and not isinstance(value, bool)

    if 'diversity_ordering' not in config_data:
        return

    dc = config_data['diversity_ordering']
    if not isinstance(dc, dict):
        raise ConfigValidationError("diversity_ordering must be a dictionary")

    if 'enabled' in dc:
        if not isinstance(dc['enabled'], bool):
            raise ConfigValidationError("diversity_ordering.enabled must be a boolean")

    if not dc.get('enabled', False):
        return

    if 'model_name' in dc:
        if not isinstance(dc['model_name'], str) or not dc['model_name'].strip():
            raise ConfigValidationError("diversity_ordering.model_name must be a non-empty string")

    if 'num_clusters' in dc:
        num_clusters = dc['num_clusters']
        if not _is_integer(num_clusters) or num_clusters < 2:
            raise ConfigValidationError("diversity_ordering.num_clusters must be an integer >= 2")

    if 'items_per_cluster' in dc:
        items_per_cluster = dc['items_per_cluster']
        if not _is_integer(items_per_cluster) or items_per_cluster < 1:
            raise ConfigValidationError("diversity_ordering.items_per_cluster must be a positive integer")

    if 'auto_clusters' in dc:
        if not isinstance(dc['auto_clusters'], bool):
            raise ConfigValidationError("diversity_ordering.auto_clusters must be a boolean")

    if 'prefill_count' in dc:
        prefill_count = dc['prefill_count']
        if not _is_integer(prefill_count) or prefill_count < 0:
            raise ConfigValidationError("diversity_ordering.prefill_count must be a non-negative integer")

    if 'batch_size' in dc:
        batch_size = dc['batch_size']
        if not _is_integer(batch_size) or batch_size < 1:
            raise ConfigValidationError("diversity_ordering.batch_size must be a positive integer")

    if 'recluster_threshold' in dc:
        recluster_threshold = dc['recluster_threshold']
        # The chained comparison is False for NaN, so NaN (YAML ``.nan``) is
        # rejected rather than passing a ``x < 0 or x > 1`` test.
        if (
            isinstance(recluster_threshold, bool)
            or not isinstance(recluster_threshold, (int, float))
            or not (0 <= recluster_threshold <= 1)
        ):
            raise ConfigValidationError(
                "diversity_ordering.recluster_threshold must be a number between 0 and 1"
            )

    if 'preserve_visited' in dc:
        if not isinstance(dc['preserve_visited'], bool):
            raise ConfigValidationError("diversity_ordering.preserve_visited must be a boolean")

    if 'trigger_ai_prefetch' in dc:
        if not isinstance(dc['trigger_ai_prefetch'], bool):
            raise ConfigValidationError("diversity_ordering.trigger_ai_prefetch must be a boolean")

    if 'cache_dir' in dc:
        cache_dir = dc['cache_dir']
        # An explicit null is allowed; anything else must be a non-blank string.
        if cache_dir is not None and (not isinstance(cache_dir, str) or not cache_dir.strip()):
            raise ConfigValidationError(
                "diversity_ordering.cache_dir must be a non-empty string or null"
            )
|
|
|
|
def validate_embedding_visualization_config(config_data: Dict[str, Any]) -> None:
    """
    Validate embedding visualization configuration.

    Nothing is checked when the ``embedding_visualization`` section is
    absent. When ``enabled`` is explicitly false, only the section type and
    the type of ``enabled`` are checked (the section counts as enabled by
    default).

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If embedding visualization configuration is invalid
    """

    def _is_integer(value: Any) -> bool:
        # bool is a subclass of int and must not count as an integer here.
        return isinstance(value, int) and not isinstance(value, bool)

    if 'embedding_visualization' not in config_data:
        return

    ev = config_data['embedding_visualization']
    if not isinstance(ev, dict):
        raise ConfigValidationError("embedding_visualization must be a dictionary")

    if 'enabled' in ev:
        if not isinstance(ev['enabled'], bool):
            raise ConfigValidationError("embedding_visualization.enabled must be a boolean")

    if not ev.get('enabled', True):
        return

    if 'sample_size' in ev:
        sample_size = ev['sample_size']
        if not _is_integer(sample_size) or sample_size < 1:
            raise ConfigValidationError(
                "embedding_visualization.sample_size must be a positive integer"
            )

    if 'include_all_annotated' in ev:
        if not isinstance(ev['include_all_annotated'], bool):
            raise ConfigValidationError(
                "embedding_visualization.include_all_annotated must be a boolean"
            )

    if 'embedding_model' in ev:
        if not isinstance(ev['embedding_model'], str) or not ev['embedding_model'].strip():
            raise ConfigValidationError(
                "embedding_visualization.embedding_model must be a non-empty string"
            )

    if 'image_embedding_model' in ev:
        if not isinstance(ev['image_embedding_model'], str) or not ev['image_embedding_model'].strip():
            raise ConfigValidationError(
                "embedding_visualization.image_embedding_model must be a non-empty string"
            )

    if 'umap' in ev:
        umap_config = ev['umap']
        if not isinstance(umap_config, dict):
            raise ConfigValidationError("embedding_visualization.umap must be a dictionary")

        if 'n_neighbors' in umap_config:
            n_neighbors = umap_config['n_neighbors']
            if not _is_integer(n_neighbors) or n_neighbors < 2:
                raise ConfigValidationError(
                    "embedding_visualization.umap.n_neighbors must be an integer >= 2"
                )

        if 'min_dist' in umap_config:
            min_dist = umap_config['min_dist']
            # The chained comparison is False for NaN, so NaN (YAML ``.nan``)
            # is rejected rather than passing a ``x < 0 or x > 1`` test.
            if (
                isinstance(min_dist, bool)
                or not isinstance(min_dist, (int, float))
                or not (0 <= min_dist <= 1)
            ):
                raise ConfigValidationError(
                    "embedding_visualization.umap.min_dist must be a number between 0 and 1"
                )

        if 'metric' in umap_config:
            valid_metrics = ['cosine', 'euclidean', 'manhattan', 'correlation']
            if umap_config['metric'] not in valid_metrics:
                raise ConfigValidationError(
                    f"embedding_visualization.umap.metric must be one of: {valid_metrics}"
                )

    if 'label_source' in ev:
        valid_sources = ['mace', 'majority']
        if ev['label_source'] not in valid_sources:
            raise ConfigValidationError(
                f"embedding_visualization.label_source must be one of: {valid_sources}"
            )
|
|
|
|
| def _merge_ai_config_file(config_data: Dict[str, Any], config_dir: str) -> Dict[str, Any]: |
| """ |
| Merge an external ai-config.yaml into the main config if specified. |
| |
| When ai_support.ai_config_file is set, loads that YAML file and merges its |
| contents into the ai_support section. The external file provides endpoint-specific |
| details (endpoint_type, model, api_key, base_url) while the inline ai_config |
| provides defaults (temperature, max_tokens, include settings). |
| |
| Args: |
| config_data: The parsed main configuration dictionary |
| config_dir: Directory containing the main config file (for resolving relative paths) |
| |
| Returns: |
| The config_data with external AI config merged in (modified in place and returned) |
| """ |
| ai_support = config_data.get("ai_support", {}) |
| if not isinstance(ai_support, dict): |
| return config_data |
|
|
| ai_config_file = ai_support.get("ai_config_file") |
|
|
| if not ai_config_file: |
| |
| if "ai_config" in ai_support: |
| from potato.data_sources.credentials import substitute_env_vars |
| ai_support["ai_config"] = substitute_env_vars(ai_support["ai_config"]) |
| config_data["ai_support"] = ai_support |
| return config_data |
|
|
| if not isinstance(ai_config_file, str): |
| logger.warning("ai_support.ai_config_file must be a string. Ignoring.") |
| return config_data |
|
|
| |
| ai_config_path = os.path.join(config_dir, ai_config_file) |
|
|
| if not os.path.exists(ai_config_path): |
| logger.warning( |
| f"AI config file '{ai_config_file}' not found at {ai_config_path}. " |
| f"AI support will be disabled. Create this file with your endpoint details." |
| ) |
| config_data["ai_support"]["enabled"] = False |
| return config_data |
|
|
| |
| try: |
| with open(ai_config_path, 'r', encoding='utf-8') as f: |
| external_config = yaml.safe_load(f) or {} |
| except yaml.YAMLError as e: |
| logger.warning(f"Invalid YAML in AI config file '{ai_config_file}': {e}. AI support will be disabled.") |
| config_data["ai_support"]["enabled"] = False |
| return config_data |
|
|
| if not isinstance(external_config, dict): |
| logger.warning(f"AI config file '{ai_config_file}' must contain a YAML dictionary. AI support will be disabled.") |
| config_data["ai_support"]["enabled"] = False |
| return config_data |
|
|
| |
| from potato.data_sources.credentials import substitute_env_vars |
| external_config = substitute_env_vars(external_config) |
|
|
| |
| if "endpoint_type" in external_config: |
| ai_support["endpoint_type"] = external_config.pop("endpoint_type") |
|
|
| |
| ai_config = ai_support.get("ai_config", {}) |
| if not isinstance(ai_config, dict): |
| ai_config = {} |
| ai_config.update(external_config) |
| ai_support["ai_config"] = ai_config |
|
|
| |
| ai_support["ai_config"] = substitute_env_vars(ai_support["ai_config"]) |
|
|
| config_data["ai_support"] = ai_support |
| logger.info(f"Loaded AI endpoint config from {ai_config_file}") |
| return config_data |
|
|
|
|
def load_and_validate_config(config_file: str, project_dir: str) -> Dict[str, Any]:
    """
    Load and validate a YAML configuration file with security checks.

    Steps, in order: the path is checked with ``validate_path_security``;
    the file is parsed with ``yaml.safe_load``; an external AI config file
    is merged via ``_merge_ai_config_file``; defaults for ``task_dir`` and
    ``site_dir`` are filled in; a relative ``task_dir`` is resolved against
    the directory that contains the config file; finally
    ``validate_yaml_structure`` and ``validate_file_paths`` are run.

    Args:
        config_file: Path to the configuration file
        project_dir: The project directory

    Returns:
        The validated configuration dictionary

    Raises:
        ConfigSecurityError: If the configuration file is not secure
        ConfigValidationError: If the configuration is invalid, including
            a file that is empty or whose top level is not a mapping
        FileNotFoundError: If the configuration file doesn't exist
    """
    # Security check on the path before touching the file system.
    try:
        validated_config_path = validate_path_security(config_file, project_dir)
    except ConfigSecurityError as e:
        raise ConfigSecurityError(f"Configuration file path: {str(e)}") from e

    if not os.path.exists(validated_config_path):
        raise FileNotFoundError(f"Configuration file not found: {config_file}")

    # Parse the YAML; every read/parse failure is reported as a validation error.
    try:
        with open(validated_config_path, 'r', encoding='utf-8') as file_p:
            config_data = yaml.safe_load(file_p)
    except yaml.YAMLError as e:
        raise ConfigValidationError(f"Invalid YAML format in {config_file}: {str(e)}") from e
    except UnicodeDecodeError as e:
        raise ConfigValidationError(f"Invalid file encoding in {config_file}: {str(e)}") from e
    except Exception as e:
        raise ConfigValidationError(f"Error reading configuration file {config_file}: {str(e)}") from e

    # yaml.safe_load returns None for an empty document and may return a
    # list or scalar; everything below assumes a mapping, so fail clearly
    # here instead of with a TypeError further down.
    if not isinstance(config_data, dict):
        raise ConfigValidationError(
            f"Configuration file {config_file} must contain a YAML mapping at the top level "
            f"(got {type(config_data).__name__})"
        )

    # Relative paths inside the config are resolved against this directory.
    config_file_dir = os.path.dirname(validated_config_path)

    # Merge the optional external AI endpoint config into ai_support.
    config_data = _merge_ai_config_file(config_data, config_file_dir)

    # Apply defaults for optional top-level keys.
    if 'task_dir' not in config_data:
        config_data['task_dir'] = '.'
        logger.debug("task_dir not specified, defaulting to '.'")
    if 'site_dir' not in config_data:
        config_data['site_dir'] = 'default'
        logger.debug("site_dir not specified, defaulting to 'default'")

    # task_dir is guaranteed to be present at this point.
    task_dir = config_data['task_dir']
    if not isinstance(task_dir, str):
        raise ConfigValidationError(
            f"task_dir must be a string path (got {type(task_dir).__name__})"
        )
    if task_dir == '.' or not os.path.isabs(task_dir):
        # Resolve relative to the config file's directory.
        task_dir = os.path.normpath(os.path.join(config_file_dir, task_dir))
        config_data['task_dir'] = task_dir
        logger.debug(f"Resolved task_dir to: {task_dir}")

    # Structural validation, then validation of the paths the config names.
    validate_yaml_structure(config_data, project_dir, config_file_dir)
    validate_file_paths(config_data, project_dir, config_file_dir)

    return config_data
|
|
|
|
def _config_path_is_within_dir(path: str, directory: str) -> bool:
    """Return True if absolute *path* is *directory* itself or lies beneath it.

    Compares whole path components, so ``/a/task2/x`` is NOT considered to
    be inside ``/a/task`` (a plain string-prefix test would say it is).
    """
    try:
        common = os.path.commonpath([path, directory])
    except ValueError:
        # e.g. paths on different drives: they cannot be nested.
        return False
    return os.path.normcase(common) == os.path.normcase(directory)


def init_config(args):
    """
    Populate the module-level ``config`` dictionary from command-line arguments.

    ``args.config_file`` is either a path ending in ``.yaml`` or a project
    directory that contains a ``configs/`` folder with one or more ``.yaml``
    files (when there are several, the user is prompted on stdin to pick one).
    The chosen file is loaded through ``load_and_validate_config``, merged
    into ``config``, overlaid with command-line options and the ``server``
    section, and finally the process changes its working directory to the
    project directory.

    Args:
        args: Parsed command-line arguments. Reads ``config_file``,
            ``verbose``, ``very_verbose``, ``customjs``,
            ``customjs_hostname``, ``persist_sessions``, ``debug``, ``port``
            and, when present, ``debug_log`` and ``debug_phase``.

    Raises:
        ConfigSecurityError, ConfigValidationError, FileNotFoundError:
            Configuration problems; logged and printed, then re-raised.
        Exception: Any other error; logged and printed, then re-raised.
    """
    global config

    project_dir = os.getcwd()
    config_file = None

    try:
        is_yaml_path = args.config_file.endswith('.yaml')

        if is_yaml_path:
            # A .yaml file was given directly.
            if not os.path.exists(args.config_file):
                raise FileNotFoundError(f"Configuration file not found: {args.config_file}")
            print("INFO: when you run the server directly from a .yaml file, please make sure your config file is put in the annotation project folder")
            config_file = args.config_file

        elif os.path.isdir(args.config_file):
            # A project directory was given: look for configs/*.yaml inside it.
            project_dir = args.config_file if os.path.isabs(args.config_file) else os.path.join(project_dir, args.config_file)
            config_folder = os.path.join(args.config_file, 'configs')
            if not os.path.isdir(config_folder):
                raise ConfigValidationError(".yaml file must be put in the configs/ folder under the main project directory when you try to start the project with the project directory, otherwise please directly give the path of the .yaml file")

            # Sorted so the selection menu is stable between runs.
            yamlfiles = sorted(it for it in os.listdir(config_folder) if it.endswith('.yaml'))

            if not yamlfiles:
                raise ConfigValidationError(f"Configuration file not found under {config_folder}, please make sure .yaml file exists in the given directory, or please directly give the path of the .yaml file")

            if len(yamlfiles) == 1:
                config_file = os.path.join(config_folder, yamlfiles[0])
            else:
                # Several candidates: ask until a valid index is entered.
                while True:
                    print("multiple config files found, please select the one you want to use (number 0-%d)" % (len(yamlfiles) - 1))
                    for i, it in enumerate(yamlfiles):
                        print("[%d] %s" % (i, it))
                    input_id = input("number: ")
                    try:
                        choice = int(input_id)
                    except ValueError:
                        choice = -1
                    # Reject negatives too: Python would otherwise index from the end.
                    if 0 <= choice < len(yamlfiles):
                        config_file = os.path.join(config_folder, yamlfiles[choice])
                        break
                    print("wrong input, please reselect")

        else:
            # Neither a .yaml path nor an existing directory.
            raise ConfigValidationError(
                f"Configuration path '{args.config_file}' is neither a .yaml file nor an existing "
                f"project directory, please directly give the path of the .yaml file"
            )

        if is_yaml_path:
            # Pre-load the file to learn task_dir before full validation.
            try:
                validated_config_path = validate_path_security(config_file, os.getcwd())
                with open(validated_config_path, 'r', encoding='utf-8') as file_p:
                    temp_config_data = yaml.safe_load(file_p)
            except Exception as e:
                raise ConfigValidationError(f"Error loading configuration file: {str(e)}") from e

            # Empty documents load as None; lists/scalars are equally unusable.
            if not isinstance(temp_config_data, dict):
                raise ConfigValidationError(
                    f"Configuration file {config_file} must contain a YAML mapping at the top level "
                    f"(got {type(temp_config_data).__name__})"
                )

            config_file_abs = os.path.abspath(config_file)
            config_file_dir = os.path.dirname(config_file_abs)

            has_task_dir = 'task_dir' in temp_config_data
            if has_task_dir:
                task_dir = temp_config_data['task_dir']
                if not isinstance(task_dir, str):
                    raise ConfigValidationError(
                        f"task_dir must be a string path (got {type(task_dir).__name__})"
                    )
                if task_dir == '.' or not os.path.isabs(task_dir):
                    # Relative task_dir is resolved against the config file's directory.
                    task_dir = os.path.normpath(os.path.join(config_file_dir, task_dir))
                    temp_config_data['task_dir'] = task_dir
                    logger.debug(f"Resolved task_dir to: {task_dir}")

            # The containment check can be switched off through the environment.
            skip_path_validation = os.environ.get('POTATO_SKIP_CONFIG_PATH_VALIDATION', '').lower() in ('1', 'true')
            if has_task_dir and not skip_path_validation:
                task_dir = temp_config_data['task_dir']
                task_dir_abs = os.path.abspath(task_dir)
                if not _config_path_is_within_dir(config_file_abs, task_dir_abs):
                    raise ConfigValidationError(f"Configuration file must be in the task_dir. Config file is at '{config_file_abs}' but task_dir is '{task_dir_abs}'")
                project_dir = task_dir

            config_data = load_and_validate_config(config_file, os.getcwd())
            # Keep the task_dir resolved above.
            if has_task_dir:
                config_data['task_dir'] = temp_config_data['task_dir']
        else:
            config_data = load_and_validate_config(config_file, project_dir)

        config.update(config_data)

        # Command-line options layered on top of the file contents.
        config_updates = {
            "verbose": args.verbose,
            "very_verbose": args.very_verbose,
            # Absolute form of what was passed on the command line; computed
            # here, before the os.chdir at the end of this function.
            "__config_file__": os.path.abspath(args.config_file),
            "customjs": args.customjs,
            "customjs_hostname": args.customjs_hostname,
            "persist_sessions": args.persist_sessions,
        }

        # --debug wins; otherwise only fill in "debug" when the file did not set it.
        if args.debug or "debug" not in config:
            config_updates["debug"] = args.debug

        if getattr(args, 'debug_log', None):
            config_updates["debug_log"] = args.debug_log

        if getattr(args, 'debug_phase', None):
            if not args.debug:
                print("⚠️ Warning: --debug-phase requires --debug flag. Enabling debug mode.")
                config_updates["debug"] = True
            config_updates["debug_phase"] = args.debug_phase

        config.update(config_updates)

        # Promote selected keys of the "server" section to the top level.
        if "server" in config:
            server_config = config["server"]

            # File-provided port is used only when no port was given on the CLI.
            if "port" in server_config and args.port is None:
                config["port"] = server_config["port"]
                logger.debug(f"Port set from config file: {server_config['port']}")

            if "host" in server_config:
                config["host"] = server_config["host"]
                logger.debug(f"Host set from config file: {server_config['host']}")

            # File-provided debug applies only when --debug was not passed.
            if "debug" in server_config and not args.debug:
                config["debug"] = server_config["debug"]
                logger.debug(f"Debug mode set from config file: {server_config['debug']}")

        # From here on the process runs with the project directory as cwd.
        os.chdir(project_dir)

    except (ConfigSecurityError, ConfigValidationError, FileNotFoundError) as e:
        logger.error(f"Configuration error: {str(e)}")
        print(f"❌ Configuration error: {str(e)}")
        print("Please check your configuration file and try again.")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during configuration initialization: {str(e)}")
        print(f"❌ Unexpected error: {str(e)}")
        raise
|
|
|
|
def validate_active_learning_config(config_data: Dict[str, Any]) -> None:
    """
    Validate active learning configuration.

    Nothing is checked when the ``active_learning`` section is absent, and
    only the section type and the ``enabled`` flag are checked when active
    learning is disabled. Checks run in a fixed order and the first failure
    is raised.

    Args:
        config_data: The configuration data containing active_learning section

    Raises:
        ConfigValidationError: If the active learning configuration is invalid
    """
    if "active_learning" not in config_data:
        return

    al_config = config_data["active_learning"]
    # An empty YAML section loads as None; report that instead of crashing on .get().
    if not isinstance(al_config, dict):
        raise ConfigValidationError("active_learning must be a dictionary")

    def is_int_at_least(value: Any, minimum: int) -> bool:
        # bool is a subclass of int, so YAML `true` must be excluded explicitly.
        return isinstance(value, int) and not isinstance(value, bool) and value >= minimum

    def is_unit_fraction(value: Any) -> bool:
        # The chained comparison is also False for NaN.
        return (
            isinstance(value, (int, float))
            and not isinstance(value, bool)
            and 0 <= value <= 1
        )

    enabled = al_config.get("enabled", False)
    if not isinstance(enabled, bool):
        raise ConfigValidationError("active_learning.enabled must be a boolean")
    if not enabled:
        return

    # classifier / vectorizer share the same shape: {name: str, hyperparameters: dict}.
    for section in ("classifier", "vectorizer"):
        if section not in al_config:
            continue
        section_config = al_config[section]
        if not isinstance(section_config, dict):
            raise ConfigValidationError(f"active_learning.{section} must be a dictionary")
        if "name" not in section_config:
            raise ConfigValidationError(f"active_learning.{section}.name is required")
        if not isinstance(section_config["name"], str):
            raise ConfigValidationError(f"active_learning.{section}.name must be a string")
        if "hyperparameters" in section_config:
            if not isinstance(section_config["hyperparameters"], dict):
                raise ConfigValidationError(f"active_learning.{section}.hyperparameters must be a dictionary")

    # Integer thresholds: (key, minimum allowed value, description used in the message).
    integer_rules = (
        ("min_annotations_per_instance", 1, "a positive integer"),
        ("min_instances_for_training", 2, "an integer >= 2"),
        ("max_instances_to_reorder", 1, "a positive integer"),
        ("update_frequency", 1, "a positive integer"),
    )
    for key, minimum, description in integer_rules:
        if key in al_config and not is_int_at_least(al_config[key], minimum):
            raise ConfigValidationError(f"active_learning.{key} must be {description}")

    if "resolution_strategy" in al_config:
        valid_strategies = ["majority_vote", "random", "consensus", "weighted_average"]
        if al_config["resolution_strategy"] not in valid_strategies:
            raise ConfigValidationError(f"active_learning.resolution_strategy must be one of: {', '.join(valid_strategies)}")

    if "random_sample_percent" in al_config:
        if not is_unit_fraction(al_config["random_sample_percent"]):
            raise ConfigValidationError("active_learning.random_sample_percent must be between 0 and 1")

    if "schema_names" in al_config:
        schema_names = al_config["schema_names"]
        if not isinstance(schema_names, list):
            raise ConfigValidationError("active_learning.schema_names must be a list")
        for schema in schema_names:
            if not isinstance(schema, str):
                raise ConfigValidationError("active_learning.schema_names must contain only strings")
            if schema in ["text", "span"]:
                raise ConfigValidationError(f"Text and span annotation schemes are not supported for active learning: {schema}")

    if "database" in al_config:
        db_config = al_config["database"]
        if not isinstance(db_config, dict):
            raise ConfigValidationError("active_learning.database must be a dictionary")
        if "enabled" in db_config and not isinstance(db_config["enabled"], bool):
            raise ConfigValidationError("active_learning.database.enabled must be a boolean")

    if "model_persistence" in al_config:
        model_config = al_config["model_persistence"]
        if not isinstance(model_config, dict):
            raise ConfigValidationError("active_learning.model_persistence must be a dictionary")
        if "enabled" in model_config and not isinstance(model_config["enabled"], bool):
            raise ConfigValidationError("active_learning.model_persistence.enabled must be a boolean")
        if "retention_count" in model_config:
            if not is_int_at_least(model_config["retention_count"], 1):
                raise ConfigValidationError("active_learning.model_persistence.retention_count must be a positive integer")

    if "llm" in al_config:
        llm_config = al_config["llm"]
        if not isinstance(llm_config, dict):
            raise ConfigValidationError("active_learning.llm must be a dictionary")
        if "enabled" in llm_config and not isinstance(llm_config["enabled"], bool):
            raise ConfigValidationError("active_learning.llm.enabled must be a boolean")
        if "endpoint_url" in llm_config and not isinstance(llm_config["endpoint_url"], str):
            raise ConfigValidationError("active_learning.llm.endpoint_url must be a string")
        if "model_name" in llm_config and not isinstance(llm_config["model_name"], str):
            raise ConfigValidationError("active_learning.llm.model_name must be a string")

    if "query_strategy" in al_config:
        valid_strategies = ["uncertainty", "diversity", "badge", "bald", "hybrid"]
        if al_config["query_strategy"] not in valid_strategies:
            raise ConfigValidationError(
                f"active_learning.query_strategy must be one of: {', '.join(valid_strategies)}"
            )

    if "hybrid_weights" in al_config:
        weights = al_config["hybrid_weights"]
        if not isinstance(weights, dict):
            raise ConfigValidationError("active_learning.hybrid_weights must be a dictionary")
        # sum() would raise a bare TypeError on e.g. string values.
        for weight in weights.values():
            if not isinstance(weight, (int, float)) or isinstance(weight, bool):
                raise ConfigValidationError("active_learning.hybrid_weights values must be numbers")
        weight_sum = sum(weights.values())
        # `not (<= tolerance)` also rejects a NaN sum.
        if not abs(weight_sum - 1.0) <= 0.01:
            raise ConfigValidationError(
                f"active_learning.hybrid_weights must sum to 1.0 (got {weight_sum})"
            )

    if "cold_start_strategy" in al_config:
        if al_config["cold_start_strategy"] not in ["random", "llm"]:
            raise ConfigValidationError(
                "active_learning.cold_start_strategy must be one of: random, llm"
            )

    if "confidence_method" in al_config:
        if al_config["confidence_method"] not in ["logprobs", "verbalized", "consistency"]:
            raise ConfigValidationError(
                "active_learning.confidence_method must be one of: logprobs, verbalized, consistency"
            )

    if "classifier_params" in al_config:
        if not isinstance(al_config["classifier_params"], dict):
            raise ConfigValidationError("active_learning.classifier_params must be a dictionary")

    if "vectorizer_params" in al_config:
        if not isinstance(al_config["vectorizer_params"], dict):
            raise ConfigValidationError("active_learning.vectorizer_params must be a dictionary")

    if "calibrate_probabilities" in al_config:
        if not isinstance(al_config["calibrate_probabilities"], bool):
            raise ConfigValidationError("active_learning.calibrate_probabilities must be a boolean")

    if "bald_params" in al_config:
        bp = al_config["bald_params"]
        if not isinstance(bp, dict):
            raise ConfigValidationError("active_learning.bald_params must be a dictionary")
        if "n_estimators" in bp and not is_int_at_least(bp["n_estimators"], 2):
            raise ConfigValidationError("active_learning.bald_params.n_estimators must be an integer >= 2")

    if "use_icl_ensemble" in al_config:
        if not isinstance(al_config["use_icl_ensemble"], bool):
            raise ConfigValidationError("active_learning.use_icl_ensemble must be a boolean")

    if "icl_ensemble_params" in al_config:
        if not isinstance(al_config["icl_ensemble_params"], dict):
            raise ConfigValidationError("active_learning.icl_ensemble_params must be a dictionary")

    if "annotation_routing" in al_config:
        if not isinstance(al_config["annotation_routing"], bool):
            raise ConfigValidationError("active_learning.annotation_routing must be a boolean")

    if "routing_thresholds" in al_config:
        rt = al_config["routing_thresholds"]
        if not isinstance(rt, dict):
            raise ConfigValidationError("active_learning.routing_thresholds must be a dictionary")
        for key in ["auto_label_min_confidence", "show_suggestion_below"]:
            if key in rt and not is_unit_fraction(rt[key]):
                raise ConfigValidationError(
                    f"active_learning.routing_thresholds.{key} must be between 0 and 1"
                )

    # Only warn (never fail) when the sentence-transformers vectorizer is
    # selected but the package cannot be imported.
    vectorizer = al_config.get("vectorizer")
    wants_sentence_transformers = (
        al_config.get("vectorizer_name") == "sentence-transformers"
        or (isinstance(vectorizer, dict) and vectorizer.get("name") == "sentence-transformers")
    )
    if wants_sentence_transformers:
        try:
            import sentence_transformers  # noqa: F401 -- availability probe only
        except ImportError:
            logger.warning(
                "sentence-transformers vectorizer configured but package not installed. "
                "Install with: pip install sentence-transformers"
            )
|
|
|
|
def validate_ai_support_config(config_data: Dict[str, Any]) -> None:
    """
    Validate AI support configuration.

    Nothing is checked when the ``ai_support`` section is absent, and only
    the section type and the ``enabled`` flag are checked when AI support is
    disabled. When ``endpoint_type`` is missing but ``ai_config_file`` is
    set, validation stops early without an error.

    Args:
        config_data: The configuration data containing ai_support section

    Raises:
        ConfigValidationError: If the AI support configuration is invalid
    """
    if "ai_support" not in config_data:
        return

    ai_config = config_data["ai_support"]
    # An empty YAML section loads as None; report that instead of crashing on .get().
    if not isinstance(ai_config, dict):
        raise ConfigValidationError("ai_support must be a dictionary")

    enabled = ai_config.get("enabled", False)
    if not isinstance(enabled, bool):
        raise ConfigValidationError("ai_support.enabled must be a boolean")
    if not enabled:
        return

    # An external endpoint config file may be referenced.
    has_external_config = "ai_config_file" in ai_config
    if has_external_config and not isinstance(ai_config["ai_config_file"], str):
        raise ConfigValidationError("ai_support.ai_config_file must be a string")

    if "endpoint_type" not in ai_config:
        # With an external file configured, a missing endpoint_type is tolerated
        # and the remaining checks are skipped.
        if has_external_config:
            return
        raise ConfigValidationError("ai_support.endpoint_type is required when ai_support is enabled")

    endpoint_type = ai_config["endpoint_type"]
    if not isinstance(endpoint_type, str):
        raise ConfigValidationError("ai_support.endpoint_type must be a string")

    valid_endpoint_types = ["openai", "anthropic", "huggingface", "ollama", "gemini", "vllm",
                            "yolo", "ollama_vision", "openai_vision", "anthropic_vision"]
    if endpoint_type not in valid_endpoint_types:
        raise ConfigValidationError(f"ai_support.endpoint_type must be one of: {', '.join(valid_endpoint_types)}")

    if "ai_config" in ai_config:
        ai_endpoint_config = ai_config["ai_config"]
        if not isinstance(ai_endpoint_config, dict):
            raise ConfigValidationError("ai_support.ai_config must be a dictionary")

        if "model" in ai_endpoint_config:
            model = ai_endpoint_config["model"]
            if not isinstance(model, str) or not model.strip():
                raise ConfigValidationError("ai_support.ai_config.model must be a non-empty string")

        # Hosted endpoints need a non-empty string API key.
        if endpoint_type in ["openai", "anthropic", "huggingface", "gemini"]:
            api_key = ai_endpoint_config.get("api_key", "")
            if not api_key or not isinstance(api_key, str):
                raise ConfigValidationError(f"ai_support.ai_config.api_key is required for {endpoint_type} endpoint")

        # For vllm, base_url is optional but must be a string when given.
        if endpoint_type == "vllm":
            base_url = ai_endpoint_config.get("base_url", "")
            if base_url and not isinstance(base_url, str):
                raise ConfigValidationError("ai_support.ai_config.base_url must be a string")

        if "temperature" in ai_endpoint_config:
            temperature = ai_endpoint_config["temperature"]
            # bool is a subclass of int; the chained comparison also rejects NaN.
            if (
                not isinstance(temperature, (int, float))
                or isinstance(temperature, bool)
                or not 0 <= temperature <= 2
            ):
                raise ConfigValidationError("ai_support.ai_config.temperature must be between 0 and 2")

        if "max_tokens" in ai_endpoint_config:
            max_tokens = ai_endpoint_config["max_tokens"]
            if not isinstance(max_tokens, int) or isinstance(max_tokens, bool) or max_tokens < 1:
                raise ConfigValidationError("ai_support.ai_config.max_tokens must be a positive integer")

        for prompt_key in ["hint_prompt", "keyword_prompt"]:
            if prompt_key in ai_endpoint_config:
                prompt = ai_endpoint_config[prompt_key]
                if not isinstance(prompt, str):
                    raise ConfigValidationError(f"ai_support.ai_config.{prompt_key} must be a string")
                if not prompt.strip():
                    raise ConfigValidationError(f"ai_support.ai_config.{prompt_key} cannot be empty")

    # Option highlighting has its own validator.
    if "option_highlighting" in ai_config:
        _validate_option_highlighting_config(ai_config["option_highlighting"])
|
|
|
|
def validate_chat_support_config(config_data: Dict[str, Any]) -> None:
    """
    Validate chat support configuration for LLM annotator assistance.

    Nothing is checked when the ``chat_support`` section is absent, and only
    the section type and the ``enabled`` flag are checked when chat support
    is disabled.

    Args:
        config_data: The configuration data containing chat_support section

    Raises:
        ConfigValidationError: If the chat support configuration is invalid
    """
    if "chat_support" not in config_data:
        return

    chat_config = config_data["chat_support"]
    # An empty YAML section loads as None; report that instead of crashing on .get().
    if not isinstance(chat_config, dict):
        raise ConfigValidationError("chat_support must be a dictionary")

    def is_int_in_range(value: Any, low: int, high: Any = None) -> bool:
        # bool is a subclass of int, so YAML `true` must be excluded explicitly.
        if not isinstance(value, int) or isinstance(value, bool):
            return False
        return value >= low and (high is None or value <= high)

    enabled = chat_config.get("enabled", False)
    if not isinstance(enabled, bool):
        raise ConfigValidationError("chat_support.enabled must be a boolean")
    if not enabled:
        return

    if "endpoint_type" not in chat_config:
        raise ConfigValidationError(
            "chat_support.endpoint_type is required when chat_support is enabled"
        )

    endpoint_type = chat_config["endpoint_type"]
    valid_endpoint_types = [
        "openai", "anthropic", "huggingface", "ollama", "gemini", "vllm", "openrouter",
    ]
    if endpoint_type not in valid_endpoint_types:
        raise ConfigValidationError(
            f"chat_support.endpoint_type must be one of: {', '.join(valid_endpoint_types)}"
        )

    if "ai_config" in chat_config:
        ai_cfg = chat_config["ai_config"]
        if not isinstance(ai_cfg, dict):
            raise ConfigValidationError("chat_support.ai_config must be a dictionary")

        if "model" in ai_cfg:
            if not isinstance(ai_cfg["model"], str) or not ai_cfg["model"].strip():
                raise ConfigValidationError(
                    "chat_support.ai_config.model must be a non-empty string"
                )

        if "temperature" in ai_cfg:
            temp = ai_cfg["temperature"]
            # The chained comparison also rejects NaN.
            if not isinstance(temp, (int, float)) or isinstance(temp, bool) or not 0 <= temp <= 2:
                raise ConfigValidationError(
                    "chat_support.ai_config.temperature must be between 0 and 2"
                )

        if "max_tokens" in ai_cfg and not is_int_in_range(ai_cfg["max_tokens"], 1):
            raise ConfigValidationError(
                "chat_support.ai_config.max_tokens must be a positive integer"
            )

        # Hosted endpoints need a non-empty string API key.
        if endpoint_type in ["openai", "anthropic", "huggingface", "gemini", "openrouter"]:
            api_key = ai_cfg.get("api_key", "")
            if not api_key or not isinstance(api_key, str):
                raise ConfigValidationError(
                    f"chat_support.ai_config.api_key is required for {endpoint_type} endpoint"
                )

    if "ui" in chat_config:
        ui_cfg = chat_config["ui"]
        if not isinstance(ui_cfg, dict):
            raise ConfigValidationError("chat_support.ui must be a dictionary")

        if "sidebar_width" in ui_cfg and not is_int_in_range(ui_cfg["sidebar_width"], 200, 800):
            raise ConfigValidationError(
                "chat_support.ui.sidebar_width must be an integer between 200 and 800"
            )

        if "max_history_per_instance" in ui_cfg:
            if not is_int_in_range(ui_cfg["max_history_per_instance"], 1):
                raise ConfigValidationError(
                    "chat_support.ui.max_history_per_instance must be a positive integer"
                )
|
|
|
|
| def _validate_option_highlighting_config(oh_config: Dict[str, Any]) -> None: |
| """ |
| Validate option highlighting configuration. |
| |
| Args: |
| oh_config: The option_highlighting configuration section |
| |
| Raises: |
| ConfigValidationError: If the configuration is invalid |
| """ |
| if not isinstance(oh_config, dict): |
| raise ConfigValidationError("ai_support.option_highlighting must be a dictionary") |
|
|
| |
| if "enabled" in oh_config: |
| if not isinstance(oh_config["enabled"], bool): |
| raise ConfigValidationError("ai_support.option_highlighting.enabled must be a boolean") |
|
|
| |
| if "top_k" in oh_config: |
| top_k = oh_config["top_k"] |
| if not isinstance(top_k, int) or top_k < 1 or top_k > 10: |
| raise ConfigValidationError("ai_support.option_highlighting.top_k must be an integer between 1 and 10") |
|
|
| |
| if "dim_opacity" in oh_config: |
| dim_opacity = oh_config["dim_opacity"] |
| if not isinstance(dim_opacity, (int, float)) or dim_opacity < 0.1 or dim_opacity > 0.9: |
| raise ConfigValidationError("ai_support.option_highlighting.dim_opacity must be a number between 0.1 and 0.9") |
|
|
| |
| if "auto_apply" in oh_config: |
| if not isinstance(oh_config["auto_apply"], bool): |
| raise ConfigValidationError("ai_support.option_highlighting.auto_apply must be a boolean") |
|
|
| |
| if "schemas" in oh_config: |
| schemas = oh_config["schemas"] |
| if schemas is not None: |
| if not isinstance(schemas, list): |
| raise ConfigValidationError("ai_support.option_highlighting.schemas must be a list or null") |
| for schema in schemas: |
| if not isinstance(schema, str): |
| raise ConfigValidationError("ai_support.option_highlighting.schemas must contain only strings") |
|
|
| |
| if "prefetch_count" in oh_config: |
| prefetch_count = oh_config["prefetch_count"] |
| if not isinstance(prefetch_count, int) or prefetch_count < 0 or prefetch_count > 100: |
| raise ConfigValidationError("ai_support.option_highlighting.prefetch_count must be an integer between 0 and 100") |
|
|
|
|
def parse_active_learning_config(config_data: Dict[str, Any]) -> 'ActiveLearningConfig':
    """
    Build an ``ActiveLearningConfig`` from the ``active_learning`` section.

    When the section is missing, a default-constructed
    ``ActiveLearningConfig`` is returned. Otherwise each setting is read
    from the section, falling back to the defaults written out below.

    Args:
        config_data: The configuration data containing active_learning section

    Returns:
        ActiveLearningConfig: Parsed active learning configuration

    Raises:
        ConfigValidationError: If the configuration is invalid
    """
    from potato.active_learning_manager import ActiveLearningConfig, ResolutionStrategy

    if "active_learning" not in config_data:
        return ActiveLearningConfig()

    section = config_data["active_learning"]

    # Classifier: dotted name plus keyword arguments.
    clf_name = "sklearn.linear_model.LogisticRegression"
    clf_kwargs = {}
    if "classifier" in section:
        clf_section = section["classifier"]
        clf_name = clf_section.get("name", clf_name)
        clf_kwargs = clf_section.get("hyperparameters", {})

    # Vectorizer: dotted name plus keyword arguments.
    vec_name = "sklearn.feature_extraction.text.CountVectorizer"
    vec_kwargs = {}
    if "vectorizer" in section:
        vec_section = section["vectorizer"]
        vec_name = vec_section.get("name", vec_name)
        vec_kwargs = vec_section.get("hyperparameters", {})

    # Map the configured string onto the enum member; the member is looked
    # up only for the matching label, and unknown labels keep the default.
    chosen_strategy = ResolutionStrategy.MAJORITY_VOTE
    if "resolution_strategy" in section:
        requested = section["resolution_strategy"]
        strategy_members = (
            ("majority_vote", "MAJORITY_VOTE"),
            ("random", "RANDOM"),
            ("consensus", "CONSENSUS"),
            ("weighted_average", "WEIGHTED_AVERAGE"),
        )
        for label, member_name in strategy_members:
            if requested == label:
                chosen_strategy = getattr(ResolutionStrategy, member_name)
                break

    # Database: the "enabled" flag is split off, the rest is passed through.
    db_enabled = False
    db_settings = {}
    if "database" in section:
        db_section = section["database"]
        db_enabled = db_section.get("enabled", False)
        db_settings = {
            name: value for name, value in db_section.items() if name != "enabled"
        }

    # Model persistence.
    persistence_enabled = False
    save_directory = None
    retention_count = 2
    if "model_persistence" in section:
        persistence_section = section["model_persistence"]
        persistence_enabled = persistence_section.get("enabled", False)
        save_directory = persistence_section.get("save_directory")
        retention_count = persistence_section.get("retention_count", 2)

    # LLM: the whole sub-section is handed over as llm_config.
    llm_enabled = False
    llm_settings = {}
    if "llm" in section:
        llm_settings = section["llm"]
        llm_enabled = llm_settings.get("enabled", False)

    constructor_kwargs = {
        "enabled": section.get("enabled", False),
        "classifier_name": clf_name,
        "classifier_kwargs": clf_kwargs,
        "vectorizer_name": vec_name,
        "vectorizer_kwargs": vec_kwargs,
        "min_annotations_per_instance": section.get("min_annotations_per_instance", 1),
        "min_instances_for_training": section.get("min_instances_for_training", 10),
        "max_instances_to_reorder": section.get("max_instances_to_reorder"),
        "resolution_strategy": chosen_strategy,
        "random_sample_percent": section.get("random_sample_percent", 0.2),
        "update_frequency": section.get("update_frequency", 5),
        "schema_names": section.get("schema_names", []),
        "database_enabled": db_enabled,
        "database_config": db_settings,
        "model_persistence_enabled": persistence_enabled,
        "model_save_directory": save_directory,
        "model_retention_count": retention_count,
        "llm_enabled": llm_enabled,
        "llm_config": llm_settings,
    }
    return ActiveLearningConfig(**constructor_kwargs)
|
|
|
|
def validate_instance_display_config(config_data: Dict[str, Any]) -> None:
    """
    Check the optional ``instance_display`` section of a configuration.

    ``instance_display`` describes the content shown to annotators (as opposed
    to the annotations that are collected). When the key is absent the
    function returns immediately; otherwise the section's ``fields`` list and
    the optional ``layout`` and ``resizable`` settings are validated, and the
    deprecated display-only pattern check is run last.

    Args:
        config_data: The configuration data

    Raises:
        ConfigValidationError: If the instance_display configuration is invalid
    """
    if "instance_display" not in config_data:
        return

    section = config_data["instance_display"]
    if not isinstance(section, dict):
        raise ConfigValidationError("instance_display must be a dictionary")

    # "fields" is the only mandatory entry of the section.
    if "fields" not in section:
        raise ConfigValidationError("instance_display must contain 'fields' list")

    field_entries = section["fields"]
    if not isinstance(field_entries, list):
        raise ConfigValidationError("instance_display.fields must be a list")
    if len(field_entries) == 0:
        raise ConfigValidationError("instance_display.fields cannot be empty")

    # Keys of fields flagged as span targets; collected here but not used
    # further inside this function.
    span_targets = []

    # Ask the display registry for the supported types; if it cannot be
    # imported or queried, fall back to the built-in list below.
    try:
        from .displays import display_registry
        valid_display_types = display_registry.get_supported_types()
    except Exception:
        valid_display_types = [
            "text", "html", "image", "video", "audio", "dialogue", "pairwise",
            "pdf", "document", "spreadsheet", "code", "agent_trace", "eval_trace",
            "gallery", "conversation_tree", "interactive_chat", "web_agent_trace",
            "live_agent", "coding_trace", "live_coding_agent",
        ]

    # Display types on which a field may be marked as a span target.
    span_capable_types = [
        "text", "dialogue", "pdf", "document", "spreadsheet", "code",
        "agent_trace", "interactive_chat",
    ]

    for idx, entry in enumerate(field_entries):
        where = f"instance_display.fields[{idx}]"

        if not isinstance(entry, dict):
            raise ConfigValidationError(f"{where} must be a dictionary")

        # Required: key
        if "key" not in entry:
            raise ConfigValidationError(f"{where} missing required 'key' property")
        key = entry["key"]
        if not (isinstance(key, str) and key.strip()):
            raise ConfigValidationError(f"{where}.key must be a non-empty string")

        # Required: type
        if "type" not in entry:
            raise ConfigValidationError(f"{where} missing required 'type' property")
        field_type = entry["type"]
        if field_type not in valid_display_types:
            supported = ', '.join(valid_display_types)
            raise ConfigValidationError(
                f"{where}.type '{field_type}' is invalid. Valid types are: {supported}"
            )

        # Optional: label
        if "label" in entry and not isinstance(entry["label"], str):
            raise ConfigValidationError(f"{where}.label must be a string")

        # Optional: span_target (only meaningful when truthy)
        if entry.get("span_target"):
            if field_type not in span_capable_types:
                span_capable = ', '.join(span_capable_types)
                raise ConfigValidationError(
                    f"{where}.span_target is set but type '{field_type}' "
                    f"does not support span annotation. "
                    f"Types that support span_target: {span_capable}."
                )
            span_targets.append(key)

        # Optional: display_options, validated per display type
        if "display_options" in entry:
            options = entry["display_options"]
            if not isinstance(options, dict):
                raise ConfigValidationError(f"{where}.display_options must be a dictionary")
            _validate_display_options(field_type, options, where)

    # Optional section-level layout settings.
    if "layout" in section:
        layout = section["layout"]
        if not isinstance(layout, dict):
            raise ConfigValidationError("instance_display.layout must be a dictionary")

        if "direction" in layout:
            valid_directions = ["vertical", "horizontal"]
            if layout["direction"] not in valid_directions:
                raise ConfigValidationError(
                    f"instance_display.layout.direction must be one of: {', '.join(valid_directions)}"
                )

        if "gap" in layout and not isinstance(layout["gap"], str):
            raise ConfigValidationError("instance_display.layout.gap must be a string (e.g., '20px', '1rem')")

    # Optional section-level resizable flag.
    if "resizable" in section and not isinstance(section["resizable"], bool):
        raise ConfigValidationError("instance_display.resizable must be a boolean (true/false)")

    # Finally, warn about the deprecated display-only annotation pattern.
    _check_display_only_deprecation(config_data)
|
|
|
|
| def _validate_display_options(field_type: str, options: Dict[str, Any], path: str) -> None: |
| """ |
| Validate display options for a specific field type. |
| |
| Args: |
| field_type: The display type |
| options: The display options dictionary |
| path: The config path for error messages |
| |
| Raises: |
| ConfigValidationError: If options are invalid |
| """ |
| |
| if "max_width" in options: |
| max_width = options["max_width"] |
| if not isinstance(max_width, (int, str)): |
| raise ConfigValidationError(f"{path}.display_options.max_width must be an integer or string") |
| if isinstance(max_width, int) and max_width < 1: |
| raise ConfigValidationError(f"{path}.display_options.max_width must be positive") |
|
|
| if "max_height" in options: |
| max_height = options["max_height"] |
| if not isinstance(max_height, (int, str)): |
| raise ConfigValidationError(f"{path}.display_options.max_height must be an integer or string") |
| if isinstance(max_height, int) and max_height < 1: |
| raise ConfigValidationError(f"{path}.display_options.max_height must be positive") |
|
|
| if "min_height" in options: |
| min_height = options["min_height"] |
| if not isinstance(min_height, (int, str)): |
| raise ConfigValidationError(f"{path}.display_options.min_height must be an integer or string") |
| if isinstance(min_height, int) and min_height < 1: |
| raise ConfigValidationError(f"{path}.display_options.min_height must be positive") |
|
|
| if "resizable" in options: |
| if not isinstance(options["resizable"], bool): |
| raise ConfigValidationError(f"{path}.display_options.resizable must be a boolean") |
|
|
| |
| if field_type in ["text", "html"]: |
| if "collapsible" in options: |
| if not isinstance(options["collapsible"], bool): |
| raise ConfigValidationError(f"{path}.display_options.collapsible must be a boolean") |
|
|
| if "preserve_whitespace" in options: |
| if not isinstance(options["preserve_whitespace"], bool): |
| raise ConfigValidationError(f"{path}.display_options.preserve_whitespace must be a boolean") |
|
|
| |
| if field_type == "image": |
| if "zoomable" in options: |
| if not isinstance(options["zoomable"], bool): |
| raise ConfigValidationError(f"{path}.display_options.zoomable must be a boolean") |
|
|
| if "object_fit" in options: |
| valid_fits = ["contain", "cover", "fill", "none", "scale-down"] |
| if options["object_fit"] not in valid_fits: |
| raise ConfigValidationError( |
| f"{path}.display_options.object_fit must be one of: {', '.join(valid_fits)}" |
| ) |
|
|
| |
| if field_type == "video": |
| for bool_opt in ["controls", "autoplay", "loop", "muted"]: |
| if bool_opt in options: |
| if not isinstance(options[bool_opt], bool): |
| raise ConfigValidationError(f"{path}.display_options.{bool_opt} must be a boolean") |
|
|
| |
| if field_type == "audio": |
| if "controls" in options: |
| if not isinstance(options["controls"], bool): |
| raise ConfigValidationError(f"{path}.display_options.controls must be a boolean") |
|
|
| if "show_waveform" in options: |
| if not isinstance(options["show_waveform"], bool): |
| raise ConfigValidationError(f"{path}.display_options.show_waveform must be a boolean") |
|
|
| |
| if field_type == "dialogue": |
| if "alternating_shading" in options: |
| if not isinstance(options["alternating_shading"], bool): |
| raise ConfigValidationError(f"{path}.display_options.alternating_shading must be a boolean") |
|
|
| if "speaker_extraction" in options: |
| if not isinstance(options["speaker_extraction"], bool): |
| raise ConfigValidationError(f"{path}.display_options.speaker_extraction must be a boolean") |
|
|
| |
| if field_type == "pairwise": |
| if "cell_width" in options: |
| cell_width = options["cell_width"] |
| if not isinstance(cell_width, str): |
| raise ConfigValidationError(f"{path}.display_options.cell_width must be a string (e.g., '50%')") |
|
|
| |
| if field_type == "pdf": |
| if "view_mode" in options: |
| valid_modes = ["scroll", "paginated", "side-by-side"] |
| if options["view_mode"] not in valid_modes: |
| raise ConfigValidationError( |
| f"{path}.display_options.view_mode must be one of: {', '.join(valid_modes)}" |
| ) |
|
|
| if "text_layer" in options: |
| if not isinstance(options["text_layer"], bool): |
| raise ConfigValidationError(f"{path}.display_options.text_layer must be a boolean") |
|
|
| if "zoom" in options: |
| zoom = options["zoom"] |
| valid_zoom_modes = ["auto", "page-fit", "page-width"] |
| if zoom not in valid_zoom_modes: |
| try: |
| float(zoom) |
| except (TypeError, ValueError): |
| raise ConfigValidationError( |
| f"{path}.display_options.zoom must be one of {valid_zoom_modes} or a number" |
| ) |
|
|
| |
| if field_type == "document": |
| if "collapsible" in options: |
| if not isinstance(options["collapsible"], bool): |
| raise ConfigValidationError(f"{path}.display_options.collapsible must be a boolean") |
|
|
| if "show_outline" in options: |
| if not isinstance(options["show_outline"], bool): |
| raise ConfigValidationError(f"{path}.display_options.show_outline must be a boolean") |
|
|
| if "style_theme" in options: |
| valid_themes = ["default", "minimal", "print"] |
| if options["style_theme"] not in valid_themes: |
| raise ConfigValidationError( |
| f"{path}.display_options.style_theme must be one of: {', '.join(valid_themes)}" |
| ) |
|
|
| |
| if field_type == "spreadsheet": |
| if "annotation_mode" in options: |
| valid_modes = ["row", "cell", "range"] |
| if options["annotation_mode"] not in valid_modes: |
| raise ConfigValidationError( |
| f"{path}.display_options.annotation_mode must be one of: {', '.join(valid_modes)}" |
| ) |
|
|
| for bool_opt in ["show_headers", "striped", "hoverable", "sortable", "selectable", "compact"]: |
| if bool_opt in options: |
| if not isinstance(options[bool_opt], bool): |
| raise ConfigValidationError(f"{path}.display_options.{bool_opt} must be a boolean") |
|
|
| |
| if field_type == "code": |
| if "language" in options: |
| if not isinstance(options["language"], (str, type(None))): |
| raise ConfigValidationError(f"{path}.display_options.language must be a string or null") |
|
|
| if "show_line_numbers" in options: |
| if not isinstance(options["show_line_numbers"], bool): |
| raise ConfigValidationError(f"{path}.display_options.show_line_numbers must be a boolean") |
|
|
| if "wrap_lines" in options: |
| if not isinstance(options["wrap_lines"], bool): |
| raise ConfigValidationError(f"{path}.display_options.wrap_lines must be a boolean") |
|
|
| if "highlight_lines" in options: |
| hl = options["highlight_lines"] |
| if hl is not None and not isinstance(hl, list): |
| raise ConfigValidationError(f"{path}.display_options.highlight_lines must be a list of line numbers or null") |
|
|
| if "theme" in options: |
| valid_themes = ["default", "dark"] |
| if options["theme"] not in valid_themes: |
| raise ConfigValidationError( |
| f"{path}.display_options.theme must be one of: {', '.join(valid_themes)}" |
| ) |
|
|
|
|
def validate_format_handling_config(config_data: Dict[str, Any]) -> None:
    """
    Validate format_handling configuration for extended format support.

    A missing or null ``format_handling`` entry is treated as "not
    configured" and accepted. Otherwise the ``enabled`` flag, the
    ``default_format`` choice and the ``pdf`` / ``spreadsheet`` sub-sections
    are checked when present.

    Args:
        config_data: The full configuration data

    Raises:
        ConfigValidationError: If the format_handling configuration is invalid
    """
    format_config = config_data.get('format_handling')
    if format_config is None:
        return

    if not isinstance(format_config, dict):
        raise ConfigValidationError("format_handling must be a dictionary")

    # Top-level switch
    if "enabled" in format_config and not isinstance(format_config["enabled"], bool):
        raise ConfigValidationError("format_handling.enabled must be a boolean")

    # Default format selection
    if "default_format" in format_config:
        valid_defaults = ["auto", "pdf", "docx", "markdown", "spreadsheet", "code"]
        if format_config["default_format"] not in valid_defaults:
            raise ConfigValidationError(
                f"format_handling.default_format must be one of: {', '.join(valid_defaults)}"
            )

    # PDF-specific options
    if "pdf" in format_config:
        pdf_opts = format_config["pdf"]
        if not isinstance(pdf_opts, dict):
            raise ConfigValidationError("format_handling.pdf must be a dictionary")

        if "extraction_mode" in pdf_opts:
            valid_modes = ["text", "ocr", "hybrid"]
            if pdf_opts["extraction_mode"] not in valid_modes:
                raise ConfigValidationError(
                    f"format_handling.pdf.extraction_mode must be one of: {', '.join(valid_modes)}"
                )

        if "cache_extracted" in pdf_opts and not isinstance(pdf_opts["cache_extracted"], bool):
            raise ConfigValidationError("format_handling.pdf.cache_extracted must be a boolean")

    # Spreadsheet-specific options
    if "spreadsheet" in format_config:
        ss_opts = format_config["spreadsheet"]
        if not isinstance(ss_opts, dict):
            raise ConfigValidationError("format_handling.spreadsheet must be a dictionary")

        if "annotation_mode" in ss_opts:
            valid_modes = ["row", "cell", "range"]
            if ss_opts["annotation_mode"] not in valid_modes:
                raise ConfigValidationError(
                    f"format_handling.spreadsheet.annotation_mode must be one of: {', '.join(valid_modes)}"
                )

        if "max_rows" in ss_opts:
            max_rows = ss_opts["max_rows"]
            # bool is a subclass of int; without the explicit exclusion
            # `max_rows: true` would be accepted as the integer 1.
            if isinstance(max_rows, bool) or not isinstance(max_rows, int) or max_rows < 1:
                raise ConfigValidationError("format_handling.spreadsheet.max_rows must be a positive integer")
|
|
|
|
def validate_layout_config(config_data: Dict[str, Any]) -> None:
    """
    Validate layout configuration for annotation form grid arrangement.

    The layout section configures how annotation forms are arranged in a grid,
    supports grouping schemas with collapsible headers, and provides responsive
    breakpoints for mobile/tablet displays. A missing or null ``layout`` entry
    is accepted without further checks.

    Schema names referenced by ``layout.groups`` are resolved against the
    ``name`` of each dictionary in the top-level ``annotation_schemes`` list.
    If that entry is missing or is not a list, no schema names are known and
    every reference is reported as unknown.

    Args:
        config_data: The full configuration data

    Raises:
        ConfigValidationError: If the layout configuration is invalid
    """
    layout = config_data.get('layout')
    if layout is None:
        return

    if not isinstance(layout, dict):
        raise ConfigValidationError("layout must be a dictionary")

    def is_plain_int(value: Any) -> bool:
        # bool is a subclass of int; `columns: true` must not count as 1.
        return isinstance(value, int) and not isinstance(value, bool)

    def is_nonblank_str(value: Any) -> bool:
        return isinstance(value, str) and bool(value.strip())

    # --- Grid settings ---
    if 'grid' in layout:
        grid = layout['grid']
        if not isinstance(grid, dict):
            raise ConfigValidationError("layout.grid must be a dictionary")

        if 'columns' in grid:
            columns = grid['columns']
            if not is_plain_int(columns) or not 1 <= columns <= 6:
                raise ConfigValidationError("layout.grid.columns must be an integer between 1 and 6")

        if 'gap' in grid and not is_nonblank_str(grid['gap']):
            raise ConfigValidationError(
                "layout.grid.gap must be a non-empty CSS value string (e.g., '1rem', '16px')"
            )

        if 'row_gap' in grid and not is_nonblank_str(grid['row_gap']):
            raise ConfigValidationError("layout.grid.row_gap must be a non-empty CSS value string")

        if 'align_items' in grid:
            valid_alignments = ['start', 'center', 'end', 'stretch']
            if grid['align_items'] not in valid_alignments:
                raise ConfigValidationError(
                    f"layout.grid.align_items must be one of: {', '.join(valid_alignments)}"
                )

    # --- Responsive breakpoints ---
    if 'breakpoints' in layout:
        breakpoints = layout['breakpoints']
        if not isinstance(breakpoints, dict):
            raise ConfigValidationError("layout.breakpoints must be a dictionary")

        for bp_name in ['mobile', 'tablet']:
            if bp_name in breakpoints:
                bp_value = breakpoints[bp_name]
                if not is_plain_int(bp_value) or bp_value < 0:
                    raise ConfigValidationError(
                        f"layout.breakpoints.{bp_name} must be a non-negative integer (pixel value)"
                    )

    # --- Groups ---
    if 'groups' in layout:
        groups = layout['groups']
        if not isinstance(groups, list):
            raise ConfigValidationError("layout.groups must be a list")

        # Collect the known schema names. A null or otherwise malformed
        # annotation_schemes entry yields an empty set rather than a crash.
        # Non-string names are skipped: group references must be strings, so
        # such names could never match (and may not even be hashable).
        schemes = config_data.get('annotation_schemes')
        if not isinstance(schemes, (list, tuple)):
            schemes = []
        all_schemas = {
            scheme['name']
            for scheme in schemes
            if isinstance(scheme, dict) and isinstance(scheme.get('name'), str)
        }

        group_ids = set()
        for i, group in enumerate(groups):
            if not isinstance(group, dict):
                raise ConfigValidationError(f"layout.groups[{i}] must be a dictionary")

            # Required: unique, non-empty id
            if 'id' not in group:
                raise ConfigValidationError(f"layout.groups[{i}] missing required 'id' field")

            group_id = group['id']
            if not is_nonblank_str(group_id):
                raise ConfigValidationError(f"layout.groups[{i}].id must be a non-empty string")

            if group_id in group_ids:
                raise ConfigValidationError(f"layout.groups[{i}].id '{group_id}' is duplicate")
            group_ids.add(group_id)

            # Required: non-empty list of known schema names
            if 'schemas' not in group:
                raise ConfigValidationError(f"layout.groups[{i}] missing required 'schemas' field")

            group_schemas = group['schemas']
            if not isinstance(group_schemas, list):
                raise ConfigValidationError(f"layout.groups[{i}].schemas must be a list")

            if not group_schemas:
                raise ConfigValidationError(f"layout.groups[{i}].schemas cannot be empty")

            for j, schema_name in enumerate(group_schemas):
                if not isinstance(schema_name, str):
                    raise ConfigValidationError(
                        f"layout.groups[{i}].schemas[{j}] must be a string"
                    )
                if schema_name not in all_schemas:
                    raise ConfigValidationError(
                        f"layout.groups[{i}].schemas references unknown schema: '{schema_name}'"
                    )

            # Optional boolean flags
            for flag in ('collapsible', 'collapsed_default'):
                if flag in group and not isinstance(group[flag], bool):
                    raise ConfigValidationError(f"layout.groups[{i}].{flag} must be a boolean")

            # Optional text fields
            for text_key in ('title', 'description'):
                if text_key in group and not isinstance(group[text_key], str):
                    raise ConfigValidationError(f"layout.groups[{i}].{text_key} must be a string")

    # --- Explicit ordering ---
    if 'order' in layout:
        order = layout['order']
        if not isinstance(order, list):
            raise ConfigValidationError("layout.order must be a list")

        for i, schema_name in enumerate(order):
            if not isinstance(schema_name, str):
                raise ConfigValidationError(f"layout.order[{i}] must be a string")

    # --- Styling ---
    if 'styling' in layout:
        styling = layout['styling']
        if not isinstance(styling, dict):
            raise ConfigValidationError("layout.styling must be a dictionary")

        if 'align_items' in styling:
            valid_alignments = ['start', 'center', 'end', 'stretch']
            if styling['align_items'] not in valid_alignments:
                raise ConfigValidationError(
                    f"layout.styling.align_items must be one of: {', '.join(valid_alignments)}"
                )

        if 'content_align' in styling:
            valid_content_align = ['left', 'center', 'right']
            if styling['content_align'] not in valid_content_align:
                raise ConfigValidationError(
                    f"layout.styling.content_align must be one of: {', '.join(valid_content_align)}"
                )

        for color_key in ['group_background_odd', 'group_background_even']:
            if color_key in styling and not is_nonblank_str(styling[color_key]):
                raise ConfigValidationError(
                    f"layout.styling.{color_key} must be a non-empty CSS color value"
                )

        for padding_key in ['group_padding', 'form_padding']:
            if padding_key in styling and not is_nonblank_str(styling[padding_key]):
                raise ConfigValidationError(
                    f"layout.styling.{padding_key} must be a non-empty CSS padding value"
                )

    # --- Per-group background colour ---
    # Kept as a separate pass after the styling checks so that the order in
    # which errors are reported is unchanged. By this point every group is
    # known to be a dictionary.
    if 'groups' in layout:
        for i, group in enumerate(layout['groups']):
            if 'background_color' in group and not is_nonblank_str(group['background_color']):
                raise ConfigValidationError(
                    f"layout.groups[{i}].background_color must be a non-empty CSS color value"
                )
|
|
|
|
def validate_adjudication_config(config_data: Dict[str, Any]) -> None:
    """
    Validate adjudication configuration.

    Nothing beyond the dictionary check is enforced unless
    ``adjudication.enabled`` is truthy. When enabled, the adjudicator list,
    the numeric thresholds, the optional error taxonomy and (when its own
    ``enabled`` flag is truthy) the ``similarity`` sub-section are checked.
    Settings that are absent are checked using their defaults, which always
    pass.

    Args:
        config_data: The full configuration data

    Raises:
        ConfigValidationError: If the adjudication configuration is invalid
    """
    adj_config = config_data.get('adjudication', {})
    if not isinstance(adj_config, dict):
        raise ConfigValidationError("adjudication must be a dictionary")

    if not adj_config.get('enabled', False):
        return

    def is_int(value: Any) -> bool:
        # bool is a subclass of int; `true`/`false` are not valid counts.
        return isinstance(value, int) and not isinstance(value, bool)

    def is_number(value: Any) -> bool:
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    # At least one adjudicator must be named.
    users = adj_config.get('adjudicator_users', [])
    if not isinstance(users, list) or not users:
        raise ConfigValidationError(
            "adjudication.adjudicator_users must be a non-empty list of usernames"
        )

    # Numeric thresholds
    min_ann = adj_config.get('min_annotations', 2)
    if not is_int(min_ann) or min_ann < 1:
        raise ConfigValidationError(
            "adjudication.min_annotations must be a positive integer"
        )

    threshold = adj_config.get('agreement_threshold', 0.75)
    # Written as a positive range test so that NaN (for which every
    # comparison is false) is rejected too.
    if not is_number(threshold) or not 0 <= threshold <= 1:
        raise ConfigValidationError(
            "adjudication.agreement_threshold must be a number between 0 and 1"
        )

    fast_warn = adj_config.get('fast_decision_warning_ms', 2000)
    if not is_number(fast_warn) or not fast_warn >= 0:
        raise ConfigValidationError(
            "adjudication.fast_decision_warning_ms must be a non-negative number"
        )

    # Optional error taxonomy: a list whose entries are all strings.
    taxonomy = adj_config.get('error_taxonomy')
    if taxonomy is not None:
        if not isinstance(taxonomy, list):
            raise ConfigValidationError(
                "adjudication.error_taxonomy must be a list of strings"
            )
        if not all(isinstance(item, str) for item in taxonomy):
            raise ConfigValidationError(
                "adjudication.error_taxonomy entries must be strings"
            )

    # Optional similarity settings; a non-dictionary value is ignored here.
    sim_config = adj_config.get('similarity', {})
    if isinstance(sim_config, dict) and sim_config.get('enabled', False):
        top_k = sim_config.get('top_k', 5)
        if not is_int(top_k) or not 1 <= top_k <= 20:
            raise ConfigValidationError(
                "adjudication.similarity.top_k must be an integer between 1 and 20"
            )

        model = sim_config.get('model', 'all-MiniLM-L6-v2')
        if not isinstance(model, str) or not model.strip():
            raise ConfigValidationError(
                "adjudication.similarity.model must be a non-empty string"
            )
|
|
|
|
| def _check_display_only_deprecation(config_data: Dict[str, Any]) -> None: |
| """ |
| Check for deprecated display-only pattern and log warning. |
| |
| Detects when image_annotation, video_annotation, or audio_annotation |
| is used with min_annotations: 0 just to display content. |
| |
| Args: |
| config_data: The configuration data |
| """ |
| |
| schemes = [] |
| if "annotation_schemes" in config_data: |
| schemes = config_data["annotation_schemes"] |
| elif "phases" in config_data: |
| phases = config_data["phases"] |
| if isinstance(phases, list): |
| for phase in phases: |
| schemes.extend(phase.get("annotation_schemes", [])) |
| elif isinstance(phases, dict): |
| for phase_name, phase in phases.items(): |
| if phase_name != "order" and isinstance(phase, dict): |
| schemes.extend(phase.get("annotation_schemes", [])) |
|
|
| for scheme in schemes: |
| if not isinstance(scheme, dict): |
| continue |
|
|
| annotation_type = scheme.get("annotation_type") |
| if annotation_type in ["image_annotation", "video_annotation", "audio_annotation"]: |
| min_annotations = scheme.get("min_annotations", 1) |
| if min_annotations == 0: |
| logger.warning( |
| f"Deprecation warning: Using {annotation_type} with min_annotations=0 " |
| f"for display-only is deprecated. Use instance_display instead. " |
| f"See docs/instance_display.md for migration guide." |
| ) |
|
|