#!/usr/bin/env python3
"""Generate usage reports from telemetry data.

This script analyzes Mosaic telemetry data and generates reports for:
- Cost tracking (app uptime and estimated costs)
- Usage summary (analyses, slides, sessions)
- Failure analysis

Usage:
    # Full report (all time)
    python scripts/telemetry_report.py /path/to/telemetry

    # Daily report for yesterday (cron-friendly)
    python scripts/telemetry_report.py /path/to/telemetry --daily

    # Daily report for specific date
    python scripts/telemetry_report.py /path/to/telemetry --date 2026-01-20

    # Email output (pipe to sendmail or use with cron)
    python scripts/telemetry_report.py /path/to/telemetry --daily --email user@example.com

    # Skip email if report is empty (useful for automated daily reports)
    python scripts/telemetry_report.py /path/to/telemetry --daily --email user@example.com --skip-empty

    # HTML format for email
    python scripts/telemetry_report.py /path/to/telemetry --daily --format html

    # Pull data from HuggingFace Dataset repository
    python scripts/telemetry_report.py --hf-repo PDM-Group/mosaic-telemetry

    # NOTE: with --hf-repo the positional directory is ignored; data is pulled
    # into a temporary directory that is removed once the report is produced.

Example cron entry (daily report at 8am, skip if empty):
    0 8 * * * python /app/scripts/telemetry_report.py /data/telemetry --daily --email team@example.com --skip-empty
"""

import argparse
import html
import json
import os
import shutil
import smtplib
import sys
import tempfile
from collections import Counter
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional

# Hourly price used when the session events carry no (or a zero) "hourly_rate".
DEFAULT_HOURLY_RATE = 0.40


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string (no offset).

    Same text that ``datetime.utcnow().isoformat()`` produces, without relying
    on the deprecated ``utcnow``; callers append the literal "Z" themselves.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def load_events(
    telemetry_dir: Path, event_type: str, date: Optional[str] = None
) -> list:
    """Load events from JSONL files.

    Files are read from ``<telemetry_dir>/daily`` and are named
    ``<event_type>_<date>.jsonl``. Blank lines are ignored. Lines that are not
    valid JSON, or that do not decode to a JSON object, are skipped with a
    warning on stderr so one damaged line cannot abort the whole report.

    Args:
        telemetry_dir: Base telemetry directory
        event_type: Type of event ("session", "usage", "resource", "failure")
        date: Optional date filter in YYYY-MM-DD format

    Returns:
        List of event dictionaries. Without a date filter, files are read in
        sorted filename order so events from earlier dates come first.
    """
    daily_dir = telemetry_dir / "daily"
    if not daily_dir.exists():
        return []

    if date:
        # Load specific date file
        candidate = daily_dir / f"{event_type}_{date}.jsonl"
        paths = [candidate] if candidate.exists() else []
    else:
        # glob() order is arbitrary; sort so "last events" means "latest date".
        paths = sorted(daily_dir.glob(f"{event_type}_*.jsonl"))

    events = []
    for path in paths:
        with open(path, encoding="utf-8") as fp:
            for line_no, line in enumerate(fp, start=1):
                if not line.strip():
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError as exc:
                    print(
                        f"Skipping malformed line {line_no} in {path.name}: {exc}",
                        file=sys.stderr,
                    )
                    continue
                if not isinstance(event, dict):
                    # The report code calls .get() on every event.
                    print(
                        f"Skipping non-object line {line_no} in {path.name}",
                        file=sys.stderr,
                    )
                    continue
                events.append(event)
    return events


def is_report_empty(
    sessions: list, usage: list, resources: list, failures: list
) -> bool:
    """Check if report would be empty (no meaningful data).

    Args:
        sessions: Session events
        usage: Usage events
        resources: Resource events
        failures: Failure events

    Returns:
        True if every event list is empty, False otherwise
    """
    return not (sessions or usage or resources or failures)


def _summarize_sessions(sessions: list) -> Optional[dict]:
    """Aggregate uptime and cost figures from session events.

    "app_shutdown" events are used when any exist. Otherwise the heartbeat
    with the largest "uptime_sec" for each distinct "app_start_time" stands in
    for a still-running session.

    Returns:
        Dict with keys session_count, is_running, uptime_hrs, analysis_hrs,
        idle_hrs, utilization, hourly_rate, total_cost and analysis_count,
        or None when there is nothing to summarize.
    """
    records = [s for s in sessions if s.get("event_type") == "app_shutdown"]

    if not records:
        latest_by_start = {}
        for event in sessions:
            if event.get("event_type") != "heartbeat":
                continue
            start_time = event.get("app_start_time")
            if not start_time:
                continue
            current = latest_by_start.get(start_time)
            if current is None or event.get("uptime_sec", 0) > current.get(
                "uptime_sec", 0
            ):
                latest_by_start[start_time] = event
        records = list(latest_by_start.values())

    if not records:
        return None

    uptime_hrs = sum(r.get("uptime_sec", 0) for r in records) / 3600
    analysis_hrs = sum(r.get("analysis_time_sec", 0) for r in records) / 3600
    # Use hourly_rate from data, fallback to DEFAULT if missing or zero
    hourly_rate = records[0].get("hourly_rate") or DEFAULT_HOURLY_RATE

    return {
        "session_count": len(records),
        # True only when every record is a heartbeat (no completed sessions).
        "is_running": all(r.get("event_type") == "heartbeat" for r in records),
        "uptime_hrs": uptime_hrs,
        "analysis_hrs": analysis_hrs,
        "idle_hrs": uptime_hrs - analysis_hrs,
        "utilization": (analysis_hrs / uptime_hrs * 100) if uptime_hrs > 0 else 0,
        "hourly_rate": hourly_rate,
        "total_cost": uptime_hrs * hourly_rate,
        "analysis_count": sum(r.get("analysis_count", 0) for r in records),
    }


def _ranked_counts(events: list, key: str) -> list:
    """Return (value, count) pairs for ``key``, most frequent first.

    Events lacking ``key`` are counted under "Unknown". Ties keep
    first-seen order because the sort is stable.
    """
    counts = Counter(event.get(key, "Unknown") for event in events)
    return sorted(counts.items(), key=lambda item: -item[1])


def generate_text_report(telemetry_dir: Path, date: Optional[str] = None) -> str:
    """Generate plain text report.

    Args:
        telemetry_dir: Base telemetry directory
        date: Optional date filter

    Returns:
        Report as string
    """
    sessions = load_events(telemetry_dir, "session", date)
    usage = load_events(telemetry_dir, "usage", date)
    resources = load_events(telemetry_dir, "resource", date)
    failures = load_events(telemetry_dir, "failure", date)

    date_label = f" for {date}" if date else " (All Time)"
    rule = "=" * 60
    lines = [
        rule,
        f"MOSAIC TELEMETRY REPORT{date_label}",
        rule,
        f"Generated: {_utc_timestamp()}Z",
        "",
    ]

    # Cost summary from session events
    cost = _summarize_sessions(sessions)
    if cost is not None:
        noun = "Running sessions" if cost["is_running"] else "App sessions"
        utilization = cost["utilization"]
        total_cost = cost["total_cost"]
        lines.append("=== COST SUMMARY ===")
        lines.append(f"{noun}: {cost['session_count']}")
        lines.append(f"Total uptime: {cost['uptime_hrs']:.2f} hours")
        lines.append(
            f" - Active analysis: {cost['analysis_hrs']:.2f} hrs ({utilization:.1f}%)"
        )
        lines.append(
            f" - Idle time: {cost['idle_hrs']:.2f} hrs ({100 - utilization:.1f}%)"
        )
        lines.append(
            f"Estimated cost: ${total_cost:.2f} (@ ${cost['hourly_rate']}/hr)"
        )
        if cost["analysis_count"] > 0:
            lines.append(
                f"Cost per analysis: ${total_cost / cost['analysis_count']:.2f}"
            )
        lines.append("")

    # Usage summary
    if usage:
        starts = [u for u in usage if u.get("event_type") == "analysis_start"]
        completes = [u for u in usage if u.get("event_type") == "analysis_complete"]
        successful = [c for c in completes if c.get("success", False)]
        total_slides = sum(s.get("slide_count", 0) for s in starts)
        unique_sessions = len(
            {u.get("session_hash") for u in usage if u.get("session_hash")}
        )

        # Average over completions that report a non-zero duration
        durations = [c.get("duration_sec", 0) for c in completes if c.get("duration_sec")]
        avg_duration = sum(durations) / len(durations) if durations else 0

        lines.append("=== USAGE SUMMARY ===")
        lines.append(f"Analyses started: {len(starts)}")
        lines.append(f"Analyses completed: {len(completes)}")
        lines.append(f"Successful analyses: {len(successful)}")
        lines.append(f"Total slides processed: {total_slides}")
        lines.append(f"Unique sessions: {unique_sessions}")
        if avg_duration > 0:
            lines.append(f"Average analysis duration: {avg_duration:.1f}s")
        lines.append("")

        # Breakdown by settings
        breakdowns = (
            ("By site type:", "site_type"),
            ("By segmentation config:", "seg_config"),
        )
        for heading, key in breakdowns:
            ranked = _ranked_counts(starts, key)
            if ranked:
                lines.append(heading)
                lines.extend(f" {value}: {count}" for value, count in ranked)
                lines.append("")

    # Resource summary
    if resources:
        total_duration = sum(r.get("total_duration_sec", 0) for r in resources)
        total_tiles = sum(
            r.get("tile_count", 0) for r in resources if r.get("tile_count")
        )
        peak_memory = max(
            (r.get("peak_gpu_memory_gb", 0) for r in resources), default=0
        )
        lines.append("=== RESOURCE SUMMARY ===")
        lines.append(f"Total slide processing time: {total_duration / 3600:.2f} hours")
        lines.append(f"Total tiles processed: {total_tiles:,}")
        if peak_memory > 0:
            lines.append(f"Peak GPU memory: {peak_memory:.2f} GB")
        lines.append("")

    # Failure summary
    if failures:
        lines.append(f"=== FAILURES ({len(failures)}) ===")
        for error_type, count in _ranked_counts(failures, "error_type")[:10]:
            lines.append(f" {error_type}: {count}")

        # Show recent failure messages (last five in load order)
        lines.append("")
        lines.append("Recent failure messages:")
        for failure in failures[-5:]:
            # The key may be present with a null value; never slice None.
            msg = str(failure.get("error_message") or "")[:100]
            stage = failure.get("error_stage", "unknown")
            lines.append(f" [{stage}] {msg}")
        lines.append("")
    else:
        lines.append("=== NO FAILURES ===")
        lines.append("")

    lines.append(rule)
    return "\n".join(lines)


def _html_row(label: object, value: object) -> str:
    """Return one two-column table row with both cells HTML-escaped."""
    return (
        f"<tr><td>{html.escape(str(label))}</td>"
        f"<td>{html.escape(str(value))}</td></tr>"
    )


def generate_html_report(telemetry_dir: Path, date: Optional[str] = None) -> str:
    """Generate HTML report.

    Contains the cost summary, usage summary and failure counts. Values that
    come from telemetry files or the command line are HTML-escaped.

    NOTE(review): the markup in the copy of this file under review had been
    stripped by extraction; the tags below are a minimal reconstruction and
    should be compared against the original styling.

    Args:
        telemetry_dir: Base telemetry directory
        date: Optional date filter

    Returns:
        Report as HTML string
    """
    sessions = load_events(telemetry_dir, "session", date)
    usage = load_events(telemetry_dir, "usage", date)
    # Loaded for parity with the text report; no resource section is rendered.
    resources = load_events(telemetry_dir, "resource", date)  # noqa: F841
    failures = load_events(telemetry_dir, "failure", date)

    date_label = html.escape(f" for {date}" if date else " (All Time)")

    parts = [
        "<!DOCTYPE html>",
        "<html>",
        "<head>",
        f"<title>Mosaic Telemetry Report{date_label}</title>",
        "</head>",
        "<body>",
        f"<h1>Mosaic Telemetry Report{date_label}</h1>",
        f"<p>Generated: {_utc_timestamp()}Z</p>",
    ]

    # Cost summary
    cost = _summarize_sessions(sessions)
    if cost is not None:
        noun = "Running sessions" if cost["is_running"] else "App sessions"
        total_cost = cost["total_cost"]
        parts.append("<h2>Cost Summary</h2>")
        parts.append("<table>")
        parts.append(_html_row(noun, cost["session_count"]))
        parts.append(_html_row("Total uptime", f"{cost['uptime_hrs']:.2f} hours"))
        parts.append(
            _html_row(
                "Active analysis time",
                f"{cost['analysis_hrs']:.2f} hours ({cost['utilization']:.1f}%)",
            )
        )
        parts.append(_html_row("Estimated cost", f"${total_cost:.2f}"))
        if cost["analysis_count"] > 0:
            parts.append(
                _html_row(
                    "Cost per analysis",
                    f"${total_cost / cost['analysis_count']:.2f}",
                )
            )
        parts.append("</table>")

    # Usage summary
    if usage:
        starts = [u for u in usage if u.get("event_type") == "analysis_start"]
        completes = [u for u in usage if u.get("event_type") == "analysis_complete"]
        successful = [c for c in completes if c.get("success", False)]
        total_slides = sum(s.get("slide_count", 0) for s in starts)
        unique_sessions = len(
            {u.get("session_hash") for u in usage if u.get("session_hash")}
        )
        parts.append("<h2>Usage Summary</h2>")
        parts.append("<table>")
        parts.append(_html_row("Analyses started", len(starts)))
        parts.append(_html_row("Analyses completed", len(completes)))
        parts.append(_html_row("Successful analyses", len(successful)))
        parts.append(_html_row("Total slides", total_slides))
        parts.append(_html_row("Unique sessions", unique_sessions))
        parts.append("</table>")

    # Failures
    if failures:
        parts.append(f"<h2>Failures ({len(failures)})</h2>")
        parts.append("<table>")
        parts.append("<tr><th>Error Type</th><th>Count</th></tr>")
        for error_type, count in _ranked_counts(failures, "error_type")[:10]:
            parts.append(_html_row(error_type, count))
        parts.append("</table>")

    parts.append("</body></html>")
    return "\n".join(parts)


def send_email(report: str, to_email: str, subject: str, format: str = "text"):
    """Send report via email using SMTP.

    Connection settings are read from the environment: SMTP_FROM (default
    "mosaic-telemetry@noreply.local"), SMTP_HOST (default "localhost"),
    SMTP_PORT (default 25; a non-integer value also falls back to 25),
    SMTP_USER and SMTP_PASS. STARTTLS and login are performed only when both
    SMTP_USER and SMTP_PASS are set.

    Args:
        report: Report content
        to_email: Recipient email address
        subject: Email subject
        format: "text" or "html" (any value other than "html" is sent as
            plain text). The name shadows the builtin but is kept because it
            is part of the public signature.

    Raises:
        smtplib.SMTPException, OSError: On connection or delivery failure.
    """
    from_email = os.environ.get("SMTP_FROM", "mosaic-telemetry@noreply.local")
    smtp_host = os.environ.get("SMTP_HOST", "localhost")
    try:
        smtp_port = int(os.environ.get("SMTP_PORT", "25"))
    except ValueError:
        smtp_port = 25
    smtp_user = os.environ.get("SMTP_USER")
    smtp_pass = os.environ.get("SMTP_PASS")

    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = from_email
    msg["To"] = to_email
    subtype = "html" if format == "html" else "plain"
    msg.attach(MIMEText(report, subtype))

    with smtplib.SMTP(smtp_host, smtp_port) as server:
        if smtp_user and smtp_pass:
            server.starttls()
            server.login(smtp_user, smtp_pass)
        server.sendmail(from_email, [to_email], msg.as_string())


def _store_remote_file(remote_content: str, target_path: Path) -> None:
    """Write downloaded JSONL text to ``target_path``, merging if it exists.

    A new file receives the remote content verbatim. For an existing file,
    only remote lines not already present locally are appended.
    """
    filename = target_path.name

    if not target_path.exists():
        with open(target_path, "w", encoding="utf-8") as f:
            f.write(remote_content)
        print(f"Downloaded: {filename}")
        return

    with open(target_path, "r", encoding="utf-8") as f:
        local_content = f.read()

    local_lines = (
        set(local_content.strip().split("\n")) if local_content.strip() else set()
    )
    remote_lines = remote_content.strip().split("\n") if remote_content.strip() else []
    new_lines = [line for line in remote_lines if line and line not in local_lines]

    if new_lines:
        with open(target_path, "a", encoding="utf-8") as f:
            # Without this, the first appended record would be glued onto an
            # unterminated last local line and corrupt both.
            if local_content and not local_content.endswith("\n"):
                f.write("\n")
            f.writelines(line + "\n" for line in new_lines)
        print(f"Merged {len(new_lines)} new events into {filename}")


def _download_with_hub(repo_id: str, telemetry_dir: Path) -> bool:
    """Fetch ``daily/*.jsonl`` from a HF dataset repo using huggingface_hub.

    Standalone fallback for when the mosaic package is not importable.
    Returns True when at least one file was fetched and stored or merged.
    """
    try:
        from huggingface_hub import HfApi, hf_hub_download
    except ImportError:
        print(
            "huggingface_hub not installed. Install with: pip install huggingface-hub",
            file=sys.stderr,
        )
        return False

    api = HfApi()
    daily_dir = telemetry_dir / "daily"
    daily_dir.mkdir(parents=True, exist_ok=True)

    try:
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
    except Exception as e:  # network/auth errors are reported, not raised
        print(f"Failed to list files in {repo_id}: {e}", file=sys.stderr)
        return False

    jsonl_files = [
        f for f in files if f.startswith("daily/") and f.endswith(".jsonl")
    ]
    if not jsonl_files:
        print(f"No telemetry files found in {repo_id}", file=sys.stderr)
        return False

    downloaded = 0
    for remote_path in jsonl_files:
        # Best effort per file: one failure must not stop the others.
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=remote_path,
                repo_type="dataset",
            )
            with open(local_path, "r", encoding="utf-8") as f:
                remote_content = f.read()
            _store_remote_file(
                remote_content, daily_dir / os.path.basename(remote_path)
            )
            downloaded += 1
        except Exception as e:
            print(f"Failed to download {remote_path}: {e}", file=sys.stderr)

    return downloaded > 0


def download_from_hf(repo_id: str, telemetry_dir: Path) -> bool:
    """Download telemetry data from HuggingFace Dataset repository.

    Uses ``mosaic.telemetry.storage.TelemetryStorage`` when the mosaic package
    is importable; otherwise falls back to calling ``huggingface_hub``
    directly.

    Args:
        repo_id: HuggingFace Dataset repository ID
        telemetry_dir: Local directory to store downloaded files

    Returns:
        True if download was successful, False otherwise
    """
    try:
        from mosaic.telemetry.storage import TelemetryStorage
    except ImportError:
        # Fallback for standalone usage without mosaic installed
        return _download_with_hub(repo_id, telemetry_dir)

    # Use TelemetryStorage if mosaic is available
    storage = TelemetryStorage(telemetry_dir)
    return storage.download_from_hf_dataset(repo_id)


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the report script."""
    parser = argparse.ArgumentParser(
        description="Generate Mosaic telemetry reports",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "telemetry_dir",
        type=Path,
        nargs="?",
        default=Path("/tmp/mosaic_telemetry"),
        help="Telemetry directory (default: /tmp/mosaic_telemetry)",
    )
    parser.add_argument(
        "--daily",
        action="store_true",
        help="Report for yesterday only",
    )
    parser.add_argument(
        "--date",
        type=str,
        help="Report for specific date (YYYY-MM-DD)",
    )
    parser.add_argument(
        "--email",
        type=str,
        help="Send report to this email address",
    )
    parser.add_argument(
        "--format",
        choices=["text", "html"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--hf-repo",
        type=str,
        help="HuggingFace Dataset repository to pull telemetry from (e.g., PDM-Group/mosaic-telemetry)",
    )
    parser.add_argument(
        "--skip-empty",
        action="store_true",
        help="Skip sending email if report has no data (useful for automated daily reports)",
    )
    return parser


def main():
    """Parse arguments, build the requested report, then print or email it.

    Exits with status 1 when the telemetry directory is missing or the email
    cannot be sent (the report is printed to stdout as a fallback), and with
    status 0 when --skip-empty suppresses an empty report.
    """
    args = _build_parser().parse_args()

    temp_dir = None
    try:
        # If HF repo specified, download to a clean temp directory
        if args.hf_repo:
            # Use a clean temp directory to avoid mixing with local data
            temp_dir = Path(tempfile.mkdtemp(prefix="mosaic_telemetry_"))
            print(f"Downloading telemetry from {args.hf_repo}...")
            if not download_from_hf(args.hf_repo, temp_dir):
                print(
                    "Warning: Failed to download some or all telemetry data",
                    file=sys.stderr,
                )
            # Use the temp directory for report generation
            args.telemetry_dir = temp_dir

        if not args.telemetry_dir.exists():
            print(
                f"Telemetry directory not found: {args.telemetry_dir}",
                file=sys.stderr,
            )
            sys.exit(1)

        # Determine date filter
        date = args.date
        if args.daily and not date:
            # NOTE(review): "yesterday" is computed in local time; confirm the
            # daily files are not dated in UTC.
            date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

        # Check if report would be empty before generating
        if args.skip_empty:
            sessions = load_events(args.telemetry_dir, "session", date)
            usage = load_events(args.telemetry_dir, "usage", date)
            resources = load_events(args.telemetry_dir, "resource", date)
            failures = load_events(args.telemetry_dir, "failure", date)
            if is_report_empty(sessions, usage, resources, failures):
                print(f"Skipping empty report for {date or 'all time'}")
                sys.exit(0)

        # Generate report
        if args.format == "html":
            report = generate_html_report(args.telemetry_dir, date=date)
        else:
            report = generate_text_report(args.telemetry_dir, date=date)

        # Output
        if args.email:
            subject = f"Mosaic Telemetry Report - {date or 'All Time'}"
            try:
                send_email(report, args.email, subject, args.format)
            except Exception as e:
                print(f"Failed to send email: {e}", file=sys.stderr)
                print(report)  # Print report to stdout as fallback
                sys.exit(1)
            else:
                print(f"Report sent to {args.email}")
        else:
            print(report)
    finally:
        # sys.exit() raises SystemExit, so this also runs on the early exits.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)


if __name__ == "__main__":
    main()