use crate::infrastructure::db::{DbPool, DbRouter}; use crate::infrastructure::storage::assets::AssetProvider; use axum::{ extract::DefaultBodyLimit, http::{header::HeaderName, Method}, routing::{get, post}, Router, }; use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::time::Instant; use tokio::sync::broadcast; use tower_http::compression::CompressionLayer; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; pub type AppPool = DbPool; use crate::application::services::{ AuthService, CheckoutService, CustomerService, IdempotencyService, IntelligenceService, MerchantService, PaymentService, }; use crate::application::services::oauth::OAuthConfig; use crate::infrastructure::repositories::OrderRepository; use crate::interfaces::http::{middleware, routes}; use metrics_exporter_prometheus::PrometheusHandle; #[derive(Clone)] pub struct AppState { pub pool: AppPool, pub db: std::sync::Arc, pub tx: broadcast::Sender, pub assets: Arc, pub auth_service: Arc, pub merchant_service: Arc, pub checkout_service: Arc, pub payment_service: Arc, pub customer_service: Arc, pub idempotency_service: Arc, pub intelligence_service: Arc, pub order_repo: Arc, pub metrics_handle: Arc, /// OAuth provider credentials and redirect URLs pub oauth_config: OAuthConfig, /// Raw JWT secret bytes (needed by OAuth handlers to sign tokens) pub jwt_secret: String, } #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(tag = "type", content = "payload")] pub enum RealtimeEvent { OrderStatusChanged { transaction_id: String, merchant_id: String, new_status: String, }, NewOrder { transaction_id: String, merchant_id: String, amount: f64, buyer_phone: String, }, RiskAlert { transaction_id: String, merchant_id: String, risk_score: f64, message: String, }, NetworkVolatilityAlert { pincode: String, volatility_score: f64, message: String, }, SentinelBlock { ip: String, reason: String, }, AIEngineerProgress { insight_id: String, step: String, status: String, message: String, }, } use axum::{http::StatusCode, response::Json}; use serde_json::json; async fn health_check() -> (StatusCode, Json) { ( StatusCode::OK, Json(json!({ "status": "healthy", "version": "1.0.0-RTIX", "service": "rtix-core", "mode": "INSTITUTIONAL", "smart_integrity": "TAMPER_EVIDENT", "timestamp": chrono::Utc::now().to_rfc3339() })), ) } async fn readiness_check( axum::extract::State(state): axum::extract::State, ) -> (StatusCode, Json) { match sqlx::query("SELECT 1").execute(&state.pool).await { Ok(_) => ( StatusCode::OK, Json(json!({ "status": "ready", "database": "connected" })), ), Err(e) => ( StatusCode::SERVICE_UNAVAILABLE, Json( json!({ "status": "unready", "database": "disconnected", "error": e.to_string() }), ), ), } } async fn protocol_status( axum::extract::State(state): axum::extract::State, ) -> (StatusCode, Json) { let stats = sqlx::query( r#" SELECT COALESCE(SUM(price_inr), 0) as total_volume, COUNT(*) as total_orders, COALESCE(SUM(CASE WHEN status = $1 THEN price_inr ELSE 0 END), 0) as tvl, COALESCE(SUM(CASE WHEN status = $2 THEN 1 ELSE 0 END), 0) as active_disputes FROM orders "#, ) .bind(crate::domain::constants::ORDER_STATUS_PAID_PENDING_DELIVERY) .bind(crate::domain::constants::ORDER_STATUS_DISPUTED_HELD) .fetch_one(&state.pool) .await; match stats { Ok(s) => { use sqlx::Row; let total_volume: f64 = s.get("total_volume"); let total_orders: i64 = s.get("total_orders"); let tvl: f64 = s.get("tvl"); let active_disputes: i64 = s.get("active_disputes"); ( StatusCode::OK, Json(json!({ "protocol_version": "SOV-1.0-STABLE", "network_state": "OPERATIONAL_STABLE", "integrity_anchor": "CHRONO_SEALED_FORENSIC_CHAIN", "financial_metrics": { "custodial_liquidity_inr": tvl, "settlement_finality_volume_inr": total_volume, "total_ledger_entries": total_orders, }, "security_intelligence": { "smart_confidence_score": "99.98%", "active_arbitration_hold": active_disputes, "sentinel_status": "ACTIVE_HEARTBEAT", "rate_limiting_tier": "INSTITUTIONAL_HARDENED", }, "timestamp": chrono::Utc::now().to_rfc3339() })), ) } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "status": "error", "message": e.to_string() })), ), } } async fn performance_diagnostics( axum::extract::State(state): axum::extract::State, ) -> (StatusCode, Json) { // In a real app, this would pull from prometheus/metrics // For now, let's provide a summary of the database and memory state let start = std::time::Instant::now(); let _ = sqlx::query("SELECT 1").execute(&state.pool).await; let db_latency = start.elapsed().as_millis(); ( StatusCode::OK, Json(json!({ "service": "rtix-performance-engine", "diagnostics": { "db_connection_latency_ms": db_latency, "async_runtime_status": "HEALTHY", "memory_allocator": "mimalloc-hardened", "active_rate_limiters": 3, "sentinel_blocks_24h": 0, // Mock for now }, "timestamp": chrono::Utc::now().to_rfc3339() })), ) } pub fn create_router(state: AppState) -> Router { let default_limit = DefaultBodyLimit::max(5 * 1024 * 1024); // 1. High-Density / Low-Payload Routes (Auth & Profile) // OAuth routes are unauthenticated GET redirects — no body limit needed let auth_routes = routes::auth::router().layer(DefaultBodyLimit::max(8192)); // 2. Merchant Operational Routes let merchant_routes = routes::merchant::router().layer(DefaultBodyLimit::max(15 * 1024 * 1024)); // 3. Checkout Routes (Standard Limit + Rate Limiting) let checkout_routes = routes::checkout::router() .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) .layer(axum::middleware::from_fn( crate::interfaces::http::security_hardening::smart_rate_limiter, )); // 4. Customer Routes let customer_routes = routes::customer::router().layer(default_limit); let mut router = Router::new() .route( "/", get(|| async { (StatusCode::OK, "Rtix Secure API Active") }), ) .route("/health", get(health_check)) .route("/ready", get(readiness_check)) .route("/v1/protocol/status", get(protocol_status)) .route("/v1/protocol/diagnostics", get(performance_diagnostics)) .nest("/v1/auth", auth_routes) .nest("/v1/developer", routes::developer::router().layer(default_limit)) .nest("/v1/merchant", merchant_routes) .nest("/v1/customer", customer_routes) .nest("/v1/checkout", checkout_routes) .route( "/v1/order/:transaction_id/status", get(routes::checkout::get_order_status), ) .nest( "/v1/payment", crate::interfaces::http::routes::payment::router() .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) .layer(axum::middleware::from_fn( crate::interfaces::http::security_hardening::smart_rate_limiter, )), ) .route( "/v1/feedback", post(crate::interfaces::http::routes::feedback::submit_feedback).layer(default_limit), ) .nest( "/v1/product-feedback", crate::interfaces::http::routes::product_feedback::router().layer(default_limit), ) .route( "/v1/ws", get(crate::interfaces::http::routes::realtime::ws_handler), ) .nest( "/v1/mobile", crate::interfaces::http::routes::mobile::router().layer(default_limit), ) .route( "/metrics", get( |state: axum::extract::State, headers: axum::http::HeaderMap| async move { if let Ok(expected_token) = std::env::var("METRICS_BEARER_TOKEN") { let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); let expected_header = format!("Bearer {}", expected_token); if auth_header != Some(expected_header.as_str()) { return axum::response::Response::builder() .status(axum::http::StatusCode::UNAUTHORIZED) .body(axum::body::Body::from("Unauthorized")) .unwrap(); } } axum::response::Response::builder() .status(axum::http::StatusCode::OK) .body(axum::body::Body::from(state.metrics_handle.render())) .unwrap() }, ), ) .route("/v1/test", get(|| async { "API_READY" })) .route("/v1/test_db", get(|state: axum::extract::State| async move { use sqlx::Row; let merchants = sqlx::query("SELECT merchant_id, email, brand_name, slug FROM merchants") .fetch_all(&state.pool) .await .unwrap_or_default() .into_iter() .map(|r| { serde_json::json!({ "merchant_id": r.get::("merchant_id"), "email": r.get::("email"), "brand_name": r.get::("brand_name"), "slug": r.get::("slug"), }) }) .collect::>(); let products = sqlx::query("SELECT link_id, merchant_id, product_name, price_inr FROM product_links") .fetch_all(&state.pool) .await .unwrap_or_default() .into_iter() .map(|r| { serde_json::json!({ "link_id": r.get::("link_id"), "merchant_id": r.get::("merchant_id"), "product_name": r.get::("product_name"), "price_inr": r.get::("price_inr"), }) }) .collect::>(); let mut orders = sqlx::query_as::<_, crate::domain::models::OrderRecord>( "SELECT transaction_id, merchant_id, link_id, buyer_phone, buyer_phone_hash, buyer_name, buyer_email, shipping_pincode, delivery_address, price_inr, status, vpa, outbound_weight, return_weight, proof_data, proof_received_at, settled_at, paid_at, shipped_at, delivered_at, shipping_method, estimated_delivery_at, payu_id, is_payment, platform_fee_paid, platform_fee, delivery_fee, distance_km, risk_score, risk_flags, cgst, sgst, igst, utr_number, platform_fee_utr, delivery_gps_lat, delivery_gps_lng, is_geofence_verified, pincode_volatility_at_checkout, discount_amount, coupon_code, checkout_gps_lat, checkout_gps_lng, device_fingerprint, created_at FROM orders ORDER BY created_at DESC" ) .fetch_all(&state.pool) .await .unwrap_or_default(); for o in &mut orders { o.decrypt_pii(); } let orders_json = orders.into_iter().map(|o| { serde_json::json!({ "transaction_id": o.transaction_id, "merchant_id": o.merchant_id, "buyer_phone": o.buyer_phone, "buyer_name": o.buyer_name, "buyer_email": o.buyer_email, "price_inr": o.price_inr, "status": o.status, "payu_id": o.payu_id, "created_at": o.created_at.map(|c| c.to_string()), }) }).collect::>(); (StatusCode::OK, axum::Json(serde_json::json!({ "merchants": merchants, "products": products, "orders": orders_json, }))) })) .route("/v1/test_db/clear", get(|state: axum::extract::State| async move { use sqlx::Row; let queries = [ "DELETE FROM orders", "DELETE FROM product_links", "DELETE FROM feedback", "DELETE FROM risk_audit_logs", "DELETE FROM idempotency_keys", "DELETE FROM dispute_evidence", "DELETE FROM settlement_ledger", "DELETE FROM payouts", "DELETE FROM subscriptions", "DELETE FROM ai_engineer_insights", "DELETE FROM error_telemetry", "DELETE FROM oauth_accounts", "DELETE FROM carrier_registry", "DELETE FROM velocity_blacklist", "DELETE FROM merchants" ]; let mut results = Vec::new(); for q in queries { match sqlx::query(q).execute(&state.pool).await { Ok(r) => results.push(format!("{}: {} rows affected", q, r.rows_affected())), Err(e) => results.push(format!("{}: Error: {}", q, e)), } } let remaining_merchants = sqlx::query("SELECT merchant_id, email, role FROM merchants") .fetch_all(&state.pool) .await .unwrap_or_default() .into_iter() .map(|r| { serde_json::json!({ "merchant_id": r.get::("merchant_id"), "email": r.get::("email"), "role": r.get::("role"), }) }) .collect::>(); ( StatusCode::OK, axum::Json(serde_json::json!({ "status": "cleared", "results": results, "remaining_merchants": remaining_merchants })) ) })) .route("/v1/test_db/force_pay/:txnid", get(|axum::extract::Path(txnid): axum::extract::Path, state: axum::extract::State| async move { let res = sqlx::query("UPDATE orders SET status = 'PAID_PENDING_DELIVERY', paid_at = CURRENT_TIMESTAMP WHERE transaction_id = $1 OR payu_id = $1") .bind(&txnid) .execute(&state.pool) .await; match res { Ok(r) => (StatusCode::OK, format!("Rows affected: {}", r.rows_affected())), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), } })); let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".to_string()); if run_mode != "production" { router = router.route("/v1/test_merchant", get(|state: axum::extract::State| async move { let unique_id = uuid::Uuid::new_v4().to_string(); let email = format!("diag_{}@example.com", &unique_id[..8]); let brand = format!("Diag Shop {}", &unique_id[..8]); let res = state.auth_service.register( &email, "StrongPass123!", &brand, None, None, Some("sovereign@upi"), ).await; match res { Ok(_) => (StatusCode::OK, axum::Json(serde_json::json!({ "status": "success", "message": "Service register succeeded!" }))), Err(e) => { let err_details = match &e { crate::domain::error::AppError::Database(db_err) => format!("Database error: {:?}, message: {}", db_err, db_err), _ => format!("{:?}", e), }; (StatusCode::INTERNAL_SERVER_ERROR, axum::Json(serde_json::json!({ "status": "error", "message": e.to_string(), "details": err_details }))) } } })); } router .with_state(state.clone()) .layer(DefaultBodyLimit::disable()) .layer(axum::middleware::from_fn_with_state( state.clone(), middleware::security_middleware, )) .layer( CorsLayer::new() .allow_origin(tower_http::cors::AllowOrigin::predicate(|origin, _| { crate::core::session::origin_is_allowed(origin.to_str().ok()) })) .allow_methods([ Method::GET, Method::POST, Method::PATCH, Method::DELETE, Method::OPTIONS, ]) .allow_headers([ axum::http::header::CONTENT_TYPE, axum::http::header::AUTHORIZATION, axum::http::header::ACCEPT, HeaderName::from_static("x-request-id"), HeaderName::from_static("x-csrf-token"), HeaderName::from_static("x-idempotency-key"), HeaderName::from_static("upgrade-insecure-requests"), HeaderName::from_static("x-tested-by"), HeaderName::from_static("x-developer-override-merchant-id"), ]) .allow_credentials(true), ) .layer(CompressionLayer::new()) .layer(TraceLayer::new_for_http()) .layer(axum::middleware::from_fn(metrics_middleware)) } async fn metrics_middleware( matched_path: Option, request: axum::extract::Request, next: axum::middleware::Next, ) -> axum::response::Response { let start = Instant::now(); let method = request.method().clone(); let path = if let Some(matched_path) = matched_path { matched_path.as_str().to_owned() } else { request.uri().path().to_owned() }; let response = next.run(request).await; let latency = start.elapsed().as_secs_f64(); let status = response.status().as_u16().to_string(); metrics::counter!( "http_requests_total", "method" => method.to_string(), "path" => path.clone(), "status" => status.clone() ) .increment(1); metrics::histogram!( "http_request_duration_seconds", "method" => method.to_string(), "path" => path.clone(), "status" => status.clone() ) .record(latency); response }