Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import os | |
| from pathlib import Path | |
| import orjson | |
| from trackio.utils import TRACKIO_DIR, serialize_values | |
# Directory holding the per-project JSONL buffers of data pending sync.
BUFFER_DIR = TRACKIO_DIR / "pending"
def _buffer_path(project: str, kind: str) -> Path:
    """Return the JSONL buffer file for *project* and *kind*.

    The project name is reduced to alphanumerics plus ``-`` and ``_`` so it
    is safe to embed in a filename; if nothing survives the filter, the
    name ``default`` is used instead.
    """
    # Keep only filename-safe characters. (The original chained a trailing
    # .rstrip(), which was dead code: the filtered string can never contain
    # whitespace, so there was nothing to strip.)
    safe = "".join(c for c in project if c.isalnum() or c in "-_")
    return BUFFER_DIR / f"{safe or 'default'}_{kind}.jsonl"
def _append(path: Path, entries: list[dict]) -> None:
    """Serialize each entry as one JSON line and append all of them to *path*."""
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = [orjson.dumps(entry) + b"\n" for entry in entries]
    with open(path, "ab") as buffer_file:
        buffer_file.writelines(lines)
def _read_all(path: Path) -> list[dict]:
    """Parse every JSON line in *path*; a missing file yields an empty list."""
    if not path.exists():
        return []
    records: list[dict] = []
    with open(path, "rb") as fh:
        for raw in fh:
            raw = raw.strip()
            if not raw:
                continue
            try:
                records.append(orjson.loads(raw))
            except Exception:
                # Best effort: a line corrupted by a partial write is skipped
                # rather than failing the whole read.
                continue
    return records
| def _clear(path: Path) -> None: | |
| try: | |
| os.remove(path) | |
| except FileNotFoundError: | |
| pass | |
def append_logs(project: str, logs: list[dict], space_id: str) -> None:
    """Buffer metric log entries for *project* into the local pending file."""
    rows = []
    for entry in logs:
        row = {
            "project": entry["project"],
            "run": entry["run"],
            "metrics": serialize_values(entry["metrics"]),
            "step": entry.get("step"),
            "log_id": entry.get("log_id"),
            "space_id": space_id,
        }
        config = entry.get("config")
        if config:
            # config is only stored when present and truthy.
            row["config"] = serialize_values(config)
        rows.append(row)
    _append(_buffer_path(project, "logs"), rows)
def append_system_logs(project: str, logs: list[dict], space_id: str) -> None:
    """Buffer system-metric log entries for *project* into the pending file."""
    rows = [
        {
            "project": entry["project"],
            "run": entry["run"],
            "metrics": serialize_values(entry["metrics"]),
            "timestamp": entry.get("timestamp"),
            "log_id": entry.get("log_id"),
            "space_id": space_id,
        }
        for entry in logs
    ]
    _append(_buffer_path(project, "system_logs"), rows)
def append_uploads(project: str, uploads: list[dict], space_id: str) -> None:
    """Buffer file-upload records for *project* into the pending file.

    Each upload entry's ``uploaded_file`` may be a dict carrying a ``path``
    key, an object exposing a ``.path`` attribute, or a plain path-like
    value. A missing file is recorded as an empty path string.
    """
    path = _buffer_path(project, "uploads")
    entries = []
    for entry in uploads:
        file_data = entry.get("uploaded_file")
        if isinstance(file_data, dict):
            file_path = file_data.get("path", "")
        elif hasattr(file_data, "path"):
            file_path = str(file_data.path)
        elif file_data is None:
            # Bug fix: the previous else-branch did str(None) and recorded
            # the literal string "None"; an absent upload should yield "".
            file_path = ""
        else:
            file_path = str(file_data)
        entries.append(
            {
                "project": entry["project"],
                "run": entry.get("run"),
                "step": entry.get("step"),
                "file_path": file_path,
                "relative_path": entry.get("relative_path"),
                "space_id": space_id,
            }
        )
    _append(path, entries)
def append_alerts(project: str, alerts: list[dict]) -> None:
    """Buffer alert records for *project* (alerts carry no space_id)."""
    rows = [
        {
            "project": entry["project"],
            "run": entry["run"],
            "title": entry["title"],
            "text": entry.get("text"),
            "level": entry["level"],
            "step": entry.get("step"),
            "timestamp": entry.get("timestamp"),
            "alert_id": entry.get("alert_id"),
        }
        for entry in alerts
    ]
    _append(_buffer_path(project, "alerts"), rows)
def get_pending_logs(project: str) -> dict | None:
    """Return buffered logs for *project*, or None when nothing is pending.

    Returns a payload of the form ``{"logs": [...], "space_id": ...}``.
    (The previous ``list[dict] | None`` annotation was wrong — this function
    has always returned a dict payload, never a bare list.)
    """
    entries = _read_all(_buffer_path(project, "logs"))
    if not entries:
        return None
    logs = [
        {
            "project": row["project"],
            "run": row["run"],
            "metrics": row["metrics"],
            "step": row.get("step"),
            "log_id": row.get("log_id"),
            "config": row.get("config"),
        }
        for row in entries
    ]
    # space_id was stamped on every row at append time, so any row's works.
    return {"logs": logs, "space_id": entries[0].get("space_id")}
def get_pending_system_logs(project: str) -> dict | None:
    """Return buffered system logs for *project*, or None when none pending.

    Returns a payload of the form ``{"logs": [...], "space_id": ...}``.
    (The previous ``list[dict] | None`` annotation was wrong — this function
    has always returned a dict payload, never a bare list.)
    """
    entries = _read_all(_buffer_path(project, "system_logs"))
    if not entries:
        return None
    logs = [
        {
            "project": row["project"],
            "run": row["run"],
            "metrics": row["metrics"],
            "timestamp": row.get("timestamp"),
            "log_id": row.get("log_id"),
        }
        for row in entries
    ]
    # space_id was stamped on every row at append time, so any row's works.
    return {"logs": logs, "space_id": entries[0].get("space_id")}
def get_pending_uploads(project: str) -> dict | None:
    """Return buffered uploads for *project*, or None when none pending.

    Returns a payload of the form ``{"uploads": [...], "space_id": ...}``.
    (The previous ``list[dict] | None`` annotation was wrong — this function
    has always returned a dict payload, never a bare list.)
    """
    entries = _read_all(_buffer_path(project, "uploads"))
    if not entries:
        return None
    uploads = [
        {
            "project": row["project"],
            "run": row.get("run"),
            "step": row.get("step"),
            "file_path": row["file_path"],
            "relative_path": row.get("relative_path"),
        }
        for row in entries
    ]
    # space_id was stamped on every row at append time, so any row's works.
    return {"uploads": uploads, "space_id": entries[0].get("space_id")}
def clear_logs(project: str) -> None:
    """Remove the pending metrics-log buffer for *project*."""
    buffer_file = _buffer_path(project, "logs")
    _clear(buffer_file)
def clear_system_logs(project: str) -> None:
    """Remove the pending system-log buffer for *project*."""
    buffer_file = _buffer_path(project, "system_logs")
    _clear(buffer_file)
def clear_uploads(project: str) -> None:
    """Remove the pending uploads buffer for *project*."""
    buffer_file = _buffer_path(project, "uploads")
    _clear(buffer_file)
def clear_alerts(project: str) -> None:
    """Remove the pending alerts buffer for *project*."""
    buffer_file = _buffer_path(project, "alerts")
    _clear(buffer_file)
def has_pending_data(project: str) -> bool:
    """Return True if any buffer file for *project* exists and is non-empty."""
    kinds = ("logs", "system_logs", "uploads", "alerts")
    return any(
        (p := _buffer_path(project, kind)).exists() and p.stat().st_size > 0
        for kind in kinds
    )