ccm commited on
Commit
52a7b5c
·
1 Parent(s): 0b902e8

Moving more things over into agent_server

Browse files
Files changed (2) hide show
  1. agent_server/std_tee.py +102 -0
  2. proxy.py +1 -98
agent_server/std_tee.py ADDED
@@ -0,0 +1,102 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import io
3
+ import json
4
+ import re
5
+ import threading
6
+
7
+ from agent_server.sanitizing_think_tags import scrub_think_tags
8
+
9
+
10
+ class QueueWriter(io.TextIOBase):
11
+ """
12
+ File-like object that pushes each write to an asyncio.Queue immediately.
13
+ """
14
+
15
+ def __init__(self, q: "asyncio.Queue"):
16
+ self.q = q
17
+ self._lock = threading.Lock()
18
+ self._buf = [] # accumulate until newline to reduce spam
19
+
20
+ def write(self, s: str):
21
+ if not s:
22
+ return 0
23
+ with self._lock:
24
+ self._buf.append(s)
25
+ # flush on newline to keep granularity reasonable
26
+ if "\n" in s:
27
+ chunk = "".join(self._buf)
28
+ self._buf.clear()
29
+ try:
30
+ self.q.put_nowait({"__stdout__": chunk})
31
+ except Exception:
32
+ pass
33
+ return len(s)
34
+
35
+ def flush(self):
36
+ with self._lock:
37
+ if self._buf:
38
+ chunk = "".join(self._buf)
39
+ self._buf.clear()
40
+ try:
41
+ self.q.put_nowait({"__stdout__": chunk})
42
+ except Exception:
43
+ pass
44
+
45
+
46
+ def _serialize_step(step) -> str:
47
+ """
48
+ Best-effort pretty string for a smolagents MemoryStep / ActionStep.
49
+ Works even if attributes are missing on some versions.
50
+ """
51
+ parts = []
52
+ sn = getattr(step, "step_number", None)
53
+ if sn is not None:
54
+ parts.append(f"Step {sn}")
55
+ thought_val = getattr(step, "thought", None)
56
+ if thought_val:
57
+ parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
58
+ tool_val = getattr(step, "tool", None)
59
+ if tool_val:
60
+ parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
61
+ code_val = getattr(step, "code", None)
62
+ if code_val:
63
+ code_str = scrub_think_tags(str(code_val)).strip()
64
+ parts.append("```python\n" + code_str + "\n```")
65
+ args = getattr(step, "args", None)
66
+ if args:
67
+ try:
68
+ parts.append(
69
+ "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
70
+ )
71
+ except Exception:
72
+ parts.append("Args: " + scrub_think_tags(str(args)))
73
+ error = getattr(step, "error", None)
74
+ if error:
75
+ parts.append(f"Error: {scrub_think_tags(str(error))}")
76
+ obs = getattr(step, "observations", None)
77
+ if obs is not None:
78
+ if isinstance(obs, (list, tuple)):
79
+ obs_str = "\n".join(map(str, obs))
80
+ else:
81
+ obs_str = str(obs)
82
+ parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
83
+ # If this looks like a FinalAnswer step object, surface a clean final answer
84
+ try:
85
+ tname = type(step).__name__
86
+ except Exception:
87
+ tname = ""
88
+ if tname.lower().startswith("finalanswer"):
89
+ out = getattr(step, "output", None)
90
+ if out is not None:
91
+ return f"Final answer: {scrub_think_tags(str(out)).strip()}"
92
+ # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
93
+ s = scrub_think_tags(str(step))
94
+ m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
95
+ if m:
96
+ return f"Final answer: {m.group(1).strip()}"
97
+ # If the only content would be an object repr like FinalAnswerStep(...), drop it;
98
+ # a cleaner "Final answer: ..." will come from the rule above or stdout.
99
+ joined = "\n".join(parts).strip()
100
+ if re.match(r"^FinalAnswer[^\n]+\)$", joined):
101
+ return ""
102
+ return joined or scrub_think_tags(str(step))
proxy.py CHANGED
@@ -3,7 +3,6 @@ OpenAI-compatible FastAPI proxy that wraps a smolagents CodeAgent
3
  """
4
 
5
  import os # For dealing with env vars
6
- import re # For tag stripping
7
  import json # For JSON handling
8
  import time # For timestamps and sleeps
9
  import asyncio # For async operations
@@ -13,7 +12,6 @@ import threading # For threading operations
13
 
14
  import fastapi
15
  import fastapi.responses
16
- import io
17
  import contextlib
18
 
19
  # Upstream pass-through
@@ -24,6 +22,7 @@ from agent_server.formatting_reasoning import _format_reasoning_chunk, _extract_
24
  from agent_server.helpers import normalize_content_to_text, _messages_to_task, _openai_response, _sse_headers
25
  from agent_server.openai_schemas import ChatMessage, ChatCompletionRequest
26
  from agent_server.sanitizing_think_tags import scrub_think_tags
 
27
  from agents.code_writing_agents import (
28
  generate_code_writing_agent_without_tools,
29
  generate_code_writing_agent_with_search,
@@ -59,102 +58,6 @@ app = fastapi.FastAPI()
59
  async def healthz():
60
  return {"ok": True}
61
 
62
- # ---------- Live stdout/stderr tee ----------
63
- class QueueWriter(io.TextIOBase):
64
- """
65
- File-like object that pushes each write to an asyncio.Queue immediately.
66
- """
67
-
68
- def __init__(self, q: "asyncio.Queue"):
69
- self.q = q
70
- self._lock = threading.Lock()
71
- self._buf = [] # accumulate until newline to reduce spam
72
-
73
- def write(self, s: str):
74
- if not s:
75
- return 0
76
- with self._lock:
77
- self._buf.append(s)
78
- # flush on newline to keep granularity reasonable
79
- if "\n" in s:
80
- chunk = "".join(self._buf)
81
- self._buf.clear()
82
- try:
83
- self.q.put_nowait({"__stdout__": chunk})
84
- except Exception:
85
- pass
86
- return len(s)
87
-
88
- def flush(self):
89
- with self._lock:
90
- if self._buf:
91
- chunk = "".join(self._buf)
92
- self._buf.clear()
93
- try:
94
- self.q.put_nowait({"__stdout__": chunk})
95
- except Exception:
96
- pass
97
-
98
-
99
- def _serialize_step(step) -> str:
100
- """
101
- Best-effort pretty string for a smolagents MemoryStep / ActionStep.
102
- Works even if attributes are missing on some versions.
103
- """
104
- parts = []
105
- sn = getattr(step, "step_number", None)
106
- if sn is not None:
107
- parts.append(f"Step {sn}")
108
- thought_val = getattr(step, "thought", None)
109
- if thought_val:
110
- parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
111
- tool_val = getattr(step, "tool", None)
112
- if tool_val:
113
- parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
114
- code_val = getattr(step, "code", None)
115
- if code_val:
116
- code_str = scrub_think_tags(str(code_val)).strip()
117
- parts.append("```python\n" + code_str + "\n```")
118
- args = getattr(step, "args", None)
119
- if args:
120
- try:
121
- parts.append(
122
- "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
123
- )
124
- except Exception:
125
- parts.append("Args: " + scrub_think_tags(str(args)))
126
- error = getattr(step, "error", None)
127
- if error:
128
- parts.append(f"Error: {scrub_think_tags(str(error))}")
129
- obs = getattr(step, "observations", None)
130
- if obs is not None:
131
- if isinstance(obs, (list, tuple)):
132
- obs_str = "\n".join(map(str, obs))
133
- else:
134
- obs_str = str(obs)
135
- parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
136
- # If this looks like a FinalAnswer step object, surface a clean final answer
137
- try:
138
- tname = type(step).__name__
139
- except Exception:
140
- tname = ""
141
- if tname.lower().startswith("finalanswer"):
142
- out = getattr(step, "output", None)
143
- if out is not None:
144
- return f"Final answer: {scrub_think_tags(str(out)).strip()}"
145
- # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
146
- s = scrub_think_tags(str(step))
147
- m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
148
- if m:
149
- return f"Final answer: {m.group(1).strip()}"
150
- # If the only content would be an object repr like FinalAnswerStep(...), drop it;
151
- # a cleaner "Final answer: ..." will come from the rule above or stdout.
152
- joined = "\n".join(parts).strip()
153
- if re.match(r"^FinalAnswer[^\n]+\)$", joined):
154
- return ""
155
- return joined or scrub_think_tags(str(step))
156
-
157
-
158
  # ---------- Agent streaming bridge (truly live) ----------
159
  async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
160
  """
 
3
  """
4
 
5
  import os # For dealing with env vars
 
6
  import json # For JSON handling
7
  import time # For timestamps and sleeps
8
  import asyncio # For async operations
 
12
 
13
  import fastapi
14
  import fastapi.responses
 
15
  import contextlib
16
 
17
  # Upstream pass-through
 
22
  from agent_server.helpers import normalize_content_to_text, _messages_to_task, _openai_response, _sse_headers
23
  from agent_server.openai_schemas import ChatMessage, ChatCompletionRequest
24
  from agent_server.sanitizing_think_tags import scrub_think_tags
25
+ from agent_server.std_tee import QueueWriter, _serialize_step
26
  from agents.code_writing_agents import (
27
  generate_code_writing_agent_without_tools,
28
  generate_code_writing_agent_with_search,
 
58
  async def healthz():
59
  return {"ok": True}
60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  # ---------- Agent streaming bridge (truly live) ----------
62
  async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
63
  """