Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| import base64 | |
| import cv2 | |
| import numpy as np | |
| import requests | |
| import logging | |
| import tempfile | |
| import subprocess | |
| import os | |
| from typing import List | |
| from tensorflow.keras.applications import MobileNetV2 | |
| from tensorflow.keras.applications.mobilenet_v2 import preprocess_input | |
| from tensorflow.keras.models import Model | |
| from tensorflow.keras.preprocessing.image import img_to_array | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from skimage.metrics import structural_similarity as ssim | |
| from models import RequestModel, ResponseModel | |
| from PIL import Image | |
| from io import BytesIO | |
# Configure root logging with timestamped, levelled messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Silence noisy per-connection logs emitted by urllib3 during requests.get.
logging.getLogger("urllib3.connection").setLevel(logging.ERROR)
# Module-level feature extractor: ImageNet-pretrained MobileNetV2 with the
# classifier head removed; 'avg' pooling yields one embedding vector per image.
# NOTE: constructing this at import time downloads weights on first run.
mobilenet = MobileNetV2(weights="imagenet", include_top=False, pooling='avg')
def preprocess_image_for_mobilenet(image):
    """Convert an OpenCV image into a MobileNetV2-ready input batch.

    Accepts a 2-D grayscale array, a 3-D single-channel array, or a
    3-channel BGR array (OpenCV's default ordering). Returns a
    (1, 224, 224, 3) float array scaled with MobileNetV2's
    ``preprocess_input``.
    """
    # The two grayscale cases (2-D, or 3-D with one channel) need the same
    # conversion, so they are handled by a single branch; 3-channel input is
    # assumed BGR (cv2.imread default) and reordered to RGB.
    if len(image.shape) == 2 or image.shape[2] == 1:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    else:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (224, 224))  # MobileNetV2's expected input size
    image = img_to_array(image)
    image = np.expand_dims(image, axis=0)  # add the batch dimension
    return preprocess_input(image)
def mobilenet_sim(img1, img2, img1AssetCode, img2AssetCode):
    """Similarity of two images via MobileNetV2 embeddings, scaled to 0-100.

    Cosine similarity lies in [-1, 1]; ``(sim + 1) * 50`` maps it onto
    [0, 100]. Returns 0 when anything in the pipeline fails.
    """
    try:
        batch_a = preprocess_image_for_mobilenet(img1)
        batch_b = preprocess_image_for_mobilenet(img2)
        emb_a = mobilenet.predict(batch_a, verbose=0)
        emb_b = mobilenet.predict(batch_b, verbose=0)
        cosine = cosine_similarity(emb_a, emb_b)[0][0]
        sim_score = (cosine + 1) * 50
        logging.info(f"MobileNet similarity score from {img1AssetCode} and {img2AssetCode} is {sim_score}")
        return float(sim_score)
    except Exception:
        logging.error("Erro ao calcular similaridade com MobileNet", exc_info=True)
        return 0
| # def orb_sim(img1, img2, img1AssetCode, img2AssetCode): | |
| # score = 0 | |
| # | |
| # try: | |
| # orb = cv2.ORB_create() | |
| # kp_a, desc_a = orb.detectAndCompute(img1, None) | |
| # kp_b, desc_b = orb.detectAndCompute(img2, None) | |
| # | |
| # bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) | |
| # matches = bf.match(desc_a, desc_b) | |
| # similar_regions = [i for i in matches if i.distance < 20] | |
| # if len(matches) > 0: | |
| # score = (len(similar_regions) / len(matches)) * 100 | |
| # if (score > 0): | |
| # logging.info(f"Orb score from {img1AssetCode} and {img2AssetCode} is {score}") | |
| # except Exception as e: | |
| # logging.error("Erro ao verificar similaridade ORB", exc_info=True) | |
| # | |
| # return 1 if 0 < score < 1 else score | |
def orb_sim(img1, img2, img1AssetCode, img2AssetCode):
    """ORB keypoint-match similarity between two images, in [0, 100].

    The score is the percentage of brute-force Hamming matches whose
    distance is below 20. Incompatible or missing descriptors score 0;
    a fractional score in (0, 1) is rounded up to 1.
    """
    score = 0
    try:
        detector = cv2.ORB_create()
        _, desc_a = detector.detectAndCompute(img1, None)
        _, desc_b = detector.detectAndCompute(img2, None)
        # Featureless images yield no descriptors at all.
        if desc_a is None or desc_b is None:
            logging.warning(f"ORB descriptors are None for {img1AssetCode} or {img2AssetCode}, skipping.")
            return 0
        # BFMatcher requires both descriptor sets to share dtype and width.
        if desc_a.dtype != desc_b.dtype or desc_a.shape[1] != desc_b.shape[1]:
            logging.warning(f"ORB descriptors incompatible for {img1AssetCode} and {img2AssetCode}, skipping.")
            return 0
        matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        all_matches = matcher.match(desc_a, desc_b)
        close_matches = [m for m in all_matches if m.distance < 20]
        if all_matches:
            score = (len(close_matches) / len(all_matches)) * 100
        if score > 0:
            logging.info(f"Orb score from {img1AssetCode} and {img2AssetCode} is {score}")
    except Exception:
        logging.error("Erro ao verificar similaridade ORB", exc_info=True)
    return 1 if 0 < score < 1 else score
def ssim_sim(img1, img2):
    """Structural-similarity score rescaled from SSIM's [-1, 1] to [0, 100]."""
    mean_ssim = ssim(img1, img2, full=True)[0]
    return (mean_ssim + 1) * 50
def load_image_url(source, assetCode, contentType=None, ffmpeg_path='ffmpeg', frame_time=1):
    """Load a grayscale image for *assetCode* from *source*.

    *source* is either an HTTP(S) URL or a base64-encoded payload. When
    *contentType* starts with ``'video'`` a single frame is extracted at
    *frame_time* seconds using ffmpeg instead of decoding an image.
    Returns a 2-D grayscale array, or None when loading fails.
    """
    Image.MAX_IMAGE_PIXELS = None  # disable PIL's decompression-bomb guard for large assets

    def extract_frame_from_video(video_path_or_url, time_sec):
        # Extract one frame at time_sec into a temp JPEG, read it back as
        # grayscale, and always remove the temp file (the original leaked it
        # on every failure path).
        with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_frame:
            frame_path = temp_frame.name
        try:
            command = [
                ffmpeg_path,
                "-ss", str(time_sec),
                "-i", video_path_or_url,
                "-frames:v", "1",
                "-q:v", "2",
                "-y",
                frame_path
            ]
            logging.info(f"[DEBUG] Comando ffmpeg: {' '.join(command)}")
            result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if result.returncode != 0:
                logging.error(f"[ERRO] ffmpeg falhou com código {result.returncode}")
                logging.error(f"[ERRO] stderr: {result.stderr.decode('utf-8')}")
                raise RuntimeError("Erro ao extrair frame com ffmpeg.")
            if not os.path.exists(frame_path):
                logging.error("[ERRO] Frame não criado. Verifica se o caminho do vídeo está correto e acessível.")
                raise ValueError("Frame não encontrado após execução do ffmpeg.")
            frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
            if frame is None:
                logging.error("[ERRO] Falha ao ler frame extraído com OpenCV.")
                raise ValueError("Erro ao carregar frame extraído.")
            logging.info(f"[SUCESSO] Frame extraído com sucesso de {video_path_or_url}")
            return frame
        finally:
            if os.path.exists(frame_path):
                os.remove(frame_path)

    try:
        logging.info(f"[INFO] Content-Type de {assetCode} é {contentType}")
        if source.startswith('http'):
            if contentType and contentType.startswith('video'):
                return extract_frame_from_video(source, frame_time)
            logging.info(f"[INFO] A carregar imagem {assetCode} a partir de URL")
            # A timeout stops a hung download from blocking the worker forever,
            # and raise_for_status surfaces HTTP errors instead of decoding an
            # error page as image bytes.
            response = requests.get(source, timeout=30)
            response.raise_for_status()
            img = np.asarray(bytearray(response.content), dtype=np.uint8)
            img = cv2.imdecode(img, cv2.IMREAD_GRAYSCALE)
            return img
        else:
            logging.info(f"[INFO] A tentar carregar base64 de {assetCode} como imagem ou vídeo.")
            try:
                img_bytes = base64.b64decode(source)
                if contentType and contentType.startswith('video'):
                    logging.info(f"[INFO] Base64 de {assetCode} identificado como vídeo")
                    with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_video:
                        temp_video.write(img_bytes)
                        temp_video_path = temp_video.name
                    try:
                        # try/finally: the original only removed the temp video
                        # on success, leaking it when extraction raised.
                        return extract_frame_from_video(temp_video_path, frame_time)
                    finally:
                        os.remove(temp_video_path)
                else:
                    logging.info(f"[INFO] Base64 de {assetCode} identificado como imagem")
                    img = Image.open(BytesIO(img_bytes))
                    img = np.array(img)
                    img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
                    return img
            except Exception as e:
                # was logging.info: these are failures, log them as errors
                logging.error(f"[ERRO] Falha ao processar base64 de {assetCode}: {e}")
                raise
    except Exception as e:
        logging.error(f"[ERRO] Falha ao carregar imagem para {assetCode}: {e}")
        return None
def split_image_quadrants_and_center(image, center_size_ratio=0.5):
    """Split *image* into its four quadrants plus a centred crop.

    The centre crop spans ``center_size_ratio`` of the width and height.
    Returns [top_left, top_right, bottom_left, bottom_right, center].
    """
    height, width = image.shape[:2]
    half_w = width // 2
    half_h = height // 2
    # The four quadrants, as views into the original array.
    pieces = [
        image[:half_h, :half_w],   # top-left
        image[:half_h, half_w:],   # top-right
        image[half_h:, :half_w],   # bottom-left
        image[half_h:, half_w:],   # bottom-right
    ]
    # Centred crop of the requested relative size.
    crop_w = int(width * center_size_ratio)
    crop_h = int(height * center_size_ratio)
    x0 = (width - crop_w) // 2
    y0 = (height - crop_h) // 2
    pieces.append(image[y0:y0 + crop_h, x0:x0 + crop_w])
    return pieces
def change_similarity_from_quadrants(main_image, main_source, source_image, source):
    """Best per-region similarity of *source_image* against *main_image*.

    Each quadrant and the centre crop of *source_image* is resized to the
    reference image's dimensions and scored with SSIM, ORB and MobileNet;
    the maximum of each metric across regions is returned as a tuple
    (ssim, orb, mobilenet).
    """
    logging.info(f"[INFO] A verificar quadrants para {main_source} sobre {source}.")
    best_ssim = 0
    best_orb = 0
    best_mobilenet = 0
    # cv2.resize takes (width, height), hence the swapped shape order.
    target_size = (main_image.shape[1], main_image.shape[0])
    for region in split_image_quadrants_and_center(source_image):
        resized = cv2.resize(region, target_size)
        best_ssim = max(best_ssim, ssim_sim(main_image, resized))
        best_orb = max(best_orb, orb_sim(main_image, resized, main_source, source + "quadrant"))
        best_mobilenet = max(best_mobilenet, mobilenet_sim(main_image, resized, main_source, source + "quadrant"))
    return best_ssim, best_orb, best_mobilenet
def check_similarity(images: List[RequestModel]):
    """Compare the reference image (images[0]) against every other entry.

    Each candidate in images[1:] is loaded, resized to the reference's
    dimensions and scored with SSIM, ORB and MobileNet. Mid-range results
    (any metric in [70, 91)) trigger a second pass against the candidate's
    quadrants/centre crop, keeping the best score per metric. Returns one
    ResponseModel per candidate; any failure yields zeroed scores.
    """
    logging.info(f"Checking similarity for main source with resource id {images[0].originId}")
    # Must be initialised BEFORE the try: the outer except handler appends to
    # this list, and defining it inside the try turned a reference-image load
    # failure into a NameError inside the handler.
    results = []
    try:
        original_image = load_image_url(images[0].source, images[0].assetCode, images[0].contentType)
        # Raises AttributeError (caught below) when load_image_url returned None.
        original_image_shape = original_image.shape
        for i in range(1, len(images)):
            try:
                image = load_image_url(images[i].source, images[i].assetCode, images[i].contentType)
                # cv2.resize expects (width, height); shape is (height, width).
                image = cv2.resize(image, original_image_shape[::-1])
                similarity_score = ssim_sim(original_image, image)
                similarity_orb_score = orb_sim(original_image, image, images[0].assetCode, images[i].assetCode)
                similarity_mobilenet_score = mobilenet_sim(original_image, image, images[0].assetCode, images[i].assetCode)
                # Mid-range scores may mean the candidate matches only part of
                # the reference; retry per-quadrant and keep the best result.
                if ((similarity_score < 91 and similarity_mobilenet_score < 91)
                        and (similarity_score >= 70 or similarity_mobilenet_score >= 70)):
                    q_ssim, q_orb, q_mobilenet = change_similarity_from_quadrants(
                        original_image, images[0].assetCode, image, images[i].assetCode)
                    similarity_score = max(similarity_score, q_ssim)
                    similarity_orb_score = max(similarity_orb_score, q_orb)
                    similarity_mobilenet_score = max(similarity_mobilenet_score, q_mobilenet)
            except Exception as e:
                logging.error(f"Error loading image for resource id {images[i].originId} : {e}")
                similarity_score = 0
                similarity_orb_score = 0
                similarity_mobilenet_score = 0
            results.append(ResponseModel(
                originId=images[i].originId, sequence=images[i].sequence,
                assetCode=images[i].assetCode, similarity=similarity_score,
                similarityOrb=similarity_orb_score,
                similarityMobileNet=similarity_mobilenet_score))
    except Exception as e:
        logging.error(f"Error checking similarity for main source with resource id {images[0].originId} : {e}")
        # Reference image failed entirely: report zero similarity for every candidate.
        for i in range(1, len(images)):
            results.append(ResponseModel(
                originId=images[i].originId, sequence=images[i].sequence,
                assetCode=images[i].assetCode, similarity=0,
                similarityOrb=0, similarityMobileNet=0))
    return results