use dotenvy::dotenv; use rtix::application::services::oauth::OAuthConfig; use rtix::application::{reconciliation, services}; use rtix::core::session; use rtix::infrastructure::{db, repositories, storage::assets}; use rtix::interfaces::http::api; use std::env; use std::sync::Arc; use tracing_core::Subscriber; use tracing_subscriber::layer::Context; use tracing_subscriber::Layer; struct TelemetryLayer { pool: sqlx::PgPool, } impl Layer for TelemetryLayer { fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) { if *event.metadata().level() == tracing_core::Level::ERROR { let mut visitor = ErrorVisitor { message: String::new(), }; event.record(&mut visitor); let message = visitor.message.clone(); if !message.is_empty() { let pool = self.pool.clone(); if let Ok(handle) = tokio::runtime::Handle::try_current() { handle.spawn(async move { let _ = sqlx::query( "INSERT INTO error_telemetry (source, error_level, message) VALUES ($1, $2, $3)" ) .bind("BACKEND") .bind("ERROR") .bind(&message) .execute(&pool) .await; }); } } } } } struct ErrorVisitor { message: String, } impl tracing_core::field::Visit for ErrorVisitor { fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) { if field.name() == "message" { self.message = format!("{:?}", value); } } } #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; #[cfg(test)] mod mock_tests; // Rebuild trigger #[tokio::main] async fn main() { dotenv().ok(); // Critical Environment Verification let required_vars = [ "DATABASE_URL", "JWT_SECRET", "RTIX_UPI_ID", "RTIX_MASTER_KEY", ]; let mut missing_vars = Vec::new(); for var in required_vars { if env::var(var).is_err() { missing_vars.push(var); } } if !missing_vars.is_empty() { eprintln!( "FATAL: Missing required environment variables: {:?}", missing_vars ); eprintln!("Please add these to your Render Dashboard (Environment tab)."); std::process::exit(1); } use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; // Initialize OpenTelemetry Tracer let tracer = rtix::core::telemetry::init_tracer().expect("Failed to initialize OpenTelemetry tracer"); let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); // Database Connection & Schema Initialization let db_router = Arc::new(db::init_db_router().await); let pool = db_router.primary().clone(); // Initialize Observability Registry tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| "rtix=info,tower_http=info,axum::rejection=warn".into()), ) .with(tracing_subscriber::fmt::layer()) .with(telemetry_layer) .with(TelemetryLayer { pool: pool.clone() }) .init(); let env_type = if env::var("RENDER").is_ok() { "RENDER_CLOUD" } else { "LOCAL_DEVELOPMENT" }; tracing::info!("🚀 Starting Rtix Secure Backend (Env: {})", env_type); // Initialize Prometheus Metrics let metrics_handle = metrics_exporter_prometheus::PrometheusBuilder::new() .install_recorder() .expect("Failed to install Prometheus recorder"); // Initialize global broadcast channel for real-time events (4096 message buffer for headroom) let (tx, _rx) = tokio::sync::broadcast::channel(4096); // Start background IP block cache refresher (eliminates per-request DB queries) rtix::interfaces::http::middleware::spawn_blocked_ip_cache_refresher(pool.clone()); // Asset Provider let upload_dir = env::var("UPLOAD_DIR").unwrap_or_else(|_| "uploads".to_string()); let asset_provider: Arc = if let (Ok(bucket), Ok(access_key), Ok(secret_key)) = ( env::var("S3_BUCKET"), env::var("S3_ACCESS_KEY_ID"), env::var("S3_SECRET_ACCESS_KEY"), ) { let region = env::var("S3_REGION").unwrap_or_else(|_| "us-east-1".to_string()); let endpoint = env::var("S3_ENDPOINT").ok(); let cdn_url = env::var("S3_CDN_URL").ok(); tracing::info!( "Initializing S3 Cloud Asset Provider for bucket: {}", bucket ); Arc::new(assets::S3AssetProvider::new( bucket, region, access_key, secret_key, endpoint, cdn_url, )) } else { tracing::info!("Initializing Local File Asset Provider at: {}", upload_dir); Arc::new(assets::LocalAssetProvider::new(&upload_dir)) }; // Repositories let merchant_repo = Arc::new(repositories::merchant::PostgresMerchantRepository::new( pool.clone(), )); let order_repo = Arc::new(repositories::order::PostgresOrderRepository::new( pool.clone(), )); let product_repo = Arc::new(repositories::product::PostgresProductRepository::new( pool.clone(), )); let idempotency_repo = Arc::new(repositories::idempotency::PostgresIdempotencyRepository::new(pool.clone())); let coupon_repo = Arc::new(repositories::coupon::PostgresCouponRepository::new( pool.clone(), )); // Services let auth_service = Arc::new(services::auth::RtixAuthService::new( merchant_repo.clone(), session::jwt_secret(), )); let merchant_service = Arc::new(services::merchant::RtixMerchantService::new( merchant_repo.clone(), product_repo.clone(), order_repo.clone(), coupon_repo.clone(), pool.clone(), tx.clone(), )); let checkout_service = Arc::new(services::checkout::RtixCheckoutService::new( product_repo.clone(), merchant_repo.clone(), order_repo.clone(), asset_provider.clone(), tx.clone(), pool.clone(), )); let payment_service = Arc::new(services::payment::RtixPaymentService::new( order_repo.clone(), merchant_repo.clone(), tx.clone(), )); let customer_service = Arc::new(services::customer::RtixCustomerService::new( pool.clone(), session::jwt_secret(), )); let idempotency_service = Arc::new(services::idempotency::RtixIdempotencyService::new( idempotency_repo, )); let intelligence_service = Arc::new(services::intelligence::IntelligenceService::new( pool.clone(), )); // Spawn AI Engineer background telemetry processor let bg_intelligence_service = intelligence_service.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); loop { interval.tick().await; if let Err(e) = bg_intelligence_service.process_unanalyzed_telemetry().await { tracing::error!("Background AI telemetry analysis failed: {:?}", e); } } }); let state = api::AppState { pool, db: db_router, tx, assets: asset_provider, auth_service, merchant_service, checkout_service, payment_service, customer_service, idempotency_service, intelligence_service, order_repo: order_repo.clone(), metrics_handle: Arc::new(metrics_handle), oauth_config: OAuthConfig::from_env(), jwt_secret: std::env::var("JWT_SECRET").unwrap_or_default(), }; let app = api::create_router(state.clone()); // Spawn Autonomous Reconciliation Engine (Background Task) let recon_state = state.clone(); tokio::spawn(async move { reconciliation::spawn_reconciliation_worker(recon_state).await; }); // Spawn Protocol Sentinel (Operational Guard & Recovery) let protocol_sentinel = services::ProtocolSentinel::new(state.merchant_service.clone(), state.pool.clone()); tokio::spawn(async move { protocol_sentinel.run().await; }); // Spawn Webhook Service (Background Task) let webhook_service = services::webhook::WebhookService::new(state.pool.clone()); let webhook_rx = state.tx.subscribe(); tokio::spawn(async move { webhook_service.run(webhook_rx).await; }); // Spawn SRE AI Log and Database Monitor (Continuous Background Worker) rtix::interfaces::http::routes::developer::start_ai_log_monitor(state.pool.clone()); let port = env::var("PORT") .unwrap_or_else(|_| "3000".to_string()) .parse::() .unwrap_or(3000); let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)) .await .expect("Failed to bind port"); tracing::info!( "🛡️ Rtix Core Engine ONLINE on {}", listener.local_addr().unwrap() ); tracing::info!("💎 Security Guard Active. Payment Systems Stabilized."); // Graceful Shutdown Logic let shutdown = async { let ctrl_c = tokio::signal::ctrl_c(); let terminate = async { tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) .expect("failed to install signal handler") .recv() .await; }; tokio::select! { _ = ctrl_c => tracing::info!("Shutdown signal received (Ctrl-C)"), _ = terminate => tracing::info!("Shutdown signal received (SIGTERM)"), } tracing::info!("Commencing graceful shutdown of settlement engines..."); }; axum::serve(listener, app) .with_graceful_shutdown(shutdown) .await .expect("Server runtime error"); rtix::core::telemetry::shutdown_tracer(); tracing::info!("Rtix Secure Protocol OFFLINE. All assets protected."); }