# !/usr/bin/python3 # -*- coding: utf-8 -*- """ @Author : Ailove ------------------------------------ @File : utils.py @CreateTime : 2025/5/22 20:37 ------------------------------------ """ import os, hashlib, glob, re, base64, cv2 import numpy as np from PIL import Image from io import BytesIO import requests import onnxruntime from config import MEDIA_ROOT # 缓存 GPU 可用性 _CACHED_CUDA_AVAILABLE = None def is_cuda_available(): global _CACHED_CUDA_AVAILABLE if _CACHED_CUDA_AVAILABLE is None: try: import GPUtil _CACHED_CUDA_AVAILABLE = len(GPUtil.getGPUs()) > 0 except: _CACHED_CUDA_AVAILABLE = False return _CACHED_CUDA_AVAILABLE def read_file(file_path): with open(file_path, "rb") as f: return f.read() def is_md5(string): return re.match(r"^[0-9a-f]{32}$", string) def id2address(image_id, type="s"): if not is_md5(image_id): image_id = hashlib.md5(str(image_id).encode("utf-8")).hexdigest() addrs = [str(int(image_id[i:i + 7], 16) % 1024) for i in range(0, 17, 8)] base = "image_storage" if type == "s" else "store" return os.path.join(MEDIA_ROOT, base, *addrs) def readimg(body, keys): inputs = {} for key in keys: try: if key.startswith("url"): image_string = requests.get(body[key]).content elif key.startswith("img"): image_string = base64.b64decode(body[key]) elif key.startswith("id"): path = id2address(body[key]) files = glob.glob(os.path.join(path, '*')) file = next((f for f in files if os.path.isfile(f)), None) if not file: continue image_string = read_file(file) else: continue inputs[key] = np.array(Image.open(BytesIO(image_string)).convert("RGB")) except: inputs[key] = None return inputs # 其他函数(如 get_embedding、detect_features、is_bottom_third_blank、get_cos_similar)可保持不变 def is_bottom_third_blank(image_data: np.ndarray) -> bool: original = cv2.cvtColor(image_data, cv2.COLOR_RGB2BGR) height, width, _ = original.shape if height > 510: original = cv2.resize(original, (350, 510)) height, width, _ = original.shape bottom_third_start = height * 2 // 3 bottom_third = original[bottom_third_start:, :] hist = cv2.calcHist([bottom_third], [0], None, [256], [0, 256])[5:150] if np.sum(hist) > 0: middle_section = original[200:400, :] middle_hist = cv2.calcHist([middle_section], [0], None, [256], [0, 256])[5:150] return np.sum(middle_hist) == 0 else: return True def detect_features(original_image: np.ndarray) -> int: model_path = MEDIA_ROOT#os.path.join(MEDIA_ROOT, "model") feature_image_path = os.path.join(model_path, "xx.png") feature = cv2.imread(feature_image_path) if original_image.shape[0] > 600: original = original_image target_size = (15, 15) else: original = cv2.resize(original_image, (350, 510)) target_size = (9, 9) resized_feature = cv2.resize(feature, target_size) result = cv2.matchTemplate(original, resized_feature, cv2.TM_CCOEFF_NORMED) _, binary = cv2.threshold(result, 0.8, 1, cv2.THRESH_BINARY) locations = np.where(binary == 1) return len(locations[0]) def get_embedding(img: np.ndarray) -> np.ndarray: cuda = is_cuda_available() model_path = os.path.join(MEDIA_ROOT, "image-similarity.onnx") session = onnxruntime.InferenceSession( model_path, providers=["CUDAExecutionProvider"] if cuda else ["CPUExecutionProvider"] ) img = img / 255.0 img = cv2.resize(img, (448, 448)) img = img.transpose(2, 0, 1).astype(np.float32)[np.newaxis, :] embedding = session.run(['output'], {'input': img})[0][0] return embedding def get_cos_similar(v1: np.ndarray, v2: np.ndarray) -> float: num = float(np.dot(v1, v2)) denom = np.linalg.norm(v1) * np.linalg.norm(v2) return 0.5 + 0.5 * (num / denom)