Spaces:
Running
Running
| """ | |
| Feedback service for storing and managing prediction feedback. | |
| Stores feedback in a local JSON file for simplicity and privacy. | |
| """ | |
| import json | |
| import uuid | |
| import hashlib | |
| import base64 | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| import csv | |
| import io | |
class FeedbackService:
    """Manage prediction feedback, session counters and saved images on disk.

    All state lives under ``storage_dir``:

    * ``feedback.json`` - a JSON list of feedback entries,
    * ``sessions.json`` - a JSON object mapping session id -> counters,
    * ``images/``       - preprocessed images decoded from base64.

    Every public method re-reads the relevant JSON file, so the instance keeps
    no in-memory cache of feedback or sessions.
    """

    def __init__(self, storage_dir: Optional[Path] = None):
        """Prepare the storage directory and create empty data files if missing.

        Args:
            storage_dir: Directory that holds the JSON files and the images
                folder. Defaults to ``~/.fetalclip`` when ``None``.
        """
        if storage_dir is None:
            # Default to user's home directory for persistence
            storage_dir = Path.home() / ".fetalclip"
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.feedback_file = self.storage_dir / "feedback.json"
        self.sessions_file = self.storage_dir / "sessions.json"
        self.images_dir = self.storage_dir / "images"
        self.images_dir.mkdir(parents=True, exist_ok=True)
        # Initialize files if they don't exist
        if not self.feedback_file.exists():
            self._save_feedback([])
        if not self.sessions_file.exists():
            self._save_sessions({})

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_session_record() -> Dict:
        """Return a fresh session record with all counters set to zero."""
        return {
            "created_at": datetime.now().isoformat(),
            "image_count": 0,
            "feedback_count": 0,
            "correct_count": 0,
            "incorrect_count": 0,
            "not_sure_count": 0,
        }

    @staticmethod
    def _write_json(path: Path, payload) -> None:
        """Serialize ``payload`` to ``path`` without truncating it in place.

        The data is written to a sibling ``*.tmp`` file first and then moved
        over the target, so an interrupted write leaves the previous file
        intact instead of a half-written one (which the loaders would read
        as "empty" and the next save would then overwrite).
        """
        tmp_path = path.with_name(path.name + ".tmp")
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, default=str)
        tmp_path.replace(path)

    def _load_feedback(self) -> List[Dict]:
        """Load the feedback list; return ``[]`` if missing, invalid or not a list."""
        try:
            with open(self.feedback_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            return []
        # Guard against a file that parses but holds the wrong container
        # (e.g. ``null``), which would break ``.append`` in callers.
        return data if isinstance(data, list) else []

    def _save_feedback(self, feedback: List[Dict]):
        """Persist the full feedback list to ``feedback.json``."""
        self._write_json(self.feedback_file, feedback)

    def _load_sessions(self) -> Dict:
        """Load the sessions mapping; return ``{}`` if missing, invalid or not a dict."""
        try:
            with open(self.sessions_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            return {}
        return data if isinstance(data, dict) else {}

    def _save_sessions(self, sessions: Dict):
        """Persist the full sessions mapping to ``sessions.json``."""
        self._write_json(self.sessions_file, sessions)

    # ------------------------------------------------------------------
    # Sessions
    # ------------------------------------------------------------------
    def create_session(self) -> str:
        """Create a new session record and return its 8-character ID."""
        sessions = self._load_sessions()
        session_id = str(uuid.uuid4())[:8]
        # The ID is a truncated UUID; regenerate on the (rare) collision so an
        # existing session's counters are never overwritten.
        while session_id in sessions:
            session_id = str(uuid.uuid4())[:8]
        sessions[session_id] = self._new_session_record()
        self._save_sessions(sessions)
        return session_id

    def get_session(self, session_id: str) -> Optional[Dict]:
        """Return the stored record for ``session_id``, or ``None`` if unknown."""
        return self._load_sessions().get(session_id)

    def get_all_sessions(self) -> Dict:
        """Return the mapping of every session ID to its record."""
        return self._load_sessions()

    def update_session_stats(self, session_id: str, is_correct: Optional[bool] = None, image_analyzed: bool = False):
        """Update the counters of a session, creating the session if needed.

        Args:
            session_id: Session whose counters are updated.
            is_correct: ``True`` / ``False`` record a correct / incorrect
                feedback. ``None`` records a "not sure" feedback, but only
                when ``image_analyzed`` is false (see below).
            image_analyzed: When true, ``image_count`` is incremented.

        A call such as ``update_session_stats(sid, image_analyzed=True)``
        leaves ``is_correct`` at its default ``None``. That call reports an
        analyzed image, not a reviewer verdict, so it must not be counted as
        "not sure" feedback.
        """
        sessions = self._load_sessions()
        record = sessions.setdefault(session_id, self._new_session_record())
        # Older session records may lack some counters (e.g. not_sure_count).
        for counter in ("image_count", "feedback_count", "correct_count",
                        "incorrect_count", "not_sure_count"):
            record.setdefault(counter, 0)

        if image_analyzed:
            record["image_count"] += 1

        if is_correct is True:
            record["feedback_count"] += 1
            record["correct_count"] += 1
        elif is_correct is False:
            record["feedback_count"] += 1
            record["incorrect_count"] += 1
        elif not image_analyzed:
            # Not sure case - only increment feedback_count and not_sure_count
            record["feedback_count"] += 1
            record["not_sure_count"] += 1

        self._save_sessions(sessions)

    # ------------------------------------------------------------------
    # Feedback
    # ------------------------------------------------------------------
    def add_feedback(
        self,
        session_id: str,
        filename: str,
        file_type: str,
        predicted_label: str,
        predicted_confidence: float,
        all_predictions: List[Dict],
        is_correct: Optional[bool],
        correct_label: Optional[str] = None,
        reviewer_notes: Optional[str] = None,
        patient_id: Optional[str] = None,
        image_hash: Optional[str] = None,
        preprocessed_image_base64: Optional[str] = None
    ) -> Dict:
        """Store a new feedback entry and update the session counters.

        Args:
            session_id: Session the feedback belongs to.
            filename: Name of the analyzed file.
            file_type: Type of the analyzed file.
            predicted_label: Label predicted for the image.
            predicted_confidence: Confidence of the prediction; stored rounded
                to two decimals.
            all_predictions: Full list of predictions, stored as given.
            is_correct: ``True`` / ``False`` for a definite verdict, ``None``
                for "not sure".
            correct_label: Reviewer-supplied label; only kept when
                ``is_correct`` is ``False``.
            reviewer_notes: Free-text notes.
            patient_id: Optional patient identifier.
            image_hash: Optional hash of the image.
            preprocessed_image_base64: Optional base64 image data. When given
                it is decoded and written to the images directory; a failure
                here is reported on stdout and does not abort the call.

        Returns:
            The entry dict that was appended to the feedback file.
        """
        feedback_id = str(uuid.uuid4())[:12]

        # Save preprocessed image if provided (best effort)
        preprocessed_image_path = None
        if preprocessed_image_base64:
            try:
                image_data = base64.b64decode(preprocessed_image_base64)
                image_filename = f"{feedback_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
                image_path = self.images_dir / image_filename
                with open(image_path, 'wb') as f:
                    f.write(image_data)
                preprocessed_image_path = str(image_path)
            except (ValueError, TypeError, OSError) as e:
                # ValueError covers binascii.Error from malformed base64.
                print(f"Failed to save preprocessed image: {e}")

        entry = {
            "id": feedback_id,
            "session_id": session_id,
            "timestamp": datetime.now().isoformat(),
            "filename": filename,
            "file_type": file_type,
            "patient_id": patient_id,
            "image_hash": image_hash,
            "predicted_label": predicted_label,
            "predicted_confidence": round(predicted_confidence, 2),
            "all_predictions": all_predictions,
            "is_correct": is_correct,
            # A corrected label only makes sense for an incorrect prediction.
            "correct_label": correct_label if is_correct is False else None,
            "reviewer_notes": reviewer_notes,
            "preprocessed_image_path": preprocessed_image_path,
        }

        feedback_list = self._load_feedback()
        feedback_list.append(entry)
        self._save_feedback(feedback_list)

        # Update session stats
        self.update_session_stats(session_id, is_correct=is_correct)
        return entry

    def get_feedback(self, session_id: Optional[str] = None) -> List[Dict]:
        """Return all feedback entries, optionally only those of one session.

        A falsy ``session_id`` (``None`` or ``""``) disables the filter.
        """
        feedback_list = self._load_feedback()
        if session_id:
            feedback_list = [f for f in feedback_list if f.get("session_id") == session_id]
        return feedback_list

    def get_feedback_by_id(self, feedback_id: str) -> Optional[Dict]:
        """Return the first entry whose ``id`` matches, or ``None``."""
        for entry in self._load_feedback():
            if entry.get("id") == feedback_id:
                return entry
        return None

    def delete_feedback(self, feedback_id: str) -> bool:
        """Remove the entry with the given ID from the feedback file.

        Only the feedback file is modified; session counters and any saved
        image file are left as they are.

        Returns:
            ``True`` if an entry was removed, ``False`` if no entry matched.
        """
        feedback_list = self._load_feedback()
        remaining = [f for f in feedback_list if f.get("id") != feedback_id]
        if len(remaining) < len(feedback_list):
            self._save_feedback(remaining)
            return True
        return False

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def export_to_csv(self, session_id: Optional[str] = None) -> str:
        """Render feedback as CSV text.

        Returns:
            CSV with a header row followed by one row per entry, or an empty
            string (no header) when there is no feedback to export.
            ``is_correct`` is rendered as ``Yes`` / ``No`` / ``Not Sure``.
        """
        feedback_list = self.get_feedback(session_id)
        if not feedback_list:
            return ""

        output = io.StringIO()
        # Define CSV columns
        fieldnames = [
            "timestamp",
            "session_id",
            "filename",
            "patient_id",
            "file_type",
            "predicted_label",
            "predicted_confidence",
            "is_correct",
            "correct_label",
            "reviewer_notes",
            "preprocessed_image_path",
        ]
        writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()

        for entry in feedback_list:
            # Flatten the entry for CSV
            is_correct = entry.get("is_correct")
            if is_correct is True:
                is_correct_str = "Yes"
            elif is_correct is False:
                is_correct_str = "No"
            else:
                is_correct_str = "Not Sure"
            row = {name: entry.get(name, "") for name in fieldnames}
            row["is_correct"] = is_correct_str
            writer.writerow(row)

        return output.getvalue()

    def get_statistics(self, session_id: Optional[str] = None) -> Dict:
        """Summarize feedback counts overall and per predicted label.

        Returns:
            A dict with ``total_feedback``, ``correct_count``,
            ``incorrect_count``, ``not_sure_count``, ``accuracy`` (percentage
            of correct among definite answers, ``0`` when there are none) and
            ``by_label`` (per-label ``total``/``correct``/``incorrect``/
            ``not_sure`` counts).
        """
        feedback_list = self.get_feedback(session_id)
        total = len(feedback_list)
        correct = sum(1 for f in feedback_list if f.get("is_correct") is True)
        incorrect = sum(1 for f in feedback_list if f.get("is_correct") is False)
        not_sure = sum(1 for f in feedback_list if f.get("is_correct") is None)

        # Count by predicted label
        label_stats = {}
        for entry in feedback_list:
            label = entry.get("predicted_label", "Unknown")
            if label not in label_stats:
                label_stats[label] = {"total": 0, "correct": 0, "incorrect": 0, "not_sure": 0}
            label_stats[label]["total"] += 1
            is_correct = entry.get("is_correct")
            if is_correct is True:
                label_stats[label]["correct"] += 1
            elif is_correct is False:
                label_stats[label]["incorrect"] += 1
            else:
                label_stats[label]["not_sure"] += 1

        # Calculate accuracy only from definite answers (excluding not sure)
        definite_answers = correct + incorrect
        accuracy = round(correct / definite_answers * 100, 1) if definite_answers > 0 else 0

        return {
            "total_feedback": total,
            "correct_count": correct,
            "incorrect_count": incorrect,
            "not_sure_count": not_sure,
            "accuracy": accuracy,
            "by_label": label_stats,
        }
def compute_image_hash(image_bytes: bytes) -> str:
    """Return the first 16 hex characters of the SHA-256 digest of the bytes."""
    hasher = hashlib.sha256()
    hasher.update(image_bytes)
    full_hex = hasher.hexdigest()
    # Only a 16-character prefix of the 64-character digest is kept.
    return full_hex[:16]
# Shared module-level instance, created at import time. Passing no storage
# directory makes FeedbackService use its default location under the user's
# home directory.
feedback_service = FeedbackService(storage_dir=None)