"""Thinking-with-Images API.

A small FastAPI service that runs a multi-turn loop between a chat-completions
model and a Jupyter code-execution sandbox: the model may emit a tool call
containing Python code, the code is executed in the sandbox, and the image the
code produced is fed back to the model as the next user turn.
"""

import base64
import json
import logging
import mimetypes
from pathlib import Path
from typing import Any, Dict, List

import httpx
from fastapi import FastAPI, HTTPException
from PIL import Image
from pydantic import BaseModel

logger = logging.getLogger(__name__)

app = FastAPI(title="Thinking with Images API")

# ── Configuration ──
MODEL_NAME = "model_name"
CHAT_API = "http://localhost:9200/v1/chat/completions"
JUPYTER_API = "http://localhost:18081/v1/jupyter"
KERNEL_NAME = "python3.10"

# Sandbox internal paths <-> host machine real paths (docker volume mapping)
SANDBOX_IMG_DIR = "/mnt/data"
HOST_IMG_DIR = "/data"  # ← update to match actual mount path
SANDBOX_TMP_DIR = "/mnt/data/images/temp"  # fixed path — do not change
HOST_TMP_DIR = "/data/thinking_with_images/temp"

# Delimiters the model wraps a tool call in. They must match the tags named in
# SYSTEM_PROMPT, because process_request() splits the answer on them.
TOOL_CALL_OPEN = "<tool_call>"
TOOL_CALL_CLOSE = "</tool_call>"

# NOTE(review): the <tools>/<tool_call> tag names and the <function-name>/
# <args-json-object> placeholders follow the Hermes/Qwen function-calling
# template — confirm they match the chat template of the deployed model.
# NOTE(review): the prompt advertises a 60.0 s execution limit while
# process_request() asks the sandbox for a 30 s timeout — confirm which is
# intended.
SYSTEM_PROMPT = '''
You are a helpful assistant.

# Tools

You may call one or more functions to assist with the user query.

You are provided with function signatures within <tools></tools> XML tags:
<tools>
{"type": "function", "function": {"name": "python", "description": "Use this tool to execute Python code in your chain of thought.\n\nWhen you send a message containing Python code to python, it will be executed in a stateful Jupyter notebook environment. python will respond with the output of the execution or time out after 60.0 seconds. The drive at '/mnt/data/images/temp' can be used to save the temporary image files. Internet access for this session is disabled. Do not make external web requests or API calls as they will fail.\n\nReasoning & Image Manipulation & Drawing Auxiliary Graphics (Optional but Encouraged):\n- You have the capability to write executable Python code to perform image manipulations (e.g., cropping to a Region of Interest (ROI), resizing, rotation, adjusting contrast) or perform calculation for better reasoning.\n- You have the capability to write Python code to add auxiliary graphics (such as segments, circles, rectangles, labels, etc.) to the image, to help illustrate your reasoning process.\n- The code will be executed in a secure sandbox, and its output will be provided back to you for further analysis.\n- At the end of the code, print the path of the processed image (processed_path) or the relevant result for further processing within the sandbox environment.", "parameters": {"type": "object", "properties": {"code": {"type": "string", "description": "The Python code to execute"}}}, "required": ["code"]}}
</tools>

For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{"name": <function-name>, "arguments": <args-json-object>}
</tool_call>
'''

MAX_TURNS = 8


class RequestModel(BaseModel):
    """Request body of POST /process."""

    # Content items of the user turn; build_user_content() reads each item's
    # "type" key and expects one image path per "image_url" item.
    messages: List[Dict[str, Any]]
    # Host-side image paths, in the same order as the image_url items.
    image_path_list: List[str]


# ── Utility Functions ──
def get_img_size(path: str) -> tuple[int, int]:
    """Return the (width, height) of the image stored at *path*."""
    with Image.open(path) as img:
        return img.size  # (width, height)


def encode_image(path: str) -> str:
    """Return the base64 text of the raw bytes of the file at *path*."""
    return base64.b64encode(Path(path).read_bytes()).decode()


def _remap_prefix(path: str, src_root: str, dst_root: str) -> str:
    """Swap a leading *src_root* directory for *dst_root*.

    Only a whole leading path component sequence is replaced, so neither a
    later occurrence of *src_root* inside the path nor a sibling directory
    that merely shares the prefix text (``/data`` vs ``/database``) is
    touched. Paths outside *src_root* are returned unchanged.
    """
    src = src_root.rstrip("/")
    dst = dst_root.rstrip("/")
    if path == src:
        return dst
    if path.startswith(src + "/"):
        return dst + path[len(src):]
    return path


def to_sandbox_path(host_path: str) -> str:
    """Convert a host machine path to the corresponding sandbox path."""
    return _remap_prefix(host_path, HOST_IMG_DIR, SANDBOX_IMG_DIR)


def to_host_path(sandbox_path: str) -> str:
    """Convert a sandbox path to the corresponding host machine path."""
    return _remap_prefix(sandbox_path, SANDBOX_TMP_DIR, HOST_TMP_DIR)


def build_user_content(
    messages: List[Dict[str, Any]],
    image_path_list: List[str],
) -> List[Dict[str, Any]]:
    """Inject image metadata (path, dimensions) after each image_url item.

    Args:
        messages: Content items of the user turn.
        image_path_list: Host paths of the images, one per image_url item,
            in order.

    Returns:
        A new list with every original item, each image_url item followed by
        a text item carrying the sandbox path and pixel size of that image.

    Raises:
        ValueError: There are more image_url items than paths.
        OSError: An image file cannot be opened (raised by PIL).
    """
    content: List[Dict[str, Any]] = []
    k = 0
    for item in messages:
        content.append(item)
        if item.get("type") != "image_url":
            continue
        if k >= len(image_path_list):
            raise ValueError(
                f"image_path_list too short: need image #{k+1} but only {len(image_path_list)} provided"
            )
        w, h = get_img_size(image_path_list[k])
        sandbox_path = to_sandbox_path(image_path_list[k])
        # NOTE(review): this text may originally have been wrapped in a
        # markup tag that is no longer present — confirm.
        content.append({
            "type": "text",
            "text": f"\nimage path: {sandbox_path}\nimage width: {w}\nimage height: {h}\n\n",
        })
        k += 1
    return content


def build_initial_payload(user_content: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the first chat-completions request: system prompt + user turn."""
    return {
        "model": MODEL_NAME,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
        "skip_special_tokens": False,
    }


def messages_to_text(payload_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten multimodal content in the final messages to plain text (for response/storage).

    User messages whose content is a list are replaced by a copy whose content
    is the concatenation of their text items; all other messages are passed
    through untouched.
    """
    result = []
    for msg in payload_messages:
        if msg["role"] == "user" and isinstance(msg["content"], list):
            # image_url items currently contribute nothing to the flat text.
            # NOTE(review): an image placeholder token may be intended here
            # so the position of each image is kept — confirm.
            text = "".join(
                item["text"] for item in msg["content"] if item.get("type") == "text"
            )
            result.append({**msg, "content": text})
        else:
            result.append(msg)
    return result


def _extract_tool_code(answer: str) -> str:
    """Return the Python code of the first tool call found in *answer*.

    Raises IndexError/KeyError/TypeError/ValueError when the tool call is
    malformed (bad JSON, missing "arguments"/"code").
    """
    raw = answer.split(TOOL_CALL_OPEN, 1)[1].split(TOOL_CALL_CLOSE, 1)[0]
    return json.loads(raw)["arguments"]["code"]


def _checked_host_image_path(sandbox_path: str) -> str:
    """Map a sandbox output path to a host path and validate it.

    The path is printed by model-generated code, so it is untrusted: only
    regular files that resolve to a location inside HOST_TMP_DIR are accepted.

    Raises:
        ValueError: The path escapes HOST_TMP_DIR or is not an existing file.
    """
    host_path = to_host_path(sandbox_path)
    resolved = Path(host_path).resolve()
    if not resolved.is_relative_to(Path(HOST_TMP_DIR).resolve()):
        raise ValueError(f"path is outside {HOST_TMP_DIR}")
    if not resolved.is_file():
        raise ValueError("file does not exist")
    return host_path


def _image_data_url(path: str) -> str:
    """Return a base64 data URL for the image at *path*.

    The MIME type is guessed from the file extension; JPEG is assumed when the
    extension is unknown or not an image type.
    """
    mime, _ = mimetypes.guess_type(path)
    if not mime or not mime.startswith("image/"):
        mime = "image/jpeg"
    return f"data:{mime};base64,{encode_image(path)}"


# ── Core Logic ──
async def process_request(
    messages: List[Dict[str, Any]],
    image_path_list: List[str],
) -> Dict[str, Any]:
    """Run the model <-> sandbox loop for one request.

    Args:
        messages: Content items of the user turn.
        image_path_list: Host paths of the input images. Not modified.

    Returns:
        The final chat payload with "messages" flattened to text and an added
        "image_path_list" holding the input paths plus every image produced
        by the sandbox.

    Raises:
        HTTPException: 400 for invalid input (too few paths, unreadable
            image); 500 when the sandbox, the model API or tool-call handling
            fails.
    """
    # Work on a copy so the caller's list is never mutated.
    image_paths = list(image_path_list)

    try:
        user_content = build_user_content(messages, image_paths)
    except (ValueError, OSError) as e:
        raise HTTPException(400, f"Invalid request: {e}") from e
    payload = build_initial_payload(user_content)

    async with httpx.AsyncClient(timeout=300.0) as client:
        # 1. Create Jupyter session
        try:
            r = await client.post(
                f"{JUPYTER_API}/sessions/create",
                json={"kernel_name": KERNEL_NAME},
            )
            r.raise_for_status()
            session_id = r.json()["data"]["session_id"]
        except Exception as e:  # service boundary: report any failure as a 500
            raise HTTPException(500, f"Failed to create Jupyter session: {e}") from e

        try:
            for turn in range(1, MAX_TURNS + 1):
                # 2. Call the model
                try:
                    r = await client.post(
                        CHAT_API,
                        json=payload,
                        timeout=120.0,
                    )
                    r.raise_for_status()
                    resp = r.json()
                except Exception as e:  # service boundary
                    raise HTTPException(
                        500, f"Model API request failed (turn={turn}): {e}"
                    ) from e

                if not isinstance(resp, dict) or not resp.get("choices"):
                    raise HTTPException(500, f"Unexpected model response: {resp}")

                choice = resp["choices"][0]["message"]
                thinking = (choice.get("reasoning") or "").strip()
                # content may be null when the model returns only reasoning
                answer = (choice.get("content") or "").strip()
                # NOTE(review): reasoning and answer are separated only by
                # newlines; if the model's chat template expects explicit
                # reasoning delimiters they need to be added here — confirm.
                assistant_msg = f"\n{thinking}\n\n\n{answer}"

                # 3. No tool call — conversation complete
                if TOOL_CALL_OPEN not in answer:
                    payload["messages"].append({"role": "assistant", "content": assistant_msg})
                    break

                # 4. Parse and execute the tool call
                try:
                    code = _extract_tool_code(answer)
                except (IndexError, KeyError, TypeError, ValueError) as e:
                    raise HTTPException(500, f"Failed to parse tool_call: {e}") from e

                try:
                    r = await client.post(
                        f"{JUPYTER_API}/execute",
                        json={
                            "code": code,
                            "timeout": 30,
                            "kernel_name": KERNEL_NAME,
                            "session_id": session_id,
                        },
                        timeout=60.0,
                    )
                    r.raise_for_status()
                    exec_res = r.json()
                except Exception as e:  # service boundary
                    raise HTTPException(500, f"Code execution failed: {e}") from e

                # Skip this turn if execution failed. Nothing is appended to
                # the conversation, so the model is asked again with the same
                # payload and the attempt still counts against MAX_TURNS.
                if not exec_res.get("success"):
                    logger.warning("Sandbox execution failed (turn=%d); retrying", turn)
                    continue

                # The first output is expected to be the printed image path.
                try:
                    sandbox_img_path = exec_res["data"]["outputs"][0]["text"].strip()
                except (KeyError, IndexError, TypeError, AttributeError) as e:
                    raise HTTPException(
                        500, f"Unexpected execution result: {exec_res}"
                    ) from e

                try:
                    host_img_path = _checked_host_image_path(sandbox_img_path)
                    img_b64 = _image_data_url(host_img_path)
                except (ValueError, OSError) as e:
                    raise HTTPException(
                        500, f"Cannot load sandbox image '{sandbox_img_path}': {e}"
                    ) from e
                image_paths.append(host_img_path)

                payload["messages"].append({"role": "assistant", "content": assistant_msg})
                # NOTE(review): the two text items below may originally have
                # carried tool-response markup around the image — confirm.
                payload["messages"].append({
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "\n"},
                        {"type": "image_url", "image_url": {"url": img_b64}},
                        {"type": "text", "text": f"\n{sandbox_img_path}\n"},
                    ],
                })
        finally:
            # 5. Clean up the Jupyter session (best effort — never mask the
            # real error or the successful result)
            try:
                await client.delete(f"{JUPYTER_API}/sessions/{session_id}")
            except Exception as e:
                logger.warning("Failed to delete Jupyter session: %s", e)

    payload["messages"] = messages_to_text(payload["messages"])
    payload["image_path_list"] = image_paths
    return payload


# ── Routes ──
@app.post("/process")
async def process_images(request: RequestModel) -> Dict[str, Any]:
    """Run the thinking-with-images loop for the posted request."""
    return await process_request(request.messages, request.image_path_list)


@app.get("/health")
async def health_check():
    """Liveness probe."""
    return {"status": "ok"}


# ── Entrypoint ──
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=10044)