# Repository page header (not part of the module): author TheBug95, commit 749d250.
# Commit message: "changes to the README so that HF works, and to the database".
"""OphthalmoCapture — Database Layer (Metadata Only)
Option B: The database persists annotation metadata (labels, transcriptions,
doctor info, timestamps) for audit and history. It NEVER stores images or audio.
"""
import os
import datetime
import sqlite3
# Try importing firebase_admin
try:
import firebase_admin
from firebase_admin import credentials, firestore
FIREBASE_AVAILABLE = True
except ImportError:
FIREBASE_AVAILABLE = False
# Backend currently in use. init_db() switches this to "FIREBASE" when the
# Firestore client is created successfully; otherwise it stays "SQLITE".
DB_TYPE = "SQLITE"

# Firestore client handle; remains None while the SQLite backend is active.
db_ref = None

# The SQLite file is placed in /tmp whenever that directory exists (writable
# inside Docker, but ephemeral). Only when /tmp is missing does it live next to
# this module.
if os.path.isdir("/tmp"):
    _DB_DIR = "/tmp"
else:
    _DB_DIR = os.path.dirname(os.path.abspath(__file__))
DB_FILE = os.path.join(_DB_DIR, "annotations.db")
def _add_column_if_missing(cursor, column, definition):
    """Add ``column`` to the ``annotations`` table unless it already exists.

    Args:
        cursor: Open sqlite3 cursor on the annotations database.
        column: Name of the column to add (internal constant, never user input).
        definition: SQL type/default clause placed after the column name.

    Raises:
        sqlite3.OperationalError: If ALTER TABLE fails for any reason other
            than the column already being present.
    """

    def has_column():
        cursor.execute("PRAGMA table_info(annotations)")
        # table_info rows are (cid, name, type, notnull, dflt_value, pk).
        return any(row[1] == column for row in cursor.fetchall())

    if has_column():
        return
    try:
        cursor.execute(f"ALTER TABLE annotations ADD COLUMN {column} {definition}")
    except sqlite3.OperationalError:
        # Another connection may have added the column in the meantime. Any
        # other failure (locked database, read-only file, ...) must surface
        # instead of being silently ignored.
        if not has_column():
            raise


def init_db():
    """Initialize the database backend (Firebase first, SQLite as fallback).

    Firebase is attempted only when ``firebase_admin`` could be imported and a
    ``serviceAccountKey.json`` file exists in the current working directory.
    If that attempt fails, the error is printed and SQLite is used instead.

    The SQLite path creates the ``annotations`` table and its indexes when
    missing and migrates older databases by adding the ``session_id`` and
    ``locs_data`` columns.

    Returns:
        "FIREBASE" or "SQLITE", the backend that was activated. The same
        value is stored in the module-level ``DB_TYPE``.

    Raises:
        RuntimeError: If the SQLite database cannot be opened or migrated. The
            underlying error is attached as ``__cause__``.
    """
    global DB_TYPE, db_ref

    # Try Firebase first.
    if FIREBASE_AVAILABLE and os.path.exists("serviceAccountKey.json"):
        try:
            # Initialize the default app only once per process.
            if not firebase_admin._apps:
                cred = credentials.Certificate("serviceAccountKey.json")
                firebase_admin.initialize_app(cred)
            db_ref = firestore.client()
            DB_TYPE = "FIREBASE"
            return "FIREBASE"
        except Exception as e:
            # Best effort: report the problem and fall through to SQLite.
            print(f"Firebase init failed: {e}")

    # Fallback to SQLite.
    try:
        conn = sqlite3.connect(DB_FILE, check_same_thread=False)
        try:
            c = conn.cursor()
            c.execute('''CREATE TABLE IF NOT EXISTS annotations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                image_filename TEXT NOT NULL,
                label TEXT,
                transcription TEXT,
                doctor_name TEXT DEFAULT '',
                created_at DATETIME
            )''')
            c.execute('''CREATE INDEX IF NOT EXISTS idx_ann_filename
                ON annotations (image_filename)''')
            # Migrations for databases created before these columns existed.
            _add_column_if_missing(c, "session_id", "TEXT DEFAULT ''")
            _add_column_if_missing(c, "locs_data", "TEXT DEFAULT ''")  # JSON string
            c.execute('''CREATE INDEX IF NOT EXISTS idx_ann_session
                ON annotations (image_filename, session_id)''')
            conn.commit()
        finally:
            # Always release the connection, even when schema setup fails.
            conn.close()
    except Exception as e:
        raise RuntimeError(f"Database initialization failed: {e}") from e

    DB_TYPE = "SQLITE"
    return "SQLITE"
def save_annotation(image_filename, label, transcription, doctor_name=""):
    """Insert a new annotation record (never updates). Stores metadata only.

    Args:
        image_filename: Name of the annotated image.
        label: Label assigned to the image.
        transcription: Transcribed text attached to the annotation.
        doctor_name: Name of the annotating doctor; defaults to "".

    The creation time is the current local time (naive ``datetime``). In the
    SQLite backend it is written as ``YYYY-MM-DD HH:MM:SS[.ffffff]`` text,
    which is what sqlite3's implicit datetime adapter used to produce. The
    conversion is done explicitly because that adapter is deprecated since
    Python 3.12.
    """
    timestamp = datetime.datetime.now()

    if DB_TYPE == "FIREBASE":
        db_ref.collection("annotations").add({
            "imageFilename": image_filename,
            "label": label,
            "transcription": transcription,
            "doctorName": doctor_name,
            "createdAt": timestamp,
        })
        return

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        conn.execute(
            "INSERT INTO annotations "
            "(image_filename, label, transcription, doctor_name, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (image_filename, label, transcription, doctor_name,
             timestamp.isoformat(sep=" ")),
        )
        conn.commit()
    finally:
        # Close even when the INSERT fails so the connection is not leaked.
        conn.close()
def save_or_update_annotation(
    image_filename, label, transcription, doctor_name="", session_id="",
    locs_data=None,
):
    """Upsert: within the same session, keep only ONE record per image.

    If a record for (image_filename, session_id) already exists it is updated
    in place, including its creation timestamp; otherwise a new record is
    inserted.

    Args:
        image_filename: Name of the annotated image.
        label: Label assigned to the image.
        transcription: Transcribed text attached to the annotation.
        doctor_name: Name of the annotating doctor; defaults to "".
        session_id: Session identifier used together with the filename as the
            upsert key; defaults to "".
        locs_data: JSON-serialisable mapping of extra data. ``None`` (or any
            falsy value) is stored as an empty mapping. Firestore receives the
            mapping itself, SQLite a JSON string.

    Raises:
        TypeError: If ``locs_data`` cannot be serialised to JSON (raised by
            ``json.dumps`` regardless of the active backend).
    """
    # Kept local so this function does not depend on a module-level import.
    import json as _json

    timestamp = datetime.datetime.now()
    locs = locs_data or {}
    locs_json = _json.dumps(locs, ensure_ascii=False)

    if DB_TYPE == "FIREBASE":
        collection = db_ref.collection("annotations")
        # Look for an existing document with the same filename + session.
        docs = list(
            collection
            .where("imageFilename", "==", image_filename)
            .where("sessionId", "==", session_id)
            .limit(1)
            .stream()
        )
        # Fields written by both the update and the insert path.
        fields = {
            "label": label,
            "transcription": transcription,
            "doctorName": doctor_name,
            "locsData": locs,
            "createdAt": timestamp,
        }
        if docs:
            docs[0].reference.update(fields)
        else:
            collection.add({
                "imageFilename": image_filename,
                "sessionId": session_id,
                **fields,
            })
        return

    # Explicit text form: sqlite3's implicit datetime adapter (which produced
    # this same format) is deprecated since Python 3.12.
    created_at = timestamp.isoformat(sep=" ")

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()
        # Check if a row for this image+session already exists.
        c.execute(
            "SELECT id FROM annotations "
            "WHERE image_filename = ? AND session_id = ? LIMIT 1",
            (image_filename, session_id),
        )
        row = c.fetchone()
        if row:
            c.execute(
                "UPDATE annotations "
                "SET label = ?, transcription = ?, doctor_name = ?, "
                "created_at = ?, locs_data = ? "
                "WHERE id = ?",
                (label, transcription, doctor_name, created_at, locs_json, row[0]),
            )
        else:
            c.execute(
                "INSERT INTO annotations "
                "(image_filename, label, transcription, doctor_name, "
                "created_at, session_id, locs_data) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (image_filename, label, transcription, doctor_name,
                 created_at, session_id, locs_json),
            )
        conn.commit()
    finally:
        # Close even when a statement fails so the connection is not leaked.
        conn.close()
def get_latest_annotation(image_filename):
    """Retrieve the most recent annotation for a given image filename.

    "Most recent" means the greatest creation timestamp in both backends.
    The SQLite query therefore orders by ``created_at`` (row id only breaks
    ties). An upsert can refresh the timestamp of an older row, so ordering
    by id alone could return a stale record.

    Args:
        image_filename: Name of the image to look up.

    Returns:
        The annotation as a dict, or ``None`` when the image has no
        annotations. SQLite records carry the keys ``imageFilename``,
        ``label``, ``transcription``, ``doctorName`` and ``createdAt``;
        Firestore records are the stored document as returned by ``to_dict()``.
    """
    if DB_TYPE == "FIREBASE":
        docs = (
            db_ref.collection("annotations")
            .where("imageFilename", "==", image_filename)
            .order_by("createdAt", direction=firestore.Query.DESCENDING)
            .limit(1)
            .stream()
        )
        newest = next(iter(docs), None)
        return None if newest is None else newest.to_dict()

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        row = conn.execute(
            "SELECT image_filename, label, transcription, doctor_name, created_at "
            "FROM annotations WHERE image_filename = ? "
            "ORDER BY created_at DESC, id DESC LIMIT 1",
            (image_filename,),
        ).fetchone()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()

    if row is None:
        return None
    return {
        "imageFilename": row[0],
        "label": row[1],
        "transcription": row[2],
        "doctorName": row[3],
        "createdAt": row[4],
    }
def get_history_paginated(search_query="", page=1, per_page=10):
    """Retrieve annotation history with search and pagination.

    Args:
        search_query: Filter on the image filename; "" disables filtering.
            SQLite matches it literally as a substring (``%`` and ``_`` in
            the query are escaped, so they are not treated as LIKE wildcards).
            Firestore matches filenames that start with the query.
        page: 1-based page number. Values below 1 are treated as page 1.
        per_page: Maximum number of items per page.

    Returns:
        (list_of_items, total_count): the records of the requested page and
        the number of records matching the filter across all pages.
    """
    # Never let a page number below 1 produce a negative offset.
    offset = max(0, (page - 1) * per_page)

    if DB_TYPE == "FIREBASE":
        ref = db_ref.collection("annotations")
        if search_query:
            # Range query = prefix match; "\uf8ff" is a high code point used
            # as the upper bound of the range.
            query = (
                ref.where("imageFilename", ">=", search_query)
                .where("imageFilename", "<=", search_query + "\uf8ff")
            )
        else:
            query = ref.order_by("createdAt", direction=firestore.Query.DESCENDING)
        # Pagination happens client-side after fetching every matching doc.
        all_docs = list(query.stream())
        page_docs = all_docs[offset : offset + per_page]
        return [doc.to_dict() for doc in page_docs], len(all_docs)

    # Build the filter once and share it between the count and page queries.
    where = ""
    params = []
    if search_query:
        escaped = (
            str(search_query)
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        where = " WHERE image_filename LIKE ? ESCAPE '\\'"
        params.append(f"%{escaped}%")

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()
        c.execute("SELECT COUNT(*) FROM annotations" + where, params)
        total_count = c.fetchone()[0]
        c.execute(
            "SELECT image_filename, label, transcription, doctor_name, created_at "
            "FROM annotations" + where + " ORDER BY id DESC LIMIT ? OFFSET ?",
            params + [per_page, offset],
        )
        rows = c.fetchall()
    finally:
        # Close even when a query fails so the connection is not leaked.
        conn.close()

    history = [
        {
            "imageFilename": row[0],
            "label": row[1],
            "transcription": row[2],
            "doctorName": row[3],
            "createdAt": row[4],
        }
        for row in rows
    ]
    return history, total_count
def get_annotation_stats():
    """Get summary statistics of all stored annotations.

    Returns:
        ``{"total": int, "by_label": {label: count}}``. In SQLite, rows whose
        label is NULL are counted under the key ``None``. In Firestore,
        documents without a ``label`` field are counted under
        ``"sin_etiqueta"``.
    """
    if DB_TYPE == "FIREBASE":
        total = 0
        labels = {}
        for doc in db_ref.collection("annotations").stream():
            total += 1
            lbl = doc.to_dict().get("label", "sin_etiqueta")
            labels[lbl] = labels.get(lbl, 0) + 1
        return {"total": total, "by_label": labels}

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        rows = conn.execute(
            "SELECT label, COUNT(*) FROM annotations GROUP BY label"
        ).fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()

    labels = {label: count for label, count in rows}
    # Derive the total from the same result set so both figures always agree,
    # even if another connection writes between statements.
    return {"total": sum(labels.values()), "by_label": labels}
def get_previously_labeled_filenames(filenames: list[str]) -> dict[str, list[dict]]:
    """Check which filenames have been previously annotated in the DB.

    Args:
        filenames: Image filenames to look up. Duplicates are ignored.

    Returns:
        A dict mapping filename → list of annotation records, newest first.
        Only filenames with at least one record are included.
    """
    if not filenames:
        return {}

    # Drop duplicates while keeping the caller's order.
    unique_names = list(dict.fromkeys(filenames))
    result = {}

    if DB_TYPE == "FIREBASE":
        # One equality query per filename; no single IN query is used because
        # Firestore caps the number of values an 'in' filter may carry.
        for fname in unique_names:
            docs = (
                db_ref.collection("annotations")
                .where("imageFilename", "==", fname)
                .order_by("createdAt", direction=firestore.Query.DESCENDING)
                .stream()
            )
            records = [doc.to_dict() for doc in docs]
            if records:
                result[fname] = records
        return result

    # SQLite limits the number of bound parameters per statement (999 in
    # older builds), so query the filenames in batches instead of one huge IN.
    batch_size = 500

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()
        for start in range(0, len(unique_names), batch_size):
            batch = unique_names[start : start + batch_size]
            placeholders = ",".join("?" * len(batch))
            c.execute(
                f"SELECT image_filename, label, transcription, doctor_name, created_at "
                f"FROM annotations WHERE image_filename IN ({placeholders}) "
                f"ORDER BY created_at DESC, id DESC",
                batch,
            )
            for row in c.fetchall():
                result.setdefault(row[0], []).append({
                    "imageFilename": row[0],
                    "label": row[1],
                    "transcription": row[2],
                    "doctorName": row[3],
                    "createdAt": row[4],
                })
    finally:
        # Close even when a query fails so the connection is not leaked.
        conn.close()
    return result
def get_all_annotations_for_file(image_filename: str) -> list[dict]:
    """Retrieve ALL annotations for a given image filename, newest first.

    Args:
        image_filename: Name of the image to look up.

    Returns:
        A list of annotation dicts ordered by creation time, descending. An
        empty list is returned when the image has no annotations. In SQLite,
        records sharing the same timestamp are ordered by row id descending so
        the result is deterministic.
    """
    if DB_TYPE == "FIREBASE":
        docs = (
            db_ref.collection("annotations")
            .where("imageFilename", "==", image_filename)
            .order_by("createdAt", direction=firestore.Query.DESCENDING)
            .stream()
        )
        return [doc.to_dict() for doc in docs]

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        rows = conn.execute(
            "SELECT image_filename, label, transcription, doctor_name, created_at "
            "FROM annotations WHERE image_filename = ? "
            "ORDER BY created_at DESC, id DESC",
            (image_filename,),
        ).fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()

    return [
        {
            "imageFilename": row[0],
            "label": row[1],
            "transcription": row[2],
            "doctorName": row[3],
            "createdAt": row[4],
        }
        for row in rows
    ]
def get_history_grouped(search_query="", page=1, per_page=10):
    """Retrieve annotation history GROUPED by image filename.

    Args:
        search_query: Filter on the image filename; "" disables filtering.
            SQLite matches it literally as a substring (``%`` and ``_`` in
            the query are escaped, so they are not treated as LIKE wildcards).
            Firestore matches filenames that start with the query.
        page: 1-based page number. Values below 1 are treated as page 1.
        per_page: Maximum number of groups (distinct images) per page.

    Returns:
        (list_of_groups, total_unique_images). Each group is
        ``{"imageFilename": str, "annotations": [records, newest first]}``.
        Groups are sorted by the most recent annotation date per image.
    """
    # Never let a page number below 1 produce a negative offset.
    offset = max(0, (page - 1) * per_page)

    if DB_TYPE == "FIREBASE":
        ref = db_ref.collection("annotations")
        if search_query:
            # Range query = prefix match; "\uf8ff" is a high code point used
            # as the upper bound of the range.
            query = (
                ref.where("imageFilename", ">=", search_query)
                .where("imageFilename", "<=", search_query + "\uf8ff")
            )
        else:
            query = ref.order_by("createdAt", direction=firestore.Query.DESCENDING)

        # Group every matching document by filename (client-side).
        grouped = {}
        for doc in query.stream():
            record = doc.to_dict()
            grouped.setdefault(record.get("imageFilename", ""), []).append(record)

        def created_key(record):
            # Timestamps are compared through their string form.
            return str(record.get("createdAt", ""))

        # Newest annotation first inside each group ...
        for records in grouped.values():
            records.sort(key=created_key, reverse=True)
        # ... then order the groups by their newest annotation.
        sorted_groups = sorted(
            grouped.items(),
            key=lambda item: created_key(item[1][0]),
            reverse=True,
        )
        result = [
            {"imageFilename": fname, "annotations": records}
            for fname, records in sorted_groups[offset : offset + per_page]
        ]
        return result, len(sorted_groups)

    # Build the filter once and share it between the count and page queries.
    where = ""
    params = []
    if search_query:
        escaped = (
            str(search_query)
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        where = " WHERE image_filename LIKE ? ESCAPE '\\'"
        params.append(f"%{escaped}%")

    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    try:
        c = conn.cursor()

        # Count unique filenames.
        c.execute(
            f"SELECT COUNT(DISTINCT image_filename) FROM annotations{where}",
            params,
        )
        total_unique = c.fetchone()[0]

        # Unique filenames for this page, most recently annotated first. The
        # filename tie-break keeps page boundaries stable between calls.
        c.execute(
            f"SELECT image_filename, MAX(created_at) AS latest "
            f"FROM annotations{where} "
            f"GROUP BY image_filename ORDER BY latest DESC, image_filename ASC "
            f"LIMIT ? OFFSET ?",
            params + [per_page, offset],
        )
        page_filenames = [row[0] for row in c.fetchall()]

        # Fetch all annotations for those filenames.
        result = []
        for fname in page_filenames:
            c.execute(
                "SELECT image_filename, label, transcription, doctor_name, created_at "
                "FROM annotations WHERE image_filename = ? "
                "ORDER BY created_at DESC, id DESC",
                (fname,),
            )
            annotations = [
                {
                    "imageFilename": row[0],
                    "label": row[1],
                    "transcription": row[2],
                    "doctorName": row[3],
                    "createdAt": row[4],
                }
                for row in c.fetchall()
            ]
            result.append({"imageFilename": fname, "annotations": annotations})
    finally:
        # Close even when a query fails so the connection is not leaked.
        conn.close()
    return result, total_unique