Spaces:
Sleeping
Sleeping
import atexit
import json
import os
from datetime import datetime, timezone
from threading import Lock, Timer

import gradio as gr
from huggingface_hub import HfApi
# Target Hub dataset and the token used to write to it (None when the
# HF_TOKEN environment variable is unset).
DATASET_REPO = "bobber/routangseng-telemetry"
HF_TOKEN = os.getenv("HF_TOKEN")
api = HfApi(token=HF_TOKEN)

# Flush policy: the periodic timer fires every FLUSH_INTERVAL seconds, and
# collect() flushes early once FLUSH_SIZE entries are waiting.
FLUSH_INTERVAL = 60  # seconds
FLUSH_SIZE = 10

# In-memory queue of telemetry entries awaiting upload; always access it
# while holding BUFFER_LOCK.
BUFFER: list[dict] = []
BUFFER_LOCK = Lock()
# Path of the JSONL telemetry file inside the dataset repo.
_TELEMETRY_PATH = "data/telemetry.jsonl"

# Serializes whole flushes. BUFFER_LOCK only guards the in-memory list; without
# this lock two concurrent flushes (timer thread, a collect() request, the
# Flush button) could download the same file version, and the later upload
# would overwrite the entries committed by the earlier one.
_FLUSH_LOCK = Lock()


def _read_existing_lines() -> str:
    """Return the remote JSONL content with blank lines and "[]" placeholders removed.

    Returns "" only when the file does not exist yet. Any other failure
    (network, auth, ...) propagates: treating it as "empty" would make the
    subsequent upload overwrite the whole remote file with just the new lines.
    """
    if not api.file_exists(DATASET_REPO, _TELEMETRY_PATH, repo_type="dataset", token=HF_TOKEN):
        return ""
    path = api.hf_hub_download(DATASET_REPO, _TELEMETRY_PATH, repo_type="dataset", token=HF_TOKEN)
    # Lines are written as UTF-8 with ensure_ascii=False, so read them back as
    # UTF-8 instead of the platform default encoding.
    with open(path, encoding="utf-8") as f:
        existing = f.read().strip()
    # Remove any invalid lines (e.g. "[]" from initial creation).
    return "\n".join(
        line for line in existing.split("\n")
        if line.strip() and line.strip() != "[]"
    )


def flush_buffer():
    """Append every buffered entry to the dataset's JSONL file in one commit.

    Downloads the current file (if any), appends the buffered entries as one
    JSON object per line, and uploads the combined content.

    Returns:
        The number of entries uploaded; 0 when the buffer was empty.

    Raises:
        Whatever the Hub client raises on a failed check, download or upload.
        Before re-raising, the entries are put back at the front of BUFFER so
        a later flush retries them instead of dropping them.
    """
    with _FLUSH_LOCK:
        with BUFFER_LOCK:
            if not BUFFER:
                return 0
            entries = BUFFER.copy()
            BUFFER.clear()
        try:
            existing = _read_existing_lines()
            new_lines = "\n".join(json.dumps(e, ensure_ascii=False) for e in entries)
            content = (existing + "\n" + new_lines).strip() + "\n"
            api.upload_file(
                path_or_fileobj=content.encode("utf-8"),
                path_in_repo=_TELEMETRY_PATH,
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"Add {len(entries)} telemetry entries",
            )
        except BaseException:
            # Re-queue ahead of anything collected meanwhile to keep arrival order.
            with BUFFER_LOCK:
                BUFFER[:0] = entries
            raise
    return len(entries)
def _flush_and_log(context: str = "") -> None:
    """Run flush_buffer once and print the outcome; never raises.

    ``context`` is appended to the log prefix (e.g. " at exit") so the
    final shutdown flush is distinguishable from periodic ones.
    """
    try:
        n = flush_buffer()
        if n > 0:
            print(f"[telemetry] Flushed {n} entries{context}")
    except Exception as e:
        print(f"[telemetry] Flush error{context}: {e}")


def _schedule_periodic_flush() -> None:
    """Arm a one-shot timer that calls periodic_flush after FLUSH_INTERVAL seconds."""
    timer = Timer(FLUSH_INTERVAL, periodic_flush)
    # Daemon: a non-daemon timer that keeps re-arming itself is joined forever
    # at interpreter shutdown, so the process would never exit. The atexit
    # hook below performs the last flush instead.
    timer.daemon = True
    timer.start()


def periodic_flush():
    """Flush the buffer, log the result, and re-arm the timer.

    Errors are logged and swallowed so one failed flush never stops the
    periodic schedule.
    """
    _flush_and_log()
    _schedule_periodic_flush()


# Final best-effort flush on normal interpreter shutdown (daemon timers are
# not waited for).
atexit.register(_flush_and_log, " at exit")
_schedule_periodic_flush()
| def collect(data: str) -> str: | |
| try: | |
| entry = json.loads(data) | |
| if "question" not in entry or "answer" not in entry: | |
| return json.dumps({"status": "error", "message": "missing fields"}) | |
| entry["received_at"] = datetime.utcnow().isoformat() + "Z" | |
| with BUFFER_LOCK: | |
| BUFFER.append(entry) | |
| buf_size = len(BUFFER) | |
| if buf_size >= FLUSH_SIZE: | |
| try: | |
| n = flush_buffer() | |
| return json.dumps({"status": "ok", "flushed": n}) | |
| except Exception as e: | |
| return json.dumps({"status": "ok", "buffered": buf_size, "error": str(e)}) | |
| return json.dumps({"status": "ok", "buffered": buf_size}) | |
| except Exception as e: | |
| return json.dumps({"status": "error", "message": str(e)}) | |
def get_status() -> str:
    """Summarize telemetry state for the Status button.

    Returns:
        ``"Buffer: <n> | Token: <mark> | Dataset: <repo>"`` where ``<n>`` is
        the number of buffered entries and ``<mark>`` is ✅ when HF_TOKEN is
        set, ❌ otherwise.
    """
    with BUFFER_LOCK:
        buf = len(BUFFER)
    token_mark = "✅" if HF_TOKEN else "❌"
    return f"Buffer: {buf} | Token: {token_mark} | Dataset: {DATASET_REPO}"
def do_flush() -> str:
    """Flush the buffer on demand (Flush button) and describe the outcome.

    Returns "Flushed <n> entries" on success, or "Error: <exception>" if the
    flush raised.
    """
    try:
        flushed = flush_buffer()
    except Exception as err:
        return f"Error: {err}"
    return f"Flushed {flushed} entries"
# Gradio UI: manual Status/Flush controls plus the public `collect` API endpoint.
# NOTE(review): the title/heading strings look mis-encoded in this copy of the
# file (UTF-8 bytes shown through a legacy codepage) — confirm the intended
# characters against the original source before shipping.
with gr.Blocks(title="θη³η Telemetry") as demo:
    gr.Markdown("# π θη³η Telemetry")
    with gr.Row():
        status_button = gr.Button("Status")
        status_button.click(fn=get_status, outputs=gr.Textbox(label="Status"))
        flush_button = gr.Button("Flush")
        flush_button.click(fn=do_flush, outputs=gr.Textbox(label="Result"))
    # Public API endpoint
    json_input = gr.Textbox(label="JSON data")
    json_output = gr.Textbox(label="Response")
    json_input.submit(fn=collect, inputs=json_input, outputs=json_output, api_name="collect")

demo.launch()