RTIX / src /application /services /webhook.rs
github-actions
deploy: clean backend production release
d8ffec9
use crate::infrastructure::db::DbPool;
use crate::interfaces::http::api::RealtimeEvent;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{error, info, warn};
#[derive(Serialize, Deserialize, Clone, sqlx::FromRow)]
pub struct WebhookRecord {
pub webhook_id: String,
pub merchant_id: String,
pub url: String,
pub secret: String,
pub events: serde_json::Value,
}
pub struct WebhookService {
pool: DbPool,
client: Client,
}
impl WebhookService {
pub fn new(pool: DbPool) -> Self {
Self {
pool,
client: Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap(),
}
}
pub async fn run(&self, mut rx: broadcast::Receiver<RealtimeEvent>) {
info!("Webhook Service started.");
loop {
match rx.recv().await {
Ok(event) => {
let service = self.clone_self();
tokio::spawn(async move {
if let Err(e) = service.process_event(event).await {
error!("Error delivering webhook: {:?}", e);
}
});
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("Webhook Service lagged by {} events", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
fn clone_self(&self) -> Arc<Self> {
Arc::new(Self {
pool: self.pool.clone(),
client: self.client.clone(),
})
}
async fn process_event(
&self,
event: RealtimeEvent,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let merchant_id = match &event {
RealtimeEvent::OrderStatusChanged { merchant_id, .. } => merchant_id,
RealtimeEvent::NewOrder { merchant_id, .. } => merchant_id,
RealtimeEvent::RiskAlert { merchant_id, .. } => merchant_id,
_ => return Ok(()), // Ignore other events for webhooks for now
};
let event_type = match &event {
RealtimeEvent::OrderStatusChanged { new_status, .. } => {
format!("order.{}", new_status.to_lowercase().replace("_", "."))
}
RealtimeEvent::NewOrder { .. } => "order.created".to_string(),
RealtimeEvent::RiskAlert { .. } => "risk.alert".to_string(),
_ => "system.event".to_string(),
};
// Fetch active webhooks for this merchant
let webhooks = sqlx::query_as::<_, WebhookRecord>(
"SELECT webhook_id, merchant_id, url, secret, events FROM merchant_webhooks WHERE merchant_id = $1 AND is_active = TRUE"
)
.bind(merchant_id)
.fetch_all(&self.pool)
.await?;
for webhook in webhooks {
// Check if webhook is subscribed to this event
let subscribed_events: Vec<String> =
serde_json::from_value(webhook.events.clone()).unwrap_or_default();
if !subscribed_events.contains(&event_type)
&& !subscribed_events.contains(&"*".to_string())
{
continue;
}
self.deliver_webhook(webhook, event.clone(), &event_type)
.await;
}
Ok(())
}
async fn deliver_webhook(
&self,
webhook: WebhookRecord,
event: RealtimeEvent,
event_type: &str,
) {
let payload = serde_json::json!({
"event": event_type,
"timestamp": chrono::Utc::now().to_rfc3339(),
"data": event
});
let payload_str = payload.to_string();
// HMAC-SHA256 signature for security
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let mut mac = HmacSha256::new_from_slice(webhook.secret.as_bytes())
.expect("HMAC can take key of any size");
mac.update(payload_str.as_bytes());
let signature = hex::encode(mac.finalize().into_bytes());
let max_attempts = 3;
let mut attempt = 0;
let mut delay = std::time::Duration::from_secs(1);
while attempt < max_attempts {
let attempt_number = attempt + 1;
let (status_code, success, response_body) = match self
.client
.post(&webhook.url)
.header("Content-Type", "application/json")
.header("X-Rtix-Signature", &signature)
.header("X-Rtix-Event", event_type)
.body(payload_str.clone())
.send()
.await
{
Ok(resp) => {
let status = resp.status();
let sc = Some(status.as_u16() as i32);
let succ = status.is_success();
let body = match resp.text().await {
Ok(t) => Some(t),
Err(e) => Some(e.to_string()),
};
if succ {
info!(
"Webhook {} delivered successfully to {} on attempt {}",
webhook.webhook_id, webhook.url, attempt_number
);
} else {
warn!(
"Webhook {} delivery failed with status {} on attempt {}/{} to {}",
webhook.webhook_id, status, attempt_number, max_attempts, webhook.url
);
}
(sc, succ, body)
}
Err(e) => {
let body = Some(e.to_string());
error!(
"Webhook {} delivery error on attempt {}/{} to {}: {}",
webhook.webhook_id, attempt_number, max_attempts, webhook.url, e
);
(None, false, body)
}
};
// Insert a log record
let log_id = uuid::Uuid::new_v4().to_string();
let log_query = "INSERT INTO webhook_delivery_logs (log_id, webhook_id, merchant_id, url, event_type, payload, status_code, success, response_body, attempt_number) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)";
if let Err(err) = sqlx::query(log_query)
.bind(&log_id)
.bind(&webhook.webhook_id)
.bind(&webhook.merchant_id)
.bind(&webhook.url)
.bind(event_type)
.bind(&payload)
.bind(status_code)
.bind(success)
.bind(&response_body)
.bind(attempt_number)
.execute(&self.pool)
.await
{
error!("Failed to write webhook delivery log: {}", err);
}
if success {
return;
}
attempt += 1;
if attempt < max_attempts {
tokio::time::sleep(delay).await;
delay *= 2;
}
}
}
}