# Middleware Architecture

DeepCritical uses middleware for state management, budget tracking, and workflow coordination.

## State Management

### WorkflowState

**File**: `src/middleware/state_machine.py`

**Purpose**: Thread-safe state management for research workflows

**Implementation**: Uses `ContextVar` for thread-safe isolation

**State Components**:

- `evidence: list[Evidence]`: Collected evidence from searches
- `conversation: Conversation`: Iteration history (gaps, tool calls, findings, thoughts)
- `embedding_service: Any`: Embedding service for semantic search

**Methods**:

- `add_evidence(evidence: Evidence)`: Adds evidence with URL-based deduplication
- `async search_related(query: str, top_k: int = 5) -> list[Evidence]`: Semantic search

**Initialization**:

<!--codeinclude-->
[Initialize Workflow State](../src/middleware/state_machine.py) start_line:98 end_line:110
<!--/codeinclude-->

**Access**:

<!--codeinclude-->
[Get Workflow State](../src/middleware/state_machine.py) start_line:112 end_line:125
<!--/codeinclude-->
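
The pattern described above (state held in a `ContextVar`, with URL-based deduplication in `add_evidence`) can be sketched roughly as follows. This is a minimal illustration, not the code in `state_machine.py`: the `Evidence` fields, the variable name `_workflow_state`, and the helper names `init_workflow_state` and `get_workflow_state` are assumptions made for the example.

```python
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Evidence:
    """Hypothetical stand-in for the real Evidence model."""
    url: str
    content: str


@dataclass
class WorkflowState:
    """Simplified state that only tracks collected evidence."""
    evidence: list[Evidence] = field(default_factory=list)

    def add_evidence(self, item: Evidence) -> bool:
        """Append the item unless an entry with the same URL already exists."""
        if any(existing.url == item.url for existing in self.evidence):
            return False
        self.evidence.append(item)
        return True


# Hypothetical variable name; the value is resolved per execution context.
_workflow_state: ContextVar[Optional[WorkflowState]] = ContextVar(
    "workflow_state", default=None
)


def init_workflow_state() -> WorkflowState:
    """Create a fresh state and bind it to the current context."""
    state = WorkflowState()
    _workflow_state.set(state)
    return state


def get_workflow_state() -> WorkflowState:
    """Return the state for the current context, creating it on first use."""
    state = _workflow_state.get()
    if state is None:
        state = init_workflow_state()
    return state
```

In this sketch, adding two `Evidence` items with the same URL keeps only the first one.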

## Workflow Manager

**File**: `src/middleware/workflow_manager.py`

**Purpose**: Coordinates parallel research loops

**Methods**:

- `add_loop(loop: ResearchLoop)`: Add a research loop to manage
- `async run_loops_parallel() -> list[ResearchLoop]`: Run all loops in parallel
- `update_loop_status(loop_id: str, status: str)`: Update loop status
- `sync_loop_evidence_to_state()`: Synchronize evidence from loops to global state

**Features**:

- Uses `asyncio.gather()` for parallel execution
- Handles errors per loop, so one failing loop does not abort the others
- Tracks loop status: `pending`, `running`, `completed`, `failed`, `cancelled`
- Deduplicates evidence across parallel loops

**Usage**:
```python
from src.middleware.workflow_manager import WorkflowManager

manager = WorkflowManager()
manager.add_loop(loop1)
manager.add_loop(loop2)
completed_loops = await manager.run_loops_parallel()
```
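
One common way to get per-loop error handling with `asyncio.gather()` is to pass `return_exceptions=True`, so a failure is returned as a result instead of being raised to the caller. The sketch below illustrates that technique only; whether `WorkflowManager` uses it is an assumption, and the `ResearchLoop` fields (`loop_id`, `should_fail`, `status`) and the `run_one` helper are hypothetical.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ResearchLoop:
    """Hypothetical, minimal stand-in for the real ResearchLoop model."""
    loop_id: str
    should_fail: bool = False
    status: str = "pending"


async def run_one(loop: ResearchLoop) -> None:
    """Placeholder for the real research work of a single loop."""
    loop.status = "running"
    await asyncio.sleep(0)  # yield control, as real I/O would
    if loop.should_fail:
        raise RuntimeError(f"loop {loop.loop_id} failed")


async def run_loops_parallel(loops: list[ResearchLoop]) -> list[ResearchLoop]:
    """Run all loops concurrently and record a status for each one."""
    # return_exceptions=True returns exceptions as results, in input order,
    # instead of propagating the first one raised.
    results = await asyncio.gather(
        *(run_one(loop) for loop in loops), return_exceptions=True
    )
    for loop, result in zip(loops, results):
        failed = isinstance(result, BaseException)
        loop.status = "failed" if failed else "completed"
    return loops
```

Because results come back in the same order as the awaitables, each outcome can be matched to its loop with a plain `zip`.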

## Budget Tracker

**File**: `src/middleware/budget_tracker.py`

**Purpose**: Tracks and enforces resource limits

**Budget Components**:

- **Tokens**: LLM token usage
- **Time**: Elapsed time in seconds
- **Iterations**: Number of iterations

**Methods**:

- `create_budget(token_limit, time_limit_seconds, iterations_limit) -> BudgetStatus`: Create a budget with the given limits
- `add_tokens(tokens: int)`: Add token usage
- `start_timer()`: Start time tracking
- `update_timer()`: Update elapsed time
- `increment_iteration()`: Increment iteration count
- `check_budget() -> BudgetStatus`: Check current budget status
- `can_continue() -> bool`: Check if research can continue

**Token Estimation**:

- `estimate_tokens(text: str) -> int`: Estimate tokens from text length (roughly 4 characters per token)
- `estimate_llm_call_tokens(prompt: str, response: str) -> int`: Estimate LLM call tokens
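
The 4-characters-per-token heuristic can be sketched as below. The function names mirror the ones listed above, but the bodies are assumptions: in particular, rounding up and summing prompt and response estimates are choices made for this example, not confirmed behavior of `budget_tracker.py`.

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate: about 4 characters per token, rounded up (assumed)."""
    return (len(text) + 3) // 4


def estimate_llm_call_tokens(prompt: str, response: str) -> int:
    """Estimate one LLM call as prompt tokens plus response tokens (assumed)."""
    return estimate_tokens(prompt) + estimate_tokens(response)
```

A character-count heuristic is cheap and needs no tokenizer, but it is only an approximation; actual counts depend on the model's tokenizer and the language of the text.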

**Usage**:
```python
from src.middleware.budget_tracker import BudgetTracker

tracker = BudgetTracker()
budget = tracker.create_budget(
    token_limit=100000,
    time_limit_seconds=600,
    iterations_limit=10
)
tracker.start_timer()
# ... research operations ...
if not tracker.can_continue():
    # Budget exceeded, stop research
    pass
```
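
To make the `can_continue()` check concrete, here is a simplified sketch of a budget that combines the three limits. It is not the `BudgetTracker` implementation: the class name `BudgetSketch`, its fields, and the rule that a budget is exhausted once usage reaches a limit are all assumptions for illustration.

```python
import time
from dataclasses import dataclass, field


@dataclass
class BudgetSketch:
    """Hypothetical budget combining token, time, and iteration limits."""
    token_limit: int
    time_limit_seconds: float
    iterations_limit: int
    tokens_used: int = 0
    iterations: int = 0
    # A monotonic clock is unaffected by system clock adjustments.
    started_at: float = field(default_factory=time.monotonic)

    def add_tokens(self, tokens: int) -> None:
        self.tokens_used += tokens

    def increment_iteration(self) -> None:
        self.iterations += 1

    def elapsed_seconds(self) -> float:
        return time.monotonic() - self.started_at

    def can_continue(self) -> bool:
        """True only while every limit still has headroom (assumed rule)."""
        return (
            self.tokens_used < self.token_limit
            and self.elapsed_seconds() < self.time_limit_seconds
            and self.iterations < self.iterations_limit
        )
```

Under this assumed rule, reaching any single limit is enough to stop the research loop.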

## Models

All middleware models are defined in `src/utils/models.py`:

- `IterationData`: Data for a single iteration
- `Conversation`: Conversation history with iterations
- `ResearchLoop`: Research loop state and configuration
- `BudgetStatus`: Current budget status

## Thread Safety

All middleware components keep their state in `ContextVar`s, which isolates it per execution context:

- Each request/thread has its own workflow state
- No global mutable state
- Safe for concurrent requests
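
The isolation that `ContextVar` provides can be seen in a small standalone demo. The names here (`request_state`, `worker`, `run_demo`) are hypothetical and unrelated to the project's code; the sketch only shows that a value set in one thread is not visible in another thread or in the main thread.

```python
import threading
from contextvars import ContextVar

# Hypothetical variable; no default, so get() needs a fallback when unset.
request_state: ContextVar[list] = ContextVar("request_state")


def worker(name: str, results: dict) -> None:
    """Set and mutate the state visible to this thread only."""
    request_state.set([name])
    request_state.get().append("evidence")
    results[name] = request_state.get()


def run_demo() -> dict:
    """Run two workers in separate threads and collect what each one saw."""
    results: dict = {}
    threads = [
        threading.Thread(target=worker, args=(name, results))
        for name in ("a", "b")
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # Values set inside the worker threads never reach the main thread.
    results["main"] = request_state.get(None)
    return results
```

Each worker sees only the list it set itself, and the main thread still sees no value at all.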

## See Also

- [Orchestrators](orchestrators.md) - How middleware is used in orchestration
- [API Reference - Orchestrators](../api/orchestrators.md) - API documentation
- [Contributing - Code Style](../contributing/code-style.md) - Development guidelines