# main.py — FastAPI service: scores image similarity with a CLIP model (commit ac73cb7)
import base64
import io
from functools import lru_cache
from typing import List

from fastapi import FastAPI
from PIL import Image
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer, util
class Item(BaseModel):
    """Request payload: a batch of base64-encoded images to compare."""

    # Each element is the base64-encoded bytes of one image.
    images: List[bytes]
class ScoreResponse(BaseModel):
    """Response payload: top near-duplicate similarity, as a percentage."""

    score: float
app = FastAPI()
@lru_cache(maxsize=1)
def _get_model():
    """Load the CLIP model once per process and reuse it across requests.

    The original handler reloaded the model on every request, which is by
    far the dominant cost; caching the loader preserves behavior while
    paying the load cost only once.
    """
    print('Loading CLIP Model...')
    return SentenceTransformer('clip-ViT-B-32')


@app.put("/", response_model=ScoreResponse)
def predict(item: Item):
    """Return the highest pairwise similarity among the submitted images.

    Decodes each base64 payload in ``item.images``, embeds the images with
    CLIP in one batch, runs paraphrase mining over the embeddings, and
    reports the best cosine-similarity score strictly below the
    exact-duplicate threshold, as a percentage rounded to three decimals.

    Raises:
        binascii.Error: if a payload is not valid base64.
        PIL.UnidentifiedImageError: if decoded bytes are not an image.
    """
    # Fewer than two images means there are no pairs to compare; the
    # mining step would have nothing to return, so short-circuit to 0.
    if len(item.images) < 2:
        return ScoreResponse(score=0.0)

    model = _get_model()

    # Each entry of item.images is base64-encoded image data.
    streams = [io.BytesIO(base64.b64decode(data)) for data in item.images]
    images = [Image.open(stream) for stream in streams]

    # Embed all images in a single batch.
    embeddings = model.encode(
        images, batch_size=128, convert_to_tensor=True, show_progress_bar=True
    )

    # Compare every image against every other image. Returns
    # (score, image_id1, image_id2) triplets sorted by decreasing
    # cosine similarity.
    pairs = util.paraphrase_mining_embeddings(embeddings)

    # A score of exactly 1.0 indicates identical images (lossy .jpg
    # compression can make true duplicates ~0.9999). We only report
    # "near duplicates": pairs strictly below the threshold.
    print('Finding near duplicate images...')
    threshold = 1.0
    near_duplicates = [pair for pair in pairs if pair[0] < threshold]

    if near_duplicates:
        top_score = max(near_duplicates, key=lambda pair: pair[0])[0]
    else:
        top_score = 0  # every pair was an exact duplicate

    formatted_score = round(top_score * 100, 3)  # percentage, 3 decimals
    print("Score: " + str(formatted_score))
    return ScoreResponse(score=formatted_score)
#@app.get("/")
#def read_root():
# return {"Hello": "World!"}
#@app.put("/return/")
#def return_item(item: Item):
# return item