| import json |
| import base64 |
| from pathlib import Path |
|
|
| import httpx |
| from PIL import Image |
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel |
| from typing import List, Dict, Any |
|
|
| app = FastAPI(title="Thinking with Images API") |
|
|
| |
| MODEL_NAME = "model_name" |
| CHAT_API = "http://localhost:9200/v1/chat/completions" |
| JUPYTER_API = "http://localhost:18081/v1/jupyter" |
|
|
| |
| SANDBOX_IMG_DIR = "/mnt/data" |
| HOST_IMG_DIR = "/data" |
| SANDBOX_TMP_DIR = "/mnt/data/images/temp" |
| HOST_TMP_DIR = "/data/thinking_with_images/temp" |
|
|
| SYSTEM_PROMPT = ''' |
| You are a helpful assistant. |
| |
| # Tools |
| You may call one or more functions to assist with the user query. |
| You are provided with function signatures within <tools></tools> XML tags: |
| |
| <tools> |
| {"type": "function", "function": {"name": "python", "description": "Use this tool to execute Python code in your chain of thought.\n\nWhen you send a message containing Python code to python, it will be executed in a stateful Jupyter notebook environment. python will respond with the output of the execution or time out after 60.0 seconds. The drive at '/mnt/data/images/temp' can be used to save the temporary image files. Internet access for this session is disabled. Do not make external web requests or API calls as they will fail.\n\nReasoning & Image Manipulation & Drawing Auxiliary Graphics (Optional but Encouraged):\n- You have the capability to write executable Python code to perform image manipulations (e.g., cropping to a Region of Interest (ROI), resizing, rotation, adjusting contrast) or perform calculation for better reasoning.\n- You have the capability to write Python code to add auxiliary graphics (such as segments, circles, rectangles, labels, etc.) to the image, to help illustrate your reasoning process.\n- The code will be executed in a secure sandbox, and its output will be provided back to you for further analysis.\n- At the end of the code, print the path of the processed image (processed_path) or the relevant result for further processing within the sandbox environment.", "parameters": {"type": "object", "properties": {"code": {"type": "string", "description": "The Python code to execute"}}}, "required": ["code"]}} |
| </tools> |
| |
| For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: |
| <tool_call> |
| {"name": <function-name>, "arguments": <args-json-object>} |
| </tool_call> |
| ''' |
|
|
| MAX_TURNS = 8 |
|
|
class RequestModel(BaseModel):
    """Request body for the /process endpoint."""

    # Content items of the initial user turn in OpenAI chat format
    # (e.g. {"type": "text", ...} / {"type": "image_url", ...}); despite the
    # name, these are content parts, not whole messages — see build_user_content.
    messages: List[Dict[str, Any]]
    # Host-filesystem paths of the images referenced by `messages`,
    # in the same order as the image_url items appear.
    image_path_list: List[str]
|
|
|
|
| |
|
|
def get_img_size(path: str) -> tuple[int, int]:
    """Return the (width, height) of the image file at *path*."""
    img = Image.open(path)
    try:
        return img.size
    finally:
        img.close()
|
|
|
|
def encode_image(path: str) -> str:
    """Read the file at *path* and return its contents as base64 text."""
    with open(path, "rb") as fh:
        raw = fh.read()
    return base64.b64encode(raw).decode()
|
|
|
|
def to_sandbox_path(host_path: str) -> str:
    """Convert a host machine path to the corresponding sandbox path.

    Rewrites only a *leading* HOST_IMG_DIR prefix. The previous
    str.replace() implementation substituted every occurrence of the host
    prefix, which would corrupt a path that happens to contain it again
    later in the string (e.g. "/data/x/data/y"). Paths outside
    HOST_IMG_DIR are returned unchanged.
    """
    if host_path.startswith(HOST_IMG_DIR):
        return SANDBOX_IMG_DIR + host_path[len(HOST_IMG_DIR):]
    return host_path
|
|
|
|
def to_host_path(sandbox_path: str) -> str:
    """Convert a sandbox path to the corresponding host machine path.

    Only paths under SANDBOX_TMP_DIR (images written by tool code) are
    mapped; anything else is returned unchanged. Rewrites only the
    leading prefix — the previous str.replace() implementation would also
    substitute accidental later occurrences of the prefix inside the path.
    """
    prefix = SANDBOX_TMP_DIR + "/"
    if sandbox_path.startswith(prefix):
        return HOST_TMP_DIR + "/" + sandbox_path[len(prefix):]
    return sandbox_path
|
|
|
|
def build_user_content(
    messages: List[Dict[str, Any]],
    image_path_list: List[str],
) -> List[Dict[str, Any]]:
    """Inject image metadata (path, dimensions) after each image_url item in the message content."""
    enriched: List[Dict[str, Any]] = []
    img_idx = 0
    for part in messages:
        enriched.append(part)
        if part["type"] != "image_url":
            continue
        if img_idx >= len(image_path_list):
            raise ValueError(
                f"image_path_list too short: need image #{img_idx+1} but only {len(image_path_list)} provided"
            )
        host_path = image_path_list[img_idx]
        width, height = get_img_size(host_path)
        sandbox_path = to_sandbox_path(host_path)
        # Metadata text block that tells the model where the image lives
        # inside the sandbox and how large it is.
        enriched.append({
            "type": "text",
            "text": f"\nimage path: {sandbox_path}\nimage width: {width}\nimage height: {height}\n\n",
        })
        img_idx += 1
    return enriched
|
|
|
|
def build_initial_payload(user_content: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Assemble the first chat-completions request: system prompt plus the enriched user turn."""
    conversation = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]
    payload: Dict[str, Any] = {"model": MODEL_NAME}
    payload["messages"] = conversation
    # Keep special tokens (e.g. <tool_call>) in the model output intact.
    payload["skip_special_tokens"] = False
    return payload
|
|
|
|
def messages_to_text(payload_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten multimodal content in the final messages to plain text (for response/storage)."""

    def _flatten(parts: List[Dict[str, Any]]) -> str:
        # Text items keep their text; each image becomes an "<image>"
        # placeholder; any other item type contributes nothing.
        pieces = []
        for part in parts:
            if part["type"] == "text":
                pieces.append(part["text"])
            elif part["type"] == "image_url":
                pieces.append("<image>")
        return "".join(pieces)

    flattened = []
    for msg in payload_messages:
        if msg["role"] == "user" and isinstance(msg["content"], list):
            flattened.append({**msg, "content": _flatten(msg["content"])})
        else:
            flattened.append(msg)
    return flattened
|
|
|
|
| |
|
|
async def process_request(
    messages: List[Dict[str, Any]],
    image_path_list: List[str],
) -> Dict[str, Any]:
    """Run the multi-turn "thinking with images" loop for one request.

    Flow:
      1. Enrich the user content with per-image metadata and build the
         initial chat payload.
      2. Create a Jupyter sandbox session.
      3. Up to MAX_TURNS times: call the model; if its answer contains a
         <tool_call>, execute the embedded Python in the sandbox, map the
         printed sandbox image path back to a host path, and append the
         resulting image as a new user turn; otherwise record the final
         answer and stop.
      4. Always delete the sandbox session, then flatten the transcript
         to plain text for the response.

    Side effects: ``image_path_list`` is mutated in place (paths of
    generated images are appended) and also returned inside the payload.

    Raises:
        HTTPException(500) on session-creation, model-API,
        tool-call-parsing, or code-execution transport failures.
    """
    user_content = build_user_content(messages, image_path_list)
    payload = build_initial_payload(user_content)

    async with httpx.AsyncClient(timeout=300.0) as client:
        # Create a stateful sandbox session so variables persist across
        # the model's successive tool calls.
        try:
            r = await client.post(
                f"{JUPYTER_API}/sessions/create",
                json={"kernel_name": "python3.10"},
            )
            r.raise_for_status()
            session_id = r.json()["data"]["session_id"]
        except Exception as e:
            raise HTTPException(500, f"Failed to create Jupyter session: {e}")

        try:
            for turn in range(1, MAX_TURNS + 1):
                # Ask the model for the next step (tighter 120 s timeout
                # than the client default).
                try:
                    r = await client.post(
                        CHAT_API,
                        json=payload,
                        timeout=120.0,
                    )
                    r.raise_for_status()
                    resp = r.json()
                except Exception as e:
                    raise HTTPException(500, f"Model API request failed (turn={turn}): {e}")

                if "choices" not in resp:
                    raise HTTPException(500, f"Unexpected model response: {resp}")

                choice = resp["choices"][0]["message"]
                # "reasoning" may be absent or None; fold it into the
                # transcript inside <think> tags.
                thinking = (choice.get("reasoning") or "").strip()
                answer = choice["content"].strip()
                assistant_msg = f"<think>\n{thinking}\n</think>\n\n{answer}"

                # No tool call -> this is the final answer; stop looping.
                # NOTE(review): if MAX_TURNS is exhausted instead, the loop
                # ends without a final assistant message — confirm intended.
                if "<tool_call>" not in answer:
                    payload["messages"].append({"role": "assistant", "content": assistant_msg})
                    break

                # Extract the JSON body of the FIRST <tool_call> block and
                # pull out the Python code to execute.
                try:
                    raw = answer.split("<tool_call>")[1].split("</tool_call>")[0]
                    code = json.loads(raw)["arguments"]["code"]
                except Exception as e:
                    raise HTTPException(500, f"Failed to parse tool_call: {e}")

                # Run the code in the sandbox session (30 s kernel-side
                # limit, 60 s HTTP limit).
                try:
                    r = await client.post(
                        f"{JUPYTER_API}/execute",
                        json={"code": code, "timeout": 30,
                              "kernel_name": "python3.10", "session_id": session_id},
                        timeout=60.0,
                    )
                    r.raise_for_status()
                    exec_res = r.json()
                except Exception as e:
                    raise HTTPException(500, f"Code execution failed: {e}")

                # NOTE(review): on a failed execution this retries without
                # appending anything, so the identical payload is re-sent
                # and the model cannot see the error — confirm whether the
                # failure should be fed back as a tool_response instead.
                if not exec_res["success"]:
                    continue

                # The tool contract says the code prints the processed image
                # path last; only the first output item is consumed here.
                sandbox_img_path = exec_res["data"]["outputs"][0]["text"].strip()
                host_img_path = to_host_path(sandbox_img_path)
                image_path_list.append(host_img_path)
                # NOTE(review): MIME type is hard-coded to image/jpeg
                # regardless of the actual file format — confirm.
                img_b64 = f"data:image/jpeg;base64,{encode_image(host_img_path)}"

                # Record the assistant turn and feed the produced image back
                # to the model wrapped in <tool_response> tags.
                payload["messages"].append({"role": "assistant", "content": assistant_msg})
                payload["messages"].append({
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "<tool_response>\n"},
                        {"type": "image_url", "image_url": {"url": img_b64}},
                        {"type": "text", "text": f"\n{sandbox_img_path}\n</tool_response>"},
                    ],
                })

        finally:
            # Best-effort cleanup: never let session deletion mask the
            # real result or error.
            try:
                await client.delete(f"{JUPYTER_API}/sessions/{session_id}")
            except Exception as e:
                print(f"[WARN] Failed to delete Jupyter session: {e}")

    # Replace multimodal content with plain text before returning/storing.
    payload["messages"] = messages_to_text(payload["messages"])
    payload["image_path_list"] = image_path_list
    return payload
|
|
|
|
| |
|
|
| @app.post("/process") |
| async def process_images(request: RequestModel) -> Dict[str, Any]: |
| return await process_request(request.messages, request.image_path_list) |
|
|
|
|
| @app.get("/health") |
| async def health_check(): |
| return {"status": "ok"} |
|
|
|
|
| |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=10044) |