"""Gradio Space that buffers telemetry entries and appends them to a HF dataset."""

import atexit
import json
import os
from datetime import datetime, timezone
from threading import Lock, Timer

import gradio as gr
from huggingface_hub import HfApi
from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

DATASET_REPO = "bobber/routangseng-telemetry"
DATA_FILE = "data/telemetry.jsonl"
HF_TOKEN = os.environ.get("HF_TOKEN")

# Entries received but not yet written to the dataset repo.
BUFFER = []
# Guards every read and rebinding of BUFFER.
BUFFER_LOCK = Lock()
# Serialises the download -> append -> upload cycle. Flushes can be started
# concurrently by the background timer, a full buffer in collect(), and the
# "Flush" button; without this lock two of them could each upload a copy of
# the file that lacks the other's entries. Always acquired before BUFFER_LOCK.
FLUSH_LOCK = Lock()
FLUSH_INTERVAL = 60  # seconds between background flushes
FLUSH_SIZE = 10  # number of buffered entries that triggers an immediate flush

api = HfApi(token=HF_TOKEN)


def _read_existing_lines():
    """Return the JSONL lines currently stored in the dataset file.

    A file that does not exist yet yields an empty list. Any other failure
    (network, auth, ...) is re-raised: treating it as "no data" would make
    the following upload overwrite the remote history with only the new
    entries.
    """
    try:
        path = api.hf_hub_download(
            DATASET_REPO, DATA_FILE, repo_type="dataset", token=HF_TOKEN
        )
    except LocalEntryNotFoundError:
        # This subclass of EntryNotFoundError signals "Hub unreachable and
        # nothing cached", which does not prove the file is missing.
        raise
    except EntryNotFoundError:
        return []
    with open(path, encoding="utf-8") as f:
        text = f.read().strip()
    # Skip blank lines and the "[]" placeholder left by the initial creation.
    return [
        line for line in text.split("\n") if line.strip() and line.strip() != "[]"
    ]


def flush_buffer():
    """Append every buffered entry to the dataset's JSONL file.

    Returns:
        The number of entries written; 0 when the buffer was empty.

    Raises:
        Whatever the download or upload raises. In that case the entries are
        put back at the front of the buffer so a later flush retries them
        instead of dropping them.
    """
    global BUFFER
    with FLUSH_LOCK:
        with BUFFER_LOCK:
            if not BUFFER:
                return 0
            entries, BUFFER = BUFFER, []
        try:
            lines = _read_existing_lines()
            lines.extend(json.dumps(e, ensure_ascii=False) for e in entries)
            content = "\n".join(lines).strip() + "\n"
            api.upload_file(
                path_or_fileobj=content.encode("utf-8"),
                path_in_repo=DATA_FILE,
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"Add {len(entries)} telemetry entries",
            )
        except Exception:
            with BUFFER_LOCK:
                # Keep arrival order: the failed batch precedes anything
                # collected while the upload was in flight.
                BUFFER = entries + BUFFER
            raise
    return len(entries)


def _schedule_flush():
    """Run periodic_flush once, FLUSH_INTERVAL seconds from now, on a daemon timer."""
    timer = Timer(FLUSH_INTERVAL, periodic_flush)
    # Daemon so a pending timer does not keep the process alive after the UI
    # exits; _final_flush handles whatever is still buffered at that point.
    timer.daemon = True
    timer.start()


def periodic_flush():
    """Flush the buffer, log the outcome, and schedule the next run."""
    try:
        n = flush_buffer()
        if n > 0:
            print(f"[telemetry] Flushed {n} entries")
    except Exception as e:
        print(f"[telemetry] Flush error: {e}")
    _schedule_flush()


def _final_flush():
    """Best-effort flush of entries still buffered at interpreter exit."""
    try:
        flush_buffer()
    except Exception as e:
        print(f"[telemetry] Final flush error: {e}")


_schedule_flush()
atexit.register(_final_flush)


def collect(data: str) -> str:
    """Validate one JSON telemetry entry and add it to the buffer.

    Args:
        data: A JSON object with at least "question" and "answer" keys.

    Returns:
        A JSON status string. On success it reports the buffer size
        ("buffered"), or the number of entries written ("flushed") when this
        entry filled the buffer and triggered a flush; if that flush fails the
        entries stay buffered and "error" carries the message. Invalid input
        yields {"status": "error", "message": ...}.
    """
    try:
        entry = json.loads(data)
        if not isinstance(entry, dict):
            return json.dumps({"status": "error", "message": "expected a JSON object"})
        if "question" not in entry or "answer" not in entry:
            return json.dumps({"status": "error", "message": "missing fields"})
        # Same "YYYY-MM-DDTHH:MM:SS.ffffffZ" shape the deprecated
        # datetime.utcnow().isoformat() + "Z" produced.
        entry["received_at"] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        )
        with BUFFER_LOCK:
            BUFFER.append(entry)
            buf_size = len(BUFFER)
        if buf_size >= FLUSH_SIZE:
            try:
                n = flush_buffer()
                return json.dumps({"status": "ok", "flushed": n})
            except Exception as e:
                return json.dumps(
                    {"status": "ok", "buffered": buf_size, "error": str(e)}
                )
        return json.dumps({"status": "ok", "buffered": buf_size})
    except Exception as e:
        return json.dumps({"status": "error", "message": str(e)})


def get_status() -> str:
    """Return a one-line summary: buffer size, token presence, dataset repo."""
    with BUFFER_LOCK:
        buf = len(BUFFER)
    return f"Buffer: {buf} | Token: {'✅' if HF_TOKEN else '❌'} | Dataset: {DATASET_REPO}"


def do_flush() -> str:
    """Flush on demand from the UI and report the result as text."""
    try:
        n = flush_buffer()
        return f"Flushed {n} entries"
    except Exception as e:
        return f"Error: {e}"


with gr.Blocks(title="肉糖生 Telemetry") as demo:
    gr.Markdown("# 📊 肉糖生 Telemetry")
    with gr.Row():
        gr.Button("Status").click(fn=get_status, outputs=gr.Textbox(label="Status"))
        gr.Button("Flush").click(fn=do_flush, outputs=gr.Textbox(label="Result"))

    # Public API endpoint
    inp = gr.Textbox(label="JSON data")
    out = gr.Textbox(label="Response")
    inp.submit(fn=collect, inputs=inp, outputs=out, api_name="collect")

demo.launch()