Spaces:
Runtime error
Runtime error
| import os | |
| import sys | |
| import torch | |
| import numpy as np | |
| import tensorrt as trt | |
| from typing import Union, Tuple, Optional | |
| from PIL import Image | |
| import matplotlib.pyplot as plt | |
| from torchvision.transforms import ToTensor, Normalize | |
| from torchvision.transforms.functional import normalize, to_pil_image | |
| import json | |
| import datetime | |
| from scipy.ndimage import gaussian_filter | |
| from sklearn.cluster import KMeans | |
| import assets | |
| # ํ๋ก์ ํธ ๋ฃจํธ ๋๋ ํ ๋ฆฌ ์ค์ | |
| project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| sys.path.append(project_root) | |
| class ClipEBCTensorRT: | |
| """ | |
| CLIP-EBC (Efficient Boundary Counting) TensorRT ๋ฒ์ ์ด๋ฏธ์ง ์ฒ๋ฆฌ ํด๋์ค์ ๋๋ค. | |
| TensorRT๋ก ๋ณํ๋ CLIP ๋ชจ๋ธ์ ์ฌ์ฉํ์ฌ ์ด๋ฏธ์ง๋ฅผ ์ฒ๋ฆฌํ๋ฉฐ, ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ์์ธก ๊ธฐ๋ฅ์ ํฌํจํ | |
| ๋ค์ํ ์ค์ ์ต์ ์ ์ ๊ณตํฉ๋๋ค. | |
| """ | |
| def __init__(self, | |
| engine_path="assets/CLIP_EBC_nwpu_rmse_tensorrt.trt", | |
| truncation=4, | |
| reduction=8, | |
| granularity="fine", | |
| anchor_points="average", | |
| input_size=224, | |
| window_size=224, | |
| stride=224, | |
| dataset_name="qnrf", | |
| mean=(0.485, 0.456, 0.406), | |
| std=(0.229, 0.224, 0.225), | |
| config_dir ="configs"): | |
| """CLIPEBC TensorRT ํด๋์ค๋ฅผ ์ค์ ๋งค๊ฐ๋ณ์์ ํจ๊ป ์ด๊ธฐํํฉ๋๋ค.""" | |
| self.engine_path = engine_path | |
| self.truncation = truncation | |
| self.reduction = reduction | |
| self.granularity = granularity | |
| self.anchor_points_type = anchor_points | |
| self.input_size = input_size | |
| self.window_size = window_size | |
| self.stride = stride | |
| self.dataset_name = dataset_name | |
| self.mean = mean | |
| self.std = std | |
| self.config_dir = config_dir | |
| # ๊ฒฐ๊ณผ ์ ์ฅ์ฉ ๋ณ์ ์ด๊ธฐํ | |
| self.density_map = None | |
| self.processed_image = None | |
| self.count = None | |
| self.original_image = None | |
| # TensorRT ์์ง ๋ก๋ | |
| print(f"TensorRT ์์ง ๋ก๋ ์ค: {self.engine_path}") | |
| self._load_engine() | |
| # ์ ๋ ฅ ๋ฐ ์ถ๋ ฅ ์ด๋ฆ ์ค์ | |
| self.input_name = "input" | |
| self.output_name = "output" | |
| print(f"TensorRT ์์ง ์ด๊ธฐํ ์๋ฃ") | |
| def _load_engine(self): | |
| """TensorRT ์์ง์ ๋ก๋ํฉ๋๋ค.""" | |
| # TensorRT ๋ก๊ฑฐ ์์ฑ | |
| TRT_LOGGER = trt.Logger(trt.Logger.WARNING) | |
| # ๋ฐํ์ ์์ฑ | |
| self.runtime = trt.Runtime(TRT_LOGGER) | |
| # ์์ง ํ์ผ ๋ก๋ | |
| with open(self.engine_path, 'rb') as f: | |
| engine_data = f.read() | |
| # ์ง๋ ฌํ๋ ์์ง์์ ์์ง ์์ฑ | |
| self.engine = self.runtime.deserialize_cuda_engine(engine_data) | |
| # ์คํ ์ปจํ ์คํธ ์์ฑ | |
| self.context = self.engine.create_execution_context() | |
| # TensorRT 10.x์์๋ input_binding/output_binding ๋์ ๋คํธ์ํฌ ๊ตฌ์กฐ๋ฅผ ํ์ธ | |
| # ์ ๋ ฅ๊ณผ ์ถ๋ ฅ์ ๊ฐ์ ธ์ค๋ ๋ฐฉ๋ฒ์ด ๋ณ๊ฒฝ๋จ | |
| self.num_io_tensors = self.engine.num_io_tensors | |
| # ์ ๋ ฅ๊ณผ ์ถ๋ ฅ ํ ์ ์ด๋ฆ ์ฐพ๊ธฐ | |
| self.input_tensor_names = [] | |
| self.output_tensor_names = [] | |
| print(f"TensorRT ์์ง์์ {self.num_io_tensors}๊ฐ์ IO ํ ์๋ฅผ ์ฐพ์์ต๋๋ค") | |
| for i in range(self.num_io_tensors): | |
| name = self.engine.get_tensor_name(i) | |
| is_input = self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT | |
| if is_input: | |
| self.input_tensor_names.append(name) | |
| else: | |
| self.output_tensor_names.append(name) | |
| # ์ ๋ ฅ๊ณผ ์ถ๋ ฅ ์ด๋ฆ ์ค์ | |
| if not self.input_tensor_names: | |
| raise ValueError("์์ง์์ ์ ๋ ฅ ํ ์๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค.") | |
| if not self.output_tensor_names: | |
| raise ValueError("์์ง์์ ์ถ๋ ฅ ํ ์๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค.") | |
| # ๊ธฐ๋ณธ ์ ๋ ฅ ๋ฐ ์ถ๋ ฅ ์ด๋ฆ ์ค์ | |
| self.input_name = self.input_tensor_names[0] | |
| self.output_name = self.output_tensor_names[0] | |
| # ์ ์ถ๋ ฅ ํํ ์ถ์ถ | |
| self.input_shape = self.engine.get_tensor_shape(self.input_name) | |
| self.output_shape = self.engine.get_tensor_shape(self.output_name) | |
| print(f"์ ๋ ฅ ์ด๋ฆ: {self.input_name}, ํํ: {self.input_shape}") | |
| print(f"์ถ๋ ฅ ์ด๋ฆ: {self.output_name}, ํํ: {self.output_shape}") | |
| def _process_image(self, image: Union[str, np.ndarray]) -> np.ndarray: | |
| """ | |
| ์ด๋ฏธ์ง๋ฅผ ์ ์ฒ๋ฆฌํฉ๋๋ค. ์ด๋ฏธ์ง ๊ฒฝ๋ก, ๋ํ์ด ๋ฐฐ์ด, Streamlit UploadedFile ๋ชจ๋ ์ฒ๋ฆฌ ๊ฐ๋ฅํฉ๋๋ค. | |
| Args: | |
| image: ์ ๋ ฅ ์ด๋ฏธ์ง. ๋ค์ ํ์ ์ค ํ๋์ฌ์ผ ํฉ๋๋ค: | |
| - str: ์ด๋ฏธ์ง ํ์ผ ๊ฒฝ๋ก | |
| - np.ndarray: (H, W, 3) ํํ์ RGB ์ด๋ฏธ์ง | |
| - UploadedFile: Streamlit์ ์ ๋ก๋๋ ํ์ผ | |
| Returns: | |
| np.ndarray: ์ ์ฒ๋ฆฌ๋ ์ด๋ฏธ์ง ๋ฐฐ์ด, shape (1, 3, H, W) | |
| """ | |
| to_tensor = ToTensor() | |
| normalize = Normalize(mean=self.mean, std=self.std) | |
| # ์๋ณธ ์ด๋ฏธ์ง ์ ์ฅ | |
| self.original_image = image | |
| # ์ ๋ ฅ ํ์ ์ ๋ฐ๋ฅธ ์ฒ๋ฆฌ | |
| if isinstance(image, str): | |
| # ํ์ผ ๊ฒฝ๋ก์ธ ๊ฒฝ์ฐ | |
| with open(image, "rb") as f: | |
| pil_image = Image.open(f).convert("RGB") | |
| elif isinstance(image, np.ndarray): | |
| # ๋ํ์ด ๋ฐฐ์ด์ธ ๊ฒฝ์ฐ | |
| if image.dtype == np.uint8: | |
| pil_image = Image.fromarray(image) | |
| else: | |
| # float ํ์ ์ธ ๊ฒฝ์ฐ [0, 1] ๋ฒ์๋ก ๊ฐ์ ํ๊ณ ๋ณํ | |
| pil_image = Image.fromarray((image * 255).astype(np.uint8)) | |
| else: | |
| # Streamlit UploadedFile ๋๋ ๊ธฐํ ํ์ผ ๊ฐ์ฒด์ธ ๊ฒฝ์ฐ | |
| try: | |
| pil_image = Image.open(image).convert("RGB") | |
| except Exception as e: | |
| raise ValueError(f"์ง์ํ์ง ์๋ ์ด๋ฏธ์ง ํ์์ ๋๋ค: {type(image)}") from e | |
| # ํ ์ ๋ณํ ๋ฐ ์ ๊ทํ | |
| tensor_image = to_tensor(pil_image) | |
| normalized_image = normalize(tensor_image) | |
| batched_image = normalized_image.unsqueeze(0) # (1, 3, H, W) | |
| # numpy๋ก ๋ณํ | |
| numpy_image = batched_image.numpy() | |
| return numpy_image | |
| def _post_process_image(self, image_tensor): | |
| """์ด๋ฏธ์ง ํ ์๋ฅผ PIL ์ด๋ฏธ์ง๋ก ๋ณํํฉ๋๋ค.""" | |
| # NumPy ๋ฐฐ์ด์ PyTorch ํ ์๋ก ๋ณํ | |
| if isinstance(image_tensor, np.ndarray): | |
| image_tensor = torch.from_numpy(image_tensor) | |
| # ์ ๊ทํ ์ญ๋ณํ | |
| image = normalize( | |
| image_tensor, | |
| mean=[0., 0., 0.], | |
| std=[1./self.std[0], 1./self.std[1], 1./self.std[2]] | |
| ) | |
| image = normalize( | |
| image, | |
| mean=[-self.mean[0], -self.mean[1], -self.mean[2]], | |
| std=[1., 1., 1.] | |
| ) | |
| # ๋ฐฐ์น ์ฐจ์ ์ ๊ฑฐ ๋ฐ PIL ์ด๋ฏธ์ง๋ก ๋ณํ | |
| processed_image = to_pil_image(image.squeeze(0)) | |
| return processed_image | |
| def _infer_batch(self, batch_input): | |
| """ | |
| TensorRT ์์ง์ ์ฌ์ฉํ์ฌ ๋ฐฐ์น ์ถ๋ก ์ ์ํํฉ๋๋ค. (์์ ๋ฒ์ ) | |
| """ | |
| import pycuda.driver as cuda | |
| import pycuda.autoinit | |
| import numpy as np | |
| batch_size = batch_input.shape[0] | |
| # ์ ๋ ฅ์ ํํ์ ๋ฐ์ดํฐ ํ์ ํ์ธ | |
| input_shape = (batch_size, 3, self.input_size, self.input_size) | |
| print(f"์ ๋ ฅ ๋ฐฐ์น ํํ: {batch_input.shape}, ๋ฐ์ดํฐ ํ์ : {batch_input.dtype}") | |
| # ์ ๋ ฅ ํํ ๊ฒ์ฆ | |
| if batch_input.shape != input_shape: | |
| print(f"๊ฒฝ๊ณ : ์ ๋ ฅ ํํ ๋ถ์ผ์น. ์์: {input_shape}, ์ค์ : {batch_input.shape}") | |
| # ํ์์ ํํ ์์ | |
| batch_input = np.resize(batch_input, input_shape) | |
| # ๋ฐ์ดํฐ ํ์ ๊ฒ์ฆ | |
| if batch_input.dtype != np.float32: | |
| print(f"๊ฒฝ๊ณ : ์ ๋ ฅ ๋ฐ์ดํฐ ํ์ ๋ถ์ผ์น. float32๋ก ๋ณํํฉ๋๋ค.") | |
| batch_input = batch_input.astype(np.float32) | |
| # ๋์ ๋ฐฐ์น ํฌ๊ธฐ ์ค์ | |
| self.context.set_input_shape(self.input_name, input_shape) | |
| # ์ถ๋ ฅ ํํ ๊ฐ์ ธ์ค๊ธฐ | |
| output_shape = self.context.get_tensor_shape(self.output_name) | |
| output_shape = tuple(output_shape) # ํํ๋ก ๋ณํํ์ฌ ์์ ์ฑ ๋ณด์ฅ | |
| print(f"์ถ๋ ฅ ํํ: {output_shape}") | |
| # -1 ๊ฐ์ ์ค์ ๋ฐฐ์น ํฌ๊ธฐ๋ก ๋์ฒด | |
| if output_shape[0] == -1: | |
| output_shape = (batch_size,) + output_shape[1:] | |
| # ์ถ๋ ฅ ๋ฒํผ ์ค๋น | |
| output = np.empty(output_shape, dtype=np.float32) | |
| # ํธ์คํธ ๋ฉ๋ชจ๋ฆฌ ์ค๋น (ํ์ด์ง ์ ๊ธ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ) | |
| h_input = cuda.pagelocked_empty(batch_input.shape, dtype=np.float32) | |
| h_output = cuda.pagelocked_empty(output_shape, dtype=np.float32) | |
| # ์ ๋ ฅ ๋ฐ์ดํฐ ๋ณต์ฌ | |
| np.copyto(h_input, batch_input) | |
| # ๋๋ฐ์ด์ค ๋ฉ๋ชจ๋ฆฌ ํ ๋น | |
| d_input = cuda.mem_alloc(h_input.nbytes) | |
| d_output = cuda.mem_alloc(h_output.nbytes) | |
| # CUDA ์คํธ๋ฆผ ์์ฑ | |
| stream = cuda.Stream() | |
| try: | |
| # ๋ฉ๋ชจ๋ฆฌ ๋ณต์ฌ (ํธ์คํธ -> ๋๋ฐ์ด์ค) | |
| cuda.memcpy_htod_async(d_input, h_input, stream) | |
| # ํ ์ ์ฃผ์ ์ค์ | |
| self.context.set_tensor_address(self.input_name, int(d_input)) | |
| self.context.set_tensor_address(self.output_name, int(d_output)) | |
| # ๋๋ฒ๊น ์ ๋ณด (๋ฉ๋ชจ๋ฆฌ ์ฃผ์) | |
| print(f"์ ๋ ฅ ๋ฉ๋ชจ๋ฆฌ ์ฃผ์: {int(d_input)}, ์ถ๋ ฅ ๋ฉ๋ชจ๋ฆฌ ์ฃผ์: {int(d_output)}") | |
| # ์คํ | |
| success = self.context.execute_async_v3(stream_handle=stream.handle) | |
| if not success: | |
| print("TensorRT ์คํ ์คํจ") | |
| return None | |
| # ๋ฉ๋ชจ๋ฆฌ ๋ณต์ฌ (๋๋ฐ์ด์ค -> ํธ์คํธ) | |
| cuda.memcpy_dtoh_async(h_output, d_output, stream) | |
| # ์คํธ๋ฆผ ๋๊ธฐํ | |
| stream.synchronize() | |
| # ์ถ๋ ฅ ๋ฐ์ดํฐ ๋ณต์ฌ | |
| np.copyto(output, h_output) | |
| return output | |
| except Exception as e: | |
| print(f"TensorRT ์ถ๋ก ์ค ์ค๋ฅ ๋ฐ์: {str(e)}") | |
| import traceback | |
| traceback.print_exc() | |
| return None | |
| finally: | |
| # ๋ฉ๋ชจ๋ฆฌ ํด์ | |
| del stream | |
| if 'd_input' in locals(): | |
| d_input.free() | |
| if 'd_output' in locals(): | |
| d_output.free() | |
| def sliding_window_predict(self, image: np.ndarray, window_size: Union[int, Tuple[int, int]], | |
| stride: Union[int, Tuple[int, int]]) -> np.ndarray: | |
| """ | |
| ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ๋ฐฉ์์ผ๋ก ์ด๋ฏธ์ง ์์ธก์ ์ํํฉ๋๋ค. ๊ฒน์น๋ ์์ญ์ ํ๊ท ๊ฐ์ ์ฌ์ฉํฉ๋๋ค. | |
| Args: | |
| image (np.ndarray): ํํ๊ฐ (1, 3, H, W)์ธ ์ด๋ฏธ์ง ๋ฐฐ์ด | |
| window_size (int or tuple): ์๋์ฐ ํฌ๊ธฐ | |
| stride (int or tuple): ์๋์ฐ ์ด๋ ๊ฐ๊ฒฉ | |
| Returns: | |
| np.ndarray: ์์ธก๋ ๋ฐ๋ ๋งต | |
| """ | |
| # CUDA ์ด๊ธฐํ (์ฒ์ ์ฌ์ฉํ ๋๋ง) | |
| global cuda | |
| if 'cuda' not in globals(): | |
| import pycuda.driver as cuda | |
| cuda.init() | |
| # ์ ๋ ฅ ๊ฒ์ฆ | |
| assert len(image.shape) == 4, f"์ด๋ฏธ์ง๋ 4์ฐจ์ ๋ฐฐ์ด์ด์ด์ผ ํฉ๋๋ค. (1, C, H, W), ํ์ฌ: {image.shape}" | |
| # ์๋์ฐ ํฌ๊ธฐ์ ์คํธ๋ผ์ด๋ ์ค์ | |
| window_size = (int(window_size), int(window_size)) if isinstance(window_size, (int, float)) else window_size | |
| stride = (int(stride), int(stride)) if isinstance(stride, (int, float)) else stride | |
| window_size = tuple(window_size) | |
| stride = tuple(stride) | |
| # ๊ฒ์ฆ | |
| assert isinstance(window_size, tuple) and len(window_size) == 2 and window_size[0] > 0 and window_size[1] > 0, \ | |
| f"์๋์ฐ ํฌ๊ธฐ๋ ์์ ์ ์ ํํ (h, w)์ด์ด์ผ ํฉ๋๋ค. ํ์ฌ: {window_size}" | |
| assert isinstance(stride, tuple) and len(stride) == 2 and stride[0] > 0 and stride[1] > 0, \ | |
| f"์คํธ๋ผ์ด๋๋ ์์ ์ ์ ํํ (h, w)์ด์ด์ผ ํฉ๋๋ค. ํ์ฌ: {stride}" | |
| assert stride[0] <= window_size[0] and stride[1] <= window_size[1], \ | |
| f"์คํธ๋ผ์ด๋๋ ์๋์ฐ ํฌ๊ธฐ๋ณด๋ค ์์์ผ ํฉ๋๋ค. ํ์ฌ: {stride}์ {window_size}" | |
| image_height, image_width = image.shape[-2:] | |
| window_height, window_width = window_size | |
| stride_height, stride_width = stride | |
| # ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ์ ๊ณ์ฐ | |
| num_rows = int(np.ceil((image_height - window_height) / stride_height) + 1) | |
| num_cols = int(np.ceil((image_width - window_width) / stride_width) + 1) | |
| # ์๋์ฐ ์ถ์ถ | |
| windows = [] | |
| window_positions = [] | |
| for i in range(num_rows): | |
| for j in range(num_cols): | |
| x_start, y_start = i * stride_height, j * stride_width | |
| x_end, y_end = x_start + window_height, y_start + window_width | |
| # ์ด๋ฏธ์ง ๊ฒฝ๊ณ ์ฒ๋ฆฌ | |
| if x_end > image_height: | |
| x_start, x_end = image_height - window_height, image_height | |
| if y_end > image_width: | |
| y_start, y_end = image_width - window_width, image_width | |
| window = image[:, :, x_start:x_end, y_start:y_end] | |
| windows.append(window) | |
| window_positions.append((x_start, y_start, x_end, y_end)) | |
| # ๋ฐฐ์น ๋จ์๋ก ์ถ๋ก | |
| all_preds = [] | |
| max_batch_size = 8 | |
| for start_idx in range(0, len(windows), max_batch_size): | |
| end_idx = min(start_idx + max_batch_size, len(windows)) | |
| batch_windows = np.vstack(windows[start_idx:end_idx]) # (batch_size, 3, h, w) | |
| # TensorRT ์ถ๋ก | |
| batch_preds = self._infer_batch(batch_windows) | |
| # Debug ์ ๋ณด | |
| # print(f"๋ฐฐ์น ์ ๋ ฅ ํํ: {batch_windows.shape}, ๋ฐฐ์น ์ถ๋ ฅ ํํ: {batch_preds.shape}") | |
| all_preds.extend([batch_preds[i:i+1] for i in range(batch_preds.shape[0])]) | |
| # ์์ธก ๊ฒฐ๊ณผ๋ฅผ numpy ๋ฐฐ์ด๋ก ๋ณํ | |
| preds = np.concatenate(all_preds, axis=0) | |
| # ์ถ๋ ฅ ๋ฐ๋ ๋งต ์กฐ๋ฆฝ | |
| pred_map = np.zeros((preds.shape[1], image_height // self.reduction, image_width // self.reduction), dtype=np.float32) | |
| count_map = np.zeros((preds.shape[1], image_height // self.reduction, image_width // self.reduction), dtype=np.float32) | |
| idx = 0 | |
| for i in range(num_rows): | |
| for j in range(num_cols): | |
| x_start, y_start, x_end, y_end = window_positions[idx] | |
| # ์ถ๋ ฅ ์์ญ ๊ณ์ฐ (reduction ๊ณ ๋ ค) | |
| x_start_out = x_start // self.reduction | |
| y_start_out = y_start // self.reduction | |
| x_end_out = x_end // self.reduction | |
| y_end_out = y_end // self.reduction | |
| pred_map[:, x_start_out:x_end_out, y_start_out:y_end_out] += preds[idx] | |
| count_map[:, x_start_out:x_end_out, y_start_out:y_end_out] += 1. | |
| idx += 1 | |
| # ๊ฒน์น๋ ์์ญ ํ๊ท ๊ณ์ฐ | |
| pred_map /= count_map | |
| return pred_map | |
| def resize_density_map(self, density_map: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray: | |
| """ | |
| ๋ฐ๋ ๋งต์ ํฌ๊ธฐ๋ฅผ ์กฐ์ ํฉ๋๋ค. ์ดํฉ์ ๋ณด์กด๋ฉ๋๋ค. | |
| Args: | |
| density_map: ํํ๊ฐ (C, H, W)์ธ ๋ฐ๋ ๋งต | |
| target_size: ๋ชฉํ ํฌ๊ธฐ (H', W') | |
| Returns: | |
| np.ndarray: ํฌ๊ธฐ๊ฐ ์กฐ์ ๋ ๋ฐ๋ ๋งต | |
| """ | |
| from PIL import Image | |
| import torch.nn.functional as F | |
| import torch | |
| # numpy๋ฅผ torch๋ก ๋ณํ | |
| if isinstance(density_map, np.ndarray): | |
| density_map = torch.from_numpy(density_map) | |
| # ๋ฐฐ์น ์ฐจ์ ์ถ๊ฐ | |
| if density_map.dim() == 3: | |
| density_map = density_map.unsqueeze(0) # (1, C, H, W) | |
| current_size = density_map.shape[2:] | |
| if current_size[0] == target_size[0] and current_size[1] == target_size[1]: | |
| return density_map.squeeze(0).numpy() | |
| # ์๋ณธ ๋ฐ๋ ๋งต์ ์ดํฉ ๊ณ์ฐ | |
| original_sum = density_map.sum() | |
| # ํฌ๊ธฐ ์กฐ์ (์์ ํ ๋ณด๊ฐ) | |
| resized_map = F.interpolate( | |
| density_map, | |
| size=target_size, | |
| mode='bilinear', | |
| align_corners=False | |
| ) | |
| # ์ดํฉ ๋ณด์กด์ ์ํ ์ค์ผ์ผ๋ง | |
| if resized_map.sum() > 0: # 0์ผ๋ก ๋๋๊ธฐ ๋ฐฉ์ง | |
| resized_map = resized_map * (original_sum / resized_map.sum()) | |
| return resized_map.squeeze(0).numpy() | |
| def predict(self, image: Union[str, np.ndarray]) -> float: | |
| """ | |
| ์ด๋ฏธ์ง์์ ๊ตฐ์ค ๊ณ์ ์์ธก์ ์ํํฉ๋๋ค. | |
| Args: | |
| image: ์ ๋ ฅ ์ด๋ฏธ์ง (๊ฒฝ๋ก, ๋ํ์ด ๋ฐฐ์ด, ๋๋ ์ ๋ก๋๋ ํ์ผ) | |
| Returns: | |
| float: ์์ธก๋ ์ฌ๋ ์ | |
| """ | |
| # ์ด๋ฏธ์ง ์ ์ฒ๋ฆฌ | |
| processed_image = self._process_image(image) | |
| image_height, image_width = processed_image.shape[-2:] | |
| # ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ์์ธก | |
| pred_density = self.sliding_window_predict( | |
| processed_image, | |
| self.window_size, | |
| self.stride | |
| ) | |
| # ์์ธก ๊ฒฐ๊ณผ ์ ์ฅ | |
| pred_count = pred_density.sum() | |
| # ์๋ณธ ์ด๋ฏธ์ง ํฌ๊ธฐ๋ก ๋ฐ๋ ๋งต ์กฐ์ | |
| resized_pred_density = self.resize_density_map( | |
| pred_density, | |
| (image_height, image_width) | |
| ) | |
| # ๊ฒฐ๊ณผ ์ ์ฅ | |
| self.processed_image = self._post_process_image(processed_image) | |
| self.density_map = resized_pred_density.squeeze() | |
| self.count = pred_count | |
| return pred_count | |
| def visualize_density_map(self, alpha: float = 0.5, save: bool = False, | |
| save_path: Optional[str] = None): | |
| """ | |
| ํ์ฌ ์ ์ฅ๋ ์์ธก ๊ฒฐ๊ณผ๋ฅผ ์๊ฐํํฉ๋๋ค. | |
| Args: | |
| alpha (float): density map์ ํฌ๋ช ๋ (0~1). ๊ธฐ๋ณธ๊ฐ 0.5 | |
| save (bool): ์๊ฐํ ๊ฒฐ๊ณผ๋ฅผ ์ด๋ฏธ์ง๋ก ์ ์ฅํ ์ง ์ฌ๋ถ. ๊ธฐ๋ณธ๊ฐ False | |
| save_path (str, optional): ์ ์ฅํ ๊ฒฝ๋ก. None์ผ ๊ฒฝ์ฐ ํ์ฌ ๋๋ ํ ๋ฆฌ์ ์๋ ์์ฑ๋ ์ด๋ฆ์ผ๋ก ์ ์ฅ. | |
| ๊ธฐ๋ณธ๊ฐ None | |
| Returns: | |
| Tuple[matplotlib.figure.Figure, np.ndarray]: | |
| - density map์ด ์ค๋ฒ๋ ์ด๋ matplotlib Figure ๊ฐ์ฒด | |
| - RGB ํ์์ ์๊ฐํ๋ ์ด๋ฏธ์ง ๋ฐฐ์ด (H, W, 3) | |
| """ | |
| if self.density_map is None or self.processed_image is None: | |
| raise ValueError("๋จผ์ predict ๋ฉ์๋๋ฅผ ์คํํ์ฌ ์์ธก์ ์ํํด์ผ ํฉ๋๋ค.") | |
| fig, ax = plt.subplots(dpi=200, frameon=False) | |
| ax.imshow(self.processed_image) | |
| ax.imshow(self.density_map, cmap="jet", alpha=alpha) | |
| ax.axis("off") | |
| plt.title(f"Count: {self.count:.1f}") | |
| if save: | |
| if save_path is None: | |
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") | |
| save_path = f"crowd_density_{timestamp}.png" | |
| # ์ฌ๋ฐฑ ์ ๊ฑฐํ๊ณ ์ ์ฅ | |
| plt.savefig(save_path, bbox_inches='tight', pad_inches=0, dpi=200) | |
| print(f"์ด๋ฏธ์ง ์ ์ฅ ์๋ฃ: {save_path}") | |
| fig.canvas.draw() | |
| image_from_plot = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8) | |
| image_from_plot = image_from_plot.reshape(fig.canvas.get_width_height()[::-1] + (4,)) | |
| image_from_plot = image_from_plot[:,:,:3] # RGB๋ก ๋ณํ | |
| return fig, image_from_plot | |
| def visualize_dots(self, dot_size: int = 20, sigma: float = 1, percentile: float = 97, | |
| save: bool = False, save_path: Optional[str] = None): | |
| """ | |
| ์์ธก๋ ๊ตฐ์ค ์์น๋ฅผ ์ ์ผ๋ก ํ์ํ์ฌ ์๊ฐํํฉ๋๋ค. | |
| Args: | |
| dot_size (int): ์ ์ ํฌ๊ธฐ. ๊ธฐ๋ณธ๊ฐ 20 | |
| sigma (float): Gaussian ํํฐ์ sigma ๊ฐ. ๊ธฐ๋ณธ๊ฐ 1 | |
| percentile (float): ์๊ณ๊ฐ์ผ๋ก ์ฌ์ฉํ ๋ฐฑ๋ถ์์ (0-100). ๊ธฐ๋ณธ๊ฐ 97 | |
| save (bool): ์๊ฐํ ๊ฒฐ๊ณผ๋ฅผ ์ด๋ฏธ์ง๋ก ์ ์ฅํ ์ง ์ฌ๋ถ. ๊ธฐ๋ณธ๊ฐ False | |
| save_path (str, optional): ์ ์ฅํ ๊ฒฝ๋ก. None์ผ ๊ฒฝ์ฐ ํ์ฌ ๋๋ ํ ๋ฆฌ์ ์๋ ์์ฑ๋ ์ด๋ฆ์ผ๋ก ์ ์ฅ. | |
| ๊ธฐ๋ณธ๊ฐ None | |
| Returns: | |
| Tuple[matplotlib.backends.backend_agg.FigureCanvasBase, np.ndarray]: | |
| - matplotlib figure์ canvas ๊ฐ์ฒด | |
| - RGB ํ์์ ์๊ฐํ๋ ์ด๋ฏธ์ง ๋ฐฐ์ด (H, W, 3) | |
| """ | |
| if self.density_map is None or self.processed_image is None: | |
| raise ValueError("๋จผ์ predict ๋ฉ์๋๋ฅผ ์คํํ์ฌ ์์ธก์ ์ํํด์ผ ํฉ๋๋ค.") | |
| adjusted_pred_count = int(round(self.count)) | |
| fig, ax = plt.subplots(dpi=200, frameon=False) | |
| ax.imshow(self.processed_image) | |
| filtered_density = gaussian_filter(self.density_map, sigma=sigma) | |
| threshold = np.percentile(filtered_density, percentile) | |
| candidate_pixels = np.column_stack(np.where(filtered_density >= threshold)) | |
| if len(candidate_pixels) > adjusted_pred_count: | |
| kmeans = KMeans(n_clusters=adjusted_pred_count, random_state=42, n_init=10) | |
| kmeans.fit(candidate_pixels) | |
| head_positions = kmeans.cluster_centers_.astype(int) | |
| else: | |
| head_positions = candidate_pixels | |
| y_coords, x_coords = head_positions[:, 0], head_positions[:, 1] | |
| ax.scatter(x_coords, y_coords, | |
| c='red', | |
| s=dot_size, | |
| alpha=1.0, | |
| edgecolors='white', | |
| linewidth=1) | |
| ax.axis("off") | |
| plt.title(f"Count: {self.count:.1f}") | |
| if save: | |
| if save_path is None: | |
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") | |
| save_path = f"crowd_dots_{timestamp}.png" | |
| plt.savefig(save_path, bbox_inches='tight', pad_inches=0, dpi=200) | |
| print(f"์ด๋ฏธ์ง ์ ์ฅ ์๋ฃ: {save_path}") | |
| # Figure๋ฅผ numpy ๋ฐฐ์ด๋ก ๋ณํ | |
| fig.canvas.draw() | |
| image_from_plot = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8) | |
| image_from_plot = image_from_plot.reshape(fig.canvas.get_width_height()[::-1] + (4,)) | |
| image_from_plot = image_from_plot[:,:,:3] # RGB๋ก ๋ณํ | |
| return fig.canvas, image_from_plot | |
| def crowd_count(self): | |
| """ | |
| ๊ฐ์ฅ ์ต๊ทผ ์์ธก์ ๊ตฐ์ค ์๋ฅผ ๋ฐํํฉ๋๋ค. | |
| Returns: | |
| float: ์์ธก๋ ๊ตฐ์ค ์ | |
| None: ์์ง ์์ธก์ด ์ํ๋์ง ์์ ๊ฒฝ์ฐ | |
| """ | |
| return self.count | |
| def get_density_map(self): | |
| """ | |
| ๊ฐ์ฅ ์ต๊ทผ ์์ธก์ ๋ฐ๋ ๋งต์ ๋ฐํํฉ๋๋ค. | |
| Returns: | |
| numpy.ndarray: ๋ฐ๋ ๋งต | |
| None: ์์ง ์์ธก์ด ์ํ๋์ง ์์ ๊ฒฝ์ฐ | |
| """ | |
| return self.density_map |