| import os |
| from PIL import Image |
| from google import genai |
| from google.genai import types |
| from pydantic import BaseModel |
| import json |
| from tqdm import tqdm |
| |
| |
| |
| |
class ObstacleAnswer(BaseModel):
    # Schema requested from the model when structured output is enabled.
    # Documented with comments rather than a docstring so the schema handed
    # to the API as `response_schema` is defined by the fields alone
    # (NOTE(review): pydantic may surface a class docstring as the schema
    # description -- confirm before adding one).

    # File name of the analysed image. The folder-processing code overwrites
    # this key with the real file name after parsing the response.
    image_name: str

    # Free-text answers; presumably q1..q8 correspond to the eight numbered
    # questions in the QUESTIONS prompt -- verify if the prompt changes.
    q1: str
    q2: str
    q3: str
    q4: str
    q5: str
    q6: str
    q7: str
    q8: str
|
|
|
|
| |
| |
| |
# Prompt listing the eight questions asked about every image. The wording is
# reproduced unchanged because any edit alters what the model is asked.
# NOTE(review): question 3 offers "far left" but only "right" (no
# "far right"), and question 8 says "hitted" -- confirm whether these are
# intentional before changing them, since stored results depend on the text.
# NOTE(review): continuation lines are flush-left here; confirm against the
# authoritative prompt if leading indentation inside the literal matters.
QUESTIONS = """
Answer the following 8 questions about obstacles in the image:

1. Identify all obstacles or anomalies on the road or sidewalk ahead.
For each obstacle, give its category.
(On the sidewalk / walkable path)

2. If I keep walking, will I collide with this obstacle? Answer "Yes" or "No."

3. Specify where it is on the path {far left, center left, middle, center right, right}
and its relative direction {left, front, right, behind}.

4. If I walk straight, left (image-left), or right (image-right),
will I collide with the obstacle? Example: "Straight: Yes; Left: No; Right: No."

5. If a collision is likely, describe how I should walk to avoid it. If not collide, say clear passway.

6. Explain how this obstacle should be repositioned so it no longer poses danger. If not need to move, say none needed.

7. How predictable or expected is this obstacle at this location?
{expected / somewhat unexpected / clearly out of place}. Explain briefly.

8. Give an Out-of-Place Score from 0–100 and justify it.
focus on the Object that is most likely to be hitted
Consider the object's position as opposed to the object & environment.
"""
|
|
|
|
# Role/context preamble placed ahead of the image and the questions in every
# request. Built from its three sentences joined by single spaces.
SYSTEM_MESSAGE = " ".join(
    (
        "I am fully blind.",
        "You are a mobility assistant who analyzes the scene"
        " and describes obstacles for safe navigation.",
        "Be concise and accurate.",
    )
)
|
|
|
|
| |
| |
| |
# SECURITY: the API key must never be committed to source control. It is read
# from the GEMINI_API_KEY environment variable instead. Any key that was ever
# stored in this file should be treated as leaked and revoked.
# If the variable is unset this is None and the SDK applies its own credential
# lookup (NOTE(review): confirm the client fails fast when no key is found).
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

# Shared client used for every request made by this module.
client = genai.Client(api_key=GEMINI_API_KEY)

# Model identifier passed to each generate_content call.
MODEL_ID = "gemini-3-pro-preview"
|
|
|
|
| |
| |
| |
def analyze_image_with_gemini(img_path: str, structured: bool = False):
    """Send one image plus the obstacle questions to Gemini.

    Args:
        img_path: Path of the image file to analyse.
        structured: When True, request a JSON response constrained to the
            ``ObstacleAnswer`` schema; otherwise request a default
            (free-form) response.

    Returns:
        The response object returned by ``client.models.generate_content``.
        Callers read its ``text`` attribute.

    Raises:
        Whatever ``Image.open`` or the API call raises (for example a missing
        or unreadable file, or a request failure); nothing is caught here.
    """
    # Only the structured variant needs a config; None is the call's default.
    config = None
    if structured:
        config = types.GenerateContentConfig(
            response_mime_type="application/json",
            response_schema=ObstacleAnswer,
        )

    # Keep the file open only for the duration of the request so the handle
    # is released even when the API call raises.
    with Image.open(img_path) as image:
        # Downscale in place to at most 512x512 (Pillow's thumbnail keeps the
        # aspect ratio) to limit the size of the upload.
        image.thumbnail((512, 512))

        return client.models.generate_content(
            model=MODEL_ID,
            contents=[SYSTEM_MESSAGE, image, QUESTIONS],
            config=config,
        )
|
|
|
|
| |
| |
| |
# File-name suffixes (compared case-insensitively) treated as images.
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".heic")

# File names processed when no explicit allowlist is given. These are the
# names that were previously hard-coded inside the loop.
_DEFAULT_ALLOWLIST = frozenset(
    {
        "Bike_Set1_Pos3_OOPS1.png",
        "chair_0.5.png",
        "reststand_0R.png",
        "trash_0L.png",
        "trashcan_in_0R.png",
    }
)


def _structured_record(fname, raw_text):
    """Return the JSON line (without newline) for one structured response."""
    try:
        data = json.loads(raw_text)
        data["image_name"] = fname
    except (TypeError, ValueError) as parse_err:
        # ValueError covers malformed JSON; TypeError covers a missing (None)
        # response text or a JSON value that is not an object.
        print(f"JSON parse error for {fname}: {parse_err}")
        data = {
            "image_name": fname,
            "error": "JSON parse error",
            "raw_response": raw_text,
        }
    return json.dumps(data)


def process_folder(image_dir, output_txt, structured=False, *, allowlist=_DEFAULT_ALLOWLIST):
    """Analyse the selected images in a folder and append results to a file.

    Args:
        image_dir: Directory scanned (non-recursively) for image files.
        output_txt: Output path, opened in append mode with UTF-8 encoding.
        structured: When True, write one JSON object per line; otherwise
            write an ``IMAGE: <path>`` header followed by the response text.
        allowlist: Collection of file names to process. Defaults to the five
            names this function has always been restricted to. Pass ``None``
            to process every image in the directory.

    Raises:
        SystemExit: With status 1 after the first image that fails, once the
            failure has been recorded in the output file.
            NOTE(review): the original called bare ``exit()`` directly after
            writing the error; it is assumed that call belonged to the error
            handler (abort on first failure) -- confirm.
    """
    # Select files up front so the progress bar counts only real work items.
    selected = [
        fname
        for fname in sorted(os.listdir(image_dir))
        if fname.lower().endswith(_IMAGE_EXTENSIONS)
        and (allowlist is None or fname in allowlist)
    ]

    with open(output_txt, "a", encoding="utf-8") as f_out:
        for fname in tqdm(selected):
            img_path = os.path.join(image_dir, fname)
            print(f"Processing: {img_path}")

            try:
                response = analyze_image_with_gemini(img_path, structured=structured)
                raw_text = response.text

                if structured:
                    f_out.write(_structured_record(fname, raw_text) + "\n")
                else:
                    f_out.write(f"IMAGE: {img_path}\n")
                    # The text may be None when the response has no content.
                    f_out.write((raw_text or "").strip() + "\n\n")

                # Flush per image so finished results survive a later abort.
                f_out.flush()

            except Exception as e:
                print(f"Error processing {img_path}: {e}")
                if structured:
                    # Keep the output valid JSON Lines on the error path too.
                    f_out.write(
                        json.dumps({"image_name": fname, "error": str(e)}) + "\n"
                    )
                else:
                    f_out.write(f"IMAGE: {img_path}\nERROR: {e}\n\n")
                # Stop at the first failure and report it through a non-zero
                # exit status. SystemExit does not depend on the `site`
                # module's interactive exit() helper being available.
                raise SystemExit(1)
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    # Command-line entry point. Every option defaults to the value that was
    # previously hard-coded, so running the script with no arguments behaves
    # as before.
    import argparse

    parser = argparse.ArgumentParser(
        description="Run the Gemini obstacle questionnaire over a folder of images."
    )
    parser.add_argument(
        "--image-dir",
        default="/scratch/ds5725/OOPS/images",
        help="directory containing the images to analyse",
    )
    parser.add_argument(
        "--output",
        default="gemini_results_rest.jsonl",
        help="file that results are appended to",
    )
    parser.add_argument(
        "--plain-text",
        dest="structured",
        action="store_false",
        help="write free-form text instead of one JSON object per line",
    )
    args = parser.parse_args()

    process_folder(
        image_dir=args.image_dir,
        output_txt=args.output,
        structured=args.structured,
    )
|
|