File size: 11,831 Bytes
c33971d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
use crate::application::services::MerchantService;
use crate::domain::constants::*;
use crate::infrastructure::db::DbPool;
use futures_util::stream::{self, StreamExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

/// Background maintenance worker for order and billing housekeeping.
///
/// Holds the two handles its periodic tasks need: the merchant service used to
/// approve settlements and the database pool used for direct SQL.
pub struct ProtocolSentinel {
    /// Service through which settlements are approved (`approve_settlement`).
    merchant_service: Arc<dyn MerchantService>,
    /// Connection pool used for the sentinel's own queries and transactions.
    pool: DbPool,
}

impl ProtocolSentinel {
    /// Builds a sentinel from the settlement service and database pool it will use.
    ///
    /// Nothing is started here; the maintenance loop only begins once `run` is
    /// awaited.
    pub fn new(merchant_service: Arc<dyn MerchantService>, pool: DbPool) -> Self {
        Self { merchant_service, pool }
    }

    pub async fn run(&self) {
        tracing::info!("Protocol Sentinel Active: Secure Health Guard Initialized.");
        loop {
            // 1. Process Auto-Settlements (Liquidity Guard)
            if let Err(e) = self.process_auto_settlements().await {
                tracing::error!("Sentinel Error [Auto-Settlement]: {:?}", e);
            }

            // 2. Cleanup Stale Intents (Resource Guard)
            if let Err(e) = self.cleanup_stale_intents().await {
                tracing::error!("Sentinel Error [Stale-Cleanup]: {:?}", e);
            }

            // 3. Finalize Aged Deliveries (Custodial Guard)
            if let Err(e) = self.enforce_custodial_deadlines().await {
                tracing::error!("Sentinel Error [Custodial-Enforcement]: {:?}", e);
            }

            // 4. Escalate Stale Forensic Holds (Arbitration Guard)
            if let Err(e) = self.escalate_stale_holds().await {
                tracing::error!("Sentinel Error [Hold-Escalation]: {:?}", e);
            }

            // 5. Generate Monthly Billing (Postpaid Billing Guard)
            if let Err(e) = self.generate_monthly_billing().await {
                tracing::error!("Sentinel Error [Monthly-Billing]: {:?}", e);
            }

            // 6. Freeze Overdue Merchants (Billing Enforcement Guard)
            if let Err(e) = self.freeze_overdue_merchants().await {
                tracing::error!("Sentinel Error [Billing-Enforcement]: {:?}", e);
            }

            // ─── Jitter Sleep ──────────────────────────────────────────────────
            // Add ±300s random jitter to the 1-hour base interval.
            // This prevents the thundering herd problem on cold starts / restarts
            // where every replica would otherwise fire all tasks in perfect sync,
            // creating a coordinated spike against the database every 3600 seconds.
            let jitter_secs = rand::random::<u64>() % 300;
            let sleep_duration = Duration::from_secs(3600 + jitter_secs);
            tracing::debug!(
                "Protocol Sentinel: sleeping {}s until next pulse (base 3600s + {}s jitter)",
                sleep_duration.as_secs(),
                jitter_secs
            );
            sleep(sleep_duration).await;
        }
    }

    /// Moves long-standing held disputes on to full review.
    ///
    /// One `UPDATE` switches every order whose status is
    /// `ORDER_STATUS_DISPUTED_HELD` and whose `created_at` is more than 48 hours
    /// old to `ORDER_STATUS_DISPUTED`. The affected-row count is logged when it
    /// is non-zero.
    ///
    /// NOTE(review): the cutoff is taken from `created_at` (presumably the age
    /// of the order row), not from the moment the hold was placed; confirm that
    /// this is the intended clock.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` produced while executing the statement.
    async fn escalate_stale_holds(&self) -> Result<(), sqlx::Error> {
        const ESCALATE_SQL: &str = "UPDATE orders SET status = $1 WHERE status = $2 AND created_at < CURRENT_TIMESTAMP - INTERVAL '48 hours'";

        // $1 is the new status, $2 the status being matched.
        let escalated = sqlx::query(ESCALATE_SQL)
            .bind(ORDER_STATUS_DISPUTED)
            .bind(ORDER_STATUS_DISPUTED_HELD)
            .execute(&self.pool)
            .await?
            .rows_affected();

        if escalated > 0 {
            tracing::info!(
                "Sentinel: Escalated {} stale forensic holds to full review.",
                escalated
            );
        }
        Ok(())
    }

    async fn process_auto_settlements(&self) -> Result<(), sqlx::Error> {
        // ── FOR UPDATE SKIP LOCKED ─────────────────────────────────────────────
        // SKIP LOCKED ensures each replica claims a disjoint set of rows.
        let eligible_orders = sqlx::query(
            r#"
            SELECT o.transaction_id, o.merchant_id, m.auto_settle_threshold, o.risk_score
            FROM orders o
            JOIN merchants m ON o.merchant_id = m.merchant_id
            WHERE o.status = $1 
              AND o.delivered_at < CURRENT_TIMESTAMP - INTERVAL '24 hours'
              AND o.risk_score <= m.auto_settle_threshold
            FOR UPDATE SKIP LOCKED
            "#,
        )
        .bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL)
        .fetch_all(&self.pool)
        .await?;

        let merchant_service = self.merchant_service.clone();
        let _results = stream::iter(eligible_orders)
            .map(|order| {
                let ms = merchant_service.clone();
                async move {
                    use sqlx::Row;
                    let tid: String = order.get("transaction_id");
                    let mid: String = order.get("merchant_id");

                    let _ = ms.approve_settlement(&mid, &tid, None, None, None).await;
                }
            })
            .buffer_unordered(10)
            .collect::<Vec<()>>()
            .await;

        Ok(())
    }

    /// Voids payment intents that were never completed.
    ///
    /// One `UPDATE` sets the status to `'EXPIRED_VOID'` on every order that is
    /// still in `ORDER_STATUS_PENDING_PAYMENT` and whose `created_at` is more
    /// than 6 hours in the past. The affected-row count is logged when it is
    /// non-zero.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` produced while executing the statement.
    async fn cleanup_stale_intents(&self) -> Result<(), sqlx::Error> {
        const EXPIRE_SQL: &str = "UPDATE orders SET status = 'EXPIRED_VOID' WHERE status = $1 AND created_at < CURRENT_TIMESTAMP - INTERVAL '6 hours'";

        let purged = sqlx::query(EXPIRE_SQL)
            .bind(ORDER_STATUS_PENDING_PAYMENT)
            .execute(&self.pool)
            .await?
            .rows_affected();

        if purged > 0 {
            tracing::info!("Sentinel: Purged {} stale payment intents.", purged);
        }
        Ok(())
    }

    async fn enforce_custodial_deadlines(&self) -> Result<(), sqlx::Error> {
        // ── FOR UPDATE SKIP LOCKED ─────────────────────────────────────────────
        let eligible_orders = sqlx::query(
            "SELECT transaction_id, merchant_id FROM orders WHERE status = $1 AND delivered_at < CURRENT_TIMESTAMP - INTERVAL '48 hours' FOR UPDATE SKIP LOCKED"
        )
        .bind(ORDER_STATUS_DELIVERED_PENDING_APPROVAL)
        .fetch_all(&self.pool)
        .await?;

        let merchant_service = self.merchant_service.clone();
        let _results = stream::iter(eligible_orders)
            .map(|order| {
                let ms = merchant_service.clone();
                async move {
                    use sqlx::Row;
                    let tid: String = order.get("transaction_id");
                    let mid: String = order.get("merchant_id");

                    tracing::info!(
                        "Sentinel: Enforcing custodial deadline for {}. Finalizing liquidity.",
                        tid
                    );
                    let _ = ms.approve_settlement(&mid, &tid, None, None, None).await;
                }
            })
            .buffer_unordered(10)
            .collect::<Vec<()>>()
            .await;

        Ok(())
    }

    /// Closes each merchant's elapsed 30-day billing cycle and issues its invoice.
    ///
    /// For every merchant whose `billing_cycle_start` is at least 30 days old,
    /// one transaction:
    /// 1. advances `billing_cycle_start` by 30 days, but only if it still equals
    ///    the value that was read — this claim stops a concurrent worker from
    ///    invoicing the same cycle a second time;
    /// 2. counts the merchant's orders created inside the cycle, ignoring the
    ///    `PENDING_UPI_SETTLEMENT`, `PAYMENT_FAILED` and `EXPIRED_VOID` statuses;
    /// 3. inserts an `UNPAID` invoice of 2.0 INR per counted order, due 7 days
    ///    from now.
    ///
    /// Only one cycle per merchant is closed per call; a merchant that is
    /// several cycles behind catches up over successive calls.
    ///
    /// NOTE(review): a cycle without billable orders still yields an `UNPAID`
    /// invoice for 0.0, and `freeze_overdue_merchants` acts on any `UNPAID`
    /// invoice past its due date — confirm that zero-amount invoices are
    /// settled somewhere.
    ///
    /// # Errors
    /// Returns the first `sqlx::Error` encountered. The transaction of the
    /// merchant being processed is not committed, and merchants later in the
    /// list are left for the next call.
    pub async fn generate_monthly_billing(&self) -> Result<(), sqlx::Error> {
        use sqlx::Row;

        // Flat fee charged per billable order, in INR.
        const FEE_PER_ORDER_INR: f64 = 2.0;

        // Query merchants whose billing_cycle_start has passed 30 days
        let merchants = sqlx::query(
            "SELECT merchant_id, billing_cycle_start FROM merchants WHERE billing_cycle_start <= CURRENT_TIMESTAMP - INTERVAL '30 days'"
        )
        .fetch_all(&self.pool)
        .await?;

        for row in merchants {
            // `try_get` surfaces a decode problem as an error instead of panicking.
            let merchant_id: String = row.try_get("merchant_id")?;
            let billing_cycle_start: chrono::NaiveDateTime = row.try_get("billing_cycle_start")?;

            let mut tx = self.pool.begin().await?;

            // Claim the cycle before doing anything else. The candidate list
            // above was read without a lock, so another replica may have
            // invoiced this merchant in the meantime; in that case the stored
            // `billing_cycle_start` no longer matches and no row is updated.
            let claimed = sqlx::query(
                "UPDATE merchants SET billing_cycle_start = billing_cycle_start + INTERVAL '30 days' \
                 WHERE merchant_id = $1 AND billing_cycle_start = $2"
            )
            .bind(&merchant_id)
            .bind(billing_cycle_start)
            .execute(&mut *tx)
            .await?
            .rows_affected();

            if claimed == 0 {
                // Already billed by someone else: nothing to invoice here.
                tx.rollback().await?;
                continue;
            }

            // Count successful orders placed within this billing cycle
            // Statuses like PENDING_UPI_SETTLEMENT, PAYMENT_FAILED, EXPIRED_VOID are excluded.
            let order_count: i32 = sqlx::query_scalar(
                "SELECT COUNT(*)::INT FROM orders \
                 WHERE merchant_id = $1 \
                   AND created_at >= $2 \
                   AND created_at < $2 + INTERVAL '30 days' \
                   AND status NOT IN ('PENDING_UPI_SETTLEMENT', 'PAYMENT_FAILED', 'EXPIRED_VOID')"
            )
            .bind(&merchant_id)
            .bind(billing_cycle_start)
            .fetch_one(&mut *tx)
            .await?;

            // i32 -> f64 is lossless, so `From` is used rather than an `as` cast.
            let amount_inr = f64::from(order_count) * FEE_PER_ORDER_INR;

            // NOTE(review): only the first 8 hex digits of the UUID are kept
            // (32 random bits), so ids are not guaranteed unique. A collision
            // would surface as an insert error if `invoice_id` is unique.
            let invoice_id = format!(
                "INV-{}",
                uuid::Uuid::new_v4().to_string()[..8].to_uppercase()
            );

            // Generate invoice for the cycle [start, start + 30 days).
            sqlx::query(
                "INSERT INTO merchant_invoices (invoice_id, merchant_id, amount_inr, order_count, status, billing_period_start, billing_period_end, due_at) \
                 VALUES ($1, $2, $3, $4, 'UNPAID', $5, $5 + INTERVAL '30 days', CURRENT_TIMESTAMP + INTERVAL '7 days')"
            )
            .bind(&invoice_id)
            .bind(&merchant_id)
            .bind(amount_inr)
            .bind(order_count)
            .bind(billing_cycle_start)
            .execute(&mut *tx)
            .await?;

            tx.commit().await?;

            tracing::info!(
                merchant_id = %merchant_id,
                invoice_id = %invoice_id,
                amount = amount_inr,
                "Sentinel: Generated monthly postpaid invoice."
            );
        }

        Ok(())
    }

    /// Freezes merchants with overdue invoices and unfreezes those without any.
    ///
    /// Phase 1: for each merchant that has an `UNPAID` invoice past `due_at`,
    /// one transaction sets `merchants.is_frozen = TRUE` and marks that
    /// merchant's past-due `UNPAID` invoices as `OVERDUE`.
    ///
    /// Phase 2: a single `UPDATE` clears `is_frozen` on every frozen merchant
    /// that has no `UNPAID` or `OVERDUE` invoice past `due_at`.
    ///
    /// NOTE(review): phase 2 clears the flag no matter why it was set. If
    /// `is_frozen` is also used for anything other than billing (manual or
    /// risk freezes), this sweep will undo those — confirm.
    ///
    /// # Errors
    /// Returns the first `sqlx::Error` encountered. Phase 2 is skipped when
    /// phase 1 fails.
    pub async fn freeze_overdue_merchants(&self) -> Result<(), sqlx::Error> {
        use sqlx::Row;

        // Find merchants with unpaid invoices past due_at
        let overdue_merchants = sqlx::query(
            "SELECT DISTINCT merchant_id FROM merchant_invoices WHERE status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP"
        )
        .fetch_all(&self.pool)
        .await?;

        for row in overdue_merchants {
            // `try_get` turns a NULL or mistyped id into an error, not a panic.
            let merchant_id: String = row.try_get("merchant_id")?;

            // Freeze and re-label atomically so a merchant is never frozen
            // without the matching invoices being marked OVERDUE (or vice versa).
            let mut tx = self.pool.begin().await?;

            // Freeze merchant account
            sqlx::query("UPDATE merchants SET is_frozen = TRUE WHERE merchant_id = $1")
                .bind(&merchant_id)
                .execute(&mut *tx)
                .await?;

            // Mark invoice(s) as OVERDUE
            sqlx::query("UPDATE merchant_invoices SET status = 'OVERDUE' WHERE merchant_id = $1 AND status = 'UNPAID' AND due_at < CURRENT_TIMESTAMP")
                .bind(&merchant_id)
                .execute(&mut *tx)
                .await?;

            tx.commit().await?;

            tracing::warn!(
                merchant_id = %merchant_id,
                "Sentinel: Merchant account frozen due to overdue invoice."
            );
        }

        // Self-healing sweep: unfreeze merchants with no outstanding past-due
        // invoice. A correlated NOT EXISTS is used instead of NOT IN because
        // `x NOT IN (subquery)` evaluates to NULL for every row as soon as the
        // subquery returns a NULL `merchant_id`, which would silently stop the
        // sweep from unfreezing anyone.
        let unfrozen = sqlx::query(
            "UPDATE merchants AS m SET is_frozen = FALSE \
             WHERE m.is_frozen = TRUE \
               AND NOT EXISTS ( \
                   SELECT 1 FROM merchant_invoices AS i \
                   WHERE i.merchant_id = m.merchant_id \
                     AND i.status IN ('UNPAID', 'OVERDUE') \
                     AND i.due_at < CURRENT_TIMESTAMP \
               )",
        )
        .execute(&self.pool)
        .await?
        .rows_affected();

        if unfrozen > 0 {
            tracing::info!(
                "Sentinel: Auto-unfroze {} merchants with settled invoices.",
                unfrozen
            );
        }

        Ok(())
    }
}