Spaces:
Paused
Paused
| """ | |
| Adjudication Module | |
| This module provides a comprehensive adjudication system where designated users | |
| review items with multiple annotations, resolve disagreements, and produce | |
| gold-standard final decisions. | |
| Adjudication is NOT a phase — it's a parallel workflow accessible via a dedicated | |
| /adjudicate route, available to users with adjudicator privileges. This avoids | |
| disrupting the existing phase progression system. | |
| Key Components: | |
| - AdjudicationConfig: Configuration dataclass for adjudication settings | |
| - AdjudicationItem: Represents an item eligible for adjudication with all annotations | |
| - AdjudicationDecision: Represents an adjudicator's final decision on an item | |
| - AdjudicationManager: Singleton manager for the adjudication workflow | |
| The workflow: | |
| 1. Annotators complete annotations via /annotate (existing workflow) | |
| 2. AdjudicationManager monitors annotation counts and agreement | |
| 3. Items are flagged when criteria are met (min annotations, low agreement) | |
| 4. Adjudicators review items via /adjudicate and submit decisions | |
| 5. Final dataset CLI merges unanimous + adjudicated decisions | |
| """ | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import threading | |
| from collections import Counter, defaultdict | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Any, Set | |
# Module-level logger. AdjudicationManager instances obtain a logger under the
# same name in their constructor.
logger = logging.getLogger(__name__)

# Singleton instance
# The slot starts out empty. The code that fills it is not in this part of the
# file -- presumably an init/get accessor pair that takes the lock below while
# creating the manager (TODO confirm against the rest of the module).
_ADJUDICATION_MANAGER = None
_ADJUDICATION_LOCK = threading.Lock()
@dataclass
class AdjudicationConfig:
    """Configuration for adjudication features.

    Every field has a default, so ``AdjudicationConfig()`` yields the
    "adjudication disabled" configuration. The ``@dataclass`` decorator is
    required: without it the ``field(default_factory=...)`` defaults would be
    raw ``Field`` objects shared at class level instead of fresh lists.
    """

    enabled: bool = False
    adjudicator_users: List[str] = field(default_factory=list)

    # Trigger criteria
    min_annotations: int = 2
    require_fully_annotated: bool = False
    agreement_threshold: float = 0.75
    show_all_items: bool = False

    # Display options
    show_annotator_names: bool = True
    show_timing_data: bool = True
    show_agreement_scores: bool = True
    fast_decision_warning_ms: int = 2000

    # Adjudicator metadata fields
    require_confidence: bool = True
    require_notes_on_override: bool = False
    error_taxonomy: List[str] = field(default_factory=lambda: [
        "ambiguous_text", "guideline_gap", "annotator_error",
        "edge_case", "subjective_disagreement", "other",
    ])

    # Similarity (Phase 3, optional)
    similarity_enabled: bool = False
    similarity_model: str = "all-MiniLM-L6-v2"
    similarity_top_k: int = 5
    similarity_precompute: bool = True

    # Output
    output_subdir: str = "adjudication"
@dataclass
class AdjudicationItem:
    """Represents an item eligible for adjudication with all annotator data.

    The ``@dataclass`` decorator generates the keyword constructor that the
    queue-building code relies on (``AdjudicationItem(instance_id=..., ...)``).
    """

    instance_id: str
    annotations: Dict[str, Dict[str, Any]]  # user_id -> {schema: {label: value}}
    span_annotations: Dict[str, List[Dict]]  # user_id -> [span_dict, ...]
    behavioral_data: Dict[str, Dict]  # user_id -> {total_time_ms, ...}
    agreement_scores: Dict[str, float]  # schema_name -> agreement score
    overall_agreement: float
    num_annotators: int
    status: str = "pending"  # pending, in_progress, completed, skipped
    assigned_adjudicator: Optional[str] = None
    mace_predictions: Dict[str, Any] = field(default_factory=dict)  # schema -> predicted label

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to dictionary for JSON output.

        ``mace_predictions`` is only emitted when it is non-empty, so items
        without MACE data serialize without that key.
        """
        result = {
            "instance_id": self.instance_id,
            "annotations": self.annotations,
            "span_annotations": self.span_annotations,
            "behavioral_data": self.behavioral_data,
            "agreement_scores": self.agreement_scores,
            "overall_agreement": self.overall_agreement,
            "num_annotators": self.num_annotators,
            "status": self.status,
            "assigned_adjudicator": self.assigned_adjudicator,
        }
        if self.mace_predictions:
            result["mace_predictions"] = self.mace_predictions
        return result
@dataclass
class AdjudicationDecision:
    """Represents an adjudicator's final decision on an item.

    ``@dataclass`` supplies the keyword constructor used by ``from_dict``;
    ``from_dict`` itself must be a classmethod because it receives ``cls``.
    """

    instance_id: str
    adjudicator_id: str
    timestamp: str  # ISO format string
    label_decisions: Dict[str, Any]  # schema -> value
    span_decisions: List[Dict]  # list of span dicts
    source: Dict[str, str]  # schema -> "annotator_X" | "adjudicator" | "merged"
    confidence: str  # "high", "medium", "low"
    notes: str
    error_taxonomy: List[str]
    guideline_update_flag: bool = False
    guideline_update_notes: str = ""
    time_spent_ms: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to dictionary for JSON output."""
        return {
            "instance_id": self.instance_id,
            "adjudicator_id": self.adjudicator_id,
            "timestamp": self.timestamp,
            "label_decisions": self.label_decisions,
            "span_decisions": self.span_decisions,
            "source": self.source,
            "confidence": self.confidence,
            "notes": self.notes,
            "error_taxonomy": self.error_taxonomy,
            "guideline_update_flag": self.guideline_update_flag,
            "guideline_update_notes": self.guideline_update_notes,
            "time_spent_ms": self.time_spent_ms,
        }

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "AdjudicationDecision":
        """Deserialize from dictionary.

        ``instance_id``, ``adjudicator_id`` and ``timestamp`` are required and
        raise ``KeyError`` when missing; every other key falls back to an
        empty/neutral default (confidence defaults to ``"medium"``).
        """
        return cls(
            instance_id=d["instance_id"],
            adjudicator_id=d["adjudicator_id"],
            timestamp=d["timestamp"],
            label_decisions=d.get("label_decisions", {}),
            span_decisions=d.get("span_decisions", []),
            source=d.get("source", {}),
            confidence=d.get("confidence", "medium"),
            notes=d.get("notes", ""),
            error_taxonomy=d.get("error_taxonomy", []),
            guideline_update_flag=d.get("guideline_update_flag", False),
            guideline_update_notes=d.get("guideline_update_notes", ""),
            time_spent_ms=d.get("time_spent_ms", 0),
        )
| class AdjudicationManager: | |
| """ | |
| Manages the adjudication workflow including queue building, agreement | |
| computation, decision storage, and final dataset generation. | |
| Follows the singleton pattern used by QualityControlManager. | |
| """ | |
def __init__(self, config: Dict[str, Any]):
    """
    Set up manager state from the application configuration.

    Construction order matters: the parsed adjudication settings and the
    empty queue/decision stores are in place before previously saved
    decisions are loaded, and the optional similarity engine is created last.

    Args:
        config: The full application configuration dictionary
    """
    self.config = config
    self.logger = logging.getLogger(__name__)
    self._lock = threading.RLock()

    # Adjudication-specific settings extracted from the full config
    self.adj_config = self._parse_config(config)

    # In-memory state, both keyed by the stringified instance id
    self.queue: Dict[str, AdjudicationItem] = {}
    self.decisions: Dict[str, AdjudicationDecision] = {}
    self._queue_built = False

    # Restore decisions persisted by an earlier run
    self._load_decisions()

    # Similarity engine (Phase 3) is only imported/created when configured
    self.similarity_engine = None
    if self.adj_config.similarity_enabled:
        from potato.similarity import init_similarity_engine

        engine = init_similarity_engine(config, self.adj_config)
        self.similarity_engine = engine
        if engine and engine.enabled and self.adj_config.similarity_precompute:
            self._precompute_similarities()

    self.logger.info(
        f"AdjudicationManager initialized: enabled={self.adj_config.enabled}, "
        f"adjudicators={self.adj_config.adjudicator_users}"
    )
def _parse_config(self, config: Dict[str, Any]) -> AdjudicationConfig:
    """Parse adjudication configuration from the main config.

    Args:
        config: The full application configuration dictionary; the
            ``adjudication`` section is optional.

    Returns:
        An ``AdjudicationConfig``. When the section is missing, empty/null, or
        has ``enabled`` false, the all-defaults (disabled) config is returned
        and no other key is read.
    """
    adj = AdjudicationConfig()

    # `or {}` also covers a section that is present but null (e.g. a bare
    # `adjudication:` line in YAML), which would otherwise break `.get`.
    adj_config = config.get("adjudication") or {}
    if not adj_config.get("enabled", False):
        return adj

    adj.enabled = True

    # A null value becomes an empty list and a single name becomes a
    # one-element list; a bare string would make `username in users` a
    # substring test and could grant adjudicator rights by accident.
    users = adj_config.get("adjudicator_users") or []
    adj.adjudicator_users = [users] if isinstance(users, str) else users

    # Plain pass-through settings: (config key == attribute name, default)
    simple_settings = (
        ("min_annotations", 2),
        ("require_fully_annotated", False),
        ("agreement_threshold", 0.75),
        ("show_all_items", False),
        ("show_annotator_names", True),
        ("show_timing_data", True),
        ("show_agreement_scores", True),
        ("fast_decision_warning_ms", 2000),
        ("require_confidence", True),
        ("require_notes_on_override", False),
        ("output_subdir", "adjudication"),
    )
    for key, default in simple_settings:
        setattr(adj, key, adj_config.get(key, default))

    # Only override the built-in taxonomy when the key is explicitly given
    if "error_taxonomy" in adj_config:
        adj.error_taxonomy = adj_config["error_taxonomy"]

    # Similarity settings live in a nested section with different key names
    sim_config = adj_config.get("similarity") or {}
    if sim_config.get("enabled", False):
        adj.similarity_enabled = True
        adj.similarity_model = sim_config.get("model", "all-MiniLM-L6-v2")
        adj.similarity_top_k = sim_config.get("top_k", 5)
        adj.similarity_precompute = sim_config.get("precompute_on_start", True)

    return adj
| def is_adjudicator(self, username: str) -> bool: | |
| """Check if a user is an authorized adjudicator.""" | |
| if not self.adj_config.enabled: | |
| return False | |
| return username in self.adj_config.adjudicator_users | |
def build_queue(self) -> List[AdjudicationItem]:
    """
    Scan all user annotations and build the adjudication queue.

    An item is (re)queued when, after excluding adjudicators, it has at
    least ``min_annotations`` annotators (and, if ``require_fully_annotated``
    is set, as many as the item manager's per-item maximum), at least one
    label or span annotation, and an overall agreement below
    ``agreement_threshold`` (the threshold is ignored when ``show_all_items``
    is set). Items that already have a decision are never re-evaluated; if
    they are in the queue they are marked ``completed``.

    Existing queue entries keep their status and assigned adjudicator.

    Returns:
        List of all AdjudicationItem objects currently in the queue
    """
    from potato.user_state_management import get_user_state_manager
    from potato.item_state_management import get_item_state_manager

    with self._lock:
        usm = get_user_state_manager()
        ism = get_item_state_manager()

        # Schema names drive both agreement computation and MACE lookups
        scheme_names = [
            s.get("name", "") for s in self.config.get("annotation_schemes", [])
        ]

        # MACE is optional. Resolve the manager once for the whole scan
        # instead of re-importing and re-checking it for every item.
        mace_mgr = None
        try:
            from potato.mace_manager import get_mace_manager

            candidate = get_mace_manager()
            if candidate and candidate.results:
                mace_mgr = candidate
        except Exception as exc:
            self.logger.debug("MACE predictions unavailable: %s", exc)

        adjudicators = self.adj_config.adjudicator_users

        for instance_id in ism.instance_id_to_instance:
            instance_id_str = str(instance_id)

            # Already decided: just make sure a queued entry shows as completed
            if instance_id_str in self.decisions:
                if instance_id_str in self.queue:
                    self.queue[instance_id_str].status = "completed"
                continue

            # Annotators for this item, excluding adjudicators
            annotators = {
                u for u in ism.instance_annotators.get(instance_id, set())
                if u not in adjudicators
            }
            if len(annotators) < self.adj_config.min_annotations:
                continue

            # Optionally wait until the item has its full annotator quota
            if self.adj_config.require_fully_annotated:
                max_per_item = ism.max_annotations_per_item
                if max_per_item > 0 and len(annotators) < max_per_item:
                    continue

            # Collect labels, spans and behavioral data per annotator
            item_annotations = {}
            item_spans = {}
            item_behavioral = {}
            for user_id in annotators:
                user_state = usm.get_user_state(user_id)
                if not user_state:
                    continue

                label_annots = user_state.instance_id_to_label_to_value.get(
                    instance_id_str, {}
                )
                if label_annots:
                    item_annotations[user_id] = self._serialize_labels(label_annots)

                span_annots = user_state.instance_id_to_span_to_value.get(
                    instance_id_str, {}
                )
                if span_annots:
                    item_spans[user_id] = self._serialize_spans(span_annots)

                bd = user_state.instance_id_to_behavioral_data.get(
                    instance_id_str, {}
                )
                if bd:
                    item_behavioral[user_id] = self._serialize_behavioral(bd)

            if not item_annotations and not item_spans:
                continue

            agreement_scores = self._compute_agreement(item_annotations, scheme_names)
            overall = self._compute_overall_agreement(agreement_scores)

            # High-agreement items are skipped unless everything is shown
            if (
                not self.adj_config.show_all_items
                and overall >= self.adj_config.agreement_threshold
            ):
                continue

            # Preserve existing status if already in queue
            existing = self.queue.get(instance_id_str)
            status = existing.status if existing else "pending"
            assigned = existing.assigned_adjudicator if existing else None

            # Enrich with MACE predictions if available (best effort: a
            # failure keeps whatever was collected and never blocks queuing)
            mace_preds = {}
            if mace_mgr is not None:
                try:
                    for sname in scheme_names:
                        pred = mace_mgr.get_prediction(instance_id_str, sname)
                        if pred is not None:
                            mace_preds[sname] = pred
                except Exception as exc:
                    self.logger.debug(
                        "MACE prediction lookup failed for %s: %s",
                        instance_id_str, exc,
                    )

            self.queue[instance_id_str] = AdjudicationItem(
                instance_id=instance_id_str,
                annotations=item_annotations,
                span_annotations=item_spans,
                behavioral_data=item_behavioral,
                agreement_scores=agreement_scores,
                overall_agreement=overall,
                num_annotators=len(annotators),
                status=status,
                assigned_adjudicator=assigned,
                mace_predictions=mace_preds,
            )

        self._queue_built = True
        return list(self.queue.values())
def try_enqueue_item(self, instance_id: str) -> bool:
    """
    Evaluate a single item and, if it qualifies, add it to the queue.

    Called when an overlap-sample item saturates so that low-agreement
    items show up in the adjudication queue without needing a full
    ``build_queue()`` rescan.

    The item qualifies when adjudication is enabled, it has no decision yet,
    it exists in the item state manager, it has at least ``min_annotations``
    non-adjudicator annotators with some label or span data, and its overall
    agreement is below ``agreement_threshold`` (unless ``show_all_items``).

    When the item is already queued, its status, assigned adjudicator and any
    MACE predictions attached earlier are carried over to the refreshed entry.

    Returns:
        True if the item ended up in the queue, False otherwise.
    """
    if not self.adj_config.enabled:
        return False

    from potato.user_state_management import get_user_state_manager
    from potato.item_state_management import get_item_state_manager

    usm = get_user_state_manager()
    ism = get_item_state_manager()
    if usm is None or ism is None:
        return False

    with self._lock:
        instance_id_str = str(instance_id)
        if instance_id_str in self.decisions:
            return False

        if ism.instance_id_to_instance.get(instance_id) is None:
            return False

        annotators = {
            u for u in ism.instance_annotators.get(instance_id, set())
            if u not in self.adj_config.adjudicator_users
        }
        if len(annotators) < self.adj_config.min_annotations:
            return False

        scheme_names = [
            s.get("name", "") for s in self.config.get("annotation_schemes", [])
        ]

        item_annotations: Dict[str, Any] = {}
        item_spans: Dict[str, Any] = {}
        item_behavioral: Dict[str, Any] = {}
        for user_id in annotators:
            ustate = usm.get_user_state(user_id)
            if not ustate:
                continue
            la = ustate.instance_id_to_label_to_value.get(instance_id_str, {})
            if la:
                item_annotations[user_id] = self._serialize_labels(la)
            sa = ustate.instance_id_to_span_to_value.get(instance_id_str, {})
            if sa:
                item_spans[user_id] = self._serialize_spans(sa)
            bd = ustate.instance_id_to_behavioral_data.get(instance_id_str, {})
            if bd:
                item_behavioral[user_id] = self._serialize_behavioral(bd)

        if not item_annotations and not item_spans:
            return False

        agreement_scores = self._compute_agreement(item_annotations, scheme_names)
        overall = self._compute_overall_agreement(agreement_scores)

        if (
            not self.adj_config.show_all_items
            and overall >= self.adj_config.agreement_threshold
        ):
            return False

        existing = self.queue.get(instance_id_str)
        self.queue[instance_id_str] = AdjudicationItem(
            instance_id=instance_id_str,
            annotations=item_annotations,
            span_annotations=item_spans,
            behavioral_data=item_behavioral,
            agreement_scores=agreement_scores,
            overall_agreement=overall,
            num_annotators=len(annotators),
            status=existing.status if existing else "pending",
            assigned_adjudicator=existing.assigned_adjudicator if existing else None,
            # This path does not query MACE itself, so keep whatever a
            # previous full scan attached instead of silently dropping it.
            mace_predictions=dict(existing.mace_predictions) if existing else {},
        )
        self.logger.info(
            "Auto-routed item %s into adjudication queue (overall agreement=%.3f, "
            "threshold=%.3f, annotators=%d)",
            instance_id_str, overall, self.adj_config.agreement_threshold, len(annotators),
        )
        return True
| def _serialize_labels(self, label_data: Dict) -> Dict[str, Any]: | |
| """Convert label annotation data to serializable dict.""" | |
| result = {} | |
| for key, value in label_data.items(): | |
| # Key might be a Label object or a string | |
| if hasattr(key, 'get_schema'): | |
| schema = key.get_schema() | |
| name = key.get_name() | |
| if schema not in result: | |
| result[schema] = {} | |
| result[schema][name] = value | |
| elif isinstance(key, str): | |
| result[key] = value | |
| else: | |
| result[str(key)] = value | |
| return result | |
| def _serialize_spans(self, span_data: Dict) -> List[Dict]: | |
| """Convert span annotation data to serializable list.""" | |
| spans = [] | |
| for key, value in span_data.items(): | |
| if hasattr(key, 'get_schema'): | |
| spans.append({ | |
| "schema": key.get_schema(), | |
| "name": key.get_name(), | |
| "title": key.get_title() if hasattr(key, 'get_title') else "", | |
| "start": key.get_start(), | |
| "end": key.get_end(), | |
| "id": key.get_id(), | |
| "target_field": key.get_target_field() if hasattr(key, 'get_target_field') else None, | |
| }) | |
| elif isinstance(value, dict): | |
| spans.append(value) | |
| return spans | |
| def _serialize_behavioral(self, bd) -> Dict: | |
| """Convert behavioral data to serializable dict.""" | |
| if hasattr(bd, 'to_dict'): | |
| return bd.to_dict() | |
| elif isinstance(bd, dict): | |
| return bd | |
| return {} | |
| def _compute_agreement( | |
| self, item_annotations: Dict[str, Dict], scheme_names: List[str] | |
| ) -> Dict[str, float]: | |
| """ | |
| Compute per-schema agreement for an item. | |
| Uses simple percentage agreement (proportion of annotators who chose | |
| the most common label). For more sophisticated metrics, simpledorff | |
| can be used but requires multiple items. | |
| Returns: | |
| Dict mapping schema_name to agreement score (0.0 - 1.0) | |
| """ | |
| agreement_scores = {} | |
| for schema in scheme_names: | |
| values = [] | |
| for user_id, user_annots in item_annotations.items(): | |
| if schema in user_annots: | |
| val = user_annots[schema] | |
| # Normalize to comparable form | |
| if isinstance(val, dict): | |
| # Radio stores {label: label} (value is the label string) | |
| # and multiselect stores {label: value/true}. A label is | |
| # "selected" when its value is present/truthy. The old | |
| # filter (v is True / == "true" / == 1) dropped radio's | |
| # string value, collapsing every annotator to an empty | |
| # frozenset -> a spurious 1.0 agreement even on total | |
| # disagreement. | |
| falsey = (False, None, "", "false", "False", 0, "0") | |
| selected = frozenset( | |
| k for k, v in val.items() if v not in falsey | |
| ) | |
| values.append(selected) | |
| else: | |
| values.append(val) | |
| if len(values) < 2: | |
| continue | |
| # Compute pairwise agreement (percentage) | |
| agree_count = 0 | |
| total_pairs = 0 | |
| for i in range(len(values)): | |
| for j in range(i + 1, len(values)): | |
| total_pairs += 1 | |
| if values[i] == values[j]: | |
| agree_count += 1 | |
| agreement_scores[schema] = ( | |
| agree_count / total_pairs if total_pairs > 0 else 1.0 | |
| ) | |
| return agreement_scores | |
| def _compute_overall_agreement(self, agreement_scores: Dict[str, float]) -> float: | |
| """Compute overall agreement as the mean of per-schema scores.""" | |
| if not agreement_scores: | |
| return 1.0 | |
| return sum(agreement_scores.values()) / len(agreement_scores) | |
| def get_queue( | |
| self, | |
| adjudicator_id: Optional[str] = None, | |
| filter_status: Optional[str] = None, | |
| ) -> List[AdjudicationItem]: | |
| """ | |
| Get the adjudication queue, optionally filtered by status. | |
| Args: | |
| adjudicator_id: Optional adjudicator to filter by assignment | |
| filter_status: Optional status filter ("pending", "completed", etc.) | |
| Returns: | |
| List of AdjudicationItem objects | |
| """ | |
| with self._lock: | |
| if not self._queue_built: | |
| self.build_queue() | |
| items = list(self.queue.values()) | |
| if filter_status: | |
| items = [i for i in items if i.status == filter_status] | |
| # Sort: pending first, then by agreement (lowest first) | |
| items.sort(key=lambda x: ( | |
| 0 if x.status == "pending" else 1 if x.status == "in_progress" else 2, | |
| x.overall_agreement, | |
| )) | |
| return items | |
| def get_item(self, instance_id: str) -> Optional[AdjudicationItem]: | |
| """ | |
| Get full item data for adjudication. | |
| Args: | |
| instance_id: The instance ID to retrieve | |
| Returns: | |
| AdjudicationItem or None if not in queue | |
| """ | |
| with self._lock: | |
| if not self._queue_built: | |
| self.build_queue() | |
| return self.queue.get(str(instance_id)) | |
def get_item_text(self, instance_id: str) -> str:
    """Return the display text of an item, or "" when the item is unknown.

    The value stored under the configured ``item_properties.text_key``
    (default ``"text"``) is preferred when the item data is a dict holding
    that key; otherwise the item's own ``get_text()`` is used.
    """
    from potato.item_state_management import get_item_state_manager

    item = get_item_state_manager().instance_id_to_instance.get(instance_id)
    if not item:
        return ""

    # Use text_key from config if available
    text_key = self.config.get("item_properties", {}).get("text_key", "text")
    data = item.get_data()
    if isinstance(data, dict) and text_key in data:
        return data[text_key]
    return item.get_text()
def get_item_data(self, instance_id: str) -> Dict[str, Any]:
    """Return the raw data of an item as a dict.

    Unknown items give an empty dict; non-dict data is wrapped as
    ``{"text": str(data)}``.
    """
    from potato.item_state_management import get_item_state_manager

    item = get_item_state_manager().instance_id_to_instance.get(instance_id)
    if not item:
        return {}

    data = item.get_data()
    return data if isinstance(data, dict) else {"text": str(data)}
| def get_next_item(self, adjudicator_id: str) -> Optional[AdjudicationItem]: | |
| """Get the next pending item for an adjudicator.""" | |
| items = self.get_queue(filter_status="pending") | |
| if items: | |
| return items[0] | |
| return None | |
| def skip_item(self, instance_id: str, adjudicator_id: str) -> bool: | |
| """Mark an item as skipped.""" | |
| with self._lock: | |
| item = self.queue.get(str(instance_id)) | |
| if item: | |
| item.status = "skipped" | |
| return True | |
| return False | |
def submit_decision(self, decision: AdjudicationDecision) -> bool:
    """
    Record an adjudication decision and persist all decisions.

    Any earlier decision for the same instance is replaced. If the instance
    is in the queue it is marked completed and attributed to the deciding
    adjudicator.

    Args:
        decision: The AdjudicationDecision to save

    Returns:
        True if successful
    """
    with self._lock:
        instance_id = str(decision.instance_id)
        self.decisions[instance_id] = decision

        # Reflect the decision on the queue entry, when there is one
        if instance_id in self.queue:
            queued = self.queue[instance_id]
            queued.status = "completed"
            queued.assigned_adjudicator = decision.adjudicator_id

        # Persist to disk
        self._save_decisions()

    self.logger.info(
        f"Adjudication decision saved for {instance_id} "
        f"by {decision.adjudicator_id}"
    )
    return True
| def get_stats(self) -> Dict[str, Any]: | |
| """Get adjudication progress statistics.""" | |
| with self._lock: | |
| if not self._queue_built: | |
| self.build_queue() | |
| total = len(self.queue) | |
| completed = sum( | |
| 1 for i in self.queue.values() if i.status == "completed" | |
| ) | |
| pending = sum( | |
| 1 for i in self.queue.values() if i.status == "pending" | |
| ) | |
| skipped = sum( | |
| 1 for i in self.queue.values() if i.status == "skipped" | |
| ) | |
| in_progress = sum( | |
| 1 for i in self.queue.values() if i.status == "in_progress" | |
| ) | |
| avg_agreement = 0.0 | |
| if self.queue: | |
| avg_agreement = sum( | |
| i.overall_agreement for i in self.queue.values() | |
| ) / len(self.queue) | |
| # Per-adjudicator stats | |
| adjudicator_stats = defaultdict(lambda: {"completed": 0, "total_time_ms": 0}) | |
| for decision in self.decisions.values(): | |
| adj_id = decision.adjudicator_id | |
| adjudicator_stats[adj_id]["completed"] += 1 | |
| adjudicator_stats[adj_id]["total_time_ms"] += decision.time_spent_ms | |
| return { | |
| "total": total, | |
| "completed": completed, | |
| "pending": pending, | |
| "skipped": skipped, | |
| "in_progress": in_progress, | |
| "completion_rate": completed / total if total > 0 else 0.0, | |
| "avg_agreement": avg_agreement, | |
| "adjudicator_stats": dict(adjudicator_stats), | |
| } | |
| def get_decision(self, instance_id: str) -> Optional[AdjudicationDecision]: | |
| """Get the decision for an item, if one exists.""" | |
| return self.decisions.get(str(instance_id)) | |
| # ------------------------------------------------------------------ | |
| # Phase 3: Similarity integration | |
| # ------------------------------------------------------------------ | |
| def _precompute_similarities(self) -> None: | |
| """Precompute embeddings for all items in the item state manager.""" | |
| if not self.similarity_engine or not self.similarity_engine.enabled: | |
| return | |
| from potato.item_state_management import get_item_state_manager | |
| try: | |
| ism = get_item_state_manager() | |
| item_texts = {} | |
| for instance_id, item in ism.instance_id_to_instance.items(): | |
| text = self.get_item_text(str(instance_id)) | |
| if text: | |
| item_texts[str(instance_id)] = text | |
| if item_texts: | |
| count = self.similarity_engine.precompute_embeddings(item_texts) | |
| self.logger.info(f"Precomputed {count} similarity embeddings") | |
| except Exception as e: | |
| self.logger.error(f"Error precomputing similarities: {e}") | |
| def get_similar_items( | |
| self, instance_id: str, include_metadata: bool = True | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Get similar items for a given instance, enriched with queue metadata. | |
| Args: | |
| instance_id: The reference instance ID | |
| include_metadata: Whether to include decision/consensus data | |
| Returns: | |
| List of dicts with instance_id, similarity, and optional metadata | |
| """ | |
| if not self.similarity_engine or not self.similarity_engine.enabled: | |
| return [] | |
| similar = self.similarity_engine.find_similar(instance_id) | |
| results = [] | |
| for other_id, score in similar: | |
| entry = { | |
| "instance_id": other_id, | |
| "similarity": round(score, 4), | |
| "text_preview": self.similarity_engine.text_cache.get( | |
| other_id, "" | |
| ), | |
| } | |
| if include_metadata: | |
| queue_item = self.queue.get(other_id) | |
| decision = self.decisions.get(other_id) | |
| entry["in_queue"] = queue_item is not None | |
| entry["status"] = queue_item.status if queue_item else None | |
| entry["overall_agreement"] = ( | |
| queue_item.overall_agreement if queue_item else None | |
| ) | |
| if decision: | |
| entry["decision"] = "completed" | |
| entry["consensus_label"] = None | |
| else: | |
| entry["decision"] = None | |
| if queue_item: | |
| entry["consensus_label"] = self._get_consensus_label( | |
| queue_item | |
| ) | |
| else: | |
| entry["consensus_label"] = None | |
| results.append(entry) | |
| return results | |
| def _get_consensus_label(self, item: AdjudicationItem) -> Optional[str]: | |
| """ | |
| Get the majority/consensus label for an item across the first schema. | |
| Args: | |
| item: The AdjudicationItem | |
| Returns: | |
| The most common label value as a string, or None | |
| """ | |
| if not item.annotations: | |
| return None | |
| # Use the first schema that has values | |
| for user_annots in item.annotations.values(): | |
| for schema_name in user_annots: | |
| # Collect all values for this schema | |
| values = [] | |
| for ua in item.annotations.values(): | |
| val = ua.get(schema_name) | |
| if val is not None: | |
| if isinstance(val, dict): | |
| # Multiselect: use frozenset representation | |
| selected = sorted( | |
| k for k, v in val.items() | |
| if v is True or v == "true" or v == 1 | |
| ) | |
| values.append(", ".join(selected) if selected else str(val)) | |
| else: | |
| values.append(str(val)) | |
| if values: | |
| counter = Counter(values) | |
| return counter.most_common(1)[0][0] | |
| return None | |
| # ------------------------------------------------------------------ | |
| # Phase 3: Behavioral signal analysis | |
| # ------------------------------------------------------------------ | |
| def get_annotator_signals( | |
| self, user_id: str, instance_id: str | |
| ) -> Dict[str, Any]: | |
| """ | |
| Compute per-annotator quality signals for a specific item. | |
| Returns: | |
| Dict with user_id, instance_id, flags list, and metrics dict | |
| """ | |
| flags = [] | |
| metrics = {} | |
| instance_id = str(instance_id) | |
| item = self.queue.get(instance_id) | |
| if not item: | |
| return {"user_id": user_id, "instance_id": instance_id, | |
| "flags": [], "metrics": {}} | |
| # Get behavioral data for this user on this item | |
| bd = item.behavioral_data.get(user_id, {}) | |
| if hasattr(bd, 'to_dict'): | |
| bd = bd.to_dict() | |
| total_time = bd.get("total_time_ms", 0) | |
| metrics["total_time_ms"] = total_time | |
| # 1. Speed z-score vs user's typical time | |
| user_times = self._get_user_times(user_id) | |
| if len(user_times) >= 3 and total_time > 0: | |
| mean_time = sum(user_times) / len(user_times) | |
| std_time = math.sqrt( | |
| sum((t - mean_time) ** 2 for t in user_times) / len(user_times) | |
| ) | |
| if std_time > 0: | |
| z_score = (total_time - mean_time) / std_time | |
| metrics["speed_z_score"] = round(z_score, 2) | |
| if z_score < -2.0: | |
| flags.append({ | |
| "type": "unusually_fast", | |
| "severity": "high", | |
| "message": f"Annotation time ({total_time}ms) is {abs(z_score):.1f} std devs below average" | |
| }) | |
| # 2. Fast decision warning | |
| fast_threshold = self.adj_config.fast_decision_warning_ms | |
| if fast_threshold > 0 and 0 < total_time < fast_threshold: | |
| flags.append({ | |
| "type": "fast_decision", | |
| "severity": "medium", | |
| "message": f"Decision made in {total_time}ms (below {fast_threshold}ms threshold)" | |
| }) | |
| # 3. Annotation change count | |
| raw_changes = bd.get("annotation_changes", []) | |
| change_count = len(raw_changes) if isinstance(raw_changes, list) else int(raw_changes or 0) | |
| metrics["annotation_changes"] = change_count | |
| if change_count > 5: | |
| flags.append({ | |
| "type": "excessive_changes", | |
| "severity": "medium", | |
| "message": f"Made {change_count} annotation changes on this item" | |
| }) | |
| # 4. Historical agreement rate with consensus | |
| agreement_rate = self._compute_user_agreement_rate(user_id) | |
| if agreement_rate is not None: | |
| metrics["agreement_rate"] = round(agreement_rate, 3) | |
| if agreement_rate < 0.4: | |
| flags.append({ | |
| "type": "low_agreement", | |
| "severity": "high", | |
| "message": f"Agreement rate with consensus: {agreement_rate:.0%}" | |
| }) | |
| # 5. Similar item consistency | |
| if self.similarity_engine and self.similarity_engine.enabled: | |
| inconsistencies = self._check_similar_item_consistency( | |
| user_id, instance_id | |
| ) | |
| metrics["similar_item_inconsistencies"] = inconsistencies | |
| if inconsistencies > 0: | |
| flags.append({ | |
| "type": "similar_item_inconsistency", | |
| "severity": "medium", | |
| "message": f"Different label on {inconsistencies} similar item(s)" | |
| }) | |
| return { | |
| "user_id": user_id, | |
| "instance_id": instance_id, | |
| "flags": flags, | |
| "metrics": metrics, | |
| } | |
| def _get_user_times(self, user_id: str) -> List[float]: | |
| """Collect all annotation times for a user across queue items.""" | |
| times = [] | |
| for item in self.queue.values(): | |
| bd = item.behavioral_data.get(user_id, {}) | |
| if hasattr(bd, 'to_dict'): | |
| bd = bd.to_dict() | |
| t = bd.get("total_time_ms", 0) | |
| if t > 0: | |
| times.append(t) | |
| return times | |
| def _compute_user_agreement_rate(self, user_id: str) -> Optional[float]: | |
| """ | |
| Compute how often a user agrees with the consensus across all items. | |
| Returns: | |
| Float 0-1 or None if insufficient data (needs >= 3 items) | |
| """ | |
| agree_count = 0 | |
| total_count = 0 | |
| for item in self.queue.values(): | |
| if user_id not in item.annotations: | |
| continue | |
| consensus = self._get_consensus_label(item) | |
| if consensus is None: | |
| continue | |
| user_annots = item.annotations[user_id] | |
| # Check the first schema | |
| for schema_name, val in user_annots.items(): | |
| if isinstance(val, dict): | |
| selected = sorted( | |
| k for k, v in val.items() | |
| if v is True or v == "true" or v == 1 | |
| ) | |
| user_label = ", ".join(selected) if selected else str(val) | |
| else: | |
| user_label = str(val) | |
| if user_label == consensus: | |
| agree_count += 1 | |
| total_count += 1 | |
| break # Only check first schema | |
| if total_count < 3: | |
| return None | |
| return agree_count / total_count | |
| def _check_similar_item_consistency( | |
| self, user_id: str, instance_id: str | |
| ) -> int: | |
| """ | |
| Check if user's label on similar items (>0.8 similarity) is consistent. | |
| Returns: | |
| Count of similar items where user's label differs | |
| """ | |
| if not self.similarity_engine: | |
| return 0 | |
| similar = self.similarity_engine.find_similar(instance_id) | |
| if not similar: | |
| return 0 | |
| # Get user's label on the current item | |
| item = self.queue.get(instance_id) | |
| if not item or user_id not in item.annotations: | |
| return 0 | |
| user_annots = item.annotations[user_id] | |
| current_label = None | |
| current_schema = None | |
| for schema_name, val in user_annots.items(): | |
| current_schema = schema_name | |
| if isinstance(val, dict): | |
| selected = sorted( | |
| k for k, v in val.items() | |
| if v is True or v == "true" or v == 1 | |
| ) | |
| current_label = ", ".join(selected) if selected else str(val) | |
| else: | |
| current_label = str(val) | |
| break | |
| if current_label is None: | |
| return 0 | |
| inconsistencies = 0 | |
| for other_id, score in similar: | |
| if score < 0.8: | |
| break # Results are sorted by score desc | |
| other_item = self.queue.get(other_id) | |
| if not other_item or user_id not in other_item.annotations: | |
| continue | |
| other_annots = other_item.annotations[user_id] | |
| other_val = other_annots.get(current_schema) | |
| if other_val is None: | |
| continue | |
| if isinstance(other_val, dict): | |
| selected = sorted( | |
| k for k, v in other_val.items() | |
| if v is True or v == "true" or v == 1 | |
| ) | |
| other_label = ", ".join(selected) if selected else str(other_val) | |
| else: | |
| other_label = str(other_val) | |
| if other_label != current_label: | |
| inconsistencies += 1 | |
| return inconsistencies | |
| def _get_output_dir(self) -> str: | |
| """Get the adjudication output directory.""" | |
| output_dir = self.config.get("output_annotation_dir", "annotation_output") | |
| adj_dir = os.path.join(output_dir, self.adj_config.output_subdir) | |
| os.makedirs(adj_dir, exist_ok=True) | |
| return adj_dir | |
| def _save_decisions(self) -> None: | |
| """Persist all decisions to disk.""" | |
| try: | |
| adj_dir = self._get_output_dir() | |
| decisions_file = os.path.join(adj_dir, "decisions.json") | |
| data = { | |
| "decisions": [d.to_dict() for d in self.decisions.values()], | |
| "last_updated": datetime.now().isoformat(), | |
| } | |
| with open(decisions_file, "w", encoding="utf-8") as f: | |
| json.dump(data, f, indent=2) | |
| except Exception as e: | |
| self.logger.error(f"Failed to save adjudication decisions: {e}") | |
| def _load_decisions(self) -> None: | |
| """Load previously saved decisions from disk.""" | |
| try: | |
| output_dir = self.config.get("output_annotation_dir", "annotation_output") | |
| adj_dir = os.path.join(output_dir, self.adj_config.output_subdir) | |
| decisions_file = os.path.join(adj_dir, "decisions.json") | |
| if not os.path.exists(decisions_file): | |
| return | |
| with open(decisions_file, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| for d in data.get("decisions", []): | |
| decision = AdjudicationDecision.from_dict(d) | |
| self.decisions[decision.instance_id] = decision | |
| self.logger.info( | |
| f"Loaded {len(self.decisions)} previous adjudication decisions" | |
| ) | |
| except Exception as e: | |
| self.logger.warning(f"Failed to load adjudication decisions: {e}") | |
def generate_final_dataset(self) -> List[Dict[str, Any]]:
    """
    Build the final dataset from adjudicated and unanimously labeled items.

    Every instance known to the item state manager is visited. An instance
    with a stored adjudication decision is emitted with source
    "adjudicated". Otherwise the labels of its non-adjudicator annotators
    are compared schema by schema: the instance is emitted as "unanimous"
    when every compared schema agrees (and at least one was compared), and
    as "unresolved" otherwise. Instances with fewer than two such
    annotators, or with no recorded labels, are left out.

    Returns:
        List of item dicts with final labels and provenance
    """
    from potato.user_state_management import get_user_state_manager
    from potato.item_state_management import get_item_state_manager

    user_states = get_user_state_manager()
    item_states = get_item_state_manager()
    scheme_names = [
        scheme.get("name", "")
        for scheme in self.config.get("annotation_schemes", [])
    ]

    dataset = []
    for instance_id, instance in item_states.instance_id_to_instance.items():
        key = str(instance_id)
        record = {
            "instance_id": key,
            "item_data": instance.get_data() if hasattr(instance, 'get_data') else {},
        }

        # An adjudicator's decision always wins over raw annotations.
        decision = self.decisions.get(key)
        if decision:
            record.update({
                "labels": decision.label_decisions,
                "spans": decision.span_decisions,
                "source": "adjudicated",
                "adjudicator": decision.adjudicator_id,
                "confidence": decision.confidence,
                "provenance": decision.source,
            })
            dataset.append(record)
            continue

        # Adjudicators' own annotations do not count toward agreement.
        raters = {
            user
            for user in item_states.instance_annotators.get(instance_id, set())
            if user not in self.adj_config.adjudicator_users
        }
        if len(raters) < 2:
            continue

        # Gather each rater's serialized labels for this instance.
        per_user = {}
        for rater in raters:
            state = user_states.get_user_state(rater)
            if not state:
                continue
            raw_labels = state.instance_id_to_label_to_value.get(key, {})
            if raw_labels:
                per_user[rater] = self._serialize_labels(raw_labels)
        if not per_user:
            continue

        # Compare canonical JSON encodings so dict key order is irrelevant.
        agreed = {}
        all_agree = True
        for scheme in scheme_names:
            votes = [
                json.dumps(labels[scheme], sort_keys=True)
                for labels in per_user.values()
                if scheme in labels
            ]
            if len(votes) < 2:
                # Not enough answers on this schema to judge agreement.
                continue
            if len(set(votes)) == 1:
                agreed[scheme] = json.loads(votes[0])
            else:
                all_agree = False

        if all_agree and agreed:
            record["labels"] = agreed
            record["source"] = "unanimous"
        else:
            record["labels"] = {}
            record["source"] = "unresolved"
        record["num_annotators"] = len(raters)
        dataset.append(record)

    return dataset
def init_adjudication_manager(config: Dict[str, Any]) -> Optional[AdjudicationManager]:
    """
    Create the singleton AdjudicationManager on first use and return it.

    If a manager already exists it is returned unchanged and ``config``
    is not used.
    """
    global _ADJUDICATION_MANAGER
    with _ADJUDICATION_LOCK:
        manager = _ADJUDICATION_MANAGER
        if manager is None:
            manager = AdjudicationManager(config)
            _ADJUDICATION_MANAGER = manager
    return manager
def get_adjudication_manager() -> Optional[AdjudicationManager]:
    """Return the current singleton AdjudicationManager (None if not set)."""
    manager = _ADJUDICATION_MANAGER
    return manager
def clear_adjudication_manager():
    """Reset the singleton to None under the module lock (for testing)."""
    global _ADJUDICATION_MANAGER
    # Take the same lock as initialization so a reset cannot interleave
    # with a concurrent init.
    with _ADJUDICATION_LOCK:
        _ADJUDICATION_MANAGER = None