""" Feedback service for storing and managing prediction feedback. Stores feedback in a local JSON file for simplicity and privacy. """ import json import uuid import hashlib import base64 from datetime import datetime from pathlib import Path from typing import List, Dict, Optional import csv import io class FeedbackService: """Service for managing prediction feedback with local JSON storage.""" def __init__(self, storage_dir: Optional[Path] = None): """Initialize feedback service with storage directory.""" if storage_dir is None: # Default to user's home directory for persistence storage_dir = Path.home() / ".fetalclip" self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(parents=True, exist_ok=True) self.feedback_file = self.storage_dir / "feedback.json" self.sessions_file = self.storage_dir / "sessions.json" self.images_dir = self.storage_dir / "images" self.images_dir.mkdir(parents=True, exist_ok=True) # Initialize files if they don't exist if not self.feedback_file.exists(): self._save_feedback([]) if not self.sessions_file.exists(): self._save_sessions({}) def _load_feedback(self) -> List[Dict]: """Load feedback from JSON file.""" try: with open(self.feedback_file, 'r') as f: return json.load(f) except (json.JSONDecodeError, FileNotFoundError): return [] def _save_feedback(self, feedback: List[Dict]): """Save feedback to JSON file.""" with open(self.feedback_file, 'w') as f: json.dump(feedback, f, indent=2, default=str) def _load_sessions(self) -> Dict: """Load sessions from JSON file.""" try: with open(self.sessions_file, 'r') as f: return json.load(f) except (json.JSONDecodeError, FileNotFoundError): return {} def _save_sessions(self, sessions: Dict): """Save sessions to JSON file.""" with open(self.sessions_file, 'w') as f: json.dump(sessions, f, indent=2, default=str) def create_session(self) -> str: """Create a new session and return its ID.""" session_id = str(uuid.uuid4())[:8] sessions = self._load_sessions() sessions[session_id] = { "created_at": datetime.now().isoformat(), "image_count": 0, "feedback_count": 0, "correct_count": 0, "incorrect_count": 0, "not_sure_count": 0 } self._save_sessions(sessions) return session_id def get_session(self, session_id: str) -> Optional[Dict]: """Get session info by ID.""" sessions = self._load_sessions() return sessions.get(session_id) def get_all_sessions(self) -> Dict: """Get all sessions.""" return self._load_sessions() def update_session_stats(self, session_id: str, is_correct: Optional[bool] = None, image_analyzed: bool = False): """Update session statistics.""" sessions = self._load_sessions() if session_id not in sessions: sessions[session_id] = { "created_at": datetime.now().isoformat(), "image_count": 0, "feedback_count": 0, "correct_count": 0, "incorrect_count": 0, "not_sure_count": 0 } # Ensure not_sure_count exists for older sessions if "not_sure_count" not in sessions[session_id]: sessions[session_id]["not_sure_count"] = 0 if image_analyzed: sessions[session_id]["image_count"] += 1 if is_correct is True: sessions[session_id]["feedback_count"] += 1 sessions[session_id]["correct_count"] += 1 elif is_correct is False: sessions[session_id]["feedback_count"] += 1 sessions[session_id]["incorrect_count"] += 1 elif is_correct is None: # Not sure case - only increment feedback_count and not_sure_count sessions[session_id]["feedback_count"] += 1 sessions[session_id]["not_sure_count"] += 1 self._save_sessions(sessions) def add_feedback( self, session_id: str, filename: str, file_type: str, predicted_label: str, predicted_confidence: float, all_predictions: List[Dict], is_correct: Optional[bool], correct_label: Optional[str] = None, reviewer_notes: Optional[str] = None, patient_id: Optional[str] = None, image_hash: Optional[str] = None, preprocessed_image_base64: Optional[str] = None ) -> Dict: """Add new feedback entry.""" feedback_id = str(uuid.uuid4())[:12] # Save preprocessed image if provided preprocessed_image_path = None if preprocessed_image_base64: try: image_data = base64.b64decode(preprocessed_image_base64) image_filename = f"{feedback_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" image_path = self.images_dir / image_filename with open(image_path, 'wb') as f: f.write(image_data) preprocessed_image_path = str(image_path) except Exception as e: print(f"Failed to save preprocessed image: {e}") entry = { "id": feedback_id, "session_id": session_id, "timestamp": datetime.now().isoformat(), "filename": filename, "file_type": file_type, "patient_id": patient_id, "image_hash": image_hash, "predicted_label": predicted_label, "predicted_confidence": round(predicted_confidence, 2), "all_predictions": all_predictions, "is_correct": is_correct, "correct_label": correct_label if is_correct is False else None, "reviewer_notes": reviewer_notes, "preprocessed_image_path": preprocessed_image_path } feedback_list = self._load_feedback() feedback_list.append(entry) self._save_feedback(feedback_list) # Update session stats self.update_session_stats(session_id, is_correct=is_correct) return entry def get_feedback(self, session_id: Optional[str] = None) -> List[Dict]: """Get all feedback, optionally filtered by session.""" feedback_list = self._load_feedback() if session_id: feedback_list = [f for f in feedback_list if f.get("session_id") == session_id] return feedback_list def get_feedback_by_id(self, feedback_id: str) -> Optional[Dict]: """Get single feedback entry by ID.""" feedback_list = self._load_feedback() for entry in feedback_list: if entry.get("id") == feedback_id: return entry return None def delete_feedback(self, feedback_id: str) -> bool: """Delete feedback entry by ID.""" feedback_list = self._load_feedback() original_length = len(feedback_list) feedback_list = [f for f in feedback_list if f.get("id") != feedback_id] if len(feedback_list) < original_length: self._save_feedback(feedback_list) return True return False def export_to_csv(self, session_id: Optional[str] = None) -> str: """Export feedback to CSV format.""" feedback_list = self.get_feedback(session_id) if not feedback_list: return "" output = io.StringIO() # Define CSV columns fieldnames = [ "timestamp", "session_id", "filename", "patient_id", "file_type", "predicted_label", "predicted_confidence", "is_correct", "correct_label", "reviewer_notes", "preprocessed_image_path" ] writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction='ignore') writer.writeheader() for entry in feedback_list: # Flatten the entry for CSV is_correct = entry.get("is_correct") if is_correct is True: is_correct_str = "Yes" elif is_correct is False: is_correct_str = "No" else: is_correct_str = "Not Sure" row = { "timestamp": entry.get("timestamp", ""), "session_id": entry.get("session_id", ""), "filename": entry.get("filename", ""), "patient_id": entry.get("patient_id", ""), "file_type": entry.get("file_type", ""), "predicted_label": entry.get("predicted_label", ""), "predicted_confidence": entry.get("predicted_confidence", ""), "is_correct": is_correct_str, "correct_label": entry.get("correct_label", ""), "reviewer_notes": entry.get("reviewer_notes", ""), "preprocessed_image_path": entry.get("preprocessed_image_path", "") } writer.writerow(row) return output.getvalue() def get_statistics(self, session_id: Optional[str] = None) -> Dict: """Get feedback statistics.""" feedback_list = self.get_feedback(session_id) total = len(feedback_list) correct = sum(1 for f in feedback_list if f.get("is_correct") is True) incorrect = sum(1 for f in feedback_list if f.get("is_correct") is False) not_sure = sum(1 for f in feedback_list if f.get("is_correct") is None) # Count by predicted label label_stats = {} for entry in feedback_list: label = entry.get("predicted_label", "Unknown") if label not in label_stats: label_stats[label] = {"total": 0, "correct": 0, "incorrect": 0, "not_sure": 0} label_stats[label]["total"] += 1 is_correct = entry.get("is_correct") if is_correct is True: label_stats[label]["correct"] += 1 elif is_correct is False: label_stats[label]["incorrect"] += 1 else: label_stats[label]["not_sure"] += 1 # Calculate accuracy only from definite answers (excluding not sure) definite_answers = correct + incorrect accuracy = round(correct / definite_answers * 100, 1) if definite_answers > 0 else 0 return { "total_feedback": total, "correct_count": correct, "incorrect_count": incorrect, "not_sure_count": not_sure, "accuracy": accuracy, "by_label": label_stats } @staticmethod def compute_image_hash(image_bytes: bytes) -> str: """Compute SHA256 hash of image bytes.""" return hashlib.sha256(image_bytes).hexdigest()[:16] # Singleton instance feedback_service = FeedbackService()