| """ |
| Admin Dashboard Module |
| |
| This module provides comprehensive admin functionality for the annotation platform, |
| including dashboard data generation, timing analysis, and configuration management. |
| |
| The admin dashboard offers: |
| - Real-time overview of annotation progress and statistics |
| - Detailed annotator performance metrics and timing analysis |
| - Instance-level annotation tracking and disagreement analysis |
| - Configuration management and system state monitoring |
| - Question and annotation scheme analysis |
| - User progress tracking and completion statistics |
| - Comprehensive annotation history tracking and suspicious activity detection |
| - Performance metrics and quality assurance monitoring |
| - Session tracking and behavioral analysis |
| |
| Key Components: |
| - AdminDashboard: Main class for admin functionality |
| - AnnotatorTimingData: Data class for annotator timing information |
| - InstanceData: Data class for instance information and statistics |
| - Dashboard data generation and analysis functions |
| - Configuration update and management functions |
| - AnnotationHistoryAnalyzer: Advanced history analysis and suspicious activity detection |
| |
| The dashboard provides insights into: |
| - Overall annotation progress and completion rates |
| - Individual annotator performance and efficiency |
| - Annotation quality through disagreement analysis |
| - System configuration and operational status |
| - Real-time monitoring of active annotation sessions |
| - Fine-grained annotation timing and behavioral patterns |
| - Suspicious activity detection and quality assurance |
| - Session-based performance analysis |
| |
| Access Control: |
| - Admin access is controlled via API key authentication |
| - Debug mode allows admin access without API key |
| - All admin endpoints require proper authentication |
| """ |
|
|
| import json |
| import logging |
| import datetime |
| from typing import Dict, List, Optional, Tuple, Any |
| from collections import defaultdict, Counter |
| from dataclasses import dataclass |
| from flask import request, jsonify, session |
|
|
| from potato.flask_server import ( |
| config, logger, get_user_state_manager, get_item_state_manager, |
| get_users, get_total_annotations |
| ) |
| from potato.annotation_history import AnnotationHistoryManager, AnnotationAction |
| from potato.quality_control import get_quality_control_manager |
|
|
@dataclass
class AnnotatorTimingData:
    """
    Per-annotator record combining timing, action-history and training metrics.

    The class is a plain value container: every field is required, and the
    declaration order below is the positional order of the generated
    constructor.
    """

    # --- Identity, workload and coarse timing ---
    user_id: str
    total_annotations: int
    # NOTE(review): the visible timing code accumulates this from
    # `total_time_ms / 1000.0`, so it may actually hold a float - confirm.
    total_seconds: int
    average_seconds_per_annotation: float
    last_activity: Optional[datetime.datetime]
    # NOTE(review): likewise appears to be derived from a float division.
    current_instance_time: Optional[int]
    annotations_per_hour: float
    phase: str
    has_assignments: bool
    remaining_assignments: bool

    # --- Action-history metrics and suspicious-activity scoring ---
    total_actions: int
    average_action_time_ms: float
    # NOTE(review): consumers compare this against float('inf'), so an
    # "unset" sentinel of infinity is presumably possible - confirm.
    fastest_action_time_ms: int
    slowest_action_time_ms: int
    actions_per_minute: float
    suspicious_score: float
    suspicious_level: str
    fast_actions_count: int
    burst_actions_count: int
    session_start_time: Optional[datetime.datetime]
    current_session_duration_minutes: Optional[float]
    recent_actions_count: int

    # --- Training-phase progress ---
    training_completed: bool
    training_correct_answers: int
    training_total_attempts: int
    training_pass_rate: float
    training_current_question: int
    training_total_questions: int
|
|
@dataclass
class InstanceData:
    """
    Summary record for one annotation instance.

    Holds the instance's identity and text, how many annotations it has,
    label statistics, the annotators who worked on it, and timing. All
    fields are required.
    """

    # --- Identity and content ---
    id: str
    text: str
    displayed_text: str

    # --- Progress ---
    annotation_count: int
    completion_percentage: float

    # --- Label statistics ---
    most_frequent_label: Optional[str]
    label_disagreement: float

    # --- Contributors and timing ---
    annotators: List[str]
    num_ai_instance: int
    average_time_per_annotation: Optional[float]
|
|
| class AdminDashboard: |
| """ |
| Main class for admin dashboard functionality. |
| |
| This class provides comprehensive admin features including dashboard |
| data generation, timing analysis, configuration management, and |
| system monitoring capabilities. |
| """ |
|
|
| def __init__(self): |
| """Initialize the admin dashboard.""" |
| self.logger = logging.getLogger(__name__) |
|
|
def check_admin_access(self) -> bool:
    """
    Decide whether the current request carries a valid admin API key.

    The key is taken from the ``X-API-Key`` request header; when that is
    missing or empty, the ``admin_api_key`` entry of the session is used
    instead. Validation itself is delegated to ``validate_admin_api_key``
    together with the module-level ``config``.

    Returns:
        bool: The result of ``validate_admin_api_key`` for the supplied key.
    """
    # Imported lazily inside the method, as in the original implementation.
    from potato.server_utils.admin_key import validate_admin_api_key

    supplied_key = request.headers.get('X-API-Key')
    if not supplied_key:
        # Fall back to the session only when the header gave nothing usable.
        supplied_key = session.get('admin_api_key')

    return validate_admin_api_key(supplied_key, config)
|
|
def get_dashboard_overview(self) -> Dict[str, Any]:
    """
    Build the top-level dashboard summary.

    Aggregates user counts by phase, summed working time, item coverage
    and a handful of configuration values.

    Returns:
        On success, a dict with two keys:
            - ``overview``: user counts, annotation counts, item coverage
              and the formatted total working time.
            - ``config``: selected configuration values.
        On failure, a ``(error_dict, status_code)`` tuple: 403 when admin
        access is denied, 500 when any step raises.

    Side Effects:
        Logs an error when data generation fails.
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    try:
        usm = get_user_state_manager()
        ism = get_item_state_manager()

        users = get_users()
        total_annotations = get_total_annotations()

        active_users = 0
        completed_users = 0
        total_working_time = 0

        for username in users:
            user_state = usm.get_user_state(username)
            if not user_state:
                continue

            # Read the phase once instead of once per comparison.
            phase_value = user_state.get_phase().value
            if phase_value == "ANNOTATION":
                active_users += 1
            elif phase_value == "DONE":
                completed_users += 1

            timing_data = self._get_annotator_timing_data(username)
            if timing_data:
                total_working_time += timing_data.total_seconds

        # An item counts as annotated once at least one annotator touched it.
        items = ism.items()
        items_with_annotations = 0
        total_assignments = 0

        for item in items:
            annotators = ism.get_annotators_for_item(item.get_id())
            if annotators:
                items_with_annotations += 1
                total_assignments += len(annotators)

        total_items = len(items)
        completion_percentage = (
            items_with_annotations / total_items * 100 if total_items > 0 else 0
        )

        # Per-instance times are derived from millisecond values divided by
        # 1000.0, so the running total can be a float. Truncate to whole
        # seconds first; otherwise the label renders as e.g. "1.0h 30.0m".
        whole_seconds = int(total_working_time)
        hours, remainder = divmod(whole_seconds, 3600)
        minutes = remainder // 60
        formatted_time = f"{hours}h {minutes}m"

        return {
            "overview": {
                "total_users": len(users),
                "active_users": active_users,
                "completed_users": completed_users,
                "total_annotations": total_annotations,
                "total_items": total_items,
                "items_with_annotations": items_with_annotations,
                "completion_percentage": round(completion_percentage, 1),
                "total_assignments": total_assignments,
                "total_working_time": formatted_time,
                "average_annotations_per_item": round(total_annotations / total_items, 1) if total_items > 0 else 0
            },
            "config": {
                "annotation_task_name": config.get("annotation_task_name", "Unknown"),
                "max_annotations_per_user": config.get("max_annotations_per_user", "Unlimited"),
                "max_annotations_per_item": config.get("max_annotations_per_item", "Unlimited"),
                "assignment_strategy": config.get("assignment_strategy", "fixed_order"),
                "debug_mode": config.get("debug", False)
            }
        }

    except Exception as e:
        self.logger.error(f"Error getting dashboard overview: {e}")
        return {"error": f"Failed to get dashboard overview: {str(e)}"}, 500
|
|
def get_annotators_data(self) -> Dict[str, Any]:
    """
    Collect one row of timing, history and training metrics per annotator.

    Users with no stored state, or for whom no timing data could be
    built, are left out. Rows are ordered by descending suspicious score.

    Returns:
        On success, a dict with ``total_annotators``, the ``annotators``
        rows, and a ``summary`` of suspicious-level counts plus the mean
        suspicious score. On failure, an ``(error_dict, status_code)``
        tuple (403 for denied access, 500 for any exception).
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    def _iso_or_none(moment):
        # Timestamps are optional; serialize only when present.
        return moment.isoformat() if moment else None

    def _count_levels(rows, levels):
        # Number of rows whose suspicious level is one of `levels`.
        return sum(1 for row in rows if row["suspicious_level"] in levels)

    try:
        state_manager = get_user_state_manager()
        all_users = get_users()
        rows = []

        for username in all_users:
            state = state_manager.get_user_state(username)
            if not state:
                continue

            timing = self._get_annotator_timing_data(username)
            if not timing:
                continue

            # An infinite "fastest" time is reported as missing.
            fastest = timing.fastest_action_time_ms
            if not (fastest != float('inf')):
                fastest = None

            rows.append({
                "user_id": timing.user_id,
                "total_annotations": timing.total_annotations,
                "completion_percentage": self._calculate_completion_percentage(timing.user_id),
                "total_seconds": timing.total_seconds,
                "average_seconds_per_annotation": timing.average_seconds_per_annotation,
                "annotations_per_hour": timing.annotations_per_hour,
                "phase": timing.phase,
                "has_assignments": timing.has_assignments,
                "remaining_assignments": timing.remaining_assignments,
                "max_assignments": state.get_max_assignments(),
                "last_activity": _iso_or_none(timing.last_activity),
                "current_instance_time": timing.current_instance_time,

                # Action-history metrics
                "total_actions": timing.total_actions,
                "average_action_time_ms": timing.average_action_time_ms,
                "fastest_action_time_ms": fastest,
                "slowest_action_time_ms": timing.slowest_action_time_ms,
                "actions_per_minute": timing.actions_per_minute,
                "suspicious_score": timing.suspicious_score,
                "suspicious_level": timing.suspicious_level,
                "fast_actions_count": timing.fast_actions_count,
                "burst_actions_count": timing.burst_actions_count,
                "session_start_time": _iso_or_none(timing.session_start_time),
                "current_session_duration_minutes": timing.current_session_duration_minutes,
                "recent_actions_count": timing.recent_actions_count,

                # Training progress
                "training_completed": timing.training_completed,
                "training_correct_answers": timing.training_correct_answers,
                "training_total_attempts": timing.training_total_attempts,
                "training_pass_rate": round(timing.training_pass_rate, 2),
                "training_current_question": timing.training_current_question,
                "training_total_questions": timing.training_total_questions
            })

        # Most suspicious annotators first.
        rows.sort(key=lambda row: row["suspicious_score"], reverse=True)

        if rows:
            mean_score = sum(row["suspicious_score"] for row in rows) / len(rows)
        else:
            mean_score = 0

        summary = {
            "high_suspicious_count": _count_levels(rows, ["High", "Very High"]),
            "medium_suspicious_count": _count_levels(rows, ["Medium"]),
            "low_suspicious_count": _count_levels(rows, ["Low"]),
            "normal_count": _count_levels(rows, ["Normal"]),
            "average_suspicious_score": mean_score
        }

        return {
            "total_annotators": len(rows),
            "annotators": rows,
            "summary": summary
        }

    except Exception as e:
        self.logger.error(f"Error getting annotators data: {e}")
        return {"error": f"Failed to get annotators data: {str(e)}"}, 500
|
|
def get_annotation_history_data(self, user_id: Optional[str] = None,
                                instance_id: Optional[str] = None,
                                minutes: Optional[int] = None) -> Dict[str, Any]:
    """
    Return formatted annotation history, optionally filtered.

    Filters combine: when both ``instance_id`` and ``minutes`` are given,
    only actions that satisfy both are returned. (Previously the
    time-window lookup replaced the instance-filtered list, so the
    instance filter was silently dropped.)

    Args:
        user_id: Restrict the history to this user; when omitted, the
            histories of all known users are concatenated.
        instance_id: Passed to ``get_annotation_history`` to restrict the
            history to one instance.
        minutes: Time window passed to ``get_recent_actions``; a falsy
            value (``None`` or ``0``) disables the window.

    Returns:
        The result of ``_format_annotation_history`` on success, or an
        ``(error_dict, status_code)`` tuple: 403 for denied access, 404
        when ``user_id`` is unknown, 500 on any exception.
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    def _actions_for(user_state):
        # Collect one user's actions with every requested filter applied.
        history = user_state.get_annotation_history(instance_id)
        if not minutes:
            return history

        recent = user_state.get_recent_actions(minutes)
        if instance_id is None:
            return recent

        # Intersect the two filtered views by action id so neither filter
        # is lost. Assumes action ids are hashable and unique per action
        # - TODO confirm against AnnotationAction.
        wanted_ids = {action.action_id for action in history}
        return [action for action in recent if action.action_id in wanted_ids]

    try:
        usm = get_user_state_manager()

        if user_id:
            user_state = usm.get_user_state(user_id)
            if not user_state:
                return {"error": f"User {user_id} not found"}, 404

            return self._format_annotation_history(_actions_for(user_state), user_id)

        all_actions = []
        for username in get_users():
            user_state = usm.get_user_state(username)
            if user_state:
                all_actions.extend(_actions_for(user_state))

        return self._format_annotation_history(all_actions, "all_users")

    except Exception as e:
        self.logger.error(f"Error getting annotation history data: {e}")
        return {"error": f"Failed to get annotation history data: {str(e)}"}, 500
|
|
def get_suspicious_activity_data(self) -> Dict[str, Any]:
    """
    List users that have suspicious actions, with a sample of those actions.

    For every user whose state reports a non-empty result from
    ``get_suspicious_activity()``, the full count is returned together
    with the first ten actions in serialized form.

    Returns:
        On success, a dict with ``total_users_with_suspicious_activity``
        and the per-user ``suspicious_activity`` list. On failure, an
        ``(error_dict, status_code)`` tuple (403 or 500).
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    def _serialize(action):
        # Flatten one action into JSON-friendly primitives.
        return {
            "action_id": action.action_id,
            "timestamp": action.timestamp.isoformat(),
            "instance_id": action.instance_id,
            "action_type": action.action_type,
            "schema_name": action.schema_name,
            "label_name": action.label_name,
            "server_processing_time_ms": action.server_processing_time_ms,
            "session_id": action.session_id
        }

    try:
        state_manager = get_user_state_manager()
        all_users = get_users()
        flagged = []

        for username in all_users:
            state = state_manager.get_user_state(username)
            if not state:
                continue

            flagged_actions = state.get_suspicious_activity()
            if not flagged_actions:
                continue

            # Only the first ten actions are included in the payload.
            sample = []
            for action in flagged_actions[:10]:
                sample.append(_serialize(action))

            flagged.append({
                "user_id": username,
                "suspicious_actions_count": len(flagged_actions),
                "suspicious_actions": sample
            })

        return {
            "total_users_with_suspicious_activity": len(flagged),
            "suspicious_activity": flagged
        }

    except Exception as e:
        self.logger.error(f"Error getting suspicious activity data: {e}")
        return {"error": f"Failed to get suspicious activity data: {str(e)}"}, 500
|
|
def get_instances_data(self, page: int = 1, page_size: int = 25,
                       sort_by: str = "annotation_count", sort_order: str = "desc",
                       filter_completion: Optional[str] = None) -> Dict[str, Any]:
    """
    Return one page of per-instance statistics, filtered and sorted.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Instances per page; values below 1 are treated as 1
            (previously 0 caused a division by zero and a 500 response).
        sort_by: One of ``annotation_count``, ``completion_percentage``,
            ``disagreement``, ``id`` or ``average_time``. Any other value
            leaves the instances in their original order.
        sort_order: ``desc`` (case-insensitive) for descending order;
            anything else sorts ascending.
        filter_completion: ``completed`` keeps instances at 100% or more,
            ``incomplete`` keeps those below 100%; any other value keeps
            all instances.

    Returns:
        On success, a dict with ``instances`` (the serialized page),
        ``pagination`` metadata and a ``summary`` computed over the
        filtered set. On failure, an ``(error_dict, status_code)`` tuple
        (403 or 500).
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    def _truncate(text):
        # Long texts are cut to 100 characters for the listing.
        return text[:100] + "..." if len(text) > 100 else text

    # Sort key per supported `sort_by` value.
    sort_keys = {
        "annotation_count": lambda inst: inst.annotation_count,
        "completion_percentage": lambda inst: inst.completion_percentage,
        "disagreement": lambda inst: inst.label_disagreement,
        "id": lambda inst: inst.id,
        # Missing timings sort as zero.
        "average_time": lambda inst: inst.average_time_per_annotation or 0,
    }

    try:
        # Guard against non-positive paging parameters, which would give
        # a negative slice start or a division by zero below.
        page = max(1, page)
        page_size = max(1, page_size)

        ism = get_item_state_manager()
        items = ism.items()

        # The per-item cap does not change between items; read it once.
        max_annotations = config.get("max_annotations_per_item", -1)

        instances_data = []
        for item in items:
            item_id = item.get_id()
            annotators = ism.get_annotators_for_item(item_id)
            annotation_count = len(annotators) if annotators else 0

            if max_annotations > 0:
                completion_percentage = min(100, (annotation_count / max_annotations) * 100)
            else:
                # With no cap, a single annotation counts as complete.
                completion_percentage = 100 if annotation_count > 0 else 0

            most_frequent_label, disagreement = self._calculate_label_statistics(item_id)
            avg_time = self._calculate_average_time_per_annotation(item_id)

            instances_data.append(InstanceData(
                id=item_id,
                text=item.get_text(),
                displayed_text=item.get_displayed_text(),
                annotation_count=annotation_count,
                completion_percentage=completion_percentage,
                most_frequent_label=most_frequent_label,
                label_disagreement=disagreement,
                annotators=list(annotators) if annotators else [],
                average_time_per_annotation=avg_time,
                num_ai_instance=self._calculate_total_instance_ai(item_id)
            ))

        if filter_completion == "completed":
            instances_data = [i for i in instances_data if i.completion_percentage >= 100]
        elif filter_completion == "incomplete":
            instances_data = [i for i in instances_data if i.completion_percentage < 100]

        reverse = sort_order.lower() == "desc"
        sort_key = sort_keys.get(sort_by)
        if sort_key is not None:
            instances_data.sort(key=sort_key, reverse=reverse)

        total_instances = len(instances_data)
        start_idx = (page - 1) * page_size
        end_idx = start_idx + page_size
        paginated_instances = instances_data[start_idx:end_idx]

        serialized_instances = [
            {
                "id": instance.id,
                "text": _truncate(instance.text),
                "displayed_text": _truncate(instance.displayed_text),
                "annotation_count": instance.annotation_count,
                "completion_percentage": round(instance.completion_percentage, 1),
                "most_frequent_label": instance.most_frequent_label,
                "label_disagreement": round(instance.label_disagreement, 2),
                "annotators": instance.annotators,
                "num_ai_instance": instance.num_ai_instance,
                "average_time_per_annotation": self._format_seconds(instance.average_time_per_annotation) if instance.average_time_per_annotation else None
            }
            for instance in paginated_instances
        ]

        # The summary covers the filtered set, not just the current page.
        completed_count = sum(1 for i in instances_data if i.completion_percentage >= 100)
        incomplete_count = sum(1 for i in instances_data if i.completion_percentage < 100)

        return {
            "instances": serialized_instances,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total_instances": total_instances,
                "total_pages": (total_instances + page_size - 1) // page_size,
                "has_next": end_idx < total_instances,
                "has_prev": page > 1
            },
            "summary": {
                "completed_instances": completed_count,
                "incomplete_instances": incomplete_count,
                "average_annotations_per_instance": round(sum(i.annotation_count for i in instances_data) / len(instances_data), 1) if instances_data else 0,
                "average_disagreement": round(sum(i.label_disagreement for i in instances_data) / len(instances_data), 2) if instances_data else 0
            }
        }

    except Exception as e:
        self.logger.error(f"Error getting instances data: {e}")
        return {"error": f"Failed to get instances data: {str(e)}"}, 500
|
|
def update_config(self, config_updates: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validate and apply a set of configuration changes.

    Only ``max_annotations_per_user``, ``max_annotations_per_item`` and
    ``assignment_strategy`` are handled; other keys are ignored. Every
    entry is validated before anything is written, so a rejected request
    leaves the configuration untouched (previously, keys processed before
    the invalid one had already been applied).

    Args:
        config_updates: Mapping of configuration key to new value. The
            two limit keys must be integers >= -1; booleans are rejected
            even though ``bool`` is a subclass of ``int``.
            ``assignment_strategy`` must be one of the known strategies.

    Returns:
        On success, a dict with ``status``, ``message`` and
        ``updated_fields``. On failure, an ``(error_dict, status_code)``
        tuple: 403 for denied access, 400 for an invalid value, 500 on
        any exception.
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    limit_keys = ("max_annotations_per_user", "max_annotations_per_item")
    valid_strategies = ["random", "fixed_order", "least_annotated", "max_diversity", "active_learning", "llm_confidence"]

    try:
        # Phase 1: validate everything and stage the accepted values.
        pending = {}
        for key, value in config_updates.items():
            if key in limit_keys:
                # True/False would pass a plain isinstance(value, int) test.
                is_valid_limit = (
                    isinstance(value, int)
                    and not isinstance(value, bool)
                    and value >= -1
                )
                if not is_valid_limit:
                    return {"error": f"Invalid value for {key}: must be integer >= -1"}, 400
                pending[key] = value

            elif key == "assignment_strategy":
                if value not in valid_strategies:
                    return {"error": f"Invalid assignment strategy: {value}"}, 400
                pending[key] = value

        # Phase 2: apply only after the whole request has been accepted.
        for key, value in pending.items():
            config[key] = value

        updated_fields = list(pending)

        return {
            "status": "success",
            "message": f"Updated configuration fields: {', '.join(updated_fields)}",
            "updated_fields": updated_fields
        }

    except Exception as e:
        self.logger.error(f"Error updating config: {e}")
        return {"error": f"Failed to update config: {str(e)}"}, 500
|
|
def get_questions_data(self) -> Dict[str, Any]:
    """
    Aggregate annotations per configured annotation scheme.

    For each entry in ``config["annotation_schemes"]`` the method gathers
    every matching annotation across all items and users, then hands them
    to ``_analyze_annotation_scheme``.

    User states and the item collection are looked up once up front
    instead of inside the scheme/item/user loops.

    Returns:
        On success, a dict with ``questions`` (one entry per scheme with
        its name, type, description, counts and analysis) and a
        ``summary``. On failure, an ``(error_dict, status_code)`` tuple
        (403 or 500).
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    def _schema_and_name(label):
        # Labels may be objects with accessors, objects with attributes,
        # or bare strings that carry only the schema name.
        if hasattr(label, 'get_schema'):
            return label.get_schema(), label.get_name()
        if hasattr(label, 'schema'):
            return label.schema, getattr(label, 'name', None)
        if isinstance(label, str):
            return label, None
        return None, None

    try:
        ism = get_item_state_manager()
        annotation_schemes = config.get("annotation_schemes", [])
        questions_data = []

        users = get_users()

        # Loop-invariant lookups, resolved once.
        usm = get_user_state_manager()
        user_states = [
            state for state in (usm.get_user_state(username) for username in users)
            if state
        ]
        items = list(ism.items())

        for scheme in annotation_schemes:
            scheme_name = scheme.get("name", "Unknown")
            annotation_type = scheme.get("annotation_type", "unknown")

            all_annotations = []
            item_annotations = {}

            for item in items:
                item_id = item.get_id()
                item_annotations[item_id] = []

                for user_state in user_states:
                    label_annotations = user_state.get_label_annotations(item_id)
                    for label, value in label_annotations.items():
                        label_schema, label_name = _schema_and_name(label)
                        if label_schema != scheme_name:
                            continue

                        # Prefer the label's own name; otherwise use the stored value.
                        normalized_value = label_name if label_name else value

                        if annotation_type in ["radio", "select"]:
                            normalized_value = self._normalize_categorical_value(normalized_value)
                        elif annotation_type == "multiselect" and isinstance(normalized_value, list):
                            normalized_value = [
                                normalized_label
                                for normalized_label in (
                                    self._normalize_categorical_value(v) for v in normalized_value
                                )
                                if normalized_label is not None
                            ]

                        if normalized_value is not None:
                            all_annotations.append(normalized_value)
                            item_annotations[item_id].append(normalized_value)

            analysis = self._analyze_annotation_scheme(
                annotation_type, scheme, all_annotations, item_annotations
            )

            questions_data.append({
                "name": scheme_name,
                "type": annotation_type,
                "description": scheme.get("description", ""),
                "total_annotations": len(all_annotations),
                "items_with_annotations": sum(1 for annotations in item_annotations.values() if annotations),
                "analysis": analysis
            })

        return {
            "questions": questions_data,
            "summary": {
                "total_questions": len(questions_data),
                "total_annotations": sum(q["total_annotations"] for q in questions_data),
                "question_types": list({q["type"] for q in questions_data})
            }
        }

    except Exception as e:
        self.logger.error(f"Error getting questions data: {e}")
        return {"error": f"Failed to get questions data: {str(e)}"}, 500
|
|
| def _analyze_annotation_scheme(self, annotation_type: str, scheme: dict, |
| all_annotations: list, item_annotations: dict) -> dict: |
| """ |
| Analyze annotations based on their type and generate appropriate visualizations. |
| """ |
| if not all_annotations: |
| return {"error": "No annotations found"} |
|
|
| analysis = { |
| "type": annotation_type, |
| "total_count": len(all_annotations) |
| } |
|
|
| if annotation_type in ["radio", "select"]: |
| normalized_annotations = [ |
| normalized for normalized in |
| (self._normalize_categorical_value(annotation) for annotation in all_annotations) |
| if normalized is not None |
| ] |
| if not normalized_annotations: |
| return {"error": "No annotations found"} |
|
|
| label_counts = Counter(normalized_annotations) |
| raw_labels = scheme.get("labels", []) |
| labels = [ |
| normalized for normalized in |
| (self._normalize_categorical_value(label) for label in raw_labels) |
| if normalized is not None |
| ] |
|
|
| analysis.update({ |
| "visualization_type": "histogram", |
| "data": { |
| "labels": labels, |
| "counts": [label_counts.get(label, 0) for label in labels], |
| "percentages": [round(label_counts.get(label, 0) / len(normalized_annotations) * 100, 1) |
| for label in labels] |
| }, |
| "most_common": label_counts.most_common(1)[0] if label_counts else None, |
| "agreement_score": self._calculate_agreement_score(item_annotations) |
| }) |
| elif annotation_type == "multiselect": |
| |
| label_counts = Counter() |
| co_occurrence = defaultdict(int) |
| labels = scheme.get("labels", []) |
|
|
| for annotations in item_annotations.values(): |
| if isinstance(annotations, list): |
| |
| for annotation in annotations: |
| if isinstance(annotation, list): |
| for label in annotation: |
| label_counts[label] += 1 |
|
|
| |
| for i, annotation1 in enumerate(annotations): |
| if isinstance(annotation1, list): |
| for j, annotation2 in enumerate(annotations): |
| if i != j and isinstance(annotation2, list): |
| for label1 in annotation1: |
| for label2 in annotation2: |
| if label1 < label2: |
| co_occurrence[(label1, label2)] += 1 |
|
|
| analysis.update({ |
| "visualization_type": "multiselect_analysis", |
| "data": { |
| "labels": labels, |
| "counts": [label_counts.get(label, 0) for label in labels], |
| "percentages": [round(label_counts.get(label, 0) / len(item_annotations) * 100, 1) |
| for label in labels], |
| "co_occurrence": dict(co_occurrence) |
| }, |
| "most_common": label_counts.most_common(3) if label_counts else [], |
| "average_labels_per_item": round(sum(len(ann) if isinstance(ann, list) else 1 |
| for anns in item_annotations.values() |
| for ann in anns) / len(all_annotations), 2) |
| }) |
|
|
| elif annotation_type in ["likert", "number", "slider"]: |
| |
| numeric_values = [] |
| for value in all_annotations: |
| try: |
| if isinstance(value, (int, float)): |
| numeric_values.append(float(value)) |
| elif isinstance(value, str) and value.replace('.', '').replace('-', '').isdigit(): |
| numeric_values.append(float(value)) |
| except (ValueError, TypeError): |
| continue |
|
|
| if numeric_values: |
| analysis.update({ |
| "visualization_type": "distribution", |
| "data": { |
| "values": numeric_values, |
| "bins": self._create_histogram_bins(numeric_values, scheme), |
| "statistics": { |
| "mean": round(sum(numeric_values) / len(numeric_values), 2), |
| "median": round(sorted(numeric_values)[len(numeric_values)//2], 2), |
| "min": min(numeric_values), |
| "max": max(numeric_values), |
| "std": round((sum((x - sum(numeric_values)/len(numeric_values))**2 |
| for x in numeric_values) / len(numeric_values))**0.5, 2) |
| } |
| }, |
| "range": scheme.get("min", 0) if "min" in scheme else None, |
| "max": scheme.get("max", 10) if "max" in scheme else None |
| }) |
| else: |
| analysis["error"] = "No valid numeric values found" |
|
|
| elif annotation_type == "text": |
| |
| text_lengths = [] |
| word_counts = [] |
| common_words = Counter() |
|
|
| for value in all_annotations: |
| if isinstance(value, str) and value.strip(): |
| text_lengths.append(len(value)) |
| words = value.lower().split() |
| word_counts.append(len(words)) |
| common_words.update(words) |
|
|
| if text_lengths: |
| analysis.update({ |
| "visualization_type": "text_analysis", |
| "data": { |
| "lengths": text_lengths, |
| "word_counts": word_counts, |
| "common_words": common_words.most_common(10), |
| "statistics": { |
| "avg_length": round(sum(text_lengths) / len(text_lengths), 1), |
| "avg_words": round(sum(word_counts) / len(word_counts), 1), |
| "min_length": min(text_lengths), |
| "max_length": max(text_lengths), |
| "empty_responses": len([v for v in all_annotations |
| if not isinstance(v, str) or not v.strip()]) |
| } |
| } |
| }) |
| else: |
| analysis["error"] = "No valid text responses found" |
|
|
| elif annotation_type == "span": |
| |
| span_counts = [] |
| total_spans = 0 |
|
|
| for annotations in item_annotations.values(): |
| if isinstance(annotations, list): |
| for annotation in annotations: |
| if isinstance(annotation, list): |
| span_counts.append(len(annotation)) |
| total_spans += len(annotation) |
|
|
| if span_counts: |
| analysis.update({ |
| "visualization_type": "span_analysis", |
| "data": { |
| "span_counts": span_counts, |
| "total_spans": total_spans, |
| "statistics": { |
| "avg_spans_per_item": round(sum(span_counts) / len(span_counts), 2), |
| "items_with_spans": len([c for c in span_counts if c > 0]), |
| "max_spans": max(span_counts) if span_counts else 0, |
| "min_spans": min(span_counts) if span_counts else 0 |
| } |
| } |
| }) |
| else: |
| analysis["error"] = "No valid span annotations found" |
|
|
| else: |
| analysis["error"] = f"Unsupported annotation type: {annotation_type}" |
|
|
| return analysis |
|
|
| def _calculate_agreement_score(self, item_annotations: dict) -> float: |
| """Calculate agreement score for categorical annotations.""" |
| if not item_annotations: |
| return 0.0 |
|
|
| agreement_scores = [] |
| for annotations in item_annotations.values(): |
| if len(annotations) > 1: |
| |
| counter = Counter(annotations) |
| most_common_count = counter.most_common(1)[0][1] |
| agreement_scores.append(most_common_count / len(annotations)) |
|
|
| return round(sum(agreement_scores) / len(agreement_scores) * 100, 1) if agreement_scores else 0.0 |
|
|
| def _create_histogram_bins(self, values: list, scheme: dict) -> dict: |
| """Create histogram bins for numeric data.""" |
| if not values: |
| return {"bins": [], "counts": []} |
|
|
| min_val = scheme.get("min", min(values)) |
| max_val = scheme.get("max", max(values)) |
|
|
| |
| bin_size = (max_val - min_val) / 10 |
| bins = [min_val + i * bin_size for i in range(11)] |
| counts = [0] * 10 |
|
|
| for value in values: |
| bin_index = min(int((value - min_val) / bin_size), 9) |
| counts[bin_index] += 1 |
|
|
| return { |
| "bins": [round(b, 2) for b in bins], |
| "counts": counts |
| } |
|
|
def _get_annotator_timing_data(self, user_id: str) -> Optional["AnnotatorTimingData"]:
    """
    Get timing data for a specific annotator.

    Combines annotation counts, per-instance timing, annotation-history
    performance metrics, suspicious-activity analysis, session duration and
    training progress into a single AnnotatorTimingData record.

    Per-instance durations are read through
    ``_extract_behavioral_total_seconds`` so that this method and the
    instance-level statistics interpret behavioral data the same way.

    Args:
        user_id: The user ID to get timing data for

    Returns:
        AnnotatorTimingData object, or None if the user is not found or if
        any step of the collection raises (the error is logged).
    """
    try:
        usm = get_user_state_manager()
        user_state = usm.get_user_state(user_id)

        if not user_state:
            return None

        # Basic progress information
        total_annotations = len(user_state.get_all_annotations())
        phase = str(user_state.get_phase())
        has_assignments = user_state.has_assignments()
        remaining_assignments = user_state.has_remaining_assignments()

        behavioral_by_instance = user_state.instance_id_to_behavioral_data

        # Total time across all instances; missing or zero durations
        # contribute nothing.
        total_seconds = 0
        for behavioral_data in behavioral_by_instance.values():
            instance_seconds = self._extract_behavioral_total_seconds(behavioral_data, user_state)
            if instance_seconds:
                total_seconds += instance_seconds

        average_seconds_per_annotation = total_seconds / total_annotations if total_annotations > 0 else 0
        annotations_per_hour = (total_annotations * 3600) / total_seconds if total_seconds > 0 else 0

        # Time spent on the instance the user is currently viewing. A zero or
        # missing duration is reported as None ("no time recorded yet").
        current_instance_time = None
        current_instance = user_state.get_current_instance()
        if current_instance:
            current_behavioral = behavioral_by_instance.get(current_instance.get_id())
            current_instance_time = self._extract_behavioral_total_seconds(
                current_behavioral, user_state
            ) or None

        # NOTE(review): no per-user activity timestamp is read here; this is
        # simply the time at which the record was generated.
        last_activity = datetime.datetime.now()

        # Annotation-history based metrics
        performance_metrics = user_state.get_performance_metrics()
        suspicious_analysis = AnnotationHistoryManager.detect_suspicious_activity(
            user_state.get_annotation_history()
        )
        recent_actions = user_state.get_recent_actions(5)

        # Length of the current session, if one has started
        current_session_duration_minutes = None
        if user_state.session_start_time:
            duration = datetime.datetime.now() - user_state.session_start_time
            current_session_duration_minutes = duration.total_seconds() / 60

        # Training progress (all zero / False when there is no training state)
        training_state = user_state.get_training_state()
        training_completed = training_state.is_passed() if training_state else False
        training_correct_answers = training_state.get_correct_answer_count() if training_state else 0
        training_total_attempts = training_state.get_total_attempts() if training_state else 0
        training_pass_rate = (training_correct_answers / training_total_attempts * 100) if training_total_attempts > 0 else 0
        training_current_question = training_state.get_current_question_index() if training_state else 0
        training_total_questions = len(training_state.get_training_instances()) if training_state else 0

        return AnnotatorTimingData(
            user_id=user_id,
            total_annotations=total_annotations,
            total_seconds=total_seconds,
            average_seconds_per_annotation=average_seconds_per_annotation,
            last_activity=last_activity,
            current_instance_time=current_instance_time,
            annotations_per_hour=annotations_per_hour,
            phase=phase,
            has_assignments=has_assignments,
            remaining_assignments=remaining_assignments,

            # Annotation history and behavioral metrics
            total_actions=performance_metrics.get('total_actions', 0),
            average_action_time_ms=performance_metrics.get('average_action_time_ms', 0.0),
            fastest_action_time_ms=performance_metrics.get('fastest_action_time_ms', 0),
            slowest_action_time_ms=performance_metrics.get('slowest_action_time_ms', 0),
            actions_per_minute=performance_metrics.get('actions_per_minute', 0.0),
            suspicious_score=suspicious_analysis.get('suspicious_score', 0.0),
            suspicious_level=suspicious_analysis.get('suspicious_level', 'Normal'),
            fast_actions_count=suspicious_analysis.get('fast_actions_count', 0),
            burst_actions_count=suspicious_analysis.get('burst_actions_count', 0),
            session_start_time=user_state.session_start_time,
            current_session_duration_minutes=current_session_duration_minutes,
            recent_actions_count=len(recent_actions),

            # Training progress
            training_completed=training_completed,
            training_correct_answers=training_correct_answers,
            training_total_attempts=training_total_attempts,
            training_pass_rate=training_pass_rate,
            training_current_question=training_current_question,
            training_total_questions=training_total_questions
        )

    except Exception as e:
        self.logger.error(f"Error getting timing data for user {user_id}: {e}")
        return None
|
|
| def _extract_behavioral_total_seconds(self, behavioral_data: Any, user_state=None) -> Optional[float]: |
| """Extract total annotation time in seconds from behavioral data objects or legacy dicts.""" |
| if not behavioral_data: |
| return None |
|
|
| if hasattr(behavioral_data, 'total_time_ms') and behavioral_data.total_time_ms is not None: |
| return behavioral_data.total_time_ms / 1000.0 |
|
|
| if isinstance(behavioral_data, dict): |
| total_time_ms = behavioral_data.get("total_time_ms") |
| if total_time_ms is not None: |
| return total_time_ms / 1000.0 |
|
|
| time_string = behavioral_data.get("time_string") |
| if time_string and user_state and hasattr(user_state, 'parse_time_string'): |
| parsed_time = user_state.parse_time_string(time_string) |
| if parsed_time: |
| return parsed_time.get("total_seconds") |
|
|
| return None |
|
|
| def _extract_behavioral_ai_count(self, behavioral_data: Any) -> int: |
| """Extract AI usage count from behavioral data objects or legacy dicts.""" |
| if not behavioral_data: |
| return 0 |
|
|
| if hasattr(behavioral_data, 'ai_usage'): |
| ai_usage = behavioral_data.ai_usage or [] |
| return len(ai_usage) |
|
|
| if isinstance(behavioral_data, dict): |
| ai_usage = behavioral_data.get("ai_usage", []) or [] |
| return len(ai_usage) |
|
|
| return 0 |
|
|
def _calculate_total_instance_ai(self, instance_id: str) -> int:
    """
    Sum the AI assistance events recorded for one instance across all users.

    Args:
        instance_id: The instance ID to analyze

    Returns:
        Total number of AI usage events for the instance; 0 if none were
        recorded or if an error occurred (the error is logged).
    """
    try:
        state_manager = get_user_state_manager()
        usernames = get_users()

        # Users without a state are skipped; a user with no behavioral data
        # for this instance contributes 0 through the helper.
        known_states = (state_manager.get_user_state(name) for name in usernames)
        return sum(
            self._extract_behavioral_ai_count(
                state.instance_id_to_behavioral_data.get(instance_id)
            )
            for state in known_states
            if state
        )

    except Exception as e:
        self.logger.error(f"Error calculating AI statistics for instance {instance_id}: {e}")
        return 0
|
|
def _calculate_average_time_per_annotation(self, instance_id: str) -> Optional[float]:
    """
    Average the time users spent on one instance.

    Only users whose behavioral data for the instance yields a duration are
    included in the average.

    Args:
        instance_id: The instance ID to analyze

    Returns:
        Average time in seconds, or None if no user has timing data for the
        instance or if an error occurred (the error is logged).
    """
    try:
        state_manager = get_user_state_manager()
        usernames = get_users()

        durations = []
        for name in usernames:
            state = state_manager.get_user_state(name)
            if not state:
                continue
            recorded = state.instance_id_to_behavioral_data.get(instance_id)
            seconds = self._extract_behavioral_total_seconds(recorded, state)
            if seconds is not None:
                durations.append(seconds)

        if not durations:
            return None
        return sum(durations) / len(durations)

    except Exception as e:
        self.logger.error(f"Error calculating average time for instance {instance_id}: {e}")
        return None
|
|
def _calculate_completion_percentage(self, user_id: str) -> float:
    """
    Calculate completion percentage for a user.

    The percentage is the number of annotated instances divided by the
    number of assigned instances.

    Args:
        user_id: The user ID to calculate completion for

    Returns:
        Completion percentage (0-100). Returns 0.0 when the user is unknown,
        has no assignments, or an error occurred (the error is logged).
    """
    try:
        usm = get_user_state_manager()
        user_state = usm.get_user_state(user_id)

        if not user_state:
            return 0.0

        total_assignments = user_state.get_assigned_instance_count()
        completed_assignments = len(user_state.get_all_annotations())

        if total_assignments == 0:
            return 0.0

        # The annotation count can exceed the assigned count, so cap the
        # result to honour the documented 0-100 range.
        return min(100.0, (completed_assignments / total_assignments) * 100)

    except Exception as e:
        self.logger.error(f"Error calculating completion percentage for user {user_id}: {e}")
        return 0.0
|
|
| def _format_seconds(self, seconds: Optional[float]) -> Optional[str]: |
| """ |
| Format seconds into a human-readable string. |
| |
| Args: |
| seconds: Number of seconds to format |
| |
| Returns: |
| Formatted time string or None if input is None |
| """ |
| if seconds is None: |
| return None |
|
|
| if seconds < 60: |
| return f"{int(seconds)}s" |
| elif seconds < 3600: |
| minutes = int(seconds // 60) |
| remaining_seconds = int(seconds % 60) |
| return f"{minutes}m {remaining_seconds}s" |
| else: |
| hours = int(seconds // 3600) |
| remaining_minutes = int((seconds % 3600) // 60) |
| return f"{hours}h {remaining_minutes}m" |
|
|
| def _format_annotation_history(self, actions: List[AnnotationAction], context: str) -> Dict[str, Any]: |
| """ |
| Format annotation history data for API response. |
| |
| Args: |
| actions: List of annotation actions |
| context: Context string (user_id or "all_users") |
| |
| Returns: |
| Formatted annotation history data |
| """ |
| if not actions: |
| return { |
| "context": context, |
| "total_actions": 0, |
| "actions": [], |
| "summary": { |
| "action_types": {}, |
| "time_distribution": {}, |
| "performance_metrics": {} |
| } |
| } |
|
|
| |
| action_types = Counter(action.action_type for action in actions) |
| time_distribution = self._calculate_time_distribution(actions) |
| performance_metrics = AnnotationHistoryManager.calculate_performance_metrics(actions) |
|
|
| |
| formatted_actions = [] |
| for action in actions[-100:]: |
| formatted_actions.append({ |
| "action_id": action.action_id, |
| "timestamp": action.timestamp.isoformat(), |
| "user_id": action.user_id, |
| "instance_id": action.instance_id, |
| "action_type": action.action_type, |
| "schema_name": action.schema_name, |
| "label_name": action.label_name, |
| "old_value": action.old_value, |
| "new_value": action.new_value, |
| "span_data": action.span_data, |
| "session_id": action.session_id, |
| "client_timestamp": action.client_timestamp.isoformat() if action.client_timestamp else None, |
| "server_processing_time_ms": action.server_processing_time_ms, |
| "metadata": action.metadata |
| }) |
|
|
| return { |
| "context": context, |
| "total_actions": len(actions), |
| "actions": formatted_actions, |
| "summary": { |
| "action_types": dict(action_types), |
| "time_distribution": time_distribution, |
| "performance_metrics": performance_metrics |
| } |
| } |
|
|
| def _calculate_time_distribution(self, actions: List[AnnotationAction]) -> Dict[str, int]: |
| """ |
| Calculate time distribution of actions. |
| |
| Args: |
| actions: List of annotation actions |
| |
| Returns: |
| Dictionary with time distribution data |
| """ |
| if not actions: |
| return {} |
|
|
| |
| hour_distribution = defaultdict(int) |
| for action in actions: |
| hour = action.timestamp.hour |
| hour_distribution[f"{hour:02d}:00"] += 1 |
|
|
| return dict(hour_distribution) |
|
|
def get_crowdsourcing_data(self) -> Dict[str, Any]:
    """
    Get crowdsourcing platform statistics (MTurk, Prolific).

    Each known user is classified as a Prolific worker, an MTurk worker or
    "other". Classification uses the platform IDs stored by the
    authentication backend, falling back to the username prefix ('P' for
    Prolific, 'A' for MTurk). Prolific is checked first.

    Returns:
        On success, a dict with the following structure:
            - summary: Overall counts of workers, studies and HITs
            - prolific: Prolific stats, study IDs and worker list
            - mturk: MTurk stats, HIT IDs and worker list
            - other: Stats and worker list for unclassified users
        On failure, an ``(error_dict, status_code)`` tuple: 403 when admin
        access is denied, 500 when collection raised.
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    try:
        from potato.authentication import UserAuthenticator

        usm = get_user_state_manager()
        users = get_users()

        # Workers grouped by platform
        prolific_workers = []
        mturk_workers = []
        other_workers = []

        # Distinct studies / HITs seen
        prolific_study_ids = set()
        mturk_hit_ids = set()

        # Platform IDs live in the auth backend's per-user data, when the
        # backend has one. `or {}` also covers a backend whose attribute
        # exists but is None.
        try:
            user_auth = UserAuthenticator.get_instance()
            user_data_store = getattr(user_auth.auth_backend, 'user_data', None) or {}
        except (ValueError, AttributeError):
            user_data_store = {}

        for username in users:
            user_state = usm.get_user_state(username)
            if not user_state:
                continue

            timing_data = self._get_annotator_timing_data(username)

            # A user may be missing from the store or mapped to None.
            stored_data = user_data_store.get(username) or {}

            prolific_session_id = stored_data.get('prolific_session_id')
            prolific_study_id = stored_data.get('prolific_study_id')
            mturk_assignment_id = stored_data.get('mturk_assignment_id')
            mturk_hit_id = stored_data.get('mturk_hit_id')

            worker_info = {
                "worker_id": username,
                "total_annotations": timing_data.total_annotations if timing_data else 0,
                "phase": timing_data.phase if timing_data else "unknown",
                "total_seconds": timing_data.total_seconds if timing_data else 0,
                "annotations_per_hour": timing_data.annotations_per_hour if timing_data else 0,
                "completion_percentage": self._calculate_completion_percentage(username),
                "suspicious_level": timing_data.suspicious_level if timing_data else "Normal",
            }

            if prolific_session_id or prolific_study_id or username.startswith('P'):
                worker_info["platform"] = "prolific"
                worker_info["session_id"] = prolific_session_id
                worker_info["study_id"] = prolific_study_id
                prolific_workers.append(worker_info)
                if prolific_study_id:
                    prolific_study_ids.add(prolific_study_id)

            elif mturk_assignment_id or mturk_hit_id or username.startswith('A'):
                worker_info["platform"] = "mturk"
                worker_info["assignment_id"] = mturk_assignment_id
                worker_info["hit_id"] = mturk_hit_id
                mturk_workers.append(worker_info)
                if mturk_hit_id:
                    mturk_hit_ids.add(mturk_hit_id)

            else:
                worker_info["platform"] = "other"
                other_workers.append(worker_info)

        all_workers = prolific_workers + mturk_workers + other_workers

        def calc_stats(workers):
            """Aggregate counts, totals and per-worker averages for a group."""
            count = len(workers)
            total_annotations = sum(w["total_annotations"] for w in workers)
            total_time = sum(w["total_seconds"] for w in workers)
            return {
                "count": count,
                "total_annotations": total_annotations,
                "total_time_seconds": total_time,
                # Averages are 0 for an empty group (no division by zero)
                "avg_annotations_per_worker": round(total_annotations / count, 1) if count else 0,
                "avg_time_per_worker_minutes": round(total_time / count / 60, 1) if count else 0,
                "completed_count": sum(1 for w in workers if w["phase"] == "Phase.DONE"),
                "in_progress_count": sum(1 for w in workers if w["phase"] == "Phase.ANNOTATION"),
            }

        return {
            "summary": {
                "total_workers": len(all_workers),
                "prolific_workers": len(prolific_workers),
                "mturk_workers": len(mturk_workers),
                "other_workers": len(other_workers),
                "prolific_studies": len(prolific_study_ids),
                "mturk_hits": len(mturk_hit_ids),
            },
            "prolific": {
                "stats": calc_stats(prolific_workers),
                "study_ids": list(prolific_study_ids),
                "workers": prolific_workers,
            },
            "mturk": {
                "stats": calc_stats(mturk_workers),
                "hit_ids": list(mturk_hit_ids),
                "workers": mturk_workers,
            },
            "other": {
                "stats": calc_stats(other_workers),
                "workers": other_workers,
            },
        }

    except Exception as e:
        self.logger.error(f"Error getting crowdsourcing data: {e}")
        return {"error": f"Failed to get crowdsourcing data: {str(e)}"}, 500
|
|
|
|
def get_agreement_metrics(self) -> Dict[str, Any]:
    """
    Get inter-annotator agreement metrics using Krippendorff's alpha.

    For every configured annotation scheme, the annotations of all users
    are grouped per item. Items annotated by at least ``min_overlap``
    distinct annotators are passed to simpledorff. Likert, slider and
    number schemes use the interval metric on numeric values; all other
    schemes use the nominal metric on normalized string values and
    additionally get Cohen's and Fleiss' kappa.

    Returns:
        On success, a dict with "enabled", "overall" (averages across
        schemas), "by_schema" (metrics or an "error" per schema) and
        "warnings". When simpledorff/pandas cannot be imported, a dict with
        "enabled": False and an "error". On other failures, an
        ``(error_dict, status_code)`` tuple (403 or 500).
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    try:
        import simpledorff
        from simpledorff.metrics import nominal_metric, interval_metric
        import pandas as pd

        agreement_config = config.get("agreement_metrics", {})
        min_overlap = agreement_config.get("min_overlap", 2)

        ism = get_item_state_manager()
        usm = get_user_state_manager()
        annotation_schemes = config.get("annotation_schemes", [])
        users = get_users()

        metrics = {
            "enabled": agreement_config.get("enabled", True),
            "overall": {},
            "by_schema": {},
            "warnings": []
        }

        # Snapshot every user's annotations and the item IDs once, instead
        # of re-fetching them for each (schema, item, user) combination.
        user_annotations = []
        for username in users:
            user_state = usm.get_user_state(username)
            if not user_state:
                continue
            user_annotations.append((username, user_state.get_all_annotations()))
        item_ids = [item.get_id() for item in ism.items()]

        for scheme in annotation_schemes:
            schema_name = scheme.get("name", "Unknown")
            annotation_type = scheme.get("annotation_type", "unknown")

            # item_id -> list of {"user", "value"} for this schema
            annotations_by_item = {}

            for item_id in item_ids:
                item_annotations = []

                for username, all_annotations in user_annotations:
                    if item_id not in all_annotations:
                        continue

                    instance_annotations = all_annotations[item_id]
                    labels = instance_annotations.get("labels", {}) or {}

                    # Keep only the labels that belong to this schema
                    for label, value in labels.items():
                        label_schema = None
                        if hasattr(label, 'schema'):
                            label_schema = label.schema
                        elif hasattr(label, 'get_schema'):
                            label_schema = label.get_schema()

                        if label_schema == schema_name:
                            item_annotations.append({
                                "user": username,
                                "value": value
                            })

                if item_annotations:
                    annotations_by_item[item_id] = item_annotations

            # Overlap is measured in distinct annotators: one user holding
            # several labels for an item does not make the item comparable.
            valid_items = {
                item_id: annots
                for item_id, annots in annotations_by_item.items()
                if len({annot["user"] for annot in annots}) >= min_overlap
            }

            if not valid_items:
                metrics["by_schema"][schema_name] = {
                    "error": f"No items with {min_overlap}+ annotators",
                    "items_count": len(annotations_by_item)
                }
                continue

            try:
                raw_rows = [
                    (item_id, annot["user"], annot["value"])
                    for item_id, annots in valid_items.items()
                    for annot in annots
                ]

                metric_fn = nominal_metric
                metric_name = "nominal"
                annotation_values = None

                if annotation_type in ["likert", "slider", "number"]:
                    # The interval metric subtracts values, so it needs
                    # numbers rather than the string normalization used for
                    # nominal comparison.
                    try:
                        annotation_values = [float(value) for _, _, value in raw_rows]
                        metric_fn = interval_metric
                        metric_name = "interval"
                    except (TypeError, ValueError):
                        metrics["warnings"].append(
                            f"Schema '{schema_name}' has non-numeric values; "
                            f"using nominal metric instead of interval"
                        )

                if annotation_values is None:
                    annotation_values = [
                        self._normalize_annotation_value(value)
                        for _, _, value in raw_rows
                    ]

                reliability_data = [
                    {"unit": unit, "annotator": annotator, "annotation": annotation}
                    for (unit, annotator, _), annotation in zip(raw_rows, annotation_values)
                ]

                df = pd.DataFrame(reliability_data)

                alpha = simpledorff.calculate_krippendorffs_alpha(
                    df,
                    experiment_col="unit",
                    annotator_col="annotator",
                    class_col="annotation",
                    metric_fn=metric_fn
                )

                schema_metrics = {
                    "krippendorff_alpha": round(alpha, 4),
                    "metric_type": metric_name,
                    "items_evaluated": len(valid_items),
                    "total_annotations": len(reliability_data),
                    "interpretation": self._interpret_alpha(alpha)
                }

                # Kappa statistics are only computed for nominal data; a
                # failure here is recorded without discarding the alpha.
                if metric_name == "nominal":
                    try:
                        from potato.agreement import (
                            cohen_kappa_pairwise, fleiss_kappa,
                        )
                        schema_metrics["cohen_kappa"] = cohen_kappa_pairwise(df)
                        schema_metrics["fleiss_kappa"] = fleiss_kappa(df)
                    except Exception as e:
                        self.logger.error(f"Error calculating kappas for {schema_name}: {e}")
                        schema_metrics["kappa_error"] = str(e)

                metrics["by_schema"][schema_name] = schema_metrics

            except Exception as e:
                self.logger.error(f"Error calculating alpha for {schema_name}: {e}")
                metrics["by_schema"][schema_name] = {
                    "error": str(e),
                    "items_count": len(valid_items)
                }

        # Overall averages across the schemas that produced an alpha
        alphas = [
            m["krippendorff_alpha"]
            for m in metrics["by_schema"].values()
            if "krippendorff_alpha" in m
        ]
        if alphas:
            avg_alpha = sum(alphas) / len(alphas)
            metrics["overall"] = {
                "average_krippendorff_alpha": round(avg_alpha, 4),
                "schemas_evaluated": len(alphas),
                "interpretation": self._interpret_alpha(avg_alpha)
            }

        cohen_means = [
            m["cohen_kappa"]["mean_kappa"]
            for m in metrics["by_schema"].values()
            if isinstance(m.get("cohen_kappa"), dict)
            and m["cohen_kappa"].get("mean_kappa") is not None
        ]
        if cohen_means:
            metrics["overall"]["average_cohen_kappa"] = round(
                sum(cohen_means) / len(cohen_means), 4
            )

        fleiss_values = [
            m["fleiss_kappa"]["kappa"]
            for m in metrics["by_schema"].values()
            if isinstance(m.get("fleiss_kappa"), dict)
            and m["fleiss_kappa"].get("kappa") is not None
        ]
        if fleiss_values:
            metrics["overall"]["average_fleiss_kappa"] = round(
                sum(fleiss_values) / len(fleiss_values), 4
            )

        return metrics

    except ImportError as e:
        self.logger.error(f"simpledorff not installed: {e}")
        return {
            "enabled": False,
            "error": "simpledorff library not installed. Run: pip install simpledorff"
        }
    except Exception as e:
        self.logger.error(f"Error getting agreement metrics: {e}")
        return {"error": f"Failed to get agreement metrics: {str(e)}"}, 500
|
|
| def _interpret_alpha(self, alpha: float) -> str: |
| """Human-readable interpretation of Krippendorff's alpha.""" |
| if alpha >= 0.8: |
| return "Good agreement" |
| elif alpha >= 0.67: |
| return "Tentative agreement" |
| elif alpha >= 0.33: |
| return "Low agreement" |
| else: |
| return "Poor agreement" |
|
|
| def _normalize_annotation_value(self, value: Any) -> Any: |
| """Normalize annotation value for comparison.""" |
| if isinstance(value, list): |
| return tuple(sorted(str(v) for v in value)) |
| elif isinstance(value, bool): |
| return str(value).lower() |
| return str(value) |
|
|
| def _normalize_categorical_value(self, value: Any) -> Optional[str]: |
| """Normalize categorical annotation values and label definitions into readable strings.""" |
| if value is None: |
| return None |
|
|
| if isinstance(value, str): |
| return value |
|
|
| if isinstance(value, dict): |
| for key in ("name", "label", "value", "id", "text"): |
| candidate = value.get(key) |
| if isinstance(candidate, str) and candidate: |
| return candidate |
| return json.dumps(value, sort_keys=True) |
|
|
| if isinstance(value, (int, float, bool)): |
| return str(value) |
|
|
| return str(value) |
|
|
def get_code_cooccurrence_matrix(self, schema_filter: Optional[str] = None,
                                 min_count: int = 1) -> Dict[str, Any]:
    """
    Compute pairwise code co-occurrence across instances.

    Two codes co-occur on an instance when at least one annotator applied
    each to that instance. Pairs are de-duplicated within an instance
    (multiple annotators applying the same pair count once). Codes are
    identified as "<schema>::<code>" and come from both label and span
    annotations.

    Args:
        schema_filter: If set, restrict to codes belonging to this schema.
        min_count: Skip pairs with co-occurrence below this threshold.

    Returns:
        On success, a dict with `codes` (sorted code list), `pairs`
        ({code_a, code_b, count}, most frequent first), `n_instances`,
        `n_pairs`, and the echoed `schema_filter` / `min_count`. On
        failure, an ``(error_dict, status_code)`` tuple (403 or 500).
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    try:
        from itertools import combinations

        ism = get_item_state_manager()
        usm = get_user_state_manager()
        users = get_users()

        # Fetch each user's annotations once rather than once per item.
        annotation_snapshots = []
        for username in users:
            user_state = usm.get_user_state(username)
            if user_state:
                annotation_snapshots.append(user_state.get_all_annotations())

        codes_per_instance: Dict[str, set] = {}
        for item in ism.items():
            instance_id = item.get_id()
            codes: set = set()
            for all_anns in annotation_snapshots:
                if instance_id not in all_anns:
                    continue
                instance_anns = all_anns[instance_id]

                labels = instance_anns.get("labels", {}) or {}
                for label, value in labels.items():
                    schema_name = self._schema_for_label_key(label)
                    if schema_filter and schema_name != schema_filter:
                        continue
                    for code in self._labels_from_value(value):
                        codes.add(f"{schema_name}::{code}")

                spans = instance_anns.get("spans", {}) or {}
                for schema_name, span_list in spans.items():
                    if schema_filter and schema_name != schema_filter:
                        continue
                    for span in span_list or []:
                        code = span.get("label") or span.get("annotation")
                        if code:
                            codes.add(f"{schema_name}::{code}")
            if codes:
                codes_per_instance[instance_id] = codes

        # Sorting first guarantees every pair is keyed as (smaller, larger),
        # so (a, b) and (b, a) are never counted separately.
        pair_counts = Counter(
            pair
            for codes in codes_per_instance.values()
            for pair in combinations(sorted(codes), 2)
        )

        pairs = [
            {"code_a": a, "code_b": b, "count": c}
            for (a, b), c in pair_counts.items() if c >= min_count
        ]
        pairs.sort(key=lambda x: x["count"], reverse=True)

        all_codes = sorted({
            code for codes in codes_per_instance.values() for code in codes
        })

        return {
            "codes": all_codes,
            "pairs": pairs,
            "n_instances": len(codes_per_instance),
            "n_pairs": len(pairs),
            "schema_filter": schema_filter,
            "min_count": min_count,
        }
    except Exception as e:
        self.logger.error(f"Error computing co-occurrence: {e}")
        return {"error": f"Failed to compute co-occurrence: {str(e)}"}, 500
|
|
def get_code_crosstab(self, attribute_key: str,
                      schema_filter: Optional[str] = None) -> Dict[str, Any]:
    """
    Compute a codes-by-instance-attribute crosstab.

    Each instance contributes one row to the count of (code, attribute_value);
    multiple annotators applying the same code count once per instance.
    Instances without a value for the attribute are ignored. The attribute
    is read from the item's own data first and, if missing there, looked up
    through ``potato.cases.attribute_for_instance`` on a best-effort basis.

    Args:
        attribute_key: Name of the item-metadata field to use as the column axis
            (e.g. "site", "condition", "language").
        schema_filter: If set, restrict to codes belonging to this schema.

    Returns:
        On success, a dict with `codes` (row labels), `values` (column
        labels), `cells` ({code, value, count}), `n_instances`, and the
        echoed `attribute_key` / `schema_filter`. On failure, an
        ``(error_dict, status_code)`` tuple (400, 403 or 500).
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403
    if not attribute_key:
        return {"error": "attribute_key is required"}, 400

    try:
        ism = get_item_state_manager()
        usm = get_user_state_manager()
        users = get_users()

        cell_counts: Dict[Tuple[str, str], int] = {}
        values_seen: set = set()
        codes_seen: set = set()
        n_instances_with_attr = 0

        # Arguments for the fallback attribute lookup
        cb_task_dir = config.get("task_dir", ".")
        cb_project = config.get("annotation_task_name") or "default"

        # Fetch each user's annotations once rather than once per item.
        annotation_snapshots = []
        for username in users:
            user_state = usm.get_user_state(username)
            if user_state:
                annotation_snapshots.append(user_state.get_all_annotations())

        # The fallback lookup is imported lazily, at most once; None means
        # the import was attempted and failed.
        unresolved = object()
        attribute_lookup = unresolved

        for item in ism.items():
            instance_id = item.get_id()
            item_data = self._get_item_data(item)
            attr_value = item_data.get(attribute_key)
            if attr_value is None or attr_value == "":
                if attribute_lookup is unresolved:
                    try:
                        from potato.cases import attribute_for_instance
                        attribute_lookup = attribute_for_instance
                    except Exception:
                        attribute_lookup = None
                attr_value = None
                if attribute_lookup is not None:
                    # Best effort: a failing lookup just leaves the
                    # instance without an attribute value.
                    try:
                        attr_value = attribute_lookup(
                            cb_task_dir, cb_project, instance_id,
                            attribute_key)
                    except Exception:
                        attr_value = None
            if attr_value is None or attr_value == "":
                continue
            attr_value = str(attr_value)
            values_seen.add(attr_value)
            n_instances_with_attr += 1

            codes: set = set()
            for all_anns in annotation_snapshots:
                if instance_id not in all_anns:
                    continue
                instance_anns = all_anns[instance_id]

                labels = instance_anns.get("labels", {}) or {}
                for label, value in labels.items():
                    schema_name = self._schema_for_label_key(label)
                    if schema_filter and schema_name != schema_filter:
                        continue
                    for code in self._labels_from_value(value):
                        codes.add(f"{schema_name}::{code}")

                spans = instance_anns.get("spans", {}) or {}
                for schema_name, span_list in spans.items():
                    if schema_filter and schema_name != schema_filter:
                        continue
                    for span in span_list or []:
                        code = span.get("label") or span.get("annotation")
                        if code:
                            codes.add(f"{schema_name}::{code}")

            # `codes` is a set, so each code counts once per instance
            for code in codes:
                codes_seen.add(code)
                key = (code, attr_value)
                cell_counts[key] = cell_counts.get(key, 0) + 1

        cells = [
            {"code": code, "value": value, "count": count}
            for (code, value), count in cell_counts.items()
        ]
        cells.sort(key=lambda x: (x["code"], x["value"]))

        return {
            "codes": sorted(codes_seen),
            "values": sorted(values_seen),
            "cells": cells,
            "n_instances": n_instances_with_attr,
            "attribute_key": attribute_key,
            "schema_filter": schema_filter,
        }
    except Exception as e:
        self.logger.error(f"Error computing crosstab: {e}")
        return {"error": f"Failed to compute crosstab: {str(e)}"}, 500
|
|
| @staticmethod |
| def _schema_for_label_key(label_key) -> str: |
| """Extract schema name from a label key in user_state annotations.""" |
| if hasattr(label_key, "schema"): |
| return label_key.schema |
| if hasattr(label_key, "get_schema"): |
| return label_key.get_schema() |
| return str(label_key) |
|
|
| @staticmethod |
| def _labels_from_value(value) -> List[str]: |
| """Pull individual code names out of an annotation value blob.""" |
| if value is None or value == "": |
| return [] |
| if isinstance(value, dict): |
| return [k for k, v in value.items() if v] |
| if isinstance(value, list): |
| return [str(x) for x in value] |
| return [str(value)] |
|
|
| @staticmethod |
| def _get_item_data(item) -> dict: |
| """Return the raw data dict for an ItemStateManager item.""" |
| for attr in ("data", "_data", "item_data"): |
| data = getattr(item, attr, None) |
| if isinstance(data, dict): |
| return data |
| if hasattr(item, "to_dict"): |
| try: |
| return item.to_dict() |
| except Exception: |
| pass |
| return {} |
|
|
def get_quality_control_data(self) -> Dict[str, Any]:
    """
    Report the metrics exposed by the quality control manager.

    Returns:
        When a quality control manager is available, its quality metrics
        merged into a dict with "enabled": True. When none is available, a
        dict with "enabled": False and an explanatory "message". On
        failure, an ``(error_dict, status_code)`` tuple (403 or 500).
    """
    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    try:
        manager = get_quality_control_manager()

        if manager:
            # "enabled" goes first; the manager's metrics follow
            response = {"enabled": True}
            response.update(manager.get_quality_metrics())
            return response

        return {
            "enabled": False,
            "message": "Quality control not configured"
        }

    except Exception as e:
        self.logger.error(f"Error getting quality control data: {e}")
        return {"error": f"Failed to get quality control data: {str(e)}"}, 500
|
|
| def _behavioral_sequence(self, value: Any) -> list: |
| """Normalize behavioral list-like values to a safe list.""" |
| if value is None: |
| return [] |
| if isinstance(value, list): |
| return value |
| if isinstance(value, tuple): |
| return list(value) |
| return [value] |
|
|
| def _behavioral_field(self, payload: Any, field_name: str, default: Any = None) -> Any: |
| """Read a field from either a dict or an object used in behavioral analytics.""" |
| if isinstance(payload, dict): |
| return payload.get(field_name, default) |
| return getattr(payload, field_name, default) |
|
|
def get_behavioral_analytics_data(self) -> Dict[str, Any]:
    """
    Get comprehensive behavioral analytics data for all annotators.

    Walks every user's ``instance_id_to_behavioral_data`` and derives
    per-user and aggregate metrics. Users without a state object or without
    behavioral data are skipped.

    Per-instance heuristics that feed each user's suspicion score:
        - fast annotation: total time below 5 seconds (weight 0.30)
        - low interaction: fewer than 3 interaction events (weight 0.35)
        - no scroll: ``scroll_depth_max`` below 25 (weight 0.20)
        - no change: no annotation changes recorded (weight 0.15)

    A user is flagged ``SUSPICIOUS`` when the score exceeds 0.5,
    ``WARNING`` when it exceeds 0.3, and ``OK`` otherwise.

    Returns:
        On success, a dict containing:
            - ``aggregate`` / ``aggregate_stats``: totals, mean and median
              time per instance, interaction/change/AI-request totals
            - ``ai_usage``: request/accept/reject counts, accept rate and
              mean decision time
            - ``quality_summary``: flagged/warning users and the share of
              users showing each heuristic at least once
            - ``interaction_types``: the 20 most common event types
            - ``change_sources``: annotation-change counts per source
            - ``users``: per-user statistics, highest suspicion first
        On failure, a ``(error_dict, http_status)`` tuple: status 403 when
        admin access is denied, 500 when an unexpected exception occurs.
    """
    from statistics import median

    if not self.check_admin_access():
        return {"error": "Admin access required"}, 403

    try:
        usm = get_user_state_manager()
        users = get_users()

        user_stats = []
        ai_usage_total = {'requests': 0, 'accepts': 0, 'rejects': 0, 'decision_times': []}
        all_times = []
        interaction_counts = Counter()
        change_sources = Counter()
        total_interactions = 0
        total_changes = 0
        users_with_fast_annotations = 0
        users_with_low_interaction = 0
        users_with_no_changes = 0

        for user_id in users:
            user_state = usm.get_user_state(user_id)
            if not user_state:
                continue

            behavioral_data = user_state.instance_id_to_behavioral_data
            if not behavioral_data:
                continue

            user_times = []
            user_interactions = 0
            user_changes = 0
            user_ai_requests = 0
            user_ai_accepts = 0
            user_fast_count = 0
            user_low_interaction_count = 0
            user_no_scroll_count = 0
            user_no_change_count = 0

            for bd in behavioral_data.values():
                # `or 0` covers a field that is present but explicitly None.
                time_ms = self._behavioral_field(bd, 'total_time_ms', 0) or 0
                time_sec = time_ms / 1000
                user_times.append(time_sec)
                all_times.append(time_sec)
                if time_sec < 5:
                    user_fast_count += 1

                interactions = self._behavioral_sequence(
                    self._behavioral_field(bd, 'interactions', [])
                )
                user_interactions += len(interactions)
                total_interactions += len(interactions)
                if len(interactions) < 3:
                    user_low_interaction_count += 1
                for event in interactions:
                    event_type = self._behavioral_field(event, 'event_type', 'unknown')
                    interaction_counts[event_type] += 1

                scroll = self._behavioral_field(bd, 'scroll_depth_max', 0) or 0
                if scroll < 25:
                    user_no_scroll_count += 1

                changes = self._behavioral_sequence(
                    self._behavioral_field(bd, 'annotation_changes', [])
                )
                user_changes += len(changes)
                total_changes += len(changes)
                if not changes:
                    user_no_change_count += 1
                for change in changes:
                    source = self._behavioral_field(change, 'source', 'user')
                    change_sources[source] += 1

                ai_events = self._behavioral_sequence(
                    self._behavioral_field(bd, 'ai_usage', [])
                )
                for ai in ai_events:
                    user_ai_requests += 1
                    ai_usage_total['requests'] += 1

                    accepted = self._behavioral_field(ai, 'suggestion_accepted', None)
                    if accepted:
                        user_ai_accepts += 1
                        ai_usage_total['accepts'] += 1
                    elif accepted is not None:
                        # Only an explicit negative decision is a reject; a
                        # missing/None value means no decision was recorded
                        # and must not inflate the reject count.
                        ai_usage_total['rejects'] += 1

                    decision_time = self._behavioral_field(ai, 'time_to_decision_ms', None)
                    if isinstance(decision_time, (int, float)):
                        ai_usage_total['decision_times'].append(decision_time)

            # behavioral_data passed the emptiness check above, so the
            # per-instance divisions below cannot divide by zero.
            total_instances = len(behavioral_data)
            fast_rate = user_fast_count / total_instances
            low_interaction_rate = user_low_interaction_count / total_instances
            no_scroll_rate = user_no_scroll_count / total_instances
            no_change_rate = user_no_change_count / total_instances
            suspicion_score = fast_rate * 0.3 + low_interaction_rate * 0.35 + no_scroll_rate * 0.2 + no_change_rate * 0.15

            if suspicion_score > 0.5:
                quality_flag = 'SUSPICIOUS'
            elif suspicion_score > 0.3:
                quality_flag = 'WARNING'
            else:
                quality_flag = 'OK'

            # User-level counters: one hit per user showing the behavior at
            # least once, regardless of how many instances were affected.
            if user_fast_count > 0:
                users_with_fast_annotations += 1
            if user_low_interaction_count > 0:
                users_with_low_interaction += 1
            if user_no_change_count > 0:
                users_with_no_changes += 1

            user_stats.append({
                'user_id': user_id,
                'total_instances': total_instances,
                'total_time_sec': sum(user_times),
                'avg_time_sec': sum(user_times) / len(user_times) if user_times else 0,
                'min_time_sec': min(user_times) if user_times else 0,
                'max_time_sec': max(user_times) if user_times else 0,
                'total_interactions': user_interactions,
                'avg_interactions': user_interactions / total_instances,
                'total_changes': user_changes,
                'avg_changes': user_changes / total_instances,
                'ai_requests': user_ai_requests,
                'ai_accepts': user_ai_accepts,
                'ai_accept_rate': (user_ai_accepts / user_ai_requests) if user_ai_requests > 0 else None,
                'fast_annotation_rate': fast_rate,
                'low_interaction_rate': low_interaction_rate,
                'no_scroll_rate': no_scroll_rate,
                'no_change_rate': no_change_rate,
                'suspicion_score': suspicion_score,
                'quality_flag': quality_flag,
            })

        aggregate = {
            'total_users_with_data': len(user_stats),
            'total_instances': sum(u['total_instances'] for u in user_stats),
            'total_time_minutes': sum(u['total_time_sec'] for u in user_stats) / 60,
            'avg_time_per_instance': sum(all_times) / len(all_times) if all_times else 0,
            # statistics.median averages the two middle values for an
            # even-sized sample instead of returning the upper-middle one.
            'median_time_per_instance': median(all_times) if all_times else 0,
        }
        aggregate_stats = {
            'total_users': len(user_stats),
            'total_instances': aggregate['total_instances'],
            'avg_time_per_instance_sec': aggregate['avg_time_per_instance'],
            'total_interactions': total_interactions,
            'total_changes': total_changes,
            'total_ai_requests': ai_usage_total['requests'],
        }

        decision_times = ai_usage_total['decision_times']
        ai_summary = {
            'total_requests': ai_usage_total['requests'],
            'total_accepts': ai_usage_total['accepts'],
            'total_rejects': ai_usage_total['rejects'],
            'accept_rate': (ai_usage_total['accepts'] / ai_usage_total['requests']) if ai_usage_total['requests'] > 0 else 0,
            'avg_decision_time_ms': sum(decision_times) / len(decision_times) if decision_times else 0
        }

        flagged_users = [u for u in user_stats if u['quality_flag'] == 'SUSPICIOUS']
        warning_users = [u for u in user_stats if u['quality_flag'] == 'WARNING']
        total_users_with_data = len(user_stats)
        quality_summary = {
            'total_flagged': len(flagged_users),
            'total_warnings': len(warning_users),
            'flagged_user_ids': [u['user_id'] for u in flagged_users],
            'warning_user_ids': [u['user_id'] for u in warning_users],
            'high_suspicion_users': len(flagged_users),
            'fast_annotation_rate': (users_with_fast_annotations / total_users_with_data) if total_users_with_data > 0 else 0,
            'low_interaction_rate': (users_with_low_interaction / total_users_with_data) if total_users_with_data > 0 else 0,
            'no_change_rate': (users_with_no_changes / total_users_with_data) if total_users_with_data > 0 else 0,
        }

        return {
            'aggregate': aggregate,
            'aggregate_stats': aggregate_stats,
            'ai_usage': ai_summary,
            'quality_summary': quality_summary,
            'interaction_types': dict(interaction_counts.most_common(20)),
            'change_sources': dict(change_sources),
            'users': sorted(user_stats, key=lambda x: -x['suspicion_score'])
        }

    except Exception as e:
        self.logger.error(f"Error getting behavioral analytics data: {e}")
        import traceback
        traceback.print_exc()
        return {"error": f"Failed to get behavioral analytics data: {str(e)}"}, 500
|
|
def get_adjudication_overview(self) -> Dict[str, Any]:
    """
    Summarize adjudication state for the admin dashboard.

    Returns:
        ``{"enabled": False, "message": ...}`` when no adjudication manager
        is available or adjudication is disabled. Otherwise a dict with
        ``"enabled": True`` plus queue stats, per-adjudicator details,
        error taxonomy counts, the guideline flag count, disagreement
        patterns, and similarity stats. If gathering any of these raises,
        the error is logged and ``{"enabled": True, "error": <message>}``
        is returned instead.
    """
    from potato.adjudication import get_adjudication_manager

    manager = get_adjudication_manager()
    if not manager or not manager.adj_config.enabled:
        return {"enabled": False, "message": "Adjudication not configured"}

    def _adjudicator_detail(stats):
        # Average time is rounded to whole milliseconds; 0 when nothing
        # has been completed yet.
        done = stats.get("completed", 0)
        spent_ms = stats.get("total_time_ms", 0)
        return {
            "completed": done,
            "total_time_ms": spent_ms,
            "avg_time_ms": (round(spent_ms / done) if done > 0 else 0),
        }

    try:
        queue_stats = manager.get_stats()

        # Tally error-taxonomy tags and guideline-update flags across all
        # recorded decisions.
        tag_tally = Counter()
        flagged_decisions = 0
        for decision in manager.decisions.values():
            for tag in decision.error_taxonomy:
                tag_tally[tag] += 1
            if decision.guideline_update_flag:
                flagged_decisions += 1

        adjudicator_details = {
            adj_id: _adjudicator_detail(stats)
            for adj_id, stats in queue_stats.get("adjudicator_stats", {}).items()
        }

        disagreement_patterns = self._analyze_disagreement_patterns(manager)

        # Similarity stats are only available when an engine is attached.
        similarity_stats = (
            manager.similarity_engine.get_stats() if manager.similarity_engine else {}
        )

        return {
            "enabled": True,
            "queue_stats": queue_stats,
            "adjudicator_details": adjudicator_details,
            "error_taxonomy_counts": dict(tag_tally.most_common()),
            "guideline_flag_count": flagged_decisions,
            "disagreement_patterns": disagreement_patterns,
            "similarity_stats": similarity_stats,
        }

    except Exception as e:
        self.logger.error(f"Error getting adjudication overview: {e}")
        return {"enabled": True, "error": str(e)}
|
|
| def _analyze_disagreement_patterns(self, adj_mgr) -> List[Dict[str, Any]]: |
| """ |
| Analyze per-schema disagreement patterns across the queue. |
| |
| Returns: |
| List of dicts sorted by worst agreement first, with schema name |
| and average agreement score. |
| """ |
| from collections import defaultdict |
|
|
| schema_scores = defaultdict(list) |
|
|
| for item in adj_mgr.queue.values(): |
| for schema_name, score in item.agreement_scores.items(): |
| schema_scores[schema_name].append(score) |
|
|
| patterns = [] |
| for schema_name, scores in schema_scores.items(): |
| avg = sum(scores) / len(scores) if scores else 1.0 |
| patterns.append({ |
| "schema": schema_name, |
| "avg_agreement": round(avg, 3), |
| "num_items": len(scores), |
| }) |
|
|
| patterns.sort(key=lambda x: x["avg_agreement"]) |
| return patterns |
|
|
| |
| |
| |
|
|
def get_mace_overview(self) -> Dict[str, Any]:
    """Return the MACE competence-estimation summary for the dashboard.

    Returns:
        The MACE manager's results summary when a manager exists and MACE
        is enabled; otherwise ``{"enabled": False, "message": ...}``.
    """
    from potato.mace_manager import get_mace_manager

    manager = get_mace_manager()
    if manager and manager.mace_config.enabled:
        return manager.get_results_summary()
    return {"enabled": False, "message": "MACE not configured"}
|
|
def get_mace_predictions(
    self, schema: str, instance_id: str = None
) -> Dict[str, Any]:
    """Return MACE predictions for one schema.

    Args:
        schema: Name of the schema whose predictions are requested.
        instance_id: When given, passed through to the MACE manager to
            narrow the result to that instance.

    Returns:
        Whatever the MACE manager returns for the schema, or
        ``{"error": "MACE not configured"}`` when no manager exists or
        MACE is disabled.
    """
    from potato.mace_manager import get_mace_manager

    manager = get_mace_manager()
    if manager and manager.mace_config.enabled:
        return manager.get_predictions_for_schema(schema, instance_id)
    return {"error": "MACE not configured"}
|
|
def _calculate_label_statistics(self, instance_id: str) -> Tuple[Optional[str], float]:
    """
    Find the majority label for an instance and how contested it is.

    Collects one entry per label found under the ``"labels"`` key of every
    user's annotations for ``instance_id``: the label's ``label_name`` when
    the label key has that attribute, otherwise the stringified value.

    Args:
        instance_id: The instance ID to analyze

    Returns:
        Tuple of (most_frequent_label, disagreement_score), where the
        disagreement score is ``1 - share of the most frequent label``.
        ``(None, 0.0)`` is returned when no labels exist or when an error
        occurs (the error is logged).
    """
    try:
        usm = get_user_state_manager()
        users = get_users()

        collected = []
        for username in users:
            state = usm.get_user_state(username)
            if not state:
                continue
            annotations = state.get_all_annotations()
            if instance_id not in annotations:
                continue
            entry = annotations[instance_id]
            if "labels" not in entry:
                continue
            for label, value in entry["labels"].items():
                collected.append(
                    label.label_name if hasattr(label, 'label_name') else str(value)
                )

        if not collected:
            return None, 0.0

        # most_common(1) yields the single (label, count) pair with the
        # highest count.
        top_label, top_count = Counter(collected).most_common(1)[0]
        return top_label, 1 - (top_count / len(collected))

    except Exception as e:
        self.logger.error(f"Error calculating label statistics for instance {instance_id}: {e}")
        return None, 0.0
|
|
|
|
| |
# Module-level AdminDashboard instance, constructed when this module is imported.
# NOTE(review): presumably the shared instance other modules import — confirm against callers.
admin_dashboard = AdminDashboard()
|
|