| |
| |
| """ |
| @Author : Ailove |
| ------------------------------------ |
| @File : utils.py |
| @CreateTime : 2025/5/22 20:37 |
| ------------------------------------ |
| """ |
| import os, hashlib, glob, re, base64, cv2 |
| import numpy as np |
| from PIL import Image |
| from io import BytesIO |
| import requests |
| import onnxruntime |
| from config import MEDIA_ROOT |
|
|
| |
# Memoized result of the GPU probe; None means "not probed yet".
_CACHED_CUDA_AVAILABLE = None


def is_cuda_available():
    """Report whether GPUtil can see at least one GPU, caching the answer.

    The probe runs only on the first call; later calls return the cached
    value from ``_CACHED_CUDA_AVAILABLE``.

    Returns:
        bool: True when ``GPUtil.getGPUs()`` returns a non-empty list, False
        when it returns an empty list or when importing/querying GPUtil
        raises an ordinary exception.
    """
    global _CACHED_CUDA_AVAILABLE
    if _CACHED_CUDA_AVAILABLE is None:
        try:
            # Imported lazily so the module still loads when GPUtil is absent.
            import GPUtil
            _CACHED_CUDA_AVAILABLE = len(GPUtil.getGPUs()) > 0
        except Exception:
            # Best effort: any probe failure means "no GPU". Using Exception
            # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
            _CACHED_CUDA_AVAILABLE = False
    return _CACHED_CUDA_AVAILABLE
|
|
|
|
def read_file(file_path):
    """Return the full contents of ``file_path`` as bytes.

    The file is opened in binary mode and closed before returning.
    """
    with open(file_path, mode="rb") as handle:
        payload = handle.read()
    return payload
|
|
|
|
def is_md5(string):
    """Check whether ``string`` is exactly 32 lowercase hexadecimal characters.

    Args:
        string: Candidate value. Anything that is not a ``str`` is treated as
            "not an MD5 digest" instead of raising ``TypeError``.

    Returns:
        A truthy ``re.Match`` object when the whole string is a 32-character
        lowercase hex digest, otherwise ``None``.
    """
    if not isinstance(string, str):
        return None
    # fullmatch (unlike ``^...$``) also rejects a digest followed by "\n".
    return re.fullmatch(r"[0-9a-f]{32}", string)
|
|
|
|
def id2address(image_id, type="s"):
    """Map an image id to its sharded storage directory under ``MEDIA_ROOT``.

    The id is first normalised to a 32-character hex digest: values that are
    not already one (per ``is_md5``) are converted to text and MD5-hashed.
    Three 7-character hex slices starting at offsets 0, 8 and 16 are each
    reduced modulo 1024 to give three nested directory names.

    Args:
        image_id: Image identifier; any value is accepted and converted with
            ``str()`` before use.
        type: ``"s"`` selects the ``image_storage`` root; any other value
            selects ``store``.

    Returns:
        str: ``MEDIA_ROOT/<root>/<a>/<b>/<c>``.
    """
    # Convert up front: the digest check needs a string, and non-string ids
    # (e.g. integers) are meant to be hashed rather than rejected.
    image_id = str(image_id)
    if not is_md5(image_id):
        image_id = hashlib.md5(image_id.encode("utf-8")).hexdigest()
    addrs = [
        str(int(image_id[start:start + 7], 16) % 1024)
        for start in (0, 8, 16)
    ]
    base = "image_storage" if type == "s" else "store"
    return os.path.join(MEDIA_ROOT, base, *addrs)
|
|
|
|
def readimg(body, keys, *, timeout=10):
    """Load the images referenced by ``keys`` from ``body`` as RGB arrays.

    The prefix of each key selects how ``body[key]`` is interpreted:

    * ``url...`` - fetched over HTTP with ``requests.get``.
    * ``img...`` - decoded as base64 image bytes.
    * ``id...``  - resolved with ``id2address``; the first regular file found
      in that directory is read.

    Args:
        body: Mapping from key to URL, base64 data or image id.
        keys: Iterable of keys to load.
        timeout: Seconds passed to ``requests.get`` so that an unresponsive
            server cannot block the call indefinitely.

    Returns:
        dict: key -> NumPy array of the image converted to RGB. A key is set
        to ``None`` when loading it raised an error. Keys with an unknown
        prefix, and ``id`` keys whose directory holds no file, are omitted.
    """
    inputs = {}
    for key in keys:
        try:
            if key.startswith("url"):
                # NOTE(review): the URL comes straight from ``body``; if that
                # is untrusted input this is an SSRF surface - confirm.
                image_string = requests.get(body[key], timeout=timeout).content
            elif key.startswith("img"):
                image_string = base64.b64decode(body[key])
            elif key.startswith("id"):
                path = id2address(body[key])
                files = glob.glob(os.path.join(path, '*'))
                file = next((f for f in files if os.path.isfile(f)), None)
                if not file:
                    continue
                image_string = read_file(file)
            else:
                continue
            inputs[key] = np.array(Image.open(BytesIO(image_string)).convert("RGB"))
        except Exception:
            # Best effort per key: a failed load is reported as None. Using
            # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
            # propagate.
            inputs[key] = None
    return inputs
|
|
| |
def is_bottom_third_blank(image_data: np.ndarray) -> bool:
    """Heuristically decide whether the lower part of an image is blank.

    The image is converted with ``COLOR_RGB2BGR`` (so it is presumably RGB on
    input - confirm against callers) and, when taller than 510 rows, resized
    to 350x510. "Blank" here means that channel 0 of the converted image has
    no pixels with a value in the range 5..149.

    Returns:
        True when the bottom third contains no such pixels. Otherwise the
        result is whether rows 200..399 contain none.
    """
    bgr = cv2.cvtColor(image_data, cv2.COLOR_RGB2BGR)
    if bgr.shape[0] > 510:
        bgr = cv2.resize(bgr, (350, 510))

    rows = bgr.shape[0]
    lower_part = bgr[rows * 2 // 3:, :]
    # 256 one-value bins over channel 0; keep only bins 5..149.
    lower_counts = cv2.calcHist([lower_part], [0], None, [256], [0, 256])[5:150]
    if not np.sum(lower_counts) > 0:
        return True

    # The bottom third has content: fall back to the fixed band of rows 200..399.
    band = bgr[200:400, :]
    band_counts = cv2.calcHist([band], [0], None, [256], [0, 256])[5:150]
    return np.sum(band_counts) == 0
|
|
|
|
def detect_features(original_image: np.ndarray) -> int:
    """Count positions where the ``xx.png`` template matches the image.

    The template is read from ``MEDIA_ROOT/xx.png``. Images taller than 600
    rows are searched as-is with a 15x15 template; all others are resized to
    350x510 and searched with a 9x9 template. A position counts as a match
    when its ``TM_CCOEFF_NORMED`` score exceeds 0.8.

    Returns:
        int: number of matching positions.
    """
    # NOTE(review): cv2.imread returns None for a missing/unreadable file, in
    # which case the resize below fails - confirm the template always exists.
    # NOTE(review): imread yields BGR; whether ``original_image`` uses the
    # same channel order cannot be told from here - confirm.
    template = cv2.imread(os.path.join(MEDIA_ROOT, "xx.png"))

    if original_image.shape[0] > 600:
        scene, template_size = original_image, (15, 15)
    else:
        scene, template_size = cv2.resize(original_image, (350, 510)), (9, 9)

    scaled_template = cv2.resize(template, template_size)
    scores = cv2.matchTemplate(scene, scaled_template, cv2.TM_CCOEFF_NORMED)
    # THRESH_BINARY writes 1 where the score is above 0.8 and 0 elsewhere.
    _, hits = cv2.threshold(scores, 0.8, 1, cv2.THRESH_BINARY)
    return int(np.count_nonzero(hits == 1))
|
|
|
|
# Lazily created ONNX Runtime session, shared by every get_embedding call.
_EMBEDDING_SESSION = None


def _get_embedding_session():
    """Create the image-similarity inference session once and reuse it."""
    global _EMBEDDING_SESSION
    if _EMBEDDING_SESSION is None:
        model_path = os.path.join(MEDIA_ROOT, "image-similarity.onnx")
        # Keep the CPU provider as a fallback in case the CUDA provider
        # cannot be used even though a GPU was detected.
        providers = (
            ["CUDAExecutionProvider", "CPUExecutionProvider"]
            if is_cuda_available()
            else ["CPUExecutionProvider"]
        )
        _EMBEDDING_SESSION = onnxruntime.InferenceSession(
            model_path,
            providers=providers,
        )
    return _EMBEDDING_SESSION


def get_embedding(img: np.ndarray) -> np.ndarray:
    """Compute the embedding of one image with the image-similarity model.

    The image is divided by 255, resized to 448x448, moved to
    channels-first layout, cast to float32 and given a leading batch axis
    before being fed to the model's ``input``.

    Args:
        img: Image array with three axes, channels last (the transpose below
            requires exactly three). Presumably 0..255 RGB as produced by
            ``readimg`` - confirm against callers.

    Returns:
        np.ndarray: first item of the model's ``output`` for this image.
    """
    # Loading the model is expensive, so the session is built only once
    # instead of on every call.
    session = _get_embedding_session()

    img = img / 255.0
    img = cv2.resize(img, (448, 448))
    img = img.transpose(2, 0, 1).astype(np.float32)[np.newaxis, :]
    embedding = session.run(['output'], {'input': img})[0][0]
    return embedding
|
|
|
|
def get_cos_similar(v1: np.ndarray, v2: np.ndarray) -> float:
    """Return the cosine similarity of two vectors rescaled to [0, 1].

    The cosine ``c`` in [-1, 1] is mapped to ``0.5 + 0.5 * c``, so opposite
    vectors give 0.0, orthogonal vectors 0.5 and parallel vectors 1.0.

    Args:
        v1: First 1-D vector.
        v2: Second 1-D vector with the same length as ``v1``.

    Returns:
        float: similarity score. When either vector has zero norm the cosine
        is undefined; it is treated as 0, so 0.5 is returned instead of NaN.
    """
    num = float(np.dot(v1, v2))
    denom = float(np.linalg.norm(v1) * np.linalg.norm(v2))
    if denom == 0.0:
        return 0.5
    # Rounding can push the ratio marginally outside [-1, 1]; clip so the
    # score stays within [0, 1]. np.clip leaves NaN untouched.
    cosine = float(np.clip(num / denom, -1.0, 1.0))
    return 0.5 + 0.5 * cosine