""" ECG Analysis Pipeline: From PDF to Arrhythmia Classification ----------------------------------------------------------- This module provides functions to: 1. Digitize ECG from PDF files 2. Process the digitized ECG signal 3. Classify arrhythmias using a trained CNN model """ import cv2 import numpy as np import os import tensorflow as tf import pickle from scipy.interpolate import interp1d from pdf2image import convert_from_path ARRHYTHMIA_CLASSES = ["Conduction Abnormalities", "Atrial Arrhythmias", "Tachyarrhythmias", "Normal"] SAMPLING_RATE = 500 SEGMENT_DURATION = 10.0 TARGET_SEGMENT_LENGTH = 5000 DEFAULT_OUTPUT_FILE = 'calibrated_ecg.dat' DAT_SCALE_FACTOR = 0.001 def digitize_ecg_from_pdf(pdf_path, output_file=None): """ Process an ECG PDF file and convert it to a .dat signal file. Args: pdf_path (str): Path to the ECG PDF file output_file (str, optional): Path to save the output .dat file Returns: tuple: (path to the created .dat file, list of paths to segment files) """ if output_file is None: output_file = DEFAULT_OUTPUT_FILE images = convert_from_path(pdf_path) temp_image_path = 'temp_ecg_image.jpg' images[0].save(temp_image_path, 'JPEG') img = cv2.imread(temp_image_path, cv2.IMREAD_GRAYSCALE) height, width = img.shape calibration = { 'seconds_per_pixel': 2.0 / 197.0, 'mv_per_pixel': 1.0 / 78.8, } layer1_start = int(height * 35.35 / 100) layer1_end = int(height * 51.76 / 100) layer2_start = int(height * 51.82 / 100) layer2_end = int(height * 69.41 / 100) layer3_start = int(height * 69.47 / 100) layer3_end = int(height * 87.06 / 100) layers = [ img[layer1_start:layer1_end, :], img[layer2_start:layer2_end, :], img[layer3_start:layer3_end, :] ] signals = [] time_points = [] layer_duration = 10.0 for i, layer in enumerate(layers): _, binary = cv2.threshold(layer, 200, 255, cv2.THRESH_BINARY_INV) contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) waveform_contour = max(contours, key=cv2.contourArea) sorted_contour = sorted(waveform_contour, key=lambda p: p[0][0]) x_coords = np.array([point[0][0] for point in sorted_contour]) y_coords = np.array([point[0][1] for point in sorted_contour]) isoelectric_line_y = layer.shape[0] * 0.6 x_min, x_max = np.min(x_coords), np.max(x_coords) time = (x_coords - x_min) / (x_max - x_min) * layer_duration signal_mv = (isoelectric_line_y - y_coords) * calibration['mv_per_pixel'] signal_mv = signal_mv - np.mean(signal_mv) time_points.append(time) signals.append(signal_mv) total_duration = layer_duration * len(layers) sampling_frequency = 500 num_samples = int(total_duration * sampling_frequency) combined_time = np.linspace(0, total_duration, num_samples) combined_signal = np.zeros(num_samples) for i, (time, signal) in enumerate(zip(time_points, signals)): start_time = i * layer_duration mask = (combined_time >= start_time) & (combined_time < start_time + layer_duration) relevant_times = combined_time[mask] interpolated_signal = np.interp(relevant_times, start_time + time, signal) combined_signal[mask] = interpolated_signal combined_signal = combined_signal - np.mean(combined_signal) signal_peak = np.max(np.abs(combined_signal)) target_amplitude = 2.0 if signal_peak > 0 and (signal_peak < 0.5 or signal_peak > 4.0): scaling_factor = target_amplitude / signal_peak combined_signal = combined_signal * scaling_factor adc_gain = 1000.0 int_signal = (combined_signal * adc_gain).astype(np.int16) int_signal.tofile(output_file) if os.path.exists(temp_image_path): os.remove(temp_image_path) segment_files = [] samples_per_segment = int(layer_duration * sampling_frequency) base_name = os.path.splitext(output_file)[0] for i in range(3): start_idx = i * samples_per_segment end_idx = (i + 1) * samples_per_segment segment = combined_signal[start_idx:end_idx] segment_file = f"{base_name}_segment{i+1}.dat" (segment * adc_gain).astype(np.int16).tofile(segment_file) segment_files.append(segment_file) return output_file, segment_files def read_ecg_dat_file(dat_file_path): """ Read a DAT file directly and properly scale it Parameters: ----------- dat_file_path : str Path to the .dat file (with or without .dat extension) Returns: -------- numpy.ndarray ECG signal data with shape (total_samples,) """ if not dat_file_path.endswith('.dat'): dat_file_path += '.dat' try: data = np.fromfile(dat_file_path, dtype=np.int16) signal = data * DAT_SCALE_FACTOR return signal except Exception as e: raise def segment_signal(signal): """ Segment a signal into equal-length segments Parameters: ----------- signal : numpy.ndarray The full signal to segment Returns: -------- list List of signal segments """ segment_samples = int(SAMPLING_RATE * SEGMENT_DURATION) segments = [] num_segments = len(signal) // segment_samples for i in range(num_segments): start_idx = i * segment_samples end_idx = (i + 1) * segment_samples segment = signal[start_idx:end_idx] segments.append(segment) return segments def process_segment(segment): """ Process a segment of ECG data to ensure it's properly formatted for the model Parameters: ----------- segment : numpy.ndarray Raw ECG segment Returns: -------- numpy.ndarray Processed segment ready for model input """ if len(segment) != TARGET_SEGMENT_LENGTH: x = np.linspace(0, 1, len(segment)) x_new = np.linspace(0, 1, TARGET_SEGMENT_LENGTH) f = interp1d(x, segment, kind='linear', bounds_error=False, fill_value="extrapolate") segment = f(x_new) segment = (segment - np.mean(segment)) / (np.std(segment) + 1e-8) return segment def predict_with_cnn_model(signal_data, model): """ Process signal data and make predictions using the CNN model. Parameters: ----------- signal_data : numpy.ndarray Raw signal data model : tensorflow.keras.Model Loaded CNN model Returns: -------- dict Dictionary containing predictions for each segment and final averaged prediction """ segments = segment_signal(signal_data) all_predictions = [] for i, segment in enumerate(segments): processed_segment = process_segment(segment) X = processed_segment.reshape(1, TARGET_SEGMENT_LENGTH, 1) prediction = model.predict(X, verbose=0) all_predictions.append(prediction[0]) if all_predictions: avg_prediction = np.mean(all_predictions, axis=0) top_class_idx = np.argmax(avg_prediction) results = { "segment_predictions": all_predictions, "averaged_prediction": avg_prediction, "top_class_index": top_class_idx, "top_class": ARRHYTHMIA_CLASSES[top_class_idx], "probability": float(avg_prediction[top_class_idx]) } return results else: return {"error": "No valid segments for prediction"} def analyze_ecg_pdf(pdf_path, model_path, cleanup=True): """ Complete ECG analysis pipeline: digitizes a PDF ECG, analyzes it with the model, and returns the arrhythmia classification with highest probability. Args: pdf_path (str): Path to the ECG PDF file model_path (str): Path to the model (.h5) file cleanup (bool, optional): Whether to remove temporary files after processing Returns: dict: { "arrhythmia_class": str, # Top arrhythmia class "probability": float, # Probability of top class "all_probabilities": dict, # All classes with probabilities "digitized_file": str # Path to digitized file (if cleanup=False) } """ os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' try: dat_file_path, segment_files = digitize_ecg_from_pdf(pdf_path) ecg_model = tf.keras.models.load_model(model_path) ecg_signal = read_ecg_dat_file(dat_file_path) classification_results = predict_with_cnn_model(ecg_signal, ecg_model) arrhythmia_result = { "arrhythmia_class": classification_results.get("top_class"), "probability": classification_results.get("probability", 0.0), "all_probabilities": {} } if "averaged_prediction" in classification_results: for idx, class_name in enumerate(ARRHYTHMIA_CLASSES): arrhythmia_result["all_probabilities"][class_name] = float(classification_results["averaged_prediction"][idx]) if not cleanup: arrhythmia_result["digitized_file"] = dat_file_path if cleanup: if os.path.exists(dat_file_path): os.remove(dat_file_path) for segment_file in segment_files: if os.path.exists(segment_file): os.remove(segment_file) return arrhythmia_result except Exception as e: error_msg = f"Error in ECG analysis: {str(e)}" return {"error": error_msg}