Spaces:
Running
Running
| use crate::domain::error::{AppError, AppResult}; | |
| use crate::domain::models::{ | |
| CreatePlanRequest, CreateSubscriptionRequest, Subscription, SubscriptionBillingEvent, | |
| SubscriptionPlan, SubscriptionSummary, | |
| }; | |
| use crate::infrastructure::db::DbPool; | |
| use chrono::Utc; | |
| use uuid::Uuid; | |
| pub struct SubscriptionService { | |
| pool: DbPool, | |
| } | |
| impl SubscriptionService { | |
| pub fn new(pool: DbPool) -> Self { | |
| Self { pool } | |
| } | |
| // ─── Plans ──────────────────────────────────────────────────────────────── | |
| pub async fn create_plan( | |
| &self, | |
| merchant_id: &str, | |
| req: CreatePlanRequest, | |
| ) -> AppResult<SubscriptionPlan> { | |
| if req.price_inr <= 0.0 { | |
| return Err(AppError::Validation("Price must be positive".into())); | |
| } | |
| if req.interval_days <= 0 { | |
| return Err(AppError::Validation("interval_days must be >= 1".into())); | |
| } | |
| let id = Uuid::new_v4().to_string(); | |
| let trial_days = req.trial_days.unwrap_or(0); | |
| let plan = sqlx::query_as::<_, SubscriptionPlan>( | |
| r#"INSERT INTO subscription_plans | |
| (id, merchant_id, name, description, price_inr, interval_days, trial_days) | |
| VALUES ($1, $2, $3, $4, $5, $6, $7) | |
| RETURNING *"#, | |
| ) | |
| .bind(&id) | |
| .bind(merchant_id) | |
| .bind(&req.name) | |
| .bind(&req.description) | |
| .bind(req.price_inr) | |
| .bind(req.interval_days) | |
| .bind(trial_days) | |
| .fetch_one(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| tracing::info!(merchant_id, plan_id = %id, "Created subscription plan: {}", req.name); | |
| Ok(plan) | |
| } | |
| pub async fn list_plans(&self, merchant_id: &str) -> AppResult<Vec<SubscriptionPlan>> { | |
| sqlx::query_as::<_, SubscriptionPlan>( | |
| "SELECT * FROM subscription_plans WHERE merchant_id = $1 AND is_active = TRUE ORDER BY created_at DESC", | |
| ) | |
| .bind(merchant_id) | |
| .fetch_all(&self.pool) | |
| .await | |
| .map_err(AppError::Database) | |
| } | |
| pub async fn deactivate_plan(&self, merchant_id: &str, plan_id: &str) -> AppResult<()> { | |
| let affected = sqlx::query( | |
| "UPDATE subscription_plans SET is_active = FALSE, updated_at = NOW() WHERE id = $1 AND merchant_id = $2", | |
| ) | |
| .bind(plan_id) | |
| .bind(merchant_id) | |
| .execute(&self.pool) | |
| .await | |
| .map_err(AppError::Database)? | |
| .rows_affected(); | |
| if affected == 0 { | |
| return Err(AppError::NotFound("Plan not found".into())); | |
| } | |
| Ok(()) | |
| } | |
| // ─── Subscriptions ─────────────────────────────────────────────────────── | |
| pub async fn create_subscription( | |
| &self, | |
| merchant_id: &str, | |
| req: CreateSubscriptionRequest, | |
| ) -> AppResult<Subscription> { | |
| // Verify plan belongs to merchant and is active | |
| let plan = sqlx::query_as::<_, SubscriptionPlan>( | |
| "SELECT * FROM subscription_plans WHERE id = $1 AND merchant_id = $2 AND is_active = TRUE", | |
| ) | |
| .bind(&req.plan_id) | |
| .bind(merchant_id) | |
| .fetch_optional(&self.pool) | |
| .await | |
| .map_err(AppError::Database)? | |
| .ok_or_else(|| AppError::NotFound("Plan not found or inactive".into()))?; | |
| let id = Uuid::new_v4().to_string(); | |
| let now = Utc::now(); | |
| let trial_end = now + chrono::Duration::days(plan.trial_days as i64); | |
| let period_end = if plan.trial_days > 0 { | |
| trial_end | |
| } else { | |
| now + chrono::Duration::days(plan.interval_days as i64) | |
| }; | |
| let initial_status = if plan.trial_days > 0 { | |
| "TRIAL" | |
| } else { | |
| "ACTIVE" | |
| }; | |
| let sub = sqlx::query_as::<_, Subscription>( | |
| r#"INSERT INTO subscriptions | |
| (id, merchant_id, plan_id, subscriber_email, subscriber_phone, subscriber_name, | |
| status, current_period_start, current_period_end, metadata) | |
| VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) | |
| RETURNING *"#, | |
| ) | |
| .bind(&id) | |
| .bind(merchant_id) | |
| .bind(&req.plan_id) | |
| .bind(&req.subscriber_email) | |
| .bind(&req.subscriber_phone) | |
| .bind(&req.subscriber_name) | |
| .bind(initial_status) | |
| .bind(now) | |
| .bind(period_end) | |
| .bind(&req.metadata) | |
| .fetch_one(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| tracing::info!( | |
| merchant_id, subscription_id = %id, | |
| "New subscription: {} -> plan {}", req.subscriber_email, plan.name | |
| ); | |
| Ok(sub) | |
| } | |
| pub async fn list_subscriptions( | |
| &self, | |
| merchant_id: &str, | |
| status_filter: Option<&str>, | |
| ) -> AppResult<Vec<Subscription>> { | |
| let subs = if let Some(status) = status_filter { | |
| sqlx::query_as::<_, Subscription>( | |
| "SELECT * FROM subscriptions WHERE merchant_id = $1 AND status = $2 ORDER BY created_at DESC LIMIT 100", | |
| ) | |
| .bind(merchant_id) | |
| .bind(status) | |
| .fetch_all(&self.pool) | |
| .await | |
| } else { | |
| sqlx::query_as::<_, Subscription>( | |
| "SELECT * FROM subscriptions WHERE merchant_id = $1 ORDER BY created_at DESC LIMIT 100", | |
| ) | |
| .bind(merchant_id) | |
| .fetch_all(&self.pool) | |
| .await | |
| } | |
| .map_err(AppError::Database)?; | |
| Ok(subs) | |
| } | |
| pub async fn cancel_subscription( | |
| &self, | |
| merchant_id: &str, | |
| subscription_id: &str, | |
| reason: Option<String>, | |
| ) -> AppResult<Subscription> { | |
| let sub = sqlx::query_as::<_, Subscription>( | |
| r#"UPDATE subscriptions | |
| SET status = 'CANCELLED', cancelled_at = NOW(), cancel_reason = $3, updated_at = NOW() | |
| WHERE id = $1 AND merchant_id = $2 AND status IN ('ACTIVE', 'TRIAL', 'PAST_DUE') | |
| RETURNING *"#, | |
| ) | |
| .bind(subscription_id) | |
| .bind(merchant_id) | |
| .bind(&reason) | |
| .fetch_optional(&self.pool) | |
| .await | |
| .map_err(AppError::Database)? | |
| .ok_or_else(|| AppError::NotFound("Subscription not found or already cancelled".into()))?; | |
| tracing::info!(merchant_id, subscription_id, "Subscription cancelled"); | |
| Ok(sub) | |
| } | |
| pub async fn get_summary(&self, merchant_id: &str) -> AppResult<SubscriptionSummary> { | |
| use sqlx::Row; | |
| let counts = sqlx::query( | |
| r#"SELECT | |
| COUNT(*) FILTER (WHERE status = 'ACTIVE') as active, | |
| COUNT(*) FILTER (WHERE status = 'TRIAL') as trial, | |
| COUNT(*) FILTER (WHERE status = 'CANCELLED') as cancelled, | |
| COUNT(*) FILTER (WHERE status = 'PAST_DUE') as past_due | |
| FROM subscriptions WHERE merchant_id = $1"#, | |
| ) | |
| .bind(merchant_id) | |
| .fetch_one(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| // MRR = sum of monthly-normalised active subscription prices | |
| let mrr: f64 = sqlx::query_scalar( | |
| r#"SELECT COALESCE(SUM(p.price_inr * (30.0 / p.interval_days)), 0.0) | |
| FROM subscriptions s | |
| JOIN subscription_plans p ON s.plan_id = p.id | |
| WHERE s.merchant_id = $1 AND s.status = 'ACTIVE'"#, | |
| ) | |
| .bind(merchant_id) | |
| .fetch_one(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| Ok(SubscriptionSummary { | |
| active_count: counts.get("active"), | |
| trial_count: counts.get("trial"), | |
| cancelled_count: counts.get("cancelled"), | |
| past_due_count: counts.get("past_due"), | |
| mrr_inr: mrr, | |
| arr_inr: mrr * 12.0, | |
| }) | |
| } | |
| pub async fn get_billing_history( | |
| &self, | |
| merchant_id: &str, | |
| subscription_id: &str, | |
| ) -> AppResult<Vec<SubscriptionBillingEvent>> { | |
| sqlx::query_as::<_, SubscriptionBillingEvent>( | |
| r#"SELECT * FROM subscription_billing_events | |
| WHERE merchant_id = $1 AND subscription_id = $2 | |
| ORDER BY billed_at DESC LIMIT 50"#, | |
| ) | |
| .bind(merchant_id) | |
| .bind(subscription_id) | |
| .fetch_all(&self.pool) | |
| .await | |
| .map_err(AppError::Database) | |
| } | |
| /// Background engine: find ACTIVE subscriptions whose billing period has expired | |
| /// and mark them PAST_DUE, then record a billing event. In production this | |
| /// would be wired to a UPI AutoPay or payment gateway trigger. | |
| /// | |
| /// # Hardening | |
| /// - **Advisory lock**: Uses `pg_try_advisory_lock` keyed on the current billing | |
| /// hour so only one server replica executes the cycle per hour. Returns 0 if | |
| /// another instance has the lock. | |
| /// - **Audit trail**: Inserts a `SubscriptionBillingEvent` row for every | |
| /// transition — `BILLING_FAILED` for ACTIVE→PAST_DUE and `TRIAL_ENDED` for | |
| /// TRIAL→ACTIVE — giving merchants a complete billing history. | |
| pub async fn run_billing_cycle(&self) -> AppResult<u64> { | |
| // ─── Advisory Lock — idempotency across replicas ────────────────────── | |
| // Key: hash of "billing_cycle" + current UTC hour. This ensures only one | |
| // replica runs the cycle per hour even on multi-instance deployments. | |
| let lock_acquired: bool = sqlx::query_scalar( | |
| "SELECT pg_try_advisory_lock(hashtext('billing_cycle_' || to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD-HH24')))" | |
| ) | |
| .fetch_one(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| if !lock_acquired { | |
| tracing::debug!( | |
| "Billing cycle: advisory lock held by another instance — skipping this tick" | |
| ); | |
| return Ok(0); | |
| } | |
| tracing::info!("Billing cycle: advisory lock acquired — running cycle"); | |
| // ─── Step 1: ACTIVE → PAST_DUE ─────────────────────────────────────── | |
| // Fetch affected rows BEFORE the bulk UPDATE so we can write billing events. | |
| let past_due_candidates: Vec<(String, String, f64)> = sqlx::query_as( | |
| r#"SELECT s.id, s.merchant_id, p.price_inr | |
| FROM subscriptions s | |
| JOIN subscription_plans p ON s.plan_id = p.id | |
| WHERE s.status = 'ACTIVE' AND s.current_period_end < NOW()"#, | |
| ) | |
| .fetch_all(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| let past_due_count = past_due_candidates.len() as u64; | |
| if past_due_count > 0 { | |
| // Bulk transition | |
| sqlx::query( | |
| "UPDATE subscriptions SET status = 'PAST_DUE', updated_at = NOW() WHERE status = 'ACTIVE' AND current_period_end < NOW()" | |
| ) | |
| .execute(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| // Write one BILLING_FAILED event per affected subscription | |
| for (sub_id, merchant_id, price_inr) in &past_due_candidates { | |
| let event_id = Uuid::new_v4().to_string(); | |
| if let Err(e) = sqlx::query( | |
| r#"INSERT INTO subscription_billing_events | |
| (id, subscription_id, merchant_id, amount_inr, status, failure_reason) | |
| VALUES ($1, $2, $3, $4, 'FAILED', 'Billing period expired — subscription moved to PAST_DUE')"#, | |
| ) | |
| .bind(&event_id) | |
| .bind(sub_id) | |
| .bind(merchant_id) | |
| .bind(price_inr) | |
| .execute(&self.pool) | |
| .await | |
| { | |
| tracing::error!( | |
| subscription_id = %sub_id, | |
| "Billing cycle: failed to record BILLING_FAILED event: {}", e | |
| ); | |
| } | |
| } | |
| tracing::warn!( | |
| "Billing cycle: {} subscriptions moved to PAST_DUE — billing events recorded", | |
| past_due_count | |
| ); | |
| } | |
| // ─── Step 2: TRIAL → ACTIVE ─────────────────────────────────────────── | |
| // Fetch candidates before the transition to record TRIAL_ENDED events. | |
| let trial_ended_candidates: Vec<(String, String, f64)> = sqlx::query_as( | |
| r#"SELECT s.id, s.merchant_id, p.price_inr | |
| FROM subscriptions s | |
| JOIN subscription_plans p ON s.plan_id = p.id | |
| WHERE s.status = 'TRIAL' AND s.current_period_end < NOW()"#, | |
| ) | |
| .fetch_all(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| if !trial_ended_candidates.is_empty() { | |
| sqlx::query( | |
| r#"UPDATE subscriptions | |
| SET status = 'ACTIVE', | |
| current_period_start = NOW(), | |
| current_period_end = NOW() + (SELECT interval_days * INTERVAL '1 day' FROM subscription_plans WHERE id = plan_id), | |
| updated_at = NOW() | |
| WHERE status = 'TRIAL' AND current_period_end < NOW()"#, | |
| ) | |
| .execute(&self.pool) | |
| .await | |
| .map_err(AppError::Database)?; | |
| for (sub_id, merchant_id, price_inr) in &trial_ended_candidates { | |
| let event_id = Uuid::new_v4().to_string(); | |
| if let Err(e) = sqlx::query( | |
| r#"INSERT INTO subscription_billing_events | |
| (id, subscription_id, merchant_id, amount_inr, status, failure_reason) | |
| VALUES ($1, $2, $3, $4, 'PENDING', 'Trial period ended — first billing cycle initiated')"#, | |
| ) | |
| .bind(&event_id) | |
| .bind(sub_id) | |
| .bind(merchant_id) | |
| .bind(price_inr) | |
| .execute(&self.pool) | |
| .await | |
| { | |
| tracing::error!( | |
| subscription_id = %sub_id, | |
| "Billing cycle: failed to record TRIAL_ENDED event: {}", e | |
| ); | |
| } | |
| } | |
| tracing::info!( | |
| "Billing cycle: {} trials graduated to ACTIVE — TRIAL_ENDED events recorded", | |
| trial_ended_candidates.len() | |
| ); | |
| } | |
| Ok(past_due_count) | |
| } | |
| } | |