Numan Saeed
Add Not Sure feedback, save preprocessed scans, add Help tab
f84688d
"""
Feedback service for storing and managing prediction feedback.
Stores feedback in a local JSON file for simplicity and privacy.
"""
import json
import uuid
import hashlib
import base64
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
import csv
import io
class FeedbackService:
"""Service for managing prediction feedback with local JSON storage."""
def __init__(self, storage_dir: Optional[Path] = None):
"""Initialize feedback service with storage directory."""
if storage_dir is None:
# Default to user's home directory for persistence
storage_dir = Path.home() / ".fetalclip"
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(parents=True, exist_ok=True)
self.feedback_file = self.storage_dir / "feedback.json"
self.sessions_file = self.storage_dir / "sessions.json"
self.images_dir = self.storage_dir / "images"
self.images_dir.mkdir(parents=True, exist_ok=True)
# Initialize files if they don't exist
if not self.feedback_file.exists():
self._save_feedback([])
if not self.sessions_file.exists():
self._save_sessions({})
def _load_feedback(self) -> List[Dict]:
"""Load feedback from JSON file."""
try:
with open(self.feedback_file, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return []
def _save_feedback(self, feedback: List[Dict]):
"""Save feedback to JSON file."""
with open(self.feedback_file, 'w') as f:
json.dump(feedback, f, indent=2, default=str)
def _load_sessions(self) -> Dict:
"""Load sessions from JSON file."""
try:
with open(self.sessions_file, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return {}
def _save_sessions(self, sessions: Dict):
"""Save sessions to JSON file."""
with open(self.sessions_file, 'w') as f:
json.dump(sessions, f, indent=2, default=str)
def create_session(self) -> str:
"""Create a new session and return its ID."""
session_id = str(uuid.uuid4())[:8]
sessions = self._load_sessions()
sessions[session_id] = {
"created_at": datetime.now().isoformat(),
"image_count": 0,
"feedback_count": 0,
"correct_count": 0,
"incorrect_count": 0,
"not_sure_count": 0
}
self._save_sessions(sessions)
return session_id
def get_session(self, session_id: str) -> Optional[Dict]:
"""Get session info by ID."""
sessions = self._load_sessions()
return sessions.get(session_id)
def get_all_sessions(self) -> Dict:
"""Get all sessions."""
return self._load_sessions()
def update_session_stats(self, session_id: str, is_correct: Optional[bool] = None, image_analyzed: bool = False):
"""Update session statistics."""
sessions = self._load_sessions()
if session_id not in sessions:
sessions[session_id] = {
"created_at": datetime.now().isoformat(),
"image_count": 0,
"feedback_count": 0,
"correct_count": 0,
"incorrect_count": 0,
"not_sure_count": 0
}
# Ensure not_sure_count exists for older sessions
if "not_sure_count" not in sessions[session_id]:
sessions[session_id]["not_sure_count"] = 0
if image_analyzed:
sessions[session_id]["image_count"] += 1
if is_correct is True:
sessions[session_id]["feedback_count"] += 1
sessions[session_id]["correct_count"] += 1
elif is_correct is False:
sessions[session_id]["feedback_count"] += 1
sessions[session_id]["incorrect_count"] += 1
elif is_correct is None:
# Not sure case - only increment feedback_count and not_sure_count
sessions[session_id]["feedback_count"] += 1
sessions[session_id]["not_sure_count"] += 1
self._save_sessions(sessions)
def add_feedback(
self,
session_id: str,
filename: str,
file_type: str,
predicted_label: str,
predicted_confidence: float,
all_predictions: List[Dict],
is_correct: Optional[bool],
correct_label: Optional[str] = None,
reviewer_notes: Optional[str] = None,
patient_id: Optional[str] = None,
image_hash: Optional[str] = None,
preprocessed_image_base64: Optional[str] = None
) -> Dict:
"""Add new feedback entry."""
feedback_id = str(uuid.uuid4())[:12]
# Save preprocessed image if provided
preprocessed_image_path = None
if preprocessed_image_base64:
try:
image_data = base64.b64decode(preprocessed_image_base64)
image_filename = f"{feedback_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
image_path = self.images_dir / image_filename
with open(image_path, 'wb') as f:
f.write(image_data)
preprocessed_image_path = str(image_path)
except Exception as e:
print(f"Failed to save preprocessed image: {e}")
entry = {
"id": feedback_id,
"session_id": session_id,
"timestamp": datetime.now().isoformat(),
"filename": filename,
"file_type": file_type,
"patient_id": patient_id,
"image_hash": image_hash,
"predicted_label": predicted_label,
"predicted_confidence": round(predicted_confidence, 2),
"all_predictions": all_predictions,
"is_correct": is_correct,
"correct_label": correct_label if is_correct is False else None,
"reviewer_notes": reviewer_notes,
"preprocessed_image_path": preprocessed_image_path
}
feedback_list = self._load_feedback()
feedback_list.append(entry)
self._save_feedback(feedback_list)
# Update session stats
self.update_session_stats(session_id, is_correct=is_correct)
return entry
def get_feedback(self, session_id: Optional[str] = None) -> List[Dict]:
"""Get all feedback, optionally filtered by session."""
feedback_list = self._load_feedback()
if session_id:
feedback_list = [f for f in feedback_list if f.get("session_id") == session_id]
return feedback_list
def get_feedback_by_id(self, feedback_id: str) -> Optional[Dict]:
"""Get single feedback entry by ID."""
feedback_list = self._load_feedback()
for entry in feedback_list:
if entry.get("id") == feedback_id:
return entry
return None
def delete_feedback(self, feedback_id: str) -> bool:
"""Delete feedback entry by ID."""
feedback_list = self._load_feedback()
original_length = len(feedback_list)
feedback_list = [f for f in feedback_list if f.get("id") != feedback_id]
if len(feedback_list) < original_length:
self._save_feedback(feedback_list)
return True
return False
def export_to_csv(self, session_id: Optional[str] = None) -> str:
"""Export feedback to CSV format."""
feedback_list = self.get_feedback(session_id)
if not feedback_list:
return ""
output = io.StringIO()
# Define CSV columns
fieldnames = [
"timestamp",
"session_id",
"filename",
"patient_id",
"file_type",
"predicted_label",
"predicted_confidence",
"is_correct",
"correct_label",
"reviewer_notes",
"preprocessed_image_path"
]
writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
for entry in feedback_list:
# Flatten the entry for CSV
is_correct = entry.get("is_correct")
if is_correct is True:
is_correct_str = "Yes"
elif is_correct is False:
is_correct_str = "No"
else:
is_correct_str = "Not Sure"
row = {
"timestamp": entry.get("timestamp", ""),
"session_id": entry.get("session_id", ""),
"filename": entry.get("filename", ""),
"patient_id": entry.get("patient_id", ""),
"file_type": entry.get("file_type", ""),
"predicted_label": entry.get("predicted_label", ""),
"predicted_confidence": entry.get("predicted_confidence", ""),
"is_correct": is_correct_str,
"correct_label": entry.get("correct_label", ""),
"reviewer_notes": entry.get("reviewer_notes", ""),
"preprocessed_image_path": entry.get("preprocessed_image_path", "")
}
writer.writerow(row)
return output.getvalue()
def get_statistics(self, session_id: Optional[str] = None) -> Dict:
"""Get feedback statistics."""
feedback_list = self.get_feedback(session_id)
total = len(feedback_list)
correct = sum(1 for f in feedback_list if f.get("is_correct") is True)
incorrect = sum(1 for f in feedback_list if f.get("is_correct") is False)
not_sure = sum(1 for f in feedback_list if f.get("is_correct") is None)
# Count by predicted label
label_stats = {}
for entry in feedback_list:
label = entry.get("predicted_label", "Unknown")
if label not in label_stats:
label_stats[label] = {"total": 0, "correct": 0, "incorrect": 0, "not_sure": 0}
label_stats[label]["total"] += 1
is_correct = entry.get("is_correct")
if is_correct is True:
label_stats[label]["correct"] += 1
elif is_correct is False:
label_stats[label]["incorrect"] += 1
else:
label_stats[label]["not_sure"] += 1
# Calculate accuracy only from definite answers (excluding not sure)
definite_answers = correct + incorrect
accuracy = round(correct / definite_answers * 100, 1) if definite_answers > 0 else 0
return {
"total_feedback": total,
"correct_count": correct,
"incorrect_count": incorrect,
"not_sure_count": not_sure,
"accuracy": accuracy,
"by_label": label_stats
}
@staticmethod
def compute_image_hash(image_bytes: bytes) -> str:
"""Compute SHA256 hash of image bytes."""
return hashlib.sha256(image_bytes).hexdigest()[:16]
# Singleton instance
feedback_service = FeedbackService()