Spaces:
Sleeping
Sleeping
| """OphthalmoCapture — Database Layer (Metadata Only) | |
| Option B: The database persists annotation metadata (labels, transcriptions, | |
| doctor info, timestamps) for audit and history. It NEVER stores images or audio. | |
| """ | |
| import os | |
| import datetime | |
| import sqlite3 | |
# firebase_admin is an optional dependency: when it cannot be imported the
# module keeps working with the local SQLite backend only.
try:
    import firebase_admin
    from firebase_admin import credentials, firestore
except ImportError:
    FIREBASE_AVAILABLE = False
else:
    FIREBASE_AVAILABLE = True

# Name of the active backend; init_db() switches it to "FIREBASE" on success.
DB_TYPE = "SQLITE"

# Keep the SQLite file in /tmp when that directory exists (writable inside
# Docker, although ephemeral); otherwise store it next to this script.
if os.path.isdir("/tmp"):
    _DB_DIR = "/tmp"
else:
    _DB_DIR = os.path.dirname(os.path.abspath(__file__))
DB_FILE = os.path.join(_DB_DIR, "annotations.db")

# Firestore client handle; remains None until init_db() connects to Firebase.
db_ref = None
def init_db():
    """Initialize the storage backend and return its name.

    Firebase is tried first when ``firebase_admin`` was importable and a
    ``serviceAccountKey.json`` file exists in the current working directory.
    If that is not the case, or the Firebase setup raises, the SQLite file at
    ``DB_FILE`` is created (or migrated) instead.

    Returns:
        ``"FIREBASE"`` or ``"SQLITE"``; the same value is stored in the
        module-level ``DB_TYPE``.

    Raises:
        Exception: If the SQLite fallback cannot be initialized. The
            underlying error is chained as ``__cause__``.
    """
    global DB_TYPE, db_ref

    # Try Firebase first
    if FIREBASE_AVAILABLE and os.path.exists("serviceAccountKey.json"):
        try:
            # Skip initialize_app() when an app is already registered.
            if not firebase_admin._apps:
                cred = credentials.Certificate("serviceAccountKey.json")
                firebase_admin.initialize_app(cred)
            db_ref = firestore.client()
            DB_TYPE = "FIREBASE"
            return "FIREBASE"
        except Exception as e:
            # Best effort: report the problem and fall through to SQLite.
            print(f"Firebase init failed: {e}")

    # Fallback to SQLite
    try:
        conn = sqlite3.connect(DB_FILE, check_same_thread=False)
        try:
            c = conn.cursor()
            c.execute('''CREATE TABLE IF NOT EXISTS annotations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                image_filename TEXT NOT NULL,
                label TEXT,
                transcription TEXT,
                doctor_name TEXT DEFAULT '',
                created_at DATETIME
            )''')
            c.execute('''CREATE INDEX IF NOT EXISTS idx_ann_filename
                ON annotations (image_filename)''')

            # Migrations: add columns introduced after the first schema.
            # The existing columns are inspected explicitly so that genuine
            # operational errors (locked file, I/O failure, ...) are not
            # mistaken for "column already exists" and silently ignored.
            c.execute("PRAGMA table_info(annotations)")
            existing_columns = {row[1] for row in c.fetchall()}
            for column in ("session_id", "locs_data"):
                if column not in existing_columns:
                    c.execute(
                        f"ALTER TABLE annotations ADD COLUMN {column} "
                        "TEXT DEFAULT ''"
                    )

            c.execute('''CREATE INDEX IF NOT EXISTS idx_ann_session
                ON annotations (image_filename, session_id)''')
            conn.commit()
        finally:
            # Always release the connection, even when a statement fails.
            conn.close()
    except Exception as e:
        raise Exception(f"Database initialization failed: {e}") from e

    DB_TYPE = "SQLITE"
    return "SQLITE"
def save_annotation(image_filename, label, transcription, doctor_name=""):
    """Append a new annotation record (always an INSERT, never an update).

    Only metadata is stored; the function receives no image or audio data.

    Args:
        image_filename: Name of the annotated image.
        label: Label assigned to the image.
        transcription: Transcribed text attached to the annotation.
        doctor_name: Name of the annotating doctor (defaults to "").
    """
    timestamp = datetime.datetime.now()

    if DB_TYPE == "FIREBASE":
        db_ref.collection("annotations").add({
            "imageFilename": image_filename,
            "label": label,
            "transcription": transcription,
            "doctorName": doctor_name,
            "createdAt": timestamp,
        })
        return

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        conn.execute(
            "INSERT INTO annotations "
            "(image_filename, label, transcription, doctor_name, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (
                image_filename,
                label,
                transcription,
                doctor_name,
                # Bind the timestamp as text explicitly. This is the same
                # "YYYY-MM-DD HH:MM:SS.ffffff" form sqlite3's implicit
                # datetime adapter produced, without relying on that
                # adapter (deprecated since Python 3.12).
                timestamp.isoformat(" "),
            ),
        )
        conn.commit()
    finally:
        # Release the connection even when the INSERT fails.
        conn.close()
def save_or_update_annotation(
    image_filename, label, transcription, doctor_name="", session_id="",
    locs_data=None,
):
    """Upsert an annotation: one record per (image_filename, session_id).

    If a record for the given image and session already exists it is
    updated in place (including its timestamp); otherwise a new record is
    inserted.

    Args:
        image_filename: Name of the annotated image.
        label: Label assigned to the image.
        transcription: Transcribed text attached to the annotation.
        doctor_name: Name of the annotating doctor (defaults to "").
        session_id: Identifier of the annotation session (defaults to "").
        locs_data: Optional JSON-serializable mapping. A falsy value is
            stored as an empty object. SQLite stores it as a JSON string,
            Firestore as a native map.
    """
    import json as _json

    timestamp = datetime.datetime.now()
    locs_json = _json.dumps(locs_data or {}, ensure_ascii=False)

    if DB_TYPE == "FIREBASE":
        # Look for an existing document with matching filename + session.
        docs = list(
            db_ref.collection("annotations")
            .where("imageFilename", "==", image_filename)
            .where("sessionId", "==", session_id)
            .limit(1)
            .stream()
        )
        if docs:
            docs[0].reference.update({
                "label": label,
                "transcription": transcription,
                "doctorName": doctor_name,
                "locsData": locs_data or {},
                "createdAt": timestamp,
            })
        else:
            db_ref.collection("annotations").add({
                "imageFilename": image_filename,
                "label": label,
                "transcription": transcription,
                "doctorName": doctor_name,
                "sessionId": session_id,
                "locsData": locs_data or {},
                "createdAt": timestamp,
            })
        return

    # Bind the timestamp as text explicitly: same format that sqlite3's
    # implicit datetime adapter (deprecated since Python 3.12) produced.
    created_at = timestamp.isoformat(" ")

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()
        # Check if a row for this image+session already exists
        c.execute(
            "SELECT id FROM annotations "
            "WHERE image_filename = ? AND session_id = ? LIMIT 1",
            (image_filename, session_id),
        )
        row = c.fetchone()
        if row:
            c.execute(
                "UPDATE annotations "
                "SET label = ?, transcription = ?, doctor_name = ?, "
                "created_at = ?, locs_data = ? "
                "WHERE id = ?",
                (label, transcription, doctor_name, created_at, locs_json,
                 row[0]),
            )
        else:
            c.execute(
                "INSERT INTO annotations "
                "(image_filename, label, transcription, doctor_name, "
                "created_at, session_id, locs_data) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (image_filename, label, transcription, doctor_name,
                 created_at, session_id, locs_json),
            )
        conn.commit()
    finally:
        # Release the connection even when a statement fails.
        conn.close()
def get_latest_annotation(image_filename):
    """Return the most recent annotation for an image, or None.

    "Most recent" is decided by the record timestamp in both backends, so a
    record whose timestamp was refreshed by an update counts as newer than
    a record that was merely inserted later.

    Args:
        image_filename: Name of the image to look up.

    Returns:
        For SQLite, a dict with the keys ``imageFilename``, ``label``,
        ``transcription``, ``doctorName`` and ``createdAt``; for Firestore,
        the stored document as a dict. ``None`` when the image has no
        annotation.
    """
    if DB_TYPE == "FIREBASE":
        docs = (
            db_ref.collection("annotations")
            .where("imageFilename", "==", image_filename)
            .order_by("createdAt", direction=firestore.Query.DESCENDING)
            .limit(1)
            .stream()
        )
        return next((doc.to_dict() for doc in docs), None)

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        # Order by timestamp (like the Firestore branch); id only breaks
        # ties between records that share the same timestamp.
        row = conn.execute(
            "SELECT image_filename, label, transcription, doctor_name, created_at "
            "FROM annotations WHERE image_filename = ? "
            "ORDER BY created_at DESC, id DESC LIMIT 1",
            (image_filename,),
        ).fetchone()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    if row is None:
        return None
    return {
        "imageFilename": row[0],
        "label": row[1],
        "transcription": row[2],
        "doctorName": row[3],
        "createdAt": row[4],
    }
def get_history_paginated(search_query="", page=1, per_page=10):
    """Return one page of annotation history, optionally filtered by name.

    Args:
        search_query: Text to look for in the image filename. SQLite
            matches it anywhere in the filename and treats it literally
            ("%" and "_" are not wildcards). Firestore matches it as a
            filename prefix. Empty means no filter.
        page: 1-based page number; values below 1 are treated as page 1.
        per_page: Number of items per page.

    Returns:
        A ``(items, total_count)`` tuple: the records of the requested page
        and the number of records matching the filter across all pages.
    """
    # Clamp so that page <= 0 cannot produce a negative slice offset.
    offset = max(page - 1, 0) * per_page

    if DB_TYPE == "FIREBASE":
        ref = db_ref.collection("annotations")
        if search_query:
            # Range query selecting filenames that start with search_query.
            query = (
                ref.where("imageFilename", ">=", search_query)
                .where("imageFilename", "<=", search_query + "\uf8ff")
            )
        else:
            query = ref.order_by("createdAt", direction=firestore.Query.DESCENDING)
        # All matching documents are loaded; paging happens in memory.
        all_docs = list(query.stream())
        page_docs = all_docs[offset : offset + per_page]
        return [doc.to_dict() for doc in page_docs], len(all_docs)

    where = ""
    params = []
    if search_query:
        # Escape LIKE metacharacters so that e.g. "img_1" does not also
        # match "imgX1" (filenames very commonly contain underscores).
        escaped = (
            search_query.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        where = " WHERE image_filename LIKE ? ESCAPE '\\'"
        params.append(f"%{escaped}%")

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()
        # Count
        c.execute(f"SELECT COUNT(*) FROM annotations{where}", params)
        total_count = c.fetchone()[0]
        # Fetch page
        c.execute(
            "SELECT image_filename, label, transcription, doctor_name, created_at "
            f"FROM annotations{where} ORDER BY id DESC LIMIT ? OFFSET ?",
            params + [per_page, offset],
        )
        history = [
            {
                "imageFilename": row[0],
                "label": row[1],
                "transcription": row[2],
                "doctorName": row[3],
                "createdAt": row[4],
            }
            for row in c.fetchall()
        ]
    finally:
        # Release the connection even when a query fails.
        conn.close()
    return history, total_count
def get_annotation_stats():
    """Summarize all stored annotations.

    Returns:
        A dict ``{"total": int, "by_label": {label: count}}`` where
        ``total`` is the number of records and ``by_label`` maps each label
        to how many records carry it. Firestore documents without a
        ``label`` field are counted under ``"sin_etiqueta"``.
    """
    if DB_TYPE == "FIREBASE":
        docs = list(db_ref.collection("annotations").stream())
        labels = {}
        for doc in docs:
            lbl = doc.to_dict().get("label", "sin_etiqueta")
            labels[lbl] = labels.get(lbl, 0) + 1
        return {"total": len(docs), "by_label": labels}

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()
        c.execute("SELECT COUNT(*) FROM annotations")
        total = c.fetchone()[0]
        c.execute("SELECT label, COUNT(*) FROM annotations GROUP BY label")
        labels = {label: count for label, count in c.fetchall()}
    finally:
        # Release the connection even when a query fails.
        conn.close()
    return {"total": total, "by_label": labels}
def get_previously_labeled_filenames(filenames: list[str]) -> dict[str, list[dict]]:
    """Find which of the given filenames already have annotations.

    Args:
        filenames: Image filenames to look up. Duplicates are ignored.

    Returns:
        A dict mapping filename to its annotation records, newest first.
        Only filenames with at least one record are included; an empty
        input yields an empty dict.
    """
    if not filenames:
        return {}

    # De-duplicate while keeping the caller's order, so a repeated name is
    # neither queried twice nor able to produce duplicated records.
    unique_names = list(dict.fromkeys(filenames))
    result: dict[str, list[dict]] = {}

    if DB_TYPE == "FIREBASE":
        # One equality query per filename.
        for fname in unique_names:
            docs = (
                db_ref.collection("annotations")
                .where("imageFilename", "==", fname)
                .order_by("createdAt", direction=firestore.Query.DESCENDING)
                .stream()
            )
            records = [doc.to_dict() for doc in docs]
            if records:
                result[fname] = records
        return result

    # SQLite limits the number of bound parameters per statement (999 in
    # older builds), so long lists are queried in chunks. Each filename
    # lives in exactly one chunk, which keeps its records newest-first.
    chunk_size = 900
    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()
        for start in range(0, len(unique_names), chunk_size):
            chunk = unique_names[start:start + chunk_size]
            placeholders = ",".join("?" for _ in chunk)
            c.execute(
                f"SELECT image_filename, label, transcription, doctor_name, created_at "
                f"FROM annotations WHERE image_filename IN ({placeholders}) "
                f"ORDER BY created_at DESC",
                chunk,
            )
            for row in c.fetchall():
                result.setdefault(row[0], []).append({
                    "imageFilename": row[0],
                    "label": row[1],
                    "transcription": row[2],
                    "doctorName": row[3],
                    "createdAt": row[4],
                })
    finally:
        # Release the connection even when a query fails.
        conn.close()
    return result
def get_all_annotations_for_file(image_filename: str) -> list[dict]:
    """Return every annotation stored for an image, newest first.

    Args:
        image_filename: Name of the image to look up.

    Returns:
        A list of annotation records ordered by timestamp, descending;
        empty when the image has no annotation.
    """
    if DB_TYPE == "FIREBASE":
        docs = (
            db_ref.collection("annotations")
            .where("imageFilename", "==", image_filename)
            .order_by("createdAt", direction=firestore.Query.DESCENDING)
            .stream()
        )
        return [doc.to_dict() for doc in docs]

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        rows = conn.execute(
            "SELECT image_filename, label, transcription, doctor_name, created_at "
            "FROM annotations WHERE image_filename = ? ORDER BY created_at DESC",
            (image_filename,),
        ).fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return [
        {
            "imageFilename": row[0],
            "label": row[1],
            "transcription": row[2],
            "doctorName": row[3],
            "createdAt": row[4],
        }
        for row in rows
    ]
def get_history_grouped(search_query="", page=1, per_page=10):
    """Return one page of annotation history grouped by image filename.

    Args:
        search_query: Text to look for in the image filename. SQLite
            matches it anywhere in the filename and treats it literally
            ("%" and "_" are not wildcards). Firestore matches it as a
            filename prefix. Empty means no filter.
        page: 1-based page number; values below 1 are treated as page 1.
        per_page: Number of image groups per page.

    Returns:
        A ``(groups, total_unique_images)`` tuple. Each group is
        ``{"imageFilename": str, "annotations": [records]}`` with the
        records newest first; groups are ordered by their most recent
        annotation, newest first.
    """
    # Clamp so that page <= 0 cannot produce a negative slice offset.
    offset = max(page - 1, 0) * per_page

    if DB_TYPE == "FIREBASE":
        ref = db_ref.collection("annotations")
        if search_query:
            # Range query selecting filenames that start with search_query.
            query = (
                ref.where("imageFilename", ">=", search_query)
                .where("imageFilename", "<=", search_query + "\uf8ff")
            )
        else:
            query = ref.order_by("createdAt", direction=firestore.Query.DESCENDING)
        all_docs = [doc.to_dict() for doc in query.stream()]

        # Group by filename
        grouped = {}
        for doc in all_docs:
            grouped.setdefault(doc.get("imageFilename", ""), []).append(doc)

        # Sort groups by most recent annotation. Timestamps are compared
        # as strings so documents lacking "createdAt" do not break sorting.
        sorted_groups = sorted(
            grouped.items(),
            key=lambda item: max(str(a.get("createdAt", "")) for a in item[1]),
            reverse=True,
        )
        total_unique = len(sorted_groups)
        result = [
            {
                "imageFilename": fname,
                "annotations": sorted(
                    annotations,
                    key=lambda a: str(a.get("createdAt", "")),
                    reverse=True,
                ),
            }
            for fname, annotations in sorted_groups[offset:offset + per_page]
        ]
        return result, total_unique

    where = ""
    params = []
    if search_query:
        # Escape LIKE metacharacters so that e.g. "img_1" does not also
        # match "imgX1" (filenames very commonly contain underscores).
        escaped = (
            search_query.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        where = " WHERE image_filename LIKE ? ESCAPE '\\'"
        params.append(f"%{escaped}%")

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()

        # Count unique filenames
        c.execute(
            f"SELECT COUNT(DISTINCT image_filename) FROM annotations{where}",
            params,
        )
        total_unique = c.fetchone()[0]

        # Get unique filenames for this page, sorted by most recent
        c.execute(
            f"SELECT image_filename, MAX(created_at) as latest "
            f"FROM annotations{where} "
            f"GROUP BY image_filename ORDER BY latest DESC "
            f"LIMIT ? OFFSET ?",
            params + [per_page, offset],
        )
        page_filenames = [row[0] for row in c.fetchall()]

        # Fetch all annotations for those filenames
        result = []
        for fname in page_filenames:
            c.execute(
                "SELECT image_filename, label, transcription, doctor_name, created_at "
                "FROM annotations WHERE image_filename = ? ORDER BY created_at DESC",
                (fname,),
            )
            annotations = [
                {
                    "imageFilename": row[0],
                    "label": row[1],
                    "transcription": row[2],
                    "doctorName": row[3],
                    "createdAt": row[4],
                }
                for row in c.fetchall()
            ]
            result.append({
                "imageFilename": fname,
                "annotations": annotations,
            })
    finally:
        # Release the connection even when a query fails.
        conn.close()
    return result, total_unique