""" Session Logger — Structured pipeline event logger. Writes events to both a local JSONL file and the Supabase `session_logs` table. Falls back gracefully to file-only logging if Supabase is unavailable. =========================================================================== Supabase table setup (run once in Supabase SQL editor): =========================================================================== CREATE TABLE session_logs ( id BIGSERIAL PRIMARY KEY, session_id TEXT NOT NULL, user_email TEXT, ts TIMESTAMPTZ DEFAULT NOW(), stage TEXT, event TEXT NOT NULL, duration_ms INTEGER, error TEXT, meta JSONB ); CREATE INDEX ON session_logs (session_id); CREATE INDEX ON session_logs (user_email); CREATE INDEX ON session_logs (ts DESC); =========================================================================== Usage: from session_logger import init_session_logger, get_session_logger logger = init_session_logger("20260319_143000", user_email="alice@example.com") t = logger.log_start("research") # ... do work ... logger.log_end("research", t, model="gpt-4o", rows=42) logger.log("deploy", "Liveboard created", liveboard_id="abc-123") """ import json import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional # --------------------------------------------------------------------------- # SessionLogger class # --------------------------------------------------------------------------- class SessionLogger: """Structured event logger that writes to file and Supabase session_logs.""" TABLE = "session_logs" def __init__(self, session_id: str, user_email: str = None): """ Initialize the session logger. Args: session_id: Unique ID for this build session (e.g. datetime string). user_email: Email of the user running this session. """ self.session_id = session_id self.user_email = user_email # File log path: logs/sessions/{session_id}.log (one JSON line per event) script_dir = Path(__file__).parent log_dir = script_dir / "logs" / "sessions" try: log_dir.mkdir(parents=True, exist_ok=True) except Exception as e: print(f"[SessionLogger] Could not create log directory {log_dir}: {e}", file=sys.stderr) self._log_file = log_dir / f"{session_id}.log" # Try to initialise Supabase — never raise self._supabase_ok = False self._client = None self._init_supabase() def _init_supabase(self): """Attempt to connect to Supabase. Sets self._supabase_ok and self._client.""" try: # Lazy import to avoid circular imports from supabase_client import SupabaseSettings ss = SupabaseSettings() if ss.is_enabled(): self._client = ss.client self._supabase_ok = True except Exception as e: print(f"[SessionLogger] Supabase unavailable, falling back to file-only logging: {e}", file=sys.stderr) # ------------------------------------------------------------------ # Core log method # ------------------------------------------------------------------ def log(self, stage: str, event: str, duration_ms: int = None, error: str = None, **meta): """ Log one pipeline event. Args: stage: Pipeline stage name (e.g. 'research', 'deploy'). event: Short description of what happened. duration_ms: Optional elapsed time in milliseconds. error: Optional error message if the event represents a failure. **meta: Arbitrary key/value pairs stored in the meta JSONB column. """ ts = datetime.now(timezone.utc).isoformat() record = { "session_id": self.session_id, "user_email": self.user_email, "ts": ts, "stage": stage, "event": event, "duration_ms": duration_ms, "error": error, "meta": meta if meta else None, } # Always write to file self._write_file(record) # Write to Supabase if available if self._supabase_ok: self._write_supabase(record) def _write_file(self, record: dict): """Append one JSON line to the session log file. Never raises.""" try: with open(self._log_file, "a", encoding="utf-8") as fh: fh.write(json.dumps(record, default=str) + "\n") except Exception as e: print(f"[SessionLogger] File write failed: {e}", file=sys.stderr) def _write_supabase(self, record: dict): """Insert one row into session_logs. Never raises.""" try: # Build the insert payload, omitting None values for cleanliness payload = {k: v for k, v in record.items() if v is not None} self._client.table(self.TABLE).insert(payload).execute() except Exception as e: # Supabase write failure is non-fatal — demote to stderr print(f"[SessionLogger] Supabase write failed: {e}", file=sys.stderr) # Mark Supabase as unavailable so we stop trying for this session self._supabase_ok = False # ------------------------------------------------------------------ # Convenience helpers # ------------------------------------------------------------------ def log_start(self, stage: str) -> float: """ Log that a pipeline stage has started. Returns: Monotonic start time (pass to log_end). """ self.log(stage, f"{stage} started") return time.monotonic() def log_end(self, stage: str, start_time: float, error: str = None, **meta): """ Log that a pipeline stage has ended, computing duration from start_time. Args: stage: Pipeline stage name (must match the one passed to log_start). start_time: Value returned by the corresponding log_start call. error: Optional error message if the stage failed. **meta: Arbitrary key/value pairs stored in the meta column. """ elapsed_ms = int((time.monotonic() - start_time) * 1000) event = f"{stage} failed" if error else f"{stage} completed" self.log(stage, event, duration_ms=elapsed_ms, error=error, **meta) # --------------------------------------------------------------------------- # Module-level singleton helpers # --------------------------------------------------------------------------- _current_logger: Optional[SessionLogger] = None def get_session_logger() -> Optional[SessionLogger]: """Return the active SessionLogger, or None if not yet initialised.""" return _current_logger def init_session_logger(session_id: str, user_email: str = None) -> SessionLogger: """ Create (or replace) the module-level SessionLogger. Args: session_id: Unique ID for this build session. user_email: Email of the user running this session. Returns: The newly created SessionLogger instance. """ global _current_logger _current_logger = SessionLogger(session_id, user_email) return _current_logger