|
|
import os |
|
|
import pickle |
|
|
import shutil |
|
|
import uuid |
|
|
from fastapi import FastAPI, File, UploadFile |
|
|
from fastapi.responses import JSONResponse |
|
|
from pymongo.mongo_client import MongoClient |
|
|
from pymongo.server_api import ServerApi |
|
|
import cloudinary |
|
|
import cloudinary.uploader |
|
|
from cloudinary.utils import cloudinary_url |
|
|
from SkinBurns.SkinBurns_Classification import FullFeautures |
|
|
from SkinBurns.SkinBurns_Segmentation import segment_burn |
|
|
import requests |
|
|
import joblib |
|
|
import numpy as np |
|
|
from ECG.ECG_Classify import classify_ecg |
|
|
from ECG.ECG_MultiClass import analyze_ecg_pdf |
|
|
from ultralytics import YOLO |
|
|
import tensorflow as tf |
|
|
from fastapi import HTTPException |
|
|
from fastapi import WebSocket, WebSocketDisconnect |
|
|
import base64 |
|
|
import cv2 |
|
|
import time |
|
|
import tempfile |
|
|
import matplotlib.pyplot as plt |
|
|
import json |
|
|
import asyncio |
|
|
import concurrent.futures |
|
|
from threading import Thread |
|
|
from starlette.responses import StreamingResponse |
|
|
import threading |
|
|
import queue |
|
|
import logging |
|
|
import sys |
|
|
import re |
|
|
import signal |
|
|
|
|
|
from CPR_Module.Educational_Mode.CPRAnalyzer import CPRAnalyzer as OfflineAnalyzer |
|
|
from CPR_Module.Emergency_Mode.main import CPRAnalyzer as RealtimeAnalyzer |
|
|
from CPR_Module.Common.analysis_socket_server import AnalysisSocketServer |
|
|
from CPR_Module.Common.logging_config import cpr_logger |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
SCREENSHOTS_DIR = "screenshots" |
|
|
OUTPUT_DIR = "Output" |
|
|
UPLOAD_DIR = "uploads" |
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cloudinary.config( |
|
|
cloud_name = "darumyfpl", |
|
|
api_key = "493972437417214", |
|
|
api_secret = "jjOScVGochJYA7IxDam7L4HU2Ig", |
|
|
secure=True |
|
|
) |
|
|
|
|
|
|
|
|
@app.get("/") |
|
|
def greet_json(): |
|
|
return {"Hello": "World!"} |
|
|
|
|
|
@app.post("/predict_burn") |
|
|
async def predict_burn(file: UploadFile = File(...)): |
|
|
try: |
|
|
temp_file_path = f"temp_{file.filename}" |
|
|
with open(temp_file_path, "wb") as temp_file: |
|
|
temp_file.write(await file.read()) |
|
|
|
|
|
loaded_svm = joblib.load('svm_model.pkl') |
|
|
|
|
|
print("SVM model loaded successfully") |
|
|
features = FullFeautures(temp_file_path) |
|
|
|
|
|
os.remove(temp_file_path) |
|
|
|
|
|
if features is None: |
|
|
return JSONResponse(content={"error": "Failed to extract features from the image."}, status_code=400) |
|
|
|
|
|
prediction = loaded_svm.predict(features.reshape(1, -1)) |
|
|
prediction_label = "Burn" if prediction[0] == 1 else "No Burn" |
|
|
|
|
|
if prediction[0] == 1: |
|
|
prediction_label = "First Class" |
|
|
elif prediction[0] == 2: |
|
|
prediction_label = "Second Class" |
|
|
else: |
|
|
prediction_label = "Zero Class" |
|
|
|
|
|
return { |
|
|
"prediction": prediction_label |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return JSONResponse(content={"error": str(e)}, status_code=500) |
|
|
|
|
|
@app.post("/segment_burn") |
|
|
async def segment_burn_endpoint(reference: UploadFile = File(...), patient: UploadFile = File(...)): |
|
|
try: |
|
|
reference_path = f"temp_ref_{reference.filename}" |
|
|
reference_bytes = await reference.read() |
|
|
with open(reference_path, "wb") as ref_file: |
|
|
ref_file.write(reference_bytes) |
|
|
|
|
|
patient_path = f"temp_patient_{patient.filename}" |
|
|
patient_bytes = await patient.read() |
|
|
with open(patient_path, "wb") as pat_file: |
|
|
pat_file.write(patient_bytes) |
|
|
|
|
|
burn_crop_clean, burn_crop_debug = segment_burn(patient_path, reference_path) |
|
|
|
|
|
burn_crop_clean_path = f"temp_burn_crop_clean_{uuid.uuid4()}.png" |
|
|
burn_crop_debug_path = f"temp_burn_crop_debug_{uuid.uuid4()}.png" |
|
|
|
|
|
|
|
|
plt.imsave(burn_crop_clean_path, burn_crop_clean) |
|
|
plt.imsave(burn_crop_debug_path, burn_crop_debug) |
|
|
|
|
|
crop_clean_upload = cloudinary.uploader.upload(burn_crop_clean_path, public_id=f"ref_{reference.filename}") |
|
|
crop_debug_upload = cloudinary.uploader.upload(burn_crop_debug_path, public_id=f"pat_{patient.filename}") |
|
|
crop_clean_url = crop_clean_upload["secure_url"] |
|
|
crop_debug_url = crop_debug_upload["secure_url"] |
|
|
|
|
|
os.remove(burn_crop_clean_path) |
|
|
os.remove(burn_crop_debug_path) |
|
|
|
|
|
|
|
|
return { |
|
|
"crop_clean_url": crop_clean_url, |
|
|
"crop_debug_url": crop_debug_url |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return JSONResponse(content={"error": str(e)}, status_code=500) |
|
|
|
|
|
|
|
|
@app.post("/classify-ecg") |
|
|
async def classify_ecg_endpoint(file: UploadFile = File(...)): |
|
|
model = joblib.load('voting_classifier_arrhythmia.pkl') |
|
|
|
|
|
try: |
|
|
temp_file_path = f"temp_{file.filename}" |
|
|
with open(temp_file_path, "wb") as temp_file: |
|
|
temp_file.write(await file.read()) |
|
|
|
|
|
result = classify_ecg(temp_file_path, model, is_pdf=True) |
|
|
|
|
|
os.remove(temp_file_path) |
|
|
|
|
|
return {"result": result} |
|
|
|
|
|
except Exception as e: |
|
|
return JSONResponse(content={"error": str(e)}, status_code=500) |
|
|
|
|
|
@app.post("/diagnose-ecg") |
|
|
async def diagnose_ecg(file: UploadFile = File(...)): |
|
|
try: |
|
|
temp_file_path = f"temp_{file.filename}" |
|
|
with open(temp_file_path, "wb") as temp_file: |
|
|
temp_file.write(await file.read()) |
|
|
|
|
|
model_path = 'Arrhythmia_Model_with_SMOTE.h5' |
|
|
|
|
|
result = analyze_ecg_pdf( |
|
|
temp_file_path, |
|
|
model_path, |
|
|
cleanup=False |
|
|
) |
|
|
|
|
|
os.remove(temp_file_path) |
|
|
|
|
|
if result and result["arrhythmia_class"]: |
|
|
return {"result": result["arrhythmia_class"]} |
|
|
else: |
|
|
return {"result": "No diagnosis"} |
|
|
|
|
|
except Exception as e: |
|
|
return JSONResponse(content={"error": str(e)}, status_code=500) |
|
|
|
|
|
|
|
|
def clean_warning_name(filename: str) -> str: |
|
|
|
|
|
name, _ = os.path.splitext(filename) |
|
|
|
|
|
cleaned = re.sub(r'_\d+$', '', name) |
|
|
|
|
|
cleaned_desc = cleaned.replace('_', ' ') |
|
|
return cleaned, cleaned_desc |
|
|
|
|
|
@app.post("/process_video") |
|
|
async def process_video(file: UploadFile = File(...)): |
|
|
if not file.content_type.startswith("video/"): |
|
|
raise HTTPException(status_code=400, detail="File must be a video.") |
|
|
|
|
|
print("File content type:", file.content_type) |
|
|
print("File filename:", file.filename) |
|
|
|
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True) |
|
|
os.makedirs(SCREENSHOTS_DIR, exist_ok=True) |
|
|
os.makedirs(OUTPUT_DIR, exist_ok=True) |
|
|
|
|
|
folders = ["screenshots", "uploads", "Output"] |
|
|
|
|
|
for folder in folders: |
|
|
if os.path.exists(folder): |
|
|
for filename in os.listdir(folder): |
|
|
file_path = os.path.join(folder, filename) |
|
|
if os.path.isfile(file_path): |
|
|
os.remove(file_path) |
|
|
|
|
|
video_path = os.path.join(UPLOAD_DIR, file.filename) |
|
|
with open(video_path, "wb") as buffer: |
|
|
shutil.copyfileobj(file.file, buffer) |
|
|
|
|
|
print(f"\n[API] CPR Analysis Started on {video_path}") |
|
|
|
|
|
video_output_path = os.path.join(OUTPUT_DIR, "Myoutput.mp4") |
|
|
plot_output_path = os.path.join(OUTPUT_DIR, "Myoutput.png") |
|
|
|
|
|
start_time = time.time() |
|
|
analyzer = OfflineAnalyzer(video_path, video_output_path, plot_output_path, requested_fps=30) |
|
|
|
|
|
chunks = analyzer.run_analysis_video() |
|
|
|
|
|
warnings = [] |
|
|
|
|
|
if os.path.exists(SCREENSHOTS_DIR): |
|
|
for filename in os.listdir(SCREENSHOTS_DIR): |
|
|
if filename.lower().endswith(('.png', '.jpg', '.jpeg')): |
|
|
local_path = os.path.join(SCREENSHOTS_DIR, filename) |
|
|
cleaned_name, description = clean_warning_name(filename) |
|
|
|
|
|
upload_result = cloudinary.uploader.upload( |
|
|
local_path, |
|
|
folder="posture_warnings", |
|
|
public_id=cleaned_name, |
|
|
overwrite=True |
|
|
) |
|
|
|
|
|
warnings.append({ |
|
|
"image_url": upload_result['secure_url'], |
|
|
"description": description |
|
|
}) |
|
|
|
|
|
video_path = "Output/Myoutput_final.mp4" |
|
|
|
|
|
if os.path.isfile(video_path): |
|
|
upload_result = cloudinary.uploader.upload_large( |
|
|
video_path, |
|
|
resource_type="video", |
|
|
folder="output_videos", |
|
|
public_id="Myoutput_final", |
|
|
overwrite=True |
|
|
) |
|
|
wholevideoURL = upload_result['secure_url'] |
|
|
else: |
|
|
wholevideoURL = None |
|
|
|
|
|
graphURL = None |
|
|
if os.path.isfile(plot_output_path): |
|
|
upload_graph_result = cloudinary.uploader.upload( |
|
|
plot_output_path, |
|
|
folder="output_graphs", |
|
|
public_id=os.path.splitext(os.path.basename(plot_output_path))[0], |
|
|
overwrite=True |
|
|
) |
|
|
graphURL = upload_graph_result['secure_url'] |
|
|
|
|
|
print(f"[API] CPR Analysis Completed on {video_path}") |
|
|
analysis_time = time.time() - start_time |
|
|
print(f"[TIMING] Analysis time: {analysis_time:.2f}s") |
|
|
|
|
|
if wholevideoURL is None: |
|
|
raise HTTPException(status_code=500, detail="No chunk data was generated from the video.") |
|
|
|
|
|
return JSONResponse(content={ |
|
|
"videoURL": wholevideoURL, |
|
|
"graphURL": graphURL, |
|
|
"warnings": warnings, |
|
|
"chunks": chunks, |
|
|
}) |
|
|
|
|
|
|
|
|
logger = logging.getLogger("cpr_logger") |
|
|
clients = set() |
|
|
analyzer_thread = None |
|
|
analysis_started = False |
|
|
analyzer_lock = threading.Lock() |
|
|
socket_server: AnalysisSocketServer = None |
|
|
|
|
|
|
|
|
async def forward_results_from_queue(websocket: WebSocket, warning_queue): |
|
|
try: |
|
|
while True: |
|
|
warnings = await asyncio.to_thread(warning_queue.get) |
|
|
serialized = json.dumps(warnings) |
|
|
await websocket.send_text(serialized) |
|
|
except asyncio.CancelledError: |
|
|
logger.info("[WebSocket] Forwarding task cancelled") |
|
|
except Exception as e: |
|
|
logger.error(f"[WebSocket] Error forwarding data: {e}") |
|
|
|
|
|
|
|
|
def run_cpr_analysis(source, requested_fps, output_path): |
|
|
global socket_server |
|
|
logger.info(f"[MAIN] CPR Analysis Started") |
|
|
|
|
|
requested_fps = 30 |
|
|
input_video = source |
|
|
|
|
|
output_dir = r"CPRRealTime\outputs" |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
video_output_path = os.path.join(output_dir, "output.mp4") |
|
|
plot_output_path = os.path.join(output_dir, "output.png") |
|
|
|
|
|
logger.info(f"[CONFIG] Input video: {input_video}") |
|
|
logger.info(f"[CONFIG] Video output: {video_output_path}") |
|
|
logger.info(f"[CONFIG] Plot output: {plot_output_path}") |
|
|
|
|
|
initialization_start_time = time.time() |
|
|
analyzer = RealtimeAnalyzer(input_video, video_output_path, plot_output_path, requested_fps) |
|
|
socket_server = analyzer.socket_server |
|
|
analyzer.plot_output_path = plot_output_path |
|
|
|
|
|
elapsed_time = time.time() - initialization_start_time |
|
|
logger.info(f"[TIMING] Initialization time: {elapsed_time:.2f}s") |
|
|
|
|
|
try: |
|
|
analyzer.run_analysis() |
|
|
finally: |
|
|
if analyzer.socket_server: |
|
|
analyzer.socket_server.stop_server() |
|
|
logger.info("[MAIN] Analyzer stopped") |
|
|
|
|
|
|
|
|
@app.websocket("/ws/real") |
|
|
async def websocket_analysis(websocket: WebSocket): |
|
|
global analyzer_thread, analysis_started, socket_server |
|
|
|
|
|
await websocket.accept() |
|
|
clients.add(websocket) |
|
|
logger.info("[WebSocket] Flutter connected") |
|
|
|
|
|
try: |
|
|
source = await websocket.receive_text() |
|
|
logger.info(f"[WebSocket] Received stream URL: {source}") |
|
|
|
|
|
with analyzer_lock: |
|
|
if not analysis_started: |
|
|
requested_fps = 30 |
|
|
output_path = r"Output" |
|
|
|
|
|
analyzer_thread = threading.Thread( |
|
|
target=run_cpr_analysis, |
|
|
args=(source, requested_fps, output_path), |
|
|
daemon=True |
|
|
) |
|
|
analyzer_thread.start() |
|
|
analysis_started = True |
|
|
logger.info("[WebSocket] Analysis thread started") |
|
|
|
|
|
while socket_server is None or socket_server.warning_queue is None: |
|
|
await asyncio.sleep(0.1) |
|
|
|
|
|
forward_task = asyncio.create_task( |
|
|
forward_results_from_queue(websocket, socket_server.warning_queue) |
|
|
) |
|
|
|
|
|
while True: |
|
|
await asyncio.sleep(1) |
|
|
|
|
|
except WebSocketDisconnect: |
|
|
logger.warning("[WebSocket] Client disconnected") |
|
|
if 'forward_task' in locals(): |
|
|
forward_task.cancel() |
|
|
except Exception as e: |
|
|
logger.error(f"[WebSocket] Error receiving stream URL: {str(e)}") |
|
|
await websocket.close(code=1011) |
|
|
finally: |
|
|
clients.discard(websocket) |
|
|
logger.info(f"[WebSocket] Active clients: {len(clients)}") |
|
|
|
|
|
if not clients and socket_server: |
|
|
logger.info("[WebSocket] No clients left. Stopping analyzer.") |
|
|
socket_server.stop_server() |
|
|
analysis_started = False |
|
|
socket_server = None |
|
|
|
|
|
|
|
|
def shutdown_handler(signum, frame): |
|
|
logger.info("Received shutdown signal") |
|
|
if socket_server: |
|
|
try: |
|
|
socket_server.stop_server() |
|
|
except Exception as e: |
|
|
logger.warning(f"Error during socket server shutdown: {e}") |
|
|
os._exit(0) |
|
|
|
|
|
signal.signal(signal.SIGINT, shutdown_handler) |
|
|
signal.signal(signal.SIGTERM, shutdown_handler) |
|
|
|