import os import pickle import shutil import uuid from fastapi import FastAPI, File, UploadFile from fastapi.responses import JSONResponse from pymongo.mongo_client import MongoClient from pymongo.server_api import ServerApi import cloudinary import cloudinary.uploader from cloudinary.utils import cloudinary_url from SkinBurns.SkinBurns_Classification import FullFeautures from SkinBurns.SkinBurns_Segmentation import segment_burn import requests import joblib import numpy as np from ECG.ECG_Classify import classify_ecg from ECG.ECG_MultiClass import analyze_ecg_pdf from ultralytics import YOLO import tensorflow as tf from fastapi import HTTPException from fastapi import WebSocket, WebSocketDisconnect import base64 import cv2 import time import tempfile import matplotlib.pyplot as plt import json import asyncio import concurrent.futures from threading import Thread from starlette.responses import StreamingResponse import threading import queue import logging import sys import re import signal from CPR_Module.Educational_Mode.CPRAnalyzer import CPRAnalyzer as OfflineAnalyzer from CPR_Module.Emergency_Mode.main import CPRAnalyzer as RealtimeAnalyzer from CPR_Module.Common.analysis_socket_server import AnalysisSocketServer from CPR_Module.Common.logging_config import cpr_logger app = FastAPI() SCREENSHOTS_DIR = "screenshots" OUTPUT_DIR = "Output" UPLOAD_DIR = "uploads" os.makedirs(UPLOAD_DIR, exist_ok=True) # Cloudinary config cloudinary.config( cloud_name = "darumyfpl", api_key = "493972437417214", api_secret = "jjOScVGochJYA7IxDam7L4HU2Ig", secure=True ) # Basic Hello route @app.get("/") def greet_json(): return {"Hello": "World!"} @app.post("/predict_burn") async def predict_burn(file: UploadFile = File(...)): try: temp_file_path = f"temp_{file.filename}" with open(temp_file_path, "wb") as temp_file: temp_file.write(await file.read()) loaded_svm = joblib.load('svm_model.pkl') print("SVM model loaded successfully") features = FullFeautures(temp_file_path) os.remove(temp_file_path) if features is None: return JSONResponse(content={"error": "Failed to extract features from the image."}, status_code=400) prediction = loaded_svm.predict(features.reshape(1, -1)) prediction_label = "Burn" if prediction[0] == 1 else "No Burn" if prediction[0] == 1: prediction_label = "First Class" elif prediction[0] == 2: prediction_label = "Second Class" else: prediction_label = "Zero Class" return { "prediction": prediction_label } except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) @app.post("/segment_burn") async def segment_burn_endpoint(reference: UploadFile = File(...), patient: UploadFile = File(...)): try: reference_path = f"temp_ref_{reference.filename}" reference_bytes = await reference.read() with open(reference_path, "wb") as ref_file: ref_file.write(reference_bytes) patient_path = f"temp_patient_{patient.filename}" patient_bytes = await patient.read() with open(patient_path, "wb") as pat_file: pat_file.write(patient_bytes) burn_crop_clean, burn_crop_debug = segment_burn(patient_path, reference_path) burn_crop_clean_path = f"temp_burn_crop_clean_{uuid.uuid4()}.png" burn_crop_debug_path = f"temp_burn_crop_debug_{uuid.uuid4()}.png" plt.imsave(burn_crop_clean_path, burn_crop_clean) plt.imsave(burn_crop_debug_path, burn_crop_debug) crop_clean_upload = cloudinary.uploader.upload(burn_crop_clean_path, public_id=f"ref_{reference.filename}") crop_debug_upload = cloudinary.uploader.upload(burn_crop_debug_path, public_id=f"pat_{patient.filename}") crop_clean_url = crop_clean_upload["secure_url"] crop_debug_url = crop_debug_upload["secure_url"] os.remove(burn_crop_clean_path) os.remove(burn_crop_debug_path) return { "crop_clean_url": crop_clean_url, "crop_debug_url": crop_debug_url } except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) @app.post("/classify-ecg") async def classify_ecg_endpoint(file: UploadFile = File(...)): model = joblib.load('voting_classifier_arrhythmia.pkl') try: temp_file_path = f"temp_{file.filename}" with open(temp_file_path, "wb") as temp_file: temp_file.write(await file.read()) result = classify_ecg(temp_file_path, model, is_pdf=True) os.remove(temp_file_path) return {"result": result} except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) @app.post("/diagnose-ecg") async def diagnose_ecg(file: UploadFile = File(...)): try: temp_file_path = f"temp_{file.filename}" with open(temp_file_path, "wb") as temp_file: temp_file.write(await file.read()) model_path = 'Arrhythmia_Model_with_SMOTE.h5' result = analyze_ecg_pdf( temp_file_path, model_path, cleanup=False ) os.remove(temp_file_path) if result and result["arrhythmia_class"]: return {"result": result["arrhythmia_class"]} else: return {"result": "No diagnosis"} except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) def clean_warning_name(filename: str) -> str: name, _ = os.path.splitext(filename) cleaned = re.sub(r'_\d+$', '', name) cleaned_desc = cleaned.replace('_', ' ') return cleaned, cleaned_desc @app.post("/process_video") async def process_video(file: UploadFile = File(...)): if not file.content_type.startswith("video/"): raise HTTPException(status_code=400, detail="File must be a video.") print("File content type:", file.content_type) print("File filename:", file.filename) os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(SCREENSHOTS_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) folders = ["screenshots", "uploads", "Output"] for folder in folders: if os.path.exists(folder): for filename in os.listdir(folder): file_path = os.path.join(folder, filename) if os.path.isfile(file_path): os.remove(file_path) video_path = os.path.join(UPLOAD_DIR, file.filename) with open(video_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) print(f"\n[API] CPR Analysis Started on {video_path}") video_output_path = os.path.join(OUTPUT_DIR, "Myoutput.mp4") plot_output_path = os.path.join(OUTPUT_DIR, "Myoutput.png") start_time = time.time() analyzer = OfflineAnalyzer(video_path, video_output_path, plot_output_path, requested_fps=30) chunks = analyzer.run_analysis_video() warnings = [] if os.path.exists(SCREENSHOTS_DIR): for filename in os.listdir(SCREENSHOTS_DIR): if filename.lower().endswith(('.png', '.jpg', '.jpeg')): local_path = os.path.join(SCREENSHOTS_DIR, filename) cleaned_name, description = clean_warning_name(filename) upload_result = cloudinary.uploader.upload( local_path, folder="posture_warnings", public_id=cleaned_name, overwrite=True ) warnings.append({ "image_url": upload_result['secure_url'], "description": description }) video_path = "Output/Myoutput_final.mp4" if os.path.isfile(video_path): upload_result = cloudinary.uploader.upload_large( video_path, resource_type="video", folder="output_videos", public_id="Myoutput_final", overwrite=True ) wholevideoURL = upload_result['secure_url'] else: wholevideoURL = None graphURL = None if os.path.isfile(plot_output_path): upload_graph_result = cloudinary.uploader.upload( plot_output_path, folder="output_graphs", public_id=os.path.splitext(os.path.basename(plot_output_path))[0], overwrite=True ) graphURL = upload_graph_result['secure_url'] print(f"[API] CPR Analysis Completed on {video_path}") analysis_time = time.time() - start_time print(f"[TIMING] Analysis time: {analysis_time:.2f}s") if wholevideoURL is None: raise HTTPException(status_code=500, detail="No chunk data was generated from the video.") return JSONResponse(content={ "videoURL": wholevideoURL, "graphURL": graphURL, "warnings": warnings, "chunks": chunks, }) logger = logging.getLogger("cpr_logger") clients = set() analyzer_thread = None analysis_started = False analyzer_lock = threading.Lock() socket_server: AnalysisSocketServer = None async def forward_results_from_queue(websocket: WebSocket, warning_queue): try: while True: warnings = await asyncio.to_thread(warning_queue.get) serialized = json.dumps(warnings) await websocket.send_text(serialized) except asyncio.CancelledError: logger.info("[WebSocket] Forwarding task cancelled") except Exception as e: logger.error(f"[WebSocket] Error forwarding data: {e}") def run_cpr_analysis(source, requested_fps, output_path): global socket_server logger.info(f"[MAIN] CPR Analysis Started") requested_fps = 30 input_video = source output_dir = r"CPRRealTime\outputs" os.makedirs(output_dir, exist_ok=True) video_output_path = os.path.join(output_dir, "output.mp4") plot_output_path = os.path.join(output_dir, "output.png") logger.info(f"[CONFIG] Input video: {input_video}") logger.info(f"[CONFIG] Video output: {video_output_path}") logger.info(f"[CONFIG] Plot output: {plot_output_path}") initialization_start_time = time.time() analyzer = RealtimeAnalyzer(input_video, video_output_path, plot_output_path, requested_fps) socket_server = analyzer.socket_server analyzer.plot_output_path = plot_output_path elapsed_time = time.time() - initialization_start_time logger.info(f"[TIMING] Initialization time: {elapsed_time:.2f}s") try: analyzer.run_analysis() finally: if analyzer.socket_server: analyzer.socket_server.stop_server() logger.info("[MAIN] Analyzer stopped") @app.websocket("/ws/real") async def websocket_analysis(websocket: WebSocket): global analyzer_thread, analysis_started, socket_server await websocket.accept() clients.add(websocket) logger.info("[WebSocket] Flutter connected") try: source = await websocket.receive_text() logger.info(f"[WebSocket] Received stream URL: {source}") with analyzer_lock: if not analysis_started: requested_fps = 30 output_path = r"Output" analyzer_thread = threading.Thread( target=run_cpr_analysis, args=(source, requested_fps, output_path), daemon=True ) analyzer_thread.start() analysis_started = True logger.info("[WebSocket] Analysis thread started") while socket_server is None or socket_server.warning_queue is None: await asyncio.sleep(0.1) forward_task = asyncio.create_task( forward_results_from_queue(websocket, socket_server.warning_queue) ) while True: await asyncio.sleep(1) except WebSocketDisconnect: logger.warning("[WebSocket] Client disconnected") if 'forward_task' in locals(): forward_task.cancel() except Exception as e: logger.error(f"[WebSocket] Error receiving stream URL: {str(e)}") await websocket.close(code=1011) finally: clients.discard(websocket) logger.info(f"[WebSocket] Active clients: {len(clients)}") if not clients and socket_server: logger.info("[WebSocket] No clients left. Stopping analyzer.") socket_server.stop_server() analysis_started = False socket_server = None def shutdown_handler(signum, frame): logger.info("Received shutdown signal") if socket_server: try: socket_server.stop_server() except Exception as e: logger.warning(f"Error during socket server shutdown: {e}") os._exit(0) signal.signal(signal.SIGINT, shutdown_handler) signal.signal(signal.SIGTERM, shutdown_handler)