Event-Driven Benchmark Refresh
Automatically re-materialize evaluation datasets when an upstream Hub benchmark changes, using a Dagster sensor with cursor-based state.
What this example shows
- @sensor with asset_selection targeting multiple downstream assets
- Hub revision polling via load_dataset_builder (lightweight, no dataset download)
- context.cursor for durable sensor state across restarts and replicas
- RunRequest with run_key for idempotent trigger deduplication
- DefaultSensorStatus.STOPPED as a safe default; flip to RUNNING in production
- Downstream assets (humaneval_formatted, benchmark_refresh_report) re-running automatically when the ingestion asset is refreshed
Dataset
openai/openai_humaneval: 164 Python programming tasks used to benchmark
LLM code generation capability. The dataset is compact and changes rarely,
so a revision change is a meaningful signal when it does occur.
| Split | Rows | Description |
|---|---|---|
| test | 164 | Python function completion tasks |
Key API
from dagster import (
    AssetKey, DefaultSensorStatus, RunRequest, SensorEvaluationContext, sensor,
)
# _get_current_hub_revision is not shown here.
@sensor(
    asset_selection=[AssetKey("humaneval_benchmark"), ...],
    minimum_interval_seconds=300,
    default_status=DefaultSensorStatus.STOPPED,
)
def humaneval_revision_sensor(context: SensorEvaluationContext):
    last_seen = context.cursor or ""
    current_revision = _get_current_hub_revision("openai/openai_humaneval")
    if current_revision == last_seen:
        return  # no change: skip this tick
    # Persist the new revision, then request one run keyed on it.
    context.update_cursor(current_revision)
    yield RunRequest(run_key=f"humaneval_refresh_{current_revision}")
Asset graph
[sensor tick]
│ (revision changed)
▼
humaneval_benchmark ← re-materialized on trigger
│
▼
humaneval_formatted ← auto-runs downstream
│
▼
benchmark_refresh_report ← audit record with run_id + fingerprint
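The chain above is a linear dependency order. As a minimal sketch using only the standard library (this is not Dagster's own resolver), graphlib can reproduce the order in which the three assets would run. The dependency dictionary is a hand-written assumption based on the diagram.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each asset -> the upstream assets it needs.
ASSET_DEPS = {
    "humaneval_benchmark": set(),
    "humaneval_formatted": {"humaneval_benchmark"},
    "benchmark_refresh_report": {"humaneval_formatted"},
}


def materialization_order(deps):
    """Return asset names so that every asset follows its upstream assets."""
    return list(TopologicalSorter(deps).static_order())


order = materialization_order(ASSET_DEPS)
```

For this three-node chain only one valid order exists, so the ingestion asset always comes first and the report last.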
Sensor lifecycle
| Status | Behaviour |
|---|---|
| STOPPED (default) | Sensor defined but not ticking; safe for dev |
| RUNNING | Polls Hub every minimum_interval_seconds |
| Revision unchanged | Tick completes, no run emitted |
| Revision changed | RunRequest emitted, full pipeline re-materializes |
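The last two rows of the table describe a decision that depends only on the stored cursor and the revision just observed. A minimal sketch of that decision as a pure function, with no Dagster dependency, is shown here. TickResult and evaluate_tick are hypothetical names introduced for illustration; they are not part of this example's code or of Dagster.

```python
from typing import NamedTuple, Optional


class TickResult(NamedTuple):
    cursor: str             # cursor value to persist after the tick
    run_key: Optional[str]  # None means the tick emits no run


def evaluate_tick(cursor: Optional[str], current_revision: str) -> TickResult:
    """Decide what a single sensor tick should do."""
    last_seen = cursor or ""  # an unset cursor behaves like an empty string
    if current_revision == last_seen:
        # Revision unchanged: keep the cursor, emit nothing.
        return TickResult(cursor=last_seen, run_key=None)
    # Revision changed: advance the cursor and key the run on the revision.
    return TickResult(
        cursor=current_revision,
        run_key=f"humaneval_refresh_{current_revision}",
    )
```

Keeping the decision free of I/O makes it easy to unit-test separately from the Hub call and the sensor decorator.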
Cursor state
Sensor state is stored in context.cursor (Dagster-managed, persisted in
the instance storage). This is safe for multi-replica deployments.
The file-based fallback _STATE_FILE in assets.py is included only
as a local-dev reference — use context.cursor in production.
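For local development, a file-based store could look roughly like the sketch below. It is an illustration under assumptions, not the contents of assets.py: read_revision and write_revision are hypothetical helpers, and the caller chooses the state file path. The write goes through a temporary file plus os.replace so that a crash mid-write does not leave a truncated state file.

```python
import os
import tempfile
from pathlib import Path


def read_revision(state_file: Path) -> str:
    """Return the stored revision, or "" if no state has been written yet."""
    try:
        return state_file.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        return ""


def write_revision(state_file: Path, revision: str) -> None:
    """Atomically replace the state file with the new revision."""
    state_file.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file in the same directory so os.replace stays atomic.
    fd, tmp_name = tempfile.mkstemp(
        dir=state_file.parent, prefix=state_file.name + "."
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            tmp.write(revision)
        os.replace(tmp_name, state_file)
    except BaseException:
        os.unlink(tmp_name)  # clean up the partial temp file
        raise
```

A local file is visible to one machine only, which is why it does not suit multi-replica deployments.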
Run deduplication
run_key=f"humaneval_refresh_{current_revision}" ensures that if Dagster
attempts to emit the same revision trigger twice (e.g. after a restart),
only one run is launched. Dagster deduplicates by run_key automatically.
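The effect of run_key can be illustrated with a toy model. The RunLauncher class below is hypothetical and only mimics the observable behaviour described above (a key that has already been seen launches nothing); it says nothing about how Dagster implements the check internally.

```python
from typing import List, Set


class RunLauncher:
    """Toy stand-in for a scheduler that deduplicates by run key."""

    def __init__(self) -> None:
        self._seen_keys: Set[str] = set()
        self.launched: List[str] = []

    def submit(self, run_key: str) -> bool:
        """Launch a run unless this key was already used; report whether it ran."""
        if run_key in self._seen_keys:
            return False  # duplicate trigger, e.g. re-emitted after a restart
        self._seen_keys.add(run_key)
        self.launched.append(run_key)
        return True


launcher = RunLauncher()
first = launcher.submit("humaneval_refresh_abc123")
second = launcher.submit("humaneval_refresh_abc123")  # same revision again
```

Because the key embeds the revision, a new revision always produces a new key and therefore a new run.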
Hub revision detection
load_dataset_builder() fetches DatasetInfo without downloading data.
The revision proxy used here (hash(description + version)) is lightweight
but approximate. For production, use huggingface_hub.list_repo_commits()
with repo_type="dataset" to get the real commit SHA:
from huggingface_hub import list_repo_commits
commits = list_repo_commits("openai/openai_humaneval", repo_type="dataset")
latest_sha = commits[0].commit_id  # commits are returned newest first
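If the metadata proxy is kept, one caveat applies whenever it is computed with Python's built-in hash(): string hashes are salted per process unless PYTHONHASHSEED is fixed, so a value stored in the cursor would not match after a restart. A hedged alternative is a content digest, sketched below. revision_fingerprint is a hypothetical helper, and the choice of SHA-256 truncated to 16 hex characters is an assumption made for illustration.

```python
import hashlib


def revision_fingerprint(description: str, version: str) -> str:
    """Return a digest of dataset metadata that is stable across processes."""
    # A NUL separator keeps ("ab", "c") distinct from ("a", "bc").
    payload = f"{description}\x00{version}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]
```

The digest is still only a proxy: a commit that changes data without touching the description or version would go unnoticed, which is why the commit SHA remains the better signal.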
How to run
pip install dagster dagster-hf-datasets
cd dagster_hf_datasets_examples
dagster dev -m event_driven_benchmark_refresh.definitions
To test the sensor manually: in the Dagster UI go to Automation → Sensors,
find humaneval_revision_sensor, and click Evaluate to run one tick.
To enable continuous polling, toggle the sensor to Running.