Spaces:
Running
Running
| use crate::infrastructure::db::DbPool; | |
| use crate::interfaces::http::api::RealtimeEvent; | |
| use reqwest::Client; | |
| use serde::{Deserialize, Serialize}; | |
| use std::sync::Arc; | |
| use tokio::sync::broadcast; | |
| use tracing::{error, info, warn}; | |
| pub struct WebhookRecord { | |
| pub webhook_id: String, | |
| pub merchant_id: String, | |
| pub url: String, | |
| pub secret: String, | |
| pub events: serde_json::Value, | |
| } | |
| pub struct WebhookService { | |
| pool: DbPool, | |
| client: Client, | |
| } | |
| impl WebhookService { | |
| pub fn new(pool: DbPool) -> Self { | |
| Self { | |
| pool, | |
| client: Client::builder() | |
| .timeout(std::time::Duration::from_secs(10)) | |
| .build() | |
| .unwrap(), | |
| } | |
| } | |
| pub async fn run(&self, mut rx: broadcast::Receiver<RealtimeEvent>) { | |
| info!("Webhook Service started."); | |
| loop { | |
| match rx.recv().await { | |
| Ok(event) => { | |
| let service = self.clone_self(); | |
| tokio::spawn(async move { | |
| if let Err(e) = service.process_event(event).await { | |
| error!("Error delivering webhook: {:?}", e); | |
| } | |
| }); | |
| } | |
| Err(broadcast::error::RecvError::Lagged(n)) => { | |
| warn!("Webhook Service lagged by {} events", n); | |
| } | |
| Err(broadcast::error::RecvError::Closed) => break, | |
| } | |
| } | |
| } | |
| fn clone_self(&self) -> Arc<Self> { | |
| Arc::new(Self { | |
| pool: self.pool.clone(), | |
| client: self.client.clone(), | |
| }) | |
| } | |
| async fn process_event( | |
| &self, | |
| event: RealtimeEvent, | |
| ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { | |
| let merchant_id = match &event { | |
| RealtimeEvent::OrderStatusChanged { merchant_id, .. } => merchant_id, | |
| RealtimeEvent::NewOrder { merchant_id, .. } => merchant_id, | |
| RealtimeEvent::RiskAlert { merchant_id, .. } => merchant_id, | |
| _ => return Ok(()), // Ignore other events for webhooks for now | |
| }; | |
| let event_type = match &event { | |
| RealtimeEvent::OrderStatusChanged { new_status, .. } => { | |
| format!("order.{}", new_status.to_lowercase().replace("_", ".")) | |
| } | |
| RealtimeEvent::NewOrder { .. } => "order.created".to_string(), | |
| RealtimeEvent::RiskAlert { .. } => "risk.alert".to_string(), | |
| _ => "system.event".to_string(), | |
| }; | |
| // Fetch active webhooks for this merchant | |
| let webhooks = sqlx::query_as::<_, WebhookRecord>( | |
| "SELECT webhook_id, merchant_id, url, secret, events FROM merchant_webhooks WHERE merchant_id = $1 AND is_active = TRUE" | |
| ) | |
| .bind(merchant_id) | |
| .fetch_all(&self.pool) | |
| .await?; | |
| for webhook in webhooks { | |
| // Check if webhook is subscribed to this event | |
| let subscribed_events: Vec<String> = | |
| serde_json::from_value(webhook.events.clone()).unwrap_or_default(); | |
| if !subscribed_events.contains(&event_type) | |
| && !subscribed_events.contains(&"*".to_string()) | |
| { | |
| continue; | |
| } | |
| self.deliver_webhook(webhook, event.clone(), &event_type) | |
| .await; | |
| } | |
| Ok(()) | |
| } | |
| async fn deliver_webhook( | |
| &self, | |
| webhook: WebhookRecord, | |
| event: RealtimeEvent, | |
| event_type: &str, | |
| ) { | |
| let payload = serde_json::json!({ | |
| "event": event_type, | |
| "timestamp": chrono::Utc::now().to_rfc3339(), | |
| "data": event | |
| }); | |
| let payload_str = payload.to_string(); | |
| // HMAC-SHA256 signature for security | |
| use hmac::{Hmac, Mac}; | |
| use sha2::Sha256; | |
| type HmacSha256 = Hmac<Sha256>; | |
| let mut mac = HmacSha256::new_from_slice(webhook.secret.as_bytes()) | |
| .expect("HMAC can take key of any size"); | |
| mac.update(payload_str.as_bytes()); | |
| let signature = hex::encode(mac.finalize().into_bytes()); | |
| let max_attempts = 3; | |
| let mut attempt = 0; | |
| let mut delay = std::time::Duration::from_secs(1); | |
| while attempt < max_attempts { | |
| let attempt_number = attempt + 1; | |
| let (status_code, success, response_body) = match self | |
| .client | |
| .post(&webhook.url) | |
| .header("Content-Type", "application/json") | |
| .header("X-Rtix-Signature", &signature) | |
| .header("X-Rtix-Event", event_type) | |
| .body(payload_str.clone()) | |
| .send() | |
| .await | |
| { | |
| Ok(resp) => { | |
| let status = resp.status(); | |
| let sc = Some(status.as_u16() as i32); | |
| let succ = status.is_success(); | |
| let body = match resp.text().await { | |
| Ok(t) => Some(t), | |
| Err(e) => Some(e.to_string()), | |
| }; | |
| if succ { | |
| info!( | |
| "Webhook {} delivered successfully to {} on attempt {}", | |
| webhook.webhook_id, webhook.url, attempt_number | |
| ); | |
| } else { | |
| warn!( | |
| "Webhook {} delivery failed with status {} on attempt {}/{} to {}", | |
| webhook.webhook_id, status, attempt_number, max_attempts, webhook.url | |
| ); | |
| } | |
| (sc, succ, body) | |
| } | |
| Err(e) => { | |
| let body = Some(e.to_string()); | |
| error!( | |
| "Webhook {} delivery error on attempt {}/{} to {}: {}", | |
| webhook.webhook_id, attempt_number, max_attempts, webhook.url, e | |
| ); | |
| (None, false, body) | |
| } | |
| }; | |
| // Insert a log record | |
| let log_id = uuid::Uuid::new_v4().to_string(); | |
| let log_query = "INSERT INTO webhook_delivery_logs (log_id, webhook_id, merchant_id, url, event_type, payload, status_code, success, response_body, attempt_number) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"; | |
| if let Err(err) = sqlx::query(log_query) | |
| .bind(&log_id) | |
| .bind(&webhook.webhook_id) | |
| .bind(&webhook.merchant_id) | |
| .bind(&webhook.url) | |
| .bind(event_type) | |
| .bind(&payload) | |
| .bind(status_code) | |
| .bind(success) | |
| .bind(&response_body) | |
| .bind(attempt_number) | |
| .execute(&self.pool) | |
| .await | |
| { | |
| error!("Failed to write webhook delivery log: {}", err); | |
| } | |
| if success { | |
| return; | |
| } | |
| attempt += 1; | |
| if attempt < max_attempts { | |
| tokio::time::sleep(delay).await; | |
| delay *= 2; | |
| } | |
| } | |
| } | |
| } | |