
Subscribable Data Sources Guide

Overview

Data sources can now subscribe to topics on the message bus and automatically update their content when messages are published. This creates dynamic, reactive data sources that change during execution.

How It Works

Traditional Data Sources (Static)

Before: Data sources were static containers:

Data Source: PatientInfo
Content: "Initial patient data"
→ Content never changes during execution

Subscribable Data Sources (Dynamic)

Now: Data sources can subscribe to topics:

Data Source: PatientInfo
Content: "Initial patient data"
Subscribe Topic: UPDATED_INFO

When message published to UPDATED_INFO:
→ Content automatically updates to the new message
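The mechanism can be pictured with a minimal sketch (hypothetical Python; class and method names are illustrative, not the actual implementation):

```python
class MessageBus:
    """Toy pub/sub bus: topics map to lists of subscriber callbacks."""
    def __init__(self):
        self.subscribers = {}  # topic -> list of callbacks

    def subscribe(self, topic, callback):
        # Topics are normalized to lowercase, mirroring the documented
        # case-insensitive matching.
        self.subscribers.setdefault(topic.lower(), []).append(callback)

    def publish(self, topic, message):
        for callback in self.subscribers.get(topic.lower(), []):
            callback(message)


class DataSource:
    """A data source whose content is replaced by each incoming message."""
    def __init__(self, label, content="", subscribe_topic=None, bus=None):
        self.label = label
        self.content = content
        if subscribe_topic and bus:
            bus.subscribe(subscribe_topic, self._on_message)

    def _on_message(self, message):
        self.content = message  # latest message overwrites previous content


bus = MessageBus()
patient_info = DataSource("PatientInfo", "Initial patient data",
                          subscribe_topic="UPDATED_INFO", bus=bus)
bus.publish("UPDATED_INFO", "Patient data after triage")
print(patient_info.content)  # Patient data after triage
```

The key point the sketch shows: the data source is itself just another subscriber on the bus, so it updates the moment a message is published, with no polling.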

Configuration

Adding Subscribe Topic

In the UI:

  1. Create or edit a data source
  2. Set the label (e.g., "Results")
  3. Set initial content (optional)
  4. Set Subscribe Topic field (e.g., "PROCESSED_DATA")
  5. Data source will update when messages arrive on that topic

Subscribe Topic Field

Location: In each data source card, below the content textarea

Label: "Subscribe Topic (optional, case insensitive)"

Placeholder: "e.g., PROCESSED_DATA - updates content from bus"

Behavior:

  • Empty: Data source is static (traditional behavior)
  • With topic: Data source subscribes to that topic and updates on messages

Use Cases

Use Case 1: Iterative Refinement

Scenario: Agent refines data over multiple iterations

Setup:

Data Source: WorkingData
Initial Content: "Raw data to process"
Subscribe Topic: REFINED

Agent 1:
- Prompt: Process this: {WorkingData}
- Subscribe: START
- Publish: REFINED

Agent 2:
- Prompt: Further refine: {WorkingData}
- Subscribe: REFINED
- Publish: REFINED

Flow:

  1. Agent 1 reads initial content from WorkingData
  2. Agent 1 processes and publishes to REFINED
  3. WorkingData updates with refined content
  4. Agent 2 reads updated content from WorkingData
  5. Agent 2 refines further and publishes to REFINED
  6. WorkingData updates again

Result: WorkingData evolves through the pipeline

Use Case 2: Accumulating Results

Scenario: Collect results from multiple agents

Setup:

Data Source: CollectedResults
Initial Content: ""
Subscribe Topic: RESULT

Agent 1:
- Prompt: Analyze aspect A: {Input}
- Publish: RESULT

Agent 2:
- Prompt: Analyze aspect B: {Input}
- Publish: RESULT

Agent 3:
- Prompt: Analyze aspect C: {Input}
- Publish: RESULT

Flow:

  1. Agent 1 publishes result to RESULT
  2. CollectedResults updates to Agent 1's result
  3. Agent 2 publishes result to RESULT
  4. CollectedResults updates to Agent 2's result (overwrites)
  5. Agent 3 publishes result to RESULT
  6. CollectedResults updates to Agent 3's result

Note: The last message overwrites the previous content. To accumulate results, agents must read the current content and append to it before publishing.

Use Case 3: State Management

Scenario: Maintain evolving state through pipeline

Setup:

Data Source: SystemState
Initial Content: '{"status": "initial", "step": 0}'
Subscribe Topic: STATE_UPDATE

Agent 1:
- Prompt: Update state: {SystemState}
         Process: {Input}
         Return updated JSON state
- Subscribe: START
- Publish: STATE_UPDATE

Agent 2:
- Prompt: Current state: {SystemState}
         Next action based on state
- Subscribe: STATE_UPDATE
- Publish: STATE_UPDATE

Flow:

  1. Agent 1 reads initial state
  2. Agent 1 publishes updated state
  3. SystemState updates
  4. Agent 2 reads updated state
  5. Agent 2 publishes further updates
  6. SystemState continues evolving
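The JSON state handling the agents are asked to perform can be pictured like this (illustrative only; in the real pipeline the LLM produces the updated JSON from the prompt, and `update_state` is a hypothetical stand-in):

```python
import json

def update_state(state_json: str, step_name: str) -> str:
    """Parse the current state, advance it, and return the JSON string
    the agent would publish to STATE_UPDATE."""
    state = json.loads(state_json)
    state["step"] = state.get("step", 0) + 1
    state["status"] = step_name
    return json.dumps(state)

s = '{"status": "initial", "step": 0}'
s = update_state(s, "processed")
print(s)  # {"status": "processed", "step": 1}
```

Because SystemState subscribes to STATE_UPDATE, publishing the returned string is what makes the state "evolve" between agents.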

Use Case 4: Feedback Loop

Scenario: Agent uses its own previous output

Setup:

Data Source: Context
Initial Content: "Start here"
Subscribe Topic: OUTPUT

Agent: Processor
- Prompt: Previous context: {Context}
         New input: {input}
         Generate next response
- Subscribe: START (and OUTPUT for iterations)
- Publish: OUTPUT

Flow:

  1. Agent reads initial context
  2. Agent processes and publishes
  3. Context updates with agent's output
  4. On next trigger, agent reads its own previous output
  5. This creates a feedback loop (see "Avoid Circular Dependencies" under Best Practices for the infinite-loop risk)

Use Case 5: Multi-Source Aggregation

Scenario: Different agents update different data sources

Setup:

Data Source: SourceA
Subscribe Topic: RESULTS_A

Data Source: SourceB
Subscribe Topic: RESULTS_B

Agent 1:
- Publish: RESULTS_A

Agent 2:
- Publish: RESULTS_B

Agent 3:
- Prompt: Combine {SourceA} and {SourceB}
- Subscribe: RESULTS_B (fires when RESULTS_B arrives; this assumes SourceA was already updated by then — subscribing to one topic does not wait for both)

Flow:

  1. Agent 1 updates SourceA
  2. Agent 2 updates SourceB
  3. Agent 3 combines both updated sources

Features

Case-Insensitive Topics

Subscribe topics are matched case-insensitively:

Data Source subscribes to: "RESULTS"
Agent publishes to: "results"
→ Match! Data source updates
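If matching is implemented by normalizing topic names (an assumption about the internals), it would amount to:

```python
def topics_match(subscribed: str, published: str) -> bool:
    """Case-insensitive topic comparison, mirroring the documented behavior."""
    return subscribed.lower() == published.lower()

print(topics_match("RESULTS", "results"))  # True
```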

Update Notification

When a data source updates, the system logs:

[10:30:15] 📡 Data source "Results" updated with message from "PROCESSED"

Immediate Update

Data sources update immediately when messages are published:

  • Message published → Data source content updates
  • Next agent reading the data source sees new content

Initial Content

Data sources can have initial content:

Data Source: Counter
Initial Content: "0"
Subscribe Topic: INCREMENT

Agent:
- Prompt: Current value: {Counter}
         Increment by 1
         Return new value
- Publish: INCREMENT

The initial content is used until the first message arrives.

Multiple Subscribers

Both agents and data sources can subscribe to the same topic:

Topic: ANALYSIS_DONE

Subscribers:
- Agent "Reporter" (processes the analysis)
- Data Source "LastAnalysis" (stores for reference)

When message published:
→ Agent processes it
→ Data source stores it

Configuration Example

Complete Setup

Data Sources:

1. PatientData
   Content: "Patient: John Doe, Age: 45"
   Subscribe: (empty - static)

2. AnalysisResults
   Content: ""
   Subscribe: ANALYSIS

3. FinalReport
   Content: ""
   Subscribe: REPORT

Agents:

1. Analyzer
   Prompt: Analyze patient: {PatientData}
   Subscribe: START
   Publish: ANALYSIS

2. Reporter
   Prompt: Create report from: {AnalysisResults}
   Subscribe: ANALYSIS
   Publish: REPORT

Flow:

  1. START → Analyzer reads PatientData (static)
  2. Analyzer publishes to ANALYSIS
  3. AnalysisResults updates with analysis
  4. Reporter reads updated AnalysisResults
  5. Reporter publishes to REPORT
  6. FinalReport updates with report

Best Practices

1. Clear Naming

Use descriptive names that make clear the data source is dynamic and what updates it:

✅ Good:
- "CurrentState" subscribes to "STATE_UPDATE"
- "LatestResults" subscribes to "RESULTS"
- "WorkingBuffer" subscribes to "PROCESSED"

❌ Avoid:
- "Data1" subscribes to "TOPIC1"
- "X" subscribes to "Y"

2. Initial Content

Provide meaningful initial content when appropriate:

✅ Good:
- Counter: "0"
- State: '{"initialized": true}'
- Context: "Beginning of conversation"

🤷 Optional:
- Results: "" (will be populated by first message)

3. Update vs Accumulate

Update (default): Latest message overwrites

Data Source: CurrentValue
Each message replaces previous

Accumulate (requires agent logic):

Agent:
- Prompt: Previous: {Accumulated}
         New: {input}
         Combine and return both
- Publish: UPDATE_ACCUMULATED
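Since the data source only keeps the latest message, accumulation has to happen in the message the agent publishes. The append step the prompt above describes looks like this (hypothetical helper, not part of the system):

```python
def build_accumulated_message(previous: str, new_item: str, sep: str = "\n") -> str:
    """Combine the data source's prior content with a new result, so that
    publishing the combined string preserves history."""
    if not previous:
        return new_item
    return previous + sep + new_item

# What an agent following the "Combine and return both" prompt effectively does:
acc = ""
for result in ["Aspect A: ok", "Aspect B: warning"]:
    acc = build_accumulated_message(acc, result)
print(acc)
# Aspect A: ok
# Aspect B: warning
```

Publishing `acc` to UPDATE_ACCUMULATED then replaces the data source's content with the full accumulated string rather than just the latest item.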

4. Document Dependencies

In complex pipelines, document which agents update which data sources:

Pipeline Flow:
Agent A → TOPIC_X → DataSource1
Agent B → TOPIC_Y → DataSource2
Agent C reads {DataSource1} and {DataSource2}

5. Avoid Circular Dependencies

Don't create circular update loops:

❌ Dangerous:
Data Source: State
Subscribe: STATE

Agent:
- Prompt: {State}
- Subscribe: STATE
- Publish: STATE
→ Infinite loop risk!
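One way to catch such loops before running is to check the publish/subscribe topic graph for cycles. This is an illustrative check, not a built-in feature:

```python
def find_cycle(edges):
    """Detect a cycle in the topic graph, given (subscribe_topic,
    publish_topic) pairs for each agent, via DFS with a recursion stack."""
    graph = {}
    for src, dst in edges:
        graph.setdefault(src.lower(), set()).add(dst.lower())

    visited, in_stack = set(), set()

    def dfs(node):
        visited.add(node)
        in_stack.add(node)
        for nxt in graph.get(node, ()):
            if nxt in in_stack:          # back edge -> cycle
                return True
            if nxt not in visited and dfs(nxt):
                return True
        in_stack.discard(node)
        return False

    return any(dfs(n) for n in graph if n not in visited)

# The dangerous setup above: an agent subscribed to STATE also publishes STATE.
print(find_cycle([("STATE", "STATE")]))                      # True
print(find_cycle([("START", "ANALYSIS"), ("ANALYSIS", "REPORT")]))  # False
```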

Troubleshooting

Issue: Data source not updating

Causes:

  1. Subscribe topic doesn't match publish topic
  2. Topic name has typo
  3. Case sensitivity confusion (shouldn't happen, but check)

Solution:

  • Check topic names match exactly
  • Check execution log for "datasource_subscribed" message
  • Check for "datasource_updated" message when message published

Issue: Data source updates too late

Cause: Agent reads data source before it updates

Solution: Ensure proper topic chaining:

Agent A → TOPIC → DataSource → Agent B reads it
Agent B must subscribe to TOPIC (or a later topic in the chain)

Issue: Lost data from previous messages

Cause: Messages overwrite previous content

Solution: If you need to accumulate, implement append logic:

Agent:
- Prompt: Existing: {DataSource}
         New: {input}
         Append new to existing

Advanced Patterns

Pattern 1: State Machine

Data Source: State
Subscribe: STATE_CHANGE

Agents represent state transitions:
- StateA → processes → publishes STATE_CHANGE with "B"
- StateB → processes → publishes STATE_CHANGE with "C"
- Each agent checks {State} to see if it should act

Pattern 2: Shared Memory

Multiple agents read and write to shared data source:

Data Source: SharedMemory
Subscribe: MEMORY_UPDATE

Agents:
- All read {SharedMemory}
- All can publish to MEMORY_UPDATE
- Creates shared workspace

Pattern 3: Pipeline Checkpoints

Data Sources as checkpoints:

Data Source: Checkpoint1
Subscribe: STAGE1_DONE

Data Source: Checkpoint2
Subscribe: STAGE2_DONE

Later agents can reference any checkpoint:
- Prompt: From stage 1: {Checkpoint1}
         From stage 2: {Checkpoint2}

Limitations

  1. No History: Only latest message is stored
  2. No Merge Logic: Latest message completely replaces content
  3. No Conditional Updates: Data source updates on every message to its topic
  4. No Filtering: Can't filter which messages to accept

Future Enhancements

Potential improvements:

  • Append mode (accumulate messages)
  • History (keep last N messages)
  • Conditional updates (filter by content)
  • Transform on update (apply function to incoming message)
  • Multiple topic subscriptions per data source
  • Update callbacks/hooks