# SPEC-21: Middleware Architecture Refactor

**Status:** COMPLETED
**Priority:** P2 (Architectural hygiene + fixes HuggingFace retry bug)
**Effort:** 2 hours
**PR Scope:** Folder rename + new middleware implementations

---

## Problem Statement

1. **Misleading folder name:** `src/middleware/` contains a workflow (`SubIterationMiddleware`), not interceptor middleware
2. **Missing retry logic:** `HuggingFaceChatClient` has no retry on 429/transient errors (P2 bug)
3. **No token tracking:** Cannot monitor API costs
4. **Not using Microsoft Agent Framework (MS) patterns:** The client applies the framework's decorators but does not use its `ChatMiddleware` base class

---

## Current State (WRONG)

```text
src/
├── middleware/                      ← MISLEADING: contains workflow
│   ├── __init__.py
│   ├── sub_iteration.py             ← This is a WORKFLOW, not middleware
│   └── .gitkeep
```

**HuggingFace client has no retry:**
```python
# src/clients/huggingface.py:263-265
except Exception as e:
    logger.error("HuggingFace API error", error=str(e))
    raise  # ← No retry, crashes on 429
```

---

## Target State (CORRECT)

```text
src/
├── workflows/                       ← RENAMED: now accurate
│   ├── __init__.py
│   └── sub_iteration.py
│
├── middleware/                      ← NEW: actual MS-pattern middleware
│   ├── __init__.py
│   ├── retry.py                     ← RetryMiddleware(ChatMiddleware)
│   └── token_tracking.py            ← TokenTrackingMiddleware(ChatMiddleware)
```

---

## Implementation Steps

### Step 1: Rename Folder (5 min)

```bash
# Rename middleware → workflows
git mv src/middleware src/workflows
```

### Step 2: Update Import (5 min)

```python
# src/orchestrators/hierarchical.py - Line ~15
# BEFORE:
from src.middleware.sub_iteration import SubIterationMiddleware, SubIterationTeam

# AFTER:
from src.workflows.sub_iteration import SubIterationMiddleware, SubIterationTeam
```
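
Before moving on, it may help to confirm that no other module still imports the old path. The following is a minimal sketch, not part of the spec's tooling: `find_stale_imports` and `STALE_IMPORT` are hypothetical names, and the pattern only covers direct imports of `src.middleware.sub_iteration` (a package-level `from src.middleware import ...` would need a separate check).

```python
import re
from pathlib import Path

# Matches "from src.middleware.sub_iteration import ..." and
# "import src.middleware.sub_iteration" at the start of a line.
STALE_IMPORT = re.compile(r"^\s*(from|import)\s+src\.middleware\.sub_iteration\b")


def find_stale_imports(root: Path) -> list[tuple[Path, int, str]]:
    """Return (file, line number, line) for every import of the old module path."""
    hits: list[tuple[Path, int, str]] = []
    for path in sorted(root.rglob("*.py")):
        text = path.read_text(encoding="utf-8", errors="replace")
        for lineno, line in enumerate(text.splitlines(), start=1):
            if STALE_IMPORT.match(line):
                hits.append((path, lineno, line.strip()))
    return hits
```

Calling `find_stale_imports(Path("src"))` and `find_stale_imports(Path("tests"))` from the repository root should return empty lists once Step 2 is complete.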

### Step 3: Create New Middleware Package (10 min)

```python
# src/middleware/__init__.py
"""Microsoft Agent Framework middleware implementations.

These are interceptor-pattern middleware that wrap chat client calls.
They are NOT workflows - see src/workflows/ for orchestration patterns.
"""

from src.middleware.retry import RetryMiddleware
from src.middleware.token_tracking import TokenTrackingMiddleware

__all__ = ["RetryMiddleware", "TokenTrackingMiddleware"]
```

### Step 4: Implement RetryMiddleware (30 min)

```python
# src/middleware/retry.py
"""Retry middleware for chat clients with exponential backoff."""

import asyncio
from typing import Awaitable, Callable

import structlog
from agent_framework._middleware import ChatContext, ChatMiddleware

logger = structlog.get_logger()


class RetryMiddleware(ChatMiddleware):
    """Retries failed chat requests with exponential backoff.

    This middleware intercepts chat client calls and retries on transient
    errors (rate limits, timeouts, server errors).

    Attributes:
        max_attempts: Maximum number of attempts (default: 3)
        min_wait: Minimum wait between retries in seconds (default: 1.0)
        max_wait: Maximum wait between retries in seconds (default: 10.0)
        retryable_status_codes: HTTP status codes to retry (default: 429, 500, 502, 503, 504)
    """

    def __init__(
        self,
        max_attempts: int = 3,
        min_wait: float = 1.0,
        max_wait: float = 10.0,
        retryable_status_codes: tuple[int, ...] = (429, 500, 502, 503, 504),
    ) -> None:
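        # With max_attempts < 1 the retry loop would never call next() and
        # process() would return silently, so reject that configuration early.
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        self.max_attempts = max_attempts
        self.min_wait = min_wait
        self.max_wait = max_wait
        self.retryable_status_codes = retryable_status_codes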

    def _is_retryable(self, error: Exception) -> bool:
        """Check if error is retryable."""
        # Check for httpx status errors
        if hasattr(error, "response") and hasattr(error.response, "status_code"):
            return error.response.status_code in self.retryable_status_codes

        # Check for timeout errors
        error_name = type(error).__name__.lower()
        if "timeout" in error_name:
            return True

        # Check for connection errors
        if "connection" in error_name:
            return True

        return False

    def _calculate_wait(self, attempt: int) -> float:
        """Calculate wait time with exponential backoff."""
        wait = self.min_wait * (2 ** attempt)
        return min(wait, self.max_wait)

    async def process(
        self, context: ChatContext, next: Callable[[ChatContext], Awaitable[None]]
    ) -> None:
        """Process the chat request with retry logic."""
        last_error: Exception | None = None

        for attempt in range(self.max_attempts):
            try:
                await next(context)
                return  # Success - exit retry loop

            except Exception as e:
                last_error = e

                if not self._is_retryable(e):
                    logger.warning(
                        "Non-retryable error",
                        error=str(e),
                        error_type=type(e).__name__,
                    )
                    raise  # Don't retry non-retryable errors

                if attempt < self.max_attempts - 1:
                    wait_time = self._calculate_wait(attempt)
                    logger.info(
                        "Retrying after error",
                        attempt=attempt + 1,
                        max_attempts=self.max_attempts,
                        wait_seconds=wait_time,
                        error=str(e),
                    )
                    await asyncio.sleep(wait_time)

        # All retries exhausted
        logger.error(
            "All retry attempts failed",
            max_attempts=self.max_attempts,
            last_error=str(last_error),
        )
        if last_error:
            raise last_error
```
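
To make the backoff concrete, the sketch below reproduces the formula from `_calculate_wait` as a standalone function and lists the waits for a given configuration. `backoff_schedule` is a hypothetical helper for illustration only; it is not part of the middleware.

```python
def backoff_schedule(
    max_attempts: int = 3, min_wait: float = 1.0, max_wait: float = 10.0
) -> list[float]:
    """Waits slept between attempts: min_wait * 2**attempt, capped at max_wait.

    Only max_attempts - 1 waits occur, because the middleware does not sleep
    after the final attempt.
    """
    return [min(min_wait * (2**attempt), max_wait) for attempt in range(max_attempts - 1)]
```

With the defaults this gives `[1.0, 2.0]`, so a request that fails all three attempts spends about 3 seconds sleeping. With `max_attempts=6` the schedule is `[1.0, 2.0, 4.0, 8.0, 10.0]`, where the last wait is capped by `max_wait`.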

### Step 5: Implement TokenTrackingMiddleware (20 min)

```python
# src/middleware/token_tracking.py
"""Token tracking middleware for monitoring API usage."""

from contextvars import ContextVar
from typing import Awaitable, Callable

import structlog
from agent_framework._middleware import ChatContext, ChatMiddleware

logger = structlog.get_logger()

# ContextVar for per-request token tracking.
# The default is None rather than a dict so contexts never share one mutable object.
_request_tokens: ContextVar[dict[str, int] | None] = ContextVar(
    "request_tokens",
    default=None,
)


class TokenTrackingMiddleware(ChatMiddleware):
    """Tracks token usage across chat requests.

    This middleware logs token usage after each chat completion
    and maintains running totals for the session.

    Usage metrics are logged via structlog for observability.
    """

    def __init__(self) -> None:
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.request_count = 0

    async def process(
        self, context: ChatContext, next: Callable[[ChatContext], Awaitable[None]]
    ) -> None:
        """Process request and track token usage."""
        await next(context)

        # Extract usage from response if available
        if context.result is None:
            return

        usage = None

        # Try to get usage from response
        if hasattr(context.result, "usage"):
            usage = context.result.usage
        elif hasattr(context.result, "messages") and context.result.messages:
            # Check first message for usage metadata
            msg = context.result.messages[0]
            if hasattr(msg, "metadata") and msg.metadata:
                usage = msg.metadata.get("usage")

        if usage:
            input_tokens = usage.get("input_tokens", 0) or usage.get("prompt_tokens", 0)
            output_tokens = usage.get("output_tokens", 0) or usage.get("completion_tokens", 0)

            self.total_input_tokens += input_tokens
            self.total_output_tokens += output_tokens
            self.request_count += 1

            # Record per-context counts so get_token_stats() reflects this request
            current = _request_tokens.get() or {"input": 0, "output": 0}
            _request_tokens.set(
                {
                    "input": current["input"] + input_tokens,
                    "output": current["output"] + output_tokens,
                }
            )

            logger.info(
                "Token usage",
                request_input=input_tokens,
                request_output=output_tokens,
                total_input=self.total_input_tokens,
                total_output=self.total_output_tokens,
                total_requests=self.request_count,
            )


def get_token_stats() -> dict[str, int]:
    """Get current request's token usage (zeros if nothing has been recorded)."""
    return dict(_request_tokens.get() or {"input": 0, "output": 0})
```
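
The extraction above assumes `usage` is dict-like. Whether the framework's response exposes usage as a mapping or as an object, and under which field names, is worth verifying against the installed version. As a hedged sketch, a helper like the hypothetical `extract_token_counts` below would tolerate both shapes; the field names are the ones already used in this spec and are an assumption, not a confirmed API.

```python
from collections.abc import Mapping
from typing import Any


def extract_token_counts(usage: Any) -> tuple[int, int]:
    """Return (input_tokens, output_tokens) from a dict-like or attribute-style usage value."""

    def read(*names: str) -> int:
        # Return the first field that is present and not None, else 0.
        for name in names:
            if isinstance(usage, Mapping):
                value = usage.get(name)
            else:
                value = getattr(usage, name, None)
            if value is not None:
                return int(value)
        return 0

    return (
        read("input_tokens", "prompt_tokens"),
        read("output_tokens", "completion_tokens"),
    )
```

One design difference from the `or`-chained lookups: this version treats an explicit `0` as a real value and only falls back to the alternative field name when the first one is missing.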

### Step 6: Apply Middleware to HuggingFaceChatClient (15 min)

```python
# src/clients/huggingface.py - Update __init__

from src.middleware.retry import RetryMiddleware
from src.middleware.token_tracking import TokenTrackingMiddleware

@use_function_invocation
@use_observability
@use_chat_middleware
class HuggingFaceChatClient(BaseChatClient):
    def __init__(
        self,
        model_id: str | None = None,
        api_key: str | None = None,
        **kwargs: Any,
    ) -> None:
        # Create middleware instances. Order matters: RetryMiddleware comes
        # first so that it wraps everything after it.
        middleware = [
            RetryMiddleware(max_attempts=3, min_wait=1.0, max_wait=10.0),
            TokenTrackingMiddleware(),
        ]

        super().__init__(middleware=middleware, **kwargs)
        # ... rest of __init__
```
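
The effect of list order can be illustrated without the framework. The sketch below uses stand-in classes (`Retry`, `Counter`) and a hypothetical `build_chain` composer, and it assumes the first list entry becomes the outermost wrapper, which is what "Retry should be FIRST" implies. How the real framework composes middleware should be checked against its documentation.

```python
import asyncio
from collections.abc import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class Retry:
    """Stand-in for RetryMiddleware: re-invokes the rest of the chain on failure."""

    def __init__(self, max_attempts: int = 3) -> None:
        self.max_attempts = max_attempts

    async def process(self, context: dict, next: Handler) -> None:
        for attempt in range(self.max_attempts):
            try:
                await next(context)
                return
            except RuntimeError:
                if attempt == self.max_attempts - 1:
                    raise


class Counter:
    """Stand-in for an inner middleware: records each pass through it."""

    async def process(self, context: dict, next: Handler) -> None:
        context["seen_by_counter"] += 1
        await next(context)


def _wrap(mw, nxt: Handler) -> Handler:
    async def handler(ctx: dict) -> None:
        await mw.process(ctx, nxt)

    return handler


def build_chain(middleware: list, terminal: Handler) -> Handler:
    """Compose middleware so that the first list entry is the outermost wrapper."""
    handler = terminal
    for mw in reversed(middleware):
        handler = _wrap(mw, handler)
    return handler


async def flaky_call(context: dict) -> None:
    """Terminal handler that fails twice, then succeeds (simulated 429s)."""
    context["calls"] += 1
    if context["calls"] < 3:
        raise RuntimeError("simulated 429")


def run(middleware: list) -> dict:
    context = {"calls": 0, "seen_by_counter": 0}
    asyncio.run(build_chain(middleware, flaky_call)(context))
    return context


retry_first = run([Retry(), Counter()])
counter_first = run([Counter(), Retry()])
```

Both orders reach the backend three times, but with `Retry` first the inner middleware runs on every attempt (`seen_by_counter == 3`), while with `Counter` first it runs once (`seen_by_counter == 1`). Placing retry outermost also means an error raised by an inner middleware is retried rather than escaping the chain.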

### Step 7: Update Tests (20 min)

```python
# tests/unit/middleware/test_retry.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from src.middleware.retry import RetryMiddleware


@pytest.mark.asyncio
async def test_retry_middleware_succeeds_first_try():
    """RetryMiddleware should pass through on success."""
    middleware = RetryMiddleware(max_attempts=3)
    context = MagicMock()
    next_fn = AsyncMock()

    await middleware.process(context, next_fn)

    next_fn.assert_called_once_with(context)


@pytest.mark.asyncio
async def test_retry_middleware_retries_on_429():
    """RetryMiddleware should retry on 429 rate limit."""
    middleware = RetryMiddleware(max_attempts=3, min_wait=0.01)
    context = MagicMock()

    # First two calls fail with 429, third succeeds
    call_count = 0

    async def mock_next(ctx):
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            error = Exception("Rate limited")
            error.response = MagicMock(status_code=429)
            raise error

    await middleware.process(context, mock_next)
    assert call_count == 3


@pytest.mark.asyncio
async def test_retry_middleware_raises_after_max_attempts():
    """RetryMiddleware should raise after max attempts exhausted."""
    middleware = RetryMiddleware(max_attempts=2, min_wait=0.01)
    context = MagicMock()

    async def always_fails(ctx):
        error = Exception("Always fails")
        error.response = MagicMock(status_code=500)
        raise error

    with pytest.raises(Exception, match="Always fails"):
        await middleware.process(context, always_fails)
```

---

## Implementation Checklist

### Phase 1: Folder Rename
- [ ] `git mv src/middleware src/workflows`
- [ ] Update import in `src/orchestrators/hierarchical.py`
- [ ] Update `src/workflows/__init__.py` docstring
- [ ] Run `make check` - verify no import errors

### Phase 2: Create Middleware Package
- [ ] Create `src/middleware/__init__.py`
- [ ] Create `src/middleware/retry.py` with `RetryMiddleware`
- [ ] Create `src/middleware/token_tracking.py` with `TokenTrackingMiddleware`
- [ ] Run `make check` - verify no syntax errors

### Phase 3: Apply to Client
- [ ] Update `src/clients/huggingface.py` to use middleware
- [ ] Test manually: `uv run python -c "from src.clients.huggingface import HuggingFaceChatClient; print('OK')"`
- [ ] Run `make check`

### Phase 4: Tests
- [ ] Create `tests/unit/middleware/__init__.py`
- [ ] Create `tests/unit/middleware/test_retry.py`
- [ ] Create `tests/unit/middleware/test_token_tracking.py`
- [ ] Run `make test` - all tests pass

### Phase 5: Cleanup
- [ ] Remove `.gitkeep` from `src/workflows/` if present
- [ ] Run full `make check`
- [ ] Commit with message: "refactor: implement proper middleware architecture (SPEC-21)"

---

## Acceptance Criteria

1. `src/middleware/` folder contains actual `ChatMiddleware` implementations
2. `src/workflows/` folder contains `SubIterationMiddleware` (the folder was renamed from `src/middleware/`; the class name is unchanged)
3. `HuggingFaceChatClient` uses `RetryMiddleware` - no more crashes on 429
4. Token usage is logged via `TokenTrackingMiddleware`
5. All existing tests pass
6. `make check` passes
7. No import errors anywhere in codebase

---

## Dependencies

- **SPEC-20** should be done first (simpler, builds confidence)
- Requires `agent-framework-core` package (already installed)

---

## Gotchas & Nuances

1. **MS middleware signature:** The `process` method takes `(context, next)` where `next` is a callable
2. **Middleware order matters:** Retry should be FIRST so it wraps everything
3. **ContextVar for token tracking:** Use ContextVar for per-request isolation
4. **Don't break HierarchicalOrchestrator:** It uses `SubIterationMiddleware` - update import path
5. **BaseChatClient constructor:** Check if it accepts `middleware=` parameter - may need to register differently

---

## Testing the Fix

After implementation, verify 429 handling:

```python
# Manual test
import asyncio
from src.clients.huggingface import HuggingFaceChatClient
from agent_framework import ChatMessage, ChatOptions

async def test():
    client = HuggingFaceChatClient()
    # Make rapid requests to trigger rate limit
    for i in range(10):
        try:
            resp = await client.get_response(
                messages=[ChatMessage(role="user", text="Hello")],
                chat_options=ChatOptions(),
            )
            print(f"Request {i}: OK")
        except Exception as e:
            print(f"Request {i}: {e}")

asyncio.run(test())
```

If the rate limit is hit, the output should show "Retrying after error" log entries instead of immediate crashes.