use axum::{ body::Body, extract::{Path, State}, http::{header, StatusCode}, response::Response, }; use std::sync::Arc; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; use super::routes::AppState; pub async fn events( State(state): State>, Path(id): Path, ) -> Result, StatusCode> { let rx = state.solver.subscribe(&id).ok_or(StatusCode::NOT_FOUND)?; let bootstrap_json = state .solver .bootstrap_event(&id) .map_err(|_| StatusCode::NOT_FOUND)?; let bootstrap_event_sequence = event_sequence_from_json(&bootstrap_json); let bootstrap = tokio_stream::iter(std::iter::once(Ok::<_, std::convert::Infallible>( format!("data: {}\n\n", bootstrap_json).into_bytes(), ))); let live = BroadcastStream::new(rx).filter_map(move |msg| match msg { Ok(json) => { if event_is_not_newer(&json, bootstrap_event_sequence) { return None; } Some(Ok::<_, std::convert::Infallible>( format!("data: {}\n\n", json).into_bytes(), )) } Err(_) => None, // Lagged - skip missed messages }); let stream = bootstrap.chain(live); Ok(Response::builder() .header(header::CONTENT_TYPE, "text/event-stream") .header(header::CACHE_CONTROL, "no-cache") .header("X-Accel-Buffering", "no") .body(Body::from_stream(stream)) .unwrap()) } fn event_sequence_from_json(json: &str) -> Option { serde_json::from_str::(json) .ok() .and_then(|value| { value .get("eventSequence") .and_then(serde_json::Value::as_u64) }) } fn event_is_not_newer(json: &str, bootstrap_event_sequence: Option) -> bool { let Some(bootstrap_event_sequence) = bootstrap_event_sequence else { return false; }; event_sequence_from_json(json) .is_some_and(|event_sequence| event_sequence <= bootstrap_event_sequence) }