File size: 3,266 Bytes
import json
import os
from datetime import datetime, timezone
from threading import Lock, Timer

import gradio as gr
from huggingface_hub import HfApi
# Hub dataset repository that receives the telemetry file.
DATASET_REPO = "bobber/routangseng-telemetry"

# Access token taken from the environment; None when the variable is unset.
HF_TOKEN = os.getenv("HF_TOKEN")

# Batching thresholds.
FLUSH_SIZE = 10  # buffer length at which collect() triggers a flush
FLUSH_INTERVAL = 60  # delay handed to threading.Timer, i.e. seconds

# Entries waiting to be uploaded, and the lock that guards them.
BUFFER_LOCK = Lock()
BUFFER: list = []

# Shared Hub client used for both download and upload.
api = HfApi(token=HF_TOKEN)
# Serializes whole flush cycles. Download -> merge -> upload is a
# read-modify-write of a single remote file, so two overlapping flushes (the
# timer thread and a request-triggered flush) would overwrite each other.
_FLUSH_LOCK = Lock()


def flush_buffer():
    """Append every buffered entry to ``data/telemetry.jsonl`` in the dataset repo.

    The buffer is drained under ``BUFFER_LOCK``, the current remote file is
    downloaded, the new entries are appended as JSON lines, and the merged
    content is uploaded as a single commit.

    Returns:
        The number of entries uploaded, or 0 when the buffer was empty.

    Raises:
        Exception: Whatever the Hub download or upload raised. Before the
            error propagates, the drained entries are put back at the front
            of ``BUFFER`` so a later flush can retry them.
    """
    from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

    with _FLUSH_LOCK:
        with BUFFER_LOCK:
            if not BUFFER:
                return 0
            entries = BUFFER[:]
            del BUFFER[:]

        try:
            try:
                path = api.hf_hub_download(
                    DATASET_REPO,
                    "data/telemetry.jsonl",
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            except LocalEntryNotFoundError:
                # Connectivity problem, not a missing file. Treating it as
                # "empty" would make the upload below wipe the history.
                raise
            except EntryNotFoundError:
                # The file has never been created: start from nothing.
                existing = ""
            else:
                # Entries are written with ensure_ascii=False, so the file
                # must be decoded as UTF-8 regardless of the locale.
                with open(path, encoding="utf-8") as fh:
                    existing = fh.read().strip()
                # Remove any invalid lines (e.g. "[]" from initial creation).
                # split("\n") rather than splitlines(): the latter also breaks
                # on U+2028/U+2029, which may appear unescaped inside entries.
                existing = "\n".join(
                    line
                    for line in existing.split("\n")
                    if line.strip() and line.strip() != "[]"
                )

            new_lines = "\n".join(
                json.dumps(e, ensure_ascii=False) for e in entries
            )
            content = (existing + "\n" + new_lines).strip() + "\n"
            api.upload_file(
                path_or_fileobj=content.encode("utf-8"),
                path_in_repo="data/telemetry.jsonl",
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"Add {len(entries)} telemetry entries",
            )
        except Exception:
            # Nothing was committed: restore the entries (ahead of anything
            # collected meanwhile) instead of silently losing them.
            with BUFFER_LOCK:
                BUFFER[:0] = entries
            raise

        return len(entries)
def _schedule_flush():
    """Arm a one-shot timer that runs ``periodic_flush`` after FLUSH_INTERVAL."""
    timer = Timer(FLUSH_INTERVAL, periodic_flush)
    # Timer threads are non-daemon unless told otherwise. Because the job
    # re-arms itself forever, a non-daemon timer would keep the interpreter
    # alive after the main thread has finished.
    timer.daemon = True
    timer.start()


def periodic_flush():
    """Flush the buffer once, log the outcome, then schedule the next run.

    Errors from ``flush_buffer`` are printed rather than raised so that one
    failed upload does not end the periodic schedule.
    """
    try:
        n = flush_buffer()
        if n > 0:
            print(f"[telemetry] Flushed {n} entries")
    except Exception as e:
        print(f"[telemetry] Flush error: {e}")
    _schedule_flush()


# Start the periodic flush cycle when the module is loaded.
_schedule_flush()
def collect(data: str) -> str:
    """Validate one telemetry payload, buffer it, and report the outcome as JSON.

    Args:
        data: JSON text. It must decode to an object containing at least the
            keys ``"question"`` and ``"answer"``.

    Returns:
        A JSON string. On rejection:
        ``{"status": "error", "message": ...}``. On success:
        ``{"status": "ok", "buffered": n}``, or ``{"status": "ok",
        "flushed": n}`` when this entry pushed the buffer to ``FLUSH_SIZE``
        and the flush succeeded. If that flush fails, the entry is still
        accepted and the response carries an extra ``"error"`` field.
    """
    try:
        entry = json.loads(data)
    except (TypeError, ValueError, RecursionError) as e:
        # Not a string, malformed JSON, or nesting too deep to parse.
        return json.dumps({"status": "error", "message": str(e)})

    # A JSON list, string or number passes json.loads but is not a record;
    # reject it explicitly instead of failing later on item assignment.
    if not isinstance(entry, dict):
        return json.dumps(
            {"status": "error", "message": "payload must be a JSON object"}
        )
    if "question" not in entry or "answer" not in entry:
        return json.dumps({"status": "error", "message": "missing fields"})

    # Aware clock instead of the deprecated utcnow(); tzinfo is dropped again
    # so the stored format stays "<naive ISO timestamp>Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    entry["received_at"] = now_utc.isoformat() + "Z"

    with BUFFER_LOCK:
        BUFFER.append(entry)
        buf_size = len(BUFFER)

    if buf_size < FLUSH_SIZE:
        return json.dumps({"status": "ok", "buffered": buf_size})

    # Flush outside BUFFER_LOCK: flush_buffer() takes that lock itself.
    try:
        n = flush_buffer()
    except Exception as e:
        return json.dumps({"status": "ok", "buffered": buf_size, "error": str(e)})
    return json.dumps({"status": "ok", "flushed": n})
def get_status() -> str:
    """Return a one-line summary: buffer length, token presence, target dataset."""
    with BUFFER_LOCK:
        buf = len(BUFFER)
    # The marks were mis-decoded in the file (UTF-8 bytes read as a Greek
    # single-byte charset), which also split the f-string across two lines.
    token_mark = "✅" if HF_TOKEN else "❌"
    return f"Buffer: {buf} | Token: {token_mark} | Dataset: {DATASET_REPO}"
def do_flush() -> str:
    """Run a flush on demand and describe the result as a short message.

    Returns:
        ``"Flushed <n> entries"`` when ``flush_buffer`` succeeds, otherwise
        ``"Error: <exception text>"``.
    """
    try:
        flushed = flush_buffer()
    except Exception as err:
        return f"Error: {err}"
    else:
        return f"Flushed {flushed} entries"
# Web UI: two maintenance buttons plus the text endpoint exposed as "collect".
# NOTE(review): the non-ASCII text below was mis-decoded in the file (UTF-8
# bytes read as a Greek single-byte charset). The title is reconstructed from
# the surviving bytes; the heading emoji could not be recovered, so the chart
# emoji is a stand-in. Confirm both against the deployed app.
with gr.Blocks(title="肉糖生 Telemetry") as demo:
    gr.Markdown("# 📊 肉糖生 Telemetry")
    with gr.Row():
        gr.Button("Status").click(fn=get_status, outputs=gr.Textbox(label="Status"))
        gr.Button("Flush").click(fn=do_flush, outputs=gr.Textbox(label="Result"))
    # Public API endpoint
    inp = gr.Textbox(label="JSON data")
    out = gr.Textbox(label="Response")
    inp.submit(fn=collect, inputs=inp, outputs=out, api_name="collect")

demo.launch()
|