File size: 4,341 Bytes
426ace1
d7d469b
cd9b218
5a69773
a7331e7
24ea268
36443ae
d7d469b
 
0864ad4
 
 
 
 
426ace1
ad4e573
 
 
426ace1
 
40d2ebc
7d4b686
b187293
 
 
 
 
 
 
 
 
 
a7331e7
0864ad4
 
a7331e7
0864ad4
 
 
 
 
 
 
 
 
7b8c02f
b187293
 
 
 
 
ad7e92c
b187293
 
 
 
ad7e92c
b187293
 
 
ad7e92c
b187293
 
ad7e92c
 
af0c9d9
 
ad7e92c
af0c9d9
ad7e92c
 
 
b187293
 
 
 
 
 
 
 
 
ad7e92c
b187293
 
af0c9d9
 
b187293
af0c9d9
 
 
 
 
 
 
ac73cb7
 
 
ad4e573
af0c9d9
 
 
26aedd3
 
b187293
0864ad4
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
from PIL import Image
import io
import base64
from sentence_transformers import SentenceTransformer, util

class Item(BaseModel):
    images: List[bytes]
    #name: str
    #image1: bytes
    #image2: bytes
    

class ScoreResponse(BaseModel):
    score: float

app = FastAPI()

@app.put("/", response_model=ScoreResponse)
def predict(item: Item):
    # Load the OpenAI CLIP Model
    print('Loading CLIP Model...')
    model = SentenceTransformer('clip-ViT-B-32')
    
    # Next we compute the embeddings
    # To encode an image, you can use the following code:
    # from PIL import Image
    # encoded_image = model.encode(Image.open(filepath))
    #image_names = list(glob.glob('./*.jpg'))
    #print("Images:", len(image_names))

    #image_stream1 = io.BytesIO(base64.b64decode(item.image1));
    #image_stream2 = io.BytesIO(base64.b64decode(item.image2));
    
    #encoded_image = model.encode([Image.open(image_stream1), Image.open(image_stream2)], batch_size=128, convert_to_tensor=True, show_progress_bar=True)

    # Assuming 'item.images' is a list of base64 encoded strings, each representing an image
    image_streams = [io.BytesIO(base64.b64decode(image_data)) for image_data in item.images]
    
    # Now open each image stream using PIL's Image.open
    opened_images = [Image.open(image_stream) for image_stream in image_streams]
    
    # Assuming 'model' is your pre-trained model that encodes the images
    encoded_image = model.encode(opened_images, batch_size=128, convert_to_tensor=True, show_progress_bar=True)
    
    # Now we run the clustering algorithm. This function compares images aganist 
    # all other images and returns a list with the pairs that have the highest 
    # cosine similarity score
    processed_images = util.paraphrase_mining_embeddings(encoded_image)
    #NUM_SIMILAR_IMAGES = 10 
    
    # =================
    # DUPLICATES
    # =================
    #print('Finding duplicate images...')
    # Filter list for duplicates. Results are triplets (score, image_id1, image_id2) and is scorted in decreasing order
    # A duplicate image will have a score of 1.00
    # It may be 0.9999 due to lossy image compression (.jpg)
    #duplicates = [image for image in processed_images if image[0] >= 0.999]
    
    # Output the top X duplicate images
    #for score, image_id1, image_id2 in duplicates[0:NUM_SIMILAR_IMAGES]:
        #print("\nScore: {:.3f}%".format(score * 100))

    # Check if there are any duplicates
    #if duplicates:
        # Find the top score among duplicates
        #top_score = max(duplicates, key=lambda x: x[0])[0]
        #formatted_score = round(top_score * 100, 3)  # Multiplies by 100 and rounds to three decimal places
        #return ScoreResponse(score=formatted_score)        
    
    # =================
    # NEAR DUPLICATES
    # =================
    print('Finding near duplicate images...')
    # Use a threshold parameter to identify two images as similar. By setting the threshold lower, 
    # you will get larger clusters which have less similar images in it. Threshold 0 - 1.00
    # A threshold of 1.00 means the two images are exactly the same. Since we are finding near 
    # duplicate images, we can set it at 0.99 or any number 0 < X < 1.00.
    threshold = 1.0
    near_duplicates = [image for image in processed_images if image[0] < threshold]
    
    #for score, image_id1, image_id2 in near_duplicates[0:NUM_SIMILAR_IMAGES]:
        #print("\nScore: {:.3f}%".format(score * 100))

    # Find the top score from near duplicates
    if near_duplicates:
        top_score = max(near_duplicates, key=lambda x: x[0])[0]
    else:
        top_score = 0  # Default score if there are no near duplicates

    formatted_score = round(top_score * 100, 3)  # Multiplies by 100 and rounds to three decimal places

    print("Score: " + str(formatted_score))
    
    return ScoreResponse(score=formatted_score)

    #formatted_score = round(score * 100, 3)  # Multiplies by 100 and rounds to three decimal places
    #return ScoreResponse(score=formatted_score)
    
    #return "Score: {:.3f}%".format(score * 100)

#@app.get("/")
#def read_root():
#    return {"Hello": "World!"}

#@app.put("/return/")
#def return_item(item: Item):
#   return item