Spaces:
Running
Running
| use crate::infrastructure::db::{DbPool, DbRouter}; | |
| use crate::infrastructure::storage::assets::AssetProvider; | |
| use axum::{ | |
| extract::DefaultBodyLimit, | |
| http::{header::HeaderName, Method}, | |
| routing::{get, post}, | |
| Router, | |
| }; | |
| use serde::{Deserialize, Serialize}; | |
| use std::sync::Arc; | |
| use std::time::Instant; | |
| use tokio::sync::broadcast; | |
| use tower_http::compression::CompressionLayer; | |
| use tower_http::cors::CorsLayer; | |
| use tower_http::trace::TraceLayer; | |
| pub type AppPool = DbPool; | |
| use crate::application::services::{ | |
| AuthService, CheckoutService, CustomerService, IdempotencyService, IntelligenceService, | |
| MerchantService, PaymentService, | |
| }; | |
| use crate::application::services::oauth::OAuthConfig; | |
| use crate::infrastructure::repositories::OrderRepository; | |
| use crate::interfaces::http::{middleware, routes}; | |
| use metrics_exporter_prometheus::PrometheusHandle; | |
| pub struct AppState { | |
| pub pool: AppPool, | |
| pub db: std::sync::Arc<DbRouter>, | |
| pub tx: broadcast::Sender<RealtimeEvent>, | |
| pub assets: Arc<dyn AssetProvider>, | |
| pub auth_service: Arc<dyn AuthService>, | |
| pub merchant_service: Arc<dyn MerchantService>, | |
| pub checkout_service: Arc<dyn CheckoutService>, | |
| pub payment_service: Arc<dyn PaymentService>, | |
| pub customer_service: Arc<dyn CustomerService>, | |
| pub idempotency_service: Arc<dyn IdempotencyService>, | |
| pub intelligence_service: Arc<IntelligenceService>, | |
| pub order_repo: Arc<dyn OrderRepository>, | |
| pub metrics_handle: Arc<PrometheusHandle>, | |
| /// OAuth provider credentials and redirect URLs | |
| pub oauth_config: OAuthConfig, | |
| /// Raw JWT secret bytes (needed by OAuth handlers to sign tokens) | |
| pub jwt_secret: String, | |
| } | |
/// Events fanned out to subscribers over the `broadcast` channel stored in
/// `AppState::tx`.
///
/// A broadcast channel hands every receiver its own copy of each message, so
/// the payload type must be `Clone`; `Debug` and `PartialEq` are derived as
/// well so events can be logged and compared.
///
/// NOTE(review): `serde::{Serialize, Deserialize}` are imported at the top of
/// this file but no serde derive is visible on this type. If events are
/// forwarded to clients as JSON, the serde derive (and any tagging attribute
/// the wire format relies on) still has to be restored — confirm against the
/// WebSocket handler.
#[derive(Debug, Clone, PartialEq)]
pub enum RealtimeEvent {
    /// An existing order moved to `new_status`.
    OrderStatusChanged {
        transaction_id: String,
        merchant_id: String,
        new_status: String,
    },
    /// A new order was created for `merchant_id`.
    NewOrder {
        transaction_id: String,
        merchant_id: String,
        amount: f64,
        buyer_phone: String,
    },
    /// A risk score was attached to a transaction.
    RiskAlert {
        transaction_id: String,
        merchant_id: String,
        risk_score: f64,
        message: String,
    },
    /// A volatility score was reported for a pincode.
    NetworkVolatilityAlert {
        pincode: String,
        volatility_score: f64,
        message: String,
    },
    /// An IP address was blocked, together with the reason.
    SentinelBlock {
        ip: String,
        reason: String,
    },
    /// Progress update for one step of an insight identified by `insight_id`.
    AIEngineerProgress {
        insight_id: String,
        step: String,
        status: String,
        message: String,
    },
}
| use axum::{http::StatusCode, response::Json}; | |
| use serde_json::json; | |
| async fn health_check() -> (StatusCode, Json<serde_json::Value>) { | |
| ( | |
| StatusCode::OK, | |
| Json(json!({ | |
| "status": "healthy", | |
| "version": "1.0.0-RTIX", | |
| "service": "rtix-core", | |
| "mode": "INSTITUTIONAL", | |
| "smart_integrity": "TAMPER_EVIDENT", | |
| "timestamp": chrono::Utc::now().to_rfc3339() | |
| })), | |
| ) | |
| } | |
| async fn readiness_check( | |
| axum::extract::State(state): axum::extract::State<AppState>, | |
| ) -> (StatusCode, Json<serde_json::Value>) { | |
| match sqlx::query("SELECT 1").execute(&state.pool).await { | |
| Ok(_) => ( | |
| StatusCode::OK, | |
| Json(json!({ "status": "ready", "database": "connected" })), | |
| ), | |
| Err(e) => ( | |
| StatusCode::SERVICE_UNAVAILABLE, | |
| Json( | |
| json!({ "status": "unready", "database": "disconnected", "error": e.to_string() }), | |
| ), | |
| ), | |
| } | |
| } | |
| async fn protocol_status( | |
| axum::extract::State(state): axum::extract::State<AppState>, | |
| ) -> (StatusCode, Json<serde_json::Value>) { | |
| let stats = sqlx::query( | |
| r#" | |
| SELECT | |
| COALESCE(SUM(price_inr), 0) as total_volume, | |
| COUNT(*) as total_orders, | |
| COALESCE(SUM(CASE WHEN status = $1 THEN price_inr ELSE 0 END), 0) as tvl, | |
| COALESCE(SUM(CASE WHEN status = $2 THEN 1 ELSE 0 END), 0) as active_disputes | |
| FROM orders | |
| "#, | |
| ) | |
| .bind(crate::domain::constants::ORDER_STATUS_PAID_PENDING_DELIVERY) | |
| .bind(crate::domain::constants::ORDER_STATUS_DISPUTED_HELD) | |
| .fetch_one(&state.pool) | |
| .await; | |
| match stats { | |
| Ok(s) => { | |
| use sqlx::Row; | |
| let total_volume: f64 = s.get("total_volume"); | |
| let total_orders: i64 = s.get("total_orders"); | |
| let tvl: f64 = s.get("tvl"); | |
| let active_disputes: i64 = s.get("active_disputes"); | |
| ( | |
| StatusCode::OK, | |
| Json(json!({ | |
| "protocol_version": "SOV-1.0-STABLE", | |
| "network_state": "OPERATIONAL_STABLE", | |
| "integrity_anchor": "CHRONO_SEALED_FORENSIC_CHAIN", | |
| "financial_metrics": { | |
| "custodial_liquidity_inr": tvl, | |
| "settlement_finality_volume_inr": total_volume, | |
| "total_ledger_entries": total_orders, | |
| }, | |
| "security_intelligence": { | |
| "smart_confidence_score": "99.98%", | |
| "active_arbitration_hold": active_disputes, | |
| "sentinel_status": "ACTIVE_HEARTBEAT", | |
| "rate_limiting_tier": "INSTITUTIONAL_HARDENED", | |
| }, | |
| "timestamp": chrono::Utc::now().to_rfc3339() | |
| })), | |
| ) | |
| } | |
| Err(e) => ( | |
| StatusCode::INTERNAL_SERVER_ERROR, | |
| Json(json!({ "status": "error", "message": e.to_string() })), | |
| ), | |
| } | |
| } | |
| async fn performance_diagnostics( | |
| axum::extract::State(state): axum::extract::State<AppState>, | |
| ) -> (StatusCode, Json<serde_json::Value>) { | |
| // In a real app, this would pull from prometheus/metrics | |
| // For now, let's provide a summary of the database and memory state | |
| let start = std::time::Instant::now(); | |
| let _ = sqlx::query("SELECT 1").execute(&state.pool).await; | |
| let db_latency = start.elapsed().as_millis(); | |
| ( | |
| StatusCode::OK, | |
| Json(json!({ | |
| "service": "rtix-performance-engine", | |
| "diagnostics": { | |
| "db_connection_latency_ms": db_latency, | |
| "async_runtime_status": "HEALTHY", | |
| "memory_allocator": "mimalloc-hardened", | |
| "active_rate_limiters": 3, | |
| "sentinel_blocks_24h": 0, // Mock for now | |
| }, | |
| "timestamp": chrono::Utc::now().to_rfc3339() | |
| })), | |
| ) | |
| } | |
| pub fn create_router(state: AppState) -> Router { | |
| let default_limit = DefaultBodyLimit::max(5 * 1024 * 1024); | |
| // 1. High-Density / Low-Payload Routes (Auth & Profile) | |
| // OAuth routes are unauthenticated GET redirects — no body limit needed | |
| let auth_routes = routes::auth::router().layer(DefaultBodyLimit::max(8192)); | |
| // 2. Merchant Operational Routes | |
| let merchant_routes = routes::merchant::router().layer(DefaultBodyLimit::max(15 * 1024 * 1024)); | |
| // 3. Checkout Routes (Standard Limit + Rate Limiting) | |
| let checkout_routes = routes::checkout::router() | |
| .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) | |
| .layer(axum::middleware::from_fn( | |
| crate::interfaces::http::security_hardening::smart_rate_limiter, | |
| )); | |
| // 4. Customer Routes | |
| let customer_routes = routes::customer::router().layer(default_limit); | |
| let mut router = Router::new() | |
| .route( | |
| "/", | |
| get(|| async { (StatusCode::OK, "Rtix Secure API Active") }), | |
| ) | |
| .route("/health", get(health_check)) | |
| .route("/ready", get(readiness_check)) | |
| .route("/v1/protocol/status", get(protocol_status)) | |
| .route("/v1/protocol/diagnostics", get(performance_diagnostics)) | |
| .nest("/v1/auth", auth_routes) | |
| .nest("/v1/developer", routes::developer::router().layer(default_limit)) | |
| .nest("/v1/merchant", merchant_routes) | |
| .nest("/v1/customer", customer_routes) | |
| .nest("/v1/checkout", checkout_routes) | |
| .route( | |
| "/v1/order/:transaction_id/status", | |
| get(routes::checkout::get_order_status), | |
| ) | |
| .nest( | |
| "/v1/payment", | |
| crate::interfaces::http::routes::payment::router() | |
| .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) | |
| .layer(axum::middleware::from_fn( | |
| crate::interfaces::http::security_hardening::smart_rate_limiter, | |
| )), | |
| ) | |
| .route( | |
| "/v1/feedback", | |
| post(crate::interfaces::http::routes::feedback::submit_feedback).layer(default_limit), | |
| ) | |
| .nest( | |
| "/v1/product-feedback", | |
| crate::interfaces::http::routes::product_feedback::router().layer(default_limit), | |
| ) | |
| .route( | |
| "/v1/ws", | |
| get(crate::interfaces::http::routes::realtime::ws_handler), | |
| ) | |
| .nest( | |
| "/v1/mobile", | |
| crate::interfaces::http::routes::mobile::router().layer(default_limit), | |
| ) | |
| .route( | |
| "/metrics", | |
| get( | |
| |state: axum::extract::State<AppState>, headers: axum::http::HeaderMap| async move { | |
| if let Ok(expected_token) = std::env::var("METRICS_BEARER_TOKEN") { | |
| let auth_header = | |
| headers.get("Authorization").and_then(|h| h.to_str().ok()); | |
| let expected_header = format!("Bearer {}", expected_token); | |
| if auth_header != Some(expected_header.as_str()) { | |
| return axum::response::Response::builder() | |
| .status(axum::http::StatusCode::UNAUTHORIZED) | |
| .body(axum::body::Body::from("Unauthorized")) | |
| .unwrap(); | |
| } | |
| } | |
| axum::response::Response::builder() | |
| .status(axum::http::StatusCode::OK) | |
| .body(axum::body::Body::from(state.metrics_handle.render())) | |
| .unwrap() | |
| }, | |
| ), | |
| ) | |
| .route("/v1/test", get(|| async { "API_READY" })) | |
| .route("/v1/test_db", get(|state: axum::extract::State<AppState>| async move { | |
| use sqlx::Row; | |
| let merchants = sqlx::query("SELECT merchant_id, email, brand_name, slug FROM merchants") | |
| .fetch_all(&state.pool) | |
| .await | |
| .unwrap_or_default() | |
| .into_iter() | |
| .map(|r| { | |
| serde_json::json!({ | |
| "merchant_id": r.get::<String, _>("merchant_id"), | |
| "email": r.get::<String, _>("email"), | |
| "brand_name": r.get::<String, _>("brand_name"), | |
| "slug": r.get::<String, _>("slug"), | |
| }) | |
| }) | |
| .collect::<Vec<_>>(); | |
| let products = sqlx::query("SELECT link_id, merchant_id, product_name, price_inr FROM product_links") | |
| .fetch_all(&state.pool) | |
| .await | |
| .unwrap_or_default() | |
| .into_iter() | |
| .map(|r| { | |
| serde_json::json!({ | |
| "link_id": r.get::<String, _>("link_id"), | |
| "merchant_id": r.get::<String, _>("merchant_id"), | |
| "product_name": r.get::<String, _>("product_name"), | |
| "price_inr": r.get::<f64, _>("price_inr"), | |
| }) | |
| }) | |
| .collect::<Vec<_>>(); | |
| let mut orders = sqlx::query_as::<_, crate::domain::models::OrderRecord>( | |
| "SELECT transaction_id, merchant_id, link_id, buyer_phone, buyer_phone_hash, buyer_name, buyer_email, shipping_pincode, delivery_address, price_inr, status, vpa, outbound_weight, return_weight, proof_data, proof_received_at, settled_at, paid_at, shipped_at, delivered_at, shipping_method, estimated_delivery_at, payu_id, is_payment, platform_fee_paid, platform_fee, delivery_fee, distance_km, risk_score, risk_flags, cgst, sgst, igst, utr_number, platform_fee_utr, delivery_gps_lat, delivery_gps_lng, is_geofence_verified, pincode_volatility_at_checkout, discount_amount, coupon_code, checkout_gps_lat, checkout_gps_lng, device_fingerprint, created_at FROM orders ORDER BY created_at DESC" | |
| ) | |
| .fetch_all(&state.pool) | |
| .await | |
| .unwrap_or_default(); | |
| for o in &mut orders { | |
| o.decrypt_pii(); | |
| } | |
| let orders_json = orders.into_iter().map(|o| { | |
| serde_json::json!({ | |
| "transaction_id": o.transaction_id, | |
| "merchant_id": o.merchant_id, | |
| "buyer_phone": o.buyer_phone, | |
| "buyer_name": o.buyer_name, | |
| "buyer_email": o.buyer_email, | |
| "price_inr": o.price_inr, | |
| "status": o.status, | |
| "payu_id": o.payu_id, | |
| "created_at": o.created_at.map(|c| c.to_string()), | |
| }) | |
| }).collect::<Vec<_>>(); | |
| (StatusCode::OK, axum::Json(serde_json::json!({ | |
| "merchants": merchants, | |
| "products": products, | |
| "orders": orders_json, | |
| }))) | |
| })) | |
| .route("/v1/test_db/clear", get(|state: axum::extract::State<AppState>| async move { | |
| use sqlx::Row; | |
| let queries = [ | |
| "DELETE FROM orders", | |
| "DELETE FROM product_links", | |
| "DELETE FROM feedback", | |
| "DELETE FROM risk_audit_logs", | |
| "DELETE FROM idempotency_keys", | |
| "DELETE FROM dispute_evidence", | |
| "DELETE FROM settlement_ledger", | |
| "DELETE FROM payouts", | |
| "DELETE FROM subscriptions", | |
| "DELETE FROM ai_engineer_insights", | |
| "DELETE FROM error_telemetry", | |
| "DELETE FROM oauth_accounts", | |
| "DELETE FROM carrier_registry", | |
| "DELETE FROM velocity_blacklist", | |
| "DELETE FROM merchants" | |
| ]; | |
| let mut results = Vec::new(); | |
| for q in queries { | |
| match sqlx::query(q).execute(&state.pool).await { | |
| Ok(r) => results.push(format!("{}: {} rows affected", q, r.rows_affected())), | |
| Err(e) => results.push(format!("{}: Error: {}", q, e)), | |
| } | |
| } | |
| let remaining_merchants = sqlx::query("SELECT merchant_id, email, role FROM merchants") | |
| .fetch_all(&state.pool) | |
| .await | |
| .unwrap_or_default() | |
| .into_iter() | |
| .map(|r| { | |
| serde_json::json!({ | |
| "merchant_id": r.get::<String, _>("merchant_id"), | |
| "email": r.get::<String, _>("email"), | |
| "role": r.get::<String, _>("role"), | |
| }) | |
| }) | |
| .collect::<Vec<_>>(); | |
| ( | |
| StatusCode::OK, | |
| axum::Json(serde_json::json!({ | |
| "status": "cleared", | |
| "results": results, | |
| "remaining_merchants": remaining_merchants | |
| })) | |
| ) | |
| })) | |
| .route("/v1/test_db/force_pay/:txnid", get(|axum::extract::Path(txnid): axum::extract::Path<String>, state: axum::extract::State<AppState>| async move { | |
| let res = sqlx::query("UPDATE orders SET status = 'PAID_PENDING_DELIVERY', paid_at = CURRENT_TIMESTAMP WHERE transaction_id = $1 OR payu_id = $1") | |
| .bind(&txnid) | |
| .execute(&state.pool) | |
| .await; | |
| match res { | |
| Ok(r) => (StatusCode::OK, format!("Rows affected: {}", r.rows_affected())), | |
| Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), | |
| } | |
| })); | |
| let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".to_string()); | |
| if run_mode != "production" { | |
| router = router.route("/v1/test_merchant", get(|state: axum::extract::State<AppState>| async move { | |
| let unique_id = uuid::Uuid::new_v4().to_string(); | |
| let email = format!("diag_{}@example.com", &unique_id[..8]); | |
| let brand = format!("Diag Shop {}", &unique_id[..8]); | |
| let res = state.auth_service.register( | |
| &email, | |
| "StrongPass123!", | |
| &brand, | |
| None, | |
| None, | |
| Some("sovereign@upi"), | |
| ).await; | |
| match res { | |
| Ok(_) => (StatusCode::OK, axum::Json(serde_json::json!({ "status": "success", "message": "Service register succeeded!" }))), | |
| Err(e) => { | |
| let err_details = match &e { | |
| crate::domain::error::AppError::Database(db_err) => format!("Database error: {:?}, message: {}", db_err, db_err), | |
| _ => format!("{:?}", e), | |
| }; | |
| (StatusCode::INTERNAL_SERVER_ERROR, axum::Json(serde_json::json!({ "status": "error", "message": e.to_string(), "details": err_details }))) | |
| } | |
| } | |
| })); | |
| } | |
| router | |
| .with_state(state.clone()) | |
| .layer(DefaultBodyLimit::disable()) | |
| .layer(axum::middleware::from_fn_with_state( | |
| state.clone(), | |
| middleware::security_middleware, | |
| )) | |
| .layer( | |
| CorsLayer::new() | |
| .allow_origin(tower_http::cors::AllowOrigin::predicate(|origin, _| { | |
| crate::core::session::origin_is_allowed(origin.to_str().ok()) | |
| })) | |
| .allow_methods([ | |
| Method::GET, | |
| Method::POST, | |
| Method::PATCH, | |
| Method::DELETE, | |
| Method::OPTIONS, | |
| ]) | |
| .allow_headers([ | |
| axum::http::header::CONTENT_TYPE, | |
| axum::http::header::AUTHORIZATION, | |
| axum::http::header::ACCEPT, | |
| HeaderName::from_static("x-request-id"), | |
| HeaderName::from_static("x-csrf-token"), | |
| HeaderName::from_static("x-idempotency-key"), | |
| HeaderName::from_static("upgrade-insecure-requests"), | |
| HeaderName::from_static("x-tested-by"), | |
| HeaderName::from_static("x-developer-override-merchant-id"), | |
| ]) | |
| .allow_credentials(true), | |
| ) | |
| .layer(CompressionLayer::new()) | |
| .layer(TraceLayer::new_for_http()) | |
| .layer(axum::middleware::from_fn(metrics_middleware)) | |
| } | |
| async fn metrics_middleware( | |
| matched_path: Option<axum::extract::MatchedPath>, | |
| request: axum::extract::Request, | |
| next: axum::middleware::Next, | |
| ) -> axum::response::Response { | |
| let start = Instant::now(); | |
| let method = request.method().clone(); | |
| let path = if let Some(matched_path) = matched_path { | |
| matched_path.as_str().to_owned() | |
| } else { | |
| request.uri().path().to_owned() | |
| }; | |
| let response = next.run(request).await; | |
| let latency = start.elapsed().as_secs_f64(); | |
| let status = response.status().as_u16().to_string(); | |
| metrics::counter!( | |
| "http_requests_total", | |
| "method" => method.to_string(), | |
| "path" => path.clone(), | |
| "status" => status.clone() | |
| ) | |
| .increment(1); | |
| metrics::histogram!( | |
| "http_request_duration_seconds", | |
| "method" => method.to_string(), | |
| "path" => path.clone(), | |
| "status" => status.clone() | |
| ) | |
| .record(latency); | |
| response | |
| } | |