# Provenance: Hugging Face upload "Upload 4 files" by Iris314 (commit a813fbf, verified)
# -*- coding: utf-8 -*-
"""
Detect ingredients using a Roboflow model with preprocessing:
- Resize images to 640x640 if needed.
- Perform detection.
- Classify object sizes via K-Means.
- Generate JSON and annotated image outputs.
"""
import json
import os
import tempfile
from dataclasses import dataclass
import cv2
import numpy as np
from roboflow import Roboflow
from sklearn.cluster import KMeans
import supervision as sv
@dataclass
class RoboflowCredentials:
    """Credentials required to reach a hosted Roboflow model.

    Attributes:
        api_key: Roboflow API key.
        project_name: Name of the Roboflow project.
        version: Model version number (defaults to 1).
    """

    api_key: str
    project_name: str
    version: int = 1
def load_roboflow_credentials(path: str) -> RoboflowCredentials:
    """Load Roboflow API credentials from a simple key=value text file.

    Blank lines, lines starting with ``#``, and lines without an ``=``
    separator are skipped.  Recognized keys (matched case-insensitively):
    ``api_key``, ``project_name`` and ``version``.

    Args:
        path: Location of the credential file.

    Returns:
        RoboflowCredentials: Parsed credentials; version defaults to 1.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If ``version`` is not an integer, or if ``api_key``
            or ``project_name`` is missing.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(
            f"Roboflow credential file not found: {path}."
        )

    # Accumulate recognized fields; anything unrecognized is ignored.
    fields = {"api_key": None, "project_name": None, "version": 1}
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            name, _, raw_value = entry.partition("=")
            name = name.strip().lower()
            raw_value = raw_value.strip()
            if name in ("api_key", "project_name"):
                fields[name] = raw_value
            elif name == "version":
                try:
                    fields["version"] = int(raw_value)
                except ValueError:
                    raise ValueError("Version in credential file must be an integer") from None

    if not fields["api_key"] or not fields["project_name"]:
        raise ValueError(
            "Credential file must contain api_key and project_name entries."
        )
    return RoboflowCredentials(
        api_key=fields["api_key"],
        project_name=fields["project_name"],
        version=fields["version"],
    )
def compute_area_ratios(predictions, img_shape):
    """Compute area ratio (bbox area / image area) for each detection.

    Args:
        predictions: Iterable of prediction dicts with "width" and "height".
        img_shape: Image shape tuple; only the first two entries (H, W) are used.

    Returns:
        numpy.ndarray: Column vector of shape (N, 1) with the area ratios.
    """
    total_area = float(img_shape[0] * img_shape[1])
    ratios = [p["width"] * p["height"] / total_area for p in predictions]
    # Reshape to (N, 1) so the result can feed K-Means directly.
    return np.array(ratios).reshape(-1, 1)
def cluster_sizes(area_ratios):
    """Cluster area ratios into two groups using K-Means and return size labels.

    Args:
        area_ratios: Array of shape (N, 1) holding bbox-area / image-area values.

    Returns:
        list[str]: One "large"/"small" label per detection; "large" is the
        cluster whose centroid has the bigger mean area ratio.
    """
    n_samples = len(area_ratios)
    # K-Means requires n_samples >= n_clusters; with 0 or 1 detections there
    # is nothing to separate, so label everything "large" instead of crashing.
    if n_samples < 2:
        return ["large"] * n_samples
    # n_init pinned to 10 (the historical sklearn default) so behavior stays
    # stable across scikit-learn versions where the default became "auto".
    kmeans = KMeans(n_clusters=2, init="k-means++", n_init=10, random_state=0)
    labels = kmeans.fit_predict(area_ratios)
    centroids = kmeans.cluster_centers_.flatten()
    large_cluster = np.argmax(centroids)
    return ["large" if lbl == large_cluster else "small" for lbl in labels]
def detect_and_generate(
    image_path: str,
    credentials: RoboflowCredentials,
    conf_threshold: float = 0.4,
    overlap_threshold: float = 0.3,
    conf_split: float = 0.7,
    output_json: str = "recipe_input.json",
    output_image: str = "annotated_image.jpg"
):
    """
    Resize image if necessary, run detection, classify sizes via K-Means, and
    create both JSON output and annotated image.

    Args:
        image_path (str): Path to the original image.
        credentials (RoboflowCredentials): Roboflow API key, project and version.
        conf_threshold (float): Minimum confidence threshold (0-1).
        overlap_threshold (float): NMS overlap threshold (0-1).
        conf_split (float): Threshold for high/low confidence lists.
        output_json (str): Output JSON filename.
        output_image (str): Output annotated image filename.

    Returns:
        dict: Recipe JSON structure plus the paths of both output files.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    # Load original image.
    original_img = cv2.imread(image_path)
    if original_img is None:
        raise FileNotFoundError(f"Image not found: {image_path}")
    height, width = original_img.shape[:2]

    tmp_path = None  # set only when a resized temp copy is created
    try:
        # Preprocess: resize to 640x640 if needed, and save to a temp file.
        if height != 640 or width != 640:
            resized_img = cv2.resize(original_img, (640, 640))
            # mkstemp + immediate close: avoids the fd locking the file on Windows.
            fd, tmp_path = tempfile.mkstemp(suffix=".jpg")
            os.close(fd)
            cv2.imwrite(tmp_path, resized_img)
            detection_path = tmp_path
            img_for_annotation = resized_img
        else:
            detection_path = image_path
            img_for_annotation = original_img

        # Initialize Roboflow model.
        rf = Roboflow(api_key=credentials.api_key)
        model = (
            rf.workspace()
            .project(credentials.project_name)
            .version(credentials.version)
            .model
        )
        # Run prediction; the Roboflow API expects percentages (0-100).
        response = model.predict(
            detection_path,
            confidence=int(conf_threshold * 100),
            overlap=int(overlap_threshold * 100),
        ).json()
        predictions = response["predictions"]

        # Classify sizes using K-Means; skip clustering when nothing was detected.
        if predictions:
            area_ratios = compute_area_ratios(predictions, img_for_annotation.shape)
            size_labels = cluster_sizes(area_ratios)
        else:
            size_labels = []

        # Build JSON structure.
        ingredients = []
        high_conf = []
        low_conf = []
        for pred, size_label in zip(predictions, size_labels):
            name = pred["class"]
            conf = pred["confidence"]
            ingredients.append({
                "name": name,
                "quantity": size_label,
                "confidence": round(conf, 2),
            })
            if conf >= conf_split:
                high_conf.append(name)
            else:
                low_conf.append(name)
        recipe_json = {
            "ingredients": ingredients,
            "high_confidence_ingredients": high_conf,
            "low_confidence_ingredients": low_conf,
        }

        # Write JSON to file.
        with open(output_json, "w", encoding="utf-8") as jf:
            json.dump(recipe_json, jf, indent=4)

        # Annotate image with bounding boxes and confidence labels.
        detections = sv.Detections.from_inference(response)
        label_annotator = sv.LabelAnnotator()
        box_annotator = sv.BoxAnnotator()
        labels_for_annotation = [
            f"{pred['class']} ({pred['confidence']:.2f})" for pred in predictions
        ]
        annotated_img = box_annotator.annotate(
            scene=img_for_annotation.copy(),
            detections=detections,
        )
        annotated_img = label_annotator.annotate(
            scene=annotated_img,
            detections=detections,
            labels=labels_for_annotation,
        )
        cv2.imwrite(output_image, annotated_img)
    finally:
        # Remove the temporary file even if detection/annotation raised,
        # so failures do not leak temp files.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                # Best effort: on Windows the file may still be locked.
                pass

    return {
        "recipe_json": recipe_json,
        "output_json_path": output_json,
        "annotated_image_path": output_image,
    }