use crate::domain::error::{AppError, AppResult}; use crate::domain::models::{ CreatePlanRequest, CreateSubscriptionRequest, Subscription, SubscriptionBillingEvent, SubscriptionPlan, SubscriptionSummary, }; use crate::infrastructure::db::DbPool; use chrono::Utc; use uuid::Uuid; pub struct SubscriptionService { pool: DbPool, } impl SubscriptionService { pub fn new(pool: DbPool) -> Self { Self { pool } } // ─── Plans ──────────────────────────────────────────────────────────────── pub async fn create_plan( &self, merchant_id: &str, req: CreatePlanRequest, ) -> AppResult { if req.price_inr <= 0.0 { return Err(AppError::Validation("Price must be positive".into())); } if req.interval_days <= 0 { return Err(AppError::Validation("interval_days must be >= 1".into())); } let id = Uuid::new_v4().to_string(); let trial_days = req.trial_days.unwrap_or(0); let plan = sqlx::query_as::<_, SubscriptionPlan>( r#"INSERT INTO subscription_plans (id, merchant_id, name, description, price_inr, interval_days, trial_days) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *"#, ) .bind(&id) .bind(merchant_id) .bind(&req.name) .bind(&req.description) .bind(req.price_inr) .bind(req.interval_days) .bind(trial_days) .fetch_one(&self.pool) .await .map_err(AppError::Database)?; tracing::info!(merchant_id, plan_id = %id, "Created subscription plan: {}", req.name); Ok(plan) } pub async fn list_plans(&self, merchant_id: &str) -> AppResult> { sqlx::query_as::<_, SubscriptionPlan>( "SELECT * FROM subscription_plans WHERE merchant_id = $1 AND is_active = TRUE ORDER BY created_at DESC", ) .bind(merchant_id) .fetch_all(&self.pool) .await .map_err(AppError::Database) } pub async fn deactivate_plan(&self, merchant_id: &str, plan_id: &str) -> AppResult<()> { let affected = sqlx::query( "UPDATE subscription_plans SET is_active = FALSE, updated_at = NOW() WHERE id = $1 AND merchant_id = $2", ) .bind(plan_id) .bind(merchant_id) .execute(&self.pool) .await .map_err(AppError::Database)? .rows_affected(); if affected == 0 { return Err(AppError::NotFound("Plan not found".into())); } Ok(()) } // ─── Subscriptions ─────────────────────────────────────────────────────── pub async fn create_subscription( &self, merchant_id: &str, req: CreateSubscriptionRequest, ) -> AppResult { // Verify plan belongs to merchant and is active let plan = sqlx::query_as::<_, SubscriptionPlan>( "SELECT * FROM subscription_plans WHERE id = $1 AND merchant_id = $2 AND is_active = TRUE", ) .bind(&req.plan_id) .bind(merchant_id) .fetch_optional(&self.pool) .await .map_err(AppError::Database)? .ok_or_else(|| AppError::NotFound("Plan not found or inactive".into()))?; let id = Uuid::new_v4().to_string(); let now = Utc::now(); let trial_end = now + chrono::Duration::days(plan.trial_days as i64); let period_end = if plan.trial_days > 0 { trial_end } else { now + chrono::Duration::days(plan.interval_days as i64) }; let initial_status = if plan.trial_days > 0 { "TRIAL" } else { "ACTIVE" }; let sub = sqlx::query_as::<_, Subscription>( r#"INSERT INTO subscriptions (id, merchant_id, plan_id, subscriber_email, subscriber_phone, subscriber_name, status, current_period_start, current_period_end, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *"#, ) .bind(&id) .bind(merchant_id) .bind(&req.plan_id) .bind(&req.subscriber_email) .bind(&req.subscriber_phone) .bind(&req.subscriber_name) .bind(initial_status) .bind(now) .bind(period_end) .bind(&req.metadata) .fetch_one(&self.pool) .await .map_err(AppError::Database)?; tracing::info!( merchant_id, subscription_id = %id, "New subscription: {} -> plan {}", req.subscriber_email, plan.name ); Ok(sub) } pub async fn list_subscriptions( &self, merchant_id: &str, status_filter: Option<&str>, ) -> AppResult> { let subs = if let Some(status) = status_filter { sqlx::query_as::<_, Subscription>( "SELECT * FROM subscriptions WHERE merchant_id = $1 AND status = $2 ORDER BY created_at DESC LIMIT 100", ) .bind(merchant_id) .bind(status) .fetch_all(&self.pool) .await } else { sqlx::query_as::<_, Subscription>( "SELECT * FROM subscriptions WHERE merchant_id = $1 ORDER BY created_at DESC LIMIT 100", ) .bind(merchant_id) .fetch_all(&self.pool) .await } .map_err(AppError::Database)?; Ok(subs) } pub async fn cancel_subscription( &self, merchant_id: &str, subscription_id: &str, reason: Option, ) -> AppResult { let sub = sqlx::query_as::<_, Subscription>( r#"UPDATE subscriptions SET status = 'CANCELLED', cancelled_at = NOW(), cancel_reason = $3, updated_at = NOW() WHERE id = $1 AND merchant_id = $2 AND status IN ('ACTIVE', 'TRIAL', 'PAST_DUE') RETURNING *"#, ) .bind(subscription_id) .bind(merchant_id) .bind(&reason) .fetch_optional(&self.pool) .await .map_err(AppError::Database)? .ok_or_else(|| AppError::NotFound("Subscription not found or already cancelled".into()))?; tracing::info!(merchant_id, subscription_id, "Subscription cancelled"); Ok(sub) } pub async fn get_summary(&self, merchant_id: &str) -> AppResult { use sqlx::Row; let counts = sqlx::query( r#"SELECT COUNT(*) FILTER (WHERE status = 'ACTIVE') as active, COUNT(*) FILTER (WHERE status = 'TRIAL') as trial, COUNT(*) FILTER (WHERE status = 'CANCELLED') as cancelled, COUNT(*) FILTER (WHERE status = 'PAST_DUE') as past_due FROM subscriptions WHERE merchant_id = $1"#, ) .bind(merchant_id) .fetch_one(&self.pool) .await .map_err(AppError::Database)?; // MRR = sum of monthly-normalised active subscription prices let mrr: f64 = sqlx::query_scalar( r#"SELECT COALESCE(SUM(p.price_inr * (30.0 / p.interval_days)), 0.0) FROM subscriptions s JOIN subscription_plans p ON s.plan_id = p.id WHERE s.merchant_id = $1 AND s.status = 'ACTIVE'"#, ) .bind(merchant_id) .fetch_one(&self.pool) .await .map_err(AppError::Database)?; Ok(SubscriptionSummary { active_count: counts.get("active"), trial_count: counts.get("trial"), cancelled_count: counts.get("cancelled"), past_due_count: counts.get("past_due"), mrr_inr: mrr, arr_inr: mrr * 12.0, }) } pub async fn get_billing_history( &self, merchant_id: &str, subscription_id: &str, ) -> AppResult> { sqlx::query_as::<_, SubscriptionBillingEvent>( r#"SELECT * FROM subscription_billing_events WHERE merchant_id = $1 AND subscription_id = $2 ORDER BY billed_at DESC LIMIT 50"#, ) .bind(merchant_id) .bind(subscription_id) .fetch_all(&self.pool) .await .map_err(AppError::Database) } /// Background engine: find ACTIVE subscriptions whose billing period has expired /// and mark them PAST_DUE, then record a billing event. In production this /// would be wired to a UPI AutoPay or payment gateway trigger. /// /// # Hardening /// - **Advisory lock**: Uses `pg_try_advisory_lock` keyed on the current billing /// hour so only one server replica executes the cycle per hour. Returns 0 if /// another instance has the lock. /// - **Audit trail**: Inserts a `SubscriptionBillingEvent` row for every /// transition — `BILLING_FAILED` for ACTIVE→PAST_DUE and `TRIAL_ENDED` for /// TRIAL→ACTIVE — giving merchants a complete billing history. pub async fn run_billing_cycle(&self) -> AppResult { // ─── Advisory Lock — idempotency across replicas ────────────────────── // Key: hash of "billing_cycle" + current UTC hour. This ensures only one // replica runs the cycle per hour even on multi-instance deployments. let lock_acquired: bool = sqlx::query_scalar( "SELECT pg_try_advisory_lock(hashtext('billing_cycle_' || to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD-HH24')))" ) .fetch_one(&self.pool) .await .map_err(AppError::Database)?; if !lock_acquired { tracing::debug!( "Billing cycle: advisory lock held by another instance — skipping this tick" ); return Ok(0); } tracing::info!("Billing cycle: advisory lock acquired — running cycle"); // ─── Step 1: ACTIVE → PAST_DUE ─────────────────────────────────────── // Fetch affected rows BEFORE the bulk UPDATE so we can write billing events. let past_due_candidates: Vec<(String, String, f64)> = sqlx::query_as( r#"SELECT s.id, s.merchant_id, p.price_inr FROM subscriptions s JOIN subscription_plans p ON s.plan_id = p.id WHERE s.status = 'ACTIVE' AND s.current_period_end < NOW()"#, ) .fetch_all(&self.pool) .await .map_err(AppError::Database)?; let past_due_count = past_due_candidates.len() as u64; if past_due_count > 0 { // Bulk transition sqlx::query( "UPDATE subscriptions SET status = 'PAST_DUE', updated_at = NOW() WHERE status = 'ACTIVE' AND current_period_end < NOW()" ) .execute(&self.pool) .await .map_err(AppError::Database)?; // Write one BILLING_FAILED event per affected subscription for (sub_id, merchant_id, price_inr) in &past_due_candidates { let event_id = Uuid::new_v4().to_string(); if let Err(e) = sqlx::query( r#"INSERT INTO subscription_billing_events (id, subscription_id, merchant_id, amount_inr, status, failure_reason) VALUES ($1, $2, $3, $4, 'FAILED', 'Billing period expired — subscription moved to PAST_DUE')"#, ) .bind(&event_id) .bind(sub_id) .bind(merchant_id) .bind(price_inr) .execute(&self.pool) .await { tracing::error!( subscription_id = %sub_id, "Billing cycle: failed to record BILLING_FAILED event: {}", e ); } } tracing::warn!( "Billing cycle: {} subscriptions moved to PAST_DUE — billing events recorded", past_due_count ); } // ─── Step 2: TRIAL → ACTIVE ─────────────────────────────────────────── // Fetch candidates before the transition to record TRIAL_ENDED events. let trial_ended_candidates: Vec<(String, String, f64)> = sqlx::query_as( r#"SELECT s.id, s.merchant_id, p.price_inr FROM subscriptions s JOIN subscription_plans p ON s.plan_id = p.id WHERE s.status = 'TRIAL' AND s.current_period_end < NOW()"#, ) .fetch_all(&self.pool) .await .map_err(AppError::Database)?; if !trial_ended_candidates.is_empty() { sqlx::query( r#"UPDATE subscriptions SET status = 'ACTIVE', current_period_start = NOW(), current_period_end = NOW() + (SELECT interval_days * INTERVAL '1 day' FROM subscription_plans WHERE id = plan_id), updated_at = NOW() WHERE status = 'TRIAL' AND current_period_end < NOW()"#, ) .execute(&self.pool) .await .map_err(AppError::Database)?; for (sub_id, merchant_id, price_inr) in &trial_ended_candidates { let event_id = Uuid::new_v4().to_string(); if let Err(e) = sqlx::query( r#"INSERT INTO subscription_billing_events (id, subscription_id, merchant_id, amount_inr, status, failure_reason) VALUES ($1, $2, $3, $4, 'PENDING', 'Trial period ended — first billing cycle initiated')"#, ) .bind(&event_id) .bind(sub_id) .bind(merchant_id) .bind(price_inr) .execute(&self.pool) .await { tracing::error!( subscription_id = %sub_id, "Billing cycle: failed to record TRIAL_ENDED event: {}", e ); } } tracing::info!( "Billing cycle: {} trials graduated to ACTIVE — TRIAL_ENDED events recorded", trial_ended_candidates.len() ); } Ok(past_due_count) } }