RTIX / src /main.rs
github-actions
deploy: clean backend production release
c33971d
use dotenvy::dotenv;
use rtix::application::services::oauth::OAuthConfig;
use rtix::application::{reconciliation, services};
use rtix::core::session;
use rtix::infrastructure::{db, repositories, storage::assets};
use rtix::interfaces::http::api;
use std::env;
use std::sync::Arc;
use tracing_core::Subscriber;
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
struct TelemetryLayer {
pool: sqlx::PgPool,
}
impl<S: Subscriber> Layer<S> for TelemetryLayer {
fn on_event(&self, event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
if *event.metadata().level() == tracing_core::Level::ERROR {
let mut visitor = ErrorVisitor {
message: String::new(),
};
event.record(&mut visitor);
let message = visitor.message.clone();
if !message.is_empty() {
let pool = self.pool.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
let _ = sqlx::query(
"INSERT INTO error_telemetry (source, error_level, message) VALUES ($1, $2, $3)"
)
.bind("BACKEND")
.bind("ERROR")
.bind(&message)
.execute(&pool)
.await;
});
}
}
}
}
}
struct ErrorVisitor {
message: String,
}
impl tracing_core::field::Visit for ErrorVisitor {
fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
if field.name() == "message" {
self.message = format!("{:?}", value);
}
}
}
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[cfg(test)]
mod mock_tests;
// Rebuild trigger
#[tokio::main]
async fn main() {
dotenv().ok();
// Critical Environment Verification
let required_vars = [
"DATABASE_URL",
"JWT_SECRET",
"RTIX_UPI_ID",
"RTIX_MASTER_KEY",
];
let mut missing_vars = Vec::new();
for var in required_vars {
if env::var(var).is_err() {
missing_vars.push(var);
}
}
if !missing_vars.is_empty() {
eprintln!(
"FATAL: Missing required environment variables: {:?}",
missing_vars
);
eprintln!("Please add these to your Render Dashboard (Environment tab).");
std::process::exit(1);
}
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
// Initialize OpenTelemetry Tracer
let tracer =
rtix::core::telemetry::init_tracer().expect("Failed to initialize OpenTelemetry tracer");
let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
// Database Connection & Schema Initialization
let db_router = Arc::new(db::init_db_router().await);
let pool = db_router.primary().clone();
// Initialize Observability Registry
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "rtix=info,tower_http=info,axum::rejection=warn".into()),
)
.with(tracing_subscriber::fmt::layer())
.with(telemetry_layer)
.with(TelemetryLayer { pool: pool.clone() })
.init();
let env_type = if env::var("RENDER").is_ok() {
"RENDER_CLOUD"
} else {
"LOCAL_DEVELOPMENT"
};
tracing::info!("🚀 Starting Rtix Secure Backend (Env: {})", env_type);
// Initialize Prometheus Metrics
let metrics_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.install_recorder()
.expect("Failed to install Prometheus recorder");
// Initialize global broadcast channel for real-time events (4096 message buffer for headroom)
let (tx, _rx) = tokio::sync::broadcast::channel(4096);
// Start background IP block cache refresher (eliminates per-request DB queries)
rtix::interfaces::http::middleware::spawn_blocked_ip_cache_refresher(pool.clone());
// Asset Provider
let upload_dir = env::var("UPLOAD_DIR").unwrap_or_else(|_| "uploads".to_string());
let asset_provider: Arc<dyn assets::AssetProvider> =
if let (Ok(bucket), Ok(access_key), Ok(secret_key)) = (
env::var("S3_BUCKET"),
env::var("S3_ACCESS_KEY_ID"),
env::var("S3_SECRET_ACCESS_KEY"),
) {
let region = env::var("S3_REGION").unwrap_or_else(|_| "us-east-1".to_string());
let endpoint = env::var("S3_ENDPOINT").ok();
let cdn_url = env::var("S3_CDN_URL").ok();
tracing::info!(
"Initializing S3 Cloud Asset Provider for bucket: {}",
bucket
);
Arc::new(assets::S3AssetProvider::new(
bucket, region, access_key, secret_key, endpoint, cdn_url,
))
} else {
tracing::info!("Initializing Local File Asset Provider at: {}", upload_dir);
Arc::new(assets::LocalAssetProvider::new(&upload_dir))
};
// Repositories
let merchant_repo = Arc::new(repositories::merchant::PostgresMerchantRepository::new(
pool.clone(),
));
let order_repo = Arc::new(repositories::order::PostgresOrderRepository::new(
pool.clone(),
));
let product_repo = Arc::new(repositories::product::PostgresProductRepository::new(
pool.clone(),
));
let idempotency_repo =
Arc::new(repositories::idempotency::PostgresIdempotencyRepository::new(pool.clone()));
let coupon_repo = Arc::new(repositories::coupon::PostgresCouponRepository::new(
pool.clone(),
));
// Services
let auth_service = Arc::new(services::auth::RtixAuthService::new(
merchant_repo.clone(),
session::jwt_secret(),
));
let merchant_service = Arc::new(services::merchant::RtixMerchantService::new(
merchant_repo.clone(),
product_repo.clone(),
order_repo.clone(),
coupon_repo.clone(),
pool.clone(),
tx.clone(),
));
let checkout_service = Arc::new(services::checkout::RtixCheckoutService::new(
product_repo.clone(),
merchant_repo.clone(),
order_repo.clone(),
asset_provider.clone(),
tx.clone(),
pool.clone(),
));
let payment_service = Arc::new(services::payment::RtixPaymentService::new(
order_repo.clone(),
merchant_repo.clone(),
tx.clone(),
));
let customer_service = Arc::new(services::customer::RtixCustomerService::new(
pool.clone(),
session::jwt_secret(),
));
let idempotency_service = Arc::new(services::idempotency::RtixIdempotencyService::new(
idempotency_repo,
));
let intelligence_service = Arc::new(services::intelligence::IntelligenceService::new(
pool.clone(),
));
// Spawn AI Engineer background telemetry processor
let bg_intelligence_service = intelligence_service.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
interval.tick().await;
if let Err(e) = bg_intelligence_service.process_unanalyzed_telemetry().await {
tracing::error!("Background AI telemetry analysis failed: {:?}", e);
}
}
});
let state = api::AppState {
pool,
db: db_router,
tx,
assets: asset_provider,
auth_service,
merchant_service,
checkout_service,
payment_service,
customer_service,
idempotency_service,
intelligence_service,
order_repo: order_repo.clone(),
metrics_handle: Arc::new(metrics_handle),
oauth_config: OAuthConfig::from_env(),
jwt_secret: std::env::var("JWT_SECRET").unwrap_or_default(),
};
let app = api::create_router(state.clone());
// Spawn Autonomous Reconciliation Engine (Background Task)
let recon_state = state.clone();
tokio::spawn(async move {
reconciliation::spawn_reconciliation_worker(recon_state).await;
});
// Spawn Protocol Sentinel (Operational Guard & Recovery)
let protocol_sentinel =
services::ProtocolSentinel::new(state.merchant_service.clone(), state.pool.clone());
tokio::spawn(async move {
protocol_sentinel.run().await;
});
// Spawn Webhook Service (Background Task)
let webhook_service = services::webhook::WebhookService::new(state.pool.clone());
let webhook_rx = state.tx.subscribe();
tokio::spawn(async move {
webhook_service.run(webhook_rx).await;
});
// Spawn SRE AI Log and Database Monitor (Continuous Background Worker)
rtix::interfaces::http::routes::developer::start_ai_log_monitor(state.pool.clone());
let port = env::var("PORT")
.unwrap_or_else(|_| "3000".to_string())
.parse::<u16>()
.unwrap_or(3000);
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port))
.await
.expect("Failed to bind port");
tracing::info!(
"🛡️ Rtix Core Engine ONLINE on {}",
listener.local_addr().unwrap()
);
tracing::info!("💎 Security Guard Active. Payment Systems Stabilized.");
// Graceful Shutdown Logic
let shutdown = async {
let ctrl_c = tokio::signal::ctrl_c();
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
tokio::select! {
_ = ctrl_c => tracing::info!("Shutdown signal received (Ctrl-C)"),
_ = terminate => tracing::info!("Shutdown signal received (SIGTERM)"),
}
tracing::info!("Commencing graceful shutdown of settlement engines...");
};
axum::serve(listener, app)
.with_graceful_shutdown(shutdown)
.await
.expect("Server runtime error");
rtix::core::telemetry::shutdown_tracer();
tracing::info!("Rtix Secure Protocol OFFLINE. All assets protected.");
}