Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| Copyright (c) 2020, Carleton University Biomedical Informatics Collaboratory | |
| This source code is licensed under the MIT license found in the | |
| LICENSE file in the root directory of this source tree. | |
| """ | |
| import pathlib | |
| import json | |
| import os | |
| import subprocess as sp | |
| import tempfile | |
| from typing import List, Callable | |
| from tqdm import tqdm | |
| import numpy as np | |
| from interfaces import AudiogramDict, AudiogramAnnotationDict, ThresholdDict | |
| from digitizer.report_components.grid import Grid | |
| from digitizer.report_components.label import Label | |
| from digitizer.report_components.symbol import Symbol | |
| from digitizer.report_components.report import Report | |
| import utils.audiology as Audiology | |
| from utils.geometry import compute_rotation_angle, apply_rotation | |
DIR = os.path.join(pathlib.Path(__file__).parent.absolute(), "..")  # parent of this file's directory; root for locating the digitizer scripts and model weights below
def detect_audiograms(filepath: str, weights: str, device: str = "cpu") -> List[AudiogramDict]:
    """Runs the audiogram detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[AudiogramDict]
        The AudiogramDict corresponding to the audiograms detected in the report.
    """
    # sp.run waits for and reaps the child process; the previous
    # Popen + stdout.read() pattern never waited on the child, which could
    # leave a zombie process behind. It also shadowed the module name
    # `subprocess` with a local variable.
    # TODO timeout should be an environment variable
    completed = sp.run(
        [
            "python3",
            os.path.join(DIR, "digitizer/yolov5/detect_audiograms.py"),
            "--source", filepath,
            "--weights", weights,
            "--device", device,
        ],
        stdout=sp.PIPE,
    )
    output = completed.stdout.decode("utf-8")
    # The detector prints its JSON payload between "$$$" sentinels.
    audiograms = json.loads(output.split("$$$")[1])
    return audiograms
def detect_labels(filepath: str, weights: str, audiogram_coordinates: dict, correction_angle: float, device: str = "cpu") -> List[Label]:
    """Runs the label detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    audiogram_coordinates : dict
        The coordinates of the audiogram { "x": int, "y": int } needed to convert the label locations
        with respect to the top-left corner of the bounding audiogram to relative to the top-left corner
        of the report.
    correction_angle : float
        The correction angle in degrees that was applied to the audiogram, so that it can be reversed to
        get the coordinates of the label with respect to the top-left corner of the original unrotated report.
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[Label]
        A list of Label objects (NOT LabelDict).
    """
    # sp.run waits for and reaps the child process; the previous
    # Popen + stdout.read() pattern never waited on the child. The dead
    # local `label_dicts = parsed` alias has been removed.
    completed = sp.run(
        [
            "python3",
            os.path.join(DIR, "digitizer/yolov5/detect_labels.py"),
            "--source", filepath,
            "--weights", weights,
            "--device", device,
        ],
        stdout=sp.PIPE,
    )
    output = completed.stdout.decode("utf-8")
    # The detector prints its JSON payload between "$$$" sentinels.
    parsed = json.loads(output.split("$$$")[1])
    return [Label(label, audiogram_coordinates, correction_angle) for label in parsed]
def detect_symbols(filepath: str, weights: str, audiogram_coordinates: dict, correction_angle: float, device: str = "cpu") -> List[Symbol]:
    """Runs the symbol detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    audiogram_coordinates : dict
        The coordinates of the audiogram { "x": int, "y": int } needed to convert the symbol locations
        with respect to the top-left corner of the bounding audiogram to relative to the top-left corner
        of the report.
    correction_angle : float
        The correction angle in degrees that was applied to the audiogram, so that it can be reversed to
        get the coordinates of the symbol with respect to the top-left corner of the original unrotated report.
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[Symbol]
        A list of Symbol objects (NOT SymbolDict).
    """
    # sp.run waits for and reaps the child process; the previous
    # Popen + stdout.read() pattern never waited on the child.
    completed = sp.run(
        [
            "python3",
            os.path.join(DIR, "digitizer/yolov5/detect_symbols.py"),
            "--source", filepath,
            "--weights", weights,
            "--device", device,
        ],
        stdout=sp.PIPE,
    )
    # The detector prints its JSON payload between "$$$" sentinels.
    detections = json.loads(completed.stdout.decode("utf-8").split("$$$")[1])
    return [Symbol(detection, audiogram_coordinates, correction_angle) for detection in detections]
def detect_components(filepath: str, gpu: bool = False) -> List:
    """Invokes the object detectors.

    Parameters
    ----------
    filepath : str
        Path to the image.
    gpu : bool
        Whether the GPU should be used (default: False).

    Returns
    -------
    List
        A list (of length 0, 1 or 2) of the form
        [
            { "audiogram": AudiogramDict, "labels": List[Label], "symbols": List[Symbol] },  # plot 1
            { "audiogram": AudiogramDict, "labels": List[Label], "symbols": List[Symbol] }   # plot 2
        ]
    """
    components = []
    # BUG FIX: the `gpu` flag was previously accepted but never forwarded to
    # any detector, so the detectors always ran on the CPU. Map it to the
    # detectors' device argument ("cpu" or "gpu" per their docstrings).
    device = "gpu" if gpu else "cpu"
    # Detect audiograms within the report
    audiogram_model_weights_path = os.path.join(DIR, "..", "models/audiograms/latest/weights/best.pt")
    audiograms = detect_audiograms(filepath, audiogram_model_weights_path, device=device)
    # If no audiogram is detected, return...
    if not audiograms:
        return components
    # Iterate through every audiogram in the report
    for audiogram in audiograms:
        # Load the report
        report = Report(filename=filepath)
        # Generate a cropped version of the report around the detected audiogram
        box = audiogram["boundingBox"]
        report = report.crop(
            box["x"],
            box["y"],
            box["x"] + box["width"],
            box["y"] + box["height"]
        )
        # Correct for rotation: estimate the angle from long grid lines that
        # are roughly horizontal/vertical and have a perpendicular partner.
        lines = report.detect_lines(threshold=200)
        perpendicular_lines = [
            line for line in lines
            if line.has_a_perpendicular_line(lines)
            and (abs(line.get_angle() - 90) < 10
                 or abs(line.get_angle()) < 10)
        ]
        correction_angle = compute_rotation_angle(perpendicular_lines)
        audiogram["correctionAngle"] = correction_angle
        report = report.rotate(correction_angle)
        # Use the temporary file as a context manager so it is closed (and
        # deleted) as soon as the detectors are done, instead of lingering
        # until garbage collection.
        with tempfile.NamedTemporaryFile(suffix=".jpg") as cropped_file:
            report.save(cropped_file.name)
            audiogram_coordinates = {
                "x": box["x"],
                "y": box["y"]
            }
            labels_model_weights_path = os.path.join(DIR, "..", "models/labels/latest/weights/best.pt")
            symbols_model_weights_path = os.path.join(DIR, "..", "models/symbols/latest/weights/best.pt")
            components.append({
                "audiogram": audiogram,
                "labels": detect_labels(cropped_file.name, labels_model_weights_path, audiogram_coordinates, correction_angle, device=device),
                "symbols": detect_symbols(cropped_file.name, symbols_model_weights_path, audiogram_coordinates, correction_angle, device=device)
            })
    return components
def generate_partial_annotation(filepath: str, gpu: bool = False) -> List[AudiogramAnnotationDict]:
    """Generates a seed annotation to be completed in the nihl portal.

    It is ``partial`` because it does not locate the corners of the audiogram.

    Parameters
    ----------
    filepath : str
        Path to the file for which an initial annotation is to be generated.
    gpu : bool
        Whether the gpu should be used.

    Returns
    -------
    List[AudiogramAnnotationDict]
        An Annotation dict.
    """
    audiograms = []
    for component in detect_components(filepath, gpu=gpu):
        # Fold the detected labels and symbols into the audiogram dict.
        annotation = component["audiogram"]
        annotation["labels"] = [label.to_dict() for label in component["labels"]]
        annotation["symbols"] = [symbol.to_dict() for symbol in component["symbols"]]
        annotation["corners"] = []  # these are not located by the algorithm
        audiograms.append(annotation)
    return audiograms
def extract_thresholds(filepath: str, gpu: bool = False) -> List[ThresholdDict]:
    """Extracts the thresholds from the report.

    Parameters
    ----------
    filepath : str
        Path to the report image from which thresholds are to be extracted.
    gpu : bool
        Whether the gpu should be used.

    Returns
    -------
    List[ThresholdDict]
        A list of thresholds.
    """
    components = detect_components(filepath, gpu=gpu)
    thresholds: List[ThresholdDict] = []
    # For each audiogram, extract the thresholds and append them to the
    # thresholds list
    for component in components:
        audiogram = component["audiogram"]
        labels = component["labels"]
        symbols = component["symbols"]
        box = audiogram["boundingBox"]
        report = Report(filename=filepath)
        report = report.crop(
            box["x"],
            box["y"],
            box["x"] + box["width"],
            box["y"] + box["height"]
        )
        report = report.rotate(audiogram["correctionAngle"])
        try:
            grid = Grid(report, labels)
        except Exception:
            # Best effort: if the grid cannot be reconstructed for this
            # audiogram, skip it rather than failing the whole report.
            continue
        thresholds += [{
            "ear": symbol.ear,
            "conduction": symbol.conduction,
            "masking": symbol.masking,
            "measurementType": Audiology.stringify_measurement(symbol.to_dict()),
            "frequency": grid.get_snapped_frequency(symbol),
            "threshold": grid.get_snapped_threshold(symbol),
            "response": True  # IMPORTANT: assume that a response was obtained for measurements
        }
            for symbol in symbols
        ]
    return thresholds
def get_correction_angle(corners: List[dict]) -> float:
    """Computes the rotation angle (in radians) that must be applied based on
    corner coordinates to get an unrotated audiogram.

    Parameters
    ----------
    corners : List[dict]
        A list of four corners, each a dict with at least "x" and "y" keys.

    Returns
    -------
    float
        The rotation angle that must be applied to correct for the rotation
        of the audiogram. The sign follows the vertical offset of the
        larger-y pair of corners.
    """
    # Sort the corners by y, split into the smaller-y and larger-y pairs,
    # and order each pair left-to-right. (The originals were misleadingly
    # named: "top_corners" actually held the larger-y pair.)
    corners = sorted(corners, key=lambda c: c["y"])
    lower_pair = sorted(corners[2:], key=lambda c: c["x"])   # larger y
    upper_pair = sorted(corners[0:2], key=lambda c: c["x"])  # smaller y
    # BUG FIX: the slope angle of the edge joining two corners is
    # arctan(dy/dx), not arcsin(dy/dx) (arcsin expects dy over the
    # hypotenuse). np.arctan2 also avoids a ZeroDivisionError when the two
    # corners are vertically aligned (dx == 0).
    dx1 = lower_pair[1]["x"] - lower_pair[0]["x"]
    dy1 = lower_pair[1]["y"] - lower_pair[0]["y"]
    angle1 = np.arctan2(abs(dy1), abs(dx1))
    # Repeat for the other pair of corners
    dx2 = upper_pair[1]["x"] - upper_pair[0]["x"]
    dy2 = upper_pair[1]["y"] - upper_pair[0]["y"]
    angle2 = np.arctan2(abs(dy2), abs(dx2))
    # Average the two edge angles; the sign comes from the larger-y edge.
    return np.sign(dy1) * np.mean([angle1, angle2])
def get_conversion_maps(corners: List[dict]) -> List[Callable]:
    """Computes the functions that map pixel coordinates to frequency-threshold
    coordinates and vice versa.

    Parameters
    ----------
    corners : List[dict]
        The audiogram corners.

    Returns
    -------
    List[Callable]
        A list of functions. These functions all accept a single float argument.
        They are in the following order.
        1. pixel->frequency
        2. pixel->threshold
        3. frequency->pixel
        4. threshold->pixel
    """
    # X axis: the two topmost corners (smallest y), ordered left-to-right,
    # anchor a linear interpolation between octaves and pixel x-coordinates.
    by_y = sorted(corners, key=lambda corner: corner["y"])
    left_top, right_top = sorted(by_y[0:2], key=lambda corner: corner["x"])
    octave_min = Audiology.frequency_to_octave(left_top["frequency"])
    octave_max = Audiology.frequency_to_octave(right_top["frequency"])
    pixel_x_min = left_top["x"]
    pixel_x_max = right_top["x"]

    def frequency_map(pixel: float):
        octave = octave_min + (octave_max - octave_min) * (pixel - pixel_x_min) / (pixel_x_max - pixel_x_min)
        return Audiology.octave_to_frequency(octave)

    def inverse_frequency_map(frequency: float):
        octave = Audiology.frequency_to_octave(frequency)
        return pixel_x_min + (octave - octave_min) * (pixel_x_max - pixel_x_min) / (octave_max - octave_min)

    # Y axis: the two leftmost corners (smallest x), ordered top-to-bottom,
    # anchor a linear interpolation between thresholds and pixel y-coordinates.
    by_x = sorted(corners, key=lambda corner: corner["x"])
    top_left, bottom_left = sorted(by_x[0:2], key=lambda corner: corner["y"])
    threshold_min = top_left["threshold"]
    threshold_max = bottom_left["threshold"]
    pixel_y_min = top_left["y"]
    pixel_y_max = bottom_left["y"]

    def threshold_map(pixel: float):
        return threshold_min + (threshold_max - threshold_min) * (pixel - pixel_y_min) / (pixel_y_max - pixel_y_min)

    def inverse_threshold_map(threshold: float):
        return pixel_y_min + (threshold - threshold_min) * (pixel_y_max - pixel_y_min) / (threshold_max - threshold_min)

    return [frequency_map, threshold_map, inverse_frequency_map, inverse_threshold_map]
def annotation_to_thresholds(audiograms: dict) -> List[ThresholdDict]:
    """Extracts the thresholds from an annotation.

    Parameters
    ----------
    audiograms : dict
        An annotation: an iterable of audiogram dicts, each carrying
        "corners" and "symbols" entries.

    Returns
    -------
    List[ThresholdDict]
        A list of thresholds
    """
    combined_thresholds: List[ThresholdDict] = []
    for audiogram in audiograms:
        correction_angle = get_correction_angle(audiogram["corners"])
        corners = [apply_rotation(corner, correction_angle) for corner in audiogram["corners"]]
        # Only the forward maps are needed here; the inverse maps returned by
        # get_conversion_maps are discarded (they were previously unpacked
        # into unused locals).
        frequency_map, threshold_map, _, _ = get_conversion_maps(corners)
        for symbol in audiogram["symbols"]:
            # Collapse the symbol's bounding box to its center point, then
            # rotate that point into the corrected (unrotated) frame. This
            # replaces two intermediate {**symbol, ...} copies that existed
            # only to carry the center through apply_rotation.
            symbol_center = {
                "x": symbol["boundingBox"]["x"] + symbol["boundingBox"]["width"] / 2,
                "y": symbol["boundingBox"]["y"] + symbol["boundingBox"]["height"] / 2,
            }
            center = apply_rotation(symbol_center, correction_angle)
            measurement_type = symbol["measurementType"].lower()
            ear = "left" if "left" in measurement_type else "right"
            conduction = "air" if "air" in measurement_type else "bone"
            # Idiom fix: `"unmasked" not in ...` instead of `False if ... else True`.
            masking = "unmasked" not in measurement_type
            if conduction == "air":
                frequency = Audiology.round_frequency(frequency_map(center["x"]))
            else:
                frequency = Audiology.round_frequency_bone(frequency_map(center["x"]), ear)
            threshold = Audiology.round_threshold(threshold_map(center["y"]))
            combined_thresholds.append({
                "ear": ear,
                "conduction": conduction,
                "masking": masking,
                "frequency": frequency,
                "threshold": threshold,
                "response": True,  # IMPORTANT: assume that a response was measured for threshold
                "measurementType": f"{conduction}_{'MASKED' if masking else 'UNMASKED'}_{ear}".upper()
            })
    return combined_thresholds