Spaces:
Running
Running
| """ | |
| Session Logger β Structured pipeline event logger. | |
| Writes events to both a local JSONL file and the Supabase `session_logs` table. | |
| Falls back gracefully to file-only logging if Supabase is unavailable. | |
| =========================================================================== | |
| Supabase table setup (run once in Supabase SQL editor): | |
| =========================================================================== | |
| CREATE TABLE session_logs ( | |
| id BIGSERIAL PRIMARY KEY, | |
| session_id TEXT NOT NULL, | |
| user_email TEXT, | |
| ts TIMESTAMPTZ DEFAULT NOW(), | |
| stage TEXT, | |
| event TEXT NOT NULL, | |
| duration_ms INTEGER, | |
| error TEXT, | |
| meta JSONB | |
| ); | |
| CREATE INDEX ON session_logs (session_id); | |
| CREATE INDEX ON session_logs (user_email); | |
| CREATE INDEX ON session_logs (ts DESC); | |
| =========================================================================== | |
| Usage: | |
| from session_logger import init_session_logger, get_session_logger | |
| logger = init_session_logger("20260319_143000", user_email="alice@example.com") | |
| t = logger.log_start("research") | |
| # ... do work ... | |
| logger.log_end("research", t, model="gpt-4o", rows=42) | |
| logger.log("deploy", "Liveboard created", liveboard_id="abc-123") | |
| """ | |
| import json | |
| import sys | |
| import time | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| # --------------------------------------------------------------------------- | |
| # SessionLogger class | |
| # --------------------------------------------------------------------------- | |
| class SessionLogger: | |
| """Structured event logger that writes to file and Supabase session_logs.""" | |
| TABLE = "session_logs" | |
| def __init__(self, session_id: str, user_email: str = None): | |
| """ | |
| Initialize the session logger. | |
| Args: | |
| session_id: Unique ID for this build session (e.g. datetime string). | |
| user_email: Email of the user running this session. | |
| """ | |
| self.session_id = session_id | |
| self.user_email = user_email | |
| # File log path: logs/sessions/{session_id}.log (one JSON line per event) | |
| script_dir = Path(__file__).parent | |
| log_dir = script_dir / "logs" / "sessions" | |
| try: | |
| log_dir.mkdir(parents=True, exist_ok=True) | |
| except Exception as e: | |
| print(f"[SessionLogger] Could not create log directory {log_dir}: {e}", file=sys.stderr) | |
| self._log_file = log_dir / f"{session_id}.log" | |
| # Try to initialise Supabase β never raise | |
| self._supabase_ok = False | |
| self._client = None | |
| self._init_supabase() | |
| def _init_supabase(self): | |
| """Attempt to connect to Supabase. Sets self._supabase_ok and self._client.""" | |
| try: | |
| # Lazy import to avoid circular imports | |
| from supabase_client import SupabaseSettings | |
| ss = SupabaseSettings() | |
| if ss.is_enabled(): | |
| self._client = ss.client | |
| self._supabase_ok = True | |
| except Exception as e: | |
| print(f"[SessionLogger] Supabase unavailable, falling back to file-only logging: {e}", | |
| file=sys.stderr) | |
| # ------------------------------------------------------------------ | |
| # Core log method | |
| # ------------------------------------------------------------------ | |
| def log(self, stage: str, event: str, duration_ms: int = None, | |
| error: str = None, **meta): | |
| """ | |
| Log one pipeline event. | |
| Args: | |
| stage: Pipeline stage name (e.g. 'research', 'deploy'). | |
| event: Short description of what happened. | |
| duration_ms: Optional elapsed time in milliseconds. | |
| error: Optional error message if the event represents a failure. | |
| **meta: Arbitrary key/value pairs stored in the meta JSONB column. | |
| """ | |
| ts = datetime.now(timezone.utc).isoformat() | |
| record = { | |
| "session_id": self.session_id, | |
| "user_email": self.user_email, | |
| "ts": ts, | |
| "stage": stage, | |
| "event": event, | |
| "duration_ms": duration_ms, | |
| "error": error, | |
| "meta": meta if meta else None, | |
| } | |
| # Always write to file | |
| self._write_file(record) | |
| # Write to Supabase if available | |
| if self._supabase_ok: | |
| self._write_supabase(record) | |
| def _write_file(self, record: dict): | |
| """Append one JSON line to the session log file. Never raises.""" | |
| try: | |
| with open(self._log_file, "a", encoding="utf-8") as fh: | |
| fh.write(json.dumps(record, default=str) + "\n") | |
| except Exception as e: | |
| print(f"[SessionLogger] File write failed: {e}", file=sys.stderr) | |
| def _write_supabase(self, record: dict): | |
| """Insert one row into session_logs. Never raises.""" | |
| try: | |
| # Build the insert payload, omitting None values for cleanliness | |
| payload = {k: v for k, v in record.items() if v is not None} | |
| self._client.table(self.TABLE).insert(payload).execute() | |
| except Exception as e: | |
| # Supabase write failure is non-fatal β demote to stderr | |
| print(f"[SessionLogger] Supabase write failed: {e}", file=sys.stderr) | |
| # Mark Supabase as unavailable so we stop trying for this session | |
| self._supabase_ok = False | |
| # ------------------------------------------------------------------ | |
| # Convenience helpers | |
| # ------------------------------------------------------------------ | |
| def log_start(self, stage: str) -> float: | |
| """ | |
| Log that a pipeline stage has started. | |
| Returns: | |
| Monotonic start time (pass to log_end). | |
| """ | |
| self.log(stage, f"{stage} started") | |
| return time.monotonic() | |
| def log_end(self, stage: str, start_time: float, error: str = None, **meta): | |
| """ | |
| Log that a pipeline stage has ended, computing duration from start_time. | |
| Args: | |
| stage: Pipeline stage name (must match the one passed to log_start). | |
| start_time: Value returned by the corresponding log_start call. | |
| error: Optional error message if the stage failed. | |
| **meta: Arbitrary key/value pairs stored in the meta column. | |
| """ | |
| elapsed_ms = int((time.monotonic() - start_time) * 1000) | |
| event = f"{stage} failed" if error else f"{stage} completed" | |
| self.log(stage, event, duration_ms=elapsed_ms, error=error, **meta) | |
| # --------------------------------------------------------------------------- | |
| # Module-level singleton helpers | |
| # --------------------------------------------------------------------------- | |
| _current_logger: Optional[SessionLogger] = None | |
| def get_session_logger() -> Optional[SessionLogger]: | |
| """Return the active SessionLogger, or None if not yet initialised.""" | |
| return _current_logger | |
| def init_session_logger(session_id: str, user_email: str = None) -> SessionLogger: | |
| """ | |
| Create (or replace) the module-level SessionLogger. | |
| Args: | |
| session_id: Unique ID for this build session. | |
| user_email: Email of the user running this session. | |
| Returns: | |
| The newly created SessionLogger instance. | |
| """ | |
| global _current_logger | |
| _current_logger = SessionLogger(session_id, user_email) | |
| return _current_logger | |