renminwansui1976 commited on
Commit
7338993
·
unverified ·
1 Parent(s): 1f6fc81

更新 main.rs

Browse files
Files changed (1) hide show
  1. src/main.rs +179 -239
src/main.rs CHANGED
@@ -17,20 +17,58 @@ use walkdir::WalkDir;
17
  const WORKSPACE_DIR: &str = "/home/user/.openclaw";
18
  const STATE_FILE: &str = ".hf-sync-state.json";
19
  const FINAL_WAIT_TIMEOUT: Duration = Duration::from_secs(20);
20
-
21
- /// How long to wait for HF to become reachable at startup before giving up
22
- /// and continuing with an empty / local workspace.
23
  const NETWORK_STARTUP_TIMEOUT: Duration = Duration::from_secs(60);
24
  const NETWORK_RETRY_INTERVAL: Duration = Duration::from_secs(5);
25
 
 
 
 
 
26
  #[derive(Debug, Clone)]
27
- struct Config {
28
  token: String,
29
  dataset_id: String,
30
  sync_interval: Duration,
31
  workspace: PathBuf,
32
  }
33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
  #[derive(Debug, Deserialize)]
35
  struct RepoLookup {
36
  id: Option<String>,
@@ -63,11 +101,7 @@ struct CommitRequest {
63
  #[serde(tag = "op")]
64
  enum CommitOperation {
65
  #[serde(rename = "addOrUpdate")]
66
- AddOrUpdate {
67
- path: String,
68
- encoding: String,
69
- content: String,
70
- },
71
  #[serde(rename = "delete")]
72
  Delete { path: String },
73
  }
@@ -83,6 +117,8 @@ struct FileState {
83
  size: u64,
84
  }
85
 
 
 
86
  #[tokio::main]
87
  async fn main() {
88
  if let Err(err) = run().await {
@@ -92,67 +128,75 @@ async fn main() {
92
  }
93
 
94
  async fn run() -> Result<()> {
95
- let cfg = load_config()?;
96
- tokio::fs::create_dir_all(&cfg.workspace)
 
97
  .await
98
  .context("failed to create workspace directory")?;
99
 
100
- let client = build_client(&cfg)?;
101
-
102
- // ── Startup sync: wait for network, then pull. Non-fatal on failure. ────────
103
- // HF Space containers sometimes take 10-30s for DNS/routing to become
104
- // available after the container starts. We wait up to 60s, then proceed.
105
- eprintln!("waiting for network connectivity to huggingface.co…");
106
- match wait_for_network(&client).await {
107
- true => {
108
- eprintln!("network is up — proceeding with startup sync");
109
- if let Err(err) = startup_sync(&client, &cfg).await {
110
- eprintln!("startup sync failed (continuing with local workspace): {err:#}");
111
  }
 
 
112
  }
113
- false => {
114
- eprintln!(
115
- "huggingface.co unreachable after {:?} — starting with local workspace",
116
- NETWORK_STARTUP_TIMEOUT
117
- );
118
- }
119
- }
120
 
121
- rebuild_sync_state(&cfg.workspace).await?;
 
122
 
123
- // ── Spawn child process ──────────────────────────────────────────────────────
124
  let mut child = spawn_child_from_args()?;
125
  let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
126
- let mut ticker = interval(cfg.sync_interval);
127
 
128
- loop {
129
- tokio::select! {
130
- _ = ticker.tick() => {
131
- if let Err(err) = push_workspace(&client, &cfg).await {
132
- eprintln!("periodic push failed: {err:#}");
 
 
 
 
 
 
 
133
  }
134
- }
135
- _ = sigterm.recv() => {
136
- eprintln!("received SIGTERM — forwarding and running final sync");
137
- forward_sigterm(&mut child);
138
- if let Err(err) = push_workspace(&client, &cfg).await {
139
- eprintln!("final push failed: {err:#}");
 
 
 
 
 
140
  }
141
- wait_for_child_shutdown(&mut child).await;
142
- break;
143
  }
144
- status = child.wait() => {
145
- match status {
146
- Ok(s) => {
147
- eprintln!("child exited: {s}");
148
- if let Err(err) = push_workspace(&client, &cfg).await {
149
- eprintln!("final push after child exit failed: {err:#}");
150
- }
151
- exit(s.code().unwrap_or(1));
152
- }
153
- Err(err) => {
154
- eprintln!("error waiting for child: {err:#}");
155
- break;
 
 
156
  }
157
  }
158
  }
@@ -162,87 +206,62 @@ async fn run() -> Result<()> {
162
  Ok(())
163
  }
164
 
165
- /// Poll HF until reachable or timeout. Returns true if network came up.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
166
  async fn wait_for_network(client: &reqwest::Client) -> bool {
167
- let probe_url = "https://huggingface.co/api/whoami-v2";
168
  let deadline = tokio::time::Instant::now() + NETWORK_STARTUP_TIMEOUT;
169
-
170
  loop {
171
- // Use a short per-attempt timeout so we fail fast and retry
172
- let attempt = timeout(
173
- Duration::from_secs(8),
174
- client.get(probe_url).send(),
175
- )
176
- .await;
177
-
178
- match attempt {
179
- // Any HTTP response (even 401) means DNS + routing is working
180
- Ok(Ok(_)) => return true,
181
- Ok(Err(err)) => eprintln!("network probe error: {err}"),
182
- Err(_) => eprintln!("network probe timed out"),
183
  }
184
-
185
  if tokio::time::Instant::now() >= deadline {
186
  return false;
187
  }
188
-
189
- eprintln!("retrying in {:?}…", NETWORK_RETRY_INTERVAL);
190
  sleep(NETWORK_RETRY_INTERVAL).await;
191
  }
192
  }
193
 
194
- /// Pull workspace from HF dataset on first boot.
195
- async fn startup_sync(client: &reqwest::Client, cfg: &Config) -> Result<()> {
 
196
  ensure_dataset_exists(client, cfg).await?;
197
  pull_workspace(client, cfg).await
198
  }
199
 
200
- fn load_config() -> Result<Config> {
201
- let token = env::var("HF_TOKEN").context("HF_TOKEN is required")?;
202
-
203
- let dataset_id = env::var("OPENCLAW_DATASET_REPO")
204
- .or_else(|_| env::var("HF_DATASET_ID"))
205
- .context("OPENCLAW_DATASET_REPO (or HF_DATASET_ID) is required")?;
206
-
207
- validate_dataset_id(&dataset_id)?;
208
-
209
- let sync_interval_secs: u64 = env::var("SYNC_INTERVAL")
210
- .unwrap_or_else(|_| "60".to_string())
211
- .parse()
212
- .context("SYNC_INTERVAL must be an integer")?;
213
-
214
- if sync_interval_secs == 0 {
215
- return Err(anyhow!("SYNC_INTERVAL must be > 0"));
216
- }
217
-
218
- Ok(Config {
219
- token,
220
- dataset_id,
221
- sync_interval: Duration::from_secs(sync_interval_secs),
222
- workspace: PathBuf::from(WORKSPACE_DIR),
223
- })
224
- }
225
-
226
- fn validate_dataset_id(id: &str) -> Result<()> {
227
- let mut parts = id.split('/');
228
- let owner = parts.next().unwrap_or_default();
229
- let repo = parts.next().unwrap_or_default();
230
- if owner.is_empty() || repo.is_empty() || parts.next().is_some() {
231
- return Err(anyhow!("OPENCLAW_DATASET_REPO must be owner/name"));
232
- }
233
- Ok(())
234
- }
235
 
236
- fn build_client(cfg: &Config) -> Result<reqwest::Client> {
237
  let mut headers = reqwest::header::HeaderMap::new();
238
  headers.insert(
239
  AUTHORIZATION,
240
- format!("Bearer {}", cfg.token)
241
- .parse()
242
- .context("invalid HF_TOKEN")?,
243
  );
244
  headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
245
-
246
  reqwest::Client::builder()
247
  .default_headers(headers)
248
  .timeout(Duration::from_secs(30))
@@ -250,52 +269,40 @@ fn build_client(cfg: &Config) -> Result<reqwest::Client> {
250
  .context("failed to build HTTP client")
251
  }
252
 
253
- async fn ensure_dataset_exists(client: &reqwest::Client, cfg: &Config) -> Result<()> {
254
  let url = format!("https://huggingface.co/api/datasets/{}", cfg.dataset_id);
255
  let resp = client.get(&url).send().await?;
256
-
257
  match resp.status() {
258
  StatusCode::OK => {
259
  let repo: RepoLookup = resp.json().await.unwrap_or(RepoLookup { id: None });
260
- eprintln!("dataset ok: {}", repo.id.unwrap_or(cfg.dataset_id.clone()));
261
  Ok(())
262
  }
263
  StatusCode::NOT_FOUND => {
264
  let auto = env::var("AUTO_CREATE_DATASET")
265
- .map(|v| matches!(v.to_lowercase().as_str(), "1" | "true" | "yes" | "on"))
266
  .unwrap_or(false);
267
  if auto {
268
- eprintln!("dataset not found — AUTO_CREATE_DATASET=true, creating…");
269
  create_private_dataset(client, cfg).await
270
  } else {
271
  Err(anyhow!(
272
- "dataset {} not found. Create it at huggingface.co/new-dataset \
273
- or set AUTO_CREATE_DATASET=true.",
274
  cfg.dataset_id
275
  ))
276
  }
277
  }
278
- s => Err(anyhow!(
279
- "dataset lookup failed ({s}): {}",
280
- resp.text().await.unwrap_or_default()
281
- )),
282
  }
283
  }
284
 
285
- async fn create_private_dataset(client: &reqwest::Client, cfg: &Config) -> Result<()> {
286
  let (owner, name) = cfg.dataset_id.split_once('/').unwrap();
287
-
288
  let username = client
289
  .get("https://huggingface.co/api/whoami-v2")
290
- .send()
291
- .await?
292
- .error_for_status()?
293
- .json::<serde_json::Value>()
294
- .await?
295
- .get("name")
296
- .and_then(|v| v.as_str())
297
- .unwrap_or("")
298
- .to_owned();
299
 
300
  let req = CreateRepoRequest {
301
  name: name.to_string(),
@@ -303,90 +310,54 @@ async fn create_private_dataset(client: &reqwest::Client, cfg: &Config) -> Resul
303
  private: true,
304
  repo_type: "dataset".to_string(),
305
  };
306
-
307
- let resp = client
308
- .post("https://huggingface.co/api/repos/create")
309
- .json(&req)
310
- .send()
311
- .await?;
312
-
313
  if resp.status().is_success() || resp.status() == StatusCode::CONFLICT {
314
  Ok(())
315
  } else {
316
- Err(anyhow!(
317
- "create dataset failed ({}): {}",
318
- resp.status(),
319
- resp.text().await.unwrap_or_default()
320
- ))
321
  }
322
  }
323
 
324
- async fn pull_workspace(client: &reqwest::Client, cfg: &Config) -> Result<()> {
325
- eprintln!("pulling workspace from {}", cfg.dataset_id);
326
  let files = list_remote_files(client, cfg).await?;
327
-
328
  for file in &files {
329
- let url = format!(
330
- "https://huggingface.co/datasets/{}/resolve/main/{}",
331
- cfg.dataset_id, file
332
- );
333
  let bytes = client.get(url).send().await?.error_for_status()?.bytes().await?;
334
  let target = cfg.workspace.join(file);
335
- if let Some(p) = target.parent() {
336
- tokio::fs::create_dir_all(p).await?;
337
- }
338
  let mut f = tokio::fs::File::create(&target).await?;
339
  f.write_all(&bytes).await?;
340
  }
341
-
342
- eprintln!("pull complete: {} files", files.len());
343
  Ok(())
344
  }
345
 
346
- async fn list_remote_files(client: &reqwest::Client, cfg: &Config) -> Result<Vec<String>> {
347
  let url = format!(
348
  "https://huggingface.co/api/datasets/{}/tree/main?recursive=true",
349
  cfg.dataset_id
350
  );
351
  let resp = client.get(url).send().await?;
352
- if resp.status() == StatusCode::NOT_FOUND {
353
- return Ok(vec![]);
354
- }
355
  let entries: Vec<TreeEntry> = resp.error_for_status()?.json().await?;
356
  Ok(entries.into_iter().filter(|e| e.kind == "file").map(|e| e.path).collect())
357
  }
358
 
359
- async fn push_workspace(client: &reqwest::Client, cfg: &Config) -> Result<()> {
360
  let state_path = cfg.workspace.join(STATE_FILE);
361
  let mut state = load_state(&state_path).await?;
362
-
363
  let mut current = HashMap::<String, FileState>::new();
364
  let mut operations = Vec::<CommitOperation>::new();
365
 
366
- for entry in WalkDir::new(&cfg.workspace)
367
- .into_iter()
368
- .filter_map(|e| e.ok())
369
- .filter(|e| e.file_type().is_file())
370
- {
371
  let full = entry.path();
372
- if full == state_path {
373
- continue;
374
- }
375
- let rel = full
376
- .strip_prefix(&cfg.workspace)?
377
- .to_string_lossy()
378
- .replace('\\', "/");
379
-
380
  let bytes = tokio::fs::read(full).await?;
381
  let md5 = format!("{:x}", md5::compute(&bytes));
382
  let size = bytes.len() as u64;
383
-
384
- let changed = state
385
- .files
386
- .get(&rel)
387
- .map(|s| s.md5 != md5 || s.size != size)
388
- .unwrap_or(true);
389
-
390
  if changed {
391
  operations.push(CommitOperation::AddOrUpdate {
392
  path: rel.clone(),
@@ -403,68 +374,38 @@ async fn push_workspace(client: &reqwest::Client, cfg: &Config) -> Result<()> {
403
  operations.push(CommitOperation::Delete { path: removed.clone() });
404
  }
405
 
406
- if operations.is_empty() {
407
- eprintln!("no workspace changes");
408
- return Ok(());
409
- }
410
 
411
- let req = CommitRequest {
412
- commit: format!("sync {} files", operations.len()),
413
- operations,
414
- };
415
 
416
- let url = format!(
417
- "https://huggingface.co/api/datasets/{}/commit/main",
418
- cfg.dataset_id
419
- );
420
- let resp = client.post(url).json(&req).send().await?;
421
  if !resp.status().is_success() {
422
- return Err(anyhow!(
423
- "push failed ({}): {}",
424
- resp.status(),
425
- resp.text().await.unwrap_or_default()
426
- ));
427
  }
428
-
429
  state.files = current;
430
  save_state(&state_path, &state).await?;
431
- eprintln!("push complete");
432
  Ok(())
433
  }
434
 
435
  async fn rebuild_sync_state(workspace: &Path) -> Result<()> {
436
  let state_path = workspace.join(STATE_FILE);
437
  let mut files = HashMap::new();
438
-
439
- for entry in WalkDir::new(workspace)
440
- .into_iter()
441
- .filter_map(|e| e.ok())
442
- .filter(|e| e.file_type().is_file())
443
- {
444
  let full = entry.path();
445
- if full == state_path {
446
- continue;
447
- }
448
- let rel = full
449
- .strip_prefix(workspace)?
450
- .to_string_lossy()
451
- .replace('\\', "/");
452
  let bytes = tokio::fs::read(full).await?;
453
- files.insert(rel, FileState {
454
- md5: format!("{:x}", md5::compute(&bytes)),
455
- size: bytes.len() as u64,
456
- });
457
  }
458
-
459
  save_state(&state_path, &SyncState { files }).await
460
  }
461
 
462
  async fn load_state(path: &Path) -> Result<SyncState> {
463
- if !path.exists() {
464
- return Ok(SyncState::default());
465
- }
466
- let raw = tokio::fs::read(path).await?;
467
- Ok(serde_json::from_slice(&raw).context("bad sync state")?)
468
  }
469
 
470
  async fn save_state(path: &Path, state: &SyncState) -> Result<()> {
@@ -472,25 +413,24 @@ async fn save_state(path: &Path, state: &SyncState) -> Result<()> {
472
  Ok(())
473
  }
474
 
 
 
475
  fn spawn_child_from_args() -> Result<Child> {
476
  let args: Vec<String> = env::args().skip(1).collect();
477
- if args.is_empty() {
478
- return Err(anyhow!("no child command provided"));
479
- }
480
  Command::new(&args[0])
481
  .args(&args[1..])
482
  .stdin(Stdio::inherit())
483
  .stdout(Stdio::inherit())
484
  .stderr(Stdio::inherit())
485
  .spawn()
486
- .context("failed to spawn child process")
487
  }
488
 
489
  fn forward_sigterm(child: &mut Child) {
490
  if let Some(id) = child.id() {
491
  // SAFETY: valid pid from live child
492
- let ret = unsafe { libc::kill(id as libc::pid_t, libc::SIGTERM) };
493
- if ret != 0 {
494
  eprintln!("SIGTERM forward failed: {}", std::io::Error::last_os_error());
495
  }
496
  }
@@ -501,7 +441,7 @@ async fn wait_for_child_shutdown(child: &mut Child) {
501
  Ok(Ok(s)) => eprintln!("child exited after SIGTERM: {s}"),
502
  Ok(Err(e)) => eprintln!("wait error: {e:#}"),
503
  Err(_) => {
504
- eprintln!("child timeout — sending SIGKILL");
505
  if let Some(id) = child.id() {
506
  // SAFETY: valid pid from live child
507
  unsafe { libc::kill(id as libc::pid_t, libc::SIGKILL) };
 
17
  const WORKSPACE_DIR: &str = "/home/user/.openclaw";
18
  const STATE_FILE: &str = ".hf-sync-state.json";
19
  const FINAL_WAIT_TIMEOUT: Duration = Duration::from_secs(20);
 
 
 
20
  const NETWORK_STARTUP_TIMEOUT: Duration = Duration::from_secs(60);
21
  const NETWORK_RETRY_INTERVAL: Duration = Duration::from_secs(5);
22
 
23
+ // ── Config ────────────────────────────────────────────────────────────────────
24
+
25
+ /// Sync config — optional: if HF_TOKEN or OPENCLAW_DATASET_REPO are absent,
26
+ /// syncing is simply disabled and OpenClaw still starts normally.
27
  #[derive(Debug, Clone)]
28
+ struct SyncConfig {
29
  token: String,
30
  dataset_id: String,
31
  sync_interval: Duration,
32
  workspace: PathBuf,
33
  }
34
 
35
+ impl SyncConfig {
36
+ /// Returns None (with a warning) instead of crashing when secrets are missing.
37
+ fn load() -> Option<Self> {
38
+ let token = match env::var("HF_TOKEN") {
39
+ Ok(t) if !t.is_empty() => t,
40
+ _ => {
41
+ eprintln!("[sync] HF_TOKEN not set — HF dataset sync disabled");
42
+ return None;
43
+ }
44
+ };
45
+
46
+ let dataset_id = env::var("OPENCLAW_DATASET_REPO")
47
+ .or_else(|_| env::var("HF_DATASET_ID"))
48
+ .unwrap_or_default();
49
+
50
+ if dataset_id.is_empty() || dataset_id.split('/').count() != 2 {
51
+ eprintln!("[sync] OPENCLAW_DATASET_REPO not set or invalid — HF dataset sync disabled");
52
+ return None;
53
+ }
54
+
55
+ let sync_interval_secs: u64 = env::var("SYNC_INTERVAL")
56
+ .unwrap_or_else(|_| "60".to_string())
57
+ .parse()
58
+ .unwrap_or(60)
59
+ .max(1);
60
+
61
+ Some(SyncConfig {
62
+ token,
63
+ dataset_id,
64
+ sync_interval: Duration::from_secs(sync_interval_secs),
65
+ workspace: PathBuf::from(WORKSPACE_DIR),
66
+ })
67
+ }
68
+ }
69
+
70
+ // ── HF API types ──────────────────────────────────────────────────────────────
71
+
72
  #[derive(Debug, Deserialize)]
73
  struct RepoLookup {
74
  id: Option<String>,
 
101
  #[serde(tag = "op")]
102
  enum CommitOperation {
103
  #[serde(rename = "addOrUpdate")]
104
+ AddOrUpdate { path: String, encoding: String, content: String },
 
 
 
 
105
  #[serde(rename = "delete")]
106
  Delete { path: String },
107
  }
 
117
  size: u64,
118
  }
119
 
120
+ // ── Main ──────────────────────────────────────────────────────────────────────
121
+
122
  #[tokio::main]
123
  async fn main() {
124
  if let Err(err) = run().await {
 
128
  }
129
 
130
  async fn run() -> Result<()> {
131
+ // Always create the workspace dir — OpenClaw may need it regardless of sync.
132
+ let workspace = PathBuf::from(WORKSPACE_DIR);
133
+ tokio::fs::create_dir_all(&workspace)
134
  .await
135
  .context("failed to create workspace directory")?;
136
 
137
+ // Load sync config — None means sync is disabled, container still runs.
138
+ let sync = SyncConfig::load();
139
+
140
+ if let Some(cfg) = &sync {
141
+ let client = build_client(cfg)?;
142
+
143
+ eprintln!("[sync] waiting for network…");
144
+ if wait_for_network(&client).await {
145
+ eprintln!("[sync] network up — running startup sync");
146
+ if let Err(e) = startup_sync(&client, cfg).await {
147
+ eprintln!("[sync] startup sync failed (continuing): {e:#}");
148
  }
149
+ } else {
150
+ eprintln!("[sync] network unavailable after {:?} — skipping startup sync", NETWORK_STARTUP_TIMEOUT);
151
  }
 
 
 
 
 
 
 
152
 
153
+ rebuild_sync_state(&cfg.workspace).await?;
154
+ }
155
 
156
+ // Spawn the child process (start.sh → LiteLLM + OpenClaw).
157
  let mut child = spawn_child_from_args()?;
158
  let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
 
159
 
160
+ if let Some(cfg) = &sync {
161
+ // Sync loop — only active when sync is configured.
162
+ let client = build_client(cfg)?;
163
+ let mut ticker = interval(cfg.sync_interval);
164
+ ticker.reset(); // skip the immediate first tick
165
+
166
+ loop {
167
+ tokio::select! {
168
+ _ = ticker.tick() => {
169
+ if let Err(e) = push_workspace(&client, cfg).await {
170
+ eprintln!("[sync] periodic push failed: {e:#}");
171
+ }
172
  }
173
+ _ = sigterm.recv() => {
174
+ eprintln!("[sync] SIGTERM final push then shutdown");
175
+ forward_sigterm(&mut child);
176
+ if let Err(e) = push_workspace(&client, cfg).await {
177
+ eprintln!("[sync] final push failed: {e:#}");
178
+ }
179
+ wait_for_child_shutdown(&mut child).await;
180
+ break;
181
+ }
182
+ status = child.wait() => {
183
+ handle_child_exit(status, &client, cfg).await;
184
  }
 
 
185
  }
186
+ }
187
+ } else {
188
+ // No sync — just supervise the child.
189
+ loop {
190
+ tokio::select! {
191
+ _ = sigterm.recv() => {
192
+ forward_sigterm(&mut child);
193
+ wait_for_child_shutdown(&mut child).await;
194
+ break;
195
+ }
196
+ status = child.wait() => {
197
+ match status {
198
+ Ok(s) => exit(s.code().unwrap_or(1)),
199
+ Err(e) => { eprintln!("child wait error: {e:#}"); break; }
200
  }
201
  }
202
  }
 
206
  Ok(())
207
  }
208
 
209
+ async fn handle_child_exit(
210
+ status: std::io::Result<std::process::ExitStatus>,
211
+ client: &reqwest::Client,
212
+ cfg: &SyncConfig,
213
+ ) -> ! {
214
+ match status {
215
+ Ok(s) => {
216
+ eprintln!("[sync] child exited: {s}");
217
+ if let Err(e) = push_workspace(client, cfg).await {
218
+ eprintln!("[sync] final push after child exit failed: {e:#}");
219
+ }
220
+ exit(s.code().unwrap_or(1));
221
+ }
222
+ Err(e) => {
223
+ eprintln!("child wait error: {e:#}");
224
+ exit(1);
225
+ }
226
+ }
227
+ }
228
+
229
+ // ── Network probe ─────────────────────────────────────────────────────────────
230
+
231
  async fn wait_for_network(client: &reqwest::Client) -> bool {
232
+ let url = "https://huggingface.co/api/whoami-v2";
233
  let deadline = tokio::time::Instant::now() + NETWORK_STARTUP_TIMEOUT;
 
234
  loop {
235
+ let res = timeout(Duration::from_secs(8), client.get(url).send()).await;
236
+ match res {
237
+ Ok(Ok(_)) => return true, // any HTTP response = DNS is working
238
+ Ok(Err(e)) => eprintln!("[sync] probe error: {e}"),
239
+ Err(_) => eprintln!("[sync] probe timeout"),
 
 
 
 
 
 
 
240
  }
 
241
  if tokio::time::Instant::now() >= deadline {
242
  return false;
243
  }
244
+ eprintln!("[sync] retrying in {:?}…", NETWORK_RETRY_INTERVAL);
 
245
  sleep(NETWORK_RETRY_INTERVAL).await;
246
  }
247
  }
248
 
249
+ // ── Startup sync ──────────────────────────────────────────────────────────────
250
+
251
+ async fn startup_sync(client: &reqwest::Client, cfg: &SyncConfig) -> Result<()> {
252
  ensure_dataset_exists(client, cfg).await?;
253
  pull_workspace(client, cfg).await
254
  }
255
 
256
+ // ── HF API ────────────────────────────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
257
 
258
+ fn build_client(cfg: &SyncConfig) -> Result<reqwest::Client> {
259
  let mut headers = reqwest::header::HeaderMap::new();
260
  headers.insert(
261
  AUTHORIZATION,
262
+ format!("Bearer {}", cfg.token).parse().context("invalid HF_TOKEN")?,
 
 
263
  );
264
  headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
 
265
  reqwest::Client::builder()
266
  .default_headers(headers)
267
  .timeout(Duration::from_secs(30))
 
269
  .context("failed to build HTTP client")
270
  }
271
 
272
+ async fn ensure_dataset_exists(client: &reqwest::Client, cfg: &SyncConfig) -> Result<()> {
273
  let url = format!("https://huggingface.co/api/datasets/{}", cfg.dataset_id);
274
  let resp = client.get(&url).send().await?;
 
275
  match resp.status() {
276
  StatusCode::OK => {
277
  let repo: RepoLookup = resp.json().await.unwrap_or(RepoLookup { id: None });
278
+ eprintln!("[sync] dataset ok: {}", repo.id.unwrap_or(cfg.dataset_id.clone()));
279
  Ok(())
280
  }
281
  StatusCode::NOT_FOUND => {
282
  let auto = env::var("AUTO_CREATE_DATASET")
283
+ .map(|v| matches!(v.to_lowercase().as_str(), "1"|"true"|"yes"|"on"))
284
  .unwrap_or(false);
285
  if auto {
286
+ eprintln!("[sync] dataset not found — creating…");
287
  create_private_dataset(client, cfg).await
288
  } else {
289
  Err(anyhow!(
290
+ "dataset {} not found. Create it or set AUTO_CREATE_DATASET=true",
 
291
  cfg.dataset_id
292
  ))
293
  }
294
  }
295
+ s => Err(anyhow!("dataset lookup failed ({s}): {}", resp.text().await.unwrap_or_default())),
 
 
 
296
  }
297
  }
298
 
299
+ async fn create_private_dataset(client: &reqwest::Client, cfg: &SyncConfig) -> Result<()> {
300
  let (owner, name) = cfg.dataset_id.split_once('/').unwrap();
 
301
  let username = client
302
  .get("https://huggingface.co/api/whoami-v2")
303
+ .send().await?.error_for_status()?
304
+ .json::<serde_json::Value>().await?
305
+ .get("name").and_then(|v| v.as_str()).unwrap_or("").to_owned();
 
 
 
 
 
 
306
 
307
  let req = CreateRepoRequest {
308
  name: name.to_string(),
 
310
  private: true,
311
  repo_type: "dataset".to_string(),
312
  };
313
+ let resp = client.post("https://huggingface.co/api/repos/create").json(&req).send().await?;
 
 
 
 
 
 
314
  if resp.status().is_success() || resp.status() == StatusCode::CONFLICT {
315
  Ok(())
316
  } else {
317
+ Err(anyhow!("create dataset failed ({}): {}", resp.status(), resp.text().await.unwrap_or_default()))
 
 
 
 
318
  }
319
  }
320
 
321
+ async fn pull_workspace(client: &reqwest::Client, cfg: &SyncConfig) -> Result<()> {
322
+ eprintln!("[sync] pulling from {}", cfg.dataset_id);
323
  let files = list_remote_files(client, cfg).await?;
 
324
  for file in &files {
325
+ let url = format!("https://huggingface.co/datasets/{}/resolve/main/{}", cfg.dataset_id, file);
 
 
 
326
  let bytes = client.get(url).send().await?.error_for_status()?.bytes().await?;
327
  let target = cfg.workspace.join(file);
328
+ if let Some(p) = target.parent() { tokio::fs::create_dir_all(p).await?; }
 
 
329
  let mut f = tokio::fs::File::create(&target).await?;
330
  f.write_all(&bytes).await?;
331
  }
332
+ eprintln!("[sync] pull complete: {} files", files.len());
 
333
  Ok(())
334
  }
335
 
336
+ async fn list_remote_files(client: &reqwest::Client, cfg: &SyncConfig) -> Result<Vec<String>> {
337
  let url = format!(
338
  "https://huggingface.co/api/datasets/{}/tree/main?recursive=true",
339
  cfg.dataset_id
340
  );
341
  let resp = client.get(url).send().await?;
342
+ if resp.status() == StatusCode::NOT_FOUND { return Ok(vec![]); }
 
 
343
  let entries: Vec<TreeEntry> = resp.error_for_status()?.json().await?;
344
  Ok(entries.into_iter().filter(|e| e.kind == "file").map(|e| e.path).collect())
345
  }
346
 
347
+ async fn push_workspace(client: &reqwest::Client, cfg: &SyncConfig) -> Result<()> {
348
  let state_path = cfg.workspace.join(STATE_FILE);
349
  let mut state = load_state(&state_path).await?;
 
350
  let mut current = HashMap::<String, FileState>::new();
351
  let mut operations = Vec::<CommitOperation>::new();
352
 
353
+ for entry in WalkDir::new(&cfg.workspace).into_iter().filter_map(|e| e.ok()).filter(|e| e.file_type().is_file()) {
 
 
 
 
354
  let full = entry.path();
355
+ if full == state_path { continue; }
356
+ let rel = full.strip_prefix(&cfg.workspace)?.to_string_lossy().replace('\\', "/");
 
 
 
 
 
 
357
  let bytes = tokio::fs::read(full).await?;
358
  let md5 = format!("{:x}", md5::compute(&bytes));
359
  let size = bytes.len() as u64;
360
+ let changed = state.files.get(&rel).map(|s| s.md5 != md5 || s.size != size).unwrap_or(true);
 
 
 
 
 
 
361
  if changed {
362
  operations.push(CommitOperation::AddOrUpdate {
363
  path: rel.clone(),
 
374
  operations.push(CommitOperation::Delete { path: removed.clone() });
375
  }
376
 
377
+ if operations.is_empty() { eprintln!("[sync] no changes"); return Ok(()); }
 
 
 
378
 
379
+ let resp = client
380
+ .post(format!("https://huggingface.co/api/datasets/{}/commit/main", cfg.dataset_id))
381
+ .json(&CommitRequest { commit: format!("sync {} files", operations.len()), operations })
382
+ .send().await?;
383
 
 
 
 
 
 
384
  if !resp.status().is_success() {
385
+ return Err(anyhow!("push failed ({}): {}", resp.status(), resp.text().await.unwrap_or_default()));
 
 
 
 
386
  }
 
387
  state.files = current;
388
  save_state(&state_path, &state).await?;
389
+ eprintln!("[sync] push complete");
390
  Ok(())
391
  }
392
 
393
  async fn rebuild_sync_state(workspace: &Path) -> Result<()> {
394
  let state_path = workspace.join(STATE_FILE);
395
  let mut files = HashMap::new();
396
+ for entry in WalkDir::new(workspace).into_iter().filter_map(|e| e.ok()).filter(|e| e.file_type().is_file()) {
 
 
 
 
 
397
  let full = entry.path();
398
+ if full == state_path { continue; }
399
+ let rel = full.strip_prefix(workspace)?.to_string_lossy().replace('\\', "/");
 
 
 
 
 
400
  let bytes = tokio::fs::read(full).await?;
401
+ files.insert(rel, FileState { md5: format!("{:x}", md5::compute(&bytes)), size: bytes.len() as u64 });
 
 
 
402
  }
 
403
  save_state(&state_path, &SyncState { files }).await
404
  }
405
 
406
  async fn load_state(path: &Path) -> Result<SyncState> {
407
+ if !path.exists() { return Ok(SyncState::default()); }
408
+ Ok(serde_json::from_slice(&tokio::fs::read(path).await?).context("bad sync state")?)
 
 
 
409
  }
410
 
411
  async fn save_state(path: &Path, state: &SyncState) -> Result<()> {
 
413
  Ok(())
414
  }
415
 
416
+ // ── Child process ─────────────────────────────────────────────────────────────
417
+
418
  fn spawn_child_from_args() -> Result<Child> {
419
  let args: Vec<String> = env::args().skip(1).collect();
420
+ if args.is_empty() { return Err(anyhow!("no child command provided")); }
 
 
421
  Command::new(&args[0])
422
  .args(&args[1..])
423
  .stdin(Stdio::inherit())
424
  .stdout(Stdio::inherit())
425
  .stderr(Stdio::inherit())
426
  .spawn()
427
+ .context("failed to spawn child")
428
  }
429
 
430
  fn forward_sigterm(child: &mut Child) {
431
  if let Some(id) = child.id() {
432
  // SAFETY: valid pid from live child
433
+ if unsafe { libc::kill(id as libc::pid_t, libc::SIGTERM) } != 0 {
 
434
  eprintln!("SIGTERM forward failed: {}", std::io::Error::last_os_error());
435
  }
436
  }
 
441
  Ok(Ok(s)) => eprintln!("child exited after SIGTERM: {s}"),
442
  Ok(Err(e)) => eprintln!("wait error: {e:#}"),
443
  Err(_) => {
444
+ eprintln!("child timeout — SIGKILL");
445
  if let Some(id) = child.id() {
446
  // SAFETY: valid pid from live child
447
  unsafe { libc::kill(id as libc::pid_t, libc::SIGKILL) };