RTIX / src /application /services /background.rs
github-actions
deploy: clean backend production release
d8ffec9
use crate::application::services::MerchantService;
use crate::domain::constants::*;
use crate::infrastructure::db::DbPool;
use futures_util::stream::{self, StreamExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
pub struct ProtocolSentinel {
merchant_service: Arc<dyn MerchantService>,
pool: DbPool,
}
impl ProtocolSentinel {
pub fn new(merchant_service: Arc<dyn MerchantService>, pool: DbPool) -> Self {
Self {
merchant_service,
pool,
}
}
pub async fn run(&self) {
tracing::info!("Protocol Sentinel Active: Secure Health Guard Initialized.");
loop {
// 1. Process Auto-Settlements (Liquidity Guard)
if let Err(e) = self.process_auto_settlements().await {
tracing::error!("Sentinel Error [Auto-Settlement]: {:?}", e);
}
// 2. Cleanup Stale Intents (Resource Guard)
if let Err(e) = self.cleanup_stale_intents().await {
tracing::error!("Sentinel Error [Stale-Cleanup]: {:?}", e);
}
// 3. Finalize Aged Deliveries (Custodial Guard)
if let Err(e) = self.enforce_custodial_deadlines().await {
tracing::error!("Sentinel Error [Custodial-Enforcement]: {:?}", e);
}
// 4. Escalate Stale Forensic Holds (Arbitration Guard)
if let Err(e) = self.escalate_stale_holds().await {
tracing::error!("Sentinel Error [Hold-Escalation]: {:?}", e);
}
// 5. Generate Monthly Billing (Postpaid Billing Guard)
if let Err(e) = self.generate_monthly_billing().await {
tracing::error!("Sentinel Error [Monthly-Billing]: {:?}", e);
}
// 6. Freeze Overdue Merchants (Billing Enforcement Guard)
if let Err(e) = self.freeze_overdue_merchants().await {
tracing::error!("Sentinel Error [Billing-Enforcement]: {:?}", e);
}
// ─── Jitter Sleep ──────────────────────────────────────────────────
// Add ±300s random jitter to the 1-hour base interval.
// This prevents the thundering herd problem on cold starts / restarts
// where every replica would otherwise fire all tasks in perfect sync,
// creating a coordinated spike against the database every 3600 seconds.
let jitter_secs = rand::random::<u64>() % 300;
let sleep_duration = Duration::from_secs(3600 + jitter_secs);
tracing::debug!(
"Protocol Sentinel: sleeping {}s until next pulse (base 3600s + {}s jitter)",
sleep_duration.as_secs(),
jitter_secs
);
sleep(sleep_duration).await;
}
}
async fn escalate_stale_holds(&self) -> Result<(), sqlx::Error> {
// Escalate DISPUTED_HELD orders after 48 hours to DISPUTED_IN_REVIEW
let result = sqlx::query(
"UPDATE orders SET status = $1 WHERE status = $2 AND created_at < CURRENT_TIMESTAMP - INTERVAL '48 hours'"
)
.bind(ORDER_STATUS_DISPUTED)
.bind(ORDER_STATUS_DISPUTED_HELD)
.execute(&self.pool)
.await?;
if result.rows_affected() > 0 {
tracing::info!(
"Sentinel: Escalated {} stale forensic holds to full review.",
result.rows_affected()
);
}
Ok(())
}
async fn process_auto_settlements(&self) -> Result<(), sqlx::Error> {
// ── FOR UPDATE SKIP LOCKED ─────────────────────────────────────────────
// SKIP LOCKED ensures each replica claims a disjoint set of rows.
let eligible_orders = sqlx::query(
r#"
SELECT o.transaction_id, o.merchant_id, m.auto_settle_threshold, o.risk_score
FROM orders o
JOIN merchants m ON o.merchant_id = m.merchant_id
WHERE o.status = $1
AND o.delivered_at < CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND o.risk_score <= m.auto_settle_threshold
FOR UPDATE SKIP LOCKED
"#,
)
.bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL)
.fetch_all(&self.pool)
.await?;
let merchant_service = self.merchant_service.clone();
let _results = stream::iter(eligible_orders)
.map(|order| {
let ms = merchant_service.clone();
async move {
use sqlx::Row;
let tid: String = order.get("transaction_id");
let mid: String = order.get("merchant_id");
let _ = ms.approve_settlement(&mid, &tid, None, None, None).await;
}
})
.buffer_unordered(10)
.collect::<Vec<()>>()
.await;
Ok(())
}
async fn cleanup_stale_intents(&self) -> Result<(), sqlx::Error> {
// Expire orders stuck in PENDING_PAYMENT for > 6 hours
let result = sqlx::query(
"UPDATE orders SET status = 'EXPIRED_VOID' WHERE status = $1 AND created_at < CURRENT_TIMESTAMP - INTERVAL '6 hours'"
)
.bind(ORDER_STATUS_PENDING_PAYMENT)
.execute(&self.pool)
.await?;
if result.rows_affected() > 0 {
tracing::info!(
"Sentinel: Purged {} stale payment intents.",
result.rows_affected()
);
}
Ok(())
}
async fn enforce_custodial_deadlines(&self) -> Result<(), sqlx::Error> {
// ── FOR UPDATE SKIP LOCKED ─────────────────────────────────────────────
let eligible_orders = sqlx::query(
"SELECT transaction_id, merchant_id FROM orders WHERE status = $1 AND delivered_at < CURRENT_TIMESTAMP - INTERVAL '48 hours' FOR UPDATE SKIP LOCKED"
)
.bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL)
.fetch_all(&self.pool)
.await?;
let merchant_service = self.merchant_service.clone();
let _results = stream::iter(eligible_orders)
.map(|order| {
let ms = merchant_service.clone();
async move {
use sqlx::Row;
let tid: String = order.get("transaction_id");
let mid: String = order.get("merchant_id");
tracing::info!(
"Sentinel: Enforcing custodial deadline for {}. Finalizing liquidity.",
tid
);
let _ = ms.approve_settlement(&mid, &tid, None, None, None).await;
}
})
.buffer_unordered(10)
.collect::<Vec<()>>()
.await;
Ok(())
}
pub async fn generate_monthly_billing(&self) -> Result<(), sqlx::Error> {
// Query merchants whose billing_cycle_start has passed 30 days
let merchants = sqlx::query(
"SELECT merchant_id, billing_cycle_start FROM merchants WHERE billing_cycle_start <= CURRENT_TIMESTAMP - INTERVAL '30 days'"
)
.fetch_all(&self.pool)
.await?;
for row in merchants {
use sqlx::Row;
let merchant_id: String = row.get("merchant_id");
let billing_cycle_start: chrono::NaiveDateTime = row.get("billing_cycle_start");
let mut tx = self.pool.begin().await?;
// Count successful orders placed within this billing cycle
// Statuses like PENDING_PAYMENT, PAYMENT_FAILED, EXPIRED_VOID are excluded.
let order_count: i32 = sqlx::query_scalar(
"SELECT COUNT(*)::INT FROM orders \
WHERE merchant_id = $1 \
AND created_at >= $2 \
AND created_at < $2 + INTERVAL '30 days' \
AND status NOT IN ('PENDING_PAYMENT', 'PAYMENT_FAILED', 'EXPIRED_VOID')"
)
.bind(&merchant_id)
.bind(billing_cycle_start)
.fetch_one(&mut *tx)
.await?;
let amount_inr = order_count as f64 * 2.0;
let invoice_id = format!("INV-{}", uuid::Uuid::new_v4().to_string()[..8].to_uppercase());
// Generate invoice
sqlx::query(
"INSERT INTO merchant_invoices (invoice_id, merchant_id, amount_inr, order_count, status, billing_period_start, billing_period_end, due_at) \
VALUES ($1, $2, $3, $4, 'UNPAID', $5, $5 + INTERVAL '30 days', CURRENT_TIMESTAMP + INTERVAL '7 days')"
)
.bind(&invoice_id)
.bind(&merchant_id)
.bind(amount_inr)
.bind(order_count)
.bind(billing_cycle_start)
.execute(&mut *tx)
.await?;
// Update merchant's billing_cycle_start
sqlx::query(
"UPDATE merchants SET billing_cycle_start = billing_cycle_start + INTERVAL '30 days' WHERE merchant_id = $1"
)
.bind(&merchant_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
tracing::info!(
merchant_id = %merchant_id,
invoice_id = %invoice_id,
amount = amount_inr,
"Sentinel: Generated monthly postpaid invoice."
);
}
Ok(())
}
pub async fn freeze_overdue_merchants(&self) -> Result<(), sqlx::Error> {
// Find merchants with unpaid/overdue invoices past due_at
let overdue_merchants = sqlx::query(
"SELECT DISTINCT merchant_id FROM merchant_invoices WHERE status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP"
)
.fetch_all(&self.pool)
.await?;
for row in overdue_merchants {
use sqlx::Row;
let merchant_id: String = row.get("merchant_id");
let mut tx = self.pool.begin().await?;
// Freeze merchant account
sqlx::query("UPDATE merchants SET is_frozen = TRUE WHERE merchant_id = $1")
.bind(&merchant_id)
.execute(&mut *tx)
.await?;
// Mark invoice(s) as OVERDUE
sqlx::query("UPDATE merchant_invoices SET status = 'OVERDUE' WHERE merchant_id = $1 AND status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP")
.bind(&merchant_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
tracing::warn!(
merchant_id = %merchant_id,
"Sentinel: Merchant account frozen due to overdue invoice."
);
}
// Auto-unfreeze merchants who have no outstanding unpaid/overdue invoices past due date
// (This acts as a self-healing sweep)
let unfrozen = sqlx::query(
"UPDATE merchants SET is_frozen = FALSE \
WHERE is_frozen = TRUE \
AND merchant_id NOT IN ( \
SELECT DISTINCT merchant_id FROM merchant_invoices \
WHERE status IN ('UNPAID', 'OVERDUE') AND due_at < CURRENT_TIMESTAMP \
)"
)
.execute(&self.pool)
.await?;
if unfrozen.rows_affected() > 0 {
tracing::info!(
"Sentinel: Auto-unfroze {} merchants with settled invoices.",
unfrozen.rows_affected()
);
}
Ok(())
}
}