raylim Claude Opus 4.5 commited on
Commit
fcee23a
·
unverified ·
1 Parent(s): e1e8689

Add HF dataset download support for telemetry reporting

Browse files

- Add download_from_hf_dataset() method to TelemetryStorage for pulling
telemetry data from HuggingFace dataset repositories
- Add --hf-repo flag to telemetry_report.py to pull remote telemetry
- Use clean temp directory when --hf-repo is specified to avoid mixing
with local data
- Support running instances by falling back to heartbeat events when
no shutdown events exist
- Fix hourly rate fallback to use DEFAULT_HOURLY_RATE when stored rate is 0

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

scripts/telemetry_report.py CHANGED
@@ -22,6 +22,12 @@ Usage:
22
  # HTML format for email
23
  python scripts/telemetry_report.py /path/to/telemetry --daily --format html
24
 
 
 
 
 
 
 
25
  Example cron entry (daily report at 8am):
26
  0 8 * * * python /app/scripts/telemetry_report.py /data/telemetry --daily --email team@example.com
27
  """
@@ -78,9 +84,7 @@ def load_events(
78
  return events
79
 
80
 
81
- def generate_text_report(
82
- telemetry_dir: Path, date: Optional[str] = None
83
- ) -> str:
84
  """Generate plain text report.
85
 
86
  Args:
@@ -107,13 +111,31 @@ def generate_text_report(
107
  # Cost summary from session events
108
  if sessions:
109
  shutdowns = [s for s in sessions if s.get("event_type") == "app_shutdown"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
  if shutdowns:
111
  total_uptime_sec = sum(s.get("uptime_sec", 0) for s in shutdowns)
112
  total_uptime_hrs = total_uptime_sec / 3600
113
  total_analysis_sec = sum(s.get("analysis_time_sec", 0) for s in shutdowns)
114
  total_analysis_hrs = total_analysis_sec / 3600
115
  total_idle_hrs = total_uptime_hrs - total_analysis_hrs
116
- hourly_rate = shutdowns[0].get("hourly_rate", DEFAULT_HOURLY_RATE)
 
117
  total_cost = total_uptime_hrs * hourly_rate
118
  analysis_count = sum(s.get("analysis_count", 0) for s in shutdowns)
119
 
@@ -123,8 +145,16 @@ def generate_text_report(
123
  else 0
124
  )
125
 
 
 
 
 
 
 
 
 
126
  lines.append("=== COST SUMMARY ===")
127
- lines.append(f"App sessions: {len(shutdowns)}")
128
  lines.append(f"Total uptime: {total_uptime_hrs:.2f} hours")
129
  lines.append(
130
  f" - Active analysis: {total_analysis_hrs:.2f} hrs ({utilization:.1f}%)"
@@ -144,10 +174,14 @@ def generate_text_report(
144
  successful = [c for c in completes if c.get("success", False)]
145
 
146
  total_slides = sum(s.get("slide_count", 0) for s in starts)
147
- unique_sessions = len(set(u.get("session_hash") for u in usage if u.get("session_hash")))
 
 
148
 
149
  # Calculate average duration
150
- durations = [c.get("duration_sec", 0) for c in completes if c.get("duration_sec")]
 
 
151
  avg_duration = sum(durations) / len(durations) if durations else 0
152
 
153
  lines.append("=== USAGE SUMMARY ===")
@@ -184,7 +218,9 @@ def generate_text_report(
184
  # Resource summary
185
  if resources:
186
  total_duration = sum(r.get("total_duration_sec", 0) for r in resources)
187
- total_tiles = sum(r.get("tile_count", 0) for r in resources if r.get("tile_count"))
 
 
188
  peak_memory = max(
189
  (r.get("peak_gpu_memory_gb", 0) for r in resources), default=0
190
  )
@@ -224,9 +260,7 @@ def generate_text_report(
224
  return "\n".join(lines)
225
 
226
 
227
- def generate_html_report(
228
- telemetry_dir: Path, date: Optional[str] = None
229
- ) -> str:
230
  """Generate HTML report.
231
 
232
  Args:
@@ -267,12 +301,27 @@ def generate_html_report(
267
  # Cost summary
268
  if sessions:
269
  shutdowns = [s for s in sessions if s.get("event_type") == "app_shutdown"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
270
  if shutdowns:
271
  total_uptime_sec = sum(s.get("uptime_sec", 0) for s in shutdowns)
272
  total_uptime_hrs = total_uptime_sec / 3600
273
  total_analysis_sec = sum(s.get("analysis_time_sec", 0) for s in shutdowns)
274
  total_analysis_hrs = total_analysis_sec / 3600
275
- hourly_rate = shutdowns[0].get("hourly_rate", DEFAULT_HOURLY_RATE)
276
  total_cost = total_uptime_hrs * hourly_rate
277
  analysis_count = sum(s.get("analysis_count", 0) for s in shutdowns)
278
  utilization = (
@@ -281,9 +330,18 @@ def generate_html_report(
281
  else 0
282
  )
283
 
 
 
 
 
 
 
 
284
  html.append("<h2>Cost Summary</h2>")
285
  html.append("<table>")
286
- html.append(f"<tr><td>App sessions</td><td>{len(shutdowns)}</td></tr>")
 
 
287
  html.append(
288
  f"<tr><td>Total uptime</td><td>{total_uptime_hrs:.2f} hours</td></tr>"
289
  )
@@ -305,7 +363,9 @@ def generate_html_report(
305
  completes = [u for u in usage if u.get("event_type") == "analysis_complete"]
306
  successful = [c for c in completes if c.get("success", False)]
307
  total_slides = sum(s.get("slide_count", 0) for s in starts)
308
- unique_sessions = len(set(u.get("session_hash") for u in usage if u.get("session_hash")))
 
 
309
 
310
  html.append("<h2>Usage Summary</h2>")
311
  html.append("<table>")
@@ -372,6 +432,98 @@ def send_email(report: str, to_email: str, subject: str, format: str = "text"):
372
  server.sendmail(from_email, [to_email], msg.as_string())
373
 
374
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
375
  def main():
376
  parser = argparse.ArgumentParser(
377
  description="Generate Mosaic telemetry reports",
@@ -406,8 +558,28 @@ def main():
406
  default="text",
407
  help="Output format (default: text)",
408
  )
 
 
 
 
 
409
  args = parser.parse_args()
410
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
411
  if not args.telemetry_dir.exists():
412
  print(f"Telemetry directory not found: {args.telemetry_dir}", file=sys.stderr)
413
  sys.exit(1)
 
22
  # HTML format for email
23
  python scripts/telemetry_report.py /path/to/telemetry --daily --format html
24
 
25
+ # Pull data from HuggingFace Dataset repository
26
+ python scripts/telemetry_report.py --hf-repo PDM-Group/mosaic-telemetry
27
+
28
+ # Pull from HF and save to specific directory
29
+ python scripts/telemetry_report.py /path/to/telemetry --hf-repo PDM-Group/mosaic-telemetry
30
+
31
  Example cron entry (daily report at 8am):
32
  0 8 * * * python /app/scripts/telemetry_report.py /data/telemetry --daily --email team@example.com
33
  """
 
84
  return events
85
 
86
 
87
+ def generate_text_report(telemetry_dir: Path, date: Optional[str] = None) -> str:
 
 
88
  """Generate plain text report.
89
 
90
  Args:
 
111
  # Cost summary from session events
112
  if sessions:
113
  shutdowns = [s for s in sessions if s.get("event_type") == "app_shutdown"]
114
+
115
+ # For running instances without shutdowns, use the latest heartbeat per session
116
+ if not shutdowns:
117
+ # Group heartbeats by app_start_time to identify unique sessions
118
+ heartbeats = [s for s in sessions if s.get("event_type") == "heartbeat"]
119
+ if heartbeats:
120
+ # Get the latest heartbeat for each session (by app_start_time)
121
+ sessions_by_start = {}
122
+ for hb in heartbeats:
123
+ start_time = hb.get("app_start_time")
124
+ if start_time:
125
+ if start_time not in sessions_by_start or hb.get(
126
+ "uptime_sec", 0
127
+ ) > sessions_by_start[start_time].get("uptime_sec", 0):
128
+ sessions_by_start[start_time] = hb
129
+ shutdowns = list(sessions_by_start.values())
130
+
131
  if shutdowns:
132
  total_uptime_sec = sum(s.get("uptime_sec", 0) for s in shutdowns)
133
  total_uptime_hrs = total_uptime_sec / 3600
134
  total_analysis_sec = sum(s.get("analysis_time_sec", 0) for s in shutdowns)
135
  total_analysis_hrs = total_analysis_sec / 3600
136
  total_idle_hrs = total_uptime_hrs - total_analysis_hrs
137
+ # Use hourly_rate from data, fallback to DEFAULT if missing or zero
138
+ hourly_rate = shutdowns[0].get("hourly_rate") or DEFAULT_HOURLY_RATE
139
  total_cost = total_uptime_hrs * hourly_rate
140
  analysis_count = sum(s.get("analysis_count", 0) for s in shutdowns)
141
 
 
145
  else 0
146
  )
147
 
148
+ # Check if these are from running instances (heartbeats) vs completed (shutdowns)
149
+ is_running = all(s.get("event_type") == "heartbeat" for s in shutdowns)
150
+ session_label = (
151
+ f"Running sessions: {len(shutdowns)}"
152
+ if is_running
153
+ else f"App sessions: {len(shutdowns)}"
154
+ )
155
+
156
  lines.append("=== COST SUMMARY ===")
157
+ lines.append(session_label)
158
  lines.append(f"Total uptime: {total_uptime_hrs:.2f} hours")
159
  lines.append(
160
  f" - Active analysis: {total_analysis_hrs:.2f} hrs ({utilization:.1f}%)"
 
174
  successful = [c for c in completes if c.get("success", False)]
175
 
176
  total_slides = sum(s.get("slide_count", 0) for s in starts)
177
+ unique_sessions = len(
178
+ set(u.get("session_hash") for u in usage if u.get("session_hash"))
179
+ )
180
 
181
  # Calculate average duration
182
+ durations = [
183
+ c.get("duration_sec", 0) for c in completes if c.get("duration_sec")
184
+ ]
185
  avg_duration = sum(durations) / len(durations) if durations else 0
186
 
187
  lines.append("=== USAGE SUMMARY ===")
 
218
  # Resource summary
219
  if resources:
220
  total_duration = sum(r.get("total_duration_sec", 0) for r in resources)
221
+ total_tiles = sum(
222
+ r.get("tile_count", 0) for r in resources if r.get("tile_count")
223
+ )
224
  peak_memory = max(
225
  (r.get("peak_gpu_memory_gb", 0) for r in resources), default=0
226
  )
 
260
  return "\n".join(lines)
261
 
262
 
263
+ def generate_html_report(telemetry_dir: Path, date: Optional[str] = None) -> str:
 
 
264
  """Generate HTML report.
265
 
266
  Args:
 
301
  # Cost summary
302
  if sessions:
303
  shutdowns = [s for s in sessions if s.get("event_type") == "app_shutdown"]
304
+
305
+ # For running instances without shutdowns, use the latest heartbeat per session
306
+ if not shutdowns:
307
+ heartbeats = [s for s in sessions if s.get("event_type") == "heartbeat"]
308
+ if heartbeats:
309
+ sessions_by_start = {}
310
+ for hb in heartbeats:
311
+ start_time = hb.get("app_start_time")
312
+ if start_time:
313
+ if start_time not in sessions_by_start or hb.get(
314
+ "uptime_sec", 0
315
+ ) > sessions_by_start[start_time].get("uptime_sec", 0):
316
+ sessions_by_start[start_time] = hb
317
+ shutdowns = list(sessions_by_start.values())
318
+
319
  if shutdowns:
320
  total_uptime_sec = sum(s.get("uptime_sec", 0) for s in shutdowns)
321
  total_uptime_hrs = total_uptime_sec / 3600
322
  total_analysis_sec = sum(s.get("analysis_time_sec", 0) for s in shutdowns)
323
  total_analysis_hrs = total_analysis_sec / 3600
324
+ hourly_rate = shutdowns[0].get("hourly_rate") or DEFAULT_HOURLY_RATE
325
  total_cost = total_uptime_hrs * hourly_rate
326
  analysis_count = sum(s.get("analysis_count", 0) for s in shutdowns)
327
  utilization = (
 
330
  else 0
331
  )
332
 
333
+ is_running = all(s.get("event_type") == "heartbeat" for s in shutdowns)
334
+ session_label = (
335
+ f"Running sessions: {len(shutdowns)}"
336
+ if is_running
337
+ else f"App sessions: {len(shutdowns)}"
338
+ )
339
+
340
  html.append("<h2>Cost Summary</h2>")
341
  html.append("<table>")
342
+ html.append(
343
+ f"<tr><td>{session_label.split(':')[0]}</td><td>{len(shutdowns)}</td></tr>"
344
+ )
345
  html.append(
346
  f"<tr><td>Total uptime</td><td>{total_uptime_hrs:.2f} hours</td></tr>"
347
  )
 
363
  completes = [u for u in usage if u.get("event_type") == "analysis_complete"]
364
  successful = [c for c in completes if c.get("success", False)]
365
  total_slides = sum(s.get("slide_count", 0) for s in starts)
366
+ unique_sessions = len(
367
+ set(u.get("session_hash") for u in usage if u.get("session_hash"))
368
+ )
369
 
370
  html.append("<h2>Usage Summary</h2>")
371
  html.append("<table>")
 
432
  server.sendmail(from_email, [to_email], msg.as_string())
433
 
434
 
435
+ def download_from_hf(repo_id: str, telemetry_dir: Path) -> bool:
436
+ """Download telemetry data from HuggingFace Dataset repository.
437
+
438
+ Args:
439
+ repo_id: HuggingFace Dataset repository ID
440
+ telemetry_dir: Local directory to store downloaded files
441
+
442
+ Returns:
443
+ True if download was successful, False otherwise
444
+ """
445
+ try:
446
+ from mosaic.telemetry.storage import TelemetryStorage
447
+ except ImportError:
448
+ # Fallback for standalone usage without mosaic installed
449
+ try:
450
+ from huggingface_hub import HfApi, hf_hub_download
451
+ except ImportError:
452
+ print(
453
+ "huggingface_hub not installed. Install with: pip install huggingface-hub",
454
+ file=sys.stderr,
455
+ )
456
+ return False
457
+
458
+ api = HfApi()
459
+ daily_dir = telemetry_dir / "daily"
460
+ daily_dir.mkdir(parents=True, exist_ok=True)
461
+
462
+ try:
463
+ files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
464
+ except Exception as e:
465
+ print(f"Failed to list files in {repo_id}: {e}", file=sys.stderr)
466
+ return False
467
+
468
+ jsonl_files = [
469
+ f for f in files if f.startswith("daily/") and f.endswith(".jsonl")
470
+ ]
471
+ if not jsonl_files:
472
+ print(f"No telemetry files found in {repo_id}", file=sys.stderr)
473
+ return False
474
+
475
+ downloaded = 0
476
+ for remote_path in jsonl_files:
477
+ try:
478
+ local_path = hf_hub_download(
479
+ repo_id=repo_id,
480
+ filename=remote_path,
481
+ repo_type="dataset",
482
+ )
483
+ filename = os.path.basename(remote_path)
484
+ target_path = daily_dir / filename
485
+
486
+ with open(local_path, "r", encoding="utf-8") as f:
487
+ remote_content = f.read()
488
+
489
+ if target_path.exists():
490
+ with open(target_path, "r", encoding="utf-8") as f:
491
+ local_content = f.read()
492
+ local_lines = (
493
+ set(local_content.strip().split("\n"))
494
+ if local_content.strip()
495
+ else set()
496
+ )
497
+ remote_lines = (
498
+ remote_content.strip().split("\n")
499
+ if remote_content.strip()
500
+ else []
501
+ )
502
+ new_lines = [
503
+ line
504
+ for line in remote_lines
505
+ if line and line not in local_lines
506
+ ]
507
+ if new_lines:
508
+ with open(target_path, "a", encoding="utf-8") as f:
509
+ for line in new_lines:
510
+ f.write(line + "\n")
511
+ print(f"Merged {len(new_lines)} new events into {filename}")
512
+ else:
513
+ with open(target_path, "w", encoding="utf-8") as f:
514
+ f.write(remote_content)
515
+ print(f"Downloaded: {filename}")
516
+ downloaded += 1
517
+ except Exception as e:
518
+ print(f"Failed to download {remote_path}: {e}", file=sys.stderr)
519
+
520
+ return downloaded > 0
521
+
522
+ # Use TelemetryStorage if mosaic is available
523
+ storage = TelemetryStorage(telemetry_dir)
524
+ return storage.download_from_hf_dataset(repo_id)
525
+
526
+
527
  def main():
528
  parser = argparse.ArgumentParser(
529
  description="Generate Mosaic telemetry reports",
 
558
  default="text",
559
  help="Output format (default: text)",
560
  )
561
+ parser.add_argument(
562
+ "--hf-repo",
563
+ type=str,
564
+ help="HuggingFace Dataset repository to pull telemetry from (e.g., PDM-Group/mosaic-telemetry)",
565
+ )
566
  args = parser.parse_args()
567
 
568
+ # If HF repo specified, download to a clean temp directory
569
+ if args.hf_repo:
570
+ import tempfile
571
+
572
+ # Use a clean temp directory to avoid mixing with local data
573
+ temp_dir = Path(tempfile.mkdtemp(prefix="mosaic_telemetry_"))
574
+ print(f"Downloading telemetry from {args.hf_repo}...")
575
+ if not download_from_hf(args.hf_repo, temp_dir):
576
+ print(
577
+ "Warning: Failed to download some or all telemetry data",
578
+ file=sys.stderr,
579
+ )
580
+ # Use the temp directory for report generation
581
+ args.telemetry_dir = temp_dir
582
+
583
  if not args.telemetry_dir.exists():
584
  print(f"Telemetry directory not found: {args.telemetry_dir}", file=sys.stderr)
585
  sys.exit(1)
src/mosaic/telemetry/storage.py CHANGED
@@ -154,3 +154,103 @@ class TelemetryStorage:
154
  if not self.daily_dir.exists():
155
  return []
156
  return list(self.daily_dir.glob("*.jsonl"))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
  if not self.daily_dir.exists():
155
  return []
156
  return list(self.daily_dir.glob("*.jsonl"))
157
+
158
+ def download_from_hf_dataset(self, repo_id: str) -> bool:
159
+ """Download telemetry files from HF Dataset repository.
160
+
161
+ Downloads all JSONL files from the daily/ folder in the repository
162
+ to the local daily directory, merging with any existing local files.
163
+
164
+ Args:
165
+ repo_id: HuggingFace Dataset repository ID (e.g., "PDM-Group/mosaic-telemetry")
166
+
167
+ Returns:
168
+ True if at least one file was downloaded, False otherwise
169
+ """
170
+ try:
171
+ from huggingface_hub import HfApi, hf_hub_download
172
+ except ImportError:
173
+ logger.warning("huggingface_hub not installed, skipping download")
174
+ return False
175
+
176
+ api = HfApi()
177
+ downloaded = 0
178
+
179
+ try:
180
+ # List files in the repository
181
+ files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
182
+ except Exception as e:
183
+ logger.error(f"Failed to list files in {repo_id}: {e}")
184
+ return False
185
+
186
+ # Filter for daily/*.jsonl files
187
+ jsonl_files = [
188
+ f for f in files if f.startswith("daily/") and f.endswith(".jsonl")
189
+ ]
190
+
191
+ if not jsonl_files:
192
+ logger.info(f"No telemetry files found in {repo_id}")
193
+ return False
194
+
195
+ self._ensure_directories()
196
+
197
+ for remote_path in jsonl_files:
198
+ try:
199
+ # Download file to a temp location
200
+ local_path = hf_hub_download(
201
+ repo_id=repo_id,
202
+ filename=remote_path,
203
+ repo_type="dataset",
204
+ )
205
+
206
+ # Get the filename (e.g., "session_2026-01-21.jsonl")
207
+ filename = os.path.basename(remote_path)
208
+ target_path = self.daily_dir / filename
209
+
210
+ # Read downloaded content
211
+ with open(local_path, "r", encoding="utf-8") as f:
212
+ remote_content = f.read()
213
+
214
+ # If local file exists, merge (append new lines only)
215
+ if target_path.exists():
216
+ with open(target_path, "r", encoding="utf-8") as f:
217
+ local_content = f.read()
218
+
219
+ # Parse existing lines to avoid duplicates
220
+ local_lines = (
221
+ set(local_content.strip().split("\n"))
222
+ if local_content.strip()
223
+ else set()
224
+ )
225
+ remote_lines = (
226
+ remote_content.strip().split("\n")
227
+ if remote_content.strip()
228
+ else []
229
+ )
230
+
231
+ # Append only new lines
232
+ new_lines = [
233
+ line
234
+ for line in remote_lines
235
+ if line and line not in local_lines
236
+ ]
237
+ if new_lines:
238
+ with self._lock:
239
+ with open(target_path, "a", encoding="utf-8") as f:
240
+ for line in new_lines:
241
+ f.write(line + "\n")
242
+ logger.info(
243
+ f"Merged {len(new_lines)} new events into {filename}"
244
+ )
245
+ else:
246
+ # No local file, just copy
247
+ with self._lock:
248
+ with open(target_path, "w", encoding="utf-8") as f:
249
+ f.write(remote_content)
250
+ logger.info(f"Downloaded telemetry file: {filename}")
251
+
252
+ downloaded += 1
253
+ except Exception as e:
254
+ logger.warning(f"Failed to download {remote_path}: {e}")
255
+
256
+ return downloaded > 0
tests/telemetry/test_storage.py CHANGED
@@ -164,3 +164,191 @@ class TestThreadSafety:
164
  # Each line should be valid JSON
165
  for line in lines:
166
  json.loads(line)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
164
  # Each line should be valid JSON
165
  for line in lines:
166
  json.loads(line)
167
+
168
+
169
+ class TestHuggingFaceDownload:
170
+ """Tests for downloading from HuggingFace Dataset repositories."""
171
+
172
+ def test_download_returns_false_when_hf_not_installed(self, storage, monkeypatch):
173
+ """Test that download returns False when huggingface_hub is not available."""
174
+ import builtins
175
+
176
+ original_import = builtins.__import__
177
+
178
+ def mock_import(name, *args, **kwargs):
179
+ if name == "huggingface_hub":
180
+ raise ImportError("No module named 'huggingface_hub'")
181
+ return original_import(name, *args, **kwargs)
182
+
183
+ monkeypatch.setattr(builtins, "__import__", mock_import)
184
+
185
+ result = storage.download_from_hf_dataset("test-org/test-repo")
186
+ assert result is False
187
+
188
+ def test_download_returns_false_when_repo_list_fails(self, storage, monkeypatch):
189
+ """Test that download returns False when listing repo files fails."""
190
+ mock_api = type(
191
+ "MockHfApi",
192
+ (),
193
+ {
194
+ "list_repo_files": lambda self, repo_id, repo_type: (
195
+ _ for _ in ()
196
+ ).throw(Exception("API Error"))
197
+ },
198
+ )()
199
+
200
+ def mock_hfapi():
201
+ return mock_api
202
+
203
+ monkeypatch.setattr("huggingface_hub.HfApi", mock_hfapi)
204
+
205
+ result = storage.download_from_hf_dataset("test-org/test-repo")
206
+ assert result is False
207
+
208
+ def test_download_returns_false_when_no_files(self, storage, monkeypatch):
209
+ """Test that download returns False when no JSONL files exist."""
210
+ mock_api = type(
211
+ "MockHfApi",
212
+ (),
213
+ {
214
+ "list_repo_files": lambda self, repo_id, repo_type: [
215
+ "README.md",
216
+ "other_file.txt",
217
+ ]
218
+ },
219
+ )()
220
+
221
+ def mock_hfapi():
222
+ return mock_api
223
+
224
+ monkeypatch.setattr("huggingface_hub.HfApi", mock_hfapi)
225
+
226
+ result = storage.download_from_hf_dataset("test-org/test-repo")
227
+ assert result is False
228
+
229
+ def test_download_creates_new_file(self, storage, temp_dir, monkeypatch):
230
+ """Test that download creates new local file when it doesn't exist."""
231
+ # Create a temp file to simulate the downloaded content
232
+ downloaded_content = '{"event_id": "remote-1", "event_type": "test"}\n{"event_id": "remote-2", "event_type": "test"}\n'
233
+ downloaded_file = temp_dir / "downloaded_test.jsonl"
234
+ downloaded_file.write_text(downloaded_content)
235
+
236
+ mock_api = type(
237
+ "MockHfApi",
238
+ (),
239
+ {
240
+ "list_repo_files": lambda self, repo_id, repo_type: [
241
+ "daily/test_2026-01-20.jsonl"
242
+ ]
243
+ },
244
+ )()
245
+
246
+ def mock_hfapi():
247
+ return mock_api
248
+
249
+ def mock_download(repo_id, filename, repo_type):
250
+ return str(downloaded_file)
251
+
252
+ monkeypatch.setattr("huggingface_hub.HfApi", mock_hfapi)
253
+ monkeypatch.setattr("huggingface_hub.hf_hub_download", mock_download)
254
+
255
+ result = storage.download_from_hf_dataset("test-org/test-repo")
256
+
257
+ assert result is True
258
+ target_file = temp_dir / "daily" / "test_2026-01-20.jsonl"
259
+ assert target_file.exists()
260
+
261
+ with open(target_file) as f:
262
+ content = f.read()
263
+ assert "remote-1" in content
264
+ assert "remote-2" in content
265
+
266
+ def test_download_merges_with_existing_file(self, storage, temp_dir, monkeypatch):
267
+ """Test that download merges new content with existing local file."""
268
+ # Create existing local file
269
+ existing_content = '{"event_id": "local-1", "event_type": "test"}\n'
270
+ daily_dir = temp_dir / "daily"
271
+ daily_dir.mkdir(parents=True, exist_ok=True)
272
+ local_file = daily_dir / "test_2026-01-20.jsonl"
273
+ local_file.write_text(existing_content)
274
+
275
+ # Create remote content with one duplicate and one new
276
+ remote_content = '{"event_id": "local-1", "event_type": "test"}\n{"event_id": "remote-1", "event_type": "test"}\n'
277
+ downloaded_file = temp_dir / "downloaded_test.jsonl"
278
+ downloaded_file.write_text(remote_content)
279
+
280
+ mock_api = type(
281
+ "MockHfApi",
282
+ (),
283
+ {
284
+ "list_repo_files": lambda self, repo_id, repo_type: [
285
+ "daily/test_2026-01-20.jsonl"
286
+ ]
287
+ },
288
+ )()
289
+
290
+ def mock_hfapi():
291
+ return mock_api
292
+
293
+ def mock_download(repo_id, filename, repo_type):
294
+ return str(downloaded_file)
295
+
296
+ monkeypatch.setattr("huggingface_hub.HfApi", mock_hfapi)
297
+ monkeypatch.setattr("huggingface_hub.hf_hub_download", mock_download)
298
+
299
+ result = storage.download_from_hf_dataset("test-org/test-repo")
300
+
301
+ assert result is True
302
+
303
+ with open(local_file) as f:
304
+ lines = f.readlines()
305
+
306
+ # Should have 2 lines: original local-1 and new remote-1 (no duplicate)
307
+ assert len(lines) == 2
308
+ event_ids = [json.loads(line)["event_id"] for line in lines]
309
+ assert "local-1" in event_ids
310
+ assert "remote-1" in event_ids
311
+
312
+ def test_download_handles_multiple_files(self, storage, temp_dir, monkeypatch):
313
+ """Test that download handles multiple remote files."""
314
+ # Create remote content files
315
+ usage_content = '{"event_id": "usage-1"}\n'
316
+ failure_content = '{"event_id": "failure-1"}\n'
317
+
318
+ usage_file = temp_dir / "usage_download.jsonl"
319
+ failure_file = temp_dir / "failure_download.jsonl"
320
+ usage_file.write_text(usage_content)
321
+ failure_file.write_text(failure_content)
322
+
323
+ mock_api = type(
324
+ "MockHfApi",
325
+ (),
326
+ {
327
+ "list_repo_files": lambda self, repo_id, repo_type: [
328
+ "daily/usage_2026-01-20.jsonl",
329
+ "daily/failure_2026-01-20.jsonl",
330
+ ]
331
+ },
332
+ )()
333
+
334
+ def mock_hfapi():
335
+ return mock_api
336
+
337
+ def mock_download(repo_id, filename, repo_type):
338
+ if "usage" in filename:
339
+ return str(usage_file)
340
+ return str(failure_file)
341
+
342
+ monkeypatch.setattr("huggingface_hub.HfApi", mock_hfapi)
343
+ monkeypatch.setattr("huggingface_hub.hf_hub_download", mock_download)
344
+
345
+ result = storage.download_from_hf_dataset("test-org/test-repo")
346
+
347
+ assert result is True
348
+
349
+ daily_dir = temp_dir / "daily"
350
+ usage_target = daily_dir / "usage_2026-01-20.jsonl"
351
+ failure_target = daily_dir / "failure_2026-01-20.jsonl"
352
+
353
+ assert usage_target.exists()
354
+ assert failure_target.exists()