File size: 5,878 Bytes
75f0bc0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import os
from PIL import Image
from google import genai
from google.genai import types
from pydantic import BaseModel
import json
from tqdm import tqdm
# ----------------------------------------------------------------------
# Optional: structured output schema for the obstacle analysis.
# It is only used when structured=True; pass structured=False for free-text output.
# ----------------------------------------------------------------------
# Schema handed to Gemini as `response_schema` when structured (JSON) output is
# requested: the image name plus one string answer per prompt question.
class ObstacleAnswer(BaseModel):
    # Name of the analyzed image; process_folder overwrites the model-provided
    # value with the actual filename before writing the record.
    image_name: str
    # Answers to the numbered questions in QUESTIONS
    # (presumably q1 <-> question 1, ..., q8 <-> question 8 — confirm).
    q1: str
    q2: str
    q3: str
    q4: str
    q5: str
    q6: str
    q7: str
    q8: str


# ----------------------------------------------------------------------
# Prompt sent to Gemini 3 Pro: the 8 obstacle-analysis questions
# ----------------------------------------------------------------------
# Prompt text sent with every image (last element of `contents` in
# analyze_image_with_gemini). This is a plain string literal, not an f-string,
# so the {...} option lists below reach the model verbatim.
# NOTE(review): the 8 numbered questions presumably map to fields q1..q8 of
# ObstacleAnswer — confirm.
QUESTIONS = """
Answer the following 8 questions about obstacles in the image:

1. Identify all obstacles or anomalies on the road or sidewalk ahead. 
   For each obstacle, give its category.
   (On the sidewalk / walkable path) 

2. If I keep walking, will I collide with this obstacle? Answer "Yes" or "No."

3. Specify where it is on the path {far left, center left, middle, center right, right}
   and its relative direction {left, front, right, behind}.

4. If I walk straight, left (image-left), or right (image-right), 
   will I collide with the obstacle? Example: "Straight: Yes; Left: No; Right: No."

5. If a collision is likely, describe how I should walk to avoid it. If not collide, say clear passway.

6. Explain how this obstacle should be repositioned so it no longer poses danger. If not need to move, say none needed.

7. How predictable or expected is this obstacle at this location?
   {expected / somewhat unexpected / clearly out of place}. Explain briefly.

8. Give an Out-of-Place Score from 0–100 and justify it.
 focus on the Object that is most likely to be hitted
 Consider the object's position as opposed to the object & environment.
"""


# Role/context preamble; it is sent as the first element of `contents` in
# analyze_image_with_gemini, ahead of the image and the questions.
SYSTEM_MESSAGE = (
    "I am fully blind. "
    "You are a mobility assistant who analyzes the scene and describes "
    "obstacles for safe navigation. "
    "Be concise and accurate."
)


# ----------------------------------------------------------------------
# Gemini 3 Pro client setup
# ----------------------------------------------------------------------
# SECURITY: the API key must never live in source control. It is read from the
# GEMINI_API_KEY environment variable instead; any key that was previously
# hard-coded in this file should be treated as leaked and revoked.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    # Fail fast with an actionable message instead of a late authentication error.
    raise RuntimeError(
        "GEMINI_API_KEY environment variable is not set; "
        "export it before running this script."
    )

# Shared client used by analyze_image_with_gemini for every request.
client = genai.Client(api_key=GEMINI_API_KEY)
# Model identifier passed to client.models.generate_content.
MODEL_ID = "gemini-3-pro-preview"


# ----------------------------------------------------------------------
# Function to analyze one image
# ----------------------------------------------------------------------
def analyze_image_with_gemini(img_path: str, structured: bool = False):
    """Send one image plus the obstacle questions to Gemini and return the response.

    The request contents are, in order: SYSTEM_MESSAGE, the downscaled image,
    and QUESTIONS.

    Args:
        img_path: Path of the image file to analyze.
        structured: When True, request a JSON response constrained to the
            ObstacleAnswer schema; when False, send no extra config and get
            the model's default (free-text) output.

    Returns:
        The response object returned by ``client.models.generate_content``.

    Raises:
        Nothing is caught here: errors from opening/decoding the image or from
        the API call propagate to the caller.
    """
    # Only attach a config when structured output is requested, so the
    # unstructured request is sent exactly as before (no `config` argument).
    request_kwargs = {"model": MODEL_ID}
    if structured:
        request_kwargs["config"] = types.GenerateContentConfig(
            response_mime_type="application/json",
            response_schema=ObstacleAnswer,
        )

    # The context manager releases the underlying file handle even when
    # decoding or the request fails. The request is issued inside the block so
    # the image is still open while it is being serialized.
    with Image.open(img_path) as image:
        # Shrink in place so the image fits within 512x512 before upload.
        # NOTE(review): the original comment called this a "Gemini
        # requirement"; that is unverified — confirm.
        image.thumbnail((512, 512))
        contents = [SYSTEM_MESSAGE, image, QUESTIONS]
        return client.models.generate_content(contents=contents, **request_kwargs)


# ----------------------------------------------------------------------
# Example: process a folder of images
# ----------------------------------------------------------------------
def process_folder(
    image_dir,
    output_txt,
    structured=False,
    only_files=(
        "Bike_Set1_Pos3_OOPS1.png",
        "chair_0.5.png",
        "reststand_0R.png",
        "trash_0L.png",
        "trashcan_in_0R.png",
    ),
):
    """Analyze the images in a folder and append one result per image to a file.

    Files are visited in sorted name order. A file is processed only if its
    name ends (case-insensitively) with .png, .jpg, .jpeg or .heic and, when
    ``only_files`` is not None, its name is listed there.

    Unlike the previous version, the loop no longer calls ``exit()`` after the
    first processed image (a debugging leftover), so every selected image is
    handled.

    Args:
        image_dir: Directory containing the images.
        output_txt: Output path, opened in append mode (UTF-8).
        structured: When True, each result is written as one JSON line whose
            ``image_name`` is replaced by the actual filename. When False, a
            free-text block ``IMAGE: <path>`` followed by the answer is written.
        only_files: Iterable of exact filenames to restrict processing to, or
            None to process every image in the folder. The default keeps the
            previously hard-coded selection so existing callers behave the same.

    Returns:
        None. Results and per-image errors are written to ``output_txt``;
        progress and errors are also printed.

    Raises:
        OSError: If ``image_dir`` cannot be listed or ``output_txt`` cannot be
            opened. Failures while handling a single image are caught and
            recorded so the batch continues.
    """
    image_exts = (".png", ".jpg", ".jpeg", ".heic")
    # Build the lookup set once instead of scanning a list for every file.
    allowed = None if only_files is None else frozenset(only_files)

    with open(output_txt, "a", encoding="utf-8") as f_out:
        for fname in tqdm(sorted(os.listdir(image_dir))):
            if not fname.lower().endswith(image_exts):
                continue
            if allowed is not None and fname not in allowed:
                continue

            img_path = os.path.join(image_dir, fname)
            print(f"Processing: {img_path}")

            # Deliberately broad: one bad image or failed API call must not
            # abort the whole batch, so the failure is recorded and we move on.
            try:
                response = analyze_image_with_gemini(img_path, structured=structured)
                raw_text = response.text

                if structured:
                    # Parse JSON -> force image_name to the real filename -> JSONL.
                    try:
                        data = json.loads(raw_text)
                        data["image_name"] = fname
                        line = json.dumps(data)
                    except (ValueError, TypeError) as parse_err:
                        # ValueError covers invalid JSON; TypeError covers a
                        # missing (None) text or a non-object JSON payload.
                        print(f"JSON parse error for {fname}: {parse_err}")
                        line = json.dumps({
                            "image_name": fname,
                            "error": "JSON parse error",
                            "raw_response": raw_text,
                        })
                    f_out.write(line + "\n")
                else:
                    # Compute the body before writing the header so a missing
                    # text does not leave a dangling "IMAGE:" line behind.
                    body = raw_text.strip()
                    f_out.write(f"IMAGE: {img_path}\n")
                    f_out.write(body + "\n\n")

            except Exception as e:
                print(f"Error processing {img_path}: {e}")
                if structured:
                    # Keep the output valid JSONL even for failures.
                    f_out.write(json.dumps({
                        "image_name": fname,
                        "error": str(e),
                    }) + "\n")
                else:
                    f_out.write(f"IMAGE: {img_path}\nERROR: {e}\n\n")

            # Flush after every image (success or failure) so partial results
            # survive an interrupted run.
            f_out.flush()


# ----------------------------------------------------------------------
# Example usage
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: run the analysis over the image folder and append
    # the results to a JSONL file in the current working directory.
    process_folder(
        "/scratch/ds5725/OOPS/images",
        "gemini_results_rest.jsonl",
        structured=True,  # True -> JSON records following the ObstacleAnswer schema
    )