santanche committed on
Commit
81f916b
·
1 Parent(s): f369a7d

refactor (datasource): new subscribe feature

Files changed (4)
  1. README.md +2 -1
  2. SUBSCRIBE_DATASOURCES_GUIDE.md +466 -0
  3. server.py +33 -2
  4. static/index.html +28 -5
README.md CHANGED
@@ -1,5 +1,5 @@
 ---
-title: Pub/Sub Multi-Agent System
+title: (New) Pub/Sub Multi-Agent System
 emoji: 📡
 colorFrom: purple
 colorTo: pink
@@ -31,6 +31,7 @@ This system allows you to create and orchestrate multiple AI agents that communi
 - ⚙️ **Customizable Prompts**: Each agent has its own prompt template
 - 🏷️ **Named Entity Recognition**: Dedicated NER agents with formatted output display
 - 💾 **Save/Load with Results**: Save configurations including execution results and logs
+- 📊 **Subscribable Data Sources**: Data sources can subscribe to topics and update dynamically during execution
 
 ## How It Works
SUBSCRIBE_DATASOURCES_GUIDE.md ADDED
@@ -0,0 +1,466 @@
# Subscribable Data Sources Guide

## Overview

Data sources can now subscribe to topics on the message bus and automatically update their content when messages are published. This creates dynamic, reactive data sources whose content changes during execution.

## How It Works

### Traditional Data Sources (Static)

**Before**: Data sources were static containers:
```
Data Source: PatientInfo
Content: "Initial patient data"
→ Content never changes during execution
```

### Subscribable Data Sources (Dynamic)

**Now**: Data sources can subscribe to topics:
```
Data Source: PatientInfo
Content: "Initial patient data"
Subscribe Topic: UPDATED_INFO

When a message is published to UPDATED_INFO:
→ Content automatically updates to the new message
```

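The mechanic above can be sketched in a few lines of Python. This is a toy model, not the server implementation; it only mirrors the `subscribe_datasource`/`publish` names that this commit adds to `server.py`.

```python
# Toy sketch of a bus where data sources subscribe to topics.
# Mirrors the subscribe_datasource/publish names added to server.py,
# but is not the real implementation.

class DataSource:
    def __init__(self, label, content):
        self.label = label
        self.content = content

class MessageBus:
    def __init__(self):
        self.datasource_subscribers = {}  # topic -> [DataSource]

    def subscribe_datasource(self, topic, datasource):
        self.datasource_subscribers.setdefault(topic.lower(), []).append(datasource)

    def publish(self, topic, content):
        # Every subscribed data source is overwritten with the new message
        for ds in self.datasource_subscribers.get(topic.lower(), []):
            ds.content = content

bus = MessageBus()
patient = DataSource("PatientInfo", "Initial patient data")
bus.subscribe_datasource("UPDATED_INFO", patient)
bus.publish("UPDATED_INFO", "Patient improved overnight")
print(patient.content)  # → Patient improved overnight
```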
## Configuration

### Adding a Subscribe Topic

**In the UI**:
1. Create or edit a data source.
2. Set the label (e.g., "Results").
3. Set the initial content (optional).
4. Set the **Subscribe Topic** field (e.g., "PROCESSED_DATA").
5. The data source will update whenever a message arrives on that topic.

### Subscribe Topic Field

**Location**: in each data source card, below the content textarea

**Label**: "Subscribe Topic (optional, case insensitive)"

**Placeholder**: "e.g., PROCESSED_DATA - updates content from bus"

**Behavior**:
- **Empty**: the data source is static (traditional behavior)
- **With a topic**: the data source subscribes to that topic and updates on each message

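On the wire, this field maps to the `subscribe_topic` attribute that this commit adds to the `DataSource` model in `server.py`. A dependency-free sketch of that model (a dataclass stands in for the actual pydantic `BaseModel`; the example values are made up):

```python
# Stand-in for the pydantic DataSource model extended in this commit.
# server.py defines the same three fields; a dataclass is used here so
# the sketch needs no third-party dependencies.
from dataclasses import dataclass
from typing import Optional

@dataclass
class DataSource:
    label: str
    content: str
    subscribe_topic: Optional[str] = None  # None/empty means static

static_ds = DataSource(label="PatientData", content="Patient: John Doe, Age: 45")
dynamic_ds = DataSource(label="Results", content="", subscribe_topic="PROCESSED_DATA")
```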
## Use Cases

### Use Case 1: Iterative Refinement

**Scenario**: An agent refines data over multiple iterations.

**Setup**:
```
Data Source: WorkingData
Initial Content: "Raw data to process"
Subscribe Topic: REFINED

Agent 1:
- Prompt: Process this: {WorkingData}
- Subscribe: START
- Publish: REFINED

Agent 2:
- Prompt: Further refine: {WorkingData}
- Subscribe: REFINED
- Publish: REFINED
```

**Flow**:
1. Agent 1 reads the initial content from WorkingData
2. Agent 1 processes it and publishes to REFINED
3. WorkingData updates with the refined content
4. Agent 2 reads the updated content from WorkingData
5. Agent 2 refines further and publishes to REFINED
6. WorkingData updates again

**Result**: WorkingData evolves through the pipeline.

### Use Case 2: Accumulating Results

**Scenario**: Collect results from multiple agents.

**Setup**:
```
Data Source: CollectedResults
Initial Content: ""
Subscribe Topic: RESULT

Agent 1:
- Prompt: Analyze aspect A: {Input}
- Publish: RESULT

Agent 2:
- Prompt: Analyze aspect B: {Input}
- Publish: RESULT

Agent 3:
- Prompt: Analyze aspect C: {Input}
- Publish: RESULT
```

**Flow**:
1. Agent 1 publishes its result to RESULT
2. CollectedResults updates to Agent 1's result
3. Agent 2 publishes its result to RESULT
4. CollectedResults updates to Agent 2's result (overwriting Agent 1's)
5. Agent 3 publishes its result to RESULT
6. CollectedResults updates to Agent 3's result

**Note**: The last message overwrites the previous one. For true accumulation, agents need to read the current content and append to it.

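The overwrite-vs-accumulate distinction can be shown without the real server. In this sketch the bus behavior is reduced to two functions; accumulation only happens when the publishing side first reads the existing content and appends:

```python
# Overwrite vs. accumulate, sketched with plain dicts. The bus itself
# always overwrites; accumulation exists only if the publisher reads
# the current content and appends the new message to it.

def publish_overwrite(datasource, message):
    datasource["content"] = message

def publish_accumulate(datasource, message):
    # Agent-side pattern: read existing content, append the new message
    existing = datasource["content"]
    datasource["content"] = (existing + "\n" + message) if existing else message

collected = {"label": "CollectedResults", "content": ""}
for result in ["aspect A done", "aspect B done", "aspect C done"]:
    publish_accumulate(collected, result)
print(collected["content"])
```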
### Use Case 3: State Management

**Scenario**: Maintain an evolving state through the pipeline.

**Setup**:
```
Data Source: SystemState
Initial Content: '{"status": "initial", "step": 0}'
Subscribe Topic: STATE_UPDATE

Agent 1:
- Prompt: Update state: {SystemState}
  Process: {Input}
  Return updated JSON state
- Subscribe: START
- Publish: STATE_UPDATE

Agent 2:
- Prompt: Current state: {SystemState}
  Next action based on state
- Subscribe: STATE_UPDATE
- Publish: STATE_UPDATE
```

**Flow**:
1. Agent 1 reads the initial state
2. Agent 1 publishes the updated state
3. SystemState updates
4. Agent 2 reads the updated state
5. Agent 2 publishes further updates
6. SystemState continues evolving

### Use Case 4: Feedback Loop

**Scenario**: An agent uses its own previous output.

**Setup**:
```
Data Source: Context
Initial Content: "Start here"
Subscribe Topic: OUTPUT

Agent: Processor
- Prompt: Previous context: {Context}
  New input: {input}
  Generate next response
- Subscribe: START (and OUTPUT for iterations)
- Publish: OUTPUT
```

**Flow**:
1. The agent reads the initial context
2. The agent processes it and publishes
3. Context updates with the agent's output
4. On the next trigger, the agent reads its own previous output
5. This creates a feedback loop

### Use Case 5: Multi-Source Aggregation

**Scenario**: Different agents update different data sources.

**Setup**:
```
Data Source: SourceA
Subscribe Topic: RESULTS_A

Data Source: SourceB
Subscribe Topic: RESULTS_B

Agent 1:
- Publish: RESULTS_A

Agent 2:
- Publish: RESULTS_B

Agent 3:
- Prompt: Combine {SourceA} and {SourceB}
- Subscribe: RESULTS_B (waits for both to be ready)
```

**Flow**:
1. Agent 1 updates SourceA
2. Agent 2 updates SourceB
3. Agent 3 combines both updated sources

## Features

### Case Insensitive Topics

Subscribe topics are case insensitive:
```
Data Source subscribes to: "RESULTS"
Agent publishes to: "results"
→ Match! The data source updates
```

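`server.py` routes every topic through a `_normalize_topic` helper whose body falls outside this diff; lowercasing plus trimming is an assumed implementation that reproduces the documented behavior:

```python
# Case-insensitive topic matching. _normalize_topic exists in server.py
# but its body is outside this diff; strip().lower() is an assumption
# that matches the behavior the guide describes.

def normalize_topic(topic: str) -> str:
    return topic.strip().lower()

# Subscriptions are keyed by the normalized form...
subscriptions = {normalize_topic("RESULTS"): ["Results data source"]}

# ...so a publish to "results" finds the subscriber of "RESULTS".
assert normalize_topic("results") in subscriptions
```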
### Update Notification

When a data source updates, the system logs:
```
[10:30:15] 📡 Data source "Results" updated with message from "PROCESSED"
```

### Immediate Update

Data sources update immediately when messages are published:
- Message published → data source content updates
- The next agent reading the data source sees the new content

### Initial Content

Data sources can have initial content:
```
Data Source: Counter
Initial Content: "0"
Subscribe Topic: INCREMENT

Agent:
- Prompt: Current value: {Counter}
  Increment by 1
  Return new value
- Publish: INCREMENT
```

The initial content is used until the first message arrives.

### Multiple Subscribers

Both agents and data sources can subscribe to the same topic:
```
Topic: ANALYSIS_DONE

Subscribers:
- Agent "Reporter" (processes the analysis)
- Data Source "LastAnalysis" (stores it for reference)

When a message is published:
→ The agent processes it
→ The data source stores it
```

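The delivery order matters: in the `server.py` changes, data sources on a topic are updated before the agents on that topic run, so a triggered agent reads fresh content. A toy version of that fan-out:

```python
# Fan-out on publish: agents and data sources on the same topic both
# see the message, data sources first (as in the server.py changes),
# so triggered agents read the already-updated content. Toy model only.

agents = {"analysis_done": ["Reporter"]}
datasources = {"analysis_done": [{"label": "LastAnalysis", "content": ""}]}
log = []

def deliver(topic, message):
    topic = topic.lower()
    for ds in datasources.get(topic, []):   # data sources update first
        ds["content"] = message
        log.append(f'datasource_updated:{ds["label"]}')
    for agent in agents.get(topic, []):     # then agents are triggered
        log.append(f"agent_triggered:{agent}")

deliver("ANALYSIS_DONE", "entities: ...")
```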
## Configuration Example

### Complete Setup

**Data Sources**:
```
1. PatientData
   Content: "Patient: John Doe, Age: 45"
   Subscribe: (empty - static)

2. AnalysisResults
   Content: ""
   Subscribe: ANALYSIS

3. FinalReport
   Content: ""
   Subscribe: REPORT
```

**Agents**:
```
1. Analyzer
   Prompt: Analyze patient: {PatientData}
   Subscribe: START
   Publish: ANALYSIS

2. Reporter
   Prompt: Create report from: {AnalysisResults}
   Subscribe: ANALYSIS
   Publish: REPORT
```

**Flow**:
1. START → Analyzer reads PatientData (static)
2. Analyzer publishes to ANALYSIS
3. AnalysisResults updates with the analysis
4. Reporter reads the updated AnalysisResults
5. Reporter publishes to REPORT
6. FinalReport updates with the report

## Best Practices

### 1. Clear Naming

Use descriptive names that signal that the data source updates:
```
✅ Good:
- "CurrentState" subscribes to "STATE_UPDATE"
- "LatestResults" subscribes to "RESULTS"
- "WorkingBuffer" subscribes to "PROCESSED"

❌ Avoid:
- "Data1" subscribes to "TOPIC1"
- "X" subscribes to "Y"
```

### 2. Initial Content

Provide meaningful initial content when appropriate:
```
✅ Good:
- Counter: "0"
- State: '{"initialized": true}'
- Context: "Beginning of conversation"

🤷 Optional:
- Results: "" (will be populated by the first message)
```

### 3. Update vs Accumulate

**Update (default)**: the latest message overwrites:
```
Data Source: CurrentValue
Each message replaces the previous one
```

**Accumulate (requires agent logic)**:
```
Agent:
- Prompt: Previous: {Accumulated}
  New: {input}
  Combine and return both
- Publish: UPDATE_ACCUMULATED
```

### 4. Document Dependencies

In complex pipelines, document which agents update which data sources:
```
Pipeline Flow:
Agent A → TOPIC_X → DataSource1
Agent B → TOPIC_Y → DataSource2
Agent C reads {DataSource1} and {DataSource2}
```

### 5. Avoid Circular Dependencies

Don't create circular update loops:
```
❌ Dangerous:
Data Source: State
Subscribe: STATE

Agent:
- Prompt: {State}
- Subscribe: STATE
- Publish: STATE
→ Infinite loop risk!
```

## Troubleshooting

### Issue: Data source not updating

**Causes**:
1. The subscribe topic doesn't match the publish topic
2. The topic name has a typo
3. Case sensitivity confusion (shouldn't happen, but check)

**Solution**:
- Check that the topic names match exactly
- Check the execution log for a "datasource_subscribed" message
- Check for a "datasource_updated" message when the message is published

### Issue: Data source updates too late

**Cause**: An agent reads the data source before it updates.

**Solution**: Ensure proper topic chaining:
```
Agent A → TOPIC → DataSource → Agent B reads it
Agent B must subscribe to TOPIC (or a later topic)
```

### Issue: Lost data from previous messages

**Cause**: Messages overwrite the previous content.

**Solution**: If you need to accumulate, implement append logic:
```
Agent:
- Prompt: Existing: {DataSource}
  New: {input}
  Append new to existing
```

## Advanced Patterns

### Pattern 1: State Machine

```
Data Source: State
Subscribe: STATE_CHANGE

Agents represent state transitions:
- StateA → processes → publishes STATE_CHANGE with "B"
- StateB → processes → publishes STATE_CHANGE with "C"
- Each agent checks {State} to see if it should act
```

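The state-machine pattern can be condensed into a gated transition loop. The `A`/`B`/`C` states are hypothetical, and the `if` check stands in for an agent reading `{State}` in its prompt before deciding to act:

```python
# Pattern 1 sketch: a shared State data source plus agents that act
# only when {State} matches their trigger. States A/B/C are made up;
# the dict lookup stands in for the agent's prompt-side check.

state = {"content": "A"}          # the State data source
transitions = {"A": "B", "B": "C"}  # each agent's trigger -> next state
history = []

def step():
    current = state["content"]
    if current in transitions:               # agent checks {State}
        state["content"] = transitions[current]  # publish STATE_CHANGE
        history.append(state["content"])
        return True
    return False                             # no agent acts: machine halts

while step():
    pass
```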
### Pattern 2: Shared Memory

```
Multiple agents read and write a shared data source:

Data Source: SharedMemory
Subscribe: MEMORY_UPDATE

Agents:
- All read {SharedMemory}
- All can publish to MEMORY_UPDATE
- Creates a shared workspace
```

### Pattern 3: Pipeline Checkpoints

```
Data sources as checkpoints:

Data Source: Checkpoint1
Subscribe: STAGE1_DONE

Data Source: Checkpoint2
Subscribe: STAGE2_DONE

Later agents can reference any checkpoint:
- Prompt: From stage 1: {Checkpoint1}
  From stage 2: {Checkpoint2}
```

## Limitations

1. **No History**: Only the latest message is stored
2. **No Merge Logic**: The latest message completely replaces the content
3. **No Conditional Updates**: A data source updates on every message to its topic
4. **No Filtering**: There is no way to filter which messages to accept

## Future Enhancements

Potential improvements:
- [ ] Append mode (accumulate messages)
- [ ] History (keep the last N messages)
- [ ] Conditional updates (filter by content)
- [ ] Transform on update (apply a function to the incoming message)
- [ ] Multiple topic subscriptions per data source
- [ ] Update callbacks/hooks
server.py CHANGED
@@ -40,6 +40,7 @@ app.mount("/static", StaticFiles(directory=static_dir), name="static")
 class DataSource(BaseModel):
     label: str
     content: str
+    subscribe_topic: Optional[str] = None
 
 class Agent(BaseModel):
     title: str
@@ -58,11 +59,13 @@ class ExecutionRequest(BaseModel):
 class MessageBus:
     def __init__(self):
         self.subscribers: Dict[str, List[Agent]] = {}
+        self.datasource_subscribers: Dict[str, List[DataSource]] = {}
         self.messages: Dict[str, str] = {}
 
     def reset(self):
         """Reset the bus for a new execution"""
         self.subscribers = {}
+        self.datasource_subscribers = {}
         self.messages = {}
 
     def _normalize_topic(self, topic: str) -> str:
@@ -76,6 +79,13 @@ class MessageBus:
             self.subscribers[normalized] = []
         self.subscribers[normalized].append(agent)
 
+    def subscribe_datasource(self, topic: str, datasource: DataSource):
+        """Subscribe a data source to a topic (case insensitive)"""
+        normalized = self._normalize_topic(topic)
+        if normalized not in self.datasource_subscribers:
+            self.datasource_subscribers[normalized] = []
+        self.datasource_subscribers[normalized].append(datasource)
+
     def publish(self, topic: str, content: str):
         """Publish a message to a topic (case insensitive)"""
         normalized = self._normalize_topic(topic)
@@ -90,6 +100,11 @@
         """Get all subscribers for a topic (case insensitive)"""
         normalized = self._normalize_topic(topic)
         return self.subscribers.get(normalized, [])
+
+    def get_datasource_subscribers(self, topic: str) -> List[DataSource]:
+        """Get all data source subscribers for a topic (case insensitive)"""
+        normalized = self._normalize_topic(topic)
+        return self.datasource_subscribers.get(normalized, [])
 
 # Stream event helper
 def create_event(event_type: str, **kwargs):
@@ -139,7 +154,7 @@ def format_ner_result(text: str, entities: List[Dict]) -> str:
     for entity in sorted_entities:
         start = entity['start']
         end = entity['end']
-        entity_type = entity['entity_type']
+        entity_type = entity['entity_group']
         original_text = text[start:end]
 
         # Replace entity with labeled version
@@ -238,6 +253,12 @@ async def execute_pipeline(request: ExecutionRequest) -> AsyncGenerator[str, Non
         bus.subscribe(agent.subscribe_topic, agent)
         yield create_event("agent_subscribed", agent=agent.title, topic=agent.subscribe_topic)
 
+    # Subscribe data sources to their topics
+    for datasource in request.data_sources:
+        if datasource.subscribe_topic:
+            bus.subscribe_datasource(datasource.subscribe_topic, datasource)
+            yield create_event("datasource_subscribed", datasource=datasource.label, topic=datasource.subscribe_topic)
+
     # Publish START message
     start_message = request.user_question if request.user_question else "System initialized"
     bus.publish("START", start_message)
@@ -259,14 +280,24 @@
 
     for topic in topics_to_process:
         subscribers = bus.get_subscribers(topic)
+        datasource_subscribers = bus.get_datasource_subscribers(topic)
 
-        if not subscribers:
+        if not subscribers and not datasource_subscribers:
             yield create_event("no_subscribers", topic=topic)
             processed_topics.add(topic)
             continue
 
         message_content = bus.get_message(topic)
 
+        # Update data sources that subscribe to this topic
+        for datasource in datasource_subscribers:
+            datasource.content = message_content
+            yield create_event("datasource_updated",
+                               datasource=datasource.label,
+                               topic=topic,
+                               content=message_content)
+
+        # Process agents that subscribe to this topic
         for agent in subscribers:
             yield create_event("agent_triggered", agent=agent.title, topic=topic)
             yield create_event("agent_processing", agent=agent.title)
static/index.html CHANGED
@@ -67,7 +67,8 @@
       const newDataSource = {
         id: Date.now(),
         label: `Data${dataSources.length + 1}`,
-        content: ''
+        content: '',
+        subscribeTopic: ''
       };
       setDataSources([...dataSources, newDataSource]);
     };
@@ -108,7 +109,8 @@
       userQuestion,
       dataSources: dataSources.map(ds => ({
         label: ds.label,
-        content: ds.content
+        content: ds.content,
+        subscribeTopic: ds.subscribeTopic || ''
       })),
       agents: agents.map(a => ({
         title: a.title,
@@ -163,7 +165,8 @@
       const loadedDataSources = config.dataSources.map(ds => ({
         id: Date.now() + Math.random(),
         label: ds.label,
-        content: ds.content
+        content: ds.content,
+        subscribeTopic: ds.subscribeTopic || ''
       }));
       setDataSources(loadedDataSources);
 
@@ -226,7 +229,8 @@
       body: JSON.stringify({
         data_sources: dataSources.map(ds => ({
           label: ds.label,
-          content: ds.content
+          content: ds.content,
+          subscribe_topic: ds.subscribeTopic || null
         })),
         user_question: userQuestion,
         agents: agents.map(a => ({
@@ -263,6 +267,14 @@
         addLog('Bus initialized', 'bus');
       } else if (data.type === 'agent_subscribed') {
         addLog(`Agent "${data.agent}" subscribed to topic "${data.topic}"`, 'bus');
+      } else if (data.type === 'datasource_subscribed') {
+        addLog(`Data source "${data.datasource}" subscribed to topic "${data.topic}"`, 'bus');
+      } else if (data.type === 'datasource_updated') {
+        addLog(`Data source "${data.datasource}" updated with message from "${data.topic}"`, 'bus');
+        // Update the data source in state
+        setDataSources(prev => prev.map(ds =>
+          ds.label === data.datasource ? { ...ds, content: data.content } : ds
+        ));
       } else if (data.type === 'message_published') {
         addLog(`Published to "${data.topic}": ${data.content.substring(0, 100)}${data.content.length > 100 ? '...' : ''}`, 'bus');
       } else if (data.type === 'agent_triggered') {
@@ -422,7 +434,18 @@
         value={ds.content}
         onChange={(e) => updateDataSource(ds.id, 'content', e.target.value)}
         className="w-full h-24 p-2 border border-gray-300 rounded text-sm font-mono focus:ring-2 focus:ring-blue-500"
-        placeholder="Enter content or upload file below..."
+        placeholder="Enter content, upload file, or receive from bus..."
+      />
+    </div>
+
+    <div>
+      <label className="block text-xs font-medium text-gray-600 mb-1">Subscribe Topic (optional, case insensitive)</label>
+      <input
+        type="text"
+        value={ds.subscribeTopic}
+        onChange={(e) => updateDataSource(ds.id, 'subscribeTopic', e.target.value)}
+        className="w-full p-2 border border-gray-300 rounded text-sm focus:ring-2 focus:ring-blue-500"
+        placeholder="e.g., PROCESSED_DATA - updates content from bus"
       />
     </div>
 