Spaces:
Running
Running
| use crate::application::services::MerchantService; | |
| use crate::domain::constants::*; | |
| use crate::infrastructure::db::DbPool; | |
| use futures_util::stream::{self, StreamExt}; | |
| use std::sync::Arc; | |
| use std::time::Duration; | |
| use tokio::time::sleep; | |
| pub struct ProtocolSentinel { | |
| merchant_service: Arc<dyn MerchantService>, | |
| pool: DbPool, | |
| } | |
| impl ProtocolSentinel { | |
| pub fn new(merchant_service: Arc<dyn MerchantService>, pool: DbPool) -> Self { | |
| Self { | |
| merchant_service, | |
| pool, | |
| } | |
| } | |
| pub async fn run(&self) { | |
| tracing::info!("Protocol Sentinel Active: Secure Health Guard Initialized."); | |
| loop { | |
| // 1. Process Auto-Settlements (Liquidity Guard) | |
| if let Err(e) = self.process_auto_settlements().await { | |
| tracing::error!("Sentinel Error [Auto-Settlement]: {:?}", e); | |
| } | |
| // 2. Cleanup Stale Intents (Resource Guard) | |
| if let Err(e) = self.cleanup_stale_intents().await { | |
| tracing::error!("Sentinel Error [Stale-Cleanup]: {:?}", e); | |
| } | |
| // 3. Finalize Aged Deliveries (Custodial Guard) | |
| if let Err(e) = self.enforce_custodial_deadlines().await { | |
| tracing::error!("Sentinel Error [Custodial-Enforcement]: {:?}", e); | |
| } | |
| // 4. Escalate Stale Forensic Holds (Arbitration Guard) | |
| if let Err(e) = self.escalate_stale_holds().await { | |
| tracing::error!("Sentinel Error [Hold-Escalation]: {:?}", e); | |
| } | |
| // 5. Generate Monthly Billing (Postpaid Billing Guard) | |
| if let Err(e) = self.generate_monthly_billing().await { | |
| tracing::error!("Sentinel Error [Monthly-Billing]: {:?}", e); | |
| } | |
| // 6. Freeze Overdue Merchants (Billing Enforcement Guard) | |
| if let Err(e) = self.freeze_overdue_merchants().await { | |
| tracing::error!("Sentinel Error [Billing-Enforcement]: {:?}", e); | |
| } | |
| // ─── Jitter Sleep ────────────────────────────────────────────────── | |
| // Add ±300s random jitter to the 1-hour base interval. | |
| // This prevents the thundering herd problem on cold starts / restarts | |
| // where every replica would otherwise fire all tasks in perfect sync, | |
| // creating a coordinated spike against the database every 3600 seconds. | |
| let jitter_secs = rand::random::<u64>() % 300; | |
| let sleep_duration = Duration::from_secs(3600 + jitter_secs); | |
| tracing::debug!( | |
| "Protocol Sentinel: sleeping {}s until next pulse (base 3600s + {}s jitter)", | |
| sleep_duration.as_secs(), | |
| jitter_secs | |
| ); | |
| sleep(sleep_duration).await; | |
| } | |
| } | |
| async fn escalate_stale_holds(&self) -> Result<(), sqlx::Error> { | |
| // Escalate DISPUTED_HELD orders after 48 hours to DISPUTED_IN_REVIEW | |
| let result = sqlx::query( | |
| "UPDATE orders SET status = $1 WHERE status = $2 AND created_at < CURRENT_TIMESTAMP - INTERVAL '48 hours'" | |
| ) | |
| .bind(ORDER_STATUS_DISPUTED) | |
| .bind(ORDER_STATUS_DISPUTED_HELD) | |
| .execute(&self.pool) | |
| .await?; | |
| if result.rows_affected() > 0 { | |
| tracing::info!( | |
| "Sentinel: Escalated {} stale forensic holds to full review.", | |
| result.rows_affected() | |
| ); | |
| } | |
| Ok(()) | |
| } | |
| async fn process_auto_settlements(&self) -> Result<(), sqlx::Error> { | |
| // ── FOR UPDATE SKIP LOCKED ───────────────────────────────────────────── | |
| // SKIP LOCKED ensures each replica claims a disjoint set of rows. | |
| let eligible_orders = sqlx::query( | |
| r#" | |
| SELECT o.transaction_id, o.merchant_id, m.auto_settle_threshold, o.risk_score | |
| FROM orders o | |
| JOIN merchants m ON o.merchant_id = m.merchant_id | |
| WHERE o.status = $1 | |
| AND o.delivered_at < CURRENT_TIMESTAMP - INTERVAL '24 hours' | |
| AND o.risk_score <= m.auto_settle_threshold | |
| FOR UPDATE SKIP LOCKED | |
| "#, | |
| ) | |
| .bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL) | |
| .fetch_all(&self.pool) | |
| .await?; | |
| let merchant_service = self.merchant_service.clone(); | |
| let _results = stream::iter(eligible_orders) | |
| .map(|order| { | |
| let ms = merchant_service.clone(); | |
| async move { | |
| use sqlx::Row; | |
| let tid: String = order.get("transaction_id"); | |
| let mid: String = order.get("merchant_id"); | |
| let _ = ms.approve_settlement(&mid, &tid, None, None, None).await; | |
| } | |
| }) | |
| .buffer_unordered(10) | |
| .collect::<Vec<()>>() | |
| .await; | |
| Ok(()) | |
| } | |
| async fn cleanup_stale_intents(&self) -> Result<(), sqlx::Error> { | |
| // Expire orders stuck in PENDING_PAYMENT for > 6 hours | |
| let result = sqlx::query( | |
| "UPDATE orders SET status = 'EXPIRED_VOID' WHERE status = $1 AND created_at < CURRENT_TIMESTAMP - INTERVAL '6 hours'" | |
| ) | |
| .bind(ORDER_STATUS_PENDING_PAYMENT) | |
| .execute(&self.pool) | |
| .await?; | |
| if result.rows_affected() > 0 { | |
| tracing::info!( | |
| "Sentinel: Purged {} stale payment intents.", | |
| result.rows_affected() | |
| ); | |
| } | |
| Ok(()) | |
| } | |
| async fn enforce_custodial_deadlines(&self) -> Result<(), sqlx::Error> { | |
| // ── FOR UPDATE SKIP LOCKED ───────────────────────────────────────────── | |
| let eligible_orders = sqlx::query( | |
| "SELECT transaction_id, merchant_id FROM orders WHERE status = $1 AND delivered_at < CURRENT_TIMESTAMP - INTERVAL '48 hours' FOR UPDATE SKIP LOCKED" | |
| ) | |
| .bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL) | |
| .fetch_all(&self.pool) | |
| .await?; | |
| let merchant_service = self.merchant_service.clone(); | |
| let _results = stream::iter(eligible_orders) | |
| .map(|order| { | |
| let ms = merchant_service.clone(); | |
| async move { | |
| use sqlx::Row; | |
| let tid: String = order.get("transaction_id"); | |
| let mid: String = order.get("merchant_id"); | |
| tracing::info!( | |
| "Sentinel: Enforcing custodial deadline for {}. Finalizing liquidity.", | |
| tid | |
| ); | |
| let _ = ms.approve_settlement(&mid, &tid, None, None, None).await; | |
| } | |
| }) | |
| .buffer_unordered(10) | |
| .collect::<Vec<()>>() | |
| .await; | |
| Ok(()) | |
| } | |
| pub async fn generate_monthly_billing(&self) -> Result<(), sqlx::Error> { | |
| // Query merchants whose billing_cycle_start has passed 30 days | |
| let merchants = sqlx::query( | |
| "SELECT merchant_id, billing_cycle_start FROM merchants WHERE billing_cycle_start <= CURRENT_TIMESTAMP - INTERVAL '30 days'" | |
| ) | |
| .fetch_all(&self.pool) | |
| .await?; | |
| for row in merchants { | |
| use sqlx::Row; | |
| let merchant_id: String = row.get("merchant_id"); | |
| let billing_cycle_start: chrono::NaiveDateTime = row.get("billing_cycle_start"); | |
| let mut tx = self.pool.begin().await?; | |
| // Count successful orders placed within this billing cycle | |
| // Statuses like PENDING_PAYMENT, PAYMENT_FAILED, EXPIRED_VOID are excluded. | |
| let order_count: i32 = sqlx::query_scalar( | |
| "SELECT COUNT(*)::INT FROM orders \ | |
| WHERE merchant_id = $1 \ | |
| AND created_at >= $2 \ | |
| AND created_at < $2 + INTERVAL '30 days' \ | |
| AND status NOT IN ('PENDING_PAYMENT', 'PAYMENT_FAILED', 'EXPIRED_VOID')" | |
| ) | |
| .bind(&merchant_id) | |
| .bind(billing_cycle_start) | |
| .fetch_one(&mut *tx) | |
| .await?; | |
| let amount_inr = order_count as f64 * 2.0; | |
| let invoice_id = format!("INV-{}", uuid::Uuid::new_v4().to_string()[..8].to_uppercase()); | |
| // Generate invoice | |
| sqlx::query( | |
| "INSERT INTO merchant_invoices (invoice_id, merchant_id, amount_inr, order_count, status, billing_period_start, billing_period_end, due_at) \ | |
| VALUES ($1, $2, $3, $4, 'UNPAID', $5, $5 + INTERVAL '30 days', CURRENT_TIMESTAMP + INTERVAL '7 days')" | |
| ) | |
| .bind(&invoice_id) | |
| .bind(&merchant_id) | |
| .bind(amount_inr) | |
| .bind(order_count) | |
| .bind(billing_cycle_start) | |
| .execute(&mut *tx) | |
| .await?; | |
| // Update merchant's billing_cycle_start | |
| sqlx::query( | |
| "UPDATE merchants SET billing_cycle_start = billing_cycle_start + INTERVAL '30 days' WHERE merchant_id = $1" | |
| ) | |
| .bind(&merchant_id) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await?; | |
| tracing::info!( | |
| merchant_id = %merchant_id, | |
| invoice_id = %invoice_id, | |
| amount = amount_inr, | |
| "Sentinel: Generated monthly postpaid invoice." | |
| ); | |
| } | |
| Ok(()) | |
| } | |
| pub async fn freeze_overdue_merchants(&self) -> Result<(), sqlx::Error> { | |
| // Find merchants with unpaid/overdue invoices past due_at | |
| let overdue_merchants = sqlx::query( | |
| "SELECT DISTINCT merchant_id FROM merchant_invoices WHERE status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP" | |
| ) | |
| .fetch_all(&self.pool) | |
| .await?; | |
| for row in overdue_merchants { | |
| use sqlx::Row; | |
| let merchant_id: String = row.get("merchant_id"); | |
| let mut tx = self.pool.begin().await?; | |
| // Freeze merchant account | |
| sqlx::query("UPDATE merchants SET is_frozen = TRUE WHERE merchant_id = $1") | |
| .bind(&merchant_id) | |
| .execute(&mut *tx) | |
| .await?; | |
| // Mark invoice(s) as OVERDUE | |
| sqlx::query("UPDATE merchant_invoices SET status = 'OVERDUE' WHERE merchant_id = $1 AND status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP") | |
| .bind(&merchant_id) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await?; | |
| tracing::warn!( | |
| merchant_id = %merchant_id, | |
| "Sentinel: Merchant account frozen due to overdue invoice." | |
| ); | |
| } | |
| // Auto-unfreeze merchants who have no outstanding unpaid/overdue invoices past due date | |
| // (This acts as a self-healing sweep) | |
| let unfrozen = sqlx::query( | |
| "UPDATE merchants SET is_frozen = FALSE \ | |
| WHERE is_frozen = TRUE \ | |
| AND merchant_id NOT IN ( \ | |
| SELECT DISTINCT merchant_id FROM merchant_invoices \ | |
| WHERE status IN ('UNPAID', 'OVERDUE') AND due_at < CURRENT_TIMESTAMP \ | |
| )" | |
| ) | |
| .execute(&self.pool) | |
| .await?; | |
| if unfrozen.rows_affected() > 0 { | |
| tracing::info!( | |
| "Sentinel: Auto-unfroze {} merchants with settled invoices.", | |
| unfrozen.rows_affected() | |
| ); | |
| } | |
| Ok(()) | |
| } | |
| } | |