# esp_server / app.py
# Page-header residue from the hosting site's file view: "tejovk311's picture",
# "Update app.py", commit ef5e09b (verified).
from flask import Flask, request, send_from_directory, render_template_string, jsonify
import os
from datetime import datetime
import cv2
import torch
from ultralytics import YOLO
import base64
import numpy as np
import google.generativeai as genai
import json
import time
import requests
# Initialize Flask app
app = Flask(__name__)

# Uploaded frames are written here; the directory is created at import time.
UPLOAD_FOLDER = '/tmp/received_images'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# State shared between /upload (writer) and /latest (reader).
latest_filename = None
last_extraction_result = []

# Gemini API setup
# SECURITY: the API key used to be hard-coded on this line. Anything committed
# to source control must be treated as leaked and rotated; the key is now read
# from the GOOGLE_API_KEY environment variable instead.
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
gemma_model = genai.GenerativeModel(model_name="models/gemma-3-12b-it")

# Supabase credentials
# The project URL is not a secret and keeps its previous value as the default;
# the API key must be supplied through the SUPABASE_API_KEY environment variable.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "https://vynkcgoqjotnhtshbrdf.supabase.co")
SUPABASE_API_KEY = os.environ.get("SUPABASE_API_KEY", "")
def insert_to_supabase(values_dict, timeout=10):
    """Insert one row into the Supabase ``vitals`` table through its REST API.

    This is best-effort: the outcome is printed, nothing is returned, and
    transport or serialisation failures are reported rather than raised.

    Args:
        values_dict: JSON-serialisable mapping sent as the request body.
        timeout: Seconds to wait for Supabase before giving up. ``requests``
            applies no timeout by default, so without one a stalled
            connection would block the caller indefinitely.
    """
    url = f"{SUPABASE_URL}/rest/v1/vitals"
    headers = {
        "apikey": SUPABASE_API_KEY,
        "Authorization": f"Bearer {SUPABASE_API_KEY}",
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(url, headers=headers, json=values_dict, timeout=timeout)
    except (requests.RequestException, TypeError, ValueError) as e:
        # RequestException: network / timeout / HTTP-layer problems.
        # TypeError / ValueError: payload that cannot be encoded as JSON.
        print("❌ Error sending data to Supabase:", e)
        return

    # A successful insert is reported as 201 Created; anything else is a failure.
    if response.status_code == 201:
        print("✅ Successfully inserted to Supabase.")
    else:
        print("❌ Failed to insert to Supabase:", response.text)
def _parse_extracted_values(text):
    """Turn the model's raw reply into a single comma-separated string."""
    cleaned = text.strip()

    # Replies are sometimes wrapped in a Markdown fence (```json ... ```);
    # unwrap it so the JSON array underneath can be parsed.
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`").strip()
        if cleaned.lower().startswith("json"):
            cleaned = cleaned[4:].strip()

    values = None
    if cleaned.startswith("["):
        try:
            values = json.loads(cleaned)
        except json.JSONDecodeError:
            # Almost-JSON (e.g. a trailing comma): fall back to plain splitting.
            values = None
    if not isinstance(values, list):
        values = cleaned.strip("[]").split(",")

    # str() is required because a JSON array may hold numbers (e.g. [72]),
    # which have no .strip() method.
    return ", ".join(str(v).strip().strip('"').strip("'") for v in values)


def prompt_engineered_extraction(base64_image):
    """Ask the Gemma model to read the text shown in a JPEG image.

    Args:
        base64_image: Base64-encoded JPEG data, sent inline to the model.

    Returns:
        The extracted values joined with ", ", or the literal string
        "Error" if the request or the parsing fails for any reason.
    """
    try:
        response = gemma_model.generate_content([{
            "role": "user",
            "parts": [
                {"text": "Extract only the exact text from this image. Return the result as a plain JSON array like [\"value1\"]"},
                {"inline_data": {"mime_type": "image/jpeg", "data": base64_image}},
            ],
        }])
        return _parse_extracted_values(response.text)
    except Exception as e:
        # Boundary with an external service: callers rely on getting the
        # "Error" sentinel instead of an exception.
        print("⚠️ Error during text extraction:", e)
        return "Error"
def detect_and_extract_text(image_path, yolo_model_path):
    """Detect labelled regions in an image, read their text, and store vitals.

    The YOLO model at ``yolo_model_path`` is run on ``image_path``. Every
    detected box is cropped, JPEG-encoded and passed to
    ``prompt_engineered_extraction``. Readings for the classes
    "heart rate", "respiratory rate", "oxygen saturation" and
    "blood pressure" are collected and, if any were found, sent to
    ``insert_to_supabase`` together with the ``MRD`` request header and a
    UTC timestamp. Because it reads ``request.headers``, this function must
    be called while a Flask request is being handled.

    Args:
        image_path: Path of the image file to analyse.
        yolo_model_path: Path of the YOLO weights file.

    Returns:
        A list with one dict per processed detection, holding ``class_name``,
        ``extracted_text`` and ``bounding_box`` ([x1, y1, x2, y2]). The list
        is empty when the image cannot be read.
    """
    img = cv2.imread(image_path)
    if img is None:
        # Missing, empty or non-image file: nothing to detect.
        return []

    # NOTE(review): the weights are reloaded on every call. Caching the model
    # would be faster, but confirm it is safe to share across the server's
    # request threads before doing so.
    yolo = YOLO(yolo_model_path)
    results = yolo(image_path)

    # Detector class name -> column name for readings stored as integers.
    int_fields = {
        "heart rate": "heart_rate",
        "respiratory rate": "respiratory_rate",
        "oxygen saturation": "oxygen_saturation",
    }

    output = []
    vitals_dict = {}
    for r in results:
        for box, cls in zip(r.boxes.xyxy, r.boxes.cls):
            x1, y1, x2, y2 = map(int, box)
            class_name = yolo.names[int(cls)].lower()

            cropped_img = img[y1:y2, x1:x2]
            if cropped_img.size == 0:
                # Truncating the coordinates to int can collapse a tiny box;
                # an empty array cannot be encoded.
                print(f"⚠️ Skipping empty crop for {class_name}: {[x1, y1, x2, y2]}")
                continue

            # cv2.imread yields BGR and cv2.imencode expects BGR, so the crop
            # is encoded as-is. Converting to RGB first would swap the red
            # and blue channels in the JPEG sent to the model.
            encoded_ok, buffer = cv2.imencode('.jpg', cropped_img)
            if not encoded_ok:
                print(f"⚠️ Could not encode crop for {class_name}")
                continue
            image_data = base64.b64encode(buffer).decode('utf-8')
            extracted = prompt_engineered_extraction(image_data)

            if class_name in int_fields:
                try:
                    vitals_dict[int_fields[class_name]] = int(extracted)
                except ValueError:
                    # e.g. the "Error" sentinel or text such as "98%".
                    print(f"⚠️ Could not parse {class_name}: {extracted}")
            elif class_name == "blood pressure":
                # Only "systolic/diastolic"-shaped text is kept.
                vitals_dict["blood_pressure"] = extracted if "/" in extracted else None

            output.append({
                "class_name": class_name,
                "extracted_text": extracted,
                "bounding_box": [x1, y1, x2, y2],
            })

    if vitals_dict:
        vitals_dict["mrd_number"] = request.headers.get("MRD", "unknown")
        vitals_dict["timestamp"] = datetime.utcnow().isoformat()
        insert_to_supabase(vitals_dict)
    return output
@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded image, run detection on it, and publish the result.

    The raw request body is written to ``UPLOAD_FOLDER`` as a ``.jpeg`` file
    and passed to ``detect_and_extract_text`` with the ``best.pt`` weights.

    Returns:
        ``('OK', 200)`` once the image has been processed, or
        ``('Empty upload', 400)`` when the request carries no body.
    """
    global latest_filename, last_extraction_result

    img_data = request.data
    if not img_data:
        # Do not store a zero-byte file or advertise it as the latest image.
        return 'Empty upload', 400

    # Microseconds (%f) keep two uploads within the same second from
    # overwriting each other's file.
    filename = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + ".jpeg"
    filepath = os.path.join(UPLOAD_FOLDER, filename)
    with open(filepath, 'wb') as f:
        f.write(img_data)

    # Run detection before publishing, so /latest never pairs the new
    # filename with the previous image's results.
    extraction_result = detect_and_extract_text(filepath, "best.pt")
    latest_filename = filename
    last_extraction_result = extraction_result
    return 'OK', 200
@app.route('/images/<filename>')
def uploaded_file(filename):
    """Serve the file called ``filename`` out of ``UPLOAD_FOLDER``."""
    image_response = send_from_directory(UPLOAD_FOLDER, filename)
    return image_response
@app.route('/latest')
def latest():
    """Return the most recent upload's filename and extraction result as JSON."""
    payload = {
        "filename": latest_filename,
        "result": last_extraction_result,
    }
    return jsonify(payload)
@app.route('/')
def show_latest_image():
    """Serve the monitor page that shows the latest image and its readings.

    The page polls ``/latest`` every 3 seconds. When the reported filename
    changes, it swaps the displayed image and rebuilds the list of
    ``class_name: extracted_text`` entries.
    """
    # The list items are built with DOM nodes and textContent rather than
    # innerHTML: extracted_text is model output derived from an uploaded
    # image, so it must never be interpreted as markup.
    html = '''
<html><head><title>Monitor</title></head><body>
<h1>Latest Image</h1>
<img id="latest-img" style="max-width:600px;"><br>
<p id="filename">Waiting...</p><ul id="info"></ul>
<script>
let last = "";
setInterval(async () => {
  try {
    const res = await fetch("/latest");
    if (!res.ok) return;
    const data = await res.json();
    // Before the first upload the filename is null: keep the placeholder
    // instead of requesting "/images/null".
    if (!data.filename || data.filename === last) return;
    document.getElementById("latest-img").src = "/images/" + encodeURIComponent(data.filename);
    document.getElementById("filename").innerText = data.filename;
    const list = document.getElementById("info");
    list.textContent = "";
    for (const item of (data.result || [])) {
      const entry = document.createElement("li");
      const label = document.createElement("b");
      label.textContent = item.class_name;
      entry.appendChild(label);
      entry.appendChild(document.createTextNode(": " + item.extracted_text));
      list.appendChild(entry);
    }
    last = data.filename;
  } catch (err) {
    // A failed poll must not stop the following ones.
    console.error("Failed to refresh latest image:", err);
  }
}, 3000);
</script></body></html>
'''
    return render_template_string(html)
@app.route('/vitals_data')
def vitals_data():
    """Return rows of the Supabase ``vitals`` table as JSON.

    Query args:
        mrd: Optional. When given, only rows whose ``mrd_number`` equals it
            are requested; otherwise no filter is applied.

    Returns:
        The rows with status 200 on success, or ``{"error": ...}`` with
        status 500 when Supabase cannot be reached or answers with an error.
    """
    mrd = request.args.get("mrd")
    url = f"{SUPABASE_URL}/rest/v1/vitals"
    headers = {
        "apikey": SUPABASE_API_KEY,
        "Authorization": f"Bearer {SUPABASE_API_KEY}",
    }
    params = {"mrd_number": f"eq.{mrd}"} if mrd else {}

    try:
        # requests applies no timeout by default; bound the wait so a stalled
        # upstream cannot hang this request.
        response = requests.get(url, headers=headers, params=params, timeout=10)
    except requests.RequestException as e:
        return jsonify({"error": str(e)}), 500

    if response.status_code != 200:
        return jsonify({"error": response.text}), 500

    try:
        rows = response.json()
    except ValueError:
        return jsonify({"error": "Supabase returned a non-JSON response"}), 500

    # The previous one-liner `return A if ok else B, 500` parsed as
    # `((A if ok else B), 500)`, so even successful responses carried
    # status 500. Success now returns the default 200.
    return jsonify(rows)
@app.route('/dashboard')
def dashboard():
    """Serve the vitals dashboard page.

    The page asks for an MRD number, fetches ``/vitals_data?mrd=<value>`` and
    draws four Plotly charts (heart rate, respiratory rate, oxygen
    saturation, blood pressure). Once an MRD has been loaded, the charts are
    refreshed every 5 seconds.

    NOTE(review): the page loads ``plotly-latest.min.js``; confirm whether
    that CDN alias still tracks current releases and consider pinning an
    explicit version.
    """
    # Client-side fixes compared with the earlier page:
    #   * the MRD is URL-encoded before being placed in the query string;
    #   * a non-array payload (e.g. {"error": ...}) is ignored instead of
    #     being iterated, which used to throw;
    #   * network / JSON errors are caught so the refresh timer keeps running.
    # The HTTP status is deliberately not inspected; only the payload shape is.
    html = '''
<!DOCTYPE html>
<html>
<head>
  <title>Vitals Dashboard</title>
  <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
  <style>
    body {
      font-family: Arial, sans-serif;
      padding: 10px;
      background: #f4f6f9;
      margin: 0;
    }
    h2 {
      text-align: center;
      margin-bottom: 10px;
    }
    .input-container {
      text-align: center;
      margin-bottom: 15px;
    }
    input, button {
      padding: 8px;
      font-size: 14px;
      margin: 5px;
    }
    .grid-container {
      display: grid;
      grid-template-columns: 1fr 1fr;
      gap: 10px;
    }
    .chart-box {
      background: #fff;
      border-radius: 8px;
      padding: 5px;
      box-shadow: 0 1px 6px rgba(0,0,0,0.1);
      height: 300px;
    }
  </style>
</head>
<body>
  <h2>Vitals Dashboard (Filter by MRD)</h2>
  <div class="input-container">
    <input type="text" id="mrd" placeholder="Enter MRD number" />
    <button onclick="manualLoad()">Load</button>
  </div>
  <div class="grid-container">
    <div class="chart-box" id="hr_chart"></div>
    <div class="chart-box" id="rr_chart"></div>
    <div class="chart-box" id="spo2_chart"></div>
    <div class="chart-box" id="bp_chart"></div>
  </div>
  <script>
    let currentMRD = "";
    async function fetchAndPlot() {
      if (!currentMRD) return;
      let data;
      try {
        const res = await fetch(`/vitals_data?mrd=${encodeURIComponent(currentMRD)}`);
        data = await res.json();
      } catch (err) {
        console.error("Failed to load vitals:", err);
        return;
      }
      if (!Array.isArray(data)) {
        console.error("Unexpected vitals payload:", data);
        return;
      }
      const timestamps = [], hr = [], rr = [], spo2 = [], sys = [], dia = [];
      for (let v of data) {
        timestamps.push(v.timestamp);
        hr.push(v.heart_rate);
        rr.push(v.respiratory_rate);
        spo2.push(v.oxygen_saturation);
        if (v.blood_pressure && v.blood_pressure.includes("/")) {
          const [s, d] = v.blood_pressure.split("/");
          sys.push(parseInt(s));
          dia.push(parseInt(d));
        } else {
          sys.push(null);
          dia.push(null);
        }
      }
      const layoutHR = {
        title: "Heart Rate", height: 280,
        margin: { t: 30, l: 40, r: 20, b: 40 },
        xaxis: { title: "Time", tickangle: -45 },
        yaxis: { title: "bpm", range: [40, 130] }
      };
      const layoutRR = {
        title: "Respiratory Rate", height: 280,
        margin: { t: 30, l: 40, r: 20, b: 40 },
        xaxis: { title: "Time", tickangle: -45 },
        yaxis: { title: "breaths/min", range: [5, 30] }
      };
      const layoutSpO2 = {
        title: "Oxygen Saturation", height: 280,
        margin: { t: 30, l: 40, r: 20, b: 40 },
        xaxis: { title: "Time", tickangle: -45 },
        yaxis: { title: "%", range: [60, 100] }
      };
      const layoutBP = {
        title: "Blood Pressure", height: 280,
        margin: { t: 30, l: 40, r: 20, b: 40 },
        xaxis: { title: "Time", tickangle: -45 },
        yaxis: { title: "mmHg", range: [40, 200] }
      };
      Plotly.newPlot("hr_chart", [{
        x: timestamps, y: hr, name: "Heart Rate",
        type: "scatter", mode: "lines+markers", connectgaps: true
      }], layoutHR);
      Plotly.newPlot("rr_chart", [{
        x: timestamps, y: rr, name: "Respiratory Rate",
        type: "scatter", mode: "lines+markers", connectgaps: true
      }], layoutRR);
      Plotly.newPlot("spo2_chart", [{
        x: timestamps, y: spo2, name: "Oxygen Saturation",
        type: "scatter", mode: "lines+markers", connectgaps: true
      }], layoutSpO2);
      Plotly.newPlot("bp_chart", [
        { x: timestamps, y: sys, name: "Systolic", type: "scatter", mode: "lines+markers", connectgaps: true },
        { x: timestamps, y: dia, name: "Diastolic", type: "scatter", mode: "lines+markers", connectgaps: true }
      ], layoutBP);
    }
    function manualLoad() {
      const mrdInput = document.getElementById("mrd").value.trim();
      if (!mrdInput) return alert("Please enter an MRD number.");
      currentMRD = mrdInput;
      fetchAndPlot();
    }
    // Auto-refresh every 5 seconds
    setInterval(() => {
      if (currentMRD) fetchAndPlot();
    }, 5000);
  </script>
</body>
</html>
'''
    return render_template_string(html)
if __name__ == '__main__':
    # Script entry point: serve on every network interface, port 7860.
    app.run(port=7860, host='0.0.0.0')