ccm committed on
Commit
a2b9043
·
1 Parent(s): cdca445

Reverting logging changes, updating agent.

Browse files
agent_server/agent_streaming.py CHANGED
@@ -1,152 +1,58 @@
 
 
 
1
  import asyncio
2
  import contextlib
3
  import os
4
  import threading
5
- import time
6
- import typing
7
 
8
  import fastapi
9
  import httpx
10
 
11
  from agent_server.helpers import sse_headers
12
  from agent_server.sanitizing_think_tags import scrub_think_tags
13
- from agent_server.std_tee import QueueWriter, _serialize_step
14
-
 
 
15
 
16
- async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
 
 
 
 
 
 
17
  """
18
- Start the agent in a worker thread.
19
- Stream THREE sources of incremental data into the async generator:
20
- (1) live stdout/stderr lines,
21
- (2) newly appended memory steps (polled),
22
- (3) any iterable the agent may yield (if supported).
23
- Finally emit a __final__ item with the last answer.
 
24
  """
25
- loop = asyncio.get_running_loop()
26
  q: asyncio.Queue = asyncio.Queue()
27
  agent_to_use = agent_obj
28
 
29
- stop_evt = threading.Event()
30
-
31
- # 1) stdout/stderr live tee
32
  qwriter = QueueWriter(q)
33
 
34
- # 2) memory poller
35
- def poll_memory():
36
- last_len = 0
37
- while not stop_evt.is_set():
38
- try:
39
- steps = []
40
- try:
41
- # Common API: agent.memory.get_full_steps()
42
- steps = agent_to_use.memory.get_full_steps() # type: ignore[attr-defined]
43
- except Exception:
44
- # Fallbacks: different names across versions
45
- steps = (
46
- getattr(agent_to_use, "steps", [])
47
- or getattr(agent_to_use, "memory", [])
48
- or []
49
- )
50
- if steps is None:
51
- steps = []
52
- curr_len = len(steps)
53
- if curr_len > last_len:
54
- new = steps[last_len:curr_len]
55
- last_len = curr_len
56
- for s in new:
57
- s_text = _serialize_step(s)
58
- if s_text:
59
- try:
60
- q.put_nowait({"__step__": s_text})
61
- except Exception:
62
- pass
63
- except Exception:
64
- pass
65
- time.sleep(0.10) # 100 ms cadence
66
-
67
- # 3) agent runner (may or may not yield)
68
  def run_agent():
69
  final_result = None
70
  try:
71
- with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(
72
- qwriter
73
- ):
74
- used_iterable = False
75
- if hasattr(agent_to_use, "run") and callable(
76
- getattr(agent_to_use, "run")
77
- ):
78
- try:
79
- res = agent_to_use.run(task, stream=True)
80
- if hasattr(res, "__iter__") and not isinstance(
81
- res, (str, bytes)
82
- ):
83
- used_iterable = True
84
- for it in res:
85
- try:
86
- q.put_nowait(it)
87
- except Exception:
88
- pass
89
- final_result = (
90
- None # iterable may already contain the answer
91
- )
92
- else:
93
- final_result = res
94
- except TypeError:
95
- # run(stream=True) not supported -> fall back
96
- pass
97
-
98
- if final_result is None and not used_iterable:
99
- # Try other common streaming signatures
100
- for name in (
101
- "run_stream",
102
- "stream",
103
- "stream_run",
104
- "run_with_callback",
105
- ):
106
- fn = getattr(agent_to_use, name, None)
107
- if callable(fn):
108
- try:
109
- res = fn(task)
110
- if hasattr(res, "__iter__") and not isinstance(
111
- res, (str, bytes)
112
- ):
113
- for it in res:
114
- q.put_nowait(it)
115
- final_result = None
116
- else:
117
- final_result = res
118
- break
119
- except TypeError:
120
- # maybe callback signature
121
- def cb(item):
122
- try:
123
- q.put_nowait(item)
124
- except Exception:
125
- pass
126
-
127
- try:
128
- fn(task, cb)
129
- final_result = None
130
- break
131
- except Exception:
132
- continue
133
-
134
- if final_result is None and not used_iterable:
135
- pass # (typo guard removed below)
136
-
137
- if final_result is None and not used_iterable:
138
- # Last resort: synchronous run()/generate()/callable
139
- if hasattr(agent_to_use, "run") and callable(
140
- getattr(agent_to_use, "run")
141
- ):
142
- final_result = agent_to_use.run(task)
143
- elif hasattr(agent_to_use, "generate") and callable(
144
- getattr(agent_to_use, "generate")
145
- ):
146
- final_result = agent_to_use.generate(task)
147
- elif callable(agent_to_use):
148
- final_result = agent_to_use(task)
149
-
150
  except Exception as e:
151
  try:
152
  qwriter.flush()
@@ -165,22 +71,31 @@ async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = N
165
  q.put_nowait({"__final__": final_result})
166
  except Exception:
167
  pass
168
- stop_evt.set()
169
 
170
- # Kick off threads
171
- mem_thread = threading.Thread(target=poll_memory, daemon=True)
172
- run_thread = threading.Thread(target=run_agent, daemon=True)
173
- mem_thread.start()
174
  run_thread.start()
175
 
176
- # Async consumer
177
  while True:
178
  item = await q.get()
 
 
 
 
 
 
 
 
 
179
  yield item
180
  if isinstance(item, dict) and "__final__" in item:
181
  break
182
 
183
 
 
 
 
184
  def _recursively_scrub(obj):
185
  if isinstance(obj, str):
186
  return scrub_think_tags(obj)
@@ -191,9 +106,10 @@ def _recursively_scrub(obj):
191
  return obj
192
 
193
 
194
- async def proxy_upstream_chat_completions(
195
- body: dict, stream: bool, scrub_think: bool = False
196
- ):
 
197
  HF_TOKEN = os.getenv("OPENAI_API_KEY")
198
  headers = {
199
  "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
@@ -206,12 +122,9 @@ async def proxy_upstream_chat_completions(
206
 
207
  async def proxy_stream():
208
  async with httpx.AsyncClient(timeout=None) as client:
209
- async with client.stream(
210
- "POST", url, headers=headers, json=body
211
- ) as resp:
212
  resp.raise_for_status()
213
  if scrub_think:
214
- # Pull text segments, scrub tags, and yield bytes
215
  async for txt in resp.aiter_text():
216
  try:
217
  cleaned = scrub_think_tags(txt)
@@ -239,6 +152,249 @@ async def proxy_upstream_chat_completions(
239
  except Exception:
240
  pass
241
 
242
- return fastapi.responses.JSONResponse(
243
- status_code=r.status_code, content=payload
244
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # agent_server/agent_streaming.py
2
+ from __future__ import annotations
3
+
4
  import asyncio
5
  import contextlib
6
  import os
7
  import threading
8
+ import typing as t
 
9
 
10
  import fastapi
11
  import httpx
12
 
13
  from agent_server.helpers import sse_headers
14
  from agent_server.sanitizing_think_tags import scrub_think_tags
15
+ from agent_server.std_tee import (
16
+ QueueWriter,
17
+ _format_reasoning_chunk,
18
+ )
19
 
20
+ # -----------------------------------------------------------------------------
21
+ # Minimal agent streaming:
22
+ # • capture ONLY stdout/stderr during agent execution
23
+ # • stream normalized reasoning chunks derived from those lines
24
+ # • emit a single {"__final__": ...} with the agent's returned result
25
+ # -----------------------------------------------------------------------------
26
+ async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
27
  """
28
+ Streams compact reasoning derived from stdout/stderr lines while the agent runs.
29
+ When the agent finishes, emits {"__final__": <agent_return_value>}.
30
+
31
+ Yields dict events:
32
+ - {"__reasoning__": "<chunk>"} # normalized, compact text
33
+ - {"__error__": "<message>"} # if the agent runner throws
34
+ - {"__final__": <any>} # terminal event with the returned result
35
  """
 
36
  q: asyncio.Queue = asyncio.Queue()
37
  agent_to_use = agent_obj
38
 
39
+ # redirect stdout/stderr into this queue (line-buffered in QueueWriter)
 
 
40
  qwriter = QueueWriter(q)
41
 
42
+ # Background runner executes the agent synchronously so stdout flushes as it prints
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  def run_agent():
44
  final_result = None
45
  try:
46
+ with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(qwriter):
47
+ # Keep this simple: don't use stream=True or iterator-style APIs.
48
+ if hasattr(agent_to_use, "run") and callable(getattr(agent_to_use, "run")):
49
+ final_result = agent_to_use.run(task)
50
+ elif hasattr(agent_to_use, "generate") and callable(getattr(agent_to_use, "generate")):
51
+ final_result = agent_to_use.generate(task)
52
+ elif callable(agent_to_use):
53
+ final_result = agent_to_use(task)
54
+ else:
55
+ raise RuntimeError("Agent object is not callable and exposes no run()/generate()")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  except Exception as e:
57
  try:
58
  qwriter.flush()
 
71
  q.put_nowait({"__final__": final_result})
72
  except Exception:
73
  pass
 
74
 
75
+ # Kick off the worker thread
76
+ run_thread = threading.Thread(target=run_agent, name="agent-runner", daemon=True)
 
 
77
  run_thread.start()
78
 
79
+ # Async consumer: convert raw stdout lines -> compact reasoning chunks
80
  while True:
81
  item = await q.get()
82
+
83
+ if isinstance(item, dict) and "__stdout__" in item:
84
+ # Normalize/clean the line (strips ANSI, think tags, box drawing, system prompts)
85
+ chunk = _format_reasoning_chunk(item["__stdout__"], tag="stdout", idx=0)
86
+ if chunk:
87
+ yield {"__reasoning__": chunk}
88
+ continue
89
+
90
+ # Pass through non-stdout items (errors, final)
91
  yield item
92
  if isinstance(item, dict) and "__final__" in item:
93
  break
94
 
95
 
96
+ # -----------------------------------------------------------------------------
97
+ # Utilities: scrub nested structures of <think> tags when proxying upstream
98
+ # -----------------------------------------------------------------------------
99
  def _recursively_scrub(obj):
100
  if isinstance(obj, str):
101
  return scrub_think_tags(obj)
 
106
  return obj
107
 
108
 
109
+ # -----------------------------------------------------------------------------
110
+ # Upstream proxy (OpenAI-compatible) with optional think-tag scrubbing
111
+ # -----------------------------------------------------------------------------
112
+ async def proxy_upstream_chat_completions(body: dict, stream: bool, scrub_think: bool = False):
113
  HF_TOKEN = os.getenv("OPENAI_API_KEY")
114
  headers = {
115
  "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
 
122
 
123
  async def proxy_stream():
124
  async with httpx.AsyncClient(timeout=None) as client:
125
+ async with client.stream("POST", url, headers=headers, json=body) as resp:
 
 
126
  resp.raise_for_status()
127
  if scrub_think:
 
128
  async for txt in resp.aiter_text():
129
  try:
130
  cleaned = scrub_think_tags(txt)
 
152
  except Exception:
153
  pass
154
 
155
+ return fastapi.responses.JSONResponse(status_code=r.status_code, content=payload)
156
+
157
+ # import asyncio
158
+ # import contextlib
159
+ # import os
160
+ # import threading
161
+ # import time
162
+ # import typing
163
+ #
164
+ # import fastapi
165
+ # import httpx
166
+ #
167
+ # from agent_server.helpers import sse_headers
168
+ # from agent_server.sanitizing_think_tags import scrub_think_tags
169
+ # from agent_server.std_tee import QueueWriter, _serialize_step
170
+ #
171
+ #
172
+ # async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
173
+ # """
174
+ # Start the agent in a worker thread.
175
+ # Stream THREE sources of incremental data into the async generator:
176
+ # (1) live stdout/stderr lines,
177
+ # (2) newly appended memory steps (polled),
178
+ # (3) any iterable the agent may yield (if supported).
179
+ # Finally emit a __final__ item with the last answer.
180
+ # """
181
+ # loop = asyncio.get_running_loop()
182
+ # q: asyncio.Queue = asyncio.Queue()
183
+ # agent_to_use = agent_obj
184
+ #
185
+ # stop_evt = threading.Event()
186
+ #
187
+ # # 1) stdout/stderr live tee
188
+ # qwriter = QueueWriter(q)
189
+ #
190
+ # # 2) memory poller
191
+ # def poll_memory():
192
+ # last_len = 0
193
+ # while not stop_evt.is_set():
194
+ # try:
195
+ # steps = []
196
+ # try:
197
+ # # Common API: agent.memory.get_full_steps()
198
+ # steps = agent_to_use.memory.get_full_steps() # type: ignore[attr-defined]
199
+ # except Exception:
200
+ # # Fallbacks: different names across versions
201
+ # steps = (
202
+ # getattr(agent_to_use, "steps", [])
203
+ # or getattr(agent_to_use, "memory", [])
204
+ # or []
205
+ # )
206
+ # if steps is None:
207
+ # steps = []
208
+ # curr_len = len(steps)
209
+ # if curr_len > last_len:
210
+ # new = steps[last_len:curr_len]
211
+ # last_len = curr_len
212
+ # for s in new:
213
+ # s_text = _serialize_step(s)
214
+ # if s_text:
215
+ # try:
216
+ # q.put_nowait({"__step__": s_text})
217
+ # except Exception:
218
+ # pass
219
+ # except Exception:
220
+ # pass
221
+ # time.sleep(0.10) # 100 ms cadence
222
+ #
223
+ # # 3) agent runner (may or may not yield)
224
+ # def run_agent():
225
+ # final_result = None
226
+ # try:
227
+ # with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(
228
+ # qwriter
229
+ # ):
230
+ # used_iterable = False
231
+ # if hasattr(agent_to_use, "run") and callable(
232
+ # getattr(agent_to_use, "run")
233
+ # ):
234
+ # try:
235
+ # res = agent_to_use.run(task, stream=True)
236
+ # if hasattr(res, "__iter__") and not isinstance(
237
+ # res, (str, bytes)
238
+ # ):
239
+ # used_iterable = True
240
+ # for it in res:
241
+ # try:
242
+ # q.put_nowait(it)
243
+ # except Exception:
244
+ # pass
245
+ # final_result = (
246
+ # None # iterable may already contain the answer
247
+ # )
248
+ # else:
249
+ # final_result = res
250
+ # except TypeError:
251
+ # # run(stream=True) not supported -> fall back
252
+ # pass
253
+ #
254
+ # if final_result is None and not used_iterable:
255
+ # # Try other common streaming signatures
256
+ # for name in (
257
+ # "run_stream",
258
+ # "stream",
259
+ # "stream_run",
260
+ # "run_with_callback",
261
+ # ):
262
+ # fn = getattr(agent_to_use, name, None)
263
+ # if callable(fn):
264
+ # try:
265
+ # res = fn(task)
266
+ # if hasattr(res, "__iter__") and not isinstance(
267
+ # res, (str, bytes)
268
+ # ):
269
+ # for it in res:
270
+ # q.put_nowait(it)
271
+ # final_result = None
272
+ # else:
273
+ # final_result = res
274
+ # break
275
+ # except TypeError:
276
+ # # maybe callback signature
277
+ # def cb(item):
278
+ # try:
279
+ # q.put_nowait(item)
280
+ # except Exception:
281
+ # pass
282
+ #
283
+ # try:
284
+ # fn(task, cb)
285
+ # final_result = None
286
+ # break
287
+ # except Exception:
288
+ # continue
289
+ #
290
+ # if final_result is None and not used_iterable:
291
+ # pass # (typo guard removed below)
292
+ #
293
+ # if final_result is None and not used_iterable:
294
+ # # Last resort: synchronous run()/generate()/callable
295
+ # if hasattr(agent_to_use, "run") and callable(
296
+ # getattr(agent_to_use, "run")
297
+ # ):
298
+ # final_result = agent_to_use.run(task)
299
+ # elif hasattr(agent_to_use, "generate") and callable(
300
+ # getattr(agent_to_use, "generate")
301
+ # ):
302
+ # final_result = agent_to_use.generate(task)
303
+ # elif callable(agent_to_use):
304
+ # final_result = agent_to_use(task)
305
+ #
306
+ # except Exception as e:
307
+ # try:
308
+ # qwriter.flush()
309
+ # except Exception:
310
+ # pass
311
+ # try:
312
+ # q.put_nowait({"__error__": str(e)})
313
+ # except Exception:
314
+ # pass
315
+ # finally:
316
+ # try:
317
+ # qwriter.flush()
318
+ # except Exception:
319
+ # pass
320
+ # try:
321
+ # q.put_nowait({"__final__": final_result})
322
+ # except Exception:
323
+ # pass
324
+ # stop_evt.set()
325
+ #
326
+ # # Kick off threads
327
+ # mem_thread = threading.Thread(target=poll_memory, daemon=True)
328
+ # run_thread = threading.Thread(target=run_agent, daemon=True)
329
+ # mem_thread.start()
330
+ # run_thread.start()
331
+ #
332
+ # # Async consumer
333
+ # while True:
334
+ # item = await q.get()
335
+ # yield item
336
+ # if isinstance(item, dict) and "__final__" in item:
337
+ # break
338
+ #
339
+ #
340
+ # def _recursively_scrub(obj):
341
+ # if isinstance(obj, str):
342
+ # return scrub_think_tags(obj)
343
+ # if isinstance(obj, dict):
344
+ # return {k: _recursively_scrub(v) for k, v in obj.items()}
345
+ # if isinstance(obj, list):
346
+ # return [_recursively_scrub(v) for v in obj]
347
+ # return obj
348
+ #
349
+ #
350
+ # async def proxy_upstream_chat_completions(
351
+ # body: dict, stream: bool, scrub_think: bool = False
352
+ # ):
353
+ # HF_TOKEN = os.getenv("OPENAI_API_KEY")
354
+ # headers = {
355
+ # "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
356
+ # "Content-Type": "application/json",
357
+ # }
358
+ # UPSTREAM_BASE = os.getenv("UPSTREAM_OPENAI_BASE", "").rstrip("/")
359
+ # url = f"{UPSTREAM_BASE}/chat/completions"
360
+ #
361
+ # if stream:
362
+ #
363
+ # async def proxy_stream():
364
+ # async with httpx.AsyncClient(timeout=None) as client:
365
+ # async with client.stream(
366
+ # "POST", url, headers=headers, json=body
367
+ # ) as resp:
368
+ # resp.raise_for_status()
369
+ # if scrub_think:
370
+ # # Pull text segments, scrub tags, and yield bytes
371
+ # async for txt in resp.aiter_text():
372
+ # try:
373
+ # cleaned = scrub_think_tags(txt)
374
+ # yield cleaned.encode("utf-8")
375
+ # except Exception:
376
+ # yield txt.encode("utf-8")
377
+ # else:
378
+ # async for chunk in resp.aiter_bytes():
379
+ # yield chunk
380
+ #
381
+ # return fastapi.responses.StreamingResponse(
382
+ # proxy_stream(), media_type="text/event-stream", headers=sse_headers()
383
+ # )
384
+ # else:
385
+ # async with httpx.AsyncClient(timeout=None) as client:
386
+ # r = await client.post(url, headers=headers, json=body)
387
+ # try:
388
+ # payload = r.json()
389
+ # except Exception:
390
+ # payload = {"status_code": r.status_code, "text": r.text}
391
+ #
392
+ # if scrub_think:
393
+ # try:
394
+ # payload = _recursively_scrub(payload)
395
+ # except Exception:
396
+ # pass
397
+ #
398
+ # return fastapi.responses.JSONResponse(
399
+ # status_code=r.status_code, content=payload
400
+ # )
agent_server/std_tee.py CHANGED
@@ -1,102 +1,317 @@
 
 
 
1
  import asyncio
2
  import io
3
  import json
4
  import re
5
  import threading
 
6
 
7
  from agent_server.sanitizing_think_tags import scrub_think_tags
8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
 
10
  class QueueWriter(io.TextIOBase):
11
  """
12
- File-like object that pushes each write to an asyncio.Queue immediately.
 
 
13
  """
14
 
15
- def __init__(self, q: "asyncio.Queue"):
16
  self.q = q
17
  self._lock = threading.Lock()
18
- self._buf = [] # accumulate until newline to reduce spam
19
 
20
- def write(self, s: str):
21
  if not s:
22
  return 0
 
 
 
23
  with self._lock:
24
  self._buf.append(s)
25
- # flush on newline to keep granularity reasonable
26
- if "\n" in s:
27
- chunk = "".join(self._buf)
28
- self._buf.clear()
29
- try:
30
- self.q.put_nowait({"__stdout__": chunk})
31
- except Exception:
32
- pass
 
 
 
 
33
  return len(s)
34
 
35
- def flush(self):
36
  with self._lock:
37
  if self._buf:
38
- chunk = "".join(self._buf)
39
  self._buf.clear()
40
- try:
41
- self.q.put_nowait({"__stdout__": chunk})
42
- except Exception:
43
- pass
 
 
 
 
44
 
45
 
 
 
 
 
46
  def _serialize_step(step) -> str:
47
  """
48
- Best-effort pretty string for a smolagents MemoryStep / ActionStep.
49
- Works even if attributes are missing on some versions.
 
 
 
 
 
 
 
 
50
  """
51
- parts = []
 
 
52
  sn = getattr(step, "step_number", None)
53
  if sn is not None:
54
  parts.append(f"Step {sn}")
 
 
55
  thought_val = getattr(step, "thought", None)
56
  if thought_val:
57
- parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
 
 
58
  tool_val = getattr(step, "tool", None)
59
  if tool_val:
60
- parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
 
 
61
  code_val = getattr(step, "code", None)
62
  if code_val:
63
- code_str = scrub_think_tags(str(code_val)).strip()
64
- parts.append("```python\n" + code_str + "\n```")
 
 
 
65
  args = getattr(step, "args", None)
66
  if args:
67
  try:
68
- parts.append(
69
- "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
70
- )
71
  except Exception:
72
- parts.append("Args: " + scrub_think_tags(str(args)))
 
 
 
73
  error = getattr(step, "error", None)
74
  if error:
75
- parts.append(f"Error: {scrub_think_tags(str(error))}")
 
 
76
  obs = getattr(step, "observations", None)
77
  if obs is not None:
78
  if isinstance(obs, (list, tuple)):
79
  obs_str = "\n".join(map(str, obs))
80
  else:
81
  obs_str = str(obs)
82
- parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
 
83
  # If this looks like a FinalAnswer step object, surface a clean final answer
84
  try:
85
  tname = type(step).__name__
86
  except Exception:
87
  tname = ""
88
- if tname.lower().startswith("finalanswer"):
89
  out = getattr(step, "output", None)
90
  if out is not None:
91
- return f"Final answer: {scrub_think_tags(str(out)).strip()}"
92
- # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
93
- s = scrub_think_tags(str(step))
94
  m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
95
  if m:
96
  return f"Final answer: {m.group(1).strip()}"
 
97
  # If the only content would be an object repr like FinalAnswerStep(...), drop it;
98
  # a cleaner "Final answer: ..." will come from the rule above or stdout.
99
  joined = "\n".join(parts).strip()
100
  if re.match(r"^FinalAnswer[^\n]+\)$", joined):
101
  return ""
102
- return joined or scrub_think_tags(str(step))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # agent_server/std_tee.py
2
+ from __future__ import annotations
3
+
4
  import asyncio
5
  import io
6
  import json
7
  import re
8
  import threading
9
+ import typing as t
10
 
11
  from agent_server.sanitizing_think_tags import scrub_think_tags
12
 
13
+ # ---------------------------------------------------------------------------
14
+ # Cleaning / formatting helpers (used by the streaming layer)
15
+ # ---------------------------------------------------------------------------
16
+
17
+ # Strip ANSI escape sequences (common from rich/logging)
18
+ _ANSI_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
19
+
20
+ # Lines that should never be surfaced (system prompt safety + obvious boilerplate)
21
+ _NOISY_PREFIXES = (
22
+ "OpenAIServerModel",
23
+ "Output message of the LLM",
24
+ "New run",
25
+ "─ Executing parsed code",
26
+ "╭", "╰", "│", "━", "─",
27
+ "System prompt", "SYSTEM PROMPT", "System Prompt",
28
+ )
29
+
30
+ # Very long single lines without enough alphanumerics are dropped
31
+ _MIN_SIG_CHARS = re.compile(r"[A-Za-z0-9]{3,}")
32
+
33
+ def _strip_ansi_and_think(s: str) -> str:
34
+ s = scrub_think_tags(s)
35
+ s = _ANSI_RE.sub("", s)
36
+ return s
37
+
38
+ def _truncate(s: str, n: int) -> str:
39
+ s = s.strip()
40
+ if len(s) <= n:
41
+ return s
42
+ return s[:n] + "\n… [truncated]"
43
+
44
+ def _format_reasoning_chunk(text: str, tag: str, idx: int) -> str:
45
+ """
46
+ Lightweight formatter for reasoning stream from stdout.
47
+ - scrubs <think>…</think>
48
+ - strips ANSI
49
+ - drops banners/box drawing and 'System prompt …'
50
+ - drops very-long low-signal lines
51
+ Returns a small readable chunk with a trailing blank line.
52
+ """
53
+ stripped = _strip_ansi_and_think(text).rstrip("\n").strip()
54
+ if not stripped:
55
+ return ""
56
+ if any(stripped.startswith(p) for p in _NOISY_PREFIXES):
57
+ return ""
58
+ # Lines made mostly of box drawing/separators
59
+ if all(ch in " ─━╭╮╰╯│═·—-_=+•" for ch in stripped):
60
+ return ""
61
+ # Excessively long lines with little signal (no alphanumerics)
62
+ if len(stripped) > 240 and not _MIN_SIG_CHARS.search(stripped):
63
+ return ""
64
+ return f"{stripped}\n\n"
65
+
66
+ # Optional helper if you ever want to sniff final answer from stdout
67
+ _FINAL_RE = re.compile(r"(?:^|\b)Final\s+answer:\s*(.+)$", flags=re.IGNORECASE)
68
+ def _maybe_parse_final_from_stdout(line: str) -> t.Optional[str]:
69
+ if not isinstance(line, str):
70
+ return None
71
+ m = _FINAL_RE.search(_strip_ansi_and_think(line))
72
+ if not m:
73
+ return None
74
+ return _strip_ansi_and_think(m.group(1)).strip() or None
75
+
76
+
77
+ # ---------------------------------------------------------------------------
78
+ # QueueWriter: tee stdout/stderr into an asyncio.Queue line-by-line
79
+ # ---------------------------------------------------------------------------
80
 
81
  class QueueWriter(io.TextIOBase):
82
  """
83
+ File-like object that pushes lines to an asyncio.Queue.
84
+ Each complete line is enqueued as {"__stdout__": "<line>"}.
85
+ The last partial line (if any) is flushed on flush().
86
  """
87
 
88
+ def __init__(self, q: "asyncio.Queue[dict]"):
89
  self.q = q
90
  self._lock = threading.Lock()
91
+ self._buf: list[str] = []
92
 
93
+ def write(self, s: str) -> int:
94
  if not s:
95
  return 0
96
+ if not isinstance(s, str):
97
+ s = str(s)
98
+
99
  with self._lock:
100
  self._buf.append(s)
101
+ text = "".join(self._buf)
102
+ if "\n" in text:
103
+ lines = text.splitlines(keepends=True)
104
+ # keep last partial (no newline) in buffer
105
+ tail = "" if text.endswith("\n") else lines.pop()
106
+ for ln in lines:
107
+ if ln: # include newlines; consumer will trim/format
108
+ try:
109
+ self.q.put_nowait({"__stdout__": ln})
110
+ except Exception:
111
+ pass
112
+ self._buf = [tail]
113
  return len(s)
114
 
115
+ def flush(self) -> None:
116
  with self._lock:
117
  if self._buf:
118
+ text = "".join(self._buf)
119
  self._buf.clear()
120
+ if text:
121
+ try:
122
+ self.q.put_nowait({"__stdout__": text})
123
+ except Exception:
124
+ pass
125
+
126
+ def isatty(self) -> bool: # some libs check this
127
+ return False
128
 
129
 
130
+ # ---------------------------------------------------------------------------
131
+ # (Optional / future) Compact serializer for step objects from various agents
132
+ # ---------------------------------------------------------------------------
133
+
134
  def _serialize_step(step) -> str:
135
  """
136
+ Compact, uniform serializer for 'step' objects from different agent libs.
137
+ Produces:
138
+ Step N
139
+ 🧠 Thought: …
140
+ 🛠️ Tool: …
141
+ 📥 Args: …
142
+ 📤 Observation: …
143
+ 💥 Error: …
144
+ (plus code fences when code is present)
145
+ With truncation to keep the reveal parsimonious.
146
  """
147
+ parts: list[str] = []
148
+
149
+ # Step number (best-effort)
150
  sn = getattr(step, "step_number", None)
151
  if sn is not None:
152
  parts.append(f"Step {sn}")
153
+
154
+ # Thought
155
  thought_val = getattr(step, "thought", None)
156
  if thought_val:
157
+ parts.append(f"🧠 Thought: {_truncate(_strip_ansi_and_think(str(thought_val)), 600)}")
158
+
159
+ # Tool
160
  tool_val = getattr(step, "tool", None)
161
  if tool_val:
162
+ parts.append(f"🛠️ Tool: {_truncate(_strip_ansi_and_think(str(tool_val)), 240)}")
163
+
164
+ # Code (if any)
165
  code_val = getattr(step, "code", None)
166
  if code_val:
167
+ code_str = _truncate(_strip_ansi_and_think(str(code_val)), 1600)
168
+ if code_str:
169
+ parts.append("```python\n" + code_str + "\n```")
170
+
171
+ # Args
172
  args = getattr(step, "args", None)
173
  if args:
174
  try:
175
+ arg_s = _truncate(_strip_ansi_and_think(json.dumps(args, ensure_ascii=False)), 800)
 
 
176
  except Exception:
177
+ arg_s = _truncate(_strip_ansi_and_think(str(args)), 800)
178
+ parts.append("📥 Args: " + arg_s)
179
+
180
+ # Error
181
  error = getattr(step, "error", None)
182
  if error:
183
+ parts.append(f"💥 Error: {_truncate(_strip_ansi_and_think(str(error)), 600)}")
184
+
185
+ # Observations
186
  obs = getattr(step, "observations", None)
187
  if obs is not None:
188
  if isinstance(obs, (list, tuple)):
189
  obs_str = "\n".join(map(str, obs))
190
  else:
191
  obs_str = str(obs)
192
+ parts.append("📤 Observation:\n" + _truncate(_strip_ansi_and_think(obs_str), 1600))
193
+
194
  # If this looks like a FinalAnswer step object, surface a clean final answer
195
  try:
196
  tname = type(step).__name__
197
  except Exception:
198
  tname = ""
199
+ if isinstance(tname, str) and tname.lower().startswith("finalanswer"):
200
  out = getattr(step, "output", None)
201
  if out is not None:
202
+ return f"Final answer: {_strip_ansi_and_think(str(out)).strip()}"
203
+ # Fallback: parse from string repr "FinalAnswerStep(output=...)"
204
+ s = _strip_ansi_and_think(str(step))
205
  m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
206
  if m:
207
  return f"Final answer: {m.group(1).strip()}"
208
+
209
  # If the only content would be an object repr like FinalAnswerStep(...), drop it;
210
  # a cleaner "Final answer: ..." will come from the rule above or stdout.
211
  joined = "\n".join(parts).strip()
212
  if re.match(r"^FinalAnswer[^\n]+\)$", joined):
213
  return ""
214
+ return joined or _strip_ansi_and_think(str(step))
215
+
216
+ # import asyncio
217
+ # import io
218
+ # import json
219
+ # import re
220
+ # import threading
221
+ #
222
+ # from agent_server.sanitizing_think_tags import scrub_think_tags
223
+ #
224
+ #
225
+ # class QueueWriter(io.TextIOBase):
226
+ # """
227
+ # File-like object that pushes each write to an asyncio.Queue immediately.
228
+ # """
229
+ #
230
+ # def __init__(self, q: "asyncio.Queue"):
231
+ # self.q = q
232
+ # self._lock = threading.Lock()
233
+ # self._buf = [] # accumulate until newline to reduce spam
234
+ #
235
+ # def write(self, s: str):
236
+ # if not s:
237
+ # return 0
238
+ # with self._lock:
239
+ # self._buf.append(s)
240
+ # # flush on newline to keep granularity reasonable
241
+ # if "\n" in s:
242
+ # chunk = "".join(self._buf)
243
+ # self._buf.clear()
244
+ # try:
245
+ # self.q.put_nowait({"__stdout__": chunk})
246
+ # except Exception:
247
+ # pass
248
+ # return len(s)
249
+ #
250
+ # def flush(self):
251
+ # with self._lock:
252
+ # if self._buf:
253
+ # chunk = "".join(self._buf)
254
+ # self._buf.clear()
255
+ # try:
256
+ # self.q.put_nowait({"__stdout__": chunk})
257
+ # except Exception:
258
+ # pass
259
+ #
260
+ #
261
+ # def _serialize_step(step) -> str:
262
+ # """
263
+ # Best-effort pretty string for a smolagents MemoryStep / ActionStep.
264
+ # Works even if attributes are missing on some versions.
265
+ # """
266
+ # parts = []
267
+ # sn = getattr(step, "step_number", None)
268
+ # if sn is not None:
269
+ # parts.append(f"Step {sn}")
270
+ # thought_val = getattr(step, "thought", None)
271
+ # if thought_val:
272
+ # parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
273
+ # tool_val = getattr(step, "tool", None)
274
+ # if tool_val:
275
+ # parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
276
+ # code_val = getattr(step, "code", None)
277
+ # if code_val:
278
+ # code_str = scrub_think_tags(str(code_val)).strip()
279
+ # parts.append("```python\n" + code_str + "\n```")
280
+ # args = getattr(step, "args", None)
281
+ # if args:
282
+ # try:
283
+ # parts.append(
284
+ # "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
285
+ # )
286
+ # except Exception:
287
+ # parts.append("Args: " + scrub_think_tags(str(args)))
288
+ # error = getattr(step, "error", None)
289
+ # if error:
290
+ # parts.append(f"Error: {scrub_think_tags(str(error))}")
291
+ # obs = getattr(step, "observations", None)
292
+ # if obs is not None:
293
+ # if isinstance(obs, (list, tuple)):
294
+ # obs_str = "\n".join(map(str, obs))
295
+ # else:
296
+ # obs_str = str(obs)
297
+ # parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
298
+ # # If this looks like a FinalAnswer step object, surface a clean final answer
299
+ # try:
300
+ # tname = type(step).__name__
301
+ # except Exception:
302
+ # tname = ""
303
+ # if tname.lower().startswith("finalanswer"):
304
+ # out = getattr(step, "output", None)
305
+ # if out is not None:
306
+ # return f"Final answer: {scrub_think_tags(str(out)).strip()}"
307
+ # # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
308
+ # s = scrub_think_tags(str(step))
309
+ # m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
310
+ # if m:
311
+ # return f"Final answer: {m.group(1).strip()}"
312
+ # # If the only content would be an object repr like FinalAnswerStep(...), drop it;
313
+ # # a cleaner "Final answer: ..." will come from the rule above or stdout.
314
+ # joined = "\n".join(parts).strip()
315
+ # if re.match(r"^FinalAnswer[^\n]+\)$", joined):
316
+ # return ""
317
+ # return joined or scrub_think_tags(str(step))
agents/generator_and_critic.py CHANGED
@@ -20,7 +20,7 @@ Instead, simply call the final answer tool with your evaluation and feedback.
20
  # ---------------- Factory ----------------
21
  def generate_generator_with_managed_critic(
22
  *,
23
- gen_max_steps: int = 12,
24
  crt_max_steps: int = 1,
25
  ) -> ToolCallingAgent:
26
  """
 
20
  # ---------------- Factory ----------------
21
  def generate_generator_with_managed_critic(
22
  *,
23
+ gen_max_steps: int = 4,
24
  crt_max_steps: int = 1,
25
  ) -> ToolCallingAgent:
26
  """