| import asyncio, base64, json, os, tempfile |
| from pathlib import Path |
|
|
| import cv2 |
| import gradio as gr |
| import numpy as np |
| from PIL import Image |
| from dotenv import load_dotenv |
| from openai import OpenAI |
|
|
| |
| load_dotenv() |
| def _env(k, d=""): |
| return os.getenv(k, d).split("#", 1)[0].strip() |
|
|
# Model preselected in the UI; override via the OPENAI_MODEL environment variable.
DEFAULT_MODEL = _env("OPENAI_MODEL", "gpt-4o")


# Models offered in the UI dropdown.
# NOTE(review): the UI tips claim all entries support vision input -- confirm
# image-message support for the o1/o3 family before relying on them.
AVAILABLE_MODELS = [
    "gpt-4o",
    "gpt-4o-mini",
    "o1",
    "o1-mini",
    "o3-mini",
    "gpt-4-vision-preview",
    "gpt-4-turbo"
]
|
|
# UI defaults, overridable through environment variables.
# Defaults are passed to _env as strings: passing an int (as the original did)
# makes os.getenv return the int unchanged and _env's .split crash at import
# time whenever the corresponding env var is unset.
DEFAULTS = dict(
    row = int(_env("ROW_COUNT", "7")),              # grid rows per stage
    col = int(_env("COL_COUNT", "7")),              # grid columns per stage
    zoom = int(_env("ZOOM_LEVELS", "2")),           # recursion depth
    overlap = 0.0,                                  # tile overlap (UI fixes this at 0)
    pad = 0.0,                                      # tile padding (UI fixes this at 0)
    max_candidates = int(_env("MAX_CANDIDATES", "3"))  # branches explored per stage
)
|
|
| |
# Prompt sent with every grid cell.  Restores the em dash that was garbled
# into a stray Greek beta by an encoding round-trip in the original file.
DEFAULT_PROMPT = (
    "You are a vision inspector. Look at the image and determine if a human is present. "
    "Partial visibility is acceptable\u2014consider clues like limbs, clothing, silhouettes, shadows, or partial faces. "
    "Respond strictly with valid JSON in the following format:\n"
    '{"detected":"YES/NO/MAYBE", "confidence":<float between 0 and 1>, "reason":"<15 words max>"}\n'
    "- YES: Clearly visible human feature(s) are observed.\n"
    "- MAYBE: Ambiguous or partial evidence is present.\n"
    "- NO: No evidence of a human is detected."
)
|
|
| |
def encode(img):
    """Serialize *img* (an OpenCV ndarray) to a Base64 string of JPEG bytes."""
    quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), 95]
    # Success flag is ignored, matching the tool's best-effort behaviour.
    _ok, jpeg_bytes = cv2.imencode(".jpg", img, quality_params)
    return base64.b64encode(jpeg_bytes).decode()
|
|
def crop(img, r):
    """Return the sub-image of *img* selected by r = (x, y, w, h) in relative [0, 1] coords."""
    height, width = img.shape[:2]
    x, y, w, h = r
    top, bottom = int(y * height), int((y + h) * height)
    left, right = int(x * width), int((x + w) * width)
    return img[top:bottom, left:right]
|
|
def split(r, rows, cols, ov=0.0, pad=0.0):
    """
    Split region r = (x, y, w, h) into a rows x cols grid of sub-regions.

    ov  -- fractional overlap between neighbouring tiles (0 <= ov < 1): tiles
           step by tile_size * (1 - ov), so ov > 0 yields more, overlapping tiles.
    pad -- fractional padding added around each tile (relative to tile size),
           clamped so tiles never extend beyond *r*.

    The previous implementation accepted ov/pad but silently ignored them;
    with the defaults (0, 0) the output is unchanged.
    """
    x0, y0, w, h = r
    tw, th = w / cols, h / rows
    # Step size shrinks as overlap grows; with ov == 0 it equals the tile size.
    sx, sy = tw * (1.0 - ov), th * (1.0 - ov)
    nx = max(1, int((w - tw) // sx) + 1)
    ny = max(1, int((h - th) // sy) + 1)
    tiles = []
    for ry in range(ny):
        for cx in range(nx):
            # Clamp the last row/column so tiles stay inside the region.
            sx0 = min(x0 + cx * sx, x0 + w - tw)
            sy0 = min(y0 + ry * sy, y0 + h - th)
            if pad:
                # Grow the tile by pad * tile_size on each side, clamped to r.
                px, py = pad * tw, pad * th
                left = max(x0, sx0 - px)
                top = max(y0, sy0 - py)
                right = min(x0 + w, sx0 + tw + px)
                bottom = min(y0 + h, sy0 + th + py)
                tiles.append((left, top, right - left, bottom - top))
            else:
                tiles.append((sx0, sy0, tw, th))
    return tiles
|
|
def rank(det):
    """Sort key for detection labels: YES (0) sorts before MAYBE (1); anything else is 2."""
    if det == "YES":
        return 0
    if det == "MAYBE":
        return 1
    return 2
|
|
def draw_path(img, path, results=None):
    """Overlay the search path on a copy of *img*.

    Each region in *path* gets a rectangle (colour cycles through a fixed
    palette) and a stage label; when *results* supplies a verdict dict for
    that stage, the label also shows the detection and confidence.
    """
    canvas = img.copy()
    palette = [(0, 165, 255), (0, 255, 0), (255, 0, 0), (255, 255, 0), (128, 0, 128)]
    height, width = img.shape[:2]
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    thickness = 1
    for idx, (x, y, w, h) in enumerate(path):
        left, top = int(x * width), int(y * height)
        right, bottom = int((x + w) * width), int((y + h) * height)
        colour = palette[idx % len(palette)]
        cv2.rectangle(canvas, (left, top), (right, bottom), colour, 2)
        label = f"S{idx+1}"
        if results and idx < len(results):
            stage = results[idx]
            if "detected" in stage:
                det = stage["detected"]
                conf = stage.get("confidence", 0)
                label += f": {det} ({conf:.2f})"
        text_w, text_h = cv2.getTextSize(label, font, font_scale, thickness)[0]
        # Filled backdrop behind the label, then the label itself in white.
        cv2.rectangle(canvas, (left, top - text_h - 5), (left + text_w + 5, top), colour, -1)
        cv2.putText(canvas, label, (left + 2, top - 5), font, font_scale, (255, 255, 255), thickness)
    return canvas
|
|
| |
async def ask_api(img, api_key, model="gpt-4o", custom_prompt=None):
    """Ask the OpenAI vision model whether a human appears in *img*.

    Returns the parsed JSON verdict dict.  Rate-limit errors are retried up
    to five times with exponential backoff (capped at 32 s); any other API
    failure yields a synthetic NO verdict describing the problem.
    """
    client = OpenAI(api_key=api_key)
    payload = [{
        "role": "user",
        "content": [
            {"type": "text", "text": custom_prompt or DEFAULT_PROMPT},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encode(img)}"}}
        ]
    }]
    backoff = 1
    for _attempt in range(5):
        try:
            # The blocking SDK call runs in a worker thread so the event loop
            # can keep fanning out other grid cells.
            reply = await asyncio.to_thread(
                client.chat.completions.create,
                model=model,
                messages=payload,
                max_tokens=60,
                response_format={"type": "json_object"}
            )
            return json.loads(reply.choices[0].message.content)
        except Exception as exc:
            if "rate limit" in str(exc).lower():
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, 32)
            else:
                return {"detected": "NO", "confidence": 0, "reason": f"Error: {str(exc)[:50]}..."}
    return {"detected": "NO", "confidence": 0, "reason": "Too many API retries, please try again later"}
|
|
| |
async def recurse_multi(img, region, depth, rows, cols, prog, api_key, model, max_candidates, all_results=None):
    """
    Recursively analyze grid cells, allowing up to max_candidates per stage.

    Returns a list of branch dicts with keys:
      - "final_region": final region of the branch,
      - "path": list of regions from outermost to innermost stage,
      - "stage_results": API result per stage, parallel to "path".

    all_results is accepted for backward compatibility but is unused (the
    original initialized it and never read it; the dead code is removed here,
    along with an unused ``current_stage`` local).
    """
    if depth == 0:
        # Base case: this region is the branch's final answer.
        return [{"final_region": region, "path": [], "stage_results": []}]

    subs = split(region, rows, cols, ov=0.0, pad=0.0)
    prog(0, desc=f"Stage {depth}: scanning {len(subs)} grid cells...")

    # Query every grid cell concurrently.
    results = await asyncio.gather(*(ask_api(crop(img, sub), api_key, model) for sub in subs))

    # Best first: YES before MAYBE before NO, then by descending confidence.
    sub_results = sorted(
        zip(subs, results),
        key=lambda tup: (rank(tup[1]["detected"]), -tup[1].get("confidence", 0))
    )

    candidates = [tup for tup in sub_results if tup[1]["detected"] in ("YES", "MAYBE")]
    if not candidates:
        # No positive cell: still descend into the single most promising one.
        candidates = [sub_results[0]]
    candidates = candidates[:max_candidates]

    branches = []
    for candidate_region, candidate_result in candidates:
        child_branches = await recurse_multi(
            img, candidate_region, depth - 1, rows, cols, prog, api_key, model, max_candidates
        )
        for branch in child_branches:
            # Prepend this stage so paths read outermost -> innermost.
            branch["path"].insert(0, candidate_region)
            branch["stage_results"].insert(0, candidate_result)
            branches.append(branch)
    return branches
|
|
| |
def run_pipeline(pil_img, api_key, model, rows, cols, zoom, max_candidates, progress=gr.Progress()):
    """
    Process a single uploaded image:
      1. Divide the image into grid cells.
      2. Recursively zoom in, exploring up to max_candidates per stage.
      3. Draw the search path on the original image.
      4. Return (final crop PIL image, annotated path PIL image, text summary);
         on any validation or conversion error, (None, None, error message).
    """
    # Input validation -- fail fast with a user-facing message.
    if pil_img is None:
        return None, None, "Error: Please upload an image to analyze."
    if not api_key or api_key.strip() == "":
        return None, None, "Error: OpenAI API key is required."
    if not model or model.strip() == "":
        return None, None, "Error: Please select an OpenAI model."

    try:
        img_np = np.array(pil_img)
    except Exception as e:
        return None, None, f"Error converting image: {str(e)}"

    full_region = (0, 0, 1, 1)
    progress(0, desc=f"Starting recursive grid search using {model}...")
    branches = asyncio.run(
        recurse_multi(img_np, full_region, zoom, rows, cols, progress, api_key, model, max_candidates)
    )

    if not branches:
        return None, None, "No branch found."

    # Pick the branch whose first (coarsest) stage is most confident.
    # NOTE(review): this ranks branches by the outermost stage only -- confirm
    # that is the intended tie-breaker rather than the innermost verdict.
    best_branch = max(branches, key=lambda b: b["stage_results"][0].get("confidence", 0))
    final_reg = best_branch["final_region"] if "final_region" in best_branch else best_branch["path"][0]
    final_crop = crop(img_np, final_reg)
    final_crop_pil = Image.fromarray(final_crop)

    # Stages are displayed innermost-first (path order reversed before drawing).
    path_order = list(reversed(best_branch["path"]))
    stage_results_order = list(reversed(best_branch["stage_results"]))
    path_img = draw_path(img_np, path_order, stage_results_order)
    path_img_pil = Image.fromarray(path_img)

    # Defensive .get(): the model's JSON may omit fields.  The original
    # indexed res['confidence'] / res['reason'] directly (KeyError risk),
    # inconsistently with draw_path's guarded access.
    summary_lines = [
        f"Stage {i+1}: {res.get('detected', '?')} ({res.get('confidence', 0):.2f}) - {res.get('reason', '')}"
        for i, res in enumerate(stage_results_order)
    ]
    summary_text = "\n".join(summary_lines)

    return final_crop_pil, path_img_pil, summary_text
|
|
| |
# ---------------------------------------------------------------------------
# Gradio UI.
# NOTE(review): the title, markdown header, and button label in the original
# file were mojibake (UTF-8 bytes decoded through a Greek codepage); the
# intended emoji and dashes are restored here.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Eagle-Eyes Recursive Grid Search", css="footer {visibility: hidden}") as demo:
    gr.Markdown("""
    # \U0001F985 Eagle-Eyes Recursive Grid Search

    Upload a single image. The tool will divide the image into grid cells and recursively zoom in on the most promising region.
    At each stage, up to a configurable number of positive candidates are explored. The search path is drawn on the original image.
    """)

    with gr.Row():
        with gr.Column(scale=1):
            img_in = gr.Image(type="pil", label="Input Image")
            api_key = gr.Textbox(
                label="OpenAI API Key",
                placeholder="Enter your OpenAI API key here...",
                type="password",
                info="Your API key will be used only for this session and not stored"
            )
            model = gr.Dropdown(
                choices=AVAILABLE_MODELS,
                value=DEFAULT_MODEL,
                label="Model Selection",
                info="Select the OpenAI model to use for analysis"
            )
            with gr.Group():
                with gr.Row():
                    row = gr.Number(value=DEFAULTS["row"], label="Grid Rows", precision=0, minimum=1, maximum=10)
                    col = gr.Number(value=DEFAULTS["col"], label="Grid Columns", precision=0, minimum=1, maximum=10)
                    zoom = gr.Number(value=DEFAULTS["zoom"], label="Zoom Levels", precision=0, minimum=1, maximum=5)
                with gr.Row():
                    max_candidates = gr.Number(value=DEFAULTS["max_candidates"], label="Max Candidates per Stage", precision=0, minimum=1, maximum=10)
            btn = gr.Button("\U0001F50D Run Search", variant="primary")
            summary_out = gr.Textbox(label="Results Summary", lines=5, interactive=False)
        with gr.Column(scale=1):
            crop_out = gr.Image(label="Final Crop (Zoomed Region)")
            path_out = gr.Image(label="Search Path Visualization")

    gr.Markdown("""
    ### Tips for Best Results

    - **OpenAI API Key**: Required for this tool. Your key remains private.
    - **Model Selection**: Choose a model with vision support (e.g., `gpt-4o`, `gpt-4o-mini`, `o1`, etc.).
    - **Grid Settings**: Adjust rows and columns to fine-tune segmentation.
    - **Zoom Levels**: More zoom levels perform deeper recursive search.
    - **Max Candidates per Stage**: Controls how many positive grid cells to explore at each stage.
    """)

    btn.click(
        run_pipeline,
        inputs=[img_in, api_key, model, row, col, zoom, max_candidates],
        outputs=[crop_out, path_out, summary_out]
    )


if __name__ == "__main__":
    demo.launch()
|
|