# Author: Hussein El-Hadidy
# fixes
# commit 08ff78e
import os
import pickle
import shutil
import uuid
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import cloudinary
import cloudinary.uploader
from cloudinary.utils import cloudinary_url
from SkinBurns.SkinBurns_Classification import FullFeautures
from SkinBurns.SkinBurns_Segmentation import segment_burn
import requests
import joblib
import numpy as np
from ECG.ECG_Classify import classify_ecg
from ECG.ECG_MultiClass import analyze_ecg_pdf
from ultralytics import YOLO
import tensorflow as tf
from fastapi import HTTPException
from fastapi import WebSocket, WebSocketDisconnect
import base64
import cv2
import time
import tempfile
import matplotlib.pyplot as plt
import json
import asyncio
import concurrent.futures
from threading import Thread
from starlette.responses import StreamingResponse
import threading
import queue
import logging
import sys
import re
import signal
from CPR_Module.Educational_Mode.CPRAnalyzer import CPRAnalyzer as OfflineAnalyzer
from CPR_Module.Emergency_Mode.main import CPRAnalyzer as RealtimeAnalyzer
from CPR_Module.Common.analysis_socket_server import AnalysisSocketServer
from CPR_Module.Common.logging_config import cpr_logger
app = FastAPI()

# Working directories used by the CPR video-analysis endpoints.
SCREENSHOTS_DIR = "screenshots"
OUTPUT_DIR = "Output"
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Cloudinary config.
# SECURITY: credentials were hard-coded in source; read them from the
# environment, falling back to the previous values so existing deployments
# keep working. TODO: rotate these keys and drop the fallbacks.
cloudinary.config(
    cloud_name=os.environ.get("CLOUDINARY_CLOUD_NAME", "darumyfpl"),
    api_key=os.environ.get("CLOUDINARY_API_KEY", "493972437417214"),
    api_secret=os.environ.get("CLOUDINARY_API_SECRET", "jjOScVGochJYA7IxDam7L4HU2Ig"),
    secure=True,
)
# Basic Hello route
@app.get("/")
def greet_json():
    """Health-check endpoint returning a static greeting payload."""
    return {"Hello": "World!"}
@app.post("/predict_burn")
async def predict_burn(file: UploadFile = File(...)):
    """Classify a skin-burn image with the pre-trained SVM model.

    Saves the upload to a temp file, extracts features, and returns the
    predicted burn class ("Zero Class" / "First Class" / "Second Class").
    Responds 400 if feature extraction fails, 500 on unexpected errors.
    """
    # Bug fix: unique temp name so concurrent requests can't clobber each other.
    temp_file_path = f"temp_{uuid.uuid4()}_{file.filename}"
    try:
        with open(temp_file_path, "wb") as temp_file:
            temp_file.write(await file.read())
        loaded_svm = joblib.load('svm_model.pkl')
        print("SVM model loaded successfully")
        features = FullFeautures(temp_file_path)
        if features is None:
            return JSONResponse(content={"error": "Failed to extract features from the image."}, status_code=400)
        prediction = loaded_svm.predict(features.reshape(1, -1))
        # Map the numeric class to a human-readable label.
        # (Removed dead code: a "Burn"/"No Burn" label was assigned and then
        # immediately overwritten by this if/elif chain.)
        if prediction[0] == 1:
            prediction_label = "First Class"
        elif prediction[0] == 2:
            prediction_label = "Second Class"
        else:
            prediction_label = "Zero Class"
        return {
            "prediction": prediction_label
        }
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # Bug fix: the temp file used to leak whenever an exception occurred
        # before the explicit os.remove call.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
@app.post("/segment_burn")
async def segment_burn_endpoint(reference: UploadFile = File(...), patient: UploadFile = File(...)):
    """Segment the burn region of a patient image against a reference image.

    Writes both uploads to temp files, runs segmentation, uploads the clean
    and debug crops to Cloudinary, and returns their secure URLs. Responds
    500 with the error message on any failure.
    """
    reference_path = f"temp_ref_{reference.filename}"
    patient_path = f"temp_patient_{patient.filename}"
    burn_crop_clean_path = f"temp_burn_crop_clean_{uuid.uuid4()}.png"
    burn_crop_debug_path = f"temp_burn_crop_debug_{uuid.uuid4()}.png"
    try:
        with open(reference_path, "wb") as ref_file:
            ref_file.write(await reference.read())
        with open(patient_path, "wb") as pat_file:
            pat_file.write(await patient.read())
        burn_crop_clean, burn_crop_debug = segment_burn(patient_path, reference_path)
        plt.imsave(burn_crop_clean_path, burn_crop_clean)
        plt.imsave(burn_crop_debug_path, burn_crop_debug)
        crop_clean_upload = cloudinary.uploader.upload(burn_crop_clean_path, public_id=f"ref_{reference.filename}")
        crop_debug_upload = cloudinary.uploader.upload(burn_crop_debug_path, public_id=f"pat_{patient.filename}")
        return {
            "crop_clean_url": crop_clean_upload["secure_url"],
            "crop_debug_url": crop_debug_upload["secure_url"]
        }
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # Bug fix: the reference/patient temp files were never deleted, and
        # the crop files leaked on exceptions; clean up all four temporaries.
        for path in (reference_path, patient_path, burn_crop_clean_path, burn_crop_debug_path):
            if os.path.exists(path):
                os.remove(path)
@app.post("/classify-ecg")
async def classify_ecg_endpoint(file: UploadFile = File(...)):
    """Binary arrhythmia classification of an uploaded ECG PDF.

    Responds 500 with the error message on any failure.
    """
    temp_file_path = f"temp_{file.filename}"
    try:
        # Bug fix: the model used to be loaded outside the try block, so a
        # missing/corrupt model file raised an unhandled error instead of
        # returning the JSON error response.
        model = joblib.load('voting_classifier_arrhythmia.pkl')
        with open(temp_file_path, "wb") as temp_file:
            temp_file.write(await file.read())
        result = classify_ecg(temp_file_path, model, is_pdf=True)
        return {"result": result}
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # Bug fix: remove the temp file even when classification raised.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
@app.post("/diagnose-ecg")
async def diagnose_ecg(file: UploadFile = File(...)):
    """Multi-class arrhythmia diagnosis of an uploaded ECG PDF.

    Returns {"result": <class name>} when the model produces a diagnosis,
    {"result": "No diagnosis"} otherwise; 500 with the error on failure.
    """
    temp_file_path = f"temp_{file.filename}"
    try:
        with open(temp_file_path, "wb") as temp_file:
            temp_file.write(await file.read())
        model_path = 'Arrhythmia_Model_with_SMOTE.h5'
        result = analyze_ecg_pdf(
            temp_file_path,
            model_path,
            cleanup=False
        )
        # Bug fix: use .get() so a result dict without "arrhythmia_class"
        # yields "No diagnosis" instead of an unhandled KeyError.
        if result and result.get("arrhythmia_class"):
            return {"result": result["arrhythmia_class"]}
        else:
            return {"result": "No diagnosis"}
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # Bug fix: remove the temp file even when analysis raised.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
def clean_warning_name(filename: str) -> tuple[str, str]:
    """Derive a Cloudinary public id and a human description from a screenshot name.

    Strips the file extension and any trailing "_<digits>" frame suffix, then
    returns ``(cleaned_id, description)`` where the description has
    underscores replaced by spaces.

    Bug fix: the return annotation previously claimed ``str`` although the
    function has always returned a 2-tuple.
    """
    name, _ = os.path.splitext(filename)
    cleaned = re.sub(r'_\d+$', '', name)
    cleaned_desc = cleaned.replace('_', ' ')
    return cleaned, cleaned_desc
@app.post("/process_video")
async def process_video(file: UploadFile = File(...)):
    """Run offline (educational-mode) CPR analysis on an uploaded video.

    Saves the upload, clears stale artifacts, runs the analyzer, uploads the
    annotated video, pace graph, and posture-warning screenshots to
    Cloudinary, and returns their URLs plus per-chunk analysis data.

    Raises:
        HTTPException 400 if the upload is not a video,
        HTTPException 500 if no final output video was produced.
    """
    if not file.content_type.startswith("video/"):
        raise HTTPException(status_code=400, detail="File must be a video.")
    print("File content type:", file.content_type)
    print("File filename:", file.filename)
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    os.makedirs(SCREENSHOTS_DIR, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    # Consistency fix: clear the same directories the module constants name,
    # instead of re-hardcoding the folder strings here.
    for folder in (SCREENSHOTS_DIR, UPLOAD_DIR, OUTPUT_DIR):
        if os.path.exists(folder):
            for filename in os.listdir(folder):
                file_path = os.path.join(folder, filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
    video_path = os.path.join(UPLOAD_DIR, file.filename)
    with open(video_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    print(f"\n[API] CPR Analysis Started on {video_path}")
    video_output_path = os.path.join(OUTPUT_DIR, "Myoutput.mp4")
    plot_output_path = os.path.join(OUTPUT_DIR, "Myoutput.png")
    start_time = time.time()
    analyzer = OfflineAnalyzer(video_path, video_output_path, plot_output_path, requested_fps=30)
    chunks = analyzer.run_analysis_video()
    # Upload every posture-warning screenshot the analyzer produced.
    warnings = []
    if os.path.exists(SCREENSHOTS_DIR):
        for filename in os.listdir(SCREENSHOTS_DIR):
            if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
                local_path = os.path.join(SCREENSHOTS_DIR, filename)
                cleaned_name, description = clean_warning_name(filename)
                upload_result = cloudinary.uploader.upload(
                    local_path,
                    folder="posture_warnings",
                    public_id=cleaned_name,
                    overwrite=True
                )
                warnings.append({
                    "image_url": upload_result['secure_url'],
                    "description": description
                })
    # Consistency fix: build the final-video path from OUTPUT_DIR rather than
    # hardcoding "Output/..." with a literal separator. Also use a distinct
    # variable so the uploaded source video path is no longer shadowed.
    final_video_path = os.path.join(OUTPUT_DIR, "Myoutput_final.mp4")
    if os.path.isfile(final_video_path):
        upload_result = cloudinary.uploader.upload_large(
            final_video_path,
            resource_type="video",
            folder="output_videos",
            public_id="Myoutput_final",
            overwrite=True
        )
        wholevideoURL = upload_result['secure_url']
    else:
        wholevideoURL = None
    graphURL = None
    if os.path.isfile(plot_output_path):
        upload_graph_result = cloudinary.uploader.upload(
            plot_output_path,
            folder="output_graphs",
            public_id=os.path.splitext(os.path.basename(plot_output_path))[0],
            overwrite=True
        )
        graphURL = upload_graph_result['secure_url']
    print(f"[API] CPR Analysis Completed on {final_video_path}")
    analysis_time = time.time() - start_time
    print(f"[TIMING] Analysis time: {analysis_time:.2f}s")
    if wholevideoURL is None:
        raise HTTPException(status_code=500, detail="No chunk data was generated from the video.")
    return JSONResponse(content={
        "videoURL": wholevideoURL,
        "graphURL": graphURL,
        "warnings": warnings,
        "chunks": chunks,
    })
# --- Shared state for the real-time CPR analysis WebSocket endpoint ---
logger = logging.getLogger("cpr_logger")
# WebSocket connections currently subscribed to analysis warnings.
clients = set()
# Background thread running run_cpr_analysis (None until first client starts it).
analyzer_thread = None
# True once the analysis thread has been launched; reset when the last client leaves.
analysis_started = False
# Guards analyzer_thread / analysis_started against concurrent client handshakes.
analyzer_lock = threading.Lock()
# Socket server owned by the running analyzer; None while no analysis is active.
socket_server: AnalysisSocketServer | None = None
async def forward_results_from_queue(websocket: WebSocket, warning_queue):
    """Relay warning payloads from the analyzer's queue to one client.

    Blocks on the queue in a worker thread so the event loop stays free,
    then sends each payload to the WebSocket as JSON. Runs until the task
    is cancelled; any other failure is logged and ends the loop.
    """
    try:
        while True:
            payload = await asyncio.to_thread(warning_queue.get)
            await websocket.send_text(json.dumps(payload))
    except asyncio.CancelledError:
        logger.info("[WebSocket] Forwarding task cancelled")
    except Exception as e:
        logger.error(f"[WebSocket] Error forwarding data: {e}")
def run_cpr_analysis(source, requested_fps, output_path):
    """Run the real-time (emergency-mode) CPR analyzer; meant for a worker thread.

    Publishes the analyzer's socket server via the module-level
    ``socket_server`` global so the WebSocket handler can forward warnings.

    Args:
        source: video source / stream URL handed to the analyzer.
        requested_fps: target analysis frame rate; falls back to 30 when falsy.
        output_path: unused — retained so existing callers' argument lists
            still match; outputs always go under CPRRealTime/outputs.
    """
    global socket_server
    logger.info("[MAIN] CPR Analysis Started")
    # Bug fix: the parameter used to be unconditionally overwritten with 30;
    # honor the caller's value and only fall back when none was given.
    requested_fps = requested_fps or 30
    input_video = source
    # Portability fix: the previous literal r"CPRRealTime\outputs" only made
    # sense on Windows — on POSIX it created one directory whose name
    # contained a backslash.
    output_dir = os.path.join("CPRRealTime", "outputs")
    os.makedirs(output_dir, exist_ok=True)
    video_output_path = os.path.join(output_dir, "output.mp4")
    plot_output_path = os.path.join(output_dir, "output.png")
    logger.info(f"[CONFIG] Input video: {input_video}")
    logger.info(f"[CONFIG] Video output: {video_output_path}")
    logger.info(f"[CONFIG] Plot output: {plot_output_path}")
    initialization_start_time = time.time()
    analyzer = RealtimeAnalyzer(input_video, video_output_path, plot_output_path, requested_fps)
    # Expose the analyzer's socket server to the WebSocket handler.
    socket_server = analyzer.socket_server
    analyzer.plot_output_path = plot_output_path
    elapsed_time = time.time() - initialization_start_time
    logger.info(f"[TIMING] Initialization time: {elapsed_time:.2f}s")
    try:
        analyzer.run_analysis()
    finally:
        # Always release the socket server, even if the analysis crashed.
        if analyzer.socket_server:
            analyzer.socket_server.stop_server()
        logger.info("[MAIN] Analyzer stopped")
@app.websocket("/ws/real")
async def websocket_analysis(websocket: WebSocket):
    """WebSocket endpoint for real-time CPR analysis.

    Protocol: the client first sends the stream URL as text; the server then
    starts the analyzer thread (once, shared globally across clients) and
    forwards warning batches from the analyzer's queue back to this client
    until it disconnects. When the last client leaves, the analyzer's socket
    server is stopped and the shared state is reset.
    """
    global analyzer_thread, analysis_started, socket_server
    await websocket.accept()
    clients.add(websocket)
    logger.info("[WebSocket] Flutter connected")
    try:
        # First message from the client is the video-stream URL.
        source = await websocket.receive_text()
        logger.info(f"[WebSocket] Received stream URL: {source}")
        with analyzer_lock:
            # Only one analysis thread runs at a time; later clients reuse it.
            if not analysis_started:
                requested_fps = 30
                output_path = r"Output"
                analyzer_thread = threading.Thread(
                    target=run_cpr_analysis,
                    args=(source, requested_fps, output_path),
                    daemon=True
                )
                analyzer_thread.start()
                analysis_started = True
                logger.info("[WebSocket] Analysis thread started")
        # Poll until the analyzer thread has published its socket server and
        # warning queue (set inside run_cpr_analysis).
        while socket_server is None or socket_server.warning_queue is None:
            await asyncio.sleep(0.1)
        forward_task = asyncio.create_task(
            forward_results_from_queue(websocket, socket_server.warning_queue)
        )
        # Keep the handler alive; forwarding happens in the background task.
        while True:
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        logger.warning("[WebSocket] Client disconnected")
        # forward_task exists only if the wait loop above completed.
        if 'forward_task' in locals():
            forward_task.cancel()
    except Exception as e:
        logger.error(f"[WebSocket] Error receiving stream URL: {str(e)}")
        await websocket.close(code=1011)
    finally:
        clients.discard(websocket)
        logger.info(f"[WebSocket] Active clients: {len(clients)}")
        # Last client gone: tear down the analyzer so a new session can start.
        if not clients and socket_server:
            logger.info("[WebSocket] No clients left. Stopping analyzer.")
            socket_server.stop_server()
            analysis_started = False
            socket_server = None
def shutdown_handler(signum, frame):
    """Signal handler: stop the analyzer's socket server, then exit immediately."""
    logger.info("Received shutdown signal")
    server = socket_server
    if server:
        try:
            server.stop_server()
        except Exception as e:
            logger.warning(f"Error during socket server shutdown: {e}")
    # Hard exit: skip atexit hooks so a hung analyzer thread can't block shutdown.
    os._exit(0)
# Install the handler for Ctrl+C and termination requests so the analyzer's
# socket server is shut down before the process exits.
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)