| import os |
| import base64 |
| import json |
| import requests |
| from time import sleep |
| from tqdm import tqdm |
|
|
| |
| |
| |
def encode_image(image_path: str) -> str:
    """Return the contents of the file at *image_path* as base64 text.

    The file is read in binary mode, base64-encoded, and the resulting
    bytes are decoded as UTF-8 so the value can be embedded in a JSON
    payload or a ``data:`` URL.
    """
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode("utf-8")
|
|
|
|
| |
| |
| |
def analyze_obstacles_in_folder_internvl(
    image_dir: str,
    output_path: str,
    api_key: str = None,
    model: str = "internvl3.5-241b-a28b",
    temperature: float = 1.0,
    sleep_time: float = 1.0,
    include_substrings=None,
    request_timeout: float = 300.0,
):
    """Send every .png in a folder to InternVL and append the answers to a file.

    For each ``.png`` in ``image_dir`` (sorted by path), the image is
    base64-encoded and posted to the InternVL chat-completions endpoint
    together with the obstacle-analysis prompt. One record per image is
    appended to ``output_path``: an ``IMAGE:`` line, then either the model's
    answer or an ``ERROR:`` line, then a separator. A failure on one image is
    recorded and does not stop the batch.

    Args:
        image_dir: Directory that is listed (non-recursively) for ``.png``
            files; the extension match is case-insensitive.
        output_path: Text file opened in append mode, so earlier results in
            the same file are kept.
        api_key: InternVL API key. When ``None``, the ``INTERNVL_API_KEY``
            environment variable is used instead.
        model: Model name placed in the request payload.
        temperature: Sampling temperature placed in the request payload.
        sleep_time: Seconds to pause after each image, whether the request
            succeeded or failed. Values ``<= 0`` disable the pause.
        include_substrings: Optional iterable of strings. When given, only
            images whose file name contains at least one of them are
            processed (useful for re-running a few images). ``None`` or an
            empty iterable processes every image.
        request_timeout: Timeout in seconds passed to ``requests.post`` so a
            stalled connection cannot hang the whole batch.

    Raises:
        ValueError: If no API key is passed and ``INTERNVL_API_KEY`` is unset
            or empty.
    """
    if api_key is None:
        api_key = os.getenv("INTERNVL_API_KEY")

    if not api_key:
        raise ValueError(
            "No InternVL API key provided. "
            "Pass --api_key on the command line or set the INTERNVL_API_KEY env var."
        )

    url = "https://chat.intern-ai.org.cn/api/v1/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }

    # Materialize the filter once; a bare string would otherwise be iterated
    # character by character.
    if isinstance(include_substrings, str):
        wanted = (include_substrings,)
    else:
        wanted = tuple(include_substrings or ())

    image_paths = sorted(
        os.path.join(image_dir, name)
        for name in os.listdir(image_dir)
        if name.lower().endswith(".png")
        and (not wanted or any(token in name for token in wanted))
    )

    if not image_paths:
        print(f"No .png images found in {image_dir}")
        return

    questions_prompt = (
        "1. Identify all obstacles or anomalies on the road or sidewalk ahead. "
        "For each obstacle, provide its category (e.g., trash bin, traffic cone, car, "
        "person, construction sign).\n\n"
        "2. If I keep walking, will I collide with this obstacle? "
        "Response: \"Yes\" or \"No.\"\n\n"
        "3. If relevant, specify where it is on the path: "
        "{far left, center left, middle, center right, right}. "
        "Also include approximate direction relative to the user: {left / front / right / behind}.\n\n"
        "4. I am fully blind. If I walk straight, walk to the left in the image, "
        "or walk to the right in the image, will I collide with this obstacle? "
        "For each option, respond \"Yes\" or \"No.\" "
        "Example: \"Straight: Yes; Left: No; Right: No.\"\n\n"
        "5. If a collision is likely, describe how I should walk to avoid it using simple "
        "directional instructions (e.g., \"keep to the left of the path to avoid the object "
        "on the right\", \"turn 15 degrees to the right, then continue forward\"). If not collide, say clear passway. \n\n"
        "6. How should this obstacle be moved or repositioned so that it no longer poses "
        "danger to pedestrians and returns to its default or ‘home’ location? "
        "Example: \"Move the sign closer to the building wall\" or "
        "\"Place the bin at the curb edge.\". If not need to move, say none needed. \n\n"
        "7. How predictable or anticipated is this obstacle’s presence at its exact "
        "location? Response: One of {expected / somewhat unexpected / clearly out of place}. "
        "Then briefly explain why.\n\n"
        "8. Give the object a formal Out-of-Place Score on a 0–100 rating scale, where:\n"
        " 0 = perfectly expected, correct place\n"
        " 50 = somewhat out of place\n"
        " 100 = completely out of place.\n"
        " Justify your score briefly."
    )

    system_message_text = (
        "I am fully blind. You are a mobility assistant that is tasked with accurately "
        "describing image contents relevant for a blind user. The input is an image "
        "captured from a forward-facing phone camera at street level from my point-of-view "
        "perspective, pointing in the direction I am travelling. Your goal is to analyze "
        "the scene and describe potential obstacles or anomalies in terms of their location, "
        "safety, and predictability. Answer the following 8 questions concisely."
    )

    # The instructions are identical for every image, so build them once.
    prompt_text = system_message_text + "\n\n" + questions_prompt
    separator = "\n" + "-" * 80 + "\n\n"

    with open(output_path, "a", encoding="utf-8") as out_f:
        for img_path in tqdm(image_paths, desc="Processing images with InternVL"):
            try:
                img_b64 = encode_image(img_path)

                payload = {
                    "model": model,
                    "messages": [
                        {"role": "user", "content": "Start session."},
                        {"role": "assistant", "content": "Session started."},
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": prompt_text},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/png;base64,{img_b64}"
                                    },
                                },
                            ],
                        },
                    ],
                    "temperature": temperature,
                    "top_p": 0.9,
                    "max_tokens": 2048,
                }

                response = requests.post(
                    url,
                    headers=headers,
                    data=json.dumps(payload),
                    timeout=request_timeout,
                )
                response.raise_for_status()
                content = response.json()["choices"][0]["message"]["content"]
                record_body = content.strip()
            except Exception as e:
                # Per-image boundary: record the failure and keep going so one
                # bad image or transient API error does not abort the batch.
                print(f"Error processing {img_path}: {e}")
                record_body = f"ERROR: {e}"

            out_f.write(f"IMAGE: {img_path}\n")
            out_f.write(record_body + "\n")
            out_f.write(separator)
            # Flush per image so partial results survive an interrupted run.
            out_f.flush()

            # Pause after failures too, so errors such as rate limiting are
            # not retried back-to-back against the API.
            if sleep_time > 0:
                sleep(sleep_time)

    print(f"Done. Results saved to {output_path}")
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse options and run the folder analysis.
    parser = argparse.ArgumentParser(
        description="Process PNG images with InternVL for obstacle analysis."
    )
    parser.add_argument("--image_dir", required=True, help="Folder of .png images")
    parser.add_argument("--output", required=True, help="Output text file")
    parser.add_argument(
        "--api_key",
        # Never embed a real key as the default: it would be committed with
        # the source, and a non-None default also prevents the
        # INTERNVL_API_KEY fallback from ever being used.
        default=None,
        help="InternVL API key (or set INTERNVL_API_KEY env var).",
    )
    parser.add_argument(
        "--model",
        default="internvl3.5-241b-a28b",
        help="Model name for InternVL (default: internvl3.5-241b-a28b)",
    )
    parser.add_argument(
        "--temperature", type=float, default=0.2, help="Sampling temperature"
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=1.0,
        help="Sleep time between requests (seconds)",
    )

    args = parser.parse_args()

    analyze_obstacles_in_folder_internvl(
        image_dir=args.image_dir,
        output_path=args.output,
        api_key=args.api_key,
        model=args.model,
        temperature=args.temperature,
        sleep_time=args.sleep,
    )
|
|