// RTIX / src/application/services/subscription.rs
// github-actions
// deploy: clean backend production release
// bad8640
use crate::domain::error::{AppError, AppResult};
use crate::domain::models::{
CreatePlanRequest, CreateSubscriptionRequest, Subscription, SubscriptionBillingEvent,
SubscriptionPlan, SubscriptionSummary,
};
use crate::infrastructure::db::DbPool;
use chrono::Utc;
use uuid::Uuid;
/// Service layer for subscription plans, subscriptions and their billing events.
///
/// The methods defined in this file run their SQL through the wrapped [`DbPool`].
pub struct SubscriptionService {
/// Database pool passed as the executor to every query issued by this service.
pool: DbPool,
}
impl SubscriptionService {
pub fn new(pool: DbPool) -> Self {
Self { pool }
}
// ─── Plans ────────────────────────────────────────────────────────────────
/// Creates a new subscription plan owned by `merchant_id` and returns the stored row.
///
/// A fresh UUID is generated for the plan id; `trial_days` defaults to 0 when the
/// request omits it.
///
/// # Errors
/// - [`AppError::Validation`] if `price_inr` is not a finite, strictly positive number,
///   if `interval_days` is below 1, or if `trial_days` is negative.
/// - [`AppError::Database`] if the insert fails.
pub async fn create_plan(
    &self,
    merchant_id: &str,
    req: CreatePlanRequest,
) -> AppResult<SubscriptionPlan> {
    // `NaN <= 0.0` evaluates to false, so a bare sign check would accept NaN (and
    // +infinity); reject every non-finite value explicitly.
    if !req.price_inr.is_finite() || req.price_inr <= 0.0 {
        return Err(AppError::Validation("Price must be positive".into()));
    }
    if req.interval_days <= 0 {
        return Err(AppError::Validation("interval_days must be >= 1".into()));
    }
    let trial_days = req.trial_days.unwrap_or(0);
    // A negative trial would be stored as-is and silently treated as "no trial"
    // when a subscription is created; refuse it up front instead.
    if trial_days < 0 {
        return Err(AppError::Validation("trial_days must be >= 0".into()));
    }

    let id = Uuid::new_v4().to_string();
    let plan = sqlx::query_as::<_, SubscriptionPlan>(
        r#"INSERT INTO subscription_plans
           (id, merchant_id, name, description, price_inr, interval_days, trial_days)
           VALUES ($1, $2, $3, $4, $5, $6, $7)
           RETURNING *"#,
    )
    .bind(&id)
    .bind(merchant_id)
    .bind(&req.name)
    .bind(&req.description)
    .bind(req.price_inr)
    .bind(req.interval_days)
    .bind(trial_days)
    .fetch_one(&self.pool)
    .await
    .map_err(AppError::Database)?;

    tracing::info!(merchant_id, plan_id = %id, "Created subscription plan: {}", req.name);
    Ok(plan)
}
/// Returns the active plans belonging to `merchant_id`, newest first.
///
/// # Errors
/// [`AppError::Database`] if the query fails.
pub async fn list_plans(&self, merchant_id: &str) -> AppResult<Vec<SubscriptionPlan>> {
    let active_plans = sqlx::query_as::<_, SubscriptionPlan>(
        "SELECT * FROM subscription_plans WHERE merchant_id = $1 AND is_active = TRUE ORDER BY created_at DESC",
    )
    .bind(merchant_id)
    .fetch_all(&self.pool)
    .await
    .map_err(AppError::Database)?;

    Ok(active_plans)
}
/// Marks the plan `plan_id` of `merchant_id` as inactive.
///
/// # Errors
/// - [`AppError::NotFound`] if no row matched the id/merchant pair.
/// - [`AppError::Database`] if the update fails.
pub async fn deactivate_plan(&self, merchant_id: &str, plan_id: &str) -> AppResult<()> {
    let outcome = sqlx::query(
        "UPDATE subscription_plans SET is_active = FALSE, updated_at = NOW() WHERE id = $1 AND merchant_id = $2",
    )
    .bind(plan_id)
    .bind(merchant_id)
    .execute(&self.pool)
    .await
    .map_err(AppError::Database)?;

    // Zero touched rows means the plan does not exist for this merchant.
    match outcome.rows_affected() {
        0 => Err(AppError::NotFound("Plan not found".into())),
        _ => Ok(()),
    }
}
// ─── Subscriptions ───────────────────────────────────────────────────────
/// Subscribes a customer to one of the merchant's active plans.
///
/// When the plan has `trial_days > 0` the subscription starts in `TRIAL` and its
/// first period ends when the trial does; otherwise it starts `ACTIVE` with a
/// period of `interval_days`.
///
/// # Errors
/// - [`AppError::NotFound`] if the plan does not exist, belongs to another merchant,
///   or is inactive.
/// - [`AppError::Validation`] if the plan's period is so long that the period end
///   cannot be represented as a timestamp.
/// - [`AppError::Database`] if a query fails.
pub async fn create_subscription(
    &self,
    merchant_id: &str,
    req: CreateSubscriptionRequest,
) -> AppResult<Subscription> {
    // Verify plan belongs to merchant and is active
    let plan = sqlx::query_as::<_, SubscriptionPlan>(
        "SELECT * FROM subscription_plans WHERE id = $1 AND merchant_id = $2 AND is_active = TRUE",
    )
    .bind(&req.plan_id)
    .bind(merchant_id)
    .fetch_optional(&self.pool)
    .await
    .map_err(AppError::Database)?
    .ok_or_else(|| AppError::NotFound("Plan not found or inactive".into()))?;

    let id = Uuid::new_v4().to_string();
    let now = Utc::now();

    // Pick the initial status and the length of the first period together so the
    // two can never disagree.
    let (initial_status, period_days) = if plan.trial_days > 0 {
        ("TRIAL", plan.trial_days as i64)
    } else {
        ("ACTIVE", plan.interval_days as i64)
    };

    // `now + Duration` panics when the result is outside chrono's representable
    // range, and plan creation puts no upper bound on the day counts. Use the
    // checked form and surface an error instead of panicking.
    // NOTE(review): `Duration::days` itself is assumed not to overflow here; that
    // holds if the plan columns are 32-bit integers — confirm against the model.
    let period_end = now
        .checked_add_signed(chrono::Duration::days(period_days))
        .ok_or_else(|| AppError::Validation("Plan period is too long to schedule".into()))?;

    let sub = sqlx::query_as::<_, Subscription>(
        r#"INSERT INTO subscriptions
           (id, merchant_id, plan_id, subscriber_email, subscriber_phone, subscriber_name,
            status, current_period_start, current_period_end, metadata)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
           RETURNING *"#,
    )
    .bind(&id)
    .bind(merchant_id)
    .bind(&req.plan_id)
    .bind(&req.subscriber_email)
    .bind(&req.subscriber_phone)
    .bind(&req.subscriber_name)
    .bind(initial_status)
    .bind(now)
    .bind(period_end)
    .bind(&req.metadata)
    .fetch_one(&self.pool)
    .await
    .map_err(AppError::Database)?;

    tracing::info!(
        merchant_id, subscription_id = %id,
        "New subscription: {} -> plan {}", req.subscriber_email, plan.name
    );
    Ok(sub)
}
/// Lists up to 100 of the merchant's subscriptions, newest first.
///
/// When `status_filter` is `Some`, only subscriptions with exactly that status
/// are returned.
///
/// # Errors
/// [`AppError::Database`] if the query fails.
pub async fn list_subscriptions(
    &self,
    merchant_id: &str,
    status_filter: Option<&str>,
) -> AppResult<Vec<Subscription>> {
    let fetched = match status_filter {
        Some(status) => {
            sqlx::query_as::<_, Subscription>(
                "SELECT * FROM subscriptions WHERE merchant_id = $1 AND status = $2 ORDER BY created_at DESC LIMIT 100",
            )
            .bind(merchant_id)
            .bind(status)
            .fetch_all(&self.pool)
            .await
        }
        None => {
            sqlx::query_as::<_, Subscription>(
                "SELECT * FROM subscriptions WHERE merchant_id = $1 ORDER BY created_at DESC LIMIT 100",
            )
            .bind(merchant_id)
            .fetch_all(&self.pool)
            .await
        }
    };

    fetched.map_err(AppError::Database)
}
/// Cancels a subscription that is currently `ACTIVE`, `TRIAL` or `PAST_DUE`,
/// storing the optional `reason`, and returns the updated row.
///
/// # Errors
/// - [`AppError::NotFound`] if no subscription with that id, merchant and a
///   cancellable status exists.
/// - [`AppError::Database`] if the update fails.
pub async fn cancel_subscription(
    &self,
    merchant_id: &str,
    subscription_id: &str,
    reason: Option<String>,
) -> AppResult<Subscription> {
    let updated = sqlx::query_as::<_, Subscription>(
        r#"UPDATE subscriptions
SET status = 'CANCELLED', cancelled_at = NOW(), cancel_reason = $3, updated_at = NOW()
WHERE id = $1 AND merchant_id = $2 AND status IN ('ACTIVE', 'TRIAL', 'PAST_DUE')
RETURNING *"#,
    )
    .bind(subscription_id)
    .bind(merchant_id)
    .bind(&reason)
    .fetch_optional(&self.pool)
    .await
    .map_err(AppError::Database)?;

    match updated {
        Some(cancelled) => {
            tracing::info!(merchant_id, subscription_id, "Subscription cancelled");
            Ok(cancelled)
        }
        None => Err(AppError::NotFound(
            "Subscription not found or already cancelled".into(),
        )),
    }
}
/// Computes headline subscription metrics for `merchant_id`.
///
/// Returns per-status counts (`ACTIVE`, `TRIAL`, `CANCELLED`, `PAST_DUE`), the
/// monthly recurring revenue (each `ACTIVE` subscription's plan price scaled to a
/// 30-day month) and the annual figure derived as `mrr * 12`.
///
/// # Errors
/// [`AppError::Database`] if either query fails or a count column cannot be
/// decoded into the corresponding summary field.
pub async fn get_summary(&self, merchant_id: &str) -> AppResult<SubscriptionSummary> {
    use sqlx::Row;

    let counts = sqlx::query(
        r#"SELECT
           COUNT(*) FILTER (WHERE status = 'ACTIVE') as active,
           COUNT(*) FILTER (WHERE status = 'TRIAL') as trial,
           COUNT(*) FILTER (WHERE status = 'CANCELLED') as cancelled,
           COUNT(*) FILTER (WHERE status = 'PAST_DUE') as past_due
           FROM subscriptions WHERE merchant_id = $1"#,
    )
    .bind(merchant_id)
    .fetch_one(&self.pool)
    .await
    .map_err(AppError::Database)?;

    // MRR = sum of monthly-normalised active subscription prices
    let mrr: f64 = sqlx::query_scalar(
        r#"SELECT COALESCE(SUM(p.price_inr * (30.0 / p.interval_days)), 0.0)
           FROM subscriptions s
           JOIN subscription_plans p ON s.plan_id = p.id
           WHERE s.merchant_id = $1 AND s.status = 'ACTIVE'"#,
    )
    .bind(merchant_id)
    .fetch_one(&self.pool)
    .await
    .map_err(AppError::Database)?;

    // `Row::get` panics when a column is missing or its type does not match the
    // target field; `try_get` reports the same condition as a regular error.
    Ok(SubscriptionSummary {
        active_count: counts.try_get("active").map_err(AppError::Database)?,
        trial_count: counts.try_get("trial").map_err(AppError::Database)?,
        cancelled_count: counts.try_get("cancelled").map_err(AppError::Database)?,
        past_due_count: counts.try_get("past_due").map_err(AppError::Database)?,
        mrr_inr: mrr,
        arr_inr: mrr * 12.0,
    })
}
/// Returns up to 50 billing events of one subscription, most recent first.
///
/// The lookup is scoped to `merchant_id`, so events of another merchant's
/// subscription are never returned.
///
/// # Errors
/// [`AppError::Database`] if the query fails.
pub async fn get_billing_history(
    &self,
    merchant_id: &str,
    subscription_id: &str,
) -> AppResult<Vec<SubscriptionBillingEvent>> {
    let events = sqlx::query_as::<_, SubscriptionBillingEvent>(
        r#"SELECT * FROM subscription_billing_events
WHERE merchant_id = $1 AND subscription_id = $2
ORDER BY billed_at DESC LIMIT 50"#,
    )
    .bind(merchant_id)
    .bind(subscription_id)
    .fetch_all(&self.pool)
    .await
    .map_err(AppError::Database)?;

    Ok(events)
}
/// Background engine: expire overdue subscriptions and graduate finished trials.
///
/// Two transitions are applied to rows whose `current_period_end < NOW()`:
/// - `ACTIVE` → `PAST_DUE`;
/// - `TRIAL` → `ACTIVE`, starting a new period of the plan's `interval_days`.
///
/// In production this would be wired to a UPI AutoPay or payment gateway trigger.
///
/// # Hardening
/// - **Advisory lock**: the transitions run inside one transaction that first takes
///   `pg_try_advisory_xact_lock`. The lock is bound to that transaction, so it is
///   held by the same connection that does the work and is released automatically
///   on commit or rollback. If another instance holds it, this call returns 0
///   without touching anything. Re-running the cycle is harmless because a
///   transitioned row no longer matches either `WHERE` clause.
/// - **Atomic selection**: each transition is a single `UPDATE … RETURNING`, so the
///   set of rows that get a billing event is exactly the set that was transitioned.
///   Subscriptions without a matching plan row are not transitioned.
/// - **Audit trail**: after the commit, one `subscription_billing_events` row is
///   inserted per transitioned subscription — status `FAILED` for
///   ACTIVE→PAST_DUE and `PENDING` for TRIAL→ACTIVE. These inserts are best-effort:
///   a failure is logged and does not undo the transition.
///
/// # Returns
/// The number of subscriptions moved to `PAST_DUE` (0 when the lock was not acquired).
///
/// # Errors
/// [`AppError::Database`] if beginning/committing the transaction, taking the lock
/// or either transition statement fails; in that case nothing is committed.
pub async fn run_billing_cycle(&self) -> AppResult<u64> {
    const FAILED_EVENT_SQL: &str = r#"INSERT INTO subscription_billing_events
        (id, subscription_id, merchant_id, amount_inr, status, failure_reason)
        VALUES ($1, $2, $3, $4, 'FAILED', 'Billing period expired — subscription moved to PAST_DUE')"#;
    const PENDING_EVENT_SQL: &str = r#"INSERT INTO subscription_billing_events
        (id, subscription_id, merchant_id, amount_inr, status, failure_reason)
        VALUES ($1, $2, $3, $4, 'PENDING', 'Trial period ended — first billing cycle initiated')"#;

    // ─── Advisory Lock — mutual exclusion across replicas ────────────────
    // A session-level lock taken through the pool would stay attached to whichever
    // pooled connection happened to run it and would never be released; the
    // transaction-scoped variant lives and dies with `tx`.
    let mut tx = self.pool.begin().await.map_err(AppError::Database)?;
    let lock_acquired: bool = sqlx::query_scalar(
        "SELECT pg_try_advisory_xact_lock(hashtext('billing_cycle'))",
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(AppError::Database)?;
    if !lock_acquired {
        tracing::debug!(
            "Billing cycle: advisory lock held by another instance — skipping this tick"
        );
        // Dropping `tx` rolls the (empty) transaction back.
        return Ok(0);
    }
    tracing::info!("Billing cycle: advisory lock acquired — running cycle");

    // ─── Step 1: ACTIVE → PAST_DUE ───────────────────────────────────────
    let past_due: Vec<(String, String, f64)> = sqlx::query_as(
        r#"UPDATE subscriptions s
           SET status = 'PAST_DUE', updated_at = NOW()
           FROM subscription_plans p
           WHERE p.id = s.plan_id
             AND s.status = 'ACTIVE'
             AND s.current_period_end < NOW()
           RETURNING s.id, s.merchant_id, p.price_inr"#,
    )
    .fetch_all(&mut *tx)
    .await
    .map_err(AppError::Database)?;

    // ─── Step 2: TRIAL → ACTIVE ───────────────────────────────────────────
    let graduated: Vec<(String, String, f64)> = sqlx::query_as(
        r#"UPDATE subscriptions s
           SET status = 'ACTIVE',
               current_period_start = NOW(),
               current_period_end = NOW() + p.interval_days * INTERVAL '1 day',
               updated_at = NOW()
           FROM subscription_plans p
           WHERE p.id = s.plan_id
             AND s.status = 'TRIAL'
             AND s.current_period_end < NOW()
           RETURNING s.id, s.merchant_id, p.price_inr"#,
    )
    .fetch_all(&mut *tx)
    .await
    .map_err(AppError::Database)?;

    tx.commit().await.map_err(AppError::Database)?;

    // ─── Audit events (best-effort, after the transitions are durable) ────
    if !past_due.is_empty() {
        self.record_billing_events(&past_due, FAILED_EVENT_SQL, "BILLING_FAILED")
            .await;
        tracing::warn!(
            "Billing cycle: {} subscriptions moved to PAST_DUE — billing events recorded",
            past_due.len()
        );
    }
    if !graduated.is_empty() {
        self.record_billing_events(&graduated, PENDING_EVENT_SQL, "TRIAL_ENDED")
            .await;
        tracing::info!(
            "Billing cycle: {} trials graduated to ACTIVE — TRIAL_ENDED events recorded",
            graduated.len()
        );
    }

    // usize → u64 cannot truncate on any supported target.
    Ok(past_due.len() as u64)
}

/// Inserts one billing event per `(subscription_id, merchant_id, price_inr)` row
/// using `insert_sql`; a failed insert is logged under `event_label` and skipped.
async fn record_billing_events(
    &self,
    rows: &[(String, String, f64)],
    insert_sql: &'static str,
    event_label: &str,
) {
    for (sub_id, merchant_id, price_inr) in rows {
        let event_id = Uuid::new_v4().to_string();
        let outcome = sqlx::query(insert_sql)
            .bind(&event_id)
            .bind(sub_id)
            .bind(merchant_id)
            .bind(price_inr)
            .execute(&self.pool)
            .await;
        if let Err(e) = outcome {
            tracing::error!(
                subscription_id = %sub_id,
                "Billing cycle: failed to record {} event: {}", event_label, e
            );
        }
    }
}
}