Guillaume Salou commited on
Commit
7867a7a
·
unverified ·
1 Parent(s): c4ac4e6

feat(telemetry): track 5 untracked Bedrock call sites for full cost attribution (#179)

Browse files

* feat(telemetry): track 5 untracked Bedrock call sites for full cost attribution

Cost Explorer ($78,738 over 6 days) vs the session dataset's
total_cost_usd (~$354/day attributed) showed the dataset captures only
~33% of real Bedrock spend. Root cause: out of 9 acompletion() call
sites, only 2 (in agent_loop.py) emit the llm_call event that
total_cost_usd sums.

This wires telemetry into the 5 Bedrock-billing call sites that were
flying blind, with a `kind` tag on each call so analytics can split
spend by category:

- research_tool.py × 3 → kind="research" (sub-agent loop)
- context_manager.py → kind="compaction" (history summary)
- effort_probe.py → kind="effort_probe" (cascade walk)

Plus a fourth tag for the session-restore summary path
(session_manager.py → kind="restore").

Plumbing changes:

- telemetry.record_llm_call now accepts kind="..." (default "main"
preserves existing behavior).
- summarize_messages() and ContextManager.compact() take optional
session=None so the caller can opt into telemetry.
- probe_effort() takes optional session=None for the same reason.
- Both probe_effort callers (agent_loop._heal_effort_error and
model_switcher) now pass session.

Skipped:

- routes/agent.py /title — uses HF Router (Cerebras), not Bedrock
- routes/agent.py /health/llm — no session context (manual diagnostic
endpoint, ~$0.02/call, not billable to a user)

After deploy, expect dataset total_cost_usd to converge with Cost
Explorer to within 5-10%. The kind breakdown will quantify each
category, validating the cost-plan estimates in
ml_intern_bedrock_cost_plan.md.

* fix(telemetry): address PR bot feedback (2 P1 + 1 P2)

1. P1 — Wrap each research_tool record_llm_call in its own try/except.
record_llm_call's inner send_event is wrapped, but extract_usage
(telemetry.py:101) is not — an unexpected usage shape from LiteLLM
could propagate. At all 3 research sites the surrounding except-block
would convert that into "Research summary call failed", masking a
valid LLM response. Match the effort_probe pattern: dedicated
try/except logging at DEBUG.

2. P1 — Hoist `import time` from inside summarize_messages() to module
level in manager.py. stdlib, always available, matches the rest of
the module.

3. P2 — Update telemetry.py docstring kind list. Drop title_gen and
model_probe (skipped per PR description), add restore (emitted from
session_manager.py). Note the intentional skips at the bottom.

agent/context_manager/manager.py CHANGED
@@ -4,6 +4,7 @@ Context management for conversation history
4
 
5
  import logging
6
  import os
 
7
  import zoneinfo
8
  from datetime import datetime
9
  from pathlib import Path
@@ -102,6 +103,8 @@ async def summarize_messages(
102
  max_tokens: int = 2000,
103
  tool_specs: list[dict] | None = None,
104
  prompt: str = _COMPACT_PROMPT,
 
 
105
  ) -> tuple[str, int]:
106
  """Run a summarization prompt against a list of messages.
107
 
@@ -110,6 +113,13 @@ async def summarize_messages(
110
  instead — it preserves the tool-call trail so the agent can answer
111
  follow-up questions about what it did.
112
 
 
 
 
 
 
 
 
113
  Returns ``(summary_text, completion_tokens)``.
114
  """
115
  from agent.core.llm_params import _resolve_llm_params
@@ -119,12 +129,23 @@ async def summarize_messages(
119
  prompt_messages, tool_specs = with_prompt_caching(
120
  prompt_messages, tool_specs, llm_params.get("model")
121
  )
 
122
  response = await acompletion(
123
  messages=prompt_messages,
124
  max_completion_tokens=max_tokens,
125
  tools=tool_specs,
126
  **llm_params,
127
  )
 
 
 
 
 
 
 
 
 
 
128
  summary = response.choices[0].message.content or ""
129
  completion_tokens = response.usage.completion_tokens if response.usage else 0
130
  return summary, completion_tokens
@@ -355,8 +376,14 @@ class ContextManager:
355
  model_name: str,
356
  tool_specs: list[dict] | None = None,
357
  hf_token: str | None = None,
 
358
  ) -> None:
359
- """Remove old messages to keep history under target size"""
 
 
 
 
 
360
  if not self.needs_compaction:
361
  return
362
 
@@ -394,6 +421,8 @@ class ContextManager:
394
  max_tokens=self.compact_size,
395
  tool_specs=tool_specs,
396
  prompt=_COMPACT_PROMPT,
 
 
397
  )
398
  summarized_message = Message(role="assistant", content=summary)
399
 
 
4
 
5
  import logging
6
  import os
7
+ import time
8
  import zoneinfo
9
  from datetime import datetime
10
  from pathlib import Path
 
103
  max_tokens: int = 2000,
104
  tool_specs: list[dict] | None = None,
105
  prompt: str = _COMPACT_PROMPT,
106
+ session: Any = None,
107
+ kind: str = "compaction",
108
  ) -> tuple[str, int]:
109
  """Run a summarization prompt against a list of messages.
110
 
 
113
  instead — it preserves the tool-call trail so the agent can answer
114
  follow-up questions about what it did.
115
 
116
+ ``session`` is optional; when provided, the call is recorded via
117
+ ``telemetry.record_llm_call`` so its cost lands in the session's
118
+ ``total_cost_usd``. Without it, the call still happens but is
119
+ invisible in telemetry — which used to be the case for every
120
+ compaction call until 2026-04-29 (~30-50% of Bedrock spend was
121
+ attributed to this single source of dark cost).
122
+
123
  Returns ``(summary_text, completion_tokens)``.
124
  """
125
  from agent.core.llm_params import _resolve_llm_params
 
129
  prompt_messages, tool_specs = with_prompt_caching(
130
  prompt_messages, tool_specs, llm_params.get("model")
131
  )
132
+ _t0 = time.monotonic()
133
  response = await acompletion(
134
  messages=prompt_messages,
135
  max_completion_tokens=max_tokens,
136
  tools=tool_specs,
137
  **llm_params,
138
  )
139
+ if session is not None:
140
+ from agent.core import telemetry
141
+ await telemetry.record_llm_call(
142
+ session,
143
+ model=model_name,
144
+ response=response,
145
+ latency_ms=int((time.monotonic() - _t0) * 1000),
146
+ finish_reason=response.choices[0].finish_reason if response.choices else None,
147
+ kind=kind,
148
+ )
149
  summary = response.choices[0].message.content or ""
150
  completion_tokens = response.usage.completion_tokens if response.usage else 0
151
  return summary, completion_tokens
 
376
  model_name: str,
377
  tool_specs: list[dict] | None = None,
378
  hf_token: str | None = None,
379
+ session: Any = None,
380
  ) -> None:
381
+ """Remove old messages to keep history under target size.
382
+
383
+ ``session`` is optional — if passed, the underlying summarization
384
+ LLM call is recorded via ``telemetry.record_llm_call(kind=
385
+ "compaction")`` so its cost shows up in ``total_cost_usd``.
386
+ """
387
  if not self.needs_compaction:
388
  return
389
 
 
421
  max_tokens=self.compact_size,
422
  tool_specs=tool_specs,
423
  prompt=_COMPACT_PROMPT,
424
+ session=session,
425
+ kind="compaction",
426
  )
427
  summarized_message = Message(role="assistant", content=summary)
428
 
agent/core/agent_loop.py CHANGED
@@ -282,6 +282,7 @@ async def _heal_effort_and_rebuild_params(
282
  try:
283
  outcome = await probe_effort(
284
  model, session.config.reasoning_effort, session.hf_token,
 
285
  )
286
  session.model_effective_effort[model] = outcome.effective_effort
287
  logger.info(
@@ -354,6 +355,7 @@ async def _compact_and_notify(session: Session) -> None:
354
  model_name=session.config.model_name,
355
  tool_specs=session.tool_router.get_tool_specs_for_llm(),
356
  hf_token=session.hf_token,
 
357
  )
358
  new_usage = cm.running_context_usage
359
  if new_usage != old_usage:
 
282
  try:
283
  outcome = await probe_effort(
284
  model, session.config.reasoning_effort, session.hf_token,
285
+ session=session,
286
  )
287
  session.model_effective_effort[model] = outcome.effective_effort
288
  logger.info(
 
355
  model_name=session.config.model_name,
356
  tool_specs=session.tool_router.get_tool_specs_for_llm(),
357
  hf_token=session.hf_token,
358
+ session=session,
359
  )
360
  new_usage = cm.running_context_usage
361
  if new_usage != old_usage:
agent/core/effort_probe.py CHANGED
@@ -22,7 +22,9 @@ from __future__ import annotations
22
 
23
  import asyncio
24
  import logging
 
25
  from dataclasses import dataclass
 
26
 
27
  from litellm import acompletion
28
 
@@ -139,6 +141,7 @@ async def probe_effort(
139
  model_name: str,
140
  preference: str | None,
141
  hf_token: str | None,
 
142
  ) -> ProbeOutcome:
143
  """Walk the cascade for ``preference`` on ``model_name``.
144
 
@@ -147,6 +150,12 @@ async def probe_effort(
147
  transient errors (5xx, timeout) — persistent 4xx that aren't thinking/
148
  effort related bubble as the original exception so callers can surface
149
  them (auth, model-not-found, quota, etc.).
 
 
 
 
 
 
150
  """
151
  loop = asyncio.get_event_loop()
152
  start = loop.time()
@@ -174,7 +183,8 @@ async def probe_effort(
174
 
175
  attempts += 1
176
  try:
177
- await asyncio.wait_for(
 
178
  acompletion(
179
  messages=[{"role": "user", "content": "ping"}],
180
  max_tokens=_PROBE_MAX_TOKENS,
@@ -183,6 +193,21 @@ async def probe_effort(
183
  ),
184
  timeout=_PROBE_TIMEOUT,
185
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
  except Exception as e:
187
  last_error = e
188
  if _is_thinking_unsupported(e):
 
22
 
23
  import asyncio
24
  import logging
25
+ import time
26
  from dataclasses import dataclass
27
+ from typing import Any
28
 
29
  from litellm import acompletion
30
 
 
141
  model_name: str,
142
  preference: str | None,
143
  hf_token: str | None,
144
+ session: Any = None,
145
  ) -> ProbeOutcome:
146
  """Walk the cascade for ``preference`` on ``model_name``.
147
 
 
150
  transient errors (5xx, timeout) — persistent 4xx that aren't thinking/
151
  effort related bubble as the original exception so callers can surface
152
  them (auth, model-not-found, quota, etc.).
153
+
154
+ ``session`` is optional; when provided, each successful probe attempt
155
+ is recorded via ``telemetry.record_llm_call(kind="effort_probe")`` so
156
+ the cost shows up in the session's ``total_cost_usd``. Failed probes
157
+ (rejected by the provider) typically aren't billed, so we only record
158
+ on success.
159
  """
160
  loop = asyncio.get_event_loop()
161
  start = loop.time()
 
183
 
184
  attempts += 1
185
  try:
186
+ _t0 = time.monotonic()
187
+ response = await asyncio.wait_for(
188
  acompletion(
189
  messages=[{"role": "user", "content": "ping"}],
190
  max_tokens=_PROBE_MAX_TOKENS,
 
193
  ),
194
  timeout=_PROBE_TIMEOUT,
195
  )
196
+ if session is not None:
197
+ # Best-effort telemetry — never let a logging blip propagate
198
+ # out of the probe and break model switching.
199
+ try:
200
+ from agent.core import telemetry
201
+ await telemetry.record_llm_call(
202
+ session,
203
+ model=model_name,
204
+ response=response,
205
+ latency_ms=int((time.monotonic() - _t0) * 1000),
206
+ finish_reason=response.choices[0].finish_reason if response.choices else None,
207
+ kind="effort_probe",
208
+ )
209
+ except Exception as _telem_err:
210
+ logger.debug("effort_probe telemetry failed: %s", _telem_err)
211
  except Exception as e:
212
  last_error = e
213
  if _is_thinking_unsupported(e):
agent/core/model_switcher.py CHANGED
@@ -187,7 +187,7 @@ async def probe_and_switch_model(
187
 
188
  console.print(f"[dim]checking {model_id} (effort: {preference})...[/dim]")
189
  try:
190
- outcome = await probe_effort(model_id, preference, hf_token)
191
  except ProbeInconclusive as e:
192
  _commit_switch(model_id, config, session, effective=None, cache=False)
193
  console.print(
 
187
 
188
  console.print(f"[dim]checking {model_id} (effort: {preference})...[/dim]")
189
  try:
190
+ outcome = await probe_effort(model_id, preference, hf_token, session=session)
191
  except ProbeInconclusive as e:
192
  _commit_switch(model_id, config, session, effective=None, cache=False)
193
  console.print(
agent/core/telemetry.py CHANGED
@@ -78,9 +78,29 @@ async def record_llm_call(
78
  response: Any = None,
79
  latency_ms: int,
80
  finish_reason: str | None,
 
81
  ) -> dict:
82
  """Emit an ``llm_call`` event and return the extracted usage dict so
83
- callers can stash it on their result object if they want."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  usage = extract_usage(response) if response is not None else {}
85
  cost_usd = 0.0
86
  if response is not None:
@@ -98,6 +118,7 @@ async def record_llm_call(
98
  "latency_ms": latency_ms,
99
  "finish_reason": finish_reason,
100
  "cost_usd": cost_usd,
 
101
  **usage,
102
  },
103
  ))
 
78
  response: Any = None,
79
  latency_ms: int,
80
  finish_reason: str | None,
81
+ kind: str = "main",
82
  ) -> dict:
83
  """Emit an ``llm_call`` event and return the extracted usage dict so
84
+ callers can stash it on their result object if they want.
85
+
86
+ ``kind`` tags the call site so downstream analytics can break spend
87
+ down by category. Values currently emitted by the codebase:
88
+
89
+ * ``main`` — agent loop turn (user-facing reply or tool follow-up)
90
+ * ``research`` — research sub-agent inner loop (3 call sites)
91
+ * ``compaction`` — context-window summary on overflow
92
+ * ``effort_probe``— effort cascade walk on rejection / model switch
93
+ * ``restore`` — session re-seed summary after a Space restart
94
+
95
+ Pre-2026-04-29 only ``main`` calls were instrumented; observed gap on
96
+ Cost Explorer was ~67%, with the other 5 call sites accounting for
97
+ the rest. Tagging lets us split the dataset's ``total_cost_usd`` by
98
+ category and validate against AWS billing.
99
+
100
+ The ``/title`` (HF Router, not Bedrock) and ``/health/llm`` (diagnostic
101
+ endpoint, no session context) call sites are intentionally not
102
+ instrumented — together they're <1% of spend.
103
+ """
104
  usage = extract_usage(response) if response is not None else {}
105
  cost_usd = 0.0
106
  if response is not None:
 
118
  "latency_ms": latency_ms,
119
  "finish_reason": finish_reason,
120
  "cost_usd": cost_usd,
121
+ "kind": kind,
122
  **usage,
123
  },
124
  ))
agent/tools/research_tool.py CHANGED
@@ -9,10 +9,12 @@ Inspired by claude-code's code-explorer agent pattern.
9
 
10
  import json
11
  import logging
 
12
  from typing import Any
13
 
14
  from litellm import Message, acompletion
15
 
 
16
  from agent.core.doom_loop import check_for_doom_loop
17
  from agent.core.llm_params import _resolve_llm_params
18
  from agent.core.prompt_caching import with_prompt_caching
@@ -332,6 +334,7 @@ async def research_handler(
332
  ))
333
  try:
334
  _msgs, _ = with_prompt_caching(messages, None, llm_params.get("model"))
 
335
  response = await acompletion(
336
  messages=_msgs,
337
  tools=None, # no tools — force text response
@@ -339,6 +342,20 @@ async def research_handler(
339
  timeout=120,
340
  **llm_params,
341
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
342
  content = response.choices[0].message.content or ""
343
  return content or "Research context exhausted — no summary produced.", bool(content)
344
  except Exception:
@@ -360,6 +377,7 @@ async def research_handler(
360
  _msgs, _tools = with_prompt_caching(
361
  messages, tool_specs if tool_specs else None, llm_params.get("model")
362
  )
 
363
  response = await acompletion(
364
  messages=_msgs,
365
  tools=_tools,
@@ -368,6 +386,17 @@ async def research_handler(
368
  timeout=120,
369
  **llm_params,
370
  )
 
 
 
 
 
 
 
 
 
 
 
371
  except Exception as e:
372
  logger.error("Research sub-agent LLM error: %s", e)
373
  return f"Research agent LLM error: {e}", False
@@ -459,6 +488,7 @@ async def research_handler(
459
  ))
460
  try:
461
  _msgs, _ = with_prompt_caching(messages, None, llm_params.get("model"))
 
462
  response = await acompletion(
463
  messages=_msgs,
464
  tools=None,
@@ -466,6 +496,17 @@ async def research_handler(
466
  timeout=120,
467
  **llm_params,
468
  )
 
 
 
 
 
 
 
 
 
 
 
469
  content = response.choices[0].message.content or ""
470
  if content:
471
  return content, True
 
9
 
10
  import json
11
  import logging
12
+ import time
13
  from typing import Any
14
 
15
  from litellm import Message, acompletion
16
 
17
+ from agent.core import telemetry
18
  from agent.core.doom_loop import check_for_doom_loop
19
  from agent.core.llm_params import _resolve_llm_params
20
  from agent.core.prompt_caching import with_prompt_caching
 
334
  ))
335
  try:
336
  _msgs, _ = with_prompt_caching(messages, None, llm_params.get("model"))
337
+ _t0 = time.monotonic()
338
  response = await acompletion(
339
  messages=_msgs,
340
  tools=None, # no tools — force text response
 
342
  timeout=120,
343
  **llm_params,
344
  )
345
+ # Telemetry is best-effort; a logging blip must never mask a
346
+ # valid LLM response (the surrounding except would convert it
347
+ # to "summary call failed").
348
+ try:
349
+ await telemetry.record_llm_call(
350
+ session,
351
+ model=research_model,
352
+ response=response,
353
+ latency_ms=int((time.monotonic() - _t0) * 1000),
354
+ finish_reason=response.choices[0].finish_reason if response.choices else None,
355
+ kind="research",
356
+ )
357
+ except Exception as _telem_err:
358
+ logger.debug("research telemetry failed: %s", _telem_err)
359
  content = response.choices[0].message.content or ""
360
  return content or "Research context exhausted — no summary produced.", bool(content)
361
  except Exception:
 
377
  _msgs, _tools = with_prompt_caching(
378
  messages, tool_specs if tool_specs else None, llm_params.get("model")
379
  )
380
+ _t0 = time.monotonic()
381
  response = await acompletion(
382
  messages=_msgs,
383
  tools=_tools,
 
386
  timeout=120,
387
  **llm_params,
388
  )
389
+ try:
390
+ await telemetry.record_llm_call(
391
+ session,
392
+ model=research_model,
393
+ response=response,
394
+ latency_ms=int((time.monotonic() - _t0) * 1000),
395
+ finish_reason=response.choices[0].finish_reason if response.choices else None,
396
+ kind="research",
397
+ )
398
+ except Exception as _telem_err:
399
+ logger.debug("research telemetry failed: %s", _telem_err)
400
  except Exception as e:
401
  logger.error("Research sub-agent LLM error: %s", e)
402
  return f"Research agent LLM error: {e}", False
 
488
  ))
489
  try:
490
  _msgs, _ = with_prompt_caching(messages, None, llm_params.get("model"))
491
+ _t0 = time.monotonic()
492
  response = await acompletion(
493
  messages=_msgs,
494
  tools=None,
 
496
  timeout=120,
497
  **llm_params,
498
  )
499
+ try:
500
+ await telemetry.record_llm_call(
501
+ session,
502
+ model=research_model,
503
+ response=response,
504
+ latency_ms=int((time.monotonic() - _t0) * 1000),
505
+ finish_reason=response.choices[0].finish_reason if response.choices else None,
506
+ kind="research",
507
+ )
508
+ except Exception as _telem_err:
509
+ logger.debug("research telemetry failed: %s", _telem_err)
510
  content = response.choices[0].message.content or ""
511
  if content:
512
  return content, True
backend/session_manager.py CHANGED
@@ -612,6 +612,8 @@ class SessionManager:
612
  max_tokens=4000,
613
  prompt=_RESTORE_PROMPT,
614
  tool_specs=tool_specs,
 
 
615
  )
616
  except Exception as e:
617
  logger.error("Summary call failed during seed: %s", e)
 
612
  max_tokens=4000,
613
  prompt=_RESTORE_PROMPT,
614
  tool_specs=tool_specs,
615
+ session=session,
616
+ kind="restore",
617
  )
618
  except Exception as e:
619
  logger.error("Summary call failed during seed: %s", e)