Spaces:
Running
Running
File size: 11,831 Bytes
c33971d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 | use crate::application::services::MerchantService;
use crate::domain::constants::*;
use crate::infrastructure::db::DbPool;
use futures_util::stream::{self, StreamExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
pub struct ProtocolSentinel {
merchant_service: Arc<dyn MerchantService>,
pool: DbPool,
}
impl ProtocolSentinel {
pub fn new(merchant_service: Arc<dyn MerchantService>, pool: DbPool) -> Self {
Self {
merchant_service,
pool,
}
}
pub async fn run(&self) {
tracing::info!("Protocol Sentinel Active: Secure Health Guard Initialized.");
loop {
// 1. Process Auto-Settlements (Liquidity Guard)
if let Err(e) = self.process_auto_settlements().await {
tracing::error!("Sentinel Error [Auto-Settlement]: {:?}", e);
}
// 2. Cleanup Stale Intents (Resource Guard)
if let Err(e) = self.cleanup_stale_intents().await {
tracing::error!("Sentinel Error [Stale-Cleanup]: {:?}", e);
}
// 3. Finalize Aged Deliveries (Custodial Guard)
if let Err(e) = self.enforce_custodial_deadlines().await {
tracing::error!("Sentinel Error [Custodial-Enforcement]: {:?}", e);
}
// 4. Escalate Stale Forensic Holds (Arbitration Guard)
if let Err(e) = self.escalate_stale_holds().await {
tracing::error!("Sentinel Error [Hold-Escalation]: {:?}", e);
}
// 5. Generate Monthly Billing (Postpaid Billing Guard)
if let Err(e) = self.generate_monthly_billing().await {
tracing::error!("Sentinel Error [Monthly-Billing]: {:?}", e);
}
// 6. Freeze Overdue Merchants (Billing Enforcement Guard)
if let Err(e) = self.freeze_overdue_merchants().await {
tracing::error!("Sentinel Error [Billing-Enforcement]: {:?}", e);
}
// ─── Jitter Sleep ──────────────────────────────────────────────────
// Add ±300s random jitter to the 1-hour base interval.
// This prevents the thundering herd problem on cold starts / restarts
// where every replica would otherwise fire all tasks in perfect sync,
// creating a coordinated spike against the database every 3600 seconds.
let jitter_secs = rand::random::<u64>() % 300;
let sleep_duration = Duration::from_secs(3600 + jitter_secs);
tracing::debug!(
"Protocol Sentinel: sleeping {}s until next pulse (base 3600s + {}s jitter)",
sleep_duration.as_secs(),
jitter_secs
);
sleep(sleep_duration).await;
}
}
async fn escalate_stale_holds(&self) -> Result<(), sqlx::Error> {
// Escalate DISPUTED_HELD orders after 48 hours to DISPUTED_IN_REVIEW
let result = sqlx::query(
"UPDATE orders SET status = $1 WHERE status = $2 AND created_at < CURRENT_TIMESTAMP - INTERVAL '48 hours'"
)
.bind(ORDER_STATUS_DISPUTED)
.bind(ORDER_STATUS_DISPUTED_HELD)
.execute(&self.pool)
.await?;
if result.rows_affected() > 0 {
tracing::info!(
"Sentinel: Escalated {} stale forensic holds to full review.",
result.rows_affected()
);
}
Ok(())
}
async fn process_auto_settlements(&self) -> Result<(), sqlx::Error> {
// ── FOR UPDATE SKIP LOCKED ─────────────────────────────────────────────
// SKIP LOCKED ensures each replica claims a disjoint set of rows.
let eligible_orders = sqlx::query(
r#"
SELECT o.transaction_id, o.merchant_id, m.auto_settle_threshold, o.risk_score
FROM orders o
JOIN merchants m ON o.merchant_id = m.merchant_id
WHERE o.status = $1
AND o.delivered_at < CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND o.risk_score <= m.auto_settle_threshold
FOR UPDATE SKIP LOCKED
"#,
)
.bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL)
.fetch_all(&self.pool)
.await?;
let merchant_service = self.merchant_service.clone();
let _results = stream::iter(eligible_orders)
.map(|order| {
let ms = merchant_service.clone();
async move {
use sqlx::Row;
let tid: String = order.get("transaction_id");
let mid: String = order.get("merchant_id");
let _ = ms.approve_settlement(&mid, &tid, None, None, None).await;
}
})
.buffer_unordered(10)
.collect::<Vec<()>>()
.await;
Ok(())
}
async fn cleanup_stale_intents(&self) -> Result<(), sqlx::Error> {
// Expire orders stuck in PENDING_PAYMENT for > 6 hours
let result = sqlx::query(
"UPDATE orders SET status = 'EXPIRED_VOID' WHERE status = $1 AND created_at < CURRENT_TIMESTAMP - INTERVAL '6 hours'"
)
.bind(ORDER_STATUS_PENDING_PAYMENT)
.execute(&self.pool)
.await?;
if result.rows_affected() > 0 {
tracing::info!(
"Sentinel: Purged {} stale payment intents.",
result.rows_affected()
);
}
Ok(())
}
async fn enforce_custodial_deadlines(&self) -> Result<(), sqlx::Error> {
// ── FOR UPDATE SKIP LOCKED ─────────────────────────────────────────────
let eligible_orders = sqlx::query(
"SELECT transaction_id, merchant_id FROM orders WHERE status = $1 AND delivered_at < CURRENT_TIMESTAMP - INTERVAL '48 hours' FOR UPDATE SKIP LOCKED"
)
.bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL)
.fetch_all(&self.pool)
.await?;
let merchant_service = self.merchant_service.clone();
let _results = stream::iter(eligible_orders)
.map(|order| {
let ms = merchant_service.clone();
async move {
use sqlx::Row;
let tid: String = order.get("transaction_id");
let mid: String = order.get("merchant_id");
tracing::info!(
"Sentinel: Enforcing custodial deadline for {}. Finalizing liquidity.",
tid
);
let _ = ms.approve_settlement(&mid, &tid, None, None, None).await;
}
})
.buffer_unordered(10)
.collect::<Vec<()>>()
.await;
Ok(())
}
pub async fn generate_monthly_billing(&self) -> Result<(), sqlx::Error> {
// Query merchants whose billing_cycle_start has passed 30 days
let merchants = sqlx::query(
"SELECT merchant_id, billing_cycle_start FROM merchants WHERE billing_cycle_start <= CURRENT_TIMESTAMP - INTERVAL '30 days'"
)
.fetch_all(&self.pool)
.await?;
for row in merchants {
use sqlx::Row;
let merchant_id: String = row.get("merchant_id");
let billing_cycle_start: chrono::NaiveDateTime = row.get("billing_cycle_start");
let mut tx = self.pool.begin().await?;
// Count successful orders placed within this billing cycle
// Statuses like PENDING_UPI_SETTLEMENT, PAYMENT_FAILED, EXPIRED_VOID are excluded.
let order_count: i32 = sqlx::query_scalar(
"SELECT COUNT(*)::INT FROM orders \
WHERE merchant_id = $1 \
AND created_at >= $2 \
AND created_at < $2 + INTERVAL '30 days' \
AND status NOT IN ('PENDING_UPI_SETTLEMENT', 'PAYMENT_FAILED', 'EXPIRED_VOID')"
)
.bind(&merchant_id)
.bind(billing_cycle_start)
.fetch_one(&mut *tx)
.await?;
let amount_inr = order_count as f64 * 2.0;
let invoice_id = format!(
"INV-{}",
uuid::Uuid::new_v4().to_string()[..8].to_uppercase()
);
// Generate invoice
sqlx::query(
"INSERT INTO merchant_invoices (invoice_id, merchant_id, amount_inr, order_count, status, billing_period_start, billing_period_end, due_at) \
VALUES ($1, $2, $3, $4, 'UNPAID', $5, $5 + INTERVAL '30 days', CURRENT_TIMESTAMP + INTERVAL '7 days')"
)
.bind(&invoice_id)
.bind(&merchant_id)
.bind(amount_inr)
.bind(order_count)
.bind(billing_cycle_start)
.execute(&mut *tx)
.await?;
// Update merchant's billing_cycle_start
sqlx::query(
"UPDATE merchants SET billing_cycle_start = billing_cycle_start + INTERVAL '30 days' WHERE merchant_id = $1"
)
.bind(&merchant_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
tracing::info!(
merchant_id = %merchant_id,
invoice_id = %invoice_id,
amount = amount_inr,
"Sentinel: Generated monthly postpaid invoice."
);
}
Ok(())
}
pub async fn freeze_overdue_merchants(&self) -> Result<(), sqlx::Error> {
// Find merchants with unpaid/overdue invoices past due_at
let overdue_merchants = sqlx::query(
"SELECT DISTINCT merchant_id FROM merchant_invoices WHERE status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP"
)
.fetch_all(&self.pool)
.await?;
for row in overdue_merchants {
use sqlx::Row;
let merchant_id: String = row.get("merchant_id");
let mut tx = self.pool.begin().await?;
// Freeze merchant account
sqlx::query("UPDATE merchants SET is_frozen = TRUE WHERE merchant_id = $1")
.bind(&merchant_id)
.execute(&mut *tx)
.await?;
// Mark invoice(s) as OVERDUE
sqlx::query("UPDATE merchant_invoices SET status = 'OVERDUE' WHERE merchant_id = $1 AND status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP")
.bind(&merchant_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
tracing::warn!(
merchant_id = %merchant_id,
"Sentinel: Merchant account frozen due to overdue invoice."
);
}
// Auto-unfreeze merchants who have no outstanding unpaid/overdue invoices past due date
// (This acts as a self-healing sweep)
let unfrozen = sqlx::query(
"UPDATE merchants SET is_frozen = FALSE \
WHERE is_frozen = TRUE \
AND merchant_id NOT IN ( \
SELECT DISTINCT merchant_id FROM merchant_invoices \
WHERE status IN ('UNPAID', 'OVERDUE') AND due_at < CURRENT_TIMESTAMP \
)",
)
.execute(&self.pool)
.await?;
if unfrozen.rows_affected() > 0 {
tracing::info!(
"Sentinel: Auto-unfroze {} merchants with settled invoices.",
unfrozen.rows_affected()
);
}
Ok(())
}
}
|