use crate::domain::error::AppResult; use crate::infrastructure::db::DbPool; use crate::infrastructure::repositories::OrderRepository; use crate::interfaces::http::api::RealtimeEvent; use std::sync::Arc; use tokio::sync::broadcast::Sender; pub fn verify_razorpay_signature( order_id: &str, payment_id: &str, signature: &str, secret: &str, ) -> bool { use hmac::{Hmac, Mac}; use sha2::Sha256; type HmacSha256 = Hmac; let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) { Ok(m) => m, Err(_) => return false, }; mac.update(format!("{}|{}", order_id, payment_id).as_bytes()); let result = mac.finalize().into_bytes(); let expected_signature = hex::encode(result); expected_signature == signature } pub async fn execute_process_callback( pool: &DbPool, _order_repo: &Arc, tx_sender: &Sender, form: crate::interfaces::http::routes::payment::RazorpayCallbackForm, ) -> AppResult { use crate::domain::constants::*; let txnid = form.txnid.clone(); let payment_id = form.razorpay_payment_id.clone(); let rzp_order_id = form.razorpay_order_id.clone(); let signature = form.razorpay_signature.clone(); let upi_vpa = form.upi_vpa.clone(); // 1. Begin atomic database transaction immediately to encapsulate all reads and locks let mut tx = pool.begin().await.map_err(crate::domain::error::AppError::Database)?; // 2. Acquire a strict database write lock (FOR UPDATE) on the target order row. // This blocks any concurrent callbacks or operations on this order, eliminating time-of-check to time-of-use race conditions! let order = sqlx::query_as::<_, crate::domain::models::OrderRecord>( "SELECT transaction_id, merchant_id, link_id, buyer_phone, buyer_phone_hash, buyer_name, buyer_email, shipping_pincode, delivery_address, price_inr, status, vpa, outbound_weight, return_weight, proof_data, proof_received_at, settled_at, paid_at, shipped_at, delivered_at, shipping_method, estimated_delivery_at, payu_id, is_payment, platform_fee_paid, platform_fee, delivery_fee, distance_km, risk_score, risk_flags, cgst, sgst, igst, utr_number, platform_fee_utr, delivery_gps_lat, delivery_gps_lng, is_geofence_verified, pincode_volatility_at_checkout, discount_amount, coupon_code, checkout_gps_lat, checkout_gps_lng, device_fingerprint, created_at FROM orders WHERE transaction_id = $1 FOR UPDATE" ) .bind(&txnid) .fetch_optional(&mut *tx) .await?; let order = match order { Some(o) => { let mut o_mut = o; o_mut.decrypt_pii(); if o_mut.vpa.as_deref() == Some("") { o_mut.vpa = None; } o_mut } None => { tx.rollback().await.map_err(crate::domain::error::AppError::Database)?; return Err(crate::domain::error::AppError::NotFound("Order not found".into())); } }; // Strict Security Binding: Verify that the razorpay_order_id matches the one generated and stored in the database! if order.payu_id.is_empty() || order.payu_id != rzp_order_id { tx.rollback().await.map_err(crate::domain::error::AppError::Database)?; return Err(crate::domain::error::AppError::BadRequest( "Razorpay Order ID mismatch or invalid for this transaction. Secure validation active.".to_string(), )); } // Idempotency: If already paid, return early (release lock by committing transaction) if order.status == ORDER_STATUS_PAID_PENDING_DELIVERY || order.status == ORDER_STATUS_DELIVERED_PENDING_APPROVAL || order.status == ORDER_STATUS_SETTLED { tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Ok(crate::application::services::payment::PaymentResult { success: true, transaction_id: txnid, amount: order.price_inr, gateway_id: Some(order.payu_id), // keeping field name same in DB }); } let secret = std::env::var("RAZORPAY_KEY_SECRET").unwrap_or_default(); if !verify_razorpay_signature(&rzp_order_id, &payment_id, &signature, &secret) { crate::domain::audit::log_risk_event( &mut tx, Some(&txnid), &order.merchant_id, "PAYMENT_CALLBACK_SIGNATURE_MISMATCH", "CRITICAL", Some("Razorpay callback signature mismatch detected."), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ) .await; sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2 AND status = $3") .bind(ORDER_STATUS_PAYMENT_FAILED) .bind(&txnid) .bind(ORDER_STATUS_PENDING_PAYMENT) .execute(&mut *tx) .await?; tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Err(crate::domain::error::AppError::Forbidden( "Signature mismatch".to_string(), )); } // Verify Payment Amount to prevent underpayment fraud let rzp_key_id = std::env::var("RAZORPAY_KEY_ID").unwrap_or_default(); if !rzp_key_id.is_empty() && !secret.is_empty() { let client = reqwest::Client::new(); let response = client .get(format!("https://api.razorpay.com/v1/payments/{}", payment_id)) .basic_auth(&rzp_key_id, Some(&secret)) .send() .await; match response { Ok(resp) => { if resp.status().is_success() { if let Ok(data) = resp.json::().await { if let Some(captured_amount) = data.get("amount").and_then(|a| a.as_u64()) { let expected_amount_paise = ((order.price_inr + order.delivery_fee + order.cgst + order.sgst + order.igst) * 100.0).round() as u64; if captured_amount != expected_amount_paise { crate::domain::audit::log_risk_event( &mut tx, Some(&txnid), &order.merchant_id, "PAYMENT_AMOUNT_MISMATCH", "CRITICAL", Some(&format!( "Payment Mismatch in callback: Razorpay reported paid amount of {} paise, but expected {} paise. Transaction blocked.", captured_amount, expected_amount_paise )), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ) .await; sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2") .bind(ORDER_STATUS_PAYMENT_FAILED) .bind(&txnid) .execute(&mut *tx) .await?; tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Err(crate::domain::error::AppError::Forbidden( "Payment amount mismatch detected. Transaction blocked.".to_string(), )); } } } } else { let status_err = resp.status(); let text_err = resp.text().await.unwrap_or_default(); tracing::error!("Razorpay payment verification API error: status={}, body={}", status_err, text_err); } } Err(e) => { tracing::error!("Failed to contact Razorpay payment verification API: {:?}", e); } } } let paid_amount = order.price_inr + order.delivery_fee + order.cgst + order.sgst + order.igst; let is_payment = true; // Mark as paid correctly to secure mandate state // 3. Double-Spend & Payment Replay Protection: Prevent reuse of a single gateway payment ID let duplicate_payment = sqlx::query( "SELECT transaction_id FROM orders WHERE payu_id = $1 AND transaction_id != $2" ) .bind(&payment_id) .bind(&txnid) .fetch_optional(&mut *tx) .await?; if let Some(row) = duplicate_payment { use sqlx::Row; let existing_txn = row.get::("transaction_id"); crate::domain::audit::log_risk_event( &mut tx, Some(&txnid), &order.merchant_id, "PAYMENT_REPLAY_ATTACK", "CRITICAL", Some(&format!( "Replay Attack: Razorpay Payment ID {} was already spent on transaction {}. Denied double-fulfillment.", payment_id, existing_txn )), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ) .await; sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2") .bind(ORDER_STATUS_PAYMENT_FAILED) .bind(&txnid) .execute(&mut *tx) .await?; tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Err(crate::domain::error::AppError::Forbidden( "Double spend / payment replay detected".to_string(), )); } if let Some(vpa) = &upi_vpa { if !vpa.is_empty() { let active_payment = sqlx::query( "SELECT transaction_id FROM orders WHERE vpa = $1 AND status = $2 AND transaction_id != $3" ) .bind(vpa) .bind(ORDER_STATUS_PAID_PENDING_DELIVERY) .bind(&txnid) .fetch_optional(&mut *tx) .await?; if let Some(row) = active_payment { use sqlx::Row; let existing_txn = row.get::("transaction_id"); crate::domain::audit::log_risk_event( &mut tx, Some(&txnid), &order.merchant_id, "VPA_SINGLETON_VIOLATION", "CRITICAL", Some(&format!("VPA {} already has an active payment ({}). Blocked simultaneous fulfillment.", vpa, existing_txn)), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ).await; sqlx::query("UPDATE orders SET status = $1, vpa = $2 WHERE transaction_id = $3") .bind(ORDER_STATUS_PAYMENT_FAILED) .bind(vpa) .bind(&txnid) .execute(&mut *tx) .await?; tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Ok(crate::application::services::payment::PaymentResult { success: false, transaction_id: txnid, amount: paid_amount, gateway_id: Some(payment_id), }); } } } let result = sqlx::query( "UPDATE orders SET status = $1, paid_at = CURRENT_TIMESTAMP, payu_id = $2, vpa = $3, is_payment = $4, platform_fee_paid = $5 WHERE transaction_id = $6 AND status = $7", ) .bind(ORDER_STATUS_PAID_PENDING_DELIVERY) .bind(&payment_id) .bind(upi_vpa.clone().unwrap_or_default()) .bind(is_payment) .bind(false) .bind(&txnid) .bind(ORDER_STATUS_PENDING_PAYMENT) .execute(&mut *tx) .await?; if result.rows_affected() > 0 { crate::domain::audit::log_risk_event( &mut tx, Some(&txnid), &order.merchant_id, "STATE_TRANSITION", "LOW", Some("Payment captured and order moved to PAID_PENDING_DELIVERY."), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ) .await; let _ = crate::application::services::payment::write_mock_confirmation_email(pool, &order).await; let _ = tx_sender.send(crate::interfaces::http::api::RealtimeEvent::NewOrder { transaction_id: txnid.clone(), merchant_id: order.merchant_id.clone(), amount: order.price_inr, buyer_phone: order.buyer_phone.clone(), }); let _ = tx_sender.send( crate::interfaces::http::api::RealtimeEvent::OrderStatusChanged { transaction_id: txnid.clone(), merchant_id: order.merchant_id, new_status: ORDER_STATUS_PAID_PENDING_DELIVERY.to_string(), }, ); } tx.commit().await.map_err(crate::domain::error::AppError::Database)?; Ok(crate::application::services::payment::PaymentResult { success: true, transaction_id: txnid, amount: paid_amount, gateway_id: Some(payment_id), }) } pub fn verify_webhook_signature( body: &[u8], signature: &str, secret: &str, ) -> bool { use hmac::{Hmac, Mac}; use sha2::Sha256; type HmacSha256 = Hmac; let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) { Ok(m) => m, Err(_) => return false, }; mac.update(body); let result = mac.finalize().into_bytes(); let expected_signature = hex::encode(result); expected_signature == signature } pub async fn execute_webhook_payment( pool: &DbPool, tx_sender: &Sender, rzp_order_id: &str, payment_id: &str, amount_paise: u64, upi_vpa: Option<&str>, ) -> AppResult<()> { use crate::domain::constants::*; let mut tx = pool.begin().await.map_err(crate::domain::error::AppError::Database)?; let order = sqlx::query_as::<_, crate::domain::models::OrderRecord>( "SELECT transaction_id, merchant_id, link_id, buyer_phone, buyer_phone_hash, buyer_name, buyer_email, shipping_pincode, delivery_address, price_inr, status, vpa, outbound_weight, return_weight, proof_data, proof_received_at, settled_at, paid_at, shipped_at, delivered_at, shipping_method, estimated_delivery_at, payu_id, is_payment, platform_fee_paid, platform_fee, delivery_fee, distance_km, risk_score, risk_flags, cgst, sgst, igst, utr_number, platform_fee_utr, delivery_gps_lat, delivery_gps_lng, is_geofence_verified, pincode_volatility_at_checkout, discount_amount, coupon_code, checkout_gps_lat, checkout_gps_lng, device_fingerprint, created_at FROM orders WHERE payu_id = $1 FOR UPDATE" ) .bind(rzp_order_id) .fetch_optional(&mut *tx) .await?; let order = match order { Some(o) => { let mut o_mut = o; o_mut.decrypt_pii(); if o_mut.vpa.as_deref() == Some("") { o_mut.vpa = None; } o_mut } None => { tx.rollback().await.map_err(crate::domain::error::AppError::Database)?; return Err(crate::domain::error::AppError::NotFound(format!("Order with payu_id {} not found", rzp_order_id))); } }; let txnid = &order.transaction_id; // Verify Payment Amount to prevent underpayment fraud let expected_amount_paise = ((order.price_inr + order.delivery_fee + order.cgst + order.sgst + order.igst) * 100.0).round() as u64; if amount_paise != expected_amount_paise { crate::domain::audit::log_risk_event( &mut tx, Some(txnid), &order.merchant_id, "PAYMENT_AMOUNT_MISMATCH", "CRITICAL", Some(&format!( "Payment Mismatch via Webhook: Razorpay reported paid amount of {} paise, but expected {} paise. Transaction blocked.", amount_paise, expected_amount_paise )), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ) .await; sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2") .bind(ORDER_STATUS_PAYMENT_FAILED) .bind(txnid) .execute(&mut *tx) .await?; tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Err(crate::domain::error::AppError::Forbidden( "Payment amount mismatch detected. Transaction blocked.".to_string(), )); } if order.status == ORDER_STATUS_PAID_PENDING_DELIVERY || order.status == ORDER_STATUS_DELIVERED_PENDING_APPROVAL || order.status == ORDER_STATUS_SETTLED { tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Ok(()); } let is_payment = true; let duplicate_payment = sqlx::query( "SELECT transaction_id FROM orders WHERE payu_id = $1 AND transaction_id != $2" ) .bind(payment_id) .bind(txnid) .fetch_optional(&mut *tx) .await?; if let Some(row) = duplicate_payment { use sqlx::Row; let existing_txn = row.get::("transaction_id"); crate::domain::audit::log_risk_event( &mut tx, Some(txnid), &order.merchant_id, "PAYMENT_REPLAY_ATTACK", "CRITICAL", Some(&format!( "Replay Attack: Razorpay Payment ID {} was already spent on transaction {}. Denied double-fulfillment.", payment_id, existing_txn )), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ) .await; sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2") .bind(ORDER_STATUS_PAYMENT_FAILED) .bind(txnid) .execute(&mut *tx) .await?; tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Err(crate::domain::error::AppError::Forbidden( "Double spend / payment replay detected".to_string(), )); } if let Some(vpa) = upi_vpa { if !vpa.is_empty() { let active_payment = sqlx::query( "SELECT transaction_id FROM orders WHERE vpa = $1 AND status = $2 AND transaction_id != $3" ) .bind(vpa) .bind(ORDER_STATUS_PAID_PENDING_DELIVERY) .bind(txnid) .fetch_optional(&mut *tx) .await?; if let Some(row) = active_payment { use sqlx::Row; let existing_txn = row.get::("transaction_id"); crate::domain::audit::log_risk_event( &mut tx, Some(txnid), &order.merchant_id, "VPA_SINGLETON_VIOLATION", "CRITICAL", Some(&format!("VPA {} already has an active payment ({}). Blocked simultaneous fulfillment.", vpa, existing_txn)), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ).await; sqlx::query("UPDATE orders SET status = $1, vpa = $2 WHERE transaction_id = $3") .bind(ORDER_STATUS_PAYMENT_FAILED) .bind(vpa) .bind(txnid) .execute(&mut *tx) .await?; tx.commit().await.map_err(crate::domain::error::AppError::Database)?; return Ok(()); } } } let result = sqlx::query( "UPDATE orders SET status = $1, paid_at = CURRENT_TIMESTAMP, payu_id = $2, vpa = $3, is_payment = $4, platform_fee_paid = $5 WHERE transaction_id = $6 AND status = $7", ) .bind(ORDER_STATUS_PAID_PENDING_DELIVERY) .bind(payment_id) .bind(upi_vpa.unwrap_or_default()) .bind(is_payment) .bind(false) .bind(txnid) .bind(ORDER_STATUS_PENDING_PAYMENT) .execute(&mut *tx) .await?; if result.rows_affected() > 0 { crate::domain::audit::log_risk_event( &mut tx, Some(txnid), &order.merchant_id, "STATE_TRANSITION", "LOW", Some("Payment captured via webhook and order moved to PAID_PENDING_DELIVERY."), None, None, order.device_fingerprint.as_deref(), Some(tx_sender), ) .await; let _ = crate::application::services::payment::write_mock_confirmation_email(pool, &order).await; let _ = tx_sender.send(crate::interfaces::http::api::RealtimeEvent::NewOrder { transaction_id: txnid.clone(), merchant_id: order.merchant_id.clone(), amount: order.price_inr, buyer_phone: order.buyer_phone.clone(), }); let _ = tx_sender.send( crate::interfaces::http::api::RealtimeEvent::OrderStatusChanged { transaction_id: txnid.clone(), merchant_id: order.merchant_id, new_status: ORDER_STATUS_PAID_PENDING_DELIVERY.to_string(), }, ); } tx.commit().await.map_err(crate::domain::error::AppError::Database)?; Ok(()) }