demoprep / session_logger.py
mikeboone's picture
feat: March 2026 sprint — new vision merge, pipeline improvements, settings refactor
5ac32c1
"""
Session Logger — Structured pipeline event logger.
Writes events to both a local JSONL file and the Supabase `session_logs` table.
Falls back gracefully to file-only logging if Supabase is unavailable.
===========================================================================
Supabase table setup (run once in Supabase SQL editor):
===========================================================================
CREATE TABLE session_logs (
id BIGSERIAL PRIMARY KEY,
session_id TEXT NOT NULL,
user_email TEXT,
ts TIMESTAMPTZ DEFAULT NOW(),
stage TEXT,
event TEXT NOT NULL,
duration_ms INTEGER,
error TEXT,
meta JSONB
);
CREATE INDEX ON session_logs (session_id);
CREATE INDEX ON session_logs (user_email);
CREATE INDEX ON session_logs (ts DESC);
===========================================================================
Usage:
from session_logger import init_session_logger, get_session_logger
logger = init_session_logger("20260319_143000", user_email="alice@example.com")
t = logger.log_start("research")
# ... do work ...
logger.log_end("research", t, model="gpt-4o", rows=42)
logger.log("deploy", "Liveboard created", liveboard_id="abc-123")
"""
import json
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# SessionLogger class
# ---------------------------------------------------------------------------
class SessionLogger:
    """Structured event logger that writes to file and Supabase session_logs.

    Each event is appended as one JSON line to
    ``logs/sessions/{session_id}.log`` (relative to this module's directory)
    and, when a Supabase client could be obtained, inserted as a row into the
    ``session_logs`` table. Logging is best-effort: every failure is reported
    on stderr and never propagated to the caller.
    """

    TABLE = "session_logs"

    def __init__(self, session_id: str, user_email: Optional[str] = None):
        """
        Initialize the session logger.

        Args:
            session_id: Unique ID for this build session (e.g. datetime string).
            user_email: Email of the user running this session.
        """
        self.session_id = session_id
        self.user_email = user_email

        # File log path: logs/sessions/{session_id}.log (one JSON line per event)
        script_dir = Path(__file__).parent
        log_dir = script_dir / "logs" / "sessions"
        try:
            log_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            # Keep going: _write_file reports (and swallows) the later failure.
            print(f"[SessionLogger] Could not create log directory {log_dir}: {e}", file=sys.stderr)
        self._log_file = log_dir / f"{session_id}.log"

        # Try to initialise Supabase — never raise
        self._supabase_ok = False
        self._client = None
        self._init_supabase()

    def _init_supabase(self):
        """Attempt to connect to Supabase. Sets self._supabase_ok and self._client."""
        try:
            # Lazy import to avoid circular imports
            from supabase_client import SupabaseSettings

            ss = SupabaseSettings()
            if ss.is_enabled():
                self._client = ss.client
                self._supabase_ok = True
        except Exception as e:
            print(f"[SessionLogger] Supabase unavailable, falling back to file-only logging: {e}",
                  file=sys.stderr)

    # ------------------------------------------------------------------
    # Core log method
    # ------------------------------------------------------------------
    def log(self, stage: str, event: str, duration_ms: Optional[int] = None,
            error: Optional[str] = None, **meta):
        """
        Log one pipeline event.

        Args:
            stage: Pipeline stage name (e.g. 'research', 'deploy').
            event: Short description of what happened.
            duration_ms: Optional elapsed time in milliseconds.
            error: Optional error message if the event represents a failure.
            **meta: Arbitrary key/value pairs stored in the meta JSONB column.
        """
        ts = datetime.now(timezone.utc).isoformat()
        record = {
            "session_id": self.session_id,
            "user_email": self.user_email,
            "ts": ts,
            "stage": stage,
            "event": event,
            "duration_ms": duration_ms,
            "error": error,
            "meta": meta if meta else None,
        }

        # Always write to file
        self._write_file(record)

        # Write to Supabase if available
        if self._supabase_ok:
            self._write_supabase(record)

    @staticmethod
    def _json_safe(value: Any) -> Any:
        """Return *value* reduced to plain JSON types.

        Round-trips through ``json`` with ``default=str``, so anything the
        encoder does not know (Path, datetime, ...) becomes its ``str()``
        form. This is the same policy ``_write_file`` applies.

        Raises:
            TypeError, ValueError: If the value still cannot be encoded
                (e.g. unsupported dict keys or a circular reference).
        """
        return json.loads(json.dumps(value, default=str))

    def _write_file(self, record: dict):
        """Append one JSON line to the session log file. Never raises."""
        try:
            with open(self._log_file, "a", encoding="utf-8") as fh:
                fh.write(json.dumps(record, default=str) + "\n")
        except Exception as e:
            print(f"[SessionLogger] File write failed: {e}", file=sys.stderr)

    def _write_supabase(self, record: dict):
        """Insert one row into session_logs. Never raises.

        The payload is normalised to plain JSON types first, so a meta value
        that the file log accepts is sent in the same stringified form. A
        payload that cannot be normalised skips only that row; a failure of
        the insert itself turns Supabase logging off for the rest of the
        session.
        """
        # Build the insert payload, omitting None values for cleanliness
        payload = {k: v for k, v in record.items() if v is not None}
        try:
            payload = self._json_safe(payload)
        except (TypeError, ValueError, RecursionError) as e:
            # A bad record is not a Supabase outage: drop this row only.
            print(f"[SessionLogger] Supabase payload not serialisable, row skipped: {e}",
                  file=sys.stderr)
            return

        try:
            self._client.table(self.TABLE).insert(payload).execute()
        except Exception as e:
            # Supabase write failure is non-fatal — demote to stderr
            print(f"[SessionLogger] Supabase write failed: {e}", file=sys.stderr)
            # Mark Supabase as unavailable so we stop trying for this session
            self._supabase_ok = False

    # ------------------------------------------------------------------
    # Convenience helpers
    # ------------------------------------------------------------------
    def log_start(self, stage: str) -> float:
        """
        Log that a pipeline stage has started.

        The clock is read after the start event has been written, so the
        time spent writing that event is not counted in the stage duration.

        Returns:
            Monotonic start time (pass to log_end).
        """
        self.log(stage, f"{stage} started")
        return time.monotonic()

    def log_end(self, stage: str, start_time: float, error: Optional[str] = None, **meta):
        """
        Log that a pipeline stage has ended, computing duration from start_time.

        Args:
            stage: Pipeline stage name (must match the one passed to log_start).
            start_time: Value returned by the corresponding log_start call.
            error: Optional error message if the stage failed.
            **meta: Arbitrary key/value pairs stored in the meta column.
        """
        elapsed_ms = int((time.monotonic() - start_time) * 1000)
        event = f"{stage} failed" if error else f"{stage} completed"
        self.log(stage, event, duration_ms=elapsed_ms, error=error, **meta)
# ---------------------------------------------------------------------------
# Module-level singleton helpers
# ---------------------------------------------------------------------------
# The single active logger for this process; stays None until
# init_session_logger() has been called.
_current_logger: Optional[SessionLogger] = None


def get_session_logger() -> Optional[SessionLogger]:
    """Fetch the module-level SessionLogger.

    Returns:
        The logger most recently stored in ``_current_logger``, or None when
        no logger has been set up yet.
    """
    return _current_logger
def init_session_logger(session_id: str, user_email: str = None) -> SessionLogger:
    """
    Build a SessionLogger and install it as the module-level logger.

    Any logger installed earlier is replaced.

    Args:
        session_id: Unique ID for this build session.
        user_email: Email of the user running this session.

    Returns:
        The SessionLogger that was just created and installed.
    """
    global _current_logger
    fresh_logger = SessionLogger(session_id, user_email)
    _current_logger = fresh_logger
    return fresh_logger