import json
import base64
from pathlib import Path
import httpx
from PIL import Image
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any
# FastAPI application object; the route handlers in this file are registered on it.
app = FastAPI(title="Thinking with Images API")
# ── Configuration ──
# Model identifier sent in the "model" field of every chat-completions request.
MODEL_NAME = "model_name"
# Chat-completions endpoint of the model server.
CHAT_API = "http://localhost:9200/v1/chat/completions"
# Base URL of the Jupyter sandbox service (session create / execute / delete).
JUPYTER_API = "http://localhost:18081/v1/jupyter"
# Sandbox-internal paths <-> real host paths (Docker volume mapping).
# The *_IMG_DIR pair translates input image paths (host -> sandbox);
# the *_TMP_DIR pair translates images written by sandbox code (sandbox -> host).
SANDBOX_IMG_DIR = "/mnt/data"
HOST_IMG_DIR = "/data" # ← update to match the actual mount path
# Must stay in sync with the temp directory quoted in SYSTEM_PROMPT.
SANDBOX_TMP_DIR = "/mnt/data/images/temp" # fixed path — do not change
# NOTE(review): under the /data <-> /mnt/data mapping above, the sandbox temp
# dir would correspond to /data/images/temp; presumably this directory is
# mounted separately — confirm against the container's volume configuration.
HOST_TMP_DIR = "/data/thinking_with_images/temp"
# System prompt announcing the single `python` tool to the model.
# The prompt text refers to "XML tags", so the tag markup itself must be
# present: the tool signature is wrapped in <tools>…</tools> and the model is
# told to answer with <tool_call>…</tool_call>.
# NOTE(review): tag names follow the Hermes/Qwen-style tool-calling
# convention — confirm against the served model's chat template.
SYSTEM_PROMPT = '''
You are a helpful assistant.
# Tools
You may call one or more functions to assist with the user query.
You are provided with function signatures within <tools></tools> XML tags:
<tools>
{"type": "function", "function": {"name": "python", "description": "Use this tool to execute Python code in your chain of thought.\n\nWhen you send a message containing Python code to python, it will be executed in a stateful Jupyter notebook environment. python will respond with the output of the execution or time out after 60.0 seconds. The drive at '/mnt/data/images/temp' can be used to save the temporary image files. Internet access for this session is disabled. Do not make external web requests or API calls as they will fail.\n\nReasoning & Image Manipulation & Drawing Auxiliary Graphics (Optional but Encouraged):\n- You have the capability to write executable Python code to perform image manipulations (e.g., cropping to a Region of Interest (ROI), resizing, rotation, adjusting contrast) or perform calculation for better reasoning.\n- You have the capability to write Python code to add auxiliary graphics (such as segments, circles, rectangles, labels, etc.) to the image, to help illustrate your reasoning process.\n- The code will be executed in a secure sandbox, and its output will be provided back to you for further analysis.\n- At the end of the code, print the path of the processed image (processed_path) or the relevant result for further processing within the sandbox environment.", "parameters": {"type": "object", "properties": {"code": {"type": "string", "description": "The Python code to execute"}}}, "required": ["code"]}}
</tools>
For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{"name": <function-name>, "arguments": <args-json-object>}
</tool_call>
'''
# Upper bound on model round-trips per request; process_request stops calling
# the model after this many turns even if it keeps requesting tool calls.
MAX_TURNS = 8
class RequestModel(BaseModel):
    # Request body of POST /process.
    # (Documented with comments rather than a docstring so the generated
    # schema description stays unchanged.)

    # Content items of the user turn; each item is a dict carrying a "type"
    # key (the code in this file handles "text" and "image_url").
    messages: list[dict[str, Any]]

    # Host-side image paths, consumed in the order the "image_url" items
    # appear in `messages`.
    image_path_list: list[str]
# ── Utility Functions ──
def get_img_size(path: str) -> tuple[int, int]:
    """Open the image at *path* and return its size as ``(width, height)``."""
    with Image.open(path) as image:
        dimensions = image.size
    return dimensions
def encode_image(path: str) -> str:
    """Read the file at *path* and return its bytes as a base64 text string."""
    raw_bytes = Path(path).read_bytes()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()
def to_sandbox_path(host_path: str) -> str:
    """Convert a host machine path to the corresponding sandbox path.

    Only a leading ``HOST_IMG_DIR`` component is rewritten to
    ``SANDBOX_IMG_DIR``. A plain ``str.replace`` would rewrite every
    occurrence of the host directory, corrupting paths such as
    ``/data/data/x.jpg`` or ``/database/x.jpg`` when the host dir is ``/data``.

    Args:
        host_path: Path as seen on the host machine.

    Returns:
        The sandbox-side path, or *host_path* unchanged when it does not lie
        under ``HOST_IMG_DIR``.
    """
    host_root = HOST_IMG_DIR.rstrip("/")
    sandbox_root = SANDBOX_IMG_DIR.rstrip("/")
    # Require a path-component boundary so "/database" is not treated as
    # living under "/data".
    if host_path == host_root or host_path.startswith(host_root + "/"):
        return sandbox_root + host_path[len(host_root):]
    return host_path
def to_host_path(sandbox_path: str) -> str:
    """Convert a sandbox path to the corresponding host machine path.

    Only a leading ``SANDBOX_TMP_DIR/`` prefix is rewritten to
    ``HOST_TMP_DIR/``; a plain ``str.replace`` would also rewrite an
    occurrence of the directory in the middle of the path.

    Args:
        sandbox_path: Path as seen from inside the sandbox.

    Returns:
        The host-side path, or *sandbox_path* unchanged when it does not lie
        under ``SANDBOX_TMP_DIR``.
    """
    sandbox_prefix = SANDBOX_TMP_DIR.rstrip("/") + "/"
    if sandbox_path.startswith(sandbox_prefix):
        return HOST_TMP_DIR.rstrip("/") + "/" + sandbox_path[len(sandbox_prefix):]
    return sandbox_path
def build_user_content(
    messages: List[Dict[str, Any]],
    image_path_list: List[str],
) -> List[Dict[str, Any]]:
    """Return the user-turn content with a metadata text item after each image.

    Every item of *messages* is copied (by reference) into a new list. After
    each ``image_url`` item, a text item stating the image's sandbox path,
    width and height is inserted; images are matched to *image_path_list*
    by order of appearance.

    Raises:
        ValueError: if there are more ``image_url`` items than paths.
    """
    enriched: List[Dict[str, Any]] = []
    image_idx = 0
    available = len(image_path_list)

    for part in messages:
        enriched.append(part)
        if part["type"] != "image_url":
            continue

        if image_idx >= available:
            raise ValueError(
                f"image_path_list too short: need image #{image_idx + 1} but only {len(image_path_list)} provided"
            )

        host_path = image_path_list[image_idx]
        width, height = get_img_size(host_path)
        sandbox_path = to_sandbox_path(host_path)
        info_text = f"\nimage path: {sandbox_path}\nimage width: {width}\nimage height: {height}\n\n"
        enriched.append({"type": "text", "text": info_text})
        image_idx += 1

    return enriched
def build_initial_payload(user_content: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Assemble the first chat-completions request body.

    The conversation starts with the system prompt followed by one user turn
    holding *user_content*; ``skip_special_tokens`` is sent as ``False``.
    """
    system_turn = {"role": "system", "content": SYSTEM_PROMPT}
    user_turn = {"role": "user", "content": user_content}

    request_body: Dict[str, Any] = {"model": MODEL_NAME}
    request_body["messages"] = [system_turn, user_turn]
    request_body["skip_special_tokens"] = False
    return request_body
def messages_to_text(payload_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse list-valued user content into a single string per message.

    User messages whose content is a list are replaced by a shallow copy whose
    content is the concatenation of their ``text`` items. All other messages
    are passed through as the same objects.
    """
    flattened: List[Dict[str, Any]] = []
    for message in payload_messages:
        is_multimodal_user = message["role"] == "user" and isinstance(message["content"], list)
        if not is_multimodal_user:
            flattened.append(message)
            continue

        # Image items contribute an empty string, so only text items matter.
        # NOTE(review): if an image placeholder was intended here, confirm
        # which token it should be.
        pieces = [part["text"] for part in message["content"] if part["type"] == "text"]
        flattened.append({**message, "content": "".join(pieces)})
    return flattened
# ── Core Logic ──
# Delimiters of a tool call inside the assistant's answer text.
# NOTE(review): these (and the <think>/<tool_response> wrappers used below)
# follow the Hermes/Qwen-style chat markup — confirm against the served
# model's chat template.
_TOOL_CALL_OPEN = "<tool_call>"
_TOOL_CALL_CLOSE = "</tool_call>"


def _extract_tool_code(answer: str) -> str:
    """Return the ``code`` argument of the first tool call embedded in *answer*."""
    raw = answer.split(_TOOL_CALL_OPEN, 1)[1].split(_TOOL_CALL_CLOSE, 1)[0]
    return json.loads(raw)["arguments"]["code"]


async def process_request(
    messages: List[Dict[str, Any]],
    image_path_list: List[str],
) -> Dict[str, Any]:
    """Run the multi-turn model / sandbox loop for one request.

    A Jupyter session is created, then the model is called up to MAX_TURNS
    times. Whenever the answer contains a tool call, its code is executed in
    the session, the first execution output is taken as the sandbox path of
    an image, and that image is fed back to the model as a tool response.
    The loop ends on the first answer without a tool call. The session is
    always deleted afterwards (best effort).

    Args:
        messages: Content items of the initial user turn.
        image_path_list: Host paths of the input images. Host paths of images
            produced during the loop are appended to this list in place.

    Returns:
        The final request payload: ``messages`` flattened to plain text via
        ``messages_to_text`` plus ``image_path_list``.

    Raises:
        HTTPException: (500) when session creation, a model call, tool-call
            parsing, code execution or reading the produced image fails.
        ValueError: propagated from ``build_user_content`` when there are
            fewer image paths than images.
    """
    user_content = build_user_content(messages, image_path_list)
    payload = build_initial_payload(user_content)

    async with httpx.AsyncClient(timeout=300.0) as client:
        # 1. Create Jupyter session
        try:
            r = await client.post(
                f"{JUPYTER_API}/sessions/create",
                json={"kernel_name": "python3.10"},
            )
            r.raise_for_status()
            session_id = r.json()["data"]["session_id"]
        except Exception as e:
            raise HTTPException(500, f"Failed to create Jupyter session: {e}") from e

        try:
            for turn in range(1, MAX_TURNS + 1):
                # 2. Call the model
                try:
                    r = await client.post(
                        CHAT_API,
                        json=payload,
                        timeout=120.0,
                    )
                    r.raise_for_status()
                    resp = r.json()
                except Exception as e:
                    raise HTTPException(
                        500, f"Model API request failed (turn={turn}): {e}"
                    ) from e

                # Also rejects an empty "choices" list, which would otherwise
                # surface as a bare IndexError.
                if not isinstance(resp, dict) or not resp.get("choices"):
                    raise HTTPException(500, f"Unexpected model response: {resp}")

                choice = resp["choices"][0]["message"]
                thinking = (choice.get("reasoning") or "").strip()
                # "content" may be null in a chat-completions response.
                answer = (choice.get("content") or "").strip()
                assistant_msg = f"<think>\n{thinking}\n</think>\n\n{answer}"

                # 3. No tool call — conversation complete
                if _TOOL_CALL_OPEN not in answer:
                    payload["messages"].append({"role": "assistant", "content": assistant_msg})
                    break

                # 4. Parse and execute the tool call
                try:
                    code = _extract_tool_code(answer)
                except (ValueError, KeyError, IndexError, TypeError) as e:
                    raise HTTPException(500, f"Failed to parse tool_call: {e}") from e

                try:
                    r = await client.post(
                        f"{JUPYTER_API}/execute",
                        json={"code": code, "timeout": 30,
                              "kernel_name": "python3.10", "session_id": session_id},
                        timeout=60.0,
                    )
                    r.raise_for_status()
                    exec_res = r.json()
                except Exception as e:
                    raise HTTPException(500, f"Code execution failed: {e}") from e

                # Skip this turn if execution failed. Nothing is appended to
                # the conversation, so the next turn re-sends the same payload.
                if not exec_res["success"]:
                    continue

                # The first output is expected to be the printed image path.
                # NOTE(review): the prompt also allows printing a non-image
                # result; such output ends up here as a read error — confirm
                # whether a text-only tool response is wanted instead.
                try:
                    sandbox_img_path = exec_res["data"]["outputs"][0]["text"].strip()
                    host_img_path = to_host_path(sandbox_img_path)
                    img_b64 = f"data:image/jpeg;base64,{encode_image(host_img_path)}"
                except (KeyError, IndexError, TypeError, AttributeError, OSError) as e:
                    raise HTTPException(
                        500, f"Unexpected code execution output: {e}"
                    ) from e

                # Record the new image only once it has been read successfully.
                image_path_list.append(host_img_path)

                payload["messages"].append({"role": "assistant", "content": assistant_msg})
                payload["messages"].append({
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "<tool_response>\n"},
                        {"type": "image_url", "image_url": {"url": img_b64}},
                        {"type": "text", "text": f"\n{sandbox_img_path}\n</tool_response>"},
                    ],
                })
        finally:
            # 5. Clean up the Jupyter session (best effort: a failed delete
            # must not mask the request's own result or error).
            try:
                await client.delete(f"{JUPYTER_API}/sessions/{session_id}")
            except Exception as e:
                print(f"[WARN] Failed to delete Jupyter session: {e}")

    payload["messages"] = messages_to_text(payload["messages"])
    payload["image_path_list"] = image_path_list
    return payload
# ── Routes ──
@app.post("/process")
async def process_images(request: RequestModel) -> Dict[str, Any]:
    # POST /process: hand the validated request body to process_request and
    # return its result unchanged. (Kept as a comment rather than a docstring
    # so the published endpoint description is not altered.)
    result = await process_request(
        request.messages,
        request.image_path_list,
    )
    return result
@app.get("/health")
async def health_check():
    # GET /health: fixed liveness response; touches no backend service.
    status_report = {"status": "ok"}
    return status_report
# ── Entrypoint ──
if __name__ == "__main__":
    # Imported here so that uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Serve the app on all interfaces, port 10044.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=10044,
    )