import gc
import uuid

import chromadb
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from transformers import AutoModel, AutoImageProcessor

from src.utils.utils import extract_images_from_file


# Candidate embedding models:
# - nomic-ai/nomic-embed-vision-v1.5 (used below)
# - google/vit-base-patch16-384

class is_conf_image:
    """Embeds images with nomic-embed-vision and matches them against a
    persistent ChromaDB collection of labelled image embeddings."""

    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.feature_extractor = AutoImageProcessor.from_pretrained(
            "nomic-ai/nomic-embed-vision-v1.5",
            cache_dir="../weights", use_fast=True, trust_remote_code=True)
        self.model = AutoModel.from_pretrained(
            "nomic-ai/nomic-embed-vision-v1.5",
            cache_dir="../weights", trust_remote_code=True).eval().to(self.device)

        self.client = chromadb.PersistentClient(path="../db/image")
        # "hnsw:space" (not "hnsw") is the ChromaDB metadata key that selects
        # the distance metric for the collection's index.
        self.collection = self.client.get_or_create_collection(
            name="image_embedding", metadata={"hnsw:space": "cosine"})
        self.max_size: int = 800
        self.cnt: int = 0        # embeddings stored since startup
        self.cnt_infer: int = 0  # inference queries since startup

    async def making_embedding_vector(self, image_path: str, category: int):
        """Embed a single image and store it in the collection with its category."""
        image = np.array(Image.open(image_path).convert("RGB"))

        embedding_vector = self.inference(image)
        self.add_vectors(embedding_vector, {"image": image_path, "category": category})

        # Periodically release memory held by torch/CUDA caches.
        self.cnt += 1
        if self.cnt % 200 == 0:
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        return embedding_vector

    async def infer_image(self, image_path: str, threshold: float = 0.45, top_k: int = 2):
        """Query the collection for the nearest matches to an image (or each page of a PDF)."""
        # PIL cannot open PDFs directly, so route them through the extractor
        # before any Image.open call; oversized images take the same path.
        if image_path.endswith('.pdf'):
            return self._infer_extracted(image_path, threshold, top_k)

        image = np.array(Image.open(image_path).convert("RGB"))
        if image.shape[0] > self.max_size or image.shape[1] > self.max_size:
            return self._infer_extracted(image_path, threshold, top_k)

        embedding_vector = self.inference(image)
        results = self.finding_from_db(embedding_vector, threshold, top_k)

        # Periodically release memory held by torch/CUDA caches.
        self.cnt_infer += 1
        if self.cnt_infer % 200 == 0:
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        return results

    def _infer_extracted(self, image_path: str, threshold: float, top_k: int) -> list:
        """Embed and query every image extracted from a PDF or oversized file."""
        results = []
        for extracted in extract_images_from_file(image_path, max_size=self.max_size):
            image = np.array(Image.open(extracted).convert("RGB"))
            embedding_vector = self.inference(image)
            results.append(self.finding_from_db(embedding_vector, threshold, top_k))
        return results

    def finding_from_db(self, embedding_vector, threshold: float, top_k: int) -> dict:
        """Return metadata for the top_k nearest neighbours within the distance threshold."""
        result_out = {}
        # "embeddings" is omitted from include: only metadata and distances
        # are consumed below.
        results = self.collection.query(query_embeddings=embedding_vector, n_results=top_k,
                                        include=["metadatas", "distances"])

        idx = 0
        for j in range(len(results["distances"][0])):
            if results["distances"][0][j] <= threshold:
                result_out["similar_image" + str(idx)] = results["metadatas"][0][j]["image"]
                result_out["category" + str(idx)] = results["metadatas"][0][j]["category"]
                result_out["cosine_distance" + str(idx)] = results["distances"][0][j]
                idx += 1  # advance so multiple matches don't overwrite each other
        return result_out

    @torch.inference_mode()
    def inference(self, image: np.ndarray):
        """Embed an image: take the CLS token and L2-normalize it for cosine search."""
        inputs = self.feature_extractor(images=image, return_tensors="pt").to(self.device)
        outputs = self.model(**inputs).last_hidden_state
        # The CLS token (index 0) serves as the image-level embedding.
        embedding = F.normalize(outputs[:, 0], p=2, dim=1).cpu().numpy()
        return embedding.tolist()

    def add_vectors(self, vectors, metadatas):
        # Vectors arrive already embedded (a list of lists from inference),
        # so they can be added to the collection directly.
        self.collection.add(
            embeddings=vectors,          # embedding vectors
            metadatas=[metadatas],       # metadata (image path and category)
            ids=[str(uuid.uuid4())]      # unique ID for this entry
        )
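

# Minimal usage sketch: "sample.png" is a hypothetical path used only for
# illustration, and the threshold/top_k values simply echo the defaults above.
# The async methods are driven with asyncio.run.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        matcher = is_conf_image()
        # Index an image under category 1, then query for similar images.
        await matcher.making_embedding_vector("sample.png", category=1)
        print(await matcher.infer_image("sample.png", threshold=0.45, top_k=2))

    asyncio.run(_demo())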