# oops / gemini.py
# Uploaded by deansmile123 with huggingface_hub (commit 75f0bc0, verified), 5.88 kB.
import os
from PIL import Image
from google import genai
from google.genai import types
from pydantic import BaseModel
import json
from tqdm import tqdm
# ----------------------------------------------------------------------
# Optional: A structured schema for obstacle analysis
# If you don't want JSON output, remove the config section below.
# ----------------------------------------------------------------------
class ObstacleAnswer(BaseModel):
    # Schema for one structured model reply: the image identifier plus one
    # free-text answer per question in the QUESTIONS prompt.
    #
    # NOTE(review): documented with comments rather than a class docstring on
    # purpose -- pydantic is understood to copy a class docstring into the
    # generated JSON schema's "description", which would change the
    # response_schema sent to the model. Confirm before adding a docstring.

    # Name of the analyzed image; process_folder overwrites whatever the
    # model returns here with the real file name.
    image_name: str
    # Answers to questions 1-8, in prompt order.
    q1: str
    q2: str
    q3: str
    q4: str
    q5: str
    q6: str
    q7: str
    q8: str
# ----------------------------------------------------------------------
# Prompt used for Gemini 3 Pro (same 8 questions you used before)
# ----------------------------------------------------------------------
# The questionnaire sent with every image. The text starts and ends with a
# newline; the wording (including its quirks) is part of the prompt and is
# reproduced verbatim, one tuple entry per prompt line.
QUESTIONS = "\n".join((
    "",
    "Answer the following 8 questions about obstacles in the image:",
    "1. Identify all obstacles or anomalies on the road or sidewalk ahead.",
    "For each obstacle, give its category.",
    "(On the sidewalk / walkable path)",
    '2. If I keep walking, will I collide with this obstacle? Answer "Yes" or "No."',
    "3. Specify where it is on the path {far left, center left, middle, center right, right}",
    "and its relative direction {left, front, right, behind}.",
    "4. If I walk straight, left (image-left), or right (image-right),",
    'will I collide with the obstacle? Example: "Straight: Yes; Left: No; Right: No."',
    "5. If a collision is likely, describe how I should walk to avoid it. If not collide, say clear passway.",
    "6. Explain how this obstacle should be repositioned so it no longer poses danger. If not need to move, say none needed.",
    "7. How predictable or expected is this obstacle at this location?",
    "{expected / somewhat unexpected / clearly out of place}. Explain briefly.",
    "8. Give an Out-of-Place Score from 0–100 and justify it.",
    "focus on the Object that is most likely to be hitted",
    "Consider the object's position as opposed to the object & environment.",
    "",
))

# Role/context preamble placed ahead of the image in the request contents.
SYSTEM_MESSAGE = " ".join((
    "I am fully blind.",
    "You are a mobility assistant who analyzes the scene",
    "and describes obstacles for safe navigation.",
    "Be concise and accurate.",
))
# ----------------------------------------------------------------------
# Gemini 3 Pro client setup
# ----------------------------------------------------------------------
# SECURITY: an API key was previously hard-coded on this line and published
# with the file. Treat that key as compromised and revoke it. The key is now
# taken from the environment (GEMINI_API_KEY, falling back to GOOGLE_API_KEY).
# If neither variable is set, None is passed through and the SDK's own
# credential handling decides what happens.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
client = genai.Client(api_key=GEMINI_API_KEY)
MODEL_ID = "gemini-3-pro-preview"
# ----------------------------------------------------------------------
# Function to analyze one image
# ----------------------------------------------------------------------
def analyze_image_with_gemini(
    img_path: str,
    structured: bool = False,
    max_size: tuple[int, int] = (512, 512),
):
    """Send one image together with the obstacle questionnaire to the model.

    Args:
        img_path: Path of the image file to analyze.
        structured: When true, request an ``application/json`` reply
            constrained to the ``ObstacleAnswer`` schema; otherwise no
            generation config is sent and the reply is free-form.
        max_size: ``(width, height)`` bounding box handed to
            ``Image.thumbnail`` before upload. Defaults to the previously
            hard-coded 512x512.

    Returns:
        The response object returned by ``client.models.generate_content``.
    """
    # Only the structured mode needs a generation config; passing None is
    # the same as omitting the argument.
    config = None
    if structured:
        config = types.GenerateContentConfig(
            response_mime_type="application/json",
            response_schema=ObstacleAnswer,
        )

    # The request is issued inside the ``with`` block: Pillow may defer
    # reading pixel data until it is needed, so the file must still be open
    # when the SDK serializes the image. Leaving the block closes the handle.
    with Image.open(img_path) as image:
        image.thumbnail(max_size)
        return client.models.generate_content(
            model=MODEL_ID,
            contents=[SYSTEM_MESSAGE, image, QUESTIONS],
            config=config,
        )
# ----------------------------------------------------------------------
# Example: process a folder of images
# ----------------------------------------------------------------------
# File suffixes (compared case-insensitively) that are treated as images.
_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".heic")

# File names processed when the caller does not override ``only_files``.
# This is the subset that used to be hard-coded inside the loop.
_DEFAULT_ONLY_FILES = frozenset({
    "Bike_Set1_Pos3_OOPS1.png",
    "chair_0.5.png",
    "reststand_0R.png",
    "trash_0L.png",
    "trashcan_in_0R.png",
})


def _format_result(fname, img_path, text, structured):
    """Return the text to append to the output file for a successful call."""
    if not structured:
        if text is None:
            raise ValueError("response contained no text")
        return f"IMAGE: {img_path}\n" + text.strip() + "\n\n"

    try:
        data = json.loads(text)
        # Trust the file system, not the model, for the image name.
        data["image_name"] = fname
        return json.dumps(data) + "\n"
    except (TypeError, ValueError) as parse_err:
        # ValueError covers malformed JSON; TypeError covers a missing text
        # payload or a top-level JSON value that is not an object.
        print(f"JSON parse error for {fname}: {parse_err}")
        return json.dumps({
            "image_name": fname,
            "error": "JSON parse error",
            "raw_response": text,
        }) + "\n"


def _format_error(fname, img_path, err, structured):
    """Return the text to append to the output file for a failed call."""
    if structured:
        # Keep the JSONL output machine-readable: one JSON object per line.
        return json.dumps({"image_name": fname, "error": str(err)}) + "\n"
    return f"IMAGE: {img_path}\nERROR: {err}\n\n"


def process_folder(image_dir, output_txt, structured=False, *,
                   only_files=_DEFAULT_ONLY_FILES, stop_on_error=True):
    """Analyze the images in ``image_dir`` and append results to ``output_txt``.

    Files are visited in sorted name order. A file is processed only when its
    name ends with one of the known image suffixes and, unless ``only_files``
    is ``None``, appears in ``only_files``.

    Args:
        image_dir: Directory to scan (not recursive).
        output_txt: Output path, opened in append mode as UTF-8.
        structured: When true, write one JSON object per line (JSONL) with
            ``image_name`` set to the file name; otherwise write
            ``IMAGE: <path>`` followed by the stripped response text.
        only_files: Iterable of file names to restrict processing to, or
            ``None`` to process every image. Defaults to the five names that
            were previously hard-coded.
        stop_on_error: When true, abort after recording the first failure.

    Raises:
        SystemExit: With exit status 1 after the first failed image when
            ``stop_on_error`` is true.

    NOTE(review): the original file's indentation was lost, so the nesting of
    its trailing ``exit()`` is an assumption -- it is treated here as part of
    the error handler (abort on first failure). Confirm against the author's
    copy; pass ``stop_on_error=False`` to keep going after failures.
    """
    allowed = None if only_files is None else frozenset(only_files)

    with open(output_txt, "a", encoding="utf-8") as f_out:
        for fname in tqdm(sorted(os.listdir(image_dir))):
            if not fname.lower().endswith(_IMAGE_SUFFIXES):
                continue
            if allowed is not None and fname not in allowed:
                continue

            img_path = os.path.join(image_dir, fname)
            print(f"Processing: {img_path}")
            try:
                response = analyze_image_with_gemini(img_path, structured=structured)
                f_out.write(_format_result(fname, img_path, response.text, structured))
                # Flush per image so partial progress survives an abort.
                f_out.flush()
            except Exception as e:
                print(f"Error processing {img_path}: {e}")
                f_out.write(_format_error(fname, img_path, e, structured))
                f_out.flush()
                if stop_on_error:
                    # Non-zero status so callers and shells can see the
                    # failure; the enclosing ``with`` still closes the file.
                    raise SystemExit(1) from e
# ----------------------------------------------------------------------
# Example usage
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Imported here because it is only needed when run as a script.
    import argparse

    parser = argparse.ArgumentParser(
        description="Run the obstacle questionnaire over a folder of images."
    )
    # Defaults reproduce the values that were previously hard-coded.
    parser.add_argument(
        "--image-dir",
        default="/scratch/ds5725/OOPS/images",
        help="directory containing the images to analyze",
    )
    parser.add_argument(
        "--output",
        default="gemini_results_rest.jsonl",
        help="file that results are appended to",
    )
    parser.add_argument(
        "--structured",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="request JSON following the ObstacleAnswer schema "
             "(use --no-structured for raw text)",
    )
    args = parser.parse_args()

    process_folder(
        image_dir=args.image_dir,
        output_txt=args.output,
        structured=args.structured,
    )