Subscribable Data Sources Guide
Overview
Data sources can now subscribe to topics on the message bus and automatically update their content when messages are published. This creates dynamic, reactive data sources that change during execution.
How It Works
Traditional Data Sources (Static)
Before: Data sources were static containers:
Data Source: PatientInfo
Content: "Initial patient data"
→ Content never changes during execution
Subscribable Data Sources (Dynamic)
Now: Data sources can subscribe to topics:
Data Source: PatientInfo
Content: "Initial patient data"
Subscribe Topic: UPDATED_INFO
When message published to UPDATED_INFO:
→ Content automatically updates to the new message
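The mechanism above can be sketched in a few lines of Python. Everything here is illustrative (the `MessageBus` and `DataSource` names are hypothetical, not the real API); it only shows the core idea: a data source registered with a subscribe topic replaces its content whenever a message is published to that topic.

```python
# Minimal sketch of subscribable data sources. Hypothetical names;
# the real system's internals may differ.

class DataSource:
    def __init__(self, label, content="", subscribe_topic=None):
        self.label = label
        self.content = content
        # Topics are matched case-insensitively, so normalize once here.
        self.subscribe_topic = subscribe_topic.upper() if subscribe_topic else None

class MessageBus:
    def __init__(self):
        self.data_sources = []

    def register(self, source):
        self.data_sources.append(source)

    def publish(self, topic, message):
        # Every data source subscribed to this topic gets the new content.
        for source in self.data_sources:
            if source.subscribe_topic == topic.upper():
                source.content = message

bus = MessageBus()
patient = DataSource("PatientInfo", "Initial patient data", "UPDATED_INFO")
bus.register(patient)

bus.publish("UPDATED_INFO", "Revised patient data")
print(patient.content)  # Revised patient data
```

A data source created without a subscribe topic (`subscribe_topic=None`) is simply never matched by `publish`, which is the static behavior described above.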
Configuration
Adding Subscribe Topic
In the UI:
- Create or edit a data source
- Set the label (e.g., "Results")
- Set initial content (optional)
- Set Subscribe Topic field (e.g., "PROCESSED_DATA")
- Data source will update when messages arrive on that topic
Subscribe Topic Field
Location: In each data source card, below the content textarea
Label: "Subscribe Topic (optional, case insensitive)"
Placeholder: "e.g., PROCESSED_DATA - updates content from bus"
Behavior:
- Empty: Data source is static (traditional behavior)
- With topic: Data source subscribes to that topic and updates on messages
Use Cases
Use Case 1: Iterative Refinement
Scenario: Agent refines data over multiple iterations
Setup:
Data Source: WorkingData
Initial Content: "Raw data to process"
Subscribe Topic: REFINED
Agent 1:
- Prompt: Process this: {WorkingData}
- Subscribe: START
- Publish: REFINED
Agent 2:
- Prompt: Further refine: {WorkingData}
- Subscribe: REFINED
- Publish: REFINED
Flow:
- Agent 1 reads initial content from WorkingData
- Agent 1 processes and publishes to REFINED
- WorkingData updates with refined content
- Agent 2 reads updated content from WorkingData
- Agent 2 refines further and publishes to REFINED
- WorkingData updates again
Result: WorkingData evolves through the pipeline
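The flow above can be simulated with a toy publish/subscribe sketch. All names are illustrative, and the two agents are reduced to plain function calls; the point is only that each agent reads the data source *after* the previous publish updated it.

```python
# Toy simulation of the iterative-refinement flow (Use Case 1).
# Hypothetical names; not the real system's code.

subscriptions = {}                                 # topic -> callbacks
working_data = {"content": "Raw data to process"}  # the WorkingData source

def subscribe(topic, callback):
    subscriptions.setdefault(topic.upper(), []).append(callback)

def publish(topic, message):
    for callback in subscriptions.get(topic.upper(), []):
        callback(message)

# WorkingData subscribes to REFINED and stores the latest message.
subscribe("REFINED", lambda msg: working_data.update(content=msg))

# Agent 1 reads the initial content and publishes a refinement.
publish("REFINED", f"refined({working_data['content']})")
print(working_data["content"])  # refined(Raw data to process)

# Agent 2 reads the *updated* content and refines it again.
publish("REFINED", f"refined({working_data['content']})")
print(working_data["content"])  # refined(refined(Raw data to process))
```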
Use Case 2: Accumulating Results
Scenario: Collect results from multiple agents
Setup:
Data Source: CollectedResults
Initial Content: ""
Subscribe Topic: RESULT
Agent 1:
- Prompt: Analyze aspect A: {Input}
- Publish: RESULT
Agent 2:
- Prompt: Analyze aspect B: {Input}
- Publish: RESULT
Agent 3:
- Prompt: Analyze aspect C: {Input}
- Publish: RESULT
Flow:
- Agent 1 publishes result to RESULT
- CollectedResults updates to Agent 1's result
- Agent 2 publishes result to RESULT
- CollectedResults updates to Agent 2's result (overwrites)
- Agent 3 publishes result to RESULT
- CollectedResults updates to Agent 3's result
Note: each new message overwrites the previous content. To accumulate results instead, agents must read the existing content and append to it before publishing.
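The read-and-append pattern can be sketched as follows. The names (`collected`, `agent_publish`) are hypothetical; the key idea is that each agent includes the data source's current content in what it publishes, so the overwrite still preserves everything seen so far.

```python
# Sketch of the append pattern for Use Case 2. Illustrative names only.

collected = {"content": ""}  # CollectedResults, subscribed to RESULT

def on_result(msg):
    # The data source's behavior: latest message replaces content.
    collected["content"] = msg

def agent_publish(result):
    # Read the existing content and append before publishing, so the
    # replacement still contains all previous results.
    combined = (collected["content"] + "\n" + result).strip()
    on_result(combined)

for r in ["Analysis of aspect A", "Analysis of aspect B", "Analysis of aspect C"]:
    agent_publish(r)

print(collected["content"])
# Analysis of aspect A
# Analysis of aspect B
# Analysis of aspect C
```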
Use Case 3: State Management
Scenario: Maintain evolving state through pipeline
Setup:
Data Source: SystemState
Initial Content: '{"status": "initial", "step": 0}'
Subscribe Topic: STATE_UPDATE
Agent 1:
- Prompt: Update state: {SystemState}
Process: {Input}
Return updated JSON state
- Subscribe: START
- Publish: STATE_UPDATE
Agent 2:
- Prompt: Current state: {SystemState}
Next action based on state
- Subscribe: STATE_UPDATE
- Publish: STATE_UPDATE
Flow:
- Agent 1 reads initial state
- Agent 1 publishes updated state
- SystemState updates
- Agent 2 reads updated state
- Agent 2 publishes further updates
- SystemState continues evolving
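The state-evolution loop can be sketched like this, with each "agent" reduced to a function that parses the JSON state, updates it, and republishes. Names are illustrative.

```python
import json

# Sketch of Use Case 3: SystemState holds a JSON string that each agent
# parses, updates, and republishes. Hypothetical names throughout.

system_state = {"content": '{"status": "initial", "step": 0}'}

def on_state_update(msg):
    # The data source's behavior: store the latest published state.
    system_state["content"] = msg

def agent_step(new_status):
    state = json.loads(system_state["content"])
    state["status"] = new_status
    state["step"] += 1
    on_state_update(json.dumps(state))

agent_step("analyzing")
agent_step("reporting")
print(system_state["content"])  # {"status": "reporting", "step": 2}
```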
Use Case 4: Feedback Loop
Scenario: Agent uses its own previous output
Setup:
Data Source: Context
Initial Content: "Start here"
Subscribe Topic: OUTPUT
Agent: Processor
- Prompt: Previous context: {Context}
New input: {input}
Generate next response
- Subscribe: START (and OUTPUT for iterations)
- Publish: OUTPUT
Flow:
- Agent reads initial context
- Agent processes and publishes
- Context updates with agent's output
- On next trigger, agent reads its own previous output
- Creates feedback loop
Note: this is close to the circular pattern warned about under Best Practices; make sure each iteration has a clear stopping condition.
Use Case 5: Multi-Source Aggregation
Scenario: Different agents update different data sources
Setup:
Data Source: SourceA
Subscribe Topic: RESULTS_A
Data Source: SourceB
Subscribe Topic: RESULTS_B
Agent 1:
- Publish: RESULTS_A
Agent 2:
- Publish: RESULTS_B
Agent 3:
- Prompt: Combine {SourceA} and {SourceB}
- Subscribe: RESULTS_B (assumes Agent 1 has already published to RESULTS_A; subscribing to RESULTS_B alone does not guarantee SourceA is ready)
Flow:
- Agent 1 updates SourceA
- Agent 2 updates SourceB
- Agent 3 combines both updated sources
Features
Case Insensitive Topics
Subscribe topics are case insensitive:
Data Source subscribes to: "RESULTS"
Agent publishes to: "results"
→ Match! Data source updates
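Case-insensitive matching amounts to normalizing topic names on both subscribe and publish. A minimal sketch (the function names are hypothetical):

```python
# Case-insensitive topic matching via normalization. Illustrative only.

def normalize(topic):
    return topic.strip().upper()

subscribers = {}

def subscribe(topic, callback):
    subscribers.setdefault(normalize(topic), []).append(callback)

def publish(topic, message):
    for callback in subscribers.get(normalize(topic), []):
        callback(message)

updates = []
subscribe("RESULTS", updates.append)
publish("results", "new data")  # different case still matches
print(updates)  # ['new data']
```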
Update Notification
When a data source updates, the system logs:
[10:30:15] 📡 Data source "Results" updated with message from "PROCESSED"
Immediate Update
Data sources update immediately when messages are published:
- Message published → Data source content updates
- Next agent reading the data source sees new content
Initial Content
Data sources can have initial content:
Data Source: Counter
Initial Content: "0"
Subscribe Topic: INCREMENT
Agent:
- Prompt: Current value: {Counter}
Increment by 1
Return new value
- Publish: INCREMENT
The initial content is used until the first message arrives.
Multiple Subscribers
Both agents and data sources can subscribe to the same topic:
Topic: ANALYSIS_DONE
Subscribers:
- Agent "Reporter" (processes the analysis)
- Data Source "LastAnalysis" (stores for reference)
When message published:
→ Agent processes it
→ Data source stores it
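Fan-out to mixed subscriber types can be sketched by registering both an agent-style callback and a data-source store under the same topic. Names are illustrative, not the real dispatch code.

```python
# One published message fans out to both an agent and a data source.
# Hypothetical names; illustrative sketch only.

subscribers = {}

def subscribe(topic, callback):
    subscribers.setdefault(topic.upper(), []).append(callback)

def publish(topic, message):
    for callback in subscribers.get(topic.upper(), []):
        callback(message)

last_analysis = {"content": ""}  # data source "LastAnalysis"
reports = []                     # stand-in for agent "Reporter"

subscribe("ANALYSIS_DONE", lambda m: last_analysis.update(content=m))
subscribe("ANALYSIS_DONE", lambda m: reports.append(f"report({m})"))

publish("ANALYSIS_DONE", "findings")
print(last_analysis["content"])  # findings
print(reports)                   # ['report(findings)']
```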
Configuration Example
Complete Setup
Data Sources:
1. PatientData
Content: "Patient: John Doe, Age: 45"
Subscribe: (empty - static)
2. AnalysisResults
Content: ""
Subscribe: ANALYSIS
3. FinalReport
Content: ""
Subscribe: REPORT
Agents:
1. Analyzer
Prompt: Analyze patient: {PatientData}
Subscribe: START
Publish: ANALYSIS
2. Reporter
Prompt: Create report from: {AnalysisResults}
Subscribe: ANALYSIS
Publish: REPORT
Flow:
- START → Analyzer reads PatientData (static)
- Analyzer publishes to ANALYSIS
- AnalysisResults updates with analysis
- Reporter reads updated AnalysisResults
- Reporter publishes to REPORT
- FinalReport updates with report
Best Practices
1. Clear Naming
Use descriptive names that make it clear the data source is dynamic and what it receives:
✅ Good:
- "CurrentState" subscribes to "STATE_UPDATE"
- "LatestResults" subscribes to "RESULTS"
- "WorkingBuffer" subscribes to "PROCESSED"
❌ Avoid:
- "Data1" subscribes to "TOPIC1"
- "X" subscribes to "Y"
2. Initial Content
Provide meaningful initial content when appropriate:
✅ Good:
- Counter: "0"
- State: '{"initialized": true}'
- Context: "Beginning of conversation"
🤷 Optional:
- Results: "" (will be populated by first message)
3. Update vs Accumulate
Update (default): Latest message overwrites
Data Source: CurrentValue
Each message replaces previous
Accumulate (requires agent logic):
Agent:
- Prompt: Previous: {Accumulated}
New: {input}
Combine and return both
- Publish: UPDATE_ACCUMULATED
4. Document Dependencies
In complex pipelines, document which agents update which data sources:
Pipeline Flow:
Agent A → TOPIC_X → DataSource1
Agent B → TOPIC_Y → DataSource2
Agent C reads {DataSource1} and {DataSource2}
5. Avoid Circular Dependencies
Don't create circular update loops:
❌ Dangerous:
Data Source: State
Subscribe: STATE
Agent:
- Prompt: {State}
- Subscribe: STATE
- Publish: STATE
→ Infinite loop risk!
Troubleshooting
Issue: Data source not updating
Causes:
- Subscribe topic doesn't match publish topic
- Topic name has typo
- Case mismatch (topics are case insensitive, so this shouldn't matter, but verify)
Solution:
- Check topic names match exactly
- Check execution log for "datasource_subscribed" message
- Check for "datasource_updated" message when message published
Issue: Data source updates too late
Cause: Agent reads data source before it updates
Solution: Ensure proper topic chaining:
Agent A → TOPIC → DataSource → Agent B reads it
Agent B must subscribe to TOPIC (or later topic)
Issue: Lost data from previous messages
Cause: Messages overwrite previous content
Solution: If you need to accumulate, implement append logic:
Agent:
- Prompt: Existing: {DataSource}
New: {input}
Append new to existing
Advanced Patterns
Pattern 1: State Machine
Data Source: State
Subscribe: STATE_CHANGE
Agents represent state transitions:
- StateA → processes → publishes STATE_CHANGE with "B"
- StateB → processes → publishes STATE_CHANGE with "C"
- Each agent checks {State} to see if it should act
Pattern 2: Shared Memory
Multiple agents read and write to shared data source:
Data Source: SharedMemory
Subscribe: MEMORY_UPDATE
Agents:
- All read {SharedMemory}
- All can publish to MEMORY_UPDATE
- Creates shared workspace
Pattern 3: Pipeline Checkpoints
Data Sources as checkpoints:
Data Source: Checkpoint1
Subscribe: STAGE1_DONE
Data Source: Checkpoint2
Subscribe: STAGE2_DONE
Later agents can reference any checkpoint:
- Prompt: From stage 1: {Checkpoint1}
From stage 2: {Checkpoint2}
Limitations
- No History: Only latest message is stored
- No Merge Logic: Latest message completely replaces content
- No Conditional Updates: Data source updates on every message to its topic
- No Filtering: Can't filter which messages to accept
Future Enhancements
Potential improvements:
- Append mode (accumulate messages)
- History (keep last N messages)
- Conditional updates (filter by content)
- Transform on update (apply function to incoming message)
- Multiple topic subscriptions per data source
- Update callbacks/hooks