grok-api / utils.py
hf1024's picture
Update utils.py
066f443 verified
Raw
History Blame Contribute Delete
4.18 kB
# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@Author : Ailove
------------------------------------
@File : utils.py
@CreateTime : 2025/5/22 20:37
------------------------------------
"""
import os, hashlib, glob, re, base64, cv2
import numpy as np
from PIL import Image
from io import BytesIO
import requests
import onnxruntime
from config import MEDIA_ROOT
# Cached GPU availability; stays None until the first probe has run.
_CACHED_CUDA_AVAILABLE = None


def is_cuda_available():
    """Report whether GPUtil lists at least one GPU on this machine.

    The probe runs only once; its outcome is stored in the module-level
    ``_CACHED_CUDA_AVAILABLE`` and returned directly on later calls.

    Returns:
        bool: True when ``GPUtil.getGPUs()`` returns a non-empty list,
        False when it returns an empty list or when GPUtil cannot be
        imported or queried.
    """
    global _CACHED_CUDA_AVAILABLE
    if _CACHED_CUDA_AVAILABLE is None:
        try:
            # Imported lazily so that GPUtil stays an optional dependency.
            import GPUtil
            _CACHED_CUDA_AVAILABLE = len(GPUtil.getGPUs()) > 0
        except Exception:
            # Best effort: any probe failure means "no GPU". Unlike a bare
            # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
            _CACHED_CUDA_AVAILABLE = False
    return _CACHED_CUDA_AVAILABLE
def read_file(file_path):
    """Return the complete contents of ``file_path`` as bytes."""
    with open(file_path, mode="rb") as stream:
        contents = stream.read()
    return contents
def is_md5(string):
    """Tell whether ``string`` is a lowercase 32-character hexadecimal digest.

    Args:
        string: Value to test. Anything that is not a ``str`` is rejected
            instead of raising ``TypeError``.

    Returns:
        bool: True only when the whole value consists of exactly 32
        characters from ``0-9a-f``.
    """
    if not isinstance(string, str):
        return False
    # fullmatch (unlike match + "$") also rejects a trailing newline.
    return re.fullmatch(r"[0-9a-f]{32}", string) is not None
def id2address(image_id, type="s"):
    """Map an image id to its three-level storage directory under MEDIA_ROOT.

    Args:
        image_id: Either a 32-character lowercase hex digest, used as is,
            or any other value, which is converted with ``str()`` and
            MD5-hashed first.
        type: ``"s"`` selects the ``image_storage`` tree; any other value
            selects ``store``.

    Returns:
        str: ``MEDIA_ROOT/<base>/<a>/<b>/<c>`` where each of ``a``, ``b``
        and ``c`` is a decimal number in ``0..1023`` derived from the digest.
    """
    # Only strings can already be digests; the isinstance guard keeps ints
    # and other values from reaching the regex check and raising TypeError.
    if not (isinstance(image_id, str) and is_md5(image_id)):
        image_id = hashlib.md5(str(image_id).encode("utf-8")).hexdigest()
    # Three 7-hex-digit slices starting at offsets 0, 8 and 16, each reduced
    # modulo 1024. Kept exactly as is: changing it would move stored files.
    addrs = [str(int(image_id[i:i + 7], 16) % 1024) for i in range(0, 17, 8)]
    base = "image_storage" if type == "s" else "store"
    return os.path.join(MEDIA_ROOT, base, *addrs)
def readimg(body, keys, timeout=10):
    """Load the images named by ``keys`` from ``body`` as RGB arrays.

    The prefix of each key selects how ``body[key]`` is interpreted:

    * ``url...`` -- a URL, downloaded with ``requests.get``.
    * ``img...`` -- base64-encoded image bytes.
    * ``id...``  -- an image id resolved with ``id2address``; the first
      regular file that ``glob`` returns for that directory is read.

    Keys with any other prefix, and ``id`` keys whose directory contains no
    regular file, are skipped and do not appear in the result.

    Args:
        body: Mapping from key to URL / base64 payload / image id.
        keys: Iterable of keys to load.
        timeout: Seconds to wait for an HTTP download before giving up.
            Without a timeout a stalled server would block forever.

    Returns:
        dict: Maps each processed key to a ``numpy`` array of the image
        converted to RGB, or to ``None`` when loading that key failed.
    """
    inputs = {}
    for key in keys:
        try:
            if key.startswith("url"):
                image_string = requests.get(body[key], timeout=timeout).content
            elif key.startswith("img"):
                image_string = base64.b64decode(body[key])
            elif key.startswith("id"):
                path = id2address(body[key])
                files = glob.glob(os.path.join(path, '*'))
                file = next((f for f in files if os.path.isfile(f)), None)
                if not file:
                    continue
                image_string = read_file(file)
            else:
                continue
            inputs[key] = np.array(Image.open(BytesIO(image_string)).convert("RGB"))
        except Exception:
            # Best effort per key: a failed download/decode yields None for
            # that key instead of aborting the batch. KeyboardInterrupt and
            # SystemExit are deliberately not swallowed.
            inputs[key] = None
    return inputs
# The remaining functions (get_embedding, detect_features, is_bottom_third_blank, get_cos_similar) can stay unchanged.
def is_bottom_third_blank(image_data: np.ndarray) -> bool:
    """Heuristically decide whether the lower third of an image is blank.

    The image is converted from RGB to BGR and, when it is taller than 510
    rows, resized to 350x510. A region is considered to have content when
    at least one of its pixels has a channel-0 value (blue, after the
    conversion) in the range 5..149.

    Args:
        image_data: RGB image array.

    Returns:
        bool: True when the bottom third has no such pixel. When it does,
        rows 200..399 are inspected instead and the result is True only if
        that band has none.
    """
    original = cv2.cvtColor(image_data, cv2.COLOR_RGB2BGR)
    if original.shape[0] > 510:
        original = cv2.resize(original, (350, 510))
    height = original.shape[0]

    bottom_third = original[height * 2 // 3:, :]
    # Histogram bins 5..149 of channel 0 only.
    hist = cv2.calcHist([bottom_third], [0], None, [256], [0, 256])[5:150]
    if np.sum(hist) > 0:
        # NOTE(review): the fixed 200:400 band looks tuned for images about
        # 510 rows tall -- confirm the intended behaviour for shorter inputs.
        middle_section = original[200:400, :]
        middle_hist = cv2.calcHist([middle_section], [0], None, [256], [0, 256])[5:150]
        # bool() so both branches return a plain Python bool, not np.bool_.
        return bool(np.sum(middle_hist) == 0)
    return True
def detect_features(original_image: np.ndarray) -> int:
    """Count template matches of ``MEDIA_ROOT/xx.png`` in ``original_image``.

    Images taller than 600 rows are searched at their own size with the
    template scaled to 15x15; all others are resized to 350x510 and searched
    with a 9x9 template. A match is any position whose normalised
    correlation score (``cv2.TM_CCOEFF_NORMED``) is above 0.8.

    Args:
        original_image: Image array to search.
            NOTE(review): the template is loaded with ``cv2.imread`` (BGR
            channel order); confirm callers pass the image in the same order.

    Returns:
        int: Number of positions whose score exceeds the threshold.

    Raises:
        FileNotFoundError: If the template image is missing or unreadable.
    """
    feature_image_path = os.path.join(MEDIA_ROOT, "xx.png")
    feature = cv2.imread(feature_image_path)
    if feature is None:
        # cv2.imread reports failure by returning None; fail here with a
        # clear message instead of an opaque error inside cv2.resize.
        raise FileNotFoundError(
            f"feature template not found or unreadable: {feature_image_path}"
        )

    if original_image.shape[0] > 600:
        original = original_image
        target_size = (15, 15)
    else:
        original = cv2.resize(original_image, (350, 510))
        target_size = (9, 9)

    resized_feature = cv2.resize(feature, target_size)
    result = cv2.matchTemplate(original, resized_feature, cv2.TM_CCOEFF_NORMED)
    # Binarise the score map: 1 where score > 0.8, otherwise 0.
    _, binary = cv2.threshold(result, 0.8, 1, cv2.THRESH_BINARY)
    locations = np.where(binary == 1)
    return len(locations[0])
# Lazily created ONNX Runtime session, reused by every get_embedding call.
_EMBEDDING_SESSION = None


def _get_embedding_session():
    """Return the shared inference session, creating it on first use."""
    global _EMBEDDING_SESSION
    if _EMBEDDING_SESSION is None:
        model_path = os.path.join(MEDIA_ROOT, "image-similarity.onnx")
        # CPU is always listed last so the session still loads when the CUDA
        # provider is unavailable in the installed onnxruntime build.
        providers = ["CPUExecutionProvider"]
        if is_cuda_available():
            providers.insert(0, "CUDAExecutionProvider")
        _EMBEDDING_SESSION = onnxruntime.InferenceSession(
            model_path,
            providers=providers,
        )
    return _EMBEDDING_SESSION


def get_embedding(img: np.ndarray) -> np.ndarray:
    """Compute the embedding of an image with the ``image-similarity`` model.

    The image is scaled by 1/255, resized to 448x448, reordered from
    height-width-channel to channel-height-width, cast to float32 and given
    a leading batch axis before being fed to the model's ``input`` tensor.

    The model is loaded once and reused; previously it was reloaded from
    disk on every call.

    Args:
        img: Image array with the channel axis last.

    Returns:
        np.ndarray: First item of the model's ``output`` tensor.
    """
    session = _get_embedding_session()
    img = img / 255.0
    img = cv2.resize(img, (448, 448))
    img = img.transpose(2, 0, 1).astype(np.float32)[np.newaxis, :]
    embedding = session.run(['output'], {'input': img})[0][0]
    return embedding
def get_cos_similar(v1: np.ndarray, v2: np.ndarray) -> float:
    """Return the cosine similarity of two vectors rescaled to ``[0, 1]``.

    The cosine ``c`` in ``[-1, 1]`` is mapped to ``0.5 + 0.5 * c``, so 1.0
    means same direction, 0.5 orthogonal and 0.0 opposite.

    Args:
        v1: First 1-D vector.
        v2: Second 1-D vector of the same length.

    Returns:
        float: Similarity in ``[0, 1]``. When either vector has zero norm
        the cosine is undefined; it is treated as 0, giving 0.5, instead of
        dividing by zero and returning NaN.
    """
    num = float(np.dot(v1, v2))
    denom = float(np.linalg.norm(v1) * np.linalg.norm(v2))
    if denom == 0.0:
        return 0.5
    cosine = num / denom
    # Rounding can push the ratio marginally outside [-1, 1]; clamp it.
    # Plain comparisons are used so a NaN input still propagates as NaN.
    if cosine > 1.0:
        cosine = 1.0
    elif cosine < -1.0:
        cosine = -1.0
    return 0.5 + 0.5 * cosine