Spaces:
Running
Running
| use crate::domain::error::{AppError, AppResult}; | |
| use crate::infrastructure::repositories::{IdempotencyRecord, IdempotencyRepository}; | |
| use async_trait::async_trait; | |
| use dashmap::DashMap; | |
| use once_cell::sync::Lazy; | |
| use sha2::{Digest, Sha256}; | |
| use std::sync::Arc; | |
| use std::time::{Duration, Instant}; | |
/// In-memory snapshot of one idempotency record, as held in `IDEMPOTENCY_CACHE`.
struct CachedIdempotency {
    /// Stored response body, if any; the service fills it in on completion.
    response_data: Option<String>,
    /// Hex digest of the request body that first used the key; compared against
    /// later requests to detect reuse of a key with a different payload.
    request_hash: String,
    /// Record status as a string; the service matches on "IN_PROGRESS" and "COMPLETED".
    status: String,
    /// Point in time after which lookups ignore this entry.
    expires_at: Instant,
}
| static IDEMPOTENCY_CACHE: Lazy<DashMap<String, CachedIdempotency>> = Lazy::new(DashMap::new); | |
| const MAX_IDEMPOTENCY_ENTRIES: usize = 50_000; | |
| fn prune_idempotency_cache() { | |
| let now = Instant::now(); | |
| IDEMPOTENCY_CACHE.retain(|_, cached| cached.expires_at > now); | |
| if IDEMPOTENCY_CACHE.len() > MAX_IDEMPOTENCY_ENTRIES { | |
| let mut keys_to_remove = Vec::new(); | |
| for entry in IDEMPOTENCY_CACHE.iter() { | |
| keys_to_remove.push(entry.key().clone()); | |
| if keys_to_remove.len() >= MAX_IDEMPOTENCY_ENTRIES / 2 { | |
| break; | |
| } | |
| } | |
| for k in keys_to_remove { | |
| IDEMPOTENCY_CACHE.remove(&k); | |
| } | |
| } | |
| } | |
| pub trait IdempotencyService: Send + Sync { | |
| async fn check_idempotency( | |
| &self, | |
| key: &str, | |
| merchant_id: &str, | |
| action_scope: &str, | |
| request_body: &[u8], | |
| ) -> AppResult<IdempotencyStatus>; | |
| async fn complete_idempotency( | |
| &self, | |
| key: &str, | |
| merchant_id: &str, | |
| action_scope: &str, | |
| response_json: &str, | |
| ) -> AppResult<()>; | |
| } | |
/// Outcome of an idempotency lookup.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log, copy and
/// compare statuses (for example in tests) without destructuring by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IdempotencyStatus {
    /// No usable prior record: the caller should process the request.
    New,
    /// A request with the same key has been registered but not completed.
    InProgress,
    /// The request already finished; carries the stored response body
    /// (an empty string when none was stored).
    Completed(String),
}
| pub struct RtixIdempotencyService { | |
| repo: Arc<dyn IdempotencyRepository>, | |
| } | |
| impl RtixIdempotencyService { | |
| pub fn new(repo: Arc<dyn IdempotencyRepository>) -> Self { | |
| Self { repo } | |
| } | |
| fn calculate_hash(data: &[u8]) -> String { | |
| let mut hasher = Sha256::new(); | |
| hasher.update(data); | |
| format!("{:x}", hasher.finalize()) | |
| } | |
| } | |
| impl IdempotencyService for RtixIdempotencyService { | |
| async fn check_idempotency( | |
| &self, | |
| key: &str, | |
| merchant_id: &str, | |
| action_scope: &str, | |
| request_body: &[u8], | |
| ) -> AppResult<IdempotencyStatus> { | |
| let request_hash = Self::calculate_hash(request_body); | |
| let cache_key = format!("{}:{}:{}", key, merchant_id, action_scope); | |
| // 1. Check Cache | |
| if let Some(cached) = IDEMPOTENCY_CACHE.get(&cache_key) { | |
| if cached.expires_at > Instant::now() { | |
| if cached.request_hash != request_hash { | |
| return Err(AppError::Conflict( | |
| "Idempotency key reused with different request payload".to_string(), | |
| )); | |
| } | |
| return match cached.status.as_str() { | |
| "COMPLETED" => Ok(IdempotencyStatus::Completed( | |
| cached.response_data.clone().unwrap_or_default(), | |
| )), | |
| "IN_PROGRESS" => Ok(IdempotencyStatus::InProgress), | |
| _ => Ok(IdempotencyStatus::New), | |
| }; | |
| } | |
| } | |
| // 2. Check Repository | |
| if let Some(record) = self.repo.get_record(key, merchant_id, action_scope).await? { | |
| if record.request_hash != request_hash { | |
| return Err(AppError::Conflict( | |
| "Idempotency key reused with different request payload".to_string(), | |
| )); | |
| } | |
| prune_idempotency_cache(); | |
| IDEMPOTENCY_CACHE.insert( | |
| cache_key, | |
| CachedIdempotency { | |
| response_data: record.response_data.clone(), | |
| request_hash: record.request_hash.clone(), | |
| status: record.status.clone(), | |
| expires_at: Instant::now() + Duration::from_secs(3600), | |
| }, | |
| ); | |
| return match record.status.as_str() { | |
| "COMPLETED" => Ok(IdempotencyStatus::Completed( | |
| record.response_data.unwrap_or_default(), | |
| )), | |
| "IN_PROGRESS" => Ok(IdempotencyStatus::InProgress), | |
| _ => Ok(IdempotencyStatus::New), | |
| }; | |
| } | |
| // Create new record | |
| let record = IdempotencyRecord { | |
| key: key.to_string(), | |
| merchant_id: merchant_id.to_string(), | |
| action_scope: action_scope.to_string(), | |
| request_hash: request_hash.clone(), | |
| response_data: None, | |
| status: "IN_PROGRESS".to_string(), | |
| }; | |
| self.repo.save_record(&record).await?; | |
| prune_idempotency_cache(); | |
| IDEMPOTENCY_CACHE.insert( | |
| cache_key, | |
| CachedIdempotency { | |
| response_data: None, | |
| request_hash, | |
| status: "IN_PROGRESS".to_string(), | |
| expires_at: Instant::now() + Duration::from_secs(3600), | |
| }, | |
| ); | |
| Ok(IdempotencyStatus::New) | |
| } | |
| async fn complete_idempotency( | |
| &self, | |
| key: &str, | |
| merchant_id: &str, | |
| action_scope: &str, | |
| response_json: &str, | |
| ) -> AppResult<()> { | |
| self.repo | |
| .update_response(key, merchant_id, action_scope, response_json, "COMPLETED") | |
| .await?; | |
| let cache_key = format!("{}:{}:{}", key, merchant_id, action_scope); | |
| if let Some(mut cached) = IDEMPOTENCY_CACHE.get_mut(&cache_key) { | |
| cached.status = "COMPLETED".to_string(); | |
| cached.response_data = Some(response_json.to_string()); | |
| cached.expires_at = Instant::now() + Duration::from_secs(3600); | |
| } else { | |
| if let Ok(Some(record)) = self.repo.get_record(key, merchant_id, action_scope).await { | |
| prune_idempotency_cache(); | |
| IDEMPOTENCY_CACHE.insert( | |
| cache_key, | |
| CachedIdempotency { | |
| response_data: Some(response_json.to_string()), | |
| request_hash: record.request_hash, | |
| status: "COMPLETED".to_string(), | |
| expires_at: Instant::now() + Duration::from_secs(3600), | |
| }, | |
| ); | |
| } | |
| } | |
| Ok(()) | |
| } | |
| } | |