Spaces:
Running
Running
| use crate::domain::error::AppResult; | |
| use crate::infrastructure::db::DbPool; | |
| use crate::infrastructure::repositories::OrderRepository; | |
| use crate::interfaces::http::api::RealtimeEvent; | |
| use std::sync::Arc; | |
| use tokio::sync::broadcast::Sender; | |
| pub fn verify_razorpay_signature( | |
| order_id: &str, | |
| payment_id: &str, | |
| signature: &str, | |
| secret: &str, | |
| ) -> bool { | |
| use hmac::{Hmac, Mac}; | |
| use sha2::Sha256; | |
| type HmacSha256 = Hmac<Sha256>; | |
| let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) { | |
| Ok(m) => m, | |
| Err(_) => return false, | |
| }; | |
| mac.update(format!("{}|{}", order_id, payment_id).as_bytes()); | |
| let result = mac.finalize().into_bytes(); | |
| let expected_signature = hex::encode(result); | |
| expected_signature == signature | |
| } | |
| pub async fn execute_process_callback( | |
| pool: &DbPool, | |
| _order_repo: &Arc<dyn OrderRepository>, | |
| tx_sender: &Sender<RealtimeEvent>, | |
| form: crate::interfaces::http::routes::payment::RazorpayCallbackForm, | |
| ) -> AppResult<crate::application::services::payment::PaymentResult> { | |
| use crate::domain::constants::*; | |
| let txnid = form.txnid.clone(); | |
| let payment_id = form.razorpay_payment_id.clone(); | |
| let rzp_order_id = form.razorpay_order_id.clone(); | |
| let signature = form.razorpay_signature.clone(); | |
| let upi_vpa = form.upi_vpa.clone(); | |
| // 1. Begin atomic database transaction immediately to encapsulate all reads and locks | |
| let mut tx = pool.begin().await.map_err(crate::domain::error::AppError::Database)?; | |
| // 2. Acquire a strict database write lock (FOR UPDATE) on the target order row. | |
| // This blocks any concurrent callbacks or operations on this order, eliminating time-of-check to time-of-use race conditions! | |
| let order = sqlx::query_as::<_, crate::domain::models::OrderRecord>( | |
| "SELECT transaction_id, merchant_id, link_id, buyer_phone, buyer_phone_hash, buyer_name, buyer_email, shipping_pincode, delivery_address, price_inr, status, vpa, outbound_weight, return_weight, proof_data, proof_received_at, settled_at, paid_at, shipped_at, delivered_at, shipping_method, estimated_delivery_at, payu_id, is_payment, platform_fee_paid, platform_fee, delivery_fee, distance_km, risk_score, risk_flags, cgst, sgst, igst, utr_number, platform_fee_utr, delivery_gps_lat, delivery_gps_lng, is_geofence_verified, pincode_volatility_at_checkout, discount_amount, coupon_code, checkout_gps_lat, checkout_gps_lng, device_fingerprint, created_at FROM orders WHERE transaction_id = $1 FOR UPDATE" | |
| ) | |
| .bind(&txnid) | |
| .fetch_optional(&mut *tx) | |
| .await?; | |
| let order = match order { | |
| Some(o) => { | |
| let mut o_mut = o; | |
| o_mut.decrypt_pii(); | |
| if o_mut.vpa.as_deref() == Some("") { | |
| o_mut.vpa = None; | |
| } | |
| o_mut | |
| } | |
| None => { | |
| tx.rollback().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Err(crate::domain::error::AppError::NotFound("Order not found".into())); | |
| } | |
| }; | |
| // Strict Security Binding: Verify that the razorpay_order_id matches the one generated and stored in the database! | |
| if order.payu_id.is_empty() || order.payu_id != rzp_order_id { | |
| tx.rollback().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Err(crate::domain::error::AppError::BadRequest( | |
| "Razorpay Order ID mismatch or invalid for this transaction. Secure validation active.".to_string(), | |
| )); | |
| } | |
| // Idempotency: If already paid, return early (release lock by committing transaction) | |
| if order.status == ORDER_STATUS_PAID_PENDING_DELIVERY | |
| || order.status == ORDER_STATUS_DELIVERED_PENDING_APPROVAL | |
| || order.status == ORDER_STATUS_SETTLED | |
| { | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Ok(crate::application::services::payment::PaymentResult { | |
| success: true, | |
| transaction_id: txnid, | |
| amount: order.price_inr, | |
| gateway_id: Some(order.payu_id), // keeping field name same in DB | |
| }); | |
| } | |
| let secret = std::env::var("RAZORPAY_KEY_SECRET").unwrap_or_default(); | |
| if !verify_razorpay_signature(&rzp_order_id, &payment_id, &signature, &secret) { | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(&txnid), | |
| &order.merchant_id, | |
| "PAYMENT_CALLBACK_SIGNATURE_MISMATCH", | |
| "CRITICAL", | |
| Some("Razorpay callback signature mismatch detected."), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ) | |
| .await; | |
| sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2 AND status = $3") | |
| .bind(ORDER_STATUS_PAYMENT_FAILED) | |
| .bind(&txnid) | |
| .bind(ORDER_STATUS_PENDING_PAYMENT) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Err(crate::domain::error::AppError::Forbidden( | |
| "Signature mismatch".to_string(), | |
| )); | |
| } | |
| // Verify Payment Amount to prevent underpayment fraud | |
| let rzp_key_id = std::env::var("RAZORPAY_KEY_ID").unwrap_or_default(); | |
| if !rzp_key_id.is_empty() && !secret.is_empty() { | |
| let client = reqwest::Client::new(); | |
| let response = client | |
| .get(format!("https://api.razorpay.com/v1/payments/{}", payment_id)) | |
| .basic_auth(&rzp_key_id, Some(&secret)) | |
| .send() | |
| .await; | |
| match response { | |
| Ok(resp) => { | |
| if resp.status().is_success() { | |
| if let Ok(data) = resp.json::<serde_json::Value>().await { | |
| if let Some(captured_amount) = data.get("amount").and_then(|a| a.as_u64()) { | |
| let expected_amount_paise = ((order.price_inr + order.delivery_fee + order.cgst + order.sgst + order.igst) * 100.0).round() as u64; | |
| if captured_amount != expected_amount_paise { | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(&txnid), | |
| &order.merchant_id, | |
| "PAYMENT_AMOUNT_MISMATCH", | |
| "CRITICAL", | |
| Some(&format!( | |
| "Payment Mismatch in callback: Razorpay reported paid amount of {} paise, but expected {} paise. Transaction blocked.", | |
| captured_amount, expected_amount_paise | |
| )), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ) | |
| .await; | |
| sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2") | |
| .bind(ORDER_STATUS_PAYMENT_FAILED) | |
| .bind(&txnid) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Err(crate::domain::error::AppError::Forbidden( | |
| "Payment amount mismatch detected. Transaction blocked.".to_string(), | |
| )); | |
| } | |
| } | |
| } | |
| } else { | |
| let status_err = resp.status(); | |
| let text_err = resp.text().await.unwrap_or_default(); | |
| tracing::error!("Razorpay payment verification API error: status={}, body={}", status_err, text_err); | |
| } | |
| } | |
| Err(e) => { | |
| tracing::error!("Failed to contact Razorpay payment verification API: {:?}", e); | |
| } | |
| } | |
| } | |
| let paid_amount = order.price_inr + order.delivery_fee + order.cgst + order.sgst + order.igst; | |
| let is_payment = true; // Mark as paid correctly to secure mandate state | |
| // 3. Double-Spend & Payment Replay Protection: Prevent reuse of a single gateway payment ID | |
| let duplicate_payment = sqlx::query( | |
| "SELECT transaction_id FROM orders WHERE payu_id = $1 AND transaction_id != $2" | |
| ) | |
| .bind(&payment_id) | |
| .bind(&txnid) | |
| .fetch_optional(&mut *tx) | |
| .await?; | |
| if let Some(row) = duplicate_payment { | |
| use sqlx::Row; | |
| let existing_txn = row.get::<String, _>("transaction_id"); | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(&txnid), | |
| &order.merchant_id, | |
| "PAYMENT_REPLAY_ATTACK", | |
| "CRITICAL", | |
| Some(&format!( | |
| "Replay Attack: Razorpay Payment ID {} was already spent on transaction {}. Denied double-fulfillment.", | |
| payment_id, existing_txn | |
| )), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ) | |
| .await; | |
| sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2") | |
| .bind(ORDER_STATUS_PAYMENT_FAILED) | |
| .bind(&txnid) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Err(crate::domain::error::AppError::Forbidden( | |
| "Double spend / payment replay detected".to_string(), | |
| )); | |
| } | |
| if let Some(vpa) = &upi_vpa { | |
| if !vpa.is_empty() { | |
| let active_payment = sqlx::query( | |
| "SELECT transaction_id FROM orders WHERE vpa = $1 AND status = $2 AND transaction_id != $3" | |
| ) | |
| .bind(vpa) | |
| .bind(ORDER_STATUS_PAID_PENDING_DELIVERY) | |
| .bind(&txnid) | |
| .fetch_optional(&mut *tx) | |
| .await?; | |
| if let Some(row) = active_payment { | |
| use sqlx::Row; | |
| let existing_txn = row.get::<String, _>("transaction_id"); | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(&txnid), | |
| &order.merchant_id, | |
| "VPA_SINGLETON_VIOLATION", | |
| "CRITICAL", | |
| Some(&format!("VPA {} already has an active payment ({}). Blocked simultaneous fulfillment.", vpa, existing_txn)), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ).await; | |
| sqlx::query("UPDATE orders SET status = $1, vpa = $2 WHERE transaction_id = $3") | |
| .bind(ORDER_STATUS_PAYMENT_FAILED) | |
| .bind(vpa) | |
| .bind(&txnid) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Ok(crate::application::services::payment::PaymentResult { | |
| success: false, | |
| transaction_id: txnid, | |
| amount: paid_amount, | |
| gateway_id: Some(payment_id), | |
| }); | |
| } | |
| } | |
| } | |
| let result = sqlx::query( | |
| "UPDATE orders SET status = $1, paid_at = CURRENT_TIMESTAMP, payu_id = $2, vpa = $3, is_payment = $4, platform_fee_paid = $5 WHERE transaction_id = $6 AND status = $7", | |
| ) | |
| .bind(ORDER_STATUS_PAID_PENDING_DELIVERY) | |
| .bind(&payment_id) | |
| .bind(upi_vpa.clone().unwrap_or_default()) | |
| .bind(is_payment) | |
| .bind(false) | |
| .bind(&txnid) | |
| .bind(ORDER_STATUS_PENDING_PAYMENT) | |
| .execute(&mut *tx) | |
| .await?; | |
| if result.rows_affected() > 0 { | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(&txnid), | |
| &order.merchant_id, | |
| "STATE_TRANSITION", | |
| "LOW", | |
| Some("Payment captured and order moved to PAID_PENDING_DELIVERY."), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ) | |
| .await; | |
| let _ = crate::application::services::payment::write_mock_confirmation_email(pool, &order).await; | |
| let _ = tx_sender.send(crate::interfaces::http::api::RealtimeEvent::NewOrder { | |
| transaction_id: txnid.clone(), | |
| merchant_id: order.merchant_id.clone(), | |
| amount: order.price_inr, | |
| buyer_phone: order.buyer_phone.clone(), | |
| }); | |
| let _ = tx_sender.send( | |
| crate::interfaces::http::api::RealtimeEvent::OrderStatusChanged { | |
| transaction_id: txnid.clone(), | |
| merchant_id: order.merchant_id, | |
| new_status: ORDER_STATUS_PAID_PENDING_DELIVERY.to_string(), | |
| }, | |
| ); | |
| } | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| Ok(crate::application::services::payment::PaymentResult { | |
| success: true, | |
| transaction_id: txnid, | |
| amount: paid_amount, | |
| gateway_id: Some(payment_id), | |
| }) | |
| } | |
| pub fn verify_webhook_signature( | |
| body: &[u8], | |
| signature: &str, | |
| secret: &str, | |
| ) -> bool { | |
| use hmac::{Hmac, Mac}; | |
| use sha2::Sha256; | |
| type HmacSha256 = Hmac<Sha256>; | |
| let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) { | |
| Ok(m) => m, | |
| Err(_) => return false, | |
| }; | |
| mac.update(body); | |
| let result = mac.finalize().into_bytes(); | |
| let expected_signature = hex::encode(result); | |
| expected_signature == signature | |
| } | |
| pub async fn execute_webhook_payment( | |
| pool: &DbPool, | |
| tx_sender: &Sender<RealtimeEvent>, | |
| rzp_order_id: &str, | |
| payment_id: &str, | |
| amount_paise: u64, | |
| upi_vpa: Option<&str>, | |
| ) -> AppResult<()> { | |
| use crate::domain::constants::*; | |
| let mut tx = pool.begin().await.map_err(crate::domain::error::AppError::Database)?; | |
| let order = sqlx::query_as::<_, crate::domain::models::OrderRecord>( | |
| "SELECT transaction_id, merchant_id, link_id, buyer_phone, buyer_phone_hash, buyer_name, buyer_email, shipping_pincode, delivery_address, price_inr, status, vpa, outbound_weight, return_weight, proof_data, proof_received_at, settled_at, paid_at, shipped_at, delivered_at, shipping_method, estimated_delivery_at, payu_id, is_payment, platform_fee_paid, platform_fee, delivery_fee, distance_km, risk_score, risk_flags, cgst, sgst, igst, utr_number, platform_fee_utr, delivery_gps_lat, delivery_gps_lng, is_geofence_verified, pincode_volatility_at_checkout, discount_amount, coupon_code, checkout_gps_lat, checkout_gps_lng, device_fingerprint, created_at FROM orders WHERE payu_id = $1 FOR UPDATE" | |
| ) | |
| .bind(rzp_order_id) | |
| .fetch_optional(&mut *tx) | |
| .await?; | |
| let order = match order { | |
| Some(o) => { | |
| let mut o_mut = o; | |
| o_mut.decrypt_pii(); | |
| if o_mut.vpa.as_deref() == Some("") { | |
| o_mut.vpa = None; | |
| } | |
| o_mut | |
| } | |
| None => { | |
| tx.rollback().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Err(crate::domain::error::AppError::NotFound(format!("Order with payu_id {} not found", rzp_order_id))); | |
| } | |
| }; | |
| let txnid = &order.transaction_id; | |
| // Verify Payment Amount to prevent underpayment fraud | |
| let expected_amount_paise = ((order.price_inr + order.delivery_fee + order.cgst + order.sgst + order.igst) * 100.0).round() as u64; | |
| if amount_paise != expected_amount_paise { | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(txnid), | |
| &order.merchant_id, | |
| "PAYMENT_AMOUNT_MISMATCH", | |
| "CRITICAL", | |
| Some(&format!( | |
| "Payment Mismatch via Webhook: Razorpay reported paid amount of {} paise, but expected {} paise. Transaction blocked.", | |
| amount_paise, expected_amount_paise | |
| )), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ) | |
| .await; | |
| sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2") | |
| .bind(ORDER_STATUS_PAYMENT_FAILED) | |
| .bind(txnid) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Err(crate::domain::error::AppError::Forbidden( | |
| "Payment amount mismatch detected. Transaction blocked.".to_string(), | |
| )); | |
| } | |
| if order.status == ORDER_STATUS_PAID_PENDING_DELIVERY | |
| || order.status == ORDER_STATUS_DELIVERED_PENDING_APPROVAL | |
| || order.status == ORDER_STATUS_SETTLED | |
| { | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Ok(()); | |
| } | |
| let is_payment = true; | |
| let duplicate_payment = sqlx::query( | |
| "SELECT transaction_id FROM orders WHERE payu_id = $1 AND transaction_id != $2" | |
| ) | |
| .bind(payment_id) | |
| .bind(txnid) | |
| .fetch_optional(&mut *tx) | |
| .await?; | |
| if let Some(row) = duplicate_payment { | |
| use sqlx::Row; | |
| let existing_txn = row.get::<String, _>("transaction_id"); | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(txnid), | |
| &order.merchant_id, | |
| "PAYMENT_REPLAY_ATTACK", | |
| "CRITICAL", | |
| Some(&format!( | |
| "Replay Attack: Razorpay Payment ID {} was already spent on transaction {}. Denied double-fulfillment.", | |
| payment_id, existing_txn | |
| )), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ) | |
| .await; | |
| sqlx::query("UPDATE orders SET status = $1 WHERE transaction_id = $2") | |
| .bind(ORDER_STATUS_PAYMENT_FAILED) | |
| .bind(txnid) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Err(crate::domain::error::AppError::Forbidden( | |
| "Double spend / payment replay detected".to_string(), | |
| )); | |
| } | |
| if let Some(vpa) = upi_vpa { | |
| if !vpa.is_empty() { | |
| let active_payment = sqlx::query( | |
| "SELECT transaction_id FROM orders WHERE vpa = $1 AND status = $2 AND transaction_id != $3" | |
| ) | |
| .bind(vpa) | |
| .bind(ORDER_STATUS_PAID_PENDING_DELIVERY) | |
| .bind(txnid) | |
| .fetch_optional(&mut *tx) | |
| .await?; | |
| if let Some(row) = active_payment { | |
| use sqlx::Row; | |
| let existing_txn = row.get::<String, _>("transaction_id"); | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(txnid), | |
| &order.merchant_id, | |
| "VPA_SINGLETON_VIOLATION", | |
| "CRITICAL", | |
| Some(&format!("VPA {} already has an active payment ({}). Blocked simultaneous fulfillment.", vpa, existing_txn)), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ).await; | |
| sqlx::query("UPDATE orders SET status = $1, vpa = $2 WHERE transaction_id = $3") | |
| .bind(ORDER_STATUS_PAYMENT_FAILED) | |
| .bind(vpa) | |
| .bind(txnid) | |
| .execute(&mut *tx) | |
| .await?; | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| return Ok(()); | |
| } | |
| } | |
| } | |
| let result = sqlx::query( | |
| "UPDATE orders SET status = $1, paid_at = CURRENT_TIMESTAMP, payu_id = $2, vpa = $3, is_payment = $4, platform_fee_paid = $5 WHERE transaction_id = $6 AND status = $7", | |
| ) | |
| .bind(ORDER_STATUS_PAID_PENDING_DELIVERY) | |
| .bind(payment_id) | |
| .bind(upi_vpa.unwrap_or_default()) | |
| .bind(is_payment) | |
| .bind(false) | |
| .bind(txnid) | |
| .bind(ORDER_STATUS_PENDING_PAYMENT) | |
| .execute(&mut *tx) | |
| .await?; | |
| if result.rows_affected() > 0 { | |
| crate::domain::audit::log_risk_event( | |
| &mut tx, | |
| Some(txnid), | |
| &order.merchant_id, | |
| "STATE_TRANSITION", | |
| "LOW", | |
| Some("Payment captured via webhook and order moved to PAID_PENDING_DELIVERY."), | |
| None, | |
| None, | |
| order.device_fingerprint.as_deref(), | |
| Some(tx_sender), | |
| ) | |
| .await; | |
| let _ = crate::application::services::payment::write_mock_confirmation_email(pool, &order).await; | |
| let _ = tx_sender.send(crate::interfaces::http::api::RealtimeEvent::NewOrder { | |
| transaction_id: txnid.clone(), | |
| merchant_id: order.merchant_id.clone(), | |
| amount: order.price_inr, | |
| buyer_phone: order.buyer_phone.clone(), | |
| }); | |
| let _ = tx_sender.send( | |
| crate::interfaces::http::api::RealtimeEvent::OrderStatusChanged { | |
| transaction_id: txnid.clone(), | |
| merchant_id: order.merchant_id, | |
| new_status: ORDER_STATUS_PAID_PENDING_DELIVERY.to_string(), | |
| }, | |
| ); | |
| } | |
| tx.commit().await.map_err(crate::domain::error::AppError::Database)?; | |
| Ok(()) | |
| } | |