bobber commited on
Commit
f4e2b6e
Β·
verified Β·
1 Parent(s): 54e0771

Telemetry collector with Gradio API endpoint + periodic flush

Browse files
Files changed (1) hide show
  1. app.py +51 -27
app.py CHANGED
@@ -1,8 +1,7 @@
1
  import json
2
  import os
3
  from datetime import datetime
4
- from pathlib import Path
5
- from threading import Lock
6
 
7
  import gradio as gr
8
  from huggingface_hub import HfApi
@@ -11,7 +10,8 @@ DATASET_REPO = "bobber/routangseng-telemetry"
11
  HF_TOKEN = os.environ.get("HF_TOKEN")
12
  BUFFER = []
13
  BUFFER_LOCK = Lock()
14
- FLUSH_SIZE = 5 # flush every 5 entries
 
15
 
16
  api = HfApi(token=HF_TOKEN)
17
 
@@ -25,9 +25,8 @@ def flush_buffer():
25
  entries = BUFFER.copy()
26
  BUFFER = []
27
 
28
- # Download existing data, append, re-upload
29
  try:
30
- path = api.hf_hub_download(DATASET_REPO, "data/telemetry.jsonl", repo_type="dataset")
31
  existing = open(path).read().strip()
32
  except Exception:
33
  existing = ""
@@ -40,58 +39,83 @@ def flush_buffer():
40
  path_in_repo="data/telemetry.jsonl",
41
  repo_id=DATASET_REPO,
42
  repo_type="dataset",
 
43
  commit_message=f"Add {len(entries)} telemetry entries",
44
  )
45
  return len(entries)
46
 
47
 
48
- def collect(data: str):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  """Receive telemetry JSON from the WebGPU Space."""
50
  try:
51
  entry = json.loads(data)
52
- # Validate required fields
53
  if "question" not in entry or "answer" not in entry:
54
  return {"status": "error", "message": "missing question or answer"}
55
-
56
  entry["received_at"] = datetime.utcnow().isoformat() + "Z"
57
-
58
  with BUFFER_LOCK:
59
  BUFFER.append(entry)
60
  buf_size = len(BUFFER)
61
 
62
  if buf_size >= FLUSH_SIZE:
63
- n = flush_buffer()
64
- return {"status": "ok", "flushed": n}
65
-
 
 
 
66
  return {"status": "ok", "buffered": buf_size}
67
  except json.JSONDecodeError:
68
  return {"status": "error", "message": "invalid JSON"}
69
 
70
 
71
- def manual_flush():
72
- """Manually flush buffer to dataset."""
73
- n = flush_buffer()
74
- return f"Flushed {n} entries"
75
 
76
 
77
- def status():
78
- """Check buffer status."""
79
- return f"Buffer: {len(BUFFER)} entries"
 
 
 
80
 
81
 
82
- # Gradio interface (minimal UI + API)
83
  with gr.Blocks(title="θ‚‰η³–η”Ÿ Telemetry") as demo:
84
- gr.Markdown("# θ‚‰η³–η”Ÿ Telemetry Collector\nReceives Q&A data from the WebGPU Space.")
85
-
86
  with gr.Row():
87
- status_btn = gr.Button("Check Status")
88
- flush_btn = gr.Button("Flush to Dataset")
89
- output = gr.Textbox(label="Status")
90
-
91
  status_btn.click(fn=status, outputs=output)
92
  flush_btn.click(fn=manual_flush, outputs=output)
93
 
94
- # API endpoint: POST /api/collect
 
 
 
 
 
95
  demo.queue()
96
 
97
  if __name__ == "__main__":
 
1
  import json
2
  import os
3
  from datetime import datetime
4
+ from threading import Lock, Timer
 
5
 
6
  import gradio as gr
7
  from huggingface_hub import HfApi
 
10
  HF_TOKEN = os.environ.get("HF_TOKEN")
11
  BUFFER = []
12
  BUFFER_LOCK = Lock()
13
+ FLUSH_INTERVAL = 60 # seconds
14
+ FLUSH_SIZE = 10 # or flush when buffer hits this size
15
 
16
  api = HfApi(token=HF_TOKEN)
17
 
 
25
  entries = BUFFER.copy()
26
  BUFFER = []
27
 
 
28
  try:
29
+ path = api.hf_hub_download(DATASET_REPO, "data/telemetry.jsonl", repo_type="dataset", token=HF_TOKEN)
30
  existing = open(path).read().strip()
31
  except Exception:
32
  existing = ""
 
39
  path_in_repo="data/telemetry.jsonl",
40
  repo_id=DATASET_REPO,
41
  repo_type="dataset",
42
+ token=HF_TOKEN,
43
  commit_message=f"Add {len(entries)} telemetry entries",
44
  )
45
  return len(entries)
46
 
47
 
48
+ def periodic_flush():
49
+ """Flush buffer periodically."""
50
+ try:
51
+ n = flush_buffer()
52
+ if n > 0:
53
+ print(f"[telemetry] Flushed {n} entries to dataset")
54
+ except Exception as e:
55
+ print(f"[telemetry] Flush error: {e}")
56
+ Timer(FLUSH_INTERVAL, periodic_flush).start()
57
+
58
+
59
+ # Start periodic flush
60
+ Timer(FLUSH_INTERVAL, periodic_flush).start()
61
+
62
+
63
+ def collect(data: str) -> dict:
64
  """Receive telemetry JSON from the WebGPU Space."""
65
  try:
66
  entry = json.loads(data)
 
67
  if "question" not in entry or "answer" not in entry:
68
  return {"status": "error", "message": "missing question or answer"}
69
+
70
  entry["received_at"] = datetime.utcnow().isoformat() + "Z"
71
+
72
  with BUFFER_LOCK:
73
  BUFFER.append(entry)
74
  buf_size = len(BUFFER)
75
 
76
  if buf_size >= FLUSH_SIZE:
77
+ try:
78
+ n = flush_buffer()
79
+ return {"status": "ok", "flushed": n}
80
+ except Exception as e:
81
+ return {"status": "ok", "buffered": buf_size, "flush_error": str(e)}
82
+
83
  return {"status": "ok", "buffered": buf_size}
84
  except json.JSONDecodeError:
85
  return {"status": "error", "message": "invalid JSON"}
86
 
87
 
88
+ def status():
89
+ with BUFFER_LOCK:
90
+ buf = len(BUFFER)
91
+ return f"Buffer: {buf} entries | Dataset: {DATASET_REPO} | Token: {'set' if HF_TOKEN else 'MISSING'}"
92
 
93
 
94
+ def manual_flush():
95
+ try:
96
+ n = flush_buffer()
97
+ return f"Flushed {n} entries to {DATASET_REPO}"
98
+ except Exception as e:
99
+ return f"Flush error: {e}"
100
 
101
 
 
102
  with gr.Blocks(title="θ‚‰η³–η”Ÿ Telemetry") as demo:
103
+ gr.Markdown("# πŸ“Š θ‚‰η³–η”Ÿ Telemetry\nCollects Q&A from WebGPU Space β†’ HuggingFace Dataset")
104
+
105
  with gr.Row():
106
+ status_btn = gr.Button("Status")
107
+ flush_btn = gr.Button("Flush Now")
108
+ output = gr.Textbox(label="Result")
109
+
110
  status_btn.click(fn=status, outputs=output)
111
  flush_btn.click(fn=manual_flush, outputs=output)
112
 
113
+ # API endpoint for the WebGPU Space
114
+ collect_input = gr.Textbox(visible=False)
115
+ collect_output = gr.JSON(visible=False)
116
+ collect_btn = gr.Button(visible=False)
117
+ collect_btn.click(fn=collect, inputs=collect_input, outputs=collect_output, api_name="collect")
118
+
119
  demo.queue()
120
 
121
  if __name__ == "__main__":