TensorRT-LLM-Windows-RTX40 / scripts /openclaw_proxy.py
xThr45hx's picture
Add source, patches, scripts, build notes, README, LICENSE
00db36f verified
Raw
History Blame Contribute Delete
6.84 kB
"""
OpenClaw SSE Proxy
- Strips reasoning_content from TRT-LLM SSE chunks so OpenClaw renders text instead of "..."
- Strips <think>...</think> blocks from content (stateful per-request)
- Injects repetition_penalty: 1.15 to prevent loops
- Injects "detailed thinking off" into system prompt (Nemotron suppresses thinking via system prompt)
Run on :5002 — point OpenClaw at http://localhost:5002 instead of :5001
"""
import json
import asyncio
import httpx
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
import uvicorn
# Base URL of the TRT-LLM server that the catch-all route forwards requests to.
UPSTREAM: str = "http://localhost:5001"
# ASGI application object that the proxy route is registered on and that uvicorn serves.
app: FastAPI = FastAPI()
def strip_chunk(data_str: str, think_state: list) -> str | None:
"""
Clean a single SSE data chunk:
- Remove reasoning_content field
- Strip <think>...</think> block from content (stateful — think_state[0] tracks position)
think_state[0]: "before" | "inside" | "after"
Returns cleaned JSON string, or None to suppress the chunk entirely.
"""
try:
obj = json.loads(data_str)
for choice in obj.get("choices", []):
delta = choice.get("delta", {})
delta.pop("reasoning_content", None)
content = delta.get("content", "")
if not content:
continue
state = think_state[0]
if state == "before":
if "<think>" in content:
think_state[0] = "inside"
after_open = content.split("<think>", 1)[1]
if "</think>" in after_open:
think_state[0] = "after"
delta["content"] = after_open.split("</think>", 1)[1]
else:
delta.pop("content", None) # null not "" — avoids OpenClaw treating as finished
# else: no think tag yet, pass through as-is
elif state == "inside":
if "</think>" in content:
think_state[0] = "after"
delta["content"] = content.split("</think>", 1)[1]
else:
delta.pop("content", None) # null not "" — avoids OpenClaw treating as finished
# state == "after": pass through as-is
return json.dumps(obj)
except (json.JSONDecodeError, AttributeError):
return data_str
async def stream_response(
    upstream_url: str,
    method: str,
    headers: dict,
    body: bytes,
    log_path: str | None = r"D:\AI\models\proxy_debug.log",
):
    """Forward a streaming request upstream and re-emit its SSE stream, cleaned.

    Each upstream ``data:`` payload is passed through ``strip_chunk``, which
    drops ``reasoning_content`` and the ``<think>`` block. ``[DONE]`` is
    forwarded as-is, blank lines are dropped, and any other line is forwarded
    unchanged. Every emitted line is followed by a blank line, so each one
    reaches the client as its own SSE event.

    Args:
        upstream_url: Full URL of the upstream endpoint.
        method: HTTP method for the upstream request.
        headers: Request headers to forward (hop-by-hop headers already removed).
        body: Raw request body to forward.
        log_path: Debug log file, opened in append mode. It receives the
            upstream HTTP status, the first 20 raw upstream lines (truncated to
            300 chars) and a data-chunk count. ``None`` disables logging. If the
            file cannot be opened, logging is skipped rather than failing the
            stream.

    Yields:
        SSE-formatted text fragments for ``StreamingResponse``.
    """
    think_state = ["before"]  # in/out state threaded through strip_chunk across chunks
    chunk_n = 0
    log_f = None
    if log_path:
        try:
            log_f = open(log_path, "a", encoding="utf-8")
        except OSError:
            log_f = None  # debug logging is best-effort; never fail the stream over it

    def log(msg: str) -> None:
        if log_f is not None:
            log_f.write(msg)
            log_f.flush()

    try:
        log("=== REQUEST ===\n")
        async with httpx.AsyncClient(timeout=300.0) as client:
            async with client.stream(method, upstream_url, headers=headers, content=body) as resp:
                log(f"HTTP {resp.status_code}\n")
                raw_n = 0
                async for line in resp.aiter_lines():
                    raw_n += 1
                    if raw_n <= 20:
                        log(f"RAW[{raw_n}]: {line[:300]!r}\n")
                    if line.startswith("data:"):
                        # SSE permits both "data:value" and "data: value"; a single
                        # leading space is not part of the field value.
                        payload = line[5:]
                        if payload.startswith(" "):
                            payload = payload[1:]
                        if payload.strip() == "[DONE]":
                            log("[DONE]\n")
                            yield "data: [DONE]\n\n"
                        else:
                            chunk_n += 1
                            cleaned = strip_chunk(payload, think_state)
                            if cleaned is not None:
                                yield f"data: {cleaned}\n\n"
                    elif line:
                        yield f"{line}\n\n"
    finally:
        if log_f is not None:
            try:
                log_f.write(f"total_chunks={chunk_n}\n\n")
            finally:
                log_f.close()
# Upstream response headers that describe the wire body. httpx has already
# decoded ``resp.content``, so these must not be relayed with it.
_RESPONSE_SKIP_HEADERS = {"content-length", "content-encoding", "transfer-encoding", "connection"}


def _prepend_thinking_off(messages: list) -> None:
    """Best-effort: make the first message a system prompt starting with "detailed thinking off".

    Nemotron suppresses thinking via its system prompt. If the first message
    is not a system message, a new one is inserted. A string (or missing)
    system content is prefixed. A list of content parts gets a leading text
    part. Other content types are left untouched.
    """
    first = messages[0]
    if not (isinstance(first, dict) and first.get("role") == "system"):
        messages.insert(0, {"role": "system", "content": "detailed thinking off\n\nYou are a helpful assistant."})
        return
    content = first.get("content")
    if isinstance(content, list):
        # OpenAI-style content parts: concatenating a str onto a list would raise TypeError.
        already = any(
            isinstance(part, dict) and "detailed thinking off" in str(part.get("text", ""))
            for part in content
        )
        if not already:
            content.insert(0, {"type": "text", "text": "detailed thinking off\n\n"})
    elif content is None or isinstance(content, str):
        sys_content = content or ""
        if "detailed thinking off" not in sys_content:
            first["content"] = "detailed thinking off\n\n" + sys_content


def _rewrite_request_body(body: bytes) -> tuple[bytes, bool]:
    """Apply the proxy's request-side tweaks to a JSON request body.

    - Adds ``repetition_penalty: 1.15`` unless the client set one, to prevent loops.
    - Injects "detailed thinking off" into the system prompt.

    Returns:
        ``(new_body, is_stream)``. Bodies that are empty, not valid JSON, or
        not a JSON object are returned unchanged with ``is_stream=False``.
    """
    if not body:
        return body, False
    try:
        payload = json.loads(body)
    except ValueError:  # JSONDecodeError, or bytes that are not decodable text
        return body, False
    if not isinstance(payload, dict):
        return body, False
    is_stream = bool(payload.get("stream", False))
    payload.setdefault("repetition_penalty", 1.15)
    messages = payload.get("messages")
    if isinstance(messages, list) and messages:
        _prepend_thinking_off(messages)
    return json.dumps(payload).encode(), is_stream


def _clean_completion(obj: dict) -> None:
    """Strip ``reasoning_content`` and a closed <think> block from each choice's message, in place.

    Text before ``<think>`` is kept; newlines directly after ``</think>`` are dropped.
    Raises AttributeError/TypeError if *obj* does not have the expected shape.
    """
    for choice in obj.get("choices", []):
        msg = choice.get("message")
        if not isinstance(msg, dict):
            continue
        msg.pop("reasoning_content", None)
        content = msg.get("content")
        if not isinstance(content, str):
            continue
        prefix, opened, rest = content.partition("<think>")
        if opened and "</think>" in rest:
            msg["content"] = prefix + rest.split("</think>", 1)[1].lstrip("\n")


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"])
async def proxy(path: str, request: Request):
    """Catch-all route: forward any request to ``UPSTREAM`` at the same path and query.

    JSON bodies are rewritten by ``_rewrite_request_body`` before forwarding.
    Requests with a truthy ``"stream"`` field are relayed through
    ``stream_response``. All other requests are forwarded in one shot. If the
    reply is a JSON object, it is cleaned by ``_clean_completion``; otherwise
    it is relayed verbatim with the same status code.
    """
    upstream_url = f"{UPSTREAM}/{path}"
    if request.url.query:
        upstream_url = f"{upstream_url}?{request.url.query}"
    # Forward headers, drop hop-by-hop ones. content-length is dropped too,
    # because the body may be rewritten below.
    skip = {"host", "content-length", "transfer-encoding", "connection"}
    headers = {k: v for k, v in request.headers.items() if k.lower() not in skip}
    body, is_stream = _rewrite_request_body(await request.body())

    if is_stream:
        return StreamingResponse(
            stream_response(upstream_url, request.method, headers, body),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    async with httpx.AsyncClient(timeout=300.0) as client:
        resp = await client.request(request.method, upstream_url, headers=headers, content=body)
    try:
        obj = resp.json()
        _clean_completion(obj)
    except (ValueError, AttributeError, TypeError):
        # Not a JSON object of the expected shape: relay the (already decoded)
        # body verbatim. Starlette recomputes content-length.
        passthrough = {k: v for k, v in resp.headers.items() if k.lower() not in _RESPONSE_SKIP_HEADERS}
        return Response(content=resp.content, status_code=resp.status_code, headers=passthrough)
    return Response(
        content=json.dumps(obj).encode(),
        status_code=resp.status_code,
        headers={"Content-Type": "application/json"},
    )
if __name__ == "__main__":
    # Print a startup banner, then serve the proxy bound to localhost on port 5002.
    for banner_line in (
        "OpenClaw SSE Proxy :5002 -> :5001",
        "Point OpenClaw at http://localhost:5002",
    ):
        print(banner_line)
    uvicorn.run(app, port=5002, host="localhost", log_level="warning")