GharScan / agent_trace.py
Ritvik Shrivastava
feat: initial GharScan deployment
41c0a9e
Raw
History Blame Contribute Delete
4.88 kB
"""
agent_trace.py
──────────────
Logs the 3-step GharScan agentic reasoning chain per inference call.
Traces are periodically pushed to HuggingFace Hub as a public dataset.
Qualifies for: πŸ“‘ Sharing is Caring badge
Dataset target: <hf_username>/gharscan-agent-traces
"""
import json
import uuid
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from loguru import logger
from huggingface_hub import HfApi, login
import os
HF_TRACE_REPO = os.getenv("HF_TRACE_REPO", "ritvik360/gharscan-agent-traces")
LOCAL_TRACE_DIR = Path("/tmp/gharscan_traces")
FLUSH_EVERY_N = 20 # Push to Hub every N traces to avoid rate-limiting
class TraceSession:
"""Represents a single user inference session with 3 reasoning steps."""
def __init__(self, session_id: Optional[str] = None):
self.session_id = session_id or str(uuid.uuid4())[:8]
self.started_at = datetime.now(timezone.utc).isoformat()
self.steps: list = []
self.final_report: Optional[dict] = None
self.duration_ms: Optional[int] = None
self._t0 = time.monotonic()
def log_step(self, step_name: str, step_input: dict, step_output: dict) -> None:
self.steps.append({
"step": step_name, # "classify" | "severity" | "cost"
"input": step_input,
"output": step_output,
"elapsed_ms": int((time.monotonic() - self._t0) * 1000),
})
def finalize(self, final_report: dict) -> None:
self.final_report = final_report
self.duration_ms = int((time.monotonic() - self._t0) * 1000)
def to_dict(self) -> dict:
return {
"session_id": self.session_id,
"started_at": self.started_at,
"duration_ms": self.duration_ms,
"steps": self.steps,
"final_report": self.final_report,
}
class AgentTraceLogger:
"""
Manages trace collection and periodic HF Hub uploads.
Keeps traces in local JSONL buffer; flushes to Hub every FLUSH_EVERY_N calls.
"""
def __init__(self):
LOCAL_TRACE_DIR.mkdir(parents=True, exist_ok=True)
self._buffer_path = LOCAL_TRACE_DIR / "traces_buffer.jsonl"
self._count = 0
self._api = None
self._hf_ready = False
self._init_hf()
def _init_hf(self):
"""Try to authenticate with HF Hub. Silently skip if no token."""
hf_token = os.getenv("HF_TOKEN")
if hf_token:
try:
login(token=hf_token)
self._api = HfApi()
self._hf_ready = True
# Ensure dataset repo exists
self._api.create_repo(
repo_id=HF_TRACE_REPO,
repo_type="dataset",
exist_ok=True,
private=False,
)
logger.info(f"AgentTraceLogger ready β†’ {HF_TRACE_REPO}")
except Exception as e:
logger.warning(f"HF trace upload disabled: {e}")
else:
logger.warning("HF_TOKEN not set β€” traces saved locally only.")
def start_trace(self) -> TraceSession:
return TraceSession()
def save_trace(self, session: TraceSession) -> None:
"""Append trace to local buffer; flush to HF Hub every N traces."""
record = json.dumps(session.to_dict(), ensure_ascii=False)
with open(self._buffer_path, "a", encoding="utf-8") as f:
f.write(record + "\n")
self._count += 1
logger.debug(f"Trace saved [{self._count}]: session={session.session_id}")
if self._count % FLUSH_EVERY_N == 0 and self._hf_ready:
self._flush_to_hub()
def _flush_to_hub(self) -> None:
"""Upload local JSONL buffer to HF Hub dataset repo."""
try:
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
remote_filename = f"traces_{timestamp}.jsonl"
self._api.upload_file(
path_or_fileobj=str(self._buffer_path),
path_in_repo=f"data/{remote_filename}",
repo_id=HF_TRACE_REPO,
repo_type="dataset",
commit_message=f"Auto-flush: {self._count} traces",
)
# Reset local buffer after successful upload
self._buffer_path.write_text("")
logger.info(f"Traces flushed to Hub β†’ {HF_TRACE_REPO}/data/{remote_filename}")
except Exception as e:
logger.error(f"Trace flush failed: {e}")
def force_flush(self) -> None:
"""Call manually before Space shutdown to push remaining traces."""
if self._hf_ready and self._buffer_path.exists():
self._flush_to_hub()