maris-ai-master / backend-rust /src /gpu_telemetry.rs
MarisUK's picture
Maris AI model sync
f440f03 verified
use std::{sync::Arc, time::Duration};
use anyhow::Context;
use nvml_wrapper::{enum_wrappers::device::TemperatureSensor, Nvml};
use sqlx::PgPool;
use tokio::time::sleep;
use crate::config::Settings;
pub async fn spawn_telemetry_task(pool: PgPool, settings: Arc<Settings>) -> anyhow::Result<()> {
if !settings.gpu_nvml_enabled {
return Ok(());
}
let nvml = match Nvml::init() {
Ok(nvml) => Arc::new(nvml),
Err(error) => {
tracing::warn!("NVML nav pieejams: {}", error);
return Ok(());
}
};
tokio::spawn(async move {
loop {
if let Err(error) = collect_once(&pool, &nvml).await {
tracing::warn!("GPU telemetry kļūda: {}", error);
}
sleep(Duration::from_millis(settings.gpu_telemetry_poll_ms.max(500))).await;
}
});
Ok(())
}
async fn collect_once(pool: &PgPool, nvml: &Nvml) -> anyhow::Result<()> {
let count = nvml.device_count().context("Neizdevās nolasīt NVML device count")?;
for device_index in 0..count {
let device = nvml
.device_by_index(device_index)
.with_context(|| format!("Neizdevās nolasīt GPU {}", device_index))?;
let memory = device.memory_info().context("Neizdevās nolasīt GPU memory")?;
let utilization = device.utilization_rates().ok();
let power = device.power_usage().ok().map(|value| value as f64 / 1000.0);
let temperature = device.temperature(TemperatureSensor::Gpu).ok().map(|value| value as i32);
let name = device.name().unwrap_or_else(|_| format!("GPU-{}", device_index));
let used_mb = (memory.used / 1024 / 1024) as i32;
let total_mb = (memory.total / 1024 / 1024) as i32;
let free_mb = (memory.free / 1024 / 1024) as i32;
let utilization_percent = utilization.map(|item| item.gpu as f32).unwrap_or(0.0);
sqlx::query(
r#"
INSERT INTO vision_gpu_resources (
device_id, device_name, memory_total_mb, memory_available_mb, active_jobs,
utilization_percent, status, metadata_json, last_heartbeat
)
VALUES ($1, $2, $3, $4, COALESCE((SELECT active_jobs FROM vision_gpu_resources WHERE device_id = $1), 0),
$5, 'healthy', $6, NOW())
ON CONFLICT (device_id) DO UPDATE SET
device_name = EXCLUDED.device_name,
memory_total_mb = EXCLUDED.memory_total_mb,
memory_available_mb = EXCLUDED.memory_available_mb,
utilization_percent = EXCLUDED.utilization_percent,
metadata_json = EXCLUDED.metadata_json,
status = 'healthy',
last_heartbeat = NOW()
"#,
)
.bind(device_index as i32)
.bind(name)
.bind(total_mb)
.bind(free_mb)
.bind(utilization_percent)
.bind(serde_json::json!({
"temperature_c": temperature,
"power_watts": power,
"memory_used_mb": used_mb,
}))
.execute(pool)
.await
.context("Neizdevās atjaunināt GPU telemetry")?;
sqlx::query(
r#"
INSERT INTO vision_gpu_telemetry_history (
device_id, temperature_c, power_watts, memory_used_mb, memory_total_mb, utilization_percent, metadata_json
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
)
.bind(device_index as i32)
.bind(temperature)
.bind(power)
.bind(used_mb)
.bind(total_mb)
.bind(utilization_percent)
.bind(serde_json::json!({
"memory_available_mb": free_mb,
}))
.execute(pool)
.await
.context("Neizdevās saglabāt GPU telemetry history")?;
}
Ok(())
}