RTIX / src /application /services /idempotency.rs
github-actions
deploy: clean backend production release
d8ffec9
use crate::domain::error::{AppError, AppResult};
use crate::infrastructure::repositories::{IdempotencyRecord, IdempotencyRepository};
use async_trait::async_trait;
use dashmap::DashMap;
use once_cell::sync::Lazy;
use sha2::{Digest, Sha256};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// One entry of the in-process idempotency cache.
///
/// It mirrors the fields of a stored idempotency record that the service needs
/// to answer a repeated request, plus a local expiry deadline.
#[derive(Clone)]
struct CachedIdempotency {
    /// Hex digest of the request body that first used the key.
    request_hash: String,
    /// Status string as stored by the service (e.g. `"IN_PROGRESS"`, `"COMPLETED"`).
    status: String,
    /// Serialized response, present once a response has been recorded.
    response_data: Option<String>,
    /// Instant after which readers must ignore this entry.
    expires_at: Instant,
}
/// Process-wide, lazily initialised in-memory cache of idempotency entries.
///
/// Keys are strings derived from the idempotency key, merchant id and action
/// scope; values carry their own expiry deadline (`expires_at`).
static IDEMPOTENCY_CACHE: Lazy<DashMap<String, CachedIdempotency>> = Lazy::new(DashMap::new);
/// Entry count above which `prune_idempotency_cache` evicts live entries
/// (half of this number per pruning pass) in addition to expired ones.
const MAX_IDEMPOTENCY_ENTRIES: usize = 50_000;
/// Trims the in-process idempotency cache.
///
/// First drops every entry whose deadline has passed. If the cache still holds
/// more than `MAX_IDEMPOTENCY_ENTRIES` entries afterwards, it additionally
/// evicts up to half that many entries, taken in the map's iteration order
/// (i.e. not by age or recency).
fn prune_idempotency_cache() {
    let cutoff = Instant::now();
    IDEMPOTENCY_CACHE.retain(|_, entry| entry.expires_at > cutoff);

    if IDEMPOTENCY_CACHE.len() <= MAX_IDEMPOTENCY_ENTRIES {
        return;
    }

    // Collect the victims first so the map iterator is gone before any
    // `remove` call touches the map.
    let evict_count = MAX_IDEMPOTENCY_ENTRIES / 2;
    let victims: Vec<String> = IDEMPOTENCY_CACHE
        .iter()
        .take(evict_count)
        .map(|entry| entry.key().clone())
        .collect();

    for stale_key in &victims {
        IDEMPOTENCY_CACHE.remove(stale_key);
    }
}
/// Contract for deduplicating requests that carry an idempotency key.
///
/// A request is identified by the triple (`key`, `merchant_id`,
/// `action_scope`). Implementations must be shareable across threads
/// (`Send + Sync`).
#[async_trait]
pub trait IdempotencyService: Send + Sync {
    /// Looks up the state of the request identified by `key`, `merchant_id`
    /// and `action_scope`, given the raw `request_body` that accompanies it.
    ///
    /// Returns the [`IdempotencyStatus`] describing how the caller should
    /// proceed, or an error from the implementation.
    async fn check_idempotency(
        &self,
        key: &str,
        merchant_id: &str,
        action_scope: &str,
        request_body: &[u8],
    ) -> AppResult<IdempotencyStatus>;

    /// Records `response_json` as the final response for the request
    /// identified by `key`, `merchant_id` and `action_scope`.
    async fn complete_idempotency(
        &self,
        key: &str,
        merchant_id: &str,
        action_scope: &str,
        response_json: &str,
    ) -> AppResult<()>;
}
/// Result of an idempotency check for a single request.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log, copy and
/// compare statuses (for example in `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IdempotencyStatus {
    /// The request should be processed by the caller.
    New,
    /// A request with the same key has been registered but has no recorded
    /// response yet.
    InProgress,
    /// A response was already recorded; the payload is that stored response
    /// (an empty string when no response body was stored).
    Completed(String),
}
/// `IdempotencyService` implementation that combines a shared
/// `IdempotencyRepository` with the process-local `IDEMPOTENCY_CACHE`.
pub struct RtixIdempotencyService {
// Repository consulted on cache misses and written to when records are
// created or completed.
repo: Arc<dyn IdempotencyRepository>,
}
impl RtixIdempotencyService {
pub fn new(repo: Arc<dyn IdempotencyRepository>) -> Self {
Self { repo }
}
fn calculate_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
format!("{:x}", hasher.finalize())
}
}
#[async_trait]
impl IdempotencyService for RtixIdempotencyService {
async fn check_idempotency(
&self,
key: &str,
merchant_id: &str,
action_scope: &str,
request_body: &[u8],
) -> AppResult<IdempotencyStatus> {
let request_hash = Self::calculate_hash(request_body);
let cache_key = format!("{}:{}:{}", key, merchant_id, action_scope);
// 1. Check Cache
if let Some(cached) = IDEMPOTENCY_CACHE.get(&cache_key) {
if cached.expires_at > Instant::now() {
if cached.request_hash != request_hash {
return Err(AppError::Conflict(
"Idempotency key reused with different request payload".to_string(),
));
}
return match cached.status.as_str() {
"COMPLETED" => Ok(IdempotencyStatus::Completed(
cached.response_data.clone().unwrap_or_default(),
)),
"IN_PROGRESS" => Ok(IdempotencyStatus::InProgress),
_ => Ok(IdempotencyStatus::New),
};
}
}
// 2. Check Repository
if let Some(record) = self.repo.get_record(key, merchant_id, action_scope).await? {
if record.request_hash != request_hash {
return Err(AppError::Conflict(
"Idempotency key reused with different request payload".to_string(),
));
}
prune_idempotency_cache();
IDEMPOTENCY_CACHE.insert(
cache_key,
CachedIdempotency {
response_data: record.response_data.clone(),
request_hash: record.request_hash.clone(),
status: record.status.clone(),
expires_at: Instant::now() + Duration::from_secs(3600),
},
);
return match record.status.as_str() {
"COMPLETED" => Ok(IdempotencyStatus::Completed(
record.response_data.unwrap_or_default(),
)),
"IN_PROGRESS" => Ok(IdempotencyStatus::InProgress),
_ => Ok(IdempotencyStatus::New),
};
}
// Create new record
let record = IdempotencyRecord {
key: key.to_string(),
merchant_id: merchant_id.to_string(),
action_scope: action_scope.to_string(),
request_hash: request_hash.clone(),
response_data: None,
status: "IN_PROGRESS".to_string(),
};
self.repo.save_record(&record).await?;
prune_idempotency_cache();
IDEMPOTENCY_CACHE.insert(
cache_key,
CachedIdempotency {
response_data: None,
request_hash,
status: "IN_PROGRESS".to_string(),
expires_at: Instant::now() + Duration::from_secs(3600),
},
);
Ok(IdempotencyStatus::New)
}
async fn complete_idempotency(
&self,
key: &str,
merchant_id: &str,
action_scope: &str,
response_json: &str,
) -> AppResult<()> {
self.repo
.update_response(key, merchant_id, action_scope, response_json, "COMPLETED")
.await?;
let cache_key = format!("{}:{}:{}", key, merchant_id, action_scope);
if let Some(mut cached) = IDEMPOTENCY_CACHE.get_mut(&cache_key) {
cached.status = "COMPLETED".to_string();
cached.response_data = Some(response_json.to_string());
cached.expires_at = Instant::now() + Duration::from_secs(3600);
} else {
if let Ok(Some(record)) = self.repo.get_record(key, merchant_id, action_scope).await {
prune_idempotency_cache();
IDEMPOTENCY_CACHE.insert(
cache_key,
CachedIdempotency {
response_data: Some(response_json.to_string()),
request_hash: record.request_hash,
status: "COMPLETED".to_string(),
expires_at: Instant::now() + Duration::from_secs(3600),
},
);
}
}
Ok(())
}
}