Spaces:
Sleeping
Sleeping
# Standard / third-party dependencies for the ECHO segmentation pipeline.
import ultralytics
from ultralytics import YOLO          # YOLO segmentation model wrapper
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm                 # progress bar over video frames
from matplotlib import pyplot as plt
import cv2                            # video decoding / drawing
import warnings
import ffmpeg                         # ffmpeg-python: raw-frame video encoding

warnings.filterwarnings("ignore")

# Prefer GPU when available; NOTE(review): `device` is defined but the model
# call below runs on ultralytics' own device selection — confirm it is needed.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
def segment(video_path: str, model_path: str, start: int, fstep: int, crop: list) -> tuple:
    """
    Run a YOLO segmentation model over frames of an echocardiogram video and
    collect the visible left-ventricle (LV) area per processed frame.

    Parameters
    ----------
    video_path : path to a video readable by OpenCV.
    model_path : path to the YOLO segmentation weights.
    start : index of the first frame to process.
    fstep : stride between processed frames.
    crop : [height_frac, width_frac, v_offset, h_offset] — fractional crop
        size and fractional center offsets applied before inference.

    Returns
    -------
    (lv_area, frames, message) — lv_area: contour areas (px*px) of detections
    with class id 1 (the LV); frames: annotated cropped frames; message:
    'Video processing failed' when no LV was detected in the whole video.
    """
    model = YOLO(model_path)
    # Fixed palette for instance masks, cycled via i % 6.
    color_list = [(255, 0, 0), (255, 255, 0), (255, 0, 255),
                  (0, 255, 0), (0, 0, 255), (128, 128, 128)]
    cap = cv2.VideoCapture(video_path)
    lv_area = []
    frames = []
    try:
        stop = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        for fr in tqdm(range(start, stop, fstep), desc='processing ECHO'):
            cap.set(cv2.CAP_PROP_POS_FRAMES, fr)
            ok, frame = cap.read()
            if not ok or frame is None:
                # Unreadable/corrupt frame: skip instead of crashing on .shape.
                continue
            # Centered crop scaled by crop[0]/crop[1], shifted by crop[2]/crop[3]
            # (offsets expressed as fractions of the cropped size).
            new_w = int(frame.shape[1] * crop[1])
            new_h = int(frame.shape[0] * crop[0])
            new_left = int(frame.shape[1] / 2 + crop[3] * new_w - new_w / 2)
            new_top = int(frame.shape[0] / 2 + crop[2] * new_h - new_h / 2)
            frame = frame[new_top:new_top + new_h, new_left:new_left + new_w]
            with torch.no_grad():
                result = model(frame, verbose=False)
            classes = result[0].names
            overlay = frame.copy()
            frame_m = frame  # fallback: raw crop when there are no detections
            for i, res in enumerate(result[0]):
                bx = res.boxes
                m = res.masks.xy
                label = int(bx.cls.squeeze().cpu())
                if label == 1:
                    # Class 1 is the left ventricle: record its contour area.
                    lv_area.append(cv2.contourArea(m[0]))
                box = list(map(int, bx.xyxy.squeeze().cpu().tolist()))
                cv2.rectangle(overlay, (box[0], box[1]), (box[2], box[3]), (36, 255, 12), 2)
                cv2.putText(overlay, classes[label], (box[0], box[1] - 5),
                            cv2.FONT_HERSHEY_TRIPLEX, 1, (0, 0, 255), 2)
                cv2.fillPoly(overlay, pts=np.int32([m]), color=color_list[i % 6])
                alpha = 0.4
                frame_m = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
            frames.append(frame_m)
    finally:
        # Release the capture even if inference fails mid-video.
        cap.release()
    message = 'Video processing succeeded' if lv_area else 'Video processing failed'
    return lv_area, frames, message
def plotter(lv_data: list, window: int) -> tuple:
    """
    Smooth the per-frame LV area series with a rolling mean and estimate the
    ejection fraction as (max - min) / max of the smoothed curve.

    Parameters
    ----------
    lv_data : per-frame LV areas (px*px), as produced by ``segment``.
    window : rolling-mean window size in frames.

    Returns
    -------
    (dataframe, txt) — dataframe has int32 columns 'Frame' and
    'Left ventricle visible area, px*px'; txt is a formatted
    ejection-fraction summary string.
    """
    lv_rolling = pd.Series(lv_data).rolling(window=window).mean().dropna()
    # Compute the peak once; EF = (peak - trough) / peak of the smoothed curve.
    peak = lv_rolling.max()
    ef = (peak - lv_rolling.min()) / peak
    dataframe = pd.DataFrame({
        'Frame': np.arange(len(lv_rolling)),
        'Left ventricle visible area, px*px': lv_rolling.values
    }).astype('int32')
    txt = f'Ejection fraction - {ef:.1%}'
    return dataframe, txt
def writer(fn, images, framerate=25, vcodec='libx264'):
    """
    Encode a stack of frames to a video file by piping raw bytes into ffmpeg.

    Parameters
    ----------
    fn : output file path (overwritten if it exists).
    images : array-like of shape (n, height, width, channels); converted to
        uint8 and streamed as raw 'rgb24' frames.
        NOTE(review): frames coming from OpenCV are BGR — confirm channel
        order at the call site.
    framerate : output frame rate passed to ffmpeg.
    vcodec : output video codec (default H.264 via libx264).
    """
    if not isinstance(images, np.ndarray):
        images = np.asarray(images)
    n, height, width, channels = images.shape
    process = (
        ffmpeg
        .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
        .output(fn, pix_fmt='yuv420p', vcodec=vcodec, r=framerate)
        .overwrite_output()
        .run_async(pipe_stdin=True)
    )
    try:
        for frame in images:
            process.stdin.write(frame.astype(np.uint8).tobytes())
    finally:
        # Always close the pipe and reap the child, even if a write fails —
        # otherwise the ffmpeg process leaks / hangs waiting on stdin.
        process.stdin.close()
        process.wait()