use crate::application::services::MerchantService; use crate::domain::constants::*; use crate::infrastructure::db::DbPool; use futures_util::stream::{self, StreamExt}; use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; pub struct ProtocolSentinel { merchant_service: Arc, pool: DbPool, } impl ProtocolSentinel { pub fn new(merchant_service: Arc, pool: DbPool) -> Self { Self { merchant_service, pool, } } pub async fn run(&self) { tracing::info!("Protocol Sentinel Active: Secure Health Guard Initialized."); loop { // 1. Process Auto-Settlements (Liquidity Guard) if let Err(e) = self.process_auto_settlements().await { tracing::error!("Sentinel Error [Auto-Settlement]: {:?}", e); } // 2. Cleanup Stale Intents (Resource Guard) if let Err(e) = self.cleanup_stale_intents().await { tracing::error!("Sentinel Error [Stale-Cleanup]: {:?}", e); } // 3. Finalize Aged Deliveries (Custodial Guard) if let Err(e) = self.enforce_custodial_deadlines().await { tracing::error!("Sentinel Error [Custodial-Enforcement]: {:?}", e); } // 4. Escalate Stale Forensic Holds (Arbitration Guard) if let Err(e) = self.escalate_stale_holds().await { tracing::error!("Sentinel Error [Hold-Escalation]: {:?}", e); } // 5. Generate Monthly Billing (Postpaid Billing Guard) if let Err(e) = self.generate_monthly_billing().await { tracing::error!("Sentinel Error [Monthly-Billing]: {:?}", e); } // 6. Freeze Overdue Merchants (Billing Enforcement Guard) if let Err(e) = self.freeze_overdue_merchants().await { tracing::error!("Sentinel Error [Billing-Enforcement]: {:?}", e); } // ─── Jitter Sleep ────────────────────────────────────────────────── // Add ±300s random jitter to the 1-hour base interval. // This prevents the thundering herd problem on cold starts / restarts // where every replica would otherwise fire all tasks in perfect sync, // creating a coordinated spike against the database every 3600 seconds. let jitter_secs = rand::random::() % 300; let sleep_duration = Duration::from_secs(3600 + jitter_secs); tracing::debug!( "Protocol Sentinel: sleeping {}s until next pulse (base 3600s + {}s jitter)", sleep_duration.as_secs(), jitter_secs ); sleep(sleep_duration).await; } } async fn escalate_stale_holds(&self) -> Result<(), sqlx::Error> { // Escalate DISPUTED_HELD orders after 48 hours to DISPUTED_IN_REVIEW let result = sqlx::query( "UPDATE orders SET status = $1 WHERE status = $2 AND created_at < CURRENT_TIMESTAMP - INTERVAL '48 hours'" ) .bind(ORDER_STATUS_DISPUTED) .bind(ORDER_STATUS_DISPUTED_HELD) .execute(&self.pool) .await?; if result.rows_affected() > 0 { tracing::info!( "Sentinel: Escalated {} stale forensic holds to full review.", result.rows_affected() ); } Ok(()) } async fn process_auto_settlements(&self) -> Result<(), sqlx::Error> { // ── FOR UPDATE SKIP LOCKED ───────────────────────────────────────────── // SKIP LOCKED ensures each replica claims a disjoint set of rows. let eligible_orders = sqlx::query( r#" SELECT o.transaction_id, o.merchant_id, m.auto_settle_threshold, o.risk_score FROM orders o JOIN merchants m ON o.merchant_id = m.merchant_id WHERE o.status = $1 AND o.delivered_at < CURRENT_TIMESTAMP - INTERVAL '24 hours' AND o.risk_score <= m.auto_settle_threshold FOR UPDATE SKIP LOCKED "#, ) .bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL) .fetch_all(&self.pool) .await?; let merchant_service = self.merchant_service.clone(); let _results = stream::iter(eligible_orders) .map(|order| { let ms = merchant_service.clone(); async move { use sqlx::Row; let tid: String = order.get("transaction_id"); let mid: String = order.get("merchant_id"); let _ = ms.approve_settlement(&mid, &tid, None, None, None).await; } }) .buffer_unordered(10) .collect::>() .await; Ok(()) } async fn cleanup_stale_intents(&self) -> Result<(), sqlx::Error> { // Expire orders stuck in PENDING_PAYMENT for > 6 hours let result = sqlx::query( "UPDATE orders SET status = 'EXPIRED_VOID' WHERE status = $1 AND created_at < CURRENT_TIMESTAMP - INTERVAL '6 hours'" ) .bind(ORDER_STATUS_PENDING_PAYMENT) .execute(&self.pool) .await?; if result.rows_affected() > 0 { tracing::info!( "Sentinel: Purged {} stale payment intents.", result.rows_affected() ); } Ok(()) } async fn enforce_custodial_deadlines(&self) -> Result<(), sqlx::Error> { // ── FOR UPDATE SKIP LOCKED ───────────────────────────────────────────── let eligible_orders = sqlx::query( "SELECT transaction_id, merchant_id FROM orders WHERE status = $1 AND delivered_at < CURRENT_TIMESTAMP - INTERVAL '48 hours' FOR UPDATE SKIP LOCKED" ) .bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL) .fetch_all(&self.pool) .await?; let merchant_service = self.merchant_service.clone(); let _results = stream::iter(eligible_orders) .map(|order| { let ms = merchant_service.clone(); async move { use sqlx::Row; let tid: String = order.get("transaction_id"); let mid: String = order.get("merchant_id"); tracing::info!( "Sentinel: Enforcing custodial deadline for {}. Finalizing liquidity.", tid ); let _ = ms.approve_settlement(&mid, &tid, None, None, None).await; } }) .buffer_unordered(10) .collect::>() .await; Ok(()) } pub async fn generate_monthly_billing(&self) -> Result<(), sqlx::Error> { // Query merchants whose billing_cycle_start has passed 30 days let merchants = sqlx::query( "SELECT merchant_id, billing_cycle_start FROM merchants WHERE billing_cycle_start <= CURRENT_TIMESTAMP - INTERVAL '30 days'" ) .fetch_all(&self.pool) .await?; for row in merchants { use sqlx::Row; let merchant_id: String = row.get("merchant_id"); let billing_cycle_start: chrono::NaiveDateTime = row.get("billing_cycle_start"); let mut tx = self.pool.begin().await?; // Count successful orders placed within this billing cycle // Statuses like PENDING_PAYMENT, PAYMENT_FAILED, EXPIRED_VOID are excluded. let order_count: i32 = sqlx::query_scalar( "SELECT COUNT(*)::INT FROM orders \ WHERE merchant_id = $1 \ AND created_at >= $2 \ AND created_at < $2 + INTERVAL '30 days' \ AND status NOT IN ('PENDING_PAYMENT', 'PAYMENT_FAILED', 'EXPIRED_VOID')" ) .bind(&merchant_id) .bind(billing_cycle_start) .fetch_one(&mut *tx) .await?; let amount_inr = order_count as f64 * 2.0; let invoice_id = format!("INV-{}", uuid::Uuid::new_v4().to_string()[..8].to_uppercase()); // Generate invoice sqlx::query( "INSERT INTO merchant_invoices (invoice_id, merchant_id, amount_inr, order_count, status, billing_period_start, billing_period_end, due_at) \ VALUES ($1, $2, $3, $4, 'UNPAID', $5, $5 + INTERVAL '30 days', CURRENT_TIMESTAMP + INTERVAL '7 days')" ) .bind(&invoice_id) .bind(&merchant_id) .bind(amount_inr) .bind(order_count) .bind(billing_cycle_start) .execute(&mut *tx) .await?; // Update merchant's billing_cycle_start sqlx::query( "UPDATE merchants SET billing_cycle_start = billing_cycle_start + INTERVAL '30 days' WHERE merchant_id = $1" ) .bind(&merchant_id) .execute(&mut *tx) .await?; tx.commit().await?; tracing::info!( merchant_id = %merchant_id, invoice_id = %invoice_id, amount = amount_inr, "Sentinel: Generated monthly postpaid invoice." ); } Ok(()) } pub async fn freeze_overdue_merchants(&self) -> Result<(), sqlx::Error> { // Find merchants with unpaid/overdue invoices past due_at let overdue_merchants = sqlx::query( "SELECT DISTINCT merchant_id FROM merchant_invoices WHERE status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP" ) .fetch_all(&self.pool) .await?; for row in overdue_merchants { use sqlx::Row; let merchant_id: String = row.get("merchant_id"); let mut tx = self.pool.begin().await?; // Freeze merchant account sqlx::query("UPDATE merchants SET is_frozen = TRUE WHERE merchant_id = $1") .bind(&merchant_id) .execute(&mut *tx) .await?; // Mark invoice(s) as OVERDUE sqlx::query("UPDATE merchant_invoices SET status = 'OVERDUE' WHERE merchant_id = $1 AND status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP") .bind(&merchant_id) .execute(&mut *tx) .await?; tx.commit().await?; tracing::warn!( merchant_id = %merchant_id, "Sentinel: Merchant account frozen due to overdue invoice." ); } // Auto-unfreeze merchants who have no outstanding unpaid/overdue invoices past due date // (This acts as a self-healing sweep) let unfrozen = sqlx::query( "UPDATE merchants SET is_frozen = FALSE \ WHERE is_frozen = TRUE \ AND merchant_id NOT IN ( \ SELECT DISTINCT merchant_id FROM merchant_invoices \ WHERE status IN ('UNPAID', 'OVERDUE') AND due_at < CURRENT_TIMESTAMP \ )" ) .execute(&self.pool) .await?; if unfrozen.rows_affected() > 0 { tracing::info!( "Sentinel: Auto-unfroze {} merchants with settled invoices.", unfrozen.rows_affected() ); } Ok(()) } }