use crate::domain::error::{AppError, AppResult}; use crate::domain::models::OrderRecord; use crate::infrastructure::db::DbPool; use crate::infrastructure::repositories::{MerchantRepository, OrderRepository, ProductRepository}; use crate::infrastructure::storage::assets::AssetProvider; use crate::interfaces::http::api::RealtimeEvent; use std::sync::Arc; use tokio::sync::broadcast::Sender; use uuid::Uuid; #[allow(clippy::too_many_arguments)] pub async fn execute_checkout_helper( product_repo: &Arc, merchant_repo: &Arc, order_repo: &Arc, pool: &DbPool, tx_sender: &Sender, link_id: &str, buyer_phone: &str, buyer_name: &str, buyer_email: &str, shipping_pincode: &str, delivery_address: &str, coupon_code: Option, request_id: Option, client_ip: &str, lat: Option, lng: Option, device_fingerprint: Option, ) -> AppResult { let product = product_repo.find_by_id(link_id).await?; let mut product = product.ok_or_else(|| AppError::NotFound("Product link not found".to_string()))?; let _ = product_repo.increment_views(link_id).await; product.image_data = crate::core::utils::hydrate_file_to_base64(product.image_data).await; let merchant = merchant_repo .find_by_id(&product.merchant_id) .await? .ok_or_else(|| AppError::NotFound("Merchant not found".to_string()))?; if merchant.is_frozen { return Err(AppError::Forbidden("Merchant account is frozen due to unpaid outstanding invoices.".to_string())); } // 1. Secure Logistics Circuit Breaker // Check if the pincode has high smart volatility (recent violations) let volatility_count = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM risk_audit_logs WHERE details LIKE $1 AND created_at > NOW() - INTERVAL '1 hour'" ) .bind(format!("%{}%", shipping_pincode)) .fetch_one(order_repo.find_pool()) .await .unwrap_or(0); if volatility_count > 5 { return Err(AppError::Forbidden(format!( "Logistics Circuit Breaker Active for zone {}. High volatility detected in recent smart audit cycles.", shipping_pincode ))); } // 2. Institutional Velocity Guard (Anti-Abuse) let intelligence = crate::application::services::intelligence::IntelligenceService::new(pool.clone()); let velocity_risk = intelligence .evaluate_velocity_risk( device_fingerprint.as_deref(), Some(client_ip), &product.merchant_id, ) .await?; if velocity_risk >= 90.0 { // Log Critical Security Event let _ = sqlx::query( "INSERT INTO risk_audit_logs (merchant_id, event_type, risk_level, details, device_fingerprint) VALUES ($1, $2, $3, $4, $5)" ) .bind(&product.merchant_id) .bind("VELOCITY_BLOCK") .bind("CRITICAL") .bind(format!("Transaction blocked due to high velocity risk ({}). Fingerprint: {:?}, IP: {}", velocity_risk, device_fingerprint, client_ip)) .bind(device_fingerprint.as_deref()) .execute(pool) .await; return Err(AppError::Forbidden( "Security Protocol Active: High-frequency transaction activity detected from this device/network. Access restricted for protocol safety.".to_string() )); } let distance_km = crate::domain::distance::estimate_distance_km(&merchant.base_pincode, shipping_pincode); let pricing_features = crate::application::services::pricing::PricingFeatures { distance_km, user_rate_per_km: merchant.delivery_rate_per_km, product_weight: product.expected_weight, base_charge: merchant.delivery_base_fee, config: serde_json::from_value(merchant.logistics_config.clone()).unwrap_or_default(), }; let delivery_fee = crate::application::services::pricing::PricingEngine::estimate_delivery_fee( pricing_features, ); // 3. Precision Geofence Check let mut geofence_verified = None; if let (Some(l_lat), Some(l_lng)) = (lat, lng) { let intelligence = crate::application::services::intelligence::IntelligenceService::new(pool.clone()); if !intelligence .verify_geofence_with_precision(shipping_pincode, l_lat, l_lng) .await? { return Err(AppError::Forbidden(format!( "Geofence Verification Failed: Your current GPS coordinates do not match the shipping pincode {}. Forensic integrity active.", shipping_pincode ))); } geofence_verified = Some(true); } let order_count = order_repo .count_by_buyer(&product.merchant_id, buyer_phone) .await?; let transaction_id = Uuid::new_v4().to_string(); // 2. Merchant Transaction Limits if merchant.plan == "FREE" && product.price_inr > 10000.0 { return Err(AppError::Forbidden( "Transaction value exceeds the ₹10,000 limit for merchants on the FREE plan. Upgrade to PRO to accept higher-value payments without limitations.".to_string() )); } if product.price_inr > merchant.max_order_value_inr { return Err(AppError::Forbidden(format!( "Transaction value ₹{} exceeds the current limit for this merchant (₹{}). Increase merchant verification level to lift this restriction.", product.price_inr, merchant.max_order_value_inr ))); } // Start Transaction for Atomicity let mut tx = pool.begin().await.map_err(AppError::Database)?; // 3. Inventory Enforcement if !product.is_unlimited { let rows_affected = sqlx::query( "UPDATE product_links SET inventory_count = inventory_count - 1 WHERE link_id = $1 AND inventory_count > 0", ) .bind(&product.link_id) .execute(&mut *tx) .await .map_err(AppError::Database)? .rows_affected(); if rows_affected == 0 { return Err(AppError::BadRequest( "Product is currently out of stock.".to_string(), )); } } let current_price = if let (Some(sale_price), Some(ends_at)) = (product.sale_price_inr, product.sale_ends_at) { if ends_at > chrono::Utc::now().naive_utc() { sale_price } else { product.price_inr } } else { product.price_inr }; let platform_fee = crate::application::services::pricing::PricingEngine::calculate_platform_fee( current_price, merchant.trust_score, ); // Calculate preliminary risk score let mut calculated_risk = 0.0; calculated_risk += (volatility_count as f64 * 5.0).min(30.0); calculated_risk += velocity_risk * 0.4; // Incorporate velocity guard signal if geofence_verified == Some(true) { calculated_risk *= 0.8; // Lower risk if GPS verified } let mut order = OrderRecord { transaction_id: transaction_id.clone(), merchant_id: product.merchant_id.clone(), link_id: link_id.to_string(), buyer_phone: buyer_phone.to_string(), buyer_phone_hash: None, // populated by encrypt_pii() at persist time buyer_name: buyer_name.to_string(), buyer_email: buyer_email.to_string(), shipping_pincode: Some(shipping_pincode.to_string()), delivery_address: Some(delivery_address.to_string()), price_inr: current_price, status: crate::domain::constants::ORDER_STATUS_PENDING_PAYMENT.to_string(), vpa: None, payu_id: String::new(), outbound_weight: product.expected_weight, return_weight: 0.0, proof_data: None, settled_at: None, shipped_at: None, delivered_at: None, shipping_method: None, estimated_delivery_at: None, is_payment: false, platform_fee_paid: false, platform_fee, delivery_fee, distance_km, risk_score: calculated_risk, risk_flags: None, cgst: 0.0, sgst: 0.0, igst: 0.0, utr_number: None, platform_fee_utr: None, delivery_gps_lat: None, delivery_gps_lng: None, is_geofence_verified: geofence_verified, pincode_volatility_at_checkout: 0.0, discount_amount: 0.0, coupon_code: None, checkout_gps_lat: lat, checkout_gps_lng: lng, device_fingerprint: device_fingerprint.clone(), paid_at: None, proof_received_at: None, created_at: None, brand_name: None, }; if let Some(ref code) = coupon_code { let coupon = sqlx::query_as::<_, crate::domain::models::Coupon>( "SELECT * FROM coupons WHERE merchant_id = $1 AND code = $2 AND is_active = TRUE", ) .bind(&product.merchant_id) .bind(code.to_uppercase()) .fetch_optional(&mut *tx) .await?; if let Some(c) = coupon { if c.is_valid(order.price_inr) { let discount = c.calculate_discount(order.price_inr); order.discount_amount = discount; order.price_inr -= discount; order.coupon_code = Some(code.clone()); let _ = sqlx::query("UPDATE coupons SET usage_count = usage_count + 1 WHERE id = $1") .bind(c.id) .execute(&mut *tx) .await; } } } let gst_breakdown = crate::application::services::india_tax::IndiaTaxService::calculate_gst( order.price_inr, merchant.state_code.unwrap_or(29), order.shipping_pincode.as_deref().unwrap_or_default(), 0.18, // 18% standard rate ); order.cgst = gst_breakdown.cgst; order.sgst = gst_breakdown.sgst; order.igst = gst_breakdown.igst; let volatility = crate::application::services::intelligence::IntelligenceService::new(pool.clone()) .get_pincode_volatility(order.shipping_pincode.as_deref().unwrap_or_default()) .await .unwrap_or(0.0); order.pincode_volatility_at_checkout = volatility; if volatility > 0.5 { let _ = tx_sender.send( crate::interfaces::http::api::RealtimeEvent::NetworkVolatilityAlert { pincode: order.shipping_pincode.clone().unwrap_or_default(), volatility_score: volatility, message: format!( "High logistics volatility detected for pincode {}.", order.shipping_pincode.clone().unwrap_or_default() ), }, ); } let (risk_score, risk_flags) = crate::application::services::risk::RiskEngine::calculate_risk_score( &order, order_count, volatility, ); order.risk_score = risk_score; order.risk_flags = Some(risk_flags); if order.risk_score >= 80.0 { // Log Critical/High Security Event outside the transaction so it's persisted even when rolled back if let Ok(mut conn) = pool.acquire().await { crate::domain::audit::log_risk_event( &mut *conn, Some(&transaction_id), &product.merchant_id, "HIGH_RISK_BLOCK", "CRITICAL", Some(&format!( "Transaction blocked due to high risk score ({:.1}) during checkout for link {}. Flags: {:?}", order.risk_score, link_id, order.risk_flags )), Some(order.risk_score), request_id.as_deref(), device_fingerprint.as_deref(), Some(tx_sender), ) .await; } if order.risk_score > 90.0 { crate::interfaces::http::middleware::block_ip_persistently( pool, client_ip, &format!("Automated Defense: High Risk Score ({:.1}) detected during checkout for link {}", order.risk_score, link_id), Some(tx_sender) ).await; } return Err(AppError::Forbidden(format!( "Security restriction: High risk profile detected (Score: {:.1}). This transaction has been blocked to prevent potential fraud.", order.risk_score ))); } else if order.risk_score > 60.0 { crate::domain::audit::log_risk_event( &mut tx, Some(&transaction_id), &product.merchant_id, "HIGH_RISK_ORDER", "HIGH", Some(&format!( "Order {} flagged with risk score {}", transaction_id, order.risk_score )), Some(order.risk_score), request_id.as_deref(), device_fingerprint.as_deref(), Some(tx_sender), ) .await; } // Persist Order using transaction order.created_at = Some(chrono::Utc::now().naive_utc()); crate::domain::models::OrderRecord::create_with_tx(&mut tx, &order).await?; tx.commit().await.map_err(AppError::Database)?; Ok(order) } #[allow(clippy::too_many_arguments)] pub async fn execute_submit_delivery_proof_helper( order_repo: &Arc, assets: &Arc, tx_sender: &Sender, transaction_id: &str, proof_data: &str, proof_token: &str, lat: Option, lng: Option, ) -> AppResult<()> { let transaction_id = crate::domain::validation::sanitize_filename(transaction_id); if crate::core::session::verify_proof_token(proof_token, &transaction_id).is_err() { return Err(AppError::Forbidden( "Invalid proof authorization token".to_string(), )); } let order = order_repo.find_by_id(&transaction_id).await?; let order = order.ok_or_else(|| AppError::NotFound("Order not found".to_string()))?; if order.status != crate::domain::constants::ORDER_STATUS_PAID_PENDING_DELIVERY { return Err(AppError::BadRequest( "Order is not in a state to accept delivery proof".to_string(), )); } let mut tx = order_repo .find_pool() .begin() .await .map_err(AppError::Database)?; let is_video = proof_data.contains("video") && proof_data.contains("mp4"); let is_png = proof_data.contains("image/png"); let file_extension = if is_video { "mp4" } else if is_png { "png" } else { "jpg" }; let filename = format!("proof_{}.{}", Uuid::new_v4(), file_extension); let bytes = crate::domain::validation::validate_base64_payload(proof_data, 10 * 1024 * 1024) .map_err(|e| AppError::BadRequest(e.message))?; // Institutional Enforcement: Require Video for High-Value Orders (> ₹5,000) if order.price_inr > 5000.0 && !is_video { return Err(AppError::BadRequest( "High-value order detected. Smart video proof is mandatory for this transaction." .to_string(), )); } if !is_video { let allowed_headers: [Vec; 2] = [ vec![0xFF, 0xD8, 0xFF], vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A], ]; if !allowed_headers .iter() .any(|header| bytes.starts_with(header)) { return Err(AppError::BadRequest("Invalid image format".to_string())); } } let merchant_plan: String = sqlx::query_scalar("SELECT plan FROM merchants WHERE merchant_id = $1") .bind(&order.merchant_id) .fetch_one(&mut *tx) .await .map_err(AppError::Database)?; let lat_val = lat.unwrap_or(0.0); let lng_val = lng.unwrap_or(0.0); let mut zk_proof = crate::core::crypto::CryptoService::generate_zk_telemetry_proof( &transaction_id, &bytes, lat_val, lng_val, ); if merchant_plan == "PRO" { let asset_path = assets .store_asset(&filename, &bytes) .await .map_err(AppError::Internal)?; if let Some(obj) = zk_proof.as_object_mut() { obj.insert("is_zero_storage".to_string(), serde_json::json!(false)); obj.insert("asset_path".to_string(), serde_json::json!(asset_path)); } } let mut is_geofence_verified = None; if let (Some(l_lat), Some(l_lng), Some(pincode)) = (lat, lng, &order.shipping_pincode) { let (p_lat, p_lng) = crate::domain::geofence::GeofenceService::get_coordinates(pincode) .unwrap_or((12.9716, 77.5946)); // Default to Bangalore Central let distance = crate::domain::geofence::GeofenceService::calculate_distance_km( l_lat, l_lng, p_lat, p_lng, ); is_geofence_verified = Some(distance < 5.0); // 5km tolerance if !is_geofence_verified.unwrap_or(false) { crate::domain::audit::log_risk_event( &mut tx, Some(&transaction_id), &order.merchant_id, "GEOFENCE_VIOLATION", "MEDIUM", Some(&format!( "Delivery proof submitted from {} km away from shipping pincode {}.", distance.round(), pincode )), Some(distance), None, order.device_fingerprint.as_deref(), Some(tx_sender), ) .await; // Trust Score Penalty for logistics deviations let _ = sqlx::query("UPDATE merchants SET trust_score = GREATEST(0.0, trust_score - 2.0) WHERE merchant_id = $1") .bind(&order.merchant_id) .execute(&mut *tx) .await; } } sqlx::query( "UPDATE orders SET proof_data = proof_data || $1::jsonb, status = $2, delivered_at = CURRENT_TIMESTAMP, delivery_gps_lat = $3, delivery_gps_lng = $4, is_geofence_verified = $5 WHERE transaction_id = $6 AND status = $7", ) .bind(serde_json::json!([zk_proof])) .bind(crate::domain::constants::ORDER_STATUS_DELIVERED_PENDING_APPROVAL) .bind(lat) .bind(lng) .bind(is_geofence_verified) .bind(&transaction_id) .bind(crate::domain::constants::ORDER_STATUS_PAID_PENDING_DELIVERY) .execute(&mut *tx) .await .map_err(AppError::Database)?; tx.commit().await.map_err(AppError::Database)?; let _ = tx_sender.send( crate::interfaces::http::api::RealtimeEvent::OrderStatusChanged { transaction_id: transaction_id.to_string(), merchant_id: order.merchant_id, new_status: crate::domain::constants::ORDER_STATUS_DELIVERED_PENDING_APPROVAL .to_string(), }, ); Ok(()) } #[allow(clippy::too_many_arguments)] pub async fn execute_cart_checkout_helper( product_repo: &Arc, merchant_repo: &Arc, _order_repo: &Arc, pool: &crate::infrastructure::db::DbPool, tx_sender: &tokio::sync::broadcast::Sender, items: Vec<(String, u32)>, buyer_phone: &str, buyer_name: &str, buyer_email: &str, shipping_pincode: &str, delivery_address: &str, coupon_code: Option, _request_id: Option, client_ip: &str, lat: Option, lng: Option, device_fingerprint: Option, ) -> AppResult { let transaction_id = format!( "TX_{}", uuid::Uuid::new_v4().to_string()[..8].to_uppercase() ); let mut total_price = 0.0; let mut total_weight = 0.0; let mut first_merchant_id = String::new(); let mut product_details = Vec::new(); for (link_id, qty) in &items { let product = product_repo .find_by_id(link_id) .await? .ok_or_else(|| AppError::NotFound(format!("Product {} not found", link_id)))?; if first_merchant_id.is_empty() { first_merchant_id = product.merchant_id.clone(); } else if first_merchant_id != product.merchant_id { return Err(AppError::BadRequest( "Cross-merchant checkout not allowed in single cart".into(), )); } let current_price = if let (Some(sale_price), Some(ends_at)) = (product.sale_price_inr, product.sale_ends_at) { if ends_at > chrono::Utc::now().naive_utc() { sale_price } else { product.price_inr } } else { product.price_inr }; total_price += current_price * (*qty as f64); total_weight += product.expected_weight * (*qty as f64); // Inventory Enforcement for Cart (Preliminary Check) if !product.is_unlimited && product.inventory_count < *qty as i32 { return Err(AppError::BadRequest(format!( "Product '{}' is low on stock ({} available).", product.product_name, product.inventory_count ))); } product_details.push((product, *qty)); } if first_merchant_id.is_empty() { return Err(AppError::BadRequest("Cart is empty".into())); } let merchant = merchant_repo .find_by_id(&first_merchant_id) .await? .ok_or_else(|| AppError::NotFound("Merchant not found".into()))?; if merchant.is_frozen { return Err(AppError::Forbidden("Merchant account is frozen due to unpaid outstanding invoices.".to_string())); } // 2. Institutional Velocity Guard (Anti-Abuse) let intelligence = crate::application::services::intelligence::IntelligenceService::new(pool.clone()); let velocity_risk = intelligence .evaluate_velocity_risk( device_fingerprint.as_deref(), Some(client_ip), &first_merchant_id, ) .await?; if velocity_risk >= 90.0 { return Err(AppError::Forbidden( "Security Protocol Active: High-frequency transaction activity detected from this device/network. Access restricted for protocol safety.".to_string() )); } let distance_km = crate::domain::distance::estimate_distance_km(&merchant.base_pincode, shipping_pincode); let pricing_features = crate::application::services::pricing::PricingFeatures { distance_km, user_rate_per_km: merchant.delivery_rate_per_km, product_weight: total_weight, base_charge: merchant.delivery_base_fee, config: serde_json::from_value(merchant.logistics_config.clone()).unwrap_or_default(), }; let delivery_fee = crate::application::services::pricing::PricingEngine::estimate_delivery_fee( pricing_features, ); // Precision Geofence Check let mut geofence_verified = None; if let (Some(l_lat), Some(l_lng)) = (lat, lng) { let intelligence = crate::application::services::intelligence::IntelligenceService::new(pool.clone()); if !intelligence .verify_geofence_with_precision(shipping_pincode, l_lat, l_lng) .await? { return Err(AppError::Forbidden(format!( "Geofence Verification Failed: Your current GPS coordinates do not match the shipping pincode {}. Forensic integrity active.", shipping_pincode ))); } geofence_verified = Some(true); } let platform_fee = crate::application::services::pricing::PricingEngine::calculate_platform_fee( total_price, merchant.trust_score, ); let gst_breakdown = crate::application::services::india_tax::IndiaTaxService::calculate_gst( total_price, merchant.state_code.unwrap_or(29), shipping_pincode, 0.18, ); let volatility = crate::application::services::intelligence::IntelligenceService::new(pool.clone()) .get_pincode_volatility(shipping_pincode) .await .unwrap_or(0.0); // Calculate preliminary risk score let mut calculated_risk = 0.0; calculated_risk += (volatility * 50.0).min(30.0); calculated_risk += velocity_risk * 0.4; if geofence_verified == Some(true) { calculated_risk *= 0.8; } let mut order = crate::domain::models::OrderRecord { transaction_id: transaction_id.clone(), merchant_id: first_merchant_id.clone(), link_id: "CART_TRANSACTION".into(), buyer_phone: buyer_phone.to_string(), buyer_phone_hash: None, // populated by encrypt_pii() at persist time buyer_name: buyer_name.to_string(), buyer_email: buyer_email.to_string(), shipping_pincode: Some(shipping_pincode.to_string()), delivery_address: Some(delivery_address.to_string()), price_inr: total_price, status: crate::domain::constants::ORDER_STATUS_PENDING_PAYMENT.to_string(), vpa: Some(String::new()), outbound_weight: total_weight, return_weight: 0.0, proof_data: Some(serde_json::json!([])), settled_at: None, shipped_at: None, delivered_at: None, shipping_method: None, estimated_delivery_at: None, payu_id: String::new(), is_payment: false, platform_fee_paid: false, platform_fee, delivery_fee, distance_km, risk_score: calculated_risk, risk_flags: None, cgst: gst_breakdown.cgst, sgst: gst_breakdown.sgst, igst: gst_breakdown.igst, utr_number: None, platform_fee_utr: None, delivery_gps_lat: None, delivery_gps_lng: None, is_geofence_verified: geofence_verified, pincode_volatility_at_checkout: volatility, discount_amount: 0.0, coupon_code: None, checkout_gps_lat: lat, checkout_gps_lng: lng, device_fingerprint: device_fingerprint.clone(), paid_at: None, proof_received_at: None, created_at: None, brand_name: None, }; let mut tx = pool.begin().await.map_err(AppError::Database)?; // Enforce cart inventory atomically within the transaction for (p, qty) in &product_details { if !p.is_unlimited { let rows_affected = sqlx::query( "UPDATE product_links SET inventory_count = inventory_count - $1 WHERE link_id = $2 AND inventory_count >= $3", ) .bind(*qty as i32) .bind(&p.link_id) .bind(*qty as i32) .execute(&mut *tx) .await .map_err(AppError::Database)? .rows_affected(); if rows_affected == 0 { return Err(AppError::BadRequest(format!( "Product '{}' went out of stock or has insufficient quantity.", p.product_name ))); } } } if let Some(ref code) = coupon_code { let coupon = sqlx::query_as::<_, crate::domain::models::Coupon>( "SELECT * FROM coupons WHERE merchant_id = $1 AND code = $2 AND is_active = TRUE", ) .bind(&first_merchant_id) .bind(code.to_uppercase()) .fetch_optional(&mut *tx) .await?; if let Some(c) = coupon { if c.is_valid(order.price_inr) { let discount = c.calculate_discount(order.price_inr); order.discount_amount = discount; order.price_inr -= discount; order.coupon_code = Some(code.clone()); let _ = sqlx::query("UPDATE coupons SET usage_count = usage_count + 1 WHERE id = $1") .bind(c.id) .execute(&mut *tx) .await; } } } let (risk_score, risk_flags) = crate::application::services::risk::RiskEngine::calculate_risk_score(&order, 0, volatility); order.risk_score = risk_score; order.risk_flags = Some(risk_flags); if order.risk_score >= 80.0 { // Log Critical/High Security Event outside the transaction so it's persisted even when rolled back if let Ok(mut conn) = pool.acquire().await { crate::domain::audit::log_risk_event( &mut *conn, Some(&transaction_id), &first_merchant_id, "HIGH_RISK_BLOCK", "CRITICAL", Some(&format!( "Cart transaction blocked due to high risk score ({:.1}) during checkout. Flags: {:?}", order.risk_score, order.risk_flags )), Some(order.risk_score), None, device_fingerprint.as_deref(), Some(tx_sender), ) .await; } if order.risk_score > 90.0 { crate::interfaces::http::middleware::block_ip_persistently( pool, client_ip, &format!( "Automated Defense: High Risk Score ({:.1}) detected during cart checkout", order.risk_score ), Some(tx_sender), ) .await; } return Err(AppError::Forbidden(format!( "Security restriction: High risk profile detected (Score: {:.1}). This transaction has been blocked to prevent potential fraud.", order.risk_score ))); } else if order.risk_score > 60.0 { crate::domain::audit::log_risk_event( &mut tx, Some(&transaction_id), &first_merchant_id, "HIGH_RISK_ORDER", "HIGH", Some(&format!( "Cart order {} flagged with risk score {}", transaction_id, order.risk_score )), Some(order.risk_score), None, device_fingerprint.as_deref(), Some(tx_sender), ) .await; } // Persist Order order.created_at = Some(chrono::Utc::now().naive_utc()); crate::domain::models::OrderRecord::create_with_tx(&mut tx, &order).await?; // Persist Order Items for (p, qty) in product_details { sqlx::query( "INSERT INTO order_items (transaction_id, product_id, product_name, quantity, price_at_checkout, weight_at_checkout) VALUES ($1, $2, $3, $4, $5, $6)" ) .bind(&transaction_id) .bind(&p.link_id) .bind(&p.product_name) .bind(qty as i32) .bind(p.price_inr) .bind(p.expected_weight) .execute(&mut *tx) .await .map_err(AppError::Database)?; } tx.commit().await.map_err(AppError::Database)?; let _ = tx_sender.send(crate::interfaces::http::api::RealtimeEvent::NewOrder { transaction_id: transaction_id.clone(), merchant_id: order.merchant_id.clone(), amount: total_price, buyer_phone: buyer_phone.to_string(), }); Ok(order) }