bobber's picture
Fix: filter invalid JSONL lines during flush
6e2d1aa verified
import json
import os
from datetime import datetime
from threading import Lock, Timer
import gradio as gr
from huggingface_hub import HfApi
# --- Hub target -------------------------------------------------------------
# Dataset repository whose data/telemetry.jsonl file receives the entries.
DATASET_REPO = "bobber/routangseng-telemetry"
# Access token taken from the environment; None when HF_TOKEN is not set.
HF_TOKEN = os.getenv("HF_TOKEN")

# --- Flush tuning -----------------------------------------------------------
# Delay, in seconds, handed to the threading.Timer that drives periodic flushes.
FLUSH_INTERVAL = 60
# Buffer length at which collect() triggers an immediate flush.
FLUSH_SIZE = 10

# --- Shared state -----------------------------------------------------------
# Entries accepted but not yet uploaded. Access it only while holding
# BUFFER_LOCK: it is touched from request handlers and from the timer thread.
BUFFER = []
BUFFER_LOCK = Lock()

# Single Hub client shared by every flush.
api = HfApi(token=HF_TOKEN)
# Serializes the whole download -> merge -> upload cycle. Flushes are started
# from the timer thread and from request handlers; if two overlapped, both
# would start from the same remote file and the later upload would silently
# drop the entries written by the earlier one.
_FLUSH_LOCK = Lock()


def flush_buffer():
    """Append every buffered entry to data/telemetry.jsonl in the dataset repo.

    The buffer is drained under BUFFER_LOCK, the current remote file is
    downloaded, placeholder lines ("[]" and blank lines) are dropped, the new
    entries are appended one JSON document per line, and the merged file is
    uploaded as a single commit.

    Returns:
        The number of entries uploaded, or 0 when the buffer was empty.

    Raises:
        Exception: whatever the Hub download or upload raised. Before the
            error propagates, the drained entries are put back at the front
            of the buffer, so a failed flush does not lose data. Note that
            the buffer therefore keeps growing while the Hub stays
            unreachable.
    """
    global BUFFER
    # Function-scope import keeps this block independent of the file header.
    from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

    with _FLUSH_LOCK:
        with BUFFER_LOCK:
            if not BUFFER:
                return 0
            entries, BUFFER = BUFFER, []

        try:
            try:
                path = api.hf_hub_download(
                    DATASET_REPO,
                    "data/telemetry.jsonl",
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            except LocalEntryNotFoundError:
                # Subclass of EntryNotFoundError raised when the Hub cannot be
                # reached and nothing is cached. The remote file may well
                # exist, so treating this as "empty" would overwrite it.
                raise
            except EntryNotFoundError:
                # The file genuinely does not exist yet: start a fresh one.
                lines = []
            else:
                # The file is written as UTF-8 (ensure_ascii=False below), so
                # read it back as UTF-8 regardless of the platform default.
                with open(path, encoding="utf-8") as fh:
                    existing = fh.read().strip()
                # Remove any invalid lines (e.g. "[]" from initial creation)
                lines = [
                    line for line in existing.split("\n")
                    if line.strip() and line.strip() != "[]"
                ]

            lines.extend(json.dumps(e, ensure_ascii=False) for e in entries)
            content = "\n".join(lines).strip() + "\n"
            api.upload_file(
                path_or_fileobj=content.encode("utf-8"),
                path_in_repo="data/telemetry.jsonl",
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"Add {len(entries)} telemetry entries",
            )
        except Exception:
            # Nothing was committed: restore the drained entries ahead of
            # anything collected meanwhile, preserving arrival order.
            with BUFFER_LOCK:
                BUFFER = entries + BUFFER
            raise

        return len(entries)
def _schedule_flush():
    """Arm a one-shot timer that calls periodic_flush after FLUSH_INTERVAL seconds."""
    timer = Timer(FLUSH_INTERVAL, periodic_flush)
    # Daemon thread: the timer re-arms itself forever, so a non-daemon timer
    # would keep the interpreter alive after the app itself has stopped.
    timer.daemon = True
    timer.start()


def periodic_flush():
    """Flush the buffer once, report the outcome on stdout, then re-arm the timer.

    Any exception from flush_buffer() is caught and printed so that a failed
    flush never stops the periodic schedule.
    """
    try:
        n = flush_buffer()
        if n > 0:
            print(f"[telemetry] Flushed {n} entries")
    except Exception as e:
        print(f"[telemetry] Flush error: {e}")
    _schedule_flush()


# Kick off the periodic schedule when the module is loaded.
_schedule_flush()
def collect(data: str) -> str:
    """Validate one telemetry payload, buffer it, and flush when the buffer is full.

    Args:
        data: JSON text. It must decode to an object containing at least the
            keys "question" and "answer".

    Returns:
        A JSON string. On success "status" is "ok", together with either
        "buffered" (current buffer length) or "flushed" (entries uploaded);
        if the triggered flush failed, "error" carries its message. On
        rejection "status" is "error" and "message" explains why. This
        function never raises.
    """
    # Function-scope import keeps this block independent of the file header.
    from datetime import timezone

    try:
        entry = json.loads(data)
        # Valid JSON that is not an object (number, string, list, null) can
        # hold no fields; reject it explicitly instead of falling through to
        # a confusing TypeError message or a misleading "missing fields".
        if not isinstance(entry, dict):
            return json.dumps({"status": "error", "message": "expected a JSON object"})
        if "question" not in entry or "answer" not in entry:
            return json.dumps({"status": "error", "message": "missing fields"})

        # Aware-UTC replacement for the deprecated datetime.utcnow(); the
        # rendered text keeps the same "...Z" shape as before.
        entry["received_at"] = (
            datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        )

        with BUFFER_LOCK:
            BUFFER.append(entry)
            buf_size = len(BUFFER)

        # Flush outside the lock: flush_buffer() acquires BUFFER_LOCK itself.
        if buf_size >= FLUSH_SIZE:
            try:
                n = flush_buffer()
                return json.dumps({"status": "ok", "flushed": n})
            except Exception as e:
                return json.dumps({"status": "ok", "buffered": buf_size, "error": str(e)})
        return json.dumps({"status": "ok", "buffered": buf_size})
    except Exception as e:
        return json.dumps({"status": "error", "message": str(e)})
def get_status() -> str:
    """Return a one-line summary of the service state.

    The line shows the current buffer length, whether HF_TOKEN is set
    (check mark) or not (cross), and the target dataset repository.
    """
    with BUFFER_LOCK:
        buf = len(BUFFER)
    # The check mark was stored as mojibake (its UTF-8 bytes decoded with a
    # single-byte code page); restored to the intended character.
    return f"Buffer: {buf} | Token: {'✅' if HF_TOKEN else '❌'} | Dataset: {DATASET_REPO}"
def do_flush() -> str:
    """Run a flush on demand and describe the outcome as display text.

    Returns:
        "Flushed <count> entries" when flush_buffer() succeeds, otherwise
        "Error: <message>" built from the exception it raised.
    """
    try:
        flushed = flush_buffer()
    except Exception as exc:
        return f"Error: {exc}"
    return f"Flushed {flushed} entries"
# UI definition. The title and heading were stored as mojibake (UTF-8 bytes
# decoded with a single-byte code page); restored to the intended text.
with gr.Blocks(title="肉糖生 Telemetry") as demo:
    gr.Markdown("# 📊 肉糖生 Telemetry")

    # Operator controls: each button writes its result into its own textbox.
    with gr.Row():
        gr.Button("Status").click(fn=get_status, outputs=gr.Textbox(label="Status"))
        gr.Button("Flush").click(fn=do_flush, outputs=gr.Textbox(label="Result"))

    # Public API endpoint
    inp = gr.Textbox(label="JSON data")
    out = gr.Textbox(label="Response")
    # api_name exposes this handler to API clients under the name "collect".
    inp.submit(fn=collect, inputs=inp, outputs=out, api_name="collect")

demo.launch()