use crate::infrastructure::db::DbPool; use crate::interfaces::http::api::RealtimeEvent; use reqwest::Client; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::broadcast; use tracing::{error, info, warn}; #[derive(Serialize, Deserialize, Clone, sqlx::FromRow)] pub struct WebhookRecord { pub webhook_id: String, pub merchant_id: String, pub url: String, pub secret: String, pub events: serde_json::Value, } pub struct WebhookService { pool: DbPool, client: Client, } impl WebhookService { pub fn new(pool: DbPool) -> Self { Self { pool, client: Client::builder() .timeout(std::time::Duration::from_secs(10)) .build() .unwrap(), } } pub async fn run(&self, mut rx: broadcast::Receiver) { info!("Webhook Service started."); loop { match rx.recv().await { Ok(event) => { let service = self.clone_self(); tokio::spawn(async move { if let Err(e) = service.process_event(event).await { error!("Error delivering webhook: {:?}", e); } }); } Err(broadcast::error::RecvError::Lagged(n)) => { warn!("Webhook Service lagged by {} events", n); } Err(broadcast::error::RecvError::Closed) => break, } } } fn clone_self(&self) -> Arc { Arc::new(Self { pool: self.pool.clone(), client: self.client.clone(), }) } async fn process_event( &self, event: RealtimeEvent, ) -> Result<(), Box> { let merchant_id = match &event { RealtimeEvent::OrderStatusChanged { merchant_id, .. } => merchant_id, RealtimeEvent::NewOrder { merchant_id, .. } => merchant_id, RealtimeEvent::RiskAlert { merchant_id, .. } => merchant_id, _ => return Ok(()), // Ignore other events for webhooks for now }; let event_type = match &event { RealtimeEvent::OrderStatusChanged { new_status, .. } => { format!("order.{}", new_status.to_lowercase().replace("_", ".")) } RealtimeEvent::NewOrder { .. } => "order.created".to_string(), RealtimeEvent::RiskAlert { .. } => "risk.alert".to_string(), _ => "system.event".to_string(), }; // Fetch active webhooks for this merchant let webhooks = sqlx::query_as::<_, WebhookRecord>( "SELECT webhook_id, merchant_id, url, secret, events FROM merchant_webhooks WHERE merchant_id = $1 AND is_active = TRUE" ) .bind(merchant_id) .fetch_all(&self.pool) .await?; for webhook in webhooks { // Check if webhook is subscribed to this event let subscribed_events: Vec = serde_json::from_value(webhook.events.clone()).unwrap_or_default(); if !subscribed_events.contains(&event_type) && !subscribed_events.contains(&"*".to_string()) { continue; } self.deliver_webhook(webhook, event.clone(), &event_type) .await; } Ok(()) } async fn deliver_webhook( &self, webhook: WebhookRecord, event: RealtimeEvent, event_type: &str, ) { let payload = serde_json::json!({ "event": event_type, "timestamp": chrono::Utc::now().to_rfc3339(), "data": event }); let payload_str = payload.to_string(); // HMAC-SHA256 signature for security use hmac::{Hmac, Mac}; use sha2::Sha256; type HmacSha256 = Hmac; let mut mac = HmacSha256::new_from_slice(webhook.secret.as_bytes()) .expect("HMAC can take key of any size"); mac.update(payload_str.as_bytes()); let signature = hex::encode(mac.finalize().into_bytes()); let max_attempts = 3; let mut attempt = 0; let mut delay = std::time::Duration::from_secs(1); while attempt < max_attempts { let attempt_number = attempt + 1; let (status_code, success, response_body) = match self .client .post(&webhook.url) .header("Content-Type", "application/json") .header("X-Rtix-Signature", &signature) .header("X-Rtix-Event", event_type) .body(payload_str.clone()) .send() .await { Ok(resp) => { let status = resp.status(); let sc = Some(status.as_u16() as i32); let succ = status.is_success(); let body = match resp.text().await { Ok(t) => Some(t), Err(e) => Some(e.to_string()), }; if succ { info!( "Webhook {} delivered successfully to {} on attempt {}", webhook.webhook_id, webhook.url, attempt_number ); } else { warn!( "Webhook {} delivery failed with status {} on attempt {}/{} to {}", webhook.webhook_id, status, attempt_number, max_attempts, webhook.url ); } (sc, succ, body) } Err(e) => { let body = Some(e.to_string()); error!( "Webhook {} delivery error on attempt {}/{} to {}: {}", webhook.webhook_id, attempt_number, max_attempts, webhook.url, e ); (None, false, body) } }; // Insert a log record let log_id = uuid::Uuid::new_v4().to_string(); let log_query = "INSERT INTO webhook_delivery_logs (log_id, webhook_id, merchant_id, url, event_type, payload, status_code, success, response_body, attempt_number) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"; if let Err(err) = sqlx::query(log_query) .bind(&log_id) .bind(&webhook.webhook_id) .bind(&webhook.merchant_id) .bind(&webhook.url) .bind(event_type) .bind(&payload) .bind(status_code) .bind(success) .bind(&response_body) .bind(attempt_number) .execute(&self.pool) .await { error!("Failed to write webhook delivery log: {}", err); } if success { return; } attempt += 1; if attempt < max_attempts { tokio::time::sleep(delay).await; delay *= 2; } } } }