test_495 / trackio /jsonl_buffer.py
abidlabs's picture
abidlabs HF Staff
Upload folder using huggingface_hub
daf707a verified
from __future__ import annotations
import os
from pathlib import Path
import orjson
from trackio.utils import TRACKIO_DIR, serialize_values
# Directory (under TRACKIO_DIR) that holds the per-project JSONL buffer files.
BUFFER_DIR = TRACKIO_DIR / "pending"
def _buffer_path(project: str, kind: str) -> Path:
    """Return the JSONL buffer file path for a project and record kind.

    Only alphanumeric characters, ``-`` and ``_`` from ``project`` are kept
    for the file name; when nothing remains, ``default`` is used instead.

    Args:
        project: Project name; sanitized before being used in the file name.
        kind: Record kind used as the file-name suffix (e.g. ``"logs"``).

    Returns:
        ``BUFFER_DIR / "<sanitized project>_<kind>.jsonl"``.
    """
    allowed_punctuation = ("-", "_")
    kept = [ch for ch in project if ch.isalnum() or ch in allowed_punctuation]
    # NOTE: the filter above never keeps whitespace, so rstrip() is
    # effectively a no-op here.
    stem = "".join(kept).rstrip() or "default"
    return BUFFER_DIR / f"{stem}_{kind}.jsonl"
def _append(path: Path, entries: list[dict]) -> None:
    """Append ``entries`` to the JSONL file at ``path``, one object per line.

    Every entry is serialized before the file is opened, so an entry that
    ``orjson`` cannot encode raises without leaving a partially written
    batch in the buffer. Missing parent directories are created.

    Args:
        path: Destination buffer file; opened in binary append mode.
        entries: Dicts to serialize with ``orjson.dumps``.

    Raises:
        orjson.JSONEncodeError: If an entry cannot be serialized.
    """
    payload = b"".join(orjson.dumps(entry) + b"\n" for entry in entries)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "ab") as f:
        # One write call for the whole batch instead of one per entry.
        f.write(payload)
def _read_all(path: Path) -> list[dict]:
    """Load every JSON object stored in the JSONL file at ``path``.

    Blank lines, lines that are not valid JSON, and lines whose JSON value
    is not an object are skipped, so one corrupt line does not discard the
    rest of the buffer.

    Args:
        path: Buffer file to read.

    Returns:
        The decoded objects in file order; an empty list when the file does
        not exist.
    """
    entries: list[dict] = []
    try:
        # Open directly instead of checking exists() first: the file can be
        # removed between the check and the open.
        f = open(path, "rb")
    except FileNotFoundError:
        return entries
    with f:
        for raw in f:
            raw = raw.strip()
            if not raw:
                continue
            try:
                parsed = orjson.loads(raw)
            except orjson.JSONDecodeError:
                # Best effort: skip a corrupt or truncated line.
                continue
            # Consumers index rows by key, so only JSON objects are usable.
            if isinstance(parsed, dict):
                entries.append(parsed)
    return entries
def _clear(path: Path) -> None:
    """Delete the file at ``path``; a file that is already gone is ignored."""
    path.unlink(missing_ok=True)
def append_logs(project: str, logs: list[dict], space_id: str) -> None:
    """Buffer metric log entries in the ``logs`` JSONL file of ``project``.

    Args:
        project: Project name used to pick the buffer file.
        logs: Entries with required ``project``, ``run`` and ``metrics``
            keys and optional ``step``, ``log_id`` and ``config`` keys.
        space_id: Stored alongside every buffered row.

    Raises:
        KeyError: If an entry lacks ``project``, ``run`` or ``metrics``.
    """
    target = _buffer_path(project, "logs")

    def _to_row(item: dict) -> dict:
        record = {
            "project": item["project"],
            "run": item["run"],
            "metrics": serialize_values(item["metrics"]),
            "step": item.get("step"),
            "log_id": item.get("log_id"),
            "space_id": space_id,
        }
        # The config key is only stored when the entry carries a truthy one.
        if item.get("config"):
            record["config"] = serialize_values(item["config"])
        return record

    _append(target, [_to_row(item) for item in logs])
def append_system_logs(project: str, logs: list[dict], space_id: str) -> None:
    """Buffer system-metric entries in the ``system_logs`` file of ``project``.

    Args:
        project: Project name used to pick the buffer file.
        logs: Entries with required ``project``, ``run`` and ``metrics``
            keys and optional ``timestamp`` and ``log_id`` keys.
        space_id: Stored alongside every buffered row.

    Raises:
        KeyError: If an entry lacks ``project``, ``run`` or ``metrics``.
    """
    target = _buffer_path(project, "system_logs")
    rows = [
        {
            "project": item["project"],
            "run": item["run"],
            "metrics": serialize_values(item["metrics"]),
            "timestamp": item.get("timestamp"),
            "log_id": item.get("log_id"),
            "space_id": space_id,
        }
        for item in logs
    ]
    _append(target, rows)
def _upload_file_path(file_data: object) -> str:
    """Return the path string carried by an ``uploaded_file`` value.

    Accepts a dict with a ``path`` key, an object with a ``path`` attribute,
    or any other value (converted with ``str``). A missing value, or a dict
    without a usable ``path``, yields ``""`` rather than the text ``"None"``.
    """
    if file_data is None:
        return ""
    if isinstance(file_data, dict):
        candidate = file_data.get("path")
        return str(candidate) if candidate else ""
    if hasattr(file_data, "path"):
        return str(file_data.path)
    return str(file_data)


def append_uploads(project: str, uploads: list[dict], space_id: str) -> None:
    """Buffer pending file uploads in the ``uploads`` file of ``project``.

    Args:
        project: Project name used to pick the buffer file.
        uploads: Entries with a required ``project`` key and optional
            ``run``, ``step``, ``uploaded_file`` and ``relative_path`` keys.
        space_id: Stored alongside every buffered row.

    Raises:
        KeyError: If an entry lacks ``project``.
    """
    path = _buffer_path(project, "uploads")
    entries = [
        {
            "project": entry["project"],
            "run": entry.get("run"),
            "step": entry.get("step"),
            "file_path": _upload_file_path(entry.get("uploaded_file")),
            "relative_path": entry.get("relative_path"),
            "space_id": space_id,
        }
        for entry in uploads
    ]
    _append(path, entries)
def append_alerts(project: str, alerts: list[dict]) -> None:
    """Buffer alert entries in the ``alerts`` JSONL file of ``project``.

    Args:
        project: Project name used to pick the buffer file.
        alerts: Entries with required ``project``, ``run``, ``title`` and
            ``level`` keys and optional ``text``, ``step``, ``timestamp``
            and ``alert_id`` keys.

    Raises:
        KeyError: If an entry lacks one of the required keys.
    """
    target = _buffer_path(project, "alerts")
    rows = [
        {
            "project": item["project"],
            "run": item["run"],
            "title": item["title"],
            "text": item.get("text"),
            "level": item["level"],
            "step": item.get("step"),
            "timestamp": item.get("timestamp"),
            "alert_id": item.get("alert_id"),
        }
        for item in alerts
    ]
    _append(target, rows)
def get_pending_logs(project: str) -> dict | None:
    """Return the buffered metric logs for ``project``, or ``None`` if empty.

    Returns:
        ``None`` when no rows are buffered. Otherwise a dict with two keys:
        ``"logs"``, the buffered rows reduced to ``project``, ``run``,
        ``metrics``, ``step``, ``log_id`` and ``config``; and
        ``"space_id"``, taken from the first buffered row only.

    Raises:
        KeyError: If a buffered row lacks ``project``, ``run`` or ``metrics``.
    """
    entries = _read_all(_buffer_path(project, "logs"))
    if not entries:
        return None
    logs = [
        {
            "project": row["project"],
            "run": row["run"],
            "metrics": row["metrics"],
            "step": row.get("step"),
            "log_id": row.get("log_id"),
            "config": row.get("config"),
        }
        for row in entries
    ]
    # The first row's space_id is reported for the whole batch.
    return {"logs": logs, "space_id": entries[0].get("space_id")}
def get_pending_system_logs(project: str) -> dict | None:
    """Return the buffered system logs for ``project``, or ``None`` if empty.

    Returns:
        ``None`` when no rows are buffered. Otherwise a dict with two keys:
        ``"logs"``, the buffered rows reduced to ``project``, ``run``,
        ``metrics``, ``timestamp`` and ``log_id``; and ``"space_id"``,
        taken from the first buffered row only.

    Raises:
        KeyError: If a buffered row lacks ``project``, ``run`` or ``metrics``.
    """
    entries = _read_all(_buffer_path(project, "system_logs"))
    if not entries:
        return None
    logs = [
        {
            "project": row["project"],
            "run": row["run"],
            "metrics": row["metrics"],
            "timestamp": row.get("timestamp"),
            "log_id": row.get("log_id"),
        }
        for row in entries
    ]
    # The first row's space_id is reported for the whole batch.
    return {"logs": logs, "space_id": entries[0].get("space_id")}
def get_pending_uploads(project: str) -> dict | None:
    """Return the buffered uploads for ``project``, or ``None`` if empty.

    Returns:
        ``None`` when no rows are buffered. Otherwise a dict with two keys:
        ``"uploads"``, the buffered rows reduced to ``project``, ``run``,
        ``step``, ``file_path`` and ``relative_path``; and ``"space_id"``,
        taken from the first buffered row only.

    Raises:
        KeyError: If a buffered row lacks ``project`` or ``file_path``.
    """
    entries = _read_all(_buffer_path(project, "uploads"))
    if not entries:
        return None
    uploads = [
        {
            "project": row["project"],
            "run": row.get("run"),
            "step": row.get("step"),
            "file_path": row["file_path"],
            "relative_path": row.get("relative_path"),
        }
        for row in entries
    ]
    # The first row's space_id is reported for the whole batch.
    return {"uploads": uploads, "space_id": entries[0].get("space_id")}
def clear_logs(project: str) -> None:
    """Delete the ``logs`` buffer file of ``project``, if it exists."""
    logs_buffer = _buffer_path(project, "logs")
    _clear(logs_buffer)
def clear_system_logs(project: str) -> None:
    """Delete the ``system_logs`` buffer file of ``project``, if it exists."""
    system_logs_buffer = _buffer_path(project, "system_logs")
    _clear(system_logs_buffer)
def clear_uploads(project: str) -> None:
    """Delete the ``uploads`` buffer file of ``project``, if it exists."""
    uploads_buffer = _buffer_path(project, "uploads")
    _clear(uploads_buffer)
def clear_alerts(project: str) -> None:
    """Delete the ``alerts`` buffer file of ``project``, if it exists."""
    alerts_buffer = _buffer_path(project, "alerts")
    _clear(alerts_buffer)
def has_pending_data(project: str) -> bool:
    """Return ``True`` if any buffer file of ``project`` is non-empty.

    The ``logs``, ``system_logs``, ``uploads`` and ``alerts`` buffers are
    checked in that order; a missing file counts as empty.
    """
    for kind in ("logs", "system_logs", "uploads", "alerts"):
        try:
            # Single stat() instead of exists() + stat(): the file may be
            # removed between the two calls.
            size = _buffer_path(project, kind).stat().st_size
        except (FileNotFoundError, NotADirectoryError):
            continue
        if size > 0:
            return True
    return False