Spaces:
Paused
Paused
| """ | |
| Admin Dashboard Module | |
| This module provides comprehensive admin functionality for the annotation platform, | |
| including dashboard data generation, timing analysis, and configuration management. | |
| The admin dashboard offers: | |
| - Real-time overview of annotation progress and statistics | |
| - Detailed annotator performance metrics and timing analysis | |
| - Instance-level annotation tracking and disagreement analysis | |
| - Configuration management and system state monitoring | |
| - Question and annotation scheme analysis | |
| - User progress tracking and completion statistics | |
| - Comprehensive annotation history tracking and suspicious activity detection | |
| - Performance metrics and quality assurance monitoring | |
| - Session tracking and behavioral analysis | |
| Key Components: | |
| - AdminDashboard: Main class for admin functionality | |
| - AnnotatorTimingData: Data class for annotator timing information | |
| - InstanceData: Data class for instance information and statistics | |
| - Dashboard data generation and analysis functions | |
| - Configuration update and management functions | |
| - AnnotationHistoryAnalyzer: Advanced history analysis and suspicious activity detection | |
| The dashboard provides insights into: | |
| - Overall annotation progress and completion rates | |
| - Individual annotator performance and efficiency | |
| - Annotation quality through disagreement analysis | |
| - System configuration and operational status | |
| - Real-time monitoring of active annotation sessions | |
| - Fine-grained annotation timing and behavioral patterns | |
| - Suspicious activity detection and quality assurance | |
| - Session-based performance analysis | |
| Access Control: | |
| - Admin access is controlled via API key authentication | |
| - Debug mode allows admin access without API key | |
| - All admin endpoints require proper authentication | |
| """ | |
| import json | |
| import logging | |
| import datetime | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from collections import defaultdict, Counter | |
| from dataclasses import dataclass | |
| from flask import request, jsonify, session | |
| from potato.flask_server import ( | |
| config, logger, get_user_state_manager, get_item_state_manager, | |
| get_users, get_total_annotations | |
| ) | |
| from potato.annotation_history import AnnotationHistoryManager, AnnotationAction | |
| from potato.quality_control import get_quality_control_manager | |
| class AnnotatorTimingData: | |
| """ | |
| Data class for annotator timing information. | |
| This class encapsulates timing metrics for individual annotators, | |
| including total annotations, working time, and performance statistics. | |
| Now enhanced with annotation history tracking and suspicious activity detection. | |
| """ | |
| user_id: str | |
| total_annotations: int | |
| total_seconds: int | |
| average_seconds_per_annotation: float | |
| last_activity: Optional[datetime.datetime] | |
| current_instance_time: Optional[int] | |
| annotations_per_hour: float | |
| phase: str | |
| has_assignments: bool | |
| remaining_assignments: bool | |
| # Annotation history metrics | |
| total_actions: int | |
| average_action_time_ms: float | |
| fastest_action_time_ms: int | |
| slowest_action_time_ms: int | |
| actions_per_minute: float | |
| suspicious_score: float | |
| suspicious_level: str | |
| fast_actions_count: int | |
| burst_actions_count: int | |
| session_start_time: Optional[datetime.datetime] | |
| current_session_duration_minutes: Optional[float] | |
| recent_actions_count: int # Actions in last 5 minutes | |
| # Training metrics | |
| training_completed: bool | |
| training_correct_answers: int | |
| training_total_attempts: int | |
| training_pass_rate: float | |
| training_current_question: int | |
| training_total_questions: int | |
| class InstanceData: | |
| """ | |
| Data class for instance information. | |
| This class encapsulates information about annotation instances, | |
| including annotation counts, disagreement scores, and annotator lists. | |
| """ | |
| id: str | |
| text: str | |
| displayed_text: str | |
| annotation_count: int | |
| completion_percentage: float | |
| most_frequent_label: Optional[str] | |
| label_disagreement: float | |
| annotators: List[str] | |
| num_ai_instance: int | |
| average_time_per_annotation: Optional[float] | |
| class AdminDashboard: | |
| """ | |
| Main class for admin dashboard functionality. | |
| This class provides comprehensive admin features including dashboard | |
| data generation, timing analysis, configuration management, and | |
| system monitoring capabilities. | |
| """ | |
| def __init__(self): | |
| """Initialize the admin dashboard.""" | |
| self.logger = logging.getLogger(__name__) | |
| def check_admin_access(self) -> bool: | |
| """ | |
| Check if the current request has admin access via API key. | |
| Validates against all key sources (config, env var, auto-generated file) | |
| and accepts keys from X-API-Key header or session. | |
| Returns: | |
| bool: True if admin access is granted, False otherwise | |
| """ | |
| from potato.server_utils.admin_key import validate_admin_api_key | |
| api_key = request.headers.get('X-API-Key') or session.get('admin_api_key') | |
| return validate_admin_api_key(api_key, config) | |
| def get_dashboard_overview(self) -> Dict[str, Any]: | |
| """ | |
| Get comprehensive dashboard overview data. | |
| This method generates a complete overview of the annotation system, | |
| including user statistics, annotation progress, and system configuration. | |
| Returns: | |
| Dict containing overview statistics with the following structure: | |
| - overview: User counts, annotation counts, completion percentages | |
| - config: System configuration and settings | |
| Side Effects: | |
| - Logs errors if data generation fails | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| usm = get_user_state_manager() | |
| ism = get_item_state_manager() | |
| # Get all users and their states | |
| users = get_users() | |
| total_annotations = get_total_annotations() | |
| # Calculate user statistics | |
| active_users = 0 | |
| completed_users = 0 | |
| total_working_time = 0 | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if user_state: | |
| if user_state.get_phase().value == "ANNOTATION": | |
| active_users += 1 | |
| elif user_state.get_phase().value == "DONE": | |
| completed_users += 1 | |
| # Get timing data | |
| timing_data = self._get_annotator_timing_data(username) | |
| if timing_data: | |
| total_working_time += timing_data.total_seconds | |
| # Get item statistics | |
| items = ism.items() | |
| items_with_annotations = 0 | |
| total_assignments = 0 | |
| for item in items: | |
| item_id = item.get_id() | |
| annotators = ism.get_annotators_for_item(item_id) | |
| if annotators: | |
| items_with_annotations += 1 | |
| total_assignments += len(annotators) | |
| # Calculate completion percentages | |
| total_items = len(items) | |
| completion_percentage = (items_with_annotations / total_items * 100) if total_items > 0 else 0 | |
| # Format total working time | |
| hours = total_working_time // 3600 | |
| minutes = (total_working_time % 3600) // 60 | |
| formatted_time = f"{hours}h {minutes}m" | |
| return { | |
| "overview": { | |
| "total_users": len(users), | |
| "active_users": active_users, | |
| "completed_users": completed_users, | |
| "total_annotations": total_annotations, | |
| "total_items": total_items, | |
| "items_with_annotations": items_with_annotations, | |
| "completion_percentage": round(completion_percentage, 1), | |
| "total_assignments": total_assignments, | |
| "total_working_time": formatted_time, | |
| "average_annotations_per_item": round(total_annotations / total_items, 1) if total_items > 0 else 0 | |
| }, | |
| "config": { | |
| "annotation_task_name": config.get("annotation_task_name", "Unknown"), | |
| "max_annotations_per_user": config.get("max_annotations_per_user", "Unlimited"), | |
| "max_annotations_per_item": config.get("max_annotations_per_item", "Unlimited"), | |
| "assignment_strategy": config.get("assignment_strategy", "fixed_order"), | |
| "debug_mode": config.get("debug", False) | |
| } | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting dashboard overview: {e}") | |
| return {"error": f"Failed to get dashboard overview: {str(e)}"}, 500 | |
| def get_annotators_data(self) -> Dict[str, Any]: | |
| """ | |
| Get detailed annotator data including timing information. | |
| Returns: | |
| Dict containing annotator data with timing analysis | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| annotators_data = [] | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if user_state: | |
| timing_data = self._get_annotator_timing_data(username) | |
| if timing_data: | |
| annotators_data.append({ | |
| "user_id": timing_data.user_id, | |
| "total_annotations": timing_data.total_annotations, | |
| "completion_percentage": self._calculate_completion_percentage(timing_data.user_id), | |
| "total_seconds": timing_data.total_seconds, | |
| "average_seconds_per_annotation": timing_data.average_seconds_per_annotation, | |
| "annotations_per_hour": timing_data.annotations_per_hour, | |
| "phase": timing_data.phase, | |
| "has_assignments": timing_data.has_assignments, | |
| "remaining_assignments": timing_data.remaining_assignments, | |
| "max_assignments": user_state.get_max_assignments(), | |
| "last_activity": timing_data.last_activity.isoformat() if timing_data.last_activity else None, | |
| "current_instance_time": timing_data.current_instance_time, | |
| # NEW: Annotation history metrics | |
| "total_actions": timing_data.total_actions, | |
| "average_action_time_ms": timing_data.average_action_time_ms, | |
| "fastest_action_time_ms": timing_data.fastest_action_time_ms if timing_data.fastest_action_time_ms != float('inf') else None, | |
| "slowest_action_time_ms": timing_data.slowest_action_time_ms, | |
| "actions_per_minute": timing_data.actions_per_minute, | |
| "suspicious_score": timing_data.suspicious_score, | |
| "suspicious_level": timing_data.suspicious_level, | |
| "fast_actions_count": timing_data.fast_actions_count, | |
| "burst_actions_count": timing_data.burst_actions_count, | |
| "session_start_time": timing_data.session_start_time.isoformat() if timing_data.session_start_time else None, | |
| "current_session_duration_minutes": timing_data.current_session_duration_minutes, | |
| "recent_actions_count": timing_data.recent_actions_count, | |
| # Training metrics | |
| "training_completed": timing_data.training_completed, | |
| "training_correct_answers": timing_data.training_correct_answers, | |
| "training_total_attempts": timing_data.training_total_attempts, | |
| "training_pass_rate": round(timing_data.training_pass_rate, 2), | |
| "training_current_question": timing_data.training_current_question, | |
| "training_total_questions": timing_data.training_total_questions | |
| }) | |
| # Sort by suspicious score (highest first) | |
| annotators_data.sort(key=lambda x: x["suspicious_score"], reverse=True) | |
| return { | |
| "total_annotators": len(annotators_data), | |
| "annotators": annotators_data, | |
| "summary": { | |
| "high_suspicious_count": len([a for a in annotators_data if a["suspicious_level"] in ["High", "Very High"]]), | |
| "medium_suspicious_count": len([a for a in annotators_data if a["suspicious_level"] == "Medium"]), | |
| "low_suspicious_count": len([a for a in annotators_data if a["suspicious_level"] == "Low"]), | |
| "normal_count": len([a for a in annotators_data if a["suspicious_level"] == "Normal"]), | |
| "average_suspicious_score": sum(a["suspicious_score"] for a in annotators_data) / len(annotators_data) if annotators_data else 0 | |
| } | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting annotators data: {e}") | |
| return {"error": f"Failed to get annotators data: {str(e)}"}, 500 | |
| def get_annotation_history_data(self, user_id: Optional[str] = None, | |
| instance_id: Optional[str] = None, | |
| minutes: Optional[int] = None) -> Dict[str, Any]: | |
| """ | |
| Get detailed annotation history data with filtering options. | |
| Args: | |
| user_id: Optional user ID to filter by | |
| instance_id: Optional instance ID to filter by | |
| minutes: Optional time window in minutes | |
| Returns: | |
| Dict containing annotation history data | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| usm = get_user_state_manager() | |
| if user_id: | |
| # Get history for specific user | |
| user_state = usm.get_user_state(user_id) | |
| if not user_state: | |
| return {"error": f"User {user_id} not found"}, 404 | |
| actions = user_state.get_annotation_history(instance_id) | |
| if minutes: | |
| actions = user_state.get_recent_actions(minutes) | |
| return self._format_annotation_history(actions, user_id) | |
| else: | |
| # Get history for all users | |
| all_actions = [] | |
| users = get_users() | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if user_state: | |
| user_actions = user_state.get_annotation_history(instance_id) | |
| if minutes: | |
| user_actions = user_state.get_recent_actions(minutes) | |
| all_actions.extend(user_actions) | |
| return self._format_annotation_history(all_actions, "all_users") | |
| except Exception as e: | |
| self.logger.error(f"Error getting annotation history data: {e}") | |
| return {"error": f"Failed to get annotation history data: {str(e)}"}, 500 | |
| def get_suspicious_activity_data(self) -> Dict[str, Any]: | |
| """ | |
| Get comprehensive suspicious activity analysis. | |
| Returns: | |
| Dict containing suspicious activity data | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| suspicious_data = [] | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if user_state: | |
| suspicious_actions = user_state.get_suspicious_activity() | |
| if suspicious_actions: | |
| suspicious_data.append({ | |
| "user_id": username, | |
| "suspicious_actions_count": len(suspicious_actions), | |
| "suspicious_actions": [ | |
| { | |
| "action_id": action.action_id, | |
| "timestamp": action.timestamp.isoformat(), | |
| "instance_id": action.instance_id, | |
| "action_type": action.action_type, | |
| "schema_name": action.schema_name, | |
| "label_name": action.label_name, | |
| "server_processing_time_ms": action.server_processing_time_ms, | |
| "session_id": action.session_id | |
| } | |
| for action in suspicious_actions[:10] # Limit to 10 most recent | |
| ] | |
| }) | |
| return { | |
| "total_users_with_suspicious_activity": len(suspicious_data), | |
| "suspicious_activity": suspicious_data | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting suspicious activity data: {e}") | |
| return {"error": f"Failed to get suspicious activity data: {str(e)}"}, 500 | |
| def get_instances_data(self, page: int = 1, page_size: int = 25, | |
| sort_by: str = "annotation_count", sort_order: str = "desc", | |
| filter_completion: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Get paginated instances data with sorting and filtering. | |
| Args: | |
| page: Page number (1-based) | |
| page_size: Number of instances per page | |
| sort_by: Field to sort by (annotation_count, completion_percentage, disagreement, id) | |
| sort_order: Sort order (asc, desc) | |
| filter_completion: Filter by completion status (completed, incomplete, all) | |
| Returns: | |
| Dict containing paginated instances data | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| ism = get_item_state_manager() | |
| items = ism.items() | |
| # Convert items to InstanceData objects | |
| instances_data = [] | |
| for item in items: | |
| item_id = item.get_id() | |
| annotators = ism.get_annotators_for_item(item_id) | |
| annotation_count = len(annotators) if annotators else 0 | |
| # Calculate completion percentage | |
| max_annotations = config.get("max_annotations_per_item", -1) | |
| if max_annotations > 0: | |
| completion_percentage = min(100, (annotation_count / max_annotations) * 100) | |
| else: | |
| completion_percentage = 100 if annotation_count > 0 else 0 | |
| # Calculate most frequent label and disagreement | |
| most_frequent_label, disagreement = self._calculate_label_statistics(item_id) | |
| # Calculate average time per annotation | |
| avg_time = self._calculate_average_time_per_annotation(item_id) | |
| instance_data = InstanceData( | |
| id=item_id, | |
| text=item.get_text(), | |
| displayed_text=item.get_displayed_text(), | |
| annotation_count=annotation_count, | |
| completion_percentage=completion_percentage, | |
| most_frequent_label=most_frequent_label, | |
| label_disagreement=disagreement, | |
| annotators=list(annotators) if annotators else [], | |
| average_time_per_annotation=avg_time, | |
| num_ai_instance=self._calculate_total_instance_ai(item_id) | |
| ) | |
| instances_data.append(instance_data) | |
| # Apply filters | |
| if filter_completion == "completed": | |
| instances_data = [i for i in instances_data if i.completion_percentage >= 100] | |
| elif filter_completion == "incomplete": | |
| instances_data = [i for i in instances_data if i.completion_percentage < 100] | |
| # Apply sorting | |
| reverse = sort_order.lower() == "desc" | |
| if sort_by == "annotation_count": | |
| instances_data.sort(key=lambda x: x.annotation_count, reverse=reverse) | |
| elif sort_by == "completion_percentage": | |
| instances_data.sort(key=lambda x: x.completion_percentage, reverse=reverse) | |
| elif sort_by == "disagreement": | |
| instances_data.sort(key=lambda x: x.label_disagreement, reverse=reverse) | |
| elif sort_by == "id": | |
| instances_data.sort(key=lambda x: x.id, reverse=reverse) | |
| elif sort_by == "average_time": | |
| instances_data.sort(key=lambda x: x.average_time_per_annotation or 0, reverse=reverse) | |
| # Apply pagination | |
| total_instances = len(instances_data) | |
| start_idx = (page - 1) * page_size | |
| end_idx = start_idx + page_size | |
| paginated_instances = instances_data[start_idx:end_idx] | |
| # Convert to serializable format | |
| serialized_instances = [] | |
| for instance in paginated_instances: | |
| serialized_instances.append({ | |
| "id": instance.id, | |
| "text": instance.text[:100] + "..." if len(instance.text) > 100 else instance.text, | |
| "displayed_text": instance.displayed_text[:100] + "..." if len(instance.displayed_text) > 100 else instance.displayed_text, | |
| "annotation_count": instance.annotation_count, | |
| "completion_percentage": round(instance.completion_percentage, 1), | |
| "most_frequent_label": instance.most_frequent_label, | |
| "label_disagreement": round(instance.label_disagreement, 2), | |
| "annotators": instance.annotators, | |
| "num_ai_instance": instance.num_ai_instance, | |
| "average_time_per_annotation": self._format_seconds(instance.average_time_per_annotation) if instance.average_time_per_annotation else None | |
| }) | |
| return { | |
| "instances": serialized_instances, | |
| "pagination": { | |
| "page": page, | |
| "page_size": page_size, | |
| "total_instances": total_instances, | |
| "total_pages": (total_instances + page_size - 1) // page_size, | |
| "has_next": end_idx < total_instances, | |
| "has_prev": page > 1 | |
| }, | |
| "summary": { | |
| "completed_instances": len([i for i in instances_data if i.completion_percentage >= 100]), | |
| "incomplete_instances": len([i for i in instances_data if i.completion_percentage < 100]), | |
| "average_annotations_per_instance": round(sum(i.annotation_count for i in instances_data) / len(instances_data), 1) if instances_data else 0, | |
| "average_disagreement": round(sum(i.label_disagreement for i in instances_data) / len(instances_data), 2) if instances_data else 0 | |
| } | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting instances data: {e}") | |
| return {"error": f"Failed to get instances data: {str(e)}"}, 500 | |
| def update_config(self, config_updates: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Update system configuration. | |
| Args: | |
| config_updates: Dictionary of configuration updates | |
| Returns: | |
| Dict containing update result | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| # Validate and apply updates | |
| updated_fields = [] | |
| for key, value in config_updates.items(): | |
| if key in ["max_annotations_per_user", "max_annotations_per_item"]: | |
| if isinstance(value, int) and value >= -1: | |
| config[key] = value | |
| updated_fields.append(key) | |
| else: | |
| return {"error": f"Invalid value for {key}: must be integer >= -1"}, 400 | |
| elif key == "assignment_strategy": | |
| valid_strategies = ["random", "fixed_order", "least_annotated", "max_diversity", "active_learning", "llm_confidence"] | |
| if value in valid_strategies: | |
| config[key] = value | |
| updated_fields.append(key) | |
| else: | |
| return {"error": f"Invalid assignment strategy: {value}"}, 400 | |
| return { | |
| "status": "success", | |
| "message": f"Updated configuration fields: {', '.join(updated_fields)}", | |
| "updated_fields": updated_fields | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error updating config: {e}") | |
| return {"error": f"Failed to update config: {str(e)}"}, 500 | |
| def get_questions_data(self) -> Dict[str, Any]: | |
| """ | |
| Get aggregate analysis data for each annotation schema/question. | |
| Returns: | |
| Dict containing questions data with visualizations for different annotation types | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| ism = get_item_state_manager() | |
| annotation_schemes = config.get("annotation_schemes", []) | |
| questions_data = [] | |
| users = get_users() | |
| for scheme in annotation_schemes: | |
| scheme_name = scheme.get("name", "Unknown") | |
| annotation_type = scheme.get("annotation_type", "unknown") | |
| all_annotations = [] | |
| item_annotations = {} | |
| for item in ism.items(): | |
| item_id = item.get_id() | |
| item_annotations[item_id] = [] | |
| for username in users: | |
| user_state = get_user_state_manager().get_user_state(username) | |
| if user_state: | |
| label_annotations = user_state.get_label_annotations(item_id) | |
| for label, value in label_annotations.items(): | |
| label_schema = None | |
| label_name = None | |
| if hasattr(label, 'get_schema'): | |
| label_schema = label.get_schema() | |
| label_name = label.get_name() | |
| elif hasattr(label, 'schema'): | |
| label_schema = label.schema | |
| label_name = getattr(label, 'name', None) | |
| elif isinstance(label, str): | |
| label_schema = label | |
| if label_schema == scheme_name: | |
| normalized_value = label_name if label_name else value | |
| if annotation_type in ["radio", "select"]: | |
| normalized_value = self._normalize_categorical_value(normalized_value) | |
| elif annotation_type == "multiselect" and isinstance(normalized_value, list): | |
| normalized_value = [ | |
| normalized_label | |
| for normalized_label in ( | |
| self._normalize_categorical_value(v) for v in normalized_value | |
| ) | |
| if normalized_label is not None | |
| ] | |
| if normalized_value is not None: | |
| all_annotations.append(normalized_value) | |
| item_annotations[item_id].append(normalized_value) | |
| analysis = self._analyze_annotation_scheme( | |
| annotation_type, scheme, all_annotations, item_annotations | |
| ) | |
| questions_data.append({ | |
| "name": scheme_name, | |
| "type": annotation_type, | |
| "description": scheme.get("description", ""), | |
| "total_annotations": len(all_annotations), | |
| "items_with_annotations": len([item_id for item_id, annotations in item_annotations.items() if annotations]), | |
| "analysis": analysis | |
| }) | |
| return { | |
| "questions": questions_data, | |
| "summary": { | |
| "total_questions": len(questions_data), | |
| "total_annotations": sum(q["total_annotations"] for q in questions_data), | |
| "question_types": list(set(q["type"] for q in questions_data)) | |
| } | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting questions data: {e}") | |
| return {"error": f"Failed to get questions data: {str(e)}"}, 500 | |
| def _analyze_annotation_scheme(self, annotation_type: str, scheme: dict, | |
| all_annotations: list, item_annotations: dict) -> dict: | |
| """ | |
| Analyze annotations based on their type and generate appropriate visualizations. | |
| """ | |
| if not all_annotations: | |
| return {"error": "No annotations found"} | |
| analysis = { | |
| "type": annotation_type, | |
| "total_count": len(all_annotations) | |
| } | |
| if annotation_type in ["radio", "select"]: | |
| normalized_annotations = [ | |
| normalized for normalized in | |
| (self._normalize_categorical_value(annotation) for annotation in all_annotations) | |
| if normalized is not None | |
| ] | |
| if not normalized_annotations: | |
| return {"error": "No annotations found"} | |
| label_counts = Counter(normalized_annotations) | |
| raw_labels = scheme.get("labels", []) | |
| labels = [ | |
| normalized for normalized in | |
| (self._normalize_categorical_value(label) for label in raw_labels) | |
| if normalized is not None | |
| ] | |
| analysis.update({ | |
| "visualization_type": "histogram", | |
| "data": { | |
| "labels": labels, | |
| "counts": [label_counts.get(label, 0) for label in labels], | |
| "percentages": [round(label_counts.get(label, 0) / len(normalized_annotations) * 100, 1) | |
| for label in labels] | |
| }, | |
| "most_common": label_counts.most_common(1)[0] if label_counts else None, | |
| "agreement_score": self._calculate_agreement_score(item_annotations) | |
| }) | |
| elif annotation_type == "multiselect": | |
| # Multi-label data - show label frequency and co-occurrence | |
| label_counts = Counter() | |
| co_occurrence = defaultdict(int) | |
| labels = scheme.get("labels", []) | |
| for annotations in item_annotations.values(): | |
| if isinstance(annotations, list): | |
| # Count individual labels | |
| for annotation in annotations: | |
| if isinstance(annotation, list): | |
| for label in annotation: | |
| label_counts[label] += 1 | |
| # Count co-occurrences | |
| for i, annotation1 in enumerate(annotations): | |
| if isinstance(annotation1, list): | |
| for j, annotation2 in enumerate(annotations): | |
| if i != j and isinstance(annotation2, list): | |
| for label1 in annotation1: | |
| for label2 in annotation2: | |
| if label1 < label2: | |
| co_occurrence[(label1, label2)] += 1 | |
| analysis.update({ | |
| "visualization_type": "multiselect_analysis", | |
| "data": { | |
| "labels": labels, | |
| "counts": [label_counts.get(label, 0) for label in labels], | |
| "percentages": [round(label_counts.get(label, 0) / len(item_annotations) * 100, 1) | |
| for label in labels], | |
| "co_occurrence": dict(co_occurrence) | |
| }, | |
| "most_common": label_counts.most_common(3) if label_counts else [], | |
| "average_labels_per_item": round(sum(len(ann) if isinstance(ann, list) else 1 | |
| for anns in item_annotations.values() | |
| for ann in anns) / len(all_annotations), 2) | |
| }) | |
| elif annotation_type in ["likert", "number", "slider"]: | |
| # Numeric data - show distribution and statistics | |
| numeric_values = [] | |
| for value in all_annotations: | |
| try: | |
| if isinstance(value, (int, float)): | |
| numeric_values.append(float(value)) | |
| elif isinstance(value, str) and value.replace('.', '').replace('-', '').isdigit(): | |
| numeric_values.append(float(value)) | |
| except (ValueError, TypeError): | |
| continue | |
| if numeric_values: | |
| analysis.update({ | |
| "visualization_type": "distribution", | |
| "data": { | |
| "values": numeric_values, | |
| "bins": self._create_histogram_bins(numeric_values, scheme), | |
| "statistics": { | |
| "mean": round(sum(numeric_values) / len(numeric_values), 2), | |
| "median": round(sorted(numeric_values)[len(numeric_values)//2], 2), | |
| "min": min(numeric_values), | |
| "max": max(numeric_values), | |
| "std": round((sum((x - sum(numeric_values)/len(numeric_values))**2 | |
| for x in numeric_values) / len(numeric_values))**0.5, 2) | |
| } | |
| }, | |
| "range": scheme.get("min", 0) if "min" in scheme else None, | |
| "max": scheme.get("max", 10) if "max" in scheme else None | |
| }) | |
| else: | |
| analysis["error"] = "No valid numeric values found" | |
| elif annotation_type == "text": | |
| # Text data - show length distribution and common patterns | |
| text_lengths = [] | |
| word_counts = [] | |
| common_words = Counter() | |
| for value in all_annotations: | |
| if isinstance(value, str) and value.strip(): | |
| text_lengths.append(len(value)) | |
| words = value.lower().split() | |
| word_counts.append(len(words)) | |
| common_words.update(words) | |
| if text_lengths: | |
| analysis.update({ | |
| "visualization_type": "text_analysis", | |
| "data": { | |
| "lengths": text_lengths, | |
| "word_counts": word_counts, | |
| "common_words": common_words.most_common(10), | |
| "statistics": { | |
| "avg_length": round(sum(text_lengths) / len(text_lengths), 1), | |
| "avg_words": round(sum(word_counts) / len(word_counts), 1), | |
| "min_length": min(text_lengths), | |
| "max_length": max(text_lengths), | |
| "empty_responses": len([v for v in all_annotations | |
| if not isinstance(v, str) or not v.strip()]) | |
| } | |
| } | |
| }) | |
| else: | |
| analysis["error"] = "No valid text responses found" | |
| elif annotation_type == "span": | |
| # Span data - show coverage and overlap statistics | |
| span_counts = [] | |
| total_spans = 0 | |
| for annotations in item_annotations.values(): | |
| if isinstance(annotations, list): | |
| for annotation in annotations: | |
| if isinstance(annotation, list): | |
| span_counts.append(len(annotation)) | |
| total_spans += len(annotation) | |
| if span_counts: | |
| analysis.update({ | |
| "visualization_type": "span_analysis", | |
| "data": { | |
| "span_counts": span_counts, | |
| "total_spans": total_spans, | |
| "statistics": { | |
| "avg_spans_per_item": round(sum(span_counts) / len(span_counts), 2), | |
| "items_with_spans": len([c for c in span_counts if c > 0]), | |
| "max_spans": max(span_counts) if span_counts else 0, | |
| "min_spans": min(span_counts) if span_counts else 0 | |
| } | |
| } | |
| }) | |
| else: | |
| analysis["error"] = "No valid span annotations found" | |
| else: | |
| analysis["error"] = f"Unsupported annotation type: {annotation_type}" | |
| return analysis | |
| def _calculate_agreement_score(self, item_annotations: dict) -> float: | |
| """Calculate agreement score for categorical annotations.""" | |
| if not item_annotations: | |
| return 0.0 | |
| agreement_scores = [] | |
| for annotations in item_annotations.values(): | |
| if len(annotations) > 1: | |
| # Calculate percentage of most common annotation | |
| counter = Counter(annotations) | |
| most_common_count = counter.most_common(1)[0][1] | |
| agreement_scores.append(most_common_count / len(annotations)) | |
| return round(sum(agreement_scores) / len(agreement_scores) * 100, 1) if agreement_scores else 0.0 | |
| def _create_histogram_bins(self, values: list, scheme: dict) -> dict: | |
| """Create histogram bins for numeric data.""" | |
| if not values: | |
| return {"bins": [], "counts": []} | |
| min_val = scheme.get("min", min(values)) | |
| max_val = scheme.get("max", max(values)) | |
| # Create 10 bins | |
| bin_size = (max_val - min_val) / 10 | |
| bins = [min_val + i * bin_size for i in range(11)] | |
| counts = [0] * 10 | |
| for value in values: | |
| bin_index = min(int((value - min_val) / bin_size), 9) | |
| counts[bin_index] += 1 | |
| return { | |
| "bins": [round(b, 2) for b in bins], | |
| "counts": counts | |
| } | |
| def _get_annotator_timing_data(self, user_id: str) -> Optional[AnnotatorTimingData]: | |
| """ | |
| Get timing data for a specific annotator. | |
| Args: | |
| user_id: The user ID to get timing data for | |
| Returns: | |
| AnnotatorTimingData object or None if user not found | |
| """ | |
| try: | |
| usm = get_user_state_manager() | |
| user_state = usm.get_user_state(user_id) | |
| if not user_state: | |
| return None | |
| # Get basic user info | |
| total_annotations = len(user_state.get_all_annotations()) | |
| phase = str(user_state.get_phase()) | |
| has_assignments = user_state.has_assignments() | |
| remaining_assignments = user_state.has_remaining_assignments() | |
| # Calculate timing data | |
| total_seconds = 0 | |
| instance_times = [] | |
| for instance_id, behavioral_data in user_state.instance_id_to_behavioral_data.items(): | |
| instance_seconds = None | |
| # Handle both BehavioralData objects and plain dicts | |
| if hasattr(behavioral_data, 'total_time_ms'): | |
| # BehavioralData object (loaded from JSON) | |
| if behavioral_data.total_time_ms: | |
| instance_seconds = behavioral_data.total_time_ms / 1000.0 | |
| elif isinstance(behavioral_data, dict): | |
| # Plain dict (runtime data) | |
| if behavioral_data.get("total_time_ms"): | |
| instance_seconds = behavioral_data["total_time_ms"] / 1000.0 | |
| elif behavioral_data.get("time_string"): | |
| parsed_time = user_state.parse_time_string(behavioral_data["time_string"]) | |
| if parsed_time: | |
| instance_seconds = parsed_time["total_seconds"] | |
| if instance_seconds is not None: | |
| total_seconds += instance_seconds | |
| instance_times.append(instance_seconds) | |
| # Calculate averages | |
| average_seconds_per_annotation = total_seconds / total_annotations if total_annotations > 0 else 0 | |
| annotations_per_hour = (total_annotations * 3600) / total_seconds if total_seconds > 0 else 0 | |
| # Get current instance time (if any) | |
| current_instance_time = None | |
| current_instance = user_state.get_current_instance() | |
| if current_instance: | |
| current_instance_id = current_instance.get_id() | |
| current_behavioral = user_state.instance_id_to_behavioral_data.get(current_instance_id) | |
| if current_behavioral: | |
| if hasattr(current_behavioral, 'total_time_ms'): | |
| if current_behavioral.total_time_ms: | |
| current_instance_time = current_behavioral.total_time_ms / 1000.0 | |
| elif isinstance(current_behavioral, dict): | |
| if current_behavioral.get("total_time_ms"): | |
| current_instance_time = current_behavioral["total_time_ms"] / 1000.0 | |
| elif current_behavioral.get("time_string"): | |
| parsed_current = user_state.parse_time_string(current_behavioral["time_string"]) | |
| if parsed_current: | |
| current_instance_time = parsed_current["total_seconds"] | |
| # Estimate last activity (for now, use current time - this could be enhanced) | |
| last_activity = datetime.datetime.now() | |
| # NEW: Get annotation history metrics | |
| performance_metrics = user_state.get_performance_metrics() | |
| suspicious_analysis = AnnotationHistoryManager.detect_suspicious_activity( | |
| user_state.get_annotation_history() | |
| ) | |
| recent_actions = user_state.get_recent_actions(5) # Last 5 minutes | |
| # Calculate session duration | |
| current_session_duration_minutes = None | |
| if user_state.session_start_time: | |
| duration = datetime.datetime.now() - user_state.session_start_time | |
| current_session_duration_minutes = duration.total_seconds() / 60 | |
| # Get training statistics | |
| training_state = user_state.get_training_state() | |
| training_completed = training_state.is_passed() if training_state else False | |
| training_correct_answers = training_state.get_correct_answer_count() if training_state else 0 | |
| training_total_attempts = training_state.get_total_attempts() if training_state else 0 | |
| training_pass_rate = (training_correct_answers / training_total_attempts * 100) if training_total_attempts > 0 else 0 | |
| training_current_question = training_state.get_current_question_index() if training_state else 0 | |
| training_total_questions = len(training_state.get_training_instances()) if training_state else 0 | |
| return AnnotatorTimingData( | |
| user_id=user_id, | |
| total_annotations=total_annotations, | |
| total_seconds=total_seconds, | |
| average_seconds_per_annotation=average_seconds_per_annotation, | |
| last_activity=last_activity, | |
| current_instance_time=current_instance_time, | |
| annotations_per_hour=annotations_per_hour, | |
| phase=phase, | |
| has_assignments=has_assignments, | |
| remaining_assignments=remaining_assignments, | |
| # NEW: Annotation history metrics | |
| total_actions=performance_metrics.get('total_actions', 0), | |
| average_action_time_ms=performance_metrics.get('average_action_time_ms', 0.0), | |
| fastest_action_time_ms=performance_metrics.get('fastest_action_time_ms', 0), | |
| slowest_action_time_ms=performance_metrics.get('slowest_action_time_ms', 0), | |
| actions_per_minute=performance_metrics.get('actions_per_minute', 0.0), | |
| suspicious_score=suspicious_analysis.get('suspicious_score', 0.0), | |
| suspicious_level=suspicious_analysis.get('suspicious_level', 'Normal'), | |
| fast_actions_count=suspicious_analysis.get('fast_actions_count', 0), | |
| burst_actions_count=suspicious_analysis.get('burst_actions_count', 0), | |
| session_start_time=user_state.session_start_time, | |
| current_session_duration_minutes=current_session_duration_minutes, | |
| recent_actions_count=len(recent_actions), | |
| # Training metrics | |
| training_completed=training_completed, | |
| training_correct_answers=training_correct_answers, | |
| training_total_attempts=training_total_attempts, | |
| training_pass_rate=training_pass_rate, | |
| training_current_question=training_current_question, | |
| training_total_questions=training_total_questions | |
| ) | |
| except Exception as e: | |
| self.logger.error(f"Error getting timing data for user {user_id}: {e}") | |
| return None | |
| def _extract_behavioral_total_seconds(self, behavioral_data: Any, user_state=None) -> Optional[float]: | |
| """Extract total annotation time in seconds from behavioral data objects or legacy dicts.""" | |
| if not behavioral_data: | |
| return None | |
| if hasattr(behavioral_data, 'total_time_ms') and behavioral_data.total_time_ms is not None: | |
| return behavioral_data.total_time_ms / 1000.0 | |
| if isinstance(behavioral_data, dict): | |
| total_time_ms = behavioral_data.get("total_time_ms") | |
| if total_time_ms is not None: | |
| return total_time_ms / 1000.0 | |
| time_string = behavioral_data.get("time_string") | |
| if time_string and user_state and hasattr(user_state, 'parse_time_string'): | |
| parsed_time = user_state.parse_time_string(time_string) | |
| if parsed_time: | |
| return parsed_time.get("total_seconds") | |
| return None | |
| def _extract_behavioral_ai_count(self, behavioral_data: Any) -> int: | |
| """Extract AI usage count from behavioral data objects or legacy dicts.""" | |
| if not behavioral_data: | |
| return 0 | |
| if hasattr(behavioral_data, 'ai_usage'): | |
| ai_usage = behavioral_data.ai_usage or [] | |
| return len(ai_usage) | |
| if isinstance(behavioral_data, dict): | |
| ai_usage = behavioral_data.get("ai_usage", []) or [] | |
| return len(ai_usage) | |
| return 0 | |
| def _calculate_total_instance_ai(self, instance_id: str) -> int: | |
| """ | |
| Calculate total AI assistance events for an instance across all users. | |
| Args: | |
| instance_id: The instance ID to analyze | |
| Returns: | |
| Total number of AI usage events recorded for the instance | |
| """ | |
| try: | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| total_ai = 0 | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if not user_state: | |
| continue | |
| behavioral_data = user_state.instance_id_to_behavioral_data.get(instance_id) | |
| total_ai += self._extract_behavioral_ai_count(behavioral_data) | |
| return total_ai | |
| except Exception as e: | |
| self.logger.error(f"Error calculating AI statistics for instance {instance_id}: {e}") | |
| return 0 | |
| def _calculate_average_time_per_annotation(self, instance_id: str) -> Optional[float]: | |
| """ | |
| Calculate average time per annotation for an instance. | |
| Args: | |
| instance_id: The instance ID to analyze | |
| Returns: | |
| Average time in seconds or None if no data | |
| """ | |
| try: | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| total_time = 0 | |
| annotation_count = 0 | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if user_state: | |
| behavioral_data = user_state.instance_id_to_behavioral_data.get(instance_id) | |
| total_seconds = self._extract_behavioral_total_seconds(behavioral_data, user_state) | |
| if total_seconds is not None: | |
| total_time += total_seconds | |
| annotation_count += 1 | |
| return total_time / annotation_count if annotation_count > 0 else None | |
| except Exception as e: | |
| self.logger.error(f"Error calculating average time for instance {instance_id}: {e}") | |
| return None | |
| def _calculate_completion_percentage(self, user_id: str) -> float: | |
| """ | |
| Calculate completion percentage for a user. | |
| Args: | |
| user_id: The user ID to calculate completion for | |
| Returns: | |
| Completion percentage (0-100) | |
| """ | |
| try: | |
| usm = get_user_state_manager() | |
| user_state = usm.get_user_state(user_id) | |
| if not user_state: | |
| return 0.0 | |
| total_assignments = user_state.get_assigned_instance_count() | |
| completed_assignments = len(user_state.get_all_annotations()) | |
| if total_assignments == 0: | |
| return 0.0 | |
| return (completed_assignments / total_assignments) * 100 | |
| except Exception as e: | |
| self.logger.error(f"Error calculating completion percentage for user {user_id}: {e}") | |
| return 0.0 | |
| def _format_seconds(self, seconds: Optional[float]) -> Optional[str]: | |
| """ | |
| Format seconds into a human-readable string. | |
| Args: | |
| seconds: Number of seconds to format | |
| Returns: | |
| Formatted time string or None if input is None | |
| """ | |
| if seconds is None: | |
| return None | |
| if seconds < 60: | |
| return f"{int(seconds)}s" | |
| elif seconds < 3600: | |
| minutes = int(seconds // 60) | |
| remaining_seconds = int(seconds % 60) | |
| return f"{minutes}m {remaining_seconds}s" | |
| else: | |
| hours = int(seconds // 3600) | |
| remaining_minutes = int((seconds % 3600) // 60) | |
| return f"{hours}h {remaining_minutes}m" | |
| def _format_annotation_history(self, actions: List[AnnotationAction], context: str) -> Dict[str, Any]: | |
| """ | |
| Format annotation history data for API response. | |
| Args: | |
| actions: List of annotation actions | |
| context: Context string (user_id or "all_users") | |
| Returns: | |
| Formatted annotation history data | |
| """ | |
| if not actions: | |
| return { | |
| "context": context, | |
| "total_actions": 0, | |
| "actions": [], | |
| "summary": { | |
| "action_types": {}, | |
| "time_distribution": {}, | |
| "performance_metrics": {} | |
| } | |
| } | |
| # Calculate summary statistics | |
| action_types = Counter(action.action_type for action in actions) | |
| time_distribution = self._calculate_time_distribution(actions) | |
| performance_metrics = AnnotationHistoryManager.calculate_performance_metrics(actions) | |
| # Format actions for response | |
| formatted_actions = [] | |
| for action in actions[-100:]: # Limit to 100 most recent | |
| formatted_actions.append({ | |
| "action_id": action.action_id, | |
| "timestamp": action.timestamp.isoformat(), | |
| "user_id": action.user_id, | |
| "instance_id": action.instance_id, | |
| "action_type": action.action_type, | |
| "schema_name": action.schema_name, | |
| "label_name": action.label_name, | |
| "old_value": action.old_value, | |
| "new_value": action.new_value, | |
| "span_data": action.span_data, | |
| "session_id": action.session_id, | |
| "client_timestamp": action.client_timestamp.isoformat() if action.client_timestamp else None, | |
| "server_processing_time_ms": action.server_processing_time_ms, | |
| "metadata": action.metadata | |
| }) | |
| return { | |
| "context": context, | |
| "total_actions": len(actions), | |
| "actions": formatted_actions, | |
| "summary": { | |
| "action_types": dict(action_types), | |
| "time_distribution": time_distribution, | |
| "performance_metrics": performance_metrics | |
| } | |
| } | |
| def _calculate_time_distribution(self, actions: List[AnnotationAction]) -> Dict[str, int]: | |
| """ | |
| Calculate time distribution of actions. | |
| Args: | |
| actions: List of annotation actions | |
| Returns: | |
| Dictionary with time distribution data | |
| """ | |
| if not actions: | |
| return {} | |
| # Group by hour of day | |
| hour_distribution = defaultdict(int) | |
| for action in actions: | |
| hour = action.timestamp.hour | |
| hour_distribution[f"{hour:02d}:00"] += 1 | |
| return dict(hour_distribution) | |
| def get_crowdsourcing_data(self) -> Dict[str, Any]: | |
| """ | |
| Get crowdsourcing platform statistics (MTurk, Prolific). | |
| This method analyzes user data to provide statistics about workers | |
| from crowdsourcing platforms like Amazon Mechanical Turk and Prolific. | |
| Returns: | |
| Dict containing crowdsourcing statistics with the following structure: | |
| - summary: Overall counts of crowdsourcing workers | |
| - prolific: Prolific-specific statistics | |
| - mturk: MTurk-specific statistics | |
| - workers: List of individual worker data | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| from potato.authentication import UserAuthenticator | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| # Initialize counters | |
| prolific_workers = [] | |
| mturk_workers = [] | |
| other_workers = [] | |
| # Track unique study/HIT IDs | |
| prolific_study_ids = set() | |
| mturk_hit_ids = set() | |
| # Get user authenticator to access stored user data | |
| try: | |
| user_auth = UserAuthenticator.get_instance() | |
| user_data_store = getattr(user_auth.auth_backend, 'user_data', {}) | |
| except (ValueError, AttributeError): | |
| user_data_store = {} | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if not user_state: | |
| continue | |
| # Get timing data for the user | |
| timing_data = self._get_annotator_timing_data(username) | |
| # Get stored user data (from authentication) | |
| stored_data = user_data_store.get(username, {}) | |
| # Determine platform based on stored data | |
| prolific_session_id = stored_data.get('prolific_session_id') | |
| prolific_study_id = stored_data.get('prolific_study_id') | |
| mturk_assignment_id = stored_data.get('mturk_assignment_id') | |
| mturk_hit_id = stored_data.get('mturk_hit_id') | |
| worker_info = { | |
| "worker_id": username, | |
| "total_annotations": timing_data.total_annotations if timing_data else 0, | |
| "phase": timing_data.phase if timing_data else "unknown", | |
| "total_seconds": timing_data.total_seconds if timing_data else 0, | |
| "annotations_per_hour": timing_data.annotations_per_hour if timing_data else 0, | |
| "completion_percentage": self._calculate_completion_percentage(username), | |
| "suspicious_level": timing_data.suspicious_level if timing_data else "Normal", | |
| } | |
| # Check for Prolific workers | |
| if prolific_session_id or prolific_study_id or username.startswith('P'): | |
| worker_info["platform"] = "prolific" | |
| worker_info["session_id"] = prolific_session_id | |
| worker_info["study_id"] = prolific_study_id | |
| prolific_workers.append(worker_info) | |
| if prolific_study_id: | |
| prolific_study_ids.add(prolific_study_id) | |
| # Check for MTurk workers | |
| elif mturk_assignment_id or mturk_hit_id or username.startswith('A'): | |
| worker_info["platform"] = "mturk" | |
| worker_info["assignment_id"] = mturk_assignment_id | |
| worker_info["hit_id"] = mturk_hit_id | |
| mturk_workers.append(worker_info) | |
| if mturk_hit_id: | |
| mturk_hit_ids.add(mturk_hit_id) | |
| else: | |
| worker_info["platform"] = "other" | |
| other_workers.append(worker_info) | |
| # Calculate summary statistics | |
| all_workers = prolific_workers + mturk_workers + other_workers | |
| def calc_stats(workers): | |
| if not workers: | |
| return { | |
| "count": 0, | |
| "total_annotations": 0, | |
| "total_time_seconds": 0, | |
| "avg_annotations_per_worker": 0, | |
| "avg_time_per_worker_minutes": 0, | |
| "completed_count": 0, | |
| "in_progress_count": 0, | |
| } | |
| total_annotations = sum(w["total_annotations"] for w in workers) | |
| total_time = sum(w["total_seconds"] for w in workers) | |
| completed = len([w for w in workers if w["phase"] == "Phase.DONE"]) | |
| in_progress = len([w for w in workers if w["phase"] == "Phase.ANNOTATION"]) | |
| return { | |
| "count": len(workers), | |
| "total_annotations": total_annotations, | |
| "total_time_seconds": total_time, | |
| "avg_annotations_per_worker": round(total_annotations / len(workers), 1) if workers else 0, | |
| "avg_time_per_worker_minutes": round(total_time / len(workers) / 60, 1) if workers else 0, | |
| "completed_count": completed, | |
| "in_progress_count": in_progress, | |
| } | |
| return { | |
| "summary": { | |
| "total_workers": len(all_workers), | |
| "prolific_workers": len(prolific_workers), | |
| "mturk_workers": len(mturk_workers), | |
| "other_workers": len(other_workers), | |
| "prolific_studies": len(prolific_study_ids), | |
| "mturk_hits": len(mturk_hit_ids), | |
| }, | |
| "prolific": { | |
| "stats": calc_stats(prolific_workers), | |
| "study_ids": list(prolific_study_ids), | |
| "workers": prolific_workers, | |
| }, | |
| "mturk": { | |
| "stats": calc_stats(mturk_workers), | |
| "hit_ids": list(mturk_hit_ids), | |
| "workers": mturk_workers, | |
| }, | |
| "other": { | |
| "stats": calc_stats(other_workers), | |
| "workers": other_workers, | |
| }, | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting crowdsourcing data: {e}") | |
| return {"error": f"Failed to get crowdsourcing data: {str(e)}"}, 500 | |
| def get_agreement_metrics(self) -> Dict[str, Any]: | |
| """ | |
| Get inter-annotator agreement metrics using Krippendorff's alpha. | |
| This leverages the existing agreement.py module for calculations. | |
| Returns: | |
| Dict containing agreement metrics by schema and overall | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| import simpledorff | |
| from simpledorff.metrics import nominal_metric, interval_metric | |
| import pandas as pd | |
| agreement_config = config.get("agreement_metrics", {}) | |
| min_overlap = agreement_config.get("min_overlap", 2) | |
| ism = get_item_state_manager() | |
| usm = get_user_state_manager() | |
| annotation_schemes = config.get("annotation_schemes", []) | |
| users = get_users() | |
| metrics = { | |
| "enabled": agreement_config.get("enabled", True), | |
| "overall": {}, | |
| "by_schema": {}, | |
| "warnings": [] | |
| } | |
| for scheme in annotation_schemes: | |
| schema_name = scheme.get("name", "Unknown") | |
| annotation_type = scheme.get("annotation_type", "unknown") | |
| # Collect annotations per item for this schema | |
| annotations_by_item = {} | |
| for item in ism.items(): | |
| item_id = item.get_id() | |
| item_annotations = [] | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if not user_state: | |
| continue | |
| # Get annotations for this item | |
| all_annotations = user_state.get_all_annotations() | |
| if item_id not in all_annotations: | |
| continue | |
| instance_annotations = all_annotations[item_id] | |
| labels = instance_annotations.get("labels", {}) | |
| # Find annotation for this schema | |
| for label, value in labels.items(): | |
| label_schema = None | |
| if hasattr(label, 'schema'): | |
| label_schema = label.schema | |
| elif hasattr(label, 'get_schema'): | |
| label_schema = label.get_schema() | |
| if label_schema == schema_name: | |
| item_annotations.append({ | |
| "user": username, | |
| "value": value | |
| }) | |
| if item_annotations: | |
| annotations_by_item[item_id] = item_annotations | |
| # Filter items with minimum overlap | |
| valid_items = { | |
| item_id: annots | |
| for item_id, annots in annotations_by_item.items() | |
| if len(annots) >= min_overlap | |
| } | |
| if not valid_items: | |
| metrics["by_schema"][schema_name] = { | |
| "error": f"No items with {min_overlap}+ annotators", | |
| "items_count": len(annotations_by_item) | |
| } | |
| continue | |
| # Format for simpledorff | |
| try: | |
| reliability_data = [] | |
| for item_id, annots in valid_items.items(): | |
| for annot in annots: | |
| reliability_data.append({ | |
| "unit": item_id, | |
| "annotator": annot["user"], | |
| "annotation": self._normalize_annotation_value(annot["value"]) | |
| }) | |
| df = pd.DataFrame(reliability_data) | |
| # Choose metric based on annotation type | |
| if annotation_type in ["likert", "slider", "number"]: | |
| metric_fn = interval_metric | |
| metric_name = "interval" | |
| else: | |
| metric_fn = nominal_metric | |
| metric_name = "nominal" | |
| # Calculate alpha | |
| alpha = simpledorff.calculate_krippendorffs_alpha( | |
| df, | |
| experiment_col="unit", | |
| annotator_col="annotator", | |
| class_col="annotation", | |
| metric_fn=metric_fn | |
| ) | |
| schema_metrics = { | |
| "krippendorff_alpha": round(alpha, 4), | |
| "metric_type": metric_name, | |
| "items_evaluated": len(valid_items), | |
| "total_annotations": len(reliability_data), | |
| "interpretation": self._interpret_alpha(alpha) | |
| } | |
| # Cohen's kappa (pairwise) and Fleiss' kappa apply to | |
| # categorical schemas; skip for interval-metric data where | |
| # Krippendorff alpha is the appropriate measure. | |
| if metric_name == "nominal": | |
| try: | |
| from potato.agreement import ( | |
| cohen_kappa_pairwise, fleiss_kappa, | |
| ) | |
| schema_metrics["cohen_kappa"] = cohen_kappa_pairwise(df) | |
| schema_metrics["fleiss_kappa"] = fleiss_kappa(df) | |
| except Exception as e: | |
| self.logger.error(f"Error calculating kappas for {schema_name}: {e}") | |
| schema_metrics["kappa_error"] = str(e) | |
| metrics["by_schema"][schema_name] = schema_metrics | |
| except Exception as e: | |
| self.logger.error(f"Error calculating alpha for {schema_name}: {e}") | |
| metrics["by_schema"][schema_name] = { | |
| "error": str(e), | |
| "items_count": len(valid_items) | |
| } | |
| # Calculate overall metrics | |
| alphas = [ | |
| m["krippendorff_alpha"] | |
| for m in metrics["by_schema"].values() | |
| if "krippendorff_alpha" in m | |
| ] | |
| if alphas: | |
| avg_alpha = sum(alphas) / len(alphas) | |
| metrics["overall"] = { | |
| "average_krippendorff_alpha": round(avg_alpha, 4), | |
| "schemas_evaluated": len(alphas), | |
| "interpretation": self._interpret_alpha(avg_alpha) | |
| } | |
| cohen_means = [ | |
| m["cohen_kappa"]["mean_kappa"] | |
| for m in metrics["by_schema"].values() | |
| if isinstance(m.get("cohen_kappa"), dict) | |
| and m["cohen_kappa"].get("mean_kappa") is not None | |
| ] | |
| if cohen_means: | |
| metrics["overall"]["average_cohen_kappa"] = round( | |
| sum(cohen_means) / len(cohen_means), 4 | |
| ) | |
| fleiss_values = [ | |
| m["fleiss_kappa"]["kappa"] | |
| for m in metrics["by_schema"].values() | |
| if isinstance(m.get("fleiss_kappa"), dict) | |
| and m["fleiss_kappa"].get("kappa") is not None | |
| ] | |
| if fleiss_values: | |
| metrics["overall"]["average_fleiss_kappa"] = round( | |
| sum(fleiss_values) / len(fleiss_values), 4 | |
| ) | |
| return metrics | |
| except ImportError as e: | |
| self.logger.error(f"simpledorff not installed: {e}") | |
| return { | |
| "enabled": False, | |
| "error": "simpledorff library not installed. Run: pip install simpledorff" | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting agreement metrics: {e}") | |
| return {"error": f"Failed to get agreement metrics: {str(e)}"}, 500 | |
| def _interpret_alpha(self, alpha: float) -> str: | |
| """Human-readable interpretation of Krippendorff's alpha.""" | |
| if alpha >= 0.8: | |
| return "Good agreement" | |
| elif alpha >= 0.67: | |
| return "Tentative agreement" | |
| elif alpha >= 0.33: | |
| return "Low agreement" | |
| else: | |
| return "Poor agreement" | |
| def _normalize_annotation_value(self, value: Any) -> Any: | |
| """Normalize annotation value for comparison.""" | |
| if isinstance(value, list): | |
| return tuple(sorted(str(v) for v in value)) | |
| elif isinstance(value, bool): | |
| return str(value).lower() | |
| return str(value) | |
| def _normalize_categorical_value(self, value: Any) -> Optional[str]: | |
| """Normalize categorical annotation values and label definitions into readable strings.""" | |
| if value is None: | |
| return None | |
| if isinstance(value, str): | |
| return value | |
| if isinstance(value, dict): | |
| for key in ("name", "label", "value", "id", "text"): | |
| candidate = value.get(key) | |
| if isinstance(candidate, str) and candidate: | |
| return candidate | |
| return json.dumps(value, sort_keys=True) | |
| if isinstance(value, (int, float, bool)): | |
| return str(value) | |
| return str(value) | |
| def get_code_cooccurrence_matrix(self, schema_filter: Optional[str] = None, | |
| min_count: int = 1) -> Dict[str, Any]: | |
| """ | |
| Compute pairwise code co-occurrence across instances. | |
| Two codes co-occur on an instance when at least one annotator applied | |
| each to that instance. Pairs are de-duplicated within an instance | |
| (multiple annotators applying the same pair count once). | |
| Args: | |
| schema_filter: If set, restrict to codes belonging to this schema. | |
| min_count: Skip pairs with co-occurrence below this threshold. | |
| Returns: | |
| Dict with `codes` (sorted code list), `pairs` | |
| ({code_a, code_b, count}), and `n_instances` for context. | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| ism = get_item_state_manager() | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| codes_per_instance: Dict[str, set] = {} | |
| for item in ism.items(): | |
| instance_id = item.get_id() | |
| codes: set = set() | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if not user_state: | |
| continue | |
| all_anns = user_state.get_all_annotations() | |
| if instance_id not in all_anns: | |
| continue | |
| instance_anns = all_anns[instance_id] | |
| labels = instance_anns.get("labels", {}) or {} | |
| for label, value in labels.items(): | |
| schema_name = self._schema_for_label_key(label) | |
| if schema_filter and schema_name != schema_filter: | |
| continue | |
| for code in self._labels_from_value(value): | |
| codes.add(f"{schema_name}::{code}") | |
| spans = instance_anns.get("spans", {}) or {} | |
| for schema_name, span_list in spans.items(): | |
| if schema_filter and schema_name != schema_filter: | |
| continue | |
| for span in span_list or []: | |
| code = span.get("label") or span.get("annotation") | |
| if code: | |
| codes.add(f"{schema_name}::{code}") | |
| if codes: | |
| codes_per_instance[instance_id] = codes | |
| pair_counts: Dict[Tuple[str, str], int] = {} | |
| for codes in codes_per_instance.values(): | |
| sorted_codes = sorted(codes) | |
| for i in range(len(sorted_codes)): | |
| for j in range(i + 1, len(sorted_codes)): | |
| key = (sorted_codes[i], sorted_codes[j]) | |
| pair_counts[key] = pair_counts.get(key, 0) + 1 | |
| pairs = [ | |
| {"code_a": a, "code_b": b, "count": c} | |
| for (a, b), c in pair_counts.items() if c >= min_count | |
| ] | |
| pairs.sort(key=lambda x: x["count"], reverse=True) | |
| all_codes = sorted({ | |
| code for codes in codes_per_instance.values() for code in codes | |
| }) | |
| return { | |
| "codes": all_codes, | |
| "pairs": pairs, | |
| "n_instances": len(codes_per_instance), | |
| "n_pairs": len(pairs), | |
| "schema_filter": schema_filter, | |
| "min_count": min_count, | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error computing co-occurrence: {e}") | |
| return {"error": f"Failed to compute co-occurrence: {str(e)}"}, 500 | |
| def get_code_crosstab(self, attribute_key: str, | |
| schema_filter: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Compute a codes-by-instance-attribute crosstab. | |
| Each instance contributes one row to the count of (code, attribute_value); | |
| multiple annotators applying the same code count once per instance. | |
| Args: | |
| attribute_key: Name of the item-metadata field to use as the column axis | |
| (e.g. "site", "condition", "language"). | |
| schema_filter: If set, restrict to codes belonging to this schema. | |
| Returns: | |
| Dict with `codes` (row labels), `values` (column labels), | |
| `cells` ({code, value, count}), and totals. | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| if not attribute_key: | |
| return {"error": "attribute_key is required"}, 400 | |
| try: | |
| ism = get_item_state_manager() | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| cell_counts: Dict[Tuple[str, str], int] = {} | |
| values_seen: set = set() | |
| codes_seen: set = set() | |
| n_instances_with_attr = 0 | |
| # When the attribute is not on the instance itself, fall | |
| # back to the case-level attribute (cases group instances by | |
| # participant/respondent; the attribute may live on the | |
| # case). No-op for projects that don't use cases. | |
| cb_task_dir = config.get("task_dir", ".") | |
| cb_project = config.get("annotation_task_name") or "default" | |
| for item in ism.items(): | |
| instance_id = item.get_id() | |
| item_data = self._get_item_data(item) | |
| attr_value = item_data.get(attribute_key) | |
| if attr_value is None or attr_value == "": | |
| try: | |
| from potato.cases import attribute_for_instance | |
| attr_value = attribute_for_instance( | |
| cb_task_dir, cb_project, instance_id, | |
| attribute_key) | |
| except Exception: | |
| attr_value = None | |
| if attr_value is None or attr_value == "": | |
| continue | |
| attr_value = str(attr_value) | |
| values_seen.add(attr_value) | |
| n_instances_with_attr += 1 | |
| codes: set = set() | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if not user_state: | |
| continue | |
| all_anns = user_state.get_all_annotations() | |
| if instance_id not in all_anns: | |
| continue | |
| instance_anns = all_anns[instance_id] | |
| labels = instance_anns.get("labels", {}) or {} | |
| for label, value in labels.items(): | |
| schema_name = self._schema_for_label_key(label) | |
| if schema_filter and schema_name != schema_filter: | |
| continue | |
| for code in self._labels_from_value(value): | |
| codes.add(f"{schema_name}::{code}") | |
| spans = instance_anns.get("spans", {}) or {} | |
| for schema_name, span_list in spans.items(): | |
| if schema_filter and schema_name != schema_filter: | |
| continue | |
| for span in span_list or []: | |
| code = span.get("label") or span.get("annotation") | |
| if code: | |
| codes.add(f"{schema_name}::{code}") | |
| for code in codes: | |
| codes_seen.add(code) | |
| key = (code, attr_value) | |
| cell_counts[key] = cell_counts.get(key, 0) + 1 | |
| cells = [ | |
| {"code": code, "value": value, "count": count} | |
| for (code, value), count in cell_counts.items() | |
| ] | |
| cells.sort(key=lambda x: (x["code"], x["value"])) | |
| return { | |
| "codes": sorted(codes_seen), | |
| "values": sorted(values_seen), | |
| "cells": cells, | |
| "n_instances": n_instances_with_attr, | |
| "attribute_key": attribute_key, | |
| "schema_filter": schema_filter, | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error computing crosstab: {e}") | |
| return {"error": f"Failed to compute crosstab: {str(e)}"}, 500 | |
| def _schema_for_label_key(label_key) -> str: | |
| """Extract schema name from a label key in user_state annotations.""" | |
| if hasattr(label_key, "schema"): | |
| return label_key.schema | |
| if hasattr(label_key, "get_schema"): | |
| return label_key.get_schema() | |
| return str(label_key) | |
| def _labels_from_value(value) -> List[str]: | |
| """Pull individual code names out of an annotation value blob.""" | |
| if value is None or value == "": | |
| return [] | |
| if isinstance(value, dict): | |
| return [k for k, v in value.items() if v] | |
| if isinstance(value, list): | |
| return [str(x) for x in value] | |
| return [str(value)] | |
| def _get_item_data(item) -> dict: | |
| """Return the raw data dict for an ItemStateManager item.""" | |
| for attr in ("data", "_data", "item_data"): | |
| data = getattr(item, attr, None) | |
| if isinstance(data, dict): | |
| return data | |
| if hasattr(item, "to_dict"): | |
| try: | |
| return item.to_dict() | |
| except Exception: | |
| pass | |
| return {} | |
| def get_quality_control_data(self) -> Dict[str, Any]: | |
| """ | |
| Get quality control metrics (attention checks, gold standards, pre-annotation). | |
| Returns: | |
| Dict containing quality control metrics | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| qc_manager = get_quality_control_manager() | |
| if not qc_manager: | |
| return { | |
| "enabled": False, | |
| "message": "Quality control not configured" | |
| } | |
| metrics = qc_manager.get_quality_metrics() | |
| return { | |
| "enabled": True, | |
| **metrics | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting quality control data: {e}") | |
| return {"error": f"Failed to get quality control data: {str(e)}"}, 500 | |
| def _behavioral_sequence(self, value: Any) -> list: | |
| """Normalize behavioral list-like values to a safe list.""" | |
| if value is None: | |
| return [] | |
| if isinstance(value, list): | |
| return value | |
| if isinstance(value, tuple): | |
| return list(value) | |
| return [value] | |
| def _behavioral_field(self, payload: Any, field_name: str, default: Any = None) -> Any: | |
| """Read a field from either a dict or an object used in behavioral analytics.""" | |
| if isinstance(payload, dict): | |
| return payload.get(field_name, default) | |
| return getattr(payload, field_name, default) | |
| def get_behavioral_analytics_data(self) -> Dict[str, Any]: | |
| """ | |
| Get comprehensive behavioral analytics data for all annotators. | |
| Returns: | |
| Dict containing behavioral analytics metrics including: | |
| - Per-user statistics (time, interactions, AI usage) | |
| - Aggregate statistics | |
| - Quality indicators | |
| - AI assistance analysis | |
| """ | |
| if not self.check_admin_access(): | |
| return {"error": "Admin access required"}, 403 | |
| try: | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| user_stats = [] | |
| ai_usage_total = {'requests': 0, 'accepts': 0, 'rejects': 0, 'decision_times': []} | |
| all_times = [] | |
| interaction_counts = Counter() | |
| change_sources = Counter() | |
| total_interactions = 0 | |
| total_changes = 0 | |
| total_ai_requests = 0 | |
| users_with_fast_annotations = 0 | |
| users_with_low_interaction = 0 | |
| users_with_no_changes = 0 | |
| for user_id in users: | |
| user_state = usm.get_user_state(user_id) | |
| if not user_state: | |
| continue | |
| behavioral_data = user_state.instance_id_to_behavioral_data | |
| if not behavioral_data: | |
| continue | |
| user_times = [] | |
| user_interactions = 0 | |
| user_changes = 0 | |
| user_ai_requests = 0 | |
| user_ai_accepts = 0 | |
| user_fast_count = 0 | |
| user_low_interaction_count = 0 | |
| user_no_scroll_count = 0 | |
| user_no_change_count = 0 | |
| for instance_id, bd in behavioral_data.items(): | |
| time_ms = self._behavioral_field(bd, 'total_time_ms', 0) or 0 | |
| time_sec = time_ms / 1000 | |
| user_times.append(time_sec) | |
| all_times.append(time_sec) | |
| if time_sec < 5: | |
| user_fast_count += 1 | |
| interactions = self._behavioral_sequence(self._behavioral_field(bd, 'interactions', [])) | |
| user_interactions += len(interactions) | |
| total_interactions += len(interactions) | |
| if len(interactions) < 3: | |
| user_low_interaction_count += 1 | |
| for event in interactions: | |
| event_type = self._behavioral_field(event, 'event_type', 'unknown') | |
| interaction_counts[event_type] += 1 | |
| scroll = self._behavioral_field(bd, 'scroll_depth_max', 0) or 0 | |
| if scroll < 25: | |
| user_no_scroll_count += 1 | |
| changes = self._behavioral_sequence(self._behavioral_field(bd, 'annotation_changes', [])) | |
| user_changes += len(changes) | |
| total_changes += len(changes) | |
| if len(changes) == 0: | |
| user_no_change_count += 1 | |
| for change in changes: | |
| source = self._behavioral_field(change, 'source', 'user') | |
| change_sources[source] += 1 | |
| ai_events = self._behavioral_sequence(self._behavioral_field(bd, 'ai_usage', [])) | |
| for ai in ai_events: | |
| user_ai_requests += 1 | |
| total_ai_requests += 1 | |
| ai_usage_total['requests'] += 1 | |
| accepted = self._behavioral_field(ai, 'suggestion_accepted', None) | |
| if accepted: | |
| user_ai_accepts += 1 | |
| ai_usage_total['accepts'] += 1 | |
| else: | |
| ai_usage_total['rejects'] += 1 | |
| decision_time = self._behavioral_field(ai, 'time_to_decision_ms', None) | |
| if isinstance(decision_time, (int, float)): | |
| ai_usage_total['decision_times'].append(decision_time) | |
| total_instances = len(behavioral_data) | |
| if total_instances > 0: | |
| fast_rate = user_fast_count / total_instances | |
| low_interaction_rate = user_low_interaction_count / total_instances | |
| no_scroll_rate = user_no_scroll_count / total_instances | |
| no_change_rate = user_no_change_count / total_instances | |
| suspicion_score = fast_rate * 0.3 + low_interaction_rate * 0.35 + no_scroll_rate * 0.2 + no_change_rate * 0.15 | |
| if user_fast_count > 0: | |
| users_with_fast_annotations += 1 | |
| if user_low_interaction_count > 0: | |
| users_with_low_interaction += 1 | |
| if user_no_change_count > 0: | |
| users_with_no_changes += 1 | |
| user_stats.append({ | |
| 'user_id': user_id, | |
| 'total_instances': total_instances, | |
| 'total_time_sec': sum(user_times), | |
| 'avg_time_sec': sum(user_times) / len(user_times) if user_times else 0, | |
| 'min_time_sec': min(user_times) if user_times else 0, | |
| 'max_time_sec': max(user_times) if user_times else 0, | |
| 'total_interactions': user_interactions, | |
| 'avg_interactions': user_interactions / total_instances, | |
| 'total_changes': user_changes, | |
| 'avg_changes': user_changes / total_instances, | |
| 'ai_requests': user_ai_requests, | |
| 'ai_accepts': user_ai_accepts, | |
| 'ai_accept_rate': (user_ai_accepts / user_ai_requests) if user_ai_requests > 0 else None, | |
| 'fast_annotation_rate': fast_rate, | |
| 'low_interaction_rate': low_interaction_rate, | |
| 'no_scroll_rate': no_scroll_rate, | |
| 'no_change_rate': no_change_rate, | |
| 'suspicion_score': suspicion_score, | |
| 'quality_flag': 'SUSPICIOUS' if suspicion_score > 0.5 else 'WARNING' if suspicion_score > 0.3 else 'OK' | |
| }) | |
| aggregate = { | |
| 'total_users_with_data': len(user_stats), | |
| 'total_instances': sum(u['total_instances'] for u in user_stats), | |
| 'total_time_minutes': sum(u['total_time_sec'] for u in user_stats) / 60, | |
| 'avg_time_per_instance': sum(all_times) / len(all_times) if all_times else 0, | |
| 'median_time_per_instance': sorted(all_times)[len(all_times)//2] if all_times else 0, | |
| } | |
| aggregate_stats = { | |
| 'total_users': len(user_stats), | |
| 'total_instances': aggregate['total_instances'], | |
| 'avg_time_per_instance_sec': aggregate['avg_time_per_instance'], | |
| 'total_interactions': total_interactions, | |
| 'total_changes': total_changes, | |
| 'total_ai_requests': total_ai_requests, | |
| } | |
| ai_summary = { | |
| 'total_requests': ai_usage_total['requests'], | |
| 'total_accepts': ai_usage_total['accepts'], | |
| 'total_rejects': ai_usage_total['rejects'], | |
| 'accept_rate': (ai_usage_total['accepts'] / ai_usage_total['requests']) if ai_usage_total['requests'] > 0 else 0, | |
| 'avg_decision_time_ms': sum(ai_usage_total['decision_times']) / len(ai_usage_total['decision_times']) if ai_usage_total['decision_times'] else 0 | |
| } | |
| flagged_users = [u for u in user_stats if u['quality_flag'] == 'SUSPICIOUS'] | |
| warning_users = [u for u in user_stats if u['quality_flag'] == 'WARNING'] | |
| total_users_with_data = len(user_stats) | |
| quality_summary = { | |
| 'total_flagged': len(flagged_users), | |
| 'total_warnings': len(warning_users), | |
| 'flagged_user_ids': [u['user_id'] for u in flagged_users], | |
| 'warning_user_ids': [u['user_id'] for u in warning_users], | |
| 'high_suspicion_users': len(flagged_users), | |
| 'fast_annotation_rate': (users_with_fast_annotations / total_users_with_data) if total_users_with_data > 0 else 0, | |
| 'low_interaction_rate': (users_with_low_interaction / total_users_with_data) if total_users_with_data > 0 else 0, | |
| 'no_change_rate': (users_with_no_changes / total_users_with_data) if total_users_with_data > 0 else 0, | |
| } | |
| return { | |
| 'aggregate': aggregate, | |
| 'aggregate_stats': aggregate_stats, | |
| 'ai_usage': ai_summary, | |
| 'quality_summary': quality_summary, | |
| 'interaction_types': dict(interaction_counts.most_common(20)), | |
| 'change_sources': dict(change_sources), | |
| 'users': sorted(user_stats, key=lambda x: -x['suspicion_score']) | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting behavioral analytics data: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return {"error": f"Failed to get behavioral analytics data: {str(e)}"}, 500 | |
| def get_adjudication_overview(self) -> Dict[str, Any]: | |
| """ | |
| Get an overview of adjudication status for the admin dashboard. | |
| Returns: | |
| Dict with queue stats, adjudicator stats, error taxonomy, | |
| guideline flags, disagreement patterns, and similarity stats. | |
| """ | |
| from potato.adjudication import get_adjudication_manager | |
| adj_mgr = get_adjudication_manager() | |
| if not adj_mgr or not adj_mgr.adj_config.enabled: | |
| return {"enabled": False, "message": "Adjudication not configured"} | |
| try: | |
| # Queue stats | |
| queue_stats = adj_mgr.get_stats() | |
| # Error taxonomy frequency | |
| error_counts = Counter() | |
| guideline_flag_count = 0 | |
| for decision in adj_mgr.decisions.values(): | |
| for tag in decision.error_taxonomy: | |
| error_counts[tag] += 1 | |
| if decision.guideline_update_flag: | |
| guideline_flag_count += 1 | |
| # Per-adjudicator stats with avg time | |
| adjudicator_details = {} | |
| for adj_id, stats in queue_stats.get("adjudicator_stats", {}).items(): | |
| completed = stats.get("completed", 0) | |
| total_time = stats.get("total_time_ms", 0) | |
| adjudicator_details[adj_id] = { | |
| "completed": completed, | |
| "total_time_ms": total_time, | |
| "avg_time_ms": ( | |
| round(total_time / completed) if completed > 0 else 0 | |
| ), | |
| } | |
| # Disagreement patterns | |
| disagreement_patterns = self._analyze_disagreement_patterns(adj_mgr) | |
| # Similarity engine stats | |
| similarity_stats = {} | |
| if adj_mgr.similarity_engine: | |
| similarity_stats = adj_mgr.similarity_engine.get_stats() | |
| return { | |
| "enabled": True, | |
| "queue_stats": queue_stats, | |
| "adjudicator_details": adjudicator_details, | |
| "error_taxonomy_counts": dict(error_counts.most_common()), | |
| "guideline_flag_count": guideline_flag_count, | |
| "disagreement_patterns": disagreement_patterns, | |
| "similarity_stats": similarity_stats, | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error getting adjudication overview: {e}") | |
| return {"enabled": True, "error": str(e)} | |
| def _analyze_disagreement_patterns(self, adj_mgr) -> List[Dict[str, Any]]: | |
| """ | |
| Analyze per-schema disagreement patterns across the queue. | |
| Returns: | |
| List of dicts sorted by worst agreement first, with schema name | |
| and average agreement score. | |
| """ | |
| from collections import defaultdict | |
| schema_scores = defaultdict(list) | |
| for item in adj_mgr.queue.values(): | |
| for schema_name, score in item.agreement_scores.items(): | |
| schema_scores[schema_name].append(score) | |
| patterns = [] | |
| for schema_name, scores in schema_scores.items(): | |
| avg = sum(scores) / len(scores) if scores else 1.0 | |
| patterns.append({ | |
| "schema": schema_name, | |
| "avg_agreement": round(avg, 3), | |
| "num_items": len(scores), | |
| }) | |
| patterns.sort(key=lambda x: x["avg_agreement"]) | |
| return patterns | |
| # ======================================================================== | |
| # MACE Competence Estimation | |
| # ======================================================================== | |
| def get_mace_overview(self) -> Dict[str, Any]: | |
| """Get MACE competence estimation overview for admin dashboard. | |
| Returns: | |
| Dict with competence scores, schema summaries, and config. | |
| """ | |
| from potato.mace_manager import get_mace_manager | |
| mace_mgr = get_mace_manager() | |
| if not mace_mgr or not mace_mgr.mace_config.enabled: | |
| return {"enabled": False, "message": "MACE not configured"} | |
| return mace_mgr.get_results_summary() | |
| def get_mace_predictions( | |
| self, schema: str, instance_id: str = None | |
| ) -> Dict[str, Any]: | |
| """Get MACE predicted labels for a schema, optionally filtered by instance. | |
| Args: | |
| schema: Schema name to get predictions for. | |
| instance_id: Optional specific instance to filter. | |
| Returns: | |
| Dict with predictions and entropy data. | |
| """ | |
| from potato.mace_manager import get_mace_manager | |
| mace_mgr = get_mace_manager() | |
| if not mace_mgr or not mace_mgr.mace_config.enabled: | |
| return {"error": "MACE not configured"} | |
| return mace_mgr.get_predictions_for_schema(schema, instance_id) | |
| def _calculate_label_statistics(self, instance_id: str) -> Tuple[Optional[str], float]: | |
| """ | |
| Calculate most frequent label and disagreement for an instance. | |
| Args: | |
| instance_id: The instance ID to analyze | |
| Returns: | |
| Tuple of (most_frequent_label, disagreement_score) | |
| """ | |
| try: | |
| usm = get_user_state_manager() | |
| users = get_users() | |
| all_labels = [] | |
| for username in users: | |
| user_state = usm.get_user_state(username) | |
| if user_state: | |
| annotations = user_state.get_all_annotations() | |
| if instance_id in annotations: | |
| instance_annotations = annotations[instance_id] | |
| if "labels" in instance_annotations: | |
| for label, value in instance_annotations["labels"].items(): | |
| if hasattr(label, 'label_name'): | |
| all_labels.append(label.label_name) | |
| else: | |
| all_labels.append(str(value)) | |
| if not all_labels: | |
| return None, 0.0 | |
| label_counts = Counter(all_labels) | |
| most_frequent_label = label_counts.most_common(1)[0][0] | |
| total_annotations = len(all_labels) | |
| most_frequent_count = label_counts[most_frequent_label] | |
| disagreement = 1 - (most_frequent_count / total_annotations) | |
| return most_frequent_label, disagreement | |
| except Exception as e: | |
| self.logger.error(f"Error calculating label statistics for instance {instance_id}: {e}") | |
| return None, 0.0 | |
| # Global instance | |
| admin_dashboard = AdminDashboard() | |