use crate::domain::error::{AppError, AppResult}; use crate::infrastructure::repositories::{IdempotencyRecord, IdempotencyRepository}; use async_trait::async_trait; use dashmap::DashMap; use once_cell::sync::Lazy; use sha2::{Digest, Sha256}; use std::sync::Arc; use std::time::{Duration, Instant}; #[derive(Clone)] struct CachedIdempotency { response_data: Option, request_hash: String, status: String, expires_at: Instant, } static IDEMPOTENCY_CACHE: Lazy> = Lazy::new(DashMap::new); const MAX_IDEMPOTENCY_ENTRIES: usize = 50_000; fn prune_idempotency_cache() { let now = Instant::now(); IDEMPOTENCY_CACHE.retain(|_, cached| cached.expires_at > now); if IDEMPOTENCY_CACHE.len() > MAX_IDEMPOTENCY_ENTRIES { let mut keys_to_remove = Vec::new(); for entry in IDEMPOTENCY_CACHE.iter() { keys_to_remove.push(entry.key().clone()); if keys_to_remove.len() >= MAX_IDEMPOTENCY_ENTRIES / 2 { break; } } for k in keys_to_remove { IDEMPOTENCY_CACHE.remove(&k); } } } #[async_trait] pub trait IdempotencyService: Send + Sync { async fn check_idempotency( &self, key: &str, merchant_id: &str, action_scope: &str, request_body: &[u8], ) -> AppResult; async fn complete_idempotency( &self, key: &str, merchant_id: &str, action_scope: &str, response_json: &str, ) -> AppResult<()>; } pub enum IdempotencyStatus { New, InProgress, Completed(String), } pub struct RtixIdempotencyService { repo: Arc, } impl RtixIdempotencyService { pub fn new(repo: Arc) -> Self { Self { repo } } fn calculate_hash(data: &[u8]) -> String { let mut hasher = Sha256::new(); hasher.update(data); format!("{:x}", hasher.finalize()) } } #[async_trait] impl IdempotencyService for RtixIdempotencyService { async fn check_idempotency( &self, key: &str, merchant_id: &str, action_scope: &str, request_body: &[u8], ) -> AppResult { let request_hash = Self::calculate_hash(request_body); let cache_key = format!("{}:{}:{}", key, merchant_id, action_scope); // 1. Check Cache if let Some(cached) = IDEMPOTENCY_CACHE.get(&cache_key) { if cached.expires_at > Instant::now() { if cached.request_hash != request_hash { return Err(AppError::Conflict( "Idempotency key reused with different request payload".to_string(), )); } return match cached.status.as_str() { "COMPLETED" => Ok(IdempotencyStatus::Completed( cached.response_data.clone().unwrap_or_default(), )), "IN_PROGRESS" => Ok(IdempotencyStatus::InProgress), _ => Ok(IdempotencyStatus::New), }; } } // 2. Check Repository if let Some(record) = self.repo.get_record(key, merchant_id, action_scope).await? { if record.request_hash != request_hash { return Err(AppError::Conflict( "Idempotency key reused with different request payload".to_string(), )); } prune_idempotency_cache(); IDEMPOTENCY_CACHE.insert( cache_key, CachedIdempotency { response_data: record.response_data.clone(), request_hash: record.request_hash.clone(), status: record.status.clone(), expires_at: Instant::now() + Duration::from_secs(3600), }, ); return match record.status.as_str() { "COMPLETED" => Ok(IdempotencyStatus::Completed( record.response_data.unwrap_or_default(), )), "IN_PROGRESS" => Ok(IdempotencyStatus::InProgress), _ => Ok(IdempotencyStatus::New), }; } // Create new record let record = IdempotencyRecord { key: key.to_string(), merchant_id: merchant_id.to_string(), action_scope: action_scope.to_string(), request_hash: request_hash.clone(), response_data: None, status: "IN_PROGRESS".to_string(), }; self.repo.save_record(&record).await?; prune_idempotency_cache(); IDEMPOTENCY_CACHE.insert( cache_key, CachedIdempotency { response_data: None, request_hash, status: "IN_PROGRESS".to_string(), expires_at: Instant::now() + Duration::from_secs(3600), }, ); Ok(IdempotencyStatus::New) } async fn complete_idempotency( &self, key: &str, merchant_id: &str, action_scope: &str, response_json: &str, ) -> AppResult<()> { self.repo .update_response(key, merchant_id, action_scope, response_json, "COMPLETED") .await?; let cache_key = format!("{}:{}:{}", key, merchant_id, action_scope); if let Some(mut cached) = IDEMPOTENCY_CACHE.get_mut(&cache_key) { cached.status = "COMPLETED".to_string(); cached.response_data = Some(response_json.to_string()); cached.expires_at = Instant::now() + Duration::from_secs(3600); } else { if let Ok(Some(record)) = self.repo.get_record(key, merchant_id, action_scope).await { prune_idempotency_cache(); IDEMPOTENCY_CACHE.insert( cache_key, CachedIdempotency { response_data: Some(response_json.to_string()), request_hash: record.request_hash, status: "COMPLETED".to_string(), expires_at: Instant::now() + Duration::from_secs(3600), }, ); } } Ok(()) } }