Spaces:
Running on Zero
Running on Zero
| """ | |
| agent_trace.py | |
| -------------- | |
| Logs the 3-step GharScan agentic reasoning chain per inference call. | |
| Traces are periodically pushed to HuggingFace Hub as a public dataset. | |
| Qualifies for: Sharing is Caring badge | |
| Dataset target: <hf_username>/gharscan-agent-traces | |
| """ | |
| import json | |
| import uuid | |
| import time | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Optional | |
| from loguru import logger | |
| from huggingface_hub import HfApi, login | |
| import os | |
# Hub dataset repo that receives uploaded traces; the HF_TRACE_REPO
# environment variable overrides the default.
HF_TRACE_REPO = os.environ.get("HF_TRACE_REPO", "ritvik360/gharscan-agent-traces")

# Local directory that holds the JSONL trace buffer.
LOCAL_TRACE_DIR = Path("/tmp") / "gharscan_traces"

# Push to Hub every N traces to avoid rate-limiting.
FLUSH_EVERY_N = 20
class TraceSession:
    """Trace of one inference call: ordered reasoning steps plus a final report.

    Elapsed times are measured with a monotonic clock that starts when the
    session object is constructed.
    """

    def __init__(self, session_id: Optional[str] = None):
        """Start a new session.

        Args:
            session_id: Identifier to record. When falsy (``None`` or empty),
                the first 8 hex characters of a random UUID4 are used.
        """
        if not session_id:
            session_id = uuid.uuid4().hex[:8]
        self.session_id = session_id
        # Wall-clock start, timezone-aware UTC, ISO-8601 formatted.
        self.started_at = datetime.now(timezone.utc).isoformat()
        self.steps: list = []
        self.final_report: Optional[dict] = None
        # Stays None until finalize() is called.
        self.duration_ms: Optional[int] = None
        self._t0 = time.monotonic()

    def _elapsed_ms(self) -> int:
        """Return whole milliseconds elapsed since the session was created."""
        seconds = time.monotonic() - self._t0
        return int(seconds * 1000)

    def log_step(self, step_name: str, step_input: dict, step_output: dict) -> None:
        """Append one reasoning step to the trace.

        Args:
            step_name: Step label, e.g. "classify" | "severity" | "cost".
            step_input: Input payload of the step (stored by reference).
            step_output: Output payload of the step (stored by reference).
        """
        entry = {
            "step": step_name,
            "input": step_input,
            "output": step_output,
            "elapsed_ms": self._elapsed_ms(),
        }
        self.steps.append(entry)

    def finalize(self, final_report: dict) -> None:
        """Attach the final report and record the total session duration."""
        self.final_report = final_report
        self.duration_ms = self._elapsed_ms()

    def to_dict(self) -> dict:
        """Return the session as a plain dict (same key order on every call)."""
        exported = ("session_id", "started_at", "duration_ms", "steps", "final_report")
        return {field: getattr(self, field) for field in exported}
class AgentTraceLogger:
    """Collect traces in a local JSONL buffer and periodically upload them to the HF Hub.

    Each saved trace is appended as one JSON line to
    ``LOCAL_TRACE_DIR / "traces_buffer.jsonl"``. When Hub access was set up
    successfully, the buffer is uploaded to the ``HF_TRACE_REPO`` dataset repo
    after every ``FLUSH_EVERY_N`` saved traces and then truncated.
    """

    def __init__(self):
        """Create the local buffer directory and try to enable Hub uploads."""
        LOCAL_TRACE_DIR.mkdir(parents=True, exist_ok=True)
        self._buffer_path = LOCAL_TRACE_DIR / "traces_buffer.jsonl"
        # Traces saved by this instance; never reset, so it is cumulative.
        self._count = 0
        self._api = None
        self._hf_ready = False
        self._init_hf()

    def _init_hf(self):
        """Try to authenticate with HF Hub and ensure the dataset repo exists.

        Uploads are enabled only when BOTH login and repo creation succeed.
        Without ``HF_TOKEN``, or on any failure, a warning is logged and the
        logger keeps working in local-only mode.
        """
        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            logger.warning("HF_TOKEN not set β traces saved locally only.")
            return

        try:
            login(token=hf_token)
            api = HfApi()
            # Ensure dataset repo exists
            api.create_repo(
                repo_id=HF_TRACE_REPO,
                repo_type="dataset",
                exist_ok=True,
                private=False,
            )
        except Exception as e:
            # Leave uploads switched off: the message below must match reality,
            # otherwise every later flush would retry against a broken setup.
            self._api = None
            self._hf_ready = False
            logger.warning(f"HF trace upload disabled: {e}")
            return

        # Only mark the Hub as usable once the whole setup has succeeded.
        self._api = api
        self._hf_ready = True
        logger.info(f"AgentTraceLogger ready β {HF_TRACE_REPO}")

    def start_trace(self) -> "TraceSession":
        """Return a fresh ``TraceSession`` with an auto-generated id."""
        return TraceSession()

    def save_trace(self, session: "TraceSession") -> None:
        """Append trace to local buffer; flush to HF Hub every N traces.

        Args:
            session: Session whose ``to_dict()`` output is written as one
                JSON line.
        """
        record = json.dumps(session.to_dict(), ensure_ascii=False)
        with open(self._buffer_path, "a", encoding="utf-8") as f:
            f.write(record + "\n")
        self._count += 1
        logger.debug(f"Trace saved [{self._count}]: session={session.session_id}")
        if self._count % FLUSH_EVERY_N == 0 and self._hf_ready:
            self._flush_to_hub()

    def _buffer_has_data(self) -> bool:
        """Return True when the local buffer file exists and is non-empty."""
        try:
            return self._buffer_path.stat().st_size > 0
        except FileNotFoundError:
            return False

    def _flush_to_hub(self) -> None:
        """Upload local JSONL buffer to HF Hub dataset repo, then truncate it.

        Does nothing when the buffer is missing or empty, so no empty files
        are committed to the dataset. Any failure is logged and swallowed;
        the buffer is only truncated after a successful upload.
        """
        try:
            if not self._buffer_has_data():
                return
            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            remote_filename = f"traces_{timestamp}.jsonl"
            self._api.upload_file(
                path_or_fileobj=str(self._buffer_path),
                path_in_repo=f"data/{remote_filename}",
                repo_id=HF_TRACE_REPO,
                repo_type="dataset",
                # Cumulative count for this instance, not the size of this file.
                commit_message=f"Auto-flush: {self._count} traces",
            )
            # Reset local buffer after successful upload
            self._buffer_path.write_text("")
            logger.info(f"Traces flushed to Hub β {HF_TRACE_REPO}/data/{remote_filename}")
        except Exception as e:
            logger.error(f"Trace flush failed: {e}")

    def force_flush(self) -> None:
        """Call manually before Space shutdown to push remaining traces.

        No-op when Hub uploads are disabled or there is nothing buffered.
        """
        if self._hf_ready:
            self._flush_to_hub()