Spaces:
Running
Running
| use dotenvy::dotenv; | |
| use rtix::application::services::oauth::OAuthConfig; | |
| use rtix::application::{reconciliation, services}; | |
| use rtix::core::session; | |
| use rtix::infrastructure::{db, repositories, storage::assets}; | |
| use rtix::interfaces::http::api; | |
| use std::env; | |
| use std::sync::Arc; | |
| use tracing_core::Subscriber; | |
| use tracing_subscriber::layer::Context; | |
| use tracing_subscriber::Layer; | |
| struct TelemetryLayer { | |
| pool: sqlx::PgPool, | |
| } | |
| impl<S: Subscriber> Layer<S> for TelemetryLayer { | |
| fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) { | |
| if *event.metadata().level() == tracing_core::Level::ERROR { | |
| let mut visitor = ErrorVisitor { | |
| message: String::new(), | |
| }; | |
| event.record(&mut visitor); | |
| let message = visitor.message.clone(); | |
| if !message.is_empty() { | |
| let pool = self.pool.clone(); | |
| if let Ok(handle) = tokio::runtime::Handle::try_current() { | |
| handle.spawn(async move { | |
| let _ = sqlx::query( | |
| "INSERT INTO error_telemetry (source, error_level, message) VALUES ($1, $2, $3)" | |
| ) | |
| .bind("BACKEND") | |
| .bind("ERROR") | |
| .bind(&message) | |
| .execute(&pool) | |
| .await; | |
| }); | |
| } | |
| } | |
| } | |
| } | |
| } | |
| struct ErrorVisitor { | |
| message: String, | |
| } | |
| impl tracing_core::field::Visit for ErrorVisitor { | |
| fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) { | |
| if field.name() == "message" { | |
| self.message = format!("{:?}", value); | |
| } | |
| } | |
| } | |
| static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; | |
| mod mock_tests; | |
| // Rebuild trigger | |
| async fn main() { | |
| dotenv().ok(); | |
| // Critical Environment Verification | |
| let required_vars = [ | |
| "DATABASE_URL", | |
| "JWT_SECRET", | |
| "RTIX_UPI_ID", | |
| "RTIX_MASTER_KEY", | |
| ]; | |
| let mut missing_vars = Vec::new(); | |
| for var in required_vars { | |
| if env::var(var).is_err() { | |
| missing_vars.push(var); | |
| } | |
| } | |
| if !missing_vars.is_empty() { | |
| eprintln!( | |
| "FATAL: Missing required environment variables: {:?}", | |
| missing_vars | |
| ); | |
| eprintln!("Please add these to your Render Dashboard (Environment tab)."); | |
| std::process::exit(1); | |
| } | |
| use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; | |
| // Initialize OpenTelemetry Tracer | |
| let tracer = | |
| rtix::core::telemetry::init_tracer().expect("Failed to initialize OpenTelemetry tracer"); | |
| let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); | |
| // Database Connection & Schema Initialization | |
| let db_router = Arc::new(db::init_db_router().await); | |
| let pool = db_router.primary().clone(); | |
| // Initialize Observability Registry | |
| tracing_subscriber::registry() | |
| .with( | |
| tracing_subscriber::EnvFilter::try_from_default_env() | |
| .unwrap_or_else(|_| "rtix=info,tower_http=info,axum::rejection=warn".into()), | |
| ) | |
| .with(tracing_subscriber::fmt::layer()) | |
| .with(telemetry_layer) | |
| .with(TelemetryLayer { pool: pool.clone() }) | |
| .init(); | |
| let env_type = if env::var("RENDER").is_ok() { | |
| "RENDER_CLOUD" | |
| } else { | |
| "LOCAL_DEVELOPMENT" | |
| }; | |
| tracing::info!("🚀 Starting Rtix Secure Backend (Env: {})", env_type); | |
| // Initialize Prometheus Metrics | |
| let metrics_handle = metrics_exporter_prometheus::PrometheusBuilder::new() | |
| .install_recorder() | |
| .expect("Failed to install Prometheus recorder"); | |
| // Initialize global broadcast channel for real-time events (4096 message buffer for headroom) | |
| let (tx, _rx) = tokio::sync::broadcast::channel(4096); | |
| // Start background IP block cache refresher (eliminates per-request DB queries) | |
| rtix::interfaces::http::middleware::spawn_blocked_ip_cache_refresher(pool.clone()); | |
| // Asset Provider | |
| let upload_dir = env::var("UPLOAD_DIR").unwrap_or_else(|_| "uploads".to_string()); | |
| let asset_provider: Arc<dyn assets::AssetProvider> = | |
| if let (Ok(bucket), Ok(access_key), Ok(secret_key)) = ( | |
| env::var("S3_BUCKET"), | |
| env::var("S3_ACCESS_KEY_ID"), | |
| env::var("S3_SECRET_ACCESS_KEY"), | |
| ) { | |
| let region = env::var("S3_REGION").unwrap_or_else(|_| "us-east-1".to_string()); | |
| let endpoint = env::var("S3_ENDPOINT").ok(); | |
| let cdn_url = env::var("S3_CDN_URL").ok(); | |
| tracing::info!( | |
| "Initializing S3 Cloud Asset Provider for bucket: {}", | |
| bucket | |
| ); | |
| Arc::new(assets::S3AssetProvider::new( | |
| bucket, region, access_key, secret_key, endpoint, cdn_url, | |
| )) | |
| } else { | |
| tracing::info!("Initializing Local File Asset Provider at: {}", upload_dir); | |
| Arc::new(assets::LocalAssetProvider::new(&upload_dir)) | |
| }; | |
| // Repositories | |
| let merchant_repo = Arc::new(repositories::merchant::PostgresMerchantRepository::new( | |
| pool.clone(), | |
| )); | |
| let order_repo = Arc::new(repositories::order::PostgresOrderRepository::new( | |
| pool.clone(), | |
| )); | |
| let product_repo = Arc::new(repositories::product::PostgresProductRepository::new( | |
| pool.clone(), | |
| )); | |
| let idempotency_repo = | |
| Arc::new(repositories::idempotency::PostgresIdempotencyRepository::new(pool.clone())); | |
| let coupon_repo = Arc::new(repositories::coupon::PostgresCouponRepository::new( | |
| pool.clone(), | |
| )); | |
| // Services | |
| let auth_service = Arc::new(services::auth::RtixAuthService::new( | |
| merchant_repo.clone(), | |
| session::jwt_secret(), | |
| )); | |
| let merchant_service = Arc::new(services::merchant::RtixMerchantService::new( | |
| merchant_repo.clone(), | |
| product_repo.clone(), | |
| order_repo.clone(), | |
| coupon_repo.clone(), | |
| pool.clone(), | |
| tx.clone(), | |
| )); | |
| let checkout_service = Arc::new(services::checkout::RtixCheckoutService::new( | |
| product_repo.clone(), | |
| merchant_repo.clone(), | |
| order_repo.clone(), | |
| asset_provider.clone(), | |
| tx.clone(), | |
| pool.clone(), | |
| )); | |
| let payment_service = Arc::new(services::payment::RtixPaymentService::new( | |
| order_repo.clone(), | |
| merchant_repo.clone(), | |
| tx.clone(), | |
| )); | |
| let customer_service = Arc::new(services::customer::RtixCustomerService::new( | |
| pool.clone(), | |
| session::jwt_secret(), | |
| )); | |
| let idempotency_service = Arc::new(services::idempotency::RtixIdempotencyService::new( | |
| idempotency_repo, | |
| )); | |
| let intelligence_service = Arc::new(services::intelligence::IntelligenceService::new( | |
| pool.clone(), | |
| )); | |
| // Spawn AI Engineer background telemetry processor | |
| let bg_intelligence_service = intelligence_service.clone(); | |
| tokio::spawn(async move { | |
| let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); | |
| loop { | |
| interval.tick().await; | |
| if let Err(e) = bg_intelligence_service.process_unanalyzed_telemetry().await { | |
| tracing::error!("Background AI telemetry analysis failed: {:?}", e); | |
| } | |
| } | |
| }); | |
| let state = api::AppState { | |
| pool, | |
| db: db_router, | |
| tx, | |
| assets: asset_provider, | |
| auth_service, | |
| merchant_service, | |
| checkout_service, | |
| payment_service, | |
| customer_service, | |
| idempotency_service, | |
| intelligence_service, | |
| order_repo: order_repo.clone(), | |
| metrics_handle: Arc::new(metrics_handle), | |
| oauth_config: OAuthConfig::from_env(), | |
| jwt_secret: std::env::var("JWT_SECRET").unwrap_or_default(), | |
| }; | |
| let app = api::create_router(state.clone()); | |
| // Spawn Autonomous Reconciliation Engine (Background Task) | |
| let recon_state = state.clone(); | |
| tokio::spawn(async move { | |
| reconciliation::spawn_reconciliation_worker(recon_state).await; | |
| }); | |
| // Spawn Protocol Sentinel (Operational Guard & Recovery) | |
| let protocol_sentinel = | |
| services::ProtocolSentinel::new(state.merchant_service.clone(), state.pool.clone()); | |
| tokio::spawn(async move { | |
| protocol_sentinel.run().await; | |
| }); | |
| // Spawn Webhook Service (Background Task) | |
| let webhook_service = services::webhook::WebhookService::new(state.pool.clone()); | |
| let webhook_rx = state.tx.subscribe(); | |
| tokio::spawn(async move { | |
| webhook_service.run(webhook_rx).await; | |
| }); | |
| // Spawn SRE AI Log and Database Monitor (Continuous Background Worker) | |
| rtix::interfaces::http::routes::developer::start_ai_log_monitor(state.pool.clone()); | |
| let port = env::var("PORT") | |
| .unwrap_or_else(|_| "3000".to_string()) | |
| .parse::<u16>() | |
| .unwrap_or(3000); | |
| let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)) | |
| .await | |
| .expect("Failed to bind port"); | |
| tracing::info!( | |
| "🛡️ Rtix Core Engine ONLINE on {}", | |
| listener.local_addr().unwrap() | |
| ); | |
| tracing::info!("💎 Security Guard Active. Payment Systems Stabilized."); | |
| // Graceful Shutdown Logic | |
| let shutdown = async { | |
| let ctrl_c = tokio::signal::ctrl_c(); | |
| let terminate = async { | |
| tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) | |
| .expect("failed to install signal handler") | |
| .recv() | |
| .await; | |
| }; | |
| tokio::select! { | |
| _ = ctrl_c => tracing::info!("Shutdown signal received (Ctrl-C)"), | |
| _ = terminate => tracing::info!("Shutdown signal received (SIGTERM)"), | |
| } | |
| tracing::info!("Commencing graceful shutdown of settlement engines..."); | |
| }; | |
| axum::serve(listener, app) | |
| .with_graceful_shutdown(shutdown) | |
| .await | |
| .expect("Server runtime error"); | |
| rtix::core::telemetry::shutdown_tracer(); | |
| tracing::info!("Rtix Secure Protocol OFFLINE. All assets protected."); | |
| } | |