codebook / potato /adjudication.py
davidjurgens's picture
Deploy: Potato — Codebook Annotation
aceb1b2 verified
Raw
History Blame Contribute Delete
46.8 kB
"""
Adjudication Module
This module provides a comprehensive adjudication system where designated users
review items with multiple annotations, resolve disagreements, and produce
gold-standard final decisions.
Adjudication is NOT a phase — it's a parallel workflow accessible via a dedicated
/adjudicate route, available to users with adjudicator privileges. This avoids
disrupting the existing phase progression system.
Key Components:
- AdjudicationConfig: Configuration dataclass for adjudication settings
- AdjudicationItem: Represents an item eligible for adjudication with all annotations
- AdjudicationDecision: Represents an adjudicator's final decision on an item
- AdjudicationManager: Singleton manager for the adjudication workflow
The workflow:
1. Annotators complete annotations via /annotate (existing workflow)
2. AdjudicationManager monitors annotation counts and agreement
3. Items are flagged when criteria are met (min annotations, low agreement)
4. Adjudicators review items via /adjudicate and submit decisions
5. Final dataset CLI merges unanimous + adjudicated decisions
"""
import json
import logging
import math
import os
import threading
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Any, Set
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Singleton instance
# Holds the process-wide AdjudicationManager once init_adjudication_manager()
# has run; None before that and after clear_adjudication_manager().
_ADJUDICATION_MANAGER = None
# Guards creation and clearing of the singleton above.
_ADJUDICATION_LOCK = threading.Lock()
@dataclass
class AdjudicationConfig:
    """Settings for the adjudication workflow.

    Every field has a default, so ``AdjudicationConfig()`` describes a
    disabled adjudication setup. Field order is part of the positional
    constructor signature and must not be changed.
    """

    # Master switch and the usernames allowed to adjudicate.
    enabled: bool = False
    adjudicator_users: List[str] = field(default_factory=list)

    # Criteria that decide when an item is offered for adjudication.
    min_annotations: int = 2
    require_fully_annotated: bool = False
    agreement_threshold: float = 0.75
    show_all_items: bool = False

    # What the adjudication view displays.
    show_annotator_names: bool = True
    show_timing_data: bool = True
    show_agreement_scores: bool = True
    fast_decision_warning_ms: int = 2000

    # Metadata the adjudicator supplies with each decision.
    require_confidence: bool = True
    require_notes_on_override: bool = False
    error_taxonomy: List[str] = field(
        default_factory=lambda: [
            "ambiguous_text",
            "guideline_gap",
            "annotator_error",
            "edge_case",
            "subjective_disagreement",
            "other",
        ]
    )

    # Optional similarity search (Phase 3).
    similarity_enabled: bool = False
    similarity_model: str = "all-MiniLM-L6-v2"
    similarity_top_k: int = 5
    similarity_precompute: bool = True

    # Sub-directory (under the annotation output dir) for adjudication files.
    output_subdir: str = "adjudication"
@dataclass
class AdjudicationItem:
    """One queue entry: an item plus everything its annotators produced."""

    instance_id: str
    annotations: Dict[str, Dict[str, Any]]  # user_id -> {schema: {label: value}}
    span_annotations: Dict[str, List[Dict]]  # user_id -> [span_dict, ...]
    behavioral_data: Dict[str, Dict]  # user_id -> {total_time_ms, ...}
    agreement_scores: Dict[str, float]  # schema_name -> agreement score
    overall_agreement: float
    num_annotators: int
    status: str = "pending"  # pending, in_progress, completed, skipped
    assigned_adjudicator: Optional[str] = None
    mace_predictions: Dict[str, Any] = field(default_factory=dict)  # schema -> predicted label

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of this item for JSON output.

        ``mace_predictions`` is included only when it is non-empty. The
        returned dict references the item's own containers (no copies).
        """
        payload: Dict[str, Any] = dict(
            instance_id=self.instance_id,
            annotations=self.annotations,
            span_annotations=self.span_annotations,
            behavioral_data=self.behavioral_data,
            agreement_scores=self.agreement_scores,
            overall_agreement=self.overall_agreement,
            num_annotators=self.num_annotators,
            status=self.status,
            assigned_adjudicator=self.assigned_adjudicator,
        )
        if self.mace_predictions:
            payload["mace_predictions"] = self.mace_predictions
        return payload
@dataclass
class AdjudicationDecision:
    """The final verdict an adjudicator recorded for one item."""

    instance_id: str
    adjudicator_id: str
    timestamp: str  # ISO format string
    label_decisions: Dict[str, Any]  # schema -> value
    span_decisions: List[Dict]  # list of span dicts
    source: Dict[str, str]  # schema -> "annotator_X" | "adjudicator" | "merged"
    confidence: str  # "high", "medium", "low"
    notes: str
    error_taxonomy: List[str]
    guideline_update_flag: bool = False
    guideline_update_notes: str = ""
    time_spent_ms: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return every field as a plain dict, in declaration order."""
        names = (
            "instance_id",
            "adjudicator_id",
            "timestamp",
            "label_decisions",
            "span_decisions",
            "source",
            "confidence",
            "notes",
            "error_taxonomy",
            "guideline_update_flag",
            "guideline_update_notes",
            "time_spent_ms",
        )
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "AdjudicationDecision":
        """Rebuild a decision from the mapping produced by ``to_dict``.

        ``instance_id``, ``adjudicator_id`` and ``timestamp`` are required
        (a missing one raises ``KeyError``); every other key falls back to
        a default when absent.
        """
        kwargs: Dict[str, Any] = {
            required: d[required]
            for required in ("instance_id", "adjudicator_id", "timestamp")
        }
        # Fresh containers are created on each call, so defaults are never shared.
        optional_defaults = (
            ("label_decisions", {}),
            ("span_decisions", []),
            ("source", {}),
            ("confidence", "medium"),
            ("notes", ""),
            ("error_taxonomy", []),
            ("guideline_update_flag", False),
            ("guideline_update_notes", ""),
            ("time_spent_ms", 0),
        )
        for key, fallback in optional_defaults:
            kwargs[key] = d.get(key, fallback)
        return cls(**kwargs)
class AdjudicationManager:
"""
Manages the adjudication workflow including queue building, agreement
computation, decision storage, and final dataset generation.
Follows the singleton pattern used by QualityControlManager.
"""
def __init__(self, config: Dict[str, Any]):
    """
    Set up manager state from the application configuration.

    Parses the adjudication settings, creates an empty queue, loads any
    decisions already saved on disk and, when similarity is enabled in the
    parsed settings, creates the similarity engine (optionally precomputing
    embeddings).

    Args:
        config: The full application configuration dictionary
    """
    self.config = config
    self.logger = logging.getLogger(__name__)
    # Re-entrant so that locked methods can call each other.
    self._lock = threading.RLock()

    self.adj_config = self._parse_config(config)

    # instance_id -> AdjudicationItem / AdjudicationDecision
    self.queue: Dict[str, AdjudicationItem] = {}
    self.decisions: Dict[str, AdjudicationDecision] = {}
    self._queue_built = False

    # Restore decisions from a previous run before anything reads them.
    self._load_decisions()

    # Similarity engine (Phase 3) stays None unless explicitly enabled.
    self.similarity_engine = None
    if self.adj_config.similarity_enabled:
        from potato.similarity import init_similarity_engine

        self.similarity_engine = init_similarity_engine(config, self.adj_config)
        engine_ready = self.similarity_engine and self.similarity_engine.enabled
        if engine_ready and self.adj_config.similarity_precompute:
            self._precompute_similarities()

    self.logger.info(
        f"AdjudicationManager initialized: enabled={self.adj_config.enabled}, "
        f"adjudicators={self.adj_config.adjudicator_users}"
    )
def _parse_config(self, config: Dict[str, Any]) -> AdjudicationConfig:
    """Build an AdjudicationConfig from the ``adjudication`` config section.

    When the section is missing, empty, or not enabled, the all-defaults
    (disabled) configuration is returned unchanged.
    """
    parsed = AdjudicationConfig()
    section = config.get("adjudication", {})
    if not section or not section.get("enabled", False):
        return parsed

    parsed.enabled = True
    # (attribute / config key, fallback when the key is absent)
    simple_settings = (
        ("adjudicator_users", []),
        ("min_annotations", 2),
        ("require_fully_annotated", False),
        ("agreement_threshold", 0.75),
        ("show_all_items", False),
        ("show_annotator_names", True),
        ("show_timing_data", True),
        ("show_agreement_scores", True),
        ("fast_decision_warning_ms", 2000),
        ("require_confidence", True),
        ("require_notes_on_override", False),
    )
    for key, fallback in simple_settings:
        setattr(parsed, key, section.get(key, fallback))

    # Keep the built-in taxonomy unless the config supplies its own.
    if "error_taxonomy" in section:
        parsed.error_taxonomy = section["error_taxonomy"]

    # Similarity settings are read only when the sub-section enables them.
    similarity = section.get("similarity", {})
    if similarity.get("enabled", False):
        parsed.similarity_enabled = True
        parsed.similarity_model = similarity.get("model", "all-MiniLM-L6-v2")
        parsed.similarity_top_k = similarity.get("top_k", 5)
        parsed.similarity_precompute = similarity.get("precompute_on_start", True)

    parsed.output_subdir = section.get("output_subdir", "adjudication")
    return parsed
def is_adjudicator(self, username: str) -> bool:
    """Return True when adjudication is enabled and ``username`` is listed as an adjudicator."""
    settings = self.adj_config
    if settings.enabled:
        return username in settings.adjudicator_users
    return False
def build_queue(self) -> List[AdjudicationItem]:
    """
    Scan all user annotations and build the adjudication queue.

    An item is added (or refreshed) when, after excluding adjudicators, it
    has at least ``min_annotations`` annotators, satisfies the optional
    ``require_fully_annotated`` rule, has at least one label or span
    annotation, and - unless ``show_all_items`` is set - its overall
    agreement is below ``agreement_threshold``. Items that already have a
    decision are not re-evaluated; if such an item is in the queue it is
    marked "completed". Entries already in the queue are never removed.

    Returns:
        List of all AdjudicationItem objects currently in the queue
    """
    from potato.user_state_management import get_user_state_manager
    from potato.item_state_management import get_item_state_manager

    with self._lock:
        usm = get_user_state_manager()
        ism = get_item_state_manager()

        annotation_schemes = self.config.get("annotation_schemes", [])
        scheme_names = [s.get("name", "") for s in annotation_schemes]

        # MACE is optional. Resolve the manager once for the whole scan
        # rather than once per item, and record why it is unavailable
        # instead of silently discarding the error.
        mace_mgr = None
        try:
            from potato.mace_manager import get_mace_manager

            candidate = get_mace_manager()
            if candidate and candidate.results:
                mace_mgr = candidate
        except Exception as exc:
            self.logger.debug(
                "MACE predictions unavailable while building adjudication queue: %s",
                exc,
            )

        for instance_id in ism.instance_id_to_instance:
            instance_id_str = str(instance_id)

            # Already decided: only keep the queue entry's status in sync.
            if instance_id_str in self.decisions:
                if instance_id_str in self.queue:
                    self.queue[instance_id_str].status = "completed"
                continue

            # Adjudicators' own annotations do not count towards agreement.
            annotators = {
                u for u in ism.instance_annotators.get(instance_id, set())
                if u not in self.adj_config.adjudicator_users
            }
            if len(annotators) < self.adj_config.min_annotations:
                continue

            if self.adj_config.require_fully_annotated:
                max_per_item = ism.max_annotations_per_item
                # A non-positive cap disables this check.
                if max_per_item > 0 and len(annotators) < max_per_item:
                    continue

            # Collect everything each annotator produced for this item.
            item_annotations = {}
            item_spans = {}
            item_behavioral = {}
            for user_id in annotators:
                user_state = usm.get_user_state(user_id)
                if not user_state:
                    continue

                label_annots = user_state.instance_id_to_label_to_value.get(
                    instance_id_str, {}
                )
                if label_annots:
                    item_annotations[user_id] = self._serialize_labels(label_annots)

                span_annots = user_state.instance_id_to_span_to_value.get(
                    instance_id_str, {}
                )
                if span_annots:
                    item_spans[user_id] = self._serialize_spans(span_annots)

                bd = user_state.instance_id_to_behavioral_data.get(
                    instance_id_str, {}
                )
                if bd:
                    item_behavioral[user_id] = self._serialize_behavioral(bd)

            if not item_annotations and not item_spans:
                continue

            # Agreement is computed from label annotations only.
            agreement_scores = self._compute_agreement(
                item_annotations, scheme_names
            )
            overall = self._compute_overall_agreement(agreement_scores)

            if (
                not self.adj_config.show_all_items
                and overall >= self.adj_config.agreement_threshold
            ):
                continue

            # Refreshing an entry must not lose its workflow state.
            existing = self.queue.get(instance_id_str)
            status = existing.status if existing else "pending"
            assigned = existing.assigned_adjudicator if existing else None

            mace_preds = {}
            if mace_mgr is not None:
                try:
                    for sname in scheme_names:
                        pred = mace_mgr.get_prediction(instance_id_str, sname)
                        if pred is not None:
                            mace_preds[sname] = pred
                except Exception as exc:
                    # Best effort: keep whatever predictions were gathered.
                    self.logger.debug(
                        "MACE prediction lookup failed for item %s: %s",
                        instance_id_str, exc,
                    )

            self.queue[instance_id_str] = AdjudicationItem(
                instance_id=instance_id_str,
                annotations=item_annotations,
                span_annotations=item_spans,
                behavioral_data=item_behavioral,
                agreement_scores=agreement_scores,
                overall_agreement=overall,
                num_annotators=len(annotators),
                status=status,
                assigned_adjudicator=assigned,
                mace_predictions=mace_preds,
            )

        self._queue_built = True
        return list(self.queue.values())
def try_enqueue_item(self, instance_id: str) -> bool:
    """
    Evaluate a single item and, if it qualifies, add it to the queue.

    Called when an overlap-sample item saturates so that low-agreement
    items show up in the adjudication queue without needing a full
    ``build_queue()`` rescan. An item qualifies when it has no decision
    yet, has at least ``min_annotations`` non-adjudicator annotators, has
    label or span annotations, and (unless ``show_all_items`` is set) its
    overall agreement is below ``agreement_threshold``.

    If the item is already queued, its status, assigned adjudicator and -
    when no fresh MACE predictions can be obtained - its previous MACE
    predictions are carried over to the refreshed entry.

    Returns:
        True if the item ended up in the queue, False otherwise.
    """
    if not self.adj_config.enabled:
        return False

    from potato.user_state_management import get_user_state_manager
    from potato.item_state_management import get_item_state_manager

    usm = get_user_state_manager()
    ism = get_item_state_manager()
    if usm is None or ism is None:
        return False

    with self._lock:
        instance_id_str = str(instance_id)
        if instance_id_str in self.decisions:
            return False

        item = ism.instance_id_to_instance.get(instance_id)
        if item is None:
            return False

        # Adjudicators' own annotations do not count towards agreement.
        annotators = {
            u for u in ism.instance_annotators.get(instance_id, set())
            if u not in self.adj_config.adjudicator_users
        }
        if len(annotators) < self.adj_config.min_annotations:
            return False

        scheme_names = [s.get("name", "") for s in self.config.get("annotation_schemes", [])]
        item_annotations: Dict[str, Any] = {}
        item_spans: Dict[str, Any] = {}
        item_behavioral: Dict[str, Any] = {}
        for user_id in annotators:
            ustate = usm.get_user_state(user_id)
            if not ustate:
                continue
            la = ustate.instance_id_to_label_to_value.get(instance_id_str, {})
            if la:
                item_annotations[user_id] = self._serialize_labels(la)
            sa = ustate.instance_id_to_span_to_value.get(instance_id_str, {})
            if sa:
                item_spans[user_id] = self._serialize_spans(sa)
            bd = ustate.instance_id_to_behavioral_data.get(instance_id_str, {})
            if bd:
                item_behavioral[user_id] = self._serialize_behavioral(bd)

        if not item_annotations and not item_spans:
            return False

        agreement_scores = self._compute_agreement(item_annotations, scheme_names)
        overall = self._compute_overall_agreement(agreement_scores)
        if not self.adj_config.show_all_items:
            if overall >= self.adj_config.agreement_threshold:
                return False

        existing = self.queue.get(instance_id_str)

        # Same best-effort MACE enrichment that build_queue() performs, so
        # auto-routed items are not missing predictions. MACE is optional,
        # so any failure is only logged.
        mace_preds: Dict[str, Any] = {}
        try:
            from potato.mace_manager import get_mace_manager

            mace_mgr = get_mace_manager()
            if mace_mgr and mace_mgr.results:
                for sname in scheme_names:
                    pred = mace_mgr.get_prediction(instance_id_str, sname)
                    if pred is not None:
                        mace_preds[sname] = pred
        except Exception as exc:
            self.logger.debug(
                "MACE prediction lookup failed for item %s: %s",
                instance_id_str, exc,
            )
        # Never throw away predictions an earlier pass already attached.
        if not mace_preds and existing is not None:
            mace_preds = existing.mace_predictions

        self.queue[instance_id_str] = AdjudicationItem(
            instance_id=instance_id_str,
            annotations=item_annotations,
            span_annotations=item_spans,
            behavioral_data=item_behavioral,
            agreement_scores=agreement_scores,
            overall_agreement=overall,
            num_annotators=len(annotators),
            status=existing.status if existing else "pending",
            assigned_adjudicator=existing.assigned_adjudicator if existing else None,
            mace_predictions=mace_preds,
        )
        self.logger.info(
            "Auto-routed item %s into adjudication queue (overall agreement=%.3f, "
            "threshold=%.3f, annotators=%d)",
            instance_id_str, overall, self.adj_config.agreement_threshold, len(annotators),
        )
        return True
def _serialize_labels(self, label_data: Dict) -> Dict[str, Any]:
"""Convert label annotation data to serializable dict."""
result = {}
for key, value in label_data.items():
# Key might be a Label object or a string
if hasattr(key, 'get_schema'):
schema = key.get_schema()
name = key.get_name()
if schema not in result:
result[schema] = {}
result[schema][name] = value
elif isinstance(key, str):
result[key] = value
else:
result[str(key)] = value
return result
def _serialize_spans(self, span_data: Dict) -> List[Dict]:
"""Convert span annotation data to serializable list."""
spans = []
for key, value in span_data.items():
if hasattr(key, 'get_schema'):
spans.append({
"schema": key.get_schema(),
"name": key.get_name(),
"title": key.get_title() if hasattr(key, 'get_title') else "",
"start": key.get_start(),
"end": key.get_end(),
"id": key.get_id(),
"target_field": key.get_target_field() if hasattr(key, 'get_target_field') else None,
})
elif isinstance(value, dict):
spans.append(value)
return spans
def _serialize_behavioral(self, bd) -> Dict:
"""Convert behavioral data to serializable dict."""
if hasattr(bd, 'to_dict'):
return bd.to_dict()
elif isinstance(bd, dict):
return bd
return {}
def _compute_agreement(
self, item_annotations: Dict[str, Dict], scheme_names: List[str]
) -> Dict[str, float]:
"""
Compute per-schema agreement for an item.
Uses simple percentage agreement (proportion of annotators who chose
the most common label). For more sophisticated metrics, simpledorff
can be used but requires multiple items.
Returns:
Dict mapping schema_name to agreement score (0.0 - 1.0)
"""
agreement_scores = {}
for schema in scheme_names:
values = []
for user_id, user_annots in item_annotations.items():
if schema in user_annots:
val = user_annots[schema]
# Normalize to comparable form
if isinstance(val, dict):
# Radio stores {label: label} (value is the label string)
# and multiselect stores {label: value/true}. A label is
# "selected" when its value is present/truthy. The old
# filter (v is True / == "true" / == 1) dropped radio's
# string value, collapsing every annotator to an empty
# frozenset -> a spurious 1.0 agreement even on total
# disagreement.
falsey = (False, None, "", "false", "False", 0, "0")
selected = frozenset(
k for k, v in val.items() if v not in falsey
)
values.append(selected)
else:
values.append(val)
if len(values) < 2:
continue
# Compute pairwise agreement (percentage)
agree_count = 0
total_pairs = 0
for i in range(len(values)):
for j in range(i + 1, len(values)):
total_pairs += 1
if values[i] == values[j]:
agree_count += 1
agreement_scores[schema] = (
agree_count / total_pairs if total_pairs > 0 else 1.0
)
return agreement_scores
def _compute_overall_agreement(self, agreement_scores: Dict[str, float]) -> float:
"""Compute overall agreement as the mean of per-schema scores."""
if not agreement_scores:
return 1.0
return sum(agreement_scores.values()) / len(agreement_scores)
def get_queue(
    self,
    adjudicator_id: Optional[str] = None,
    filter_status: Optional[str] = None,
) -> List[AdjudicationItem]:
    """
    Return the queue, optionally narrowed to one status, in review order.

    The queue is built on first use. Items are ordered pending first, then
    in_progress, then every other status; within a group the lowest overall
    agreement comes first.

    Args:
        adjudicator_id: Accepted for API compatibility; currently not used
            to filter the result.
        filter_status: Optional status filter ("pending", "completed", etc.)

    Returns:
        List of AdjudicationItem objects
    """
    # Statuses not listed here sort after these two.
    status_rank = {"pending": 0, "in_progress": 1}

    with self._lock:
        if not self._queue_built:
            self.build_queue()

        if filter_status:
            selected = [
                entry for entry in self.queue.values()
                if entry.status == filter_status
            ]
        else:
            selected = list(self.queue.values())

        return sorted(
            selected,
            key=lambda entry: (
                status_rank.get(entry.status, 2),
                entry.overall_agreement,
            ),
        )
def get_item(self, instance_id: str) -> Optional[AdjudicationItem]:
    """
    Look up one queue entry, building the queue first if needed.

    Args:
        instance_id: The instance ID to retrieve (converted with ``str``)

    Returns:
        AdjudicationItem or None if not in queue
    """
    key = str(instance_id)
    with self._lock:
        if not self._queue_built:
            self.build_queue()
        return self.queue.get(key)
def get_item_text(self, instance_id: str) -> str:
    """Return the display text of an item, or "" when the item is unknown.

    The configured ``item_properties.text_key`` (default "text") is looked
    up in the item's data when that data is a dict; otherwise the item's
    own ``get_text()`` is used.
    """
    from potato.item_state_management import get_item_state_manager

    instance = get_item_state_manager().instance_id_to_instance.get(instance_id)
    if not instance:
        return ""

    text_key = self.config.get("item_properties", {}).get("text_key", "text")
    data = instance.get_data()
    if isinstance(data, dict) and text_key in data:
        return data[text_key]
    return instance.get_text()
def get_item_data(self, instance_id: str) -> Dict[str, Any]:
    """Return an item's raw data as a dict.

    Non-dict data is wrapped as ``{"text": str(data)}``; an unknown item
    yields an empty dict.
    """
    from potato.item_state_management import get_item_state_manager

    instance = get_item_state_manager().instance_id_to_instance.get(instance_id)
    if not instance:
        return {}

    data = instance.get_data()
    return data if isinstance(data, dict) else {"text": str(data)}
def get_next_item(self, adjudicator_id: str) -> Optional[AdjudicationItem]:
    """Return the first pending item in queue order, or None when none is pending.

    ``adjudicator_id`` is accepted but does not influence the choice.
    """
    pending = self.get_queue(filter_status="pending")
    return pending[0] if pending else None
def skip_item(self, instance_id: str, adjudicator_id: str) -> bool:
    """Set a queued item's status to "skipped".

    Returns True when the item was found in the queue, False otherwise.
    ``adjudicator_id`` is accepted but not recorded.
    """
    key = str(instance_id)
    with self._lock:
        entry = self.queue.get(key)
        if not entry:
            return False
        entry.status = "skipped"
        return True
def submit_decision(self, decision: AdjudicationDecision) -> bool:
    """
    Record an adjudication decision and persist all decisions.

    The decision is stored under ``str(decision.instance_id)``. If that
    item is in the queue it is marked "completed" and assigned to the
    deciding adjudicator.

    Args:
        decision: The AdjudicationDecision to save

    Returns:
        Always True
    """
    with self._lock:
        instance_id = str(decision.instance_id)
        self.decisions[instance_id] = decision

        # Reflect the decision on the queue entry, when there is one.
        if instance_id in self.queue:
            queued = self.queue[instance_id]
            queued.status = "completed"
            queued.assigned_adjudicator = decision.adjudicator_id

        self._save_decisions()

        self.logger.info(
            f"Adjudication decision saved for {instance_id} "
            f"by {decision.adjudicator_id}"
        )
        return True
def get_stats(self) -> Dict[str, Any]:
    """Summarize adjudication progress.

    Returns a dict with the queue size, a count per status ("completed",
    "pending", "skipped", "in_progress"), the completion rate, the mean
    overall agreement across queued items, and per-adjudicator totals
    (decisions made and time spent) taken from the stored decisions.
    The queue is built first if that has not happened yet.
    """
    with self._lock:
        if not self._queue_built:
            self.build_queue()

        entries = list(self.queue.values())
        total = len(entries)
        by_status = Counter(entry.status for entry in entries)

        if entries:
            avg_agreement = sum(entry.overall_agreement for entry in entries) / total
        else:
            avg_agreement = 0.0

        # Per-adjudicator totals come from decisions, not from the queue.
        per_adjudicator: Dict[str, Dict[str, int]] = {}
        for decision in self.decisions.values():
            bucket = per_adjudicator.setdefault(
                decision.adjudicator_id, {"completed": 0, "total_time_ms": 0}
            )
            bucket["completed"] += 1
            bucket["total_time_ms"] += decision.time_spent_ms

        completed = by_status.get("completed", 0)
        return {
            "total": total,
            "completed": completed,
            "pending": by_status.get("pending", 0),
            "skipped": by_status.get("skipped", 0),
            "in_progress": by_status.get("in_progress", 0),
            "completion_rate": completed / total if total > 0 else 0.0,
            "avg_agreement": avg_agreement,
            "adjudicator_stats": per_adjudicator,
        }
def get_decision(self, instance_id: str) -> Optional[AdjudicationDecision]:
    """Return the stored decision for ``str(instance_id)``, or None if there is none."""
    key = str(instance_id)
    return self.decisions.get(key)
# ------------------------------------------------------------------
# Phase 3: Similarity integration
# ------------------------------------------------------------------
def _precompute_similarities(self) -> None:
    """Feed the text of every known item to the similarity engine.

    Does nothing when no enabled similarity engine is present. Items with
    empty text are left out. Errors raised while collecting texts or
    computing embeddings are logged, not propagated.
    """
    engine = self.similarity_engine
    if not engine or not engine.enabled:
        return

    from potato.item_state_management import get_item_state_manager

    try:
        ism = get_item_state_manager()
        item_texts = {}
        for key in map(str, ism.instance_id_to_instance):
            text = self.get_item_text(key)
            if text:
                item_texts[key] = text

        if item_texts:
            count = self.similarity_engine.precompute_embeddings(item_texts)
            self.logger.info(f"Precomputed {count} similarity embeddings")
    except Exception as e:
        self.logger.error(f"Error precomputing similarities: {e}")
def get_similar_items(
    self, instance_id: str, include_metadata: bool = True
) -> List[Dict[str, Any]]:
    """
    List the items the similarity engine considers close to ``instance_id``.

    Args:
        instance_id: The reference instance ID
        include_metadata: When True, each entry also reports whether the
            similar item is queued, its status and overall agreement,
            whether it has a decision, and - for undecided queued items -
            its consensus label.

    Returns:
        List of dicts with instance_id, similarity (rounded to 4 places),
        text_preview and the optional metadata; empty when no enabled
        similarity engine is available.
    """
    engine = self.similarity_engine
    if not engine or not engine.enabled:
        return []

    results = []
    for other_id, score in engine.find_similar(instance_id):
        entry = {
            "instance_id": other_id,
            "similarity": round(score, 4),
            "text_preview": engine.text_cache.get(other_id, ""),
        }

        if include_metadata:
            queue_item = self.queue.get(other_id)
            decision = self.decisions.get(other_id)

            entry["in_queue"] = queue_item is not None
            entry["status"] = queue_item.status if queue_item else None
            entry["overall_agreement"] = (
                queue_item.overall_agreement if queue_item else None
            )
            entry["decision"] = "completed" if decision else None
            # A consensus label is only reported for undecided, queued items.
            if queue_item and not decision:
                entry["consensus_label"] = self._get_consensus_label(queue_item)
            else:
                entry["consensus_label"] = None

        results.append(entry)
    return results
def _get_consensus_label(self, item: AdjudicationItem) -> Optional[str]:
"""
Get the majority/consensus label for an item across the first schema.
Args:
item: The AdjudicationItem
Returns:
The most common label value as a string, or None
"""
if not item.annotations:
return None
# Use the first schema that has values
for user_annots in item.annotations.values():
for schema_name in user_annots:
# Collect all values for this schema
values = []
for ua in item.annotations.values():
val = ua.get(schema_name)
if val is not None:
if isinstance(val, dict):
# Multiselect: use frozenset representation
selected = sorted(
k for k, v in val.items()
if v is True or v == "true" or v == 1
)
values.append(", ".join(selected) if selected else str(val))
else:
values.append(str(val))
if values:
counter = Counter(values)
return counter.most_common(1)[0][0]
return None
# ------------------------------------------------------------------
# Phase 3: Behavioral signal analysis
# ------------------------------------------------------------------
def get_annotator_signals(
    self, user_id: str, instance_id: str
) -> Dict[str, Any]:
    """
    Compute quality signals for one annotator on one queued item.

    Checks, in order: how the time spent compares with the annotator's
    other queued items (z-score), whether it is below the configured
    fast-decision threshold, how many annotation changes were made, the
    annotator's agreement rate with consensus, and - when a similarity
    engine is enabled - inconsistencies on similar items.

    Returns:
        Dict with user_id, instance_id, flags list, and metrics dict. Both
        flags and metrics are empty when the item is not in the queue.
    """
    instance_id = str(instance_id)
    flags: List[Dict[str, str]] = []
    metrics: Dict[str, Any] = {}
    report = {
        "user_id": user_id,
        "instance_id": instance_id,
        "flags": flags,
        "metrics": metrics,
    }

    def raise_flag(kind: str, severity: str, message: str) -> None:
        flags.append({"type": kind, "severity": severity, "message": message})

    item = self.queue.get(instance_id)
    if not item:
        return report

    # Behavioral data may be stored as an object or as a plain dict.
    bd = item.behavioral_data.get(user_id, {})
    if hasattr(bd, 'to_dict'):
        bd = bd.to_dict()

    total_time = bd.get("total_time_ms", 0)
    metrics["total_time_ms"] = total_time

    # 1. Speed relative to this annotator's own times (population std dev).
    user_times = self._get_user_times(user_id)
    if len(user_times) >= 3 and total_time > 0:
        sample_size = len(user_times)
        mean_time = sum(user_times) / sample_size
        variance = sum((t - mean_time) ** 2 for t in user_times) / sample_size
        std_time = math.sqrt(variance)
        if std_time > 0:
            z_score = (total_time - mean_time) / std_time
            metrics["speed_z_score"] = round(z_score, 2)
            if z_score < -2.0:
                raise_flag(
                    "unusually_fast",
                    "high",
                    f"Annotation time ({total_time}ms) is {abs(z_score):.1f} std devs below average",
                )

    # 2. Absolute fast-decision threshold (a non-positive threshold disables it).
    fast_threshold = self.adj_config.fast_decision_warning_ms
    if fast_threshold > 0 and 0 < total_time < fast_threshold:
        raise_flag(
            "fast_decision",
            "medium",
            f"Decision made in {total_time}ms (below {fast_threshold}ms threshold)",
        )

    # 3. Number of annotation changes; stored either as a list or a count.
    raw_changes = bd.get("annotation_changes", [])
    if isinstance(raw_changes, list):
        change_count = len(raw_changes)
    else:
        change_count = int(raw_changes or 0)
    metrics["annotation_changes"] = change_count
    if change_count > 5:
        raise_flag(
            "excessive_changes",
            "medium",
            f"Made {change_count} annotation changes on this item",
        )

    # 4. Agreement with consensus across queued items (None = not enough data).
    agreement_rate = self._compute_user_agreement_rate(user_id)
    if agreement_rate is not None:
        metrics["agreement_rate"] = round(agreement_rate, 3)
        if agreement_rate < 0.4:
            raise_flag(
                "low_agreement",
                "high",
                f"Agreement rate with consensus: {agreement_rate:.0%}",
            )

    # 5. Consistency with the annotator's labels on similar items.
    if self.similarity_engine and self.similarity_engine.enabled:
        inconsistencies = self._check_similar_item_consistency(
            user_id, instance_id
        )
        metrics["similar_item_inconsistencies"] = inconsistencies
        if inconsistencies > 0:
            raise_flag(
                "similar_item_inconsistency",
                "medium",
                f"Different label on {inconsistencies} similar item(s)",
            )

    return report
def _get_user_times(self, user_id: str) -> List[float]:
"""Collect all annotation times for a user across queue items."""
times = []
for item in self.queue.values():
bd = item.behavioral_data.get(user_id, {})
if hasattr(bd, 'to_dict'):
bd = bd.to_dict()
t = bd.get("total_time_ms", 0)
if t > 0:
times.append(t)
return times
def _compute_user_agreement_rate(self, user_id: str) -> Optional[float]:
"""
Compute how often a user agrees with the consensus across all items.
Returns:
Float 0-1 or None if insufficient data (needs >= 3 items)
"""
agree_count = 0
total_count = 0
for item in self.queue.values():
if user_id not in item.annotations:
continue
consensus = self._get_consensus_label(item)
if consensus is None:
continue
user_annots = item.annotations[user_id]
# Check the first schema
for schema_name, val in user_annots.items():
if isinstance(val, dict):
selected = sorted(
k for k, v in val.items()
if v is True or v == "true" or v == 1
)
user_label = ", ".join(selected) if selected else str(val)
else:
user_label = str(val)
if user_label == consensus:
agree_count += 1
total_count += 1
break # Only check first schema
if total_count < 3:
return None
return agree_count / total_count
def _check_similar_item_consistency(
self, user_id: str, instance_id: str
) -> int:
"""
Check if user's label on similar items (>0.8 similarity) is consistent.
Returns:
Count of similar items where user's label differs
"""
if not self.similarity_engine:
return 0
similar = self.similarity_engine.find_similar(instance_id)
if not similar:
return 0
# Get user's label on the current item
item = self.queue.get(instance_id)
if not item or user_id not in item.annotations:
return 0
user_annots = item.annotations[user_id]
current_label = None
current_schema = None
for schema_name, val in user_annots.items():
current_schema = schema_name
if isinstance(val, dict):
selected = sorted(
k for k, v in val.items()
if v is True or v == "true" or v == 1
)
current_label = ", ".join(selected) if selected else str(val)
else:
current_label = str(val)
break
if current_label is None:
return 0
inconsistencies = 0
for other_id, score in similar:
if score < 0.8:
break # Results are sorted by score desc
other_item = self.queue.get(other_id)
if not other_item or user_id not in other_item.annotations:
continue
other_annots = other_item.annotations[user_id]
other_val = other_annots.get(current_schema)
if other_val is None:
continue
if isinstance(other_val, dict):
selected = sorted(
k for k, v in other_val.items()
if v is True or v == "true" or v == 1
)
other_label = ", ".join(selected) if selected else str(other_val)
else:
other_label = str(other_val)
if other_label != current_label:
inconsistencies += 1
return inconsistencies
def _get_output_dir(self) -> str:
"""Get the adjudication output directory."""
output_dir = self.config.get("output_annotation_dir", "annotation_output")
adj_dir = os.path.join(output_dir, self.adj_config.output_subdir)
os.makedirs(adj_dir, exist_ok=True)
return adj_dir
def _save_decisions(self) -> None:
"""Persist all decisions to disk."""
try:
adj_dir = self._get_output_dir()
decisions_file = os.path.join(adj_dir, "decisions.json")
data = {
"decisions": [d.to_dict() for d in self.decisions.values()],
"last_updated": datetime.now().isoformat(),
}
with open(decisions_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
except Exception as e:
self.logger.error(f"Failed to save adjudication decisions: {e}")
def _load_decisions(self) -> None:
    """Load previously saved decisions from ``decisions.json``, if present.

    Decisions are stored under ``str(instance_id)``, the same key form
    ``submit_decision``, ``get_decision`` and ``build_queue`` use, so a
    decision whose saved id is not a string is still found after a
    restart. An entry that cannot be deserialized is skipped with a
    warning instead of aborting the remaining entries.

    Never raises: a missing file is silently ignored and any other failure
    is logged as a warning.
    """
    try:
        output_dir = self.config.get("output_annotation_dir", "annotation_output")
        adj_dir = os.path.join(output_dir, self.adj_config.output_subdir)
        decisions_file = os.path.join(adj_dir, "decisions.json")

        if not os.path.exists(decisions_file):
            return

        with open(decisions_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        for d in data.get("decisions", []):
            try:
                decision = AdjudicationDecision.from_dict(d)
            except (KeyError, TypeError, AttributeError) as e:
                self.logger.warning(
                    f"Skipping malformed adjudication decision: {e!r}"
                )
                continue
            # Normalize the key; all lookups go through str(instance_id).
            self.decisions[str(decision.instance_id)] = decision

        self.logger.info(
            f"Loaded {len(self.decisions)} previous adjudication decisions"
        )
    except Exception as e:
        self.logger.warning(f"Failed to load adjudication decisions: {e}")
def generate_final_dataset(self) -> List[Dict[str, Any]]:
    """
    Build the final dataset from adjudicated and unanimous items.

    For every known item:
    - with a decision: the decided labels/spans are emitted with source
      "adjudicated" plus adjudicator, confidence and provenance;
    - otherwise, with at least two non-adjudicator annotators and some
      label annotations: source is "unanimous" when every schema answered
      by two or more annotators has identical answers (and at least one
      such schema exists), else "unresolved" with empty labels;
    - all other items are left out of the result.

    Returns:
        List of item dicts with final labels and provenance
    """
    from potato.user_state_management import get_user_state_manager
    from potato.item_state_management import get_item_state_manager

    usm = get_user_state_manager()
    ism = get_item_state_manager()

    scheme_names = [
        scheme.get("name", "")
        for scheme in self.config.get("annotation_schemes", [])
    ]

    results = []
    for instance_id, item in ism.instance_id_to_instance.items():
        instance_id_str = str(instance_id)
        result = {
            "instance_id": instance_id_str,
            "item_data": item.get_data() if hasattr(item, 'get_data') else {},
        }

        # An adjudication decision always wins.
        decision = self.decisions.get(instance_id_str)
        if decision:
            result["labels"] = decision.label_decisions
            result["spans"] = decision.span_decisions
            result["source"] = "adjudicated"
            result["adjudicator"] = decision.adjudicator_id
            result["confidence"] = decision.confidence
            result["provenance"] = decision.source
            results.append(result)
            continue

        # Unanimity needs at least two regular annotators.
        annotators = {
            u for u in ism.instance_annotators.get(instance_id, set())
            if u not in self.adj_config.adjudicator_users
        }
        if len(annotators) < 2:
            continue

        annotations = {}
        for user_id in annotators:
            user_state = usm.get_user_state(user_id)
            if not user_state:
                continue
            labels = user_state.instance_id_to_label_to_value.get(
                instance_id_str, {}
            )
            if labels:
                annotations[user_id] = self._serialize_labels(labels)

        if not annotations:
            continue

        # Answers are compared through their canonical JSON text.
        unanimous_labels = {}
        is_unanimous = True
        for schema in scheme_names:
            encoded = [
                json.dumps(user_annots[schema], sort_keys=True)
                for user_annots in annotations.values()
                if schema in user_annots
            ]
            if len(encoded) < 2:
                continue
            if len(set(encoded)) == 1:
                unanimous_labels[schema] = json.loads(encoded[0])
            else:
                is_unanimous = False

        if is_unanimous and unanimous_labels:
            result["labels"] = unanimous_labels
            result["source"] = "unanimous"
        else:
            result["labels"] = {}
            result["source"] = "unresolved"
        result["num_annotators"] = len(annotators)
        results.append(result)

    return results
def init_adjudication_manager(config: Dict[str, Any]) -> Optional[AdjudicationManager]:
    """Create the singleton AdjudicationManager on first call and return it.

    Later calls return the existing instance; their ``config`` is ignored.
    """
    global _ADJUDICATION_MANAGER

    with _ADJUDICATION_LOCK:
        already_created = _ADJUDICATION_MANAGER is not None
        if not already_created:
            _ADJUDICATION_MANAGER = AdjudicationManager(config)
    return _ADJUDICATION_MANAGER
def get_adjudication_manager() -> Optional[AdjudicationManager]:
    """Return the singleton AdjudicationManager, or None when none is set."""
    manager = _ADJUDICATION_MANAGER
    return manager
def clear_adjudication_manager():
    """Drop the singleton so the next init call creates a new manager (for testing)."""
    global _ADJUDICATION_MANAGER

    # Taken under the same lock that guards creation.
    _ADJUDICATION_LOCK.acquire()
    try:
        _ADJUDICATION_MANAGER = None
    finally:
        _ADJUDICATION_LOCK.release()