| """ |
| Session Insights Engine for Hermes Agent. |
| |
| Analyzes historical session data from the SQLite state database to produce |
comprehensive usage insights — token consumption, cost estimates, tool usage
| patterns, activity trends, model/platform breakdowns, and session metrics. |
| |
| Inspired by Claude Code's /insights command, adapted for Hermes Agent's |
| multi-platform architecture with additional cost estimation and platform |
| breakdown capabilities. |
| |
| Usage: |
| from agent.insights import InsightsEngine |
| engine = InsightsEngine(db) |
| report = engine.generate(days=30) |
| print(engine.format_terminal(report)) |
| """ |
|
|
| import json |
| import time |
| from collections import Counter, defaultdict |
| from datetime import datetime |
| from typing import Any, Dict, List |
|
|
| from agent.usage_pricing import ( |
| CanonicalUsage, |
| DEFAULT_PRICING, |
| estimate_usage_cost, |
| format_duration_compact, |
| get_pricing, |
| has_known_pricing, |
| ) |
|
|
# Module-local alias for the zero-cost fallback pricing table (see
# _get_pricing's docstring, which refers to _DEFAULT_PRICING by name).
_DEFAULT_PRICING = DEFAULT_PRICING
|
|
|
|
def _has_known_pricing(model_name: str, provider: str | None = None, base_url: str | None = None) -> bool:
    """Check if a model has known pricing (vs unknown/custom endpoint).

    Thin module-local wrapper over agent.usage_pricing.has_known_pricing.
    """
    return has_known_pricing(model_name, provider=provider, base_url=base_url)
|
|
|
|
def _get_pricing(model_name: str) -> Dict[str, float]:
    """Look up pricing for a model. Uses fuzzy matching on model name.

    Returns _DEFAULT_PRICING (zero cost) for unknown/custom models —
    we can't assume costs for self-hosted endpoints, local inference, etc.

    Thin module-local wrapper over agent.usage_pricing.get_pricing.
    """
    return get_pricing(model_name)
|
|
|
|
def _estimate_cost(
    session_or_model: Dict[str, Any] | str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    *,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
    provider: str | None = None,
    base_url: str | None = None,
) -> tuple[float, str]:
    """Estimate the USD cost for a session row or a model/token tuple.

    Args:
        session_or_model: Either a session row dict — in which case the token
            counts and billing fields are read from the row and the keyword
            arguments are ignored — or a bare model name string.
        input_tokens: Input token count (model-name form only).
        output_tokens: Output token count (model-name form only).
        cache_read_tokens: Cache-read token count (model-name form only).
        cache_write_tokens: Cache-write token count (model-name form only).
        provider: Billing provider hint (model-name form only).
        base_url: Billing endpoint hint (model-name form only).

    Returns:
        Tuple of (amount_usd, status). ``status`` is passed through from
        estimate_usage_cost (values this module acts on: "included",
        "unknown"). A missing amount is normalized to 0.0.
    """
    if isinstance(session_or_model, dict):
        session = session_or_model
        model = session.get("model") or ""
        usage = CanonicalUsage(
            input_tokens=session.get("input_tokens") or 0,
            output_tokens=session.get("output_tokens") or 0,
            cache_read_tokens=session.get("cache_read_tokens") or 0,
            cache_write_tokens=session.get("cache_write_tokens") or 0,
        )
        # The session row is authoritative for billing context; it overrides
        # any provider/base_url passed as keyword arguments.
        provider = session.get("billing_provider")
        base_url = session.get("billing_base_url")
    else:
        model = session_or_model or ""
        usage = CanonicalUsage(
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cache_read_tokens=cache_read_tokens,
            cache_write_tokens=cache_write_tokens,
        )
    result = estimate_usage_cost(
        model,
        usage,
        provider=provider,
        base_url=base_url,
    )
    return float(result.amount_usd or 0.0), result.status
|
|
|
|
def _format_duration(seconds: float) -> str:
    """Format seconds into a human-readable duration string.

    Thin module-local wrapper over agent.usage_pricing.format_duration_compact.
    """
    return format_duration_compact(seconds)
|
|
|
|
| def _bar_chart(values: List[int], max_width: int = 20) -> List[str]: |
| """Create simple horizontal bar chart strings from values.""" |
| peak = max(values) if values else 1 |
| if peak == 0: |
| return ["" for _ in values] |
| return ["β" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values] |
|
|
|
|
class InsightsEngine:
    """
    Analyzes session history and produces usage insights.

    Works directly with a SessionDB instance (or raw sqlite3 connection)
    to query session and message data.
    """

    def __init__(self, db):
        """
        Initialize with a SessionDB instance.

        Args:
            db: A SessionDB instance (from hermes_state.py)
        """
        self.db = db
        # Borrow SessionDB's private connection for read-only analytics
        # queries. NOTE(review): relies on SessionDB._conn being a sqlite3
        # connection whose row_factory yields mapping-capable rows (the query
        # helpers call dict(row)) — confirm against hermes_state.py.
        self._conn = db._conn
|
|
| def generate(self, days: int = 30, source: str = None) -> Dict[str, Any]: |
| """ |
| Generate a complete insights report. |
| |
| Args: |
| days: Number of days to look back (default: 30) |
| source: Optional filter by source platform |
| |
| Returns: |
| Dict with all computed insights |
| """ |
| cutoff = time.time() - (days * 86400) |
|
|
| |
| sessions = self._get_sessions(cutoff, source) |
| tool_usage = self._get_tool_usage(cutoff, source) |
| message_stats = self._get_message_stats(cutoff, source) |
|
|
| if not sessions: |
| return { |
| "days": days, |
| "source_filter": source, |
| "empty": True, |
| "overview": {}, |
| "models": [], |
| "platforms": [], |
| "tools": [], |
| "activity": {}, |
| "top_sessions": [], |
| } |
|
|
| |
| overview = self._compute_overview(sessions, message_stats) |
| models = self._compute_model_breakdown(sessions) |
| platforms = self._compute_platform_breakdown(sessions) |
| tools = self._compute_tool_breakdown(tool_usage) |
| activity = self._compute_activity_patterns(sessions) |
| top_sessions = self._compute_top_sessions(sessions) |
|
|
| return { |
| "days": days, |
| "source_filter": source, |
| "empty": False, |
| "generated_at": time.time(), |
| "overview": overview, |
| "models": models, |
| "platforms": platforms, |
| "tools": tools, |
| "activity": activity, |
| "top_sessions": top_sessions, |
| } |
|
|
| |
| |
| |
|
|
| |
    # Column list shared by the two session queries below; the order defines
    # the keys of the dicts _get_sessions returns. NOTE(review): must stay in
    # sync with the sessions table schema in hermes_state.py — confirm there.
    _SESSION_COLS = ("id, source, model, started_at, ended_at, "
                     "message_count, tool_call_count, input_tokens, output_tokens, "
                     "cache_read_tokens, cache_write_tokens, billing_provider, "
                     "billing_base_url, billing_mode, estimated_cost_usd, "
                     "actual_cost_usd, cost_status, cost_source")

    # Prebuilt SELECT statements (the f-strings resolve _SESSION_COLS once, at
    # class-creation time). Both filter on started_at >= ?; the first also
    # filters by source. Results are newest-first.
    _GET_SESSIONS_WITH_SOURCE = (
        f"SELECT {_SESSION_COLS} FROM sessions"
        " WHERE started_at >= ? AND source = ?"
        " ORDER BY started_at DESC"
    )
    _GET_SESSIONS_ALL = (
        f"SELECT {_SESSION_COLS} FROM sessions"
        " WHERE started_at >= ?"
        " ORDER BY started_at DESC"
    )
|
|
| def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]: |
| """Fetch sessions within the time window.""" |
| if source: |
| cursor = self._conn.execute(self._GET_SESSIONS_WITH_SOURCE, (cutoff, source)) |
| else: |
| cursor = self._conn.execute(self._GET_SESSIONS_ALL, (cutoff,)) |
| return [dict(row) for row in cursor.fetchall()] |
|
|
| def _get_tool_usage(self, cutoff: float, source: str = None) -> List[Dict]: |
| """Get tool call counts from messages. |
| |
| Uses two sources: |
| 1. tool_name column on 'tool' role messages (set by gateway) |
| 2. tool_calls JSON on 'assistant' role messages (covers CLI where |
| tool_name is not populated on tool responses) |
| """ |
| tool_counts = Counter() |
|
|
| |
| if source: |
| cursor = self._conn.execute( |
| """SELECT m.tool_name, COUNT(*) as count |
| FROM messages m |
| JOIN sessions s ON s.id = m.session_id |
| WHERE s.started_at >= ? AND s.source = ? |
| AND m.role = 'tool' AND m.tool_name IS NOT NULL |
| GROUP BY m.tool_name |
| ORDER BY count DESC""", |
| (cutoff, source), |
| ) |
| else: |
| cursor = self._conn.execute( |
| """SELECT m.tool_name, COUNT(*) as count |
| FROM messages m |
| JOIN sessions s ON s.id = m.session_id |
| WHERE s.started_at >= ? |
| AND m.role = 'tool' AND m.tool_name IS NOT NULL |
| GROUP BY m.tool_name |
| ORDER BY count DESC""", |
| (cutoff,), |
| ) |
| for row in cursor.fetchall(): |
| tool_counts[row["tool_name"]] += row["count"] |
|
|
| |
| |
| if source: |
| cursor2 = self._conn.execute( |
| """SELECT m.tool_calls |
| FROM messages m |
| JOIN sessions s ON s.id = m.session_id |
| WHERE s.started_at >= ? AND s.source = ? |
| AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""", |
| (cutoff, source), |
| ) |
| else: |
| cursor2 = self._conn.execute( |
| """SELECT m.tool_calls |
| FROM messages m |
| JOIN sessions s ON s.id = m.session_id |
| WHERE s.started_at >= ? |
| AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""", |
| (cutoff,), |
| ) |
|
|
| tool_calls_counts = Counter() |
| for row in cursor2.fetchall(): |
| try: |
| calls = row["tool_calls"] |
| if isinstance(calls, str): |
| calls = json.loads(calls) |
| if isinstance(calls, list): |
| for call in calls: |
| func = call.get("function", {}) if isinstance(call, dict) else {} |
| name = func.get("name") |
| if name: |
| tool_calls_counts[name] += 1 |
| except (json.JSONDecodeError, TypeError, AttributeError): |
| continue |
|
|
| |
| |
| if not tool_counts and tool_calls_counts: |
| |
| tool_counts = tool_calls_counts |
| elif tool_counts and tool_calls_counts: |
| |
| |
| all_tools = set(tool_counts) | set(tool_calls_counts) |
| merged = Counter() |
| for tool in all_tools: |
| merged[tool] = max(tool_counts.get(tool, 0), tool_calls_counts.get(tool, 0)) |
| tool_counts = merged |
|
|
| |
| return [ |
| {"tool_name": name, "count": count} |
| for name, count in tool_counts.most_common() |
| ] |
|
|
| def _get_message_stats(self, cutoff: float, source: str = None) -> Dict: |
| """Get aggregate message statistics.""" |
| if source: |
| cursor = self._conn.execute( |
| """SELECT |
| COUNT(*) as total_messages, |
| SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages, |
| SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages, |
| SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages |
| FROM messages m |
| JOIN sessions s ON s.id = m.session_id |
| WHERE s.started_at >= ? AND s.source = ?""", |
| (cutoff, source), |
| ) |
| else: |
| cursor = self._conn.execute( |
| """SELECT |
| COUNT(*) as total_messages, |
| SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages, |
| SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages, |
| SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages |
| FROM messages m |
| JOIN sessions s ON s.id = m.session_id |
| WHERE s.started_at >= ?""", |
| (cutoff,), |
| ) |
| row = cursor.fetchone() |
| return dict(row) if row else { |
| "total_messages": 0, "user_messages": 0, |
| "assistant_messages": 0, "tool_messages": 0, |
| } |
|
|
| |
| |
| |
|
|
| def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict: |
| """Compute high-level overview statistics.""" |
| total_input = sum(s.get("input_tokens") or 0 for s in sessions) |
| total_output = sum(s.get("output_tokens") or 0 for s in sessions) |
| total_cache_read = sum(s.get("cache_read_tokens") or 0 for s in sessions) |
| total_cache_write = sum(s.get("cache_write_tokens") or 0 for s in sessions) |
| total_tokens = total_input + total_output + total_cache_read + total_cache_write |
| total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions) |
| total_messages = sum(s.get("message_count") or 0 for s in sessions) |
|
|
| |
| total_cost = 0.0 |
| actual_cost = 0.0 |
| models_with_pricing = set() |
| models_without_pricing = set() |
| unknown_cost_sessions = 0 |
| included_cost_sessions = 0 |
| for s in sessions: |
| model = s.get("model") or "" |
| estimated, status = _estimate_cost(s) |
| total_cost += estimated |
| actual_cost += s.get("actual_cost_usd") or 0.0 |
| display = model.split("/")[-1] if "/" in model else (model or "unknown") |
| if status == "included": |
| included_cost_sessions += 1 |
| elif status == "unknown": |
| unknown_cost_sessions += 1 |
| if _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url")): |
| models_with_pricing.add(display) |
| else: |
| models_without_pricing.add(display) |
|
|
| |
| durations = [] |
| for s in sessions: |
| start = s.get("started_at") |
| end = s.get("ended_at") |
| if start and end and end > start: |
| durations.append(end - start) |
|
|
| total_hours = sum(durations) / 3600 if durations else 0 |
| avg_duration = sum(durations) / len(durations) if durations else 0 |
|
|
| |
| started_timestamps = [s["started_at"] for s in sessions if s.get("started_at")] |
| date_range_start = min(started_timestamps) if started_timestamps else None |
| date_range_end = max(started_timestamps) if started_timestamps else None |
|
|
| return { |
| "total_sessions": len(sessions), |
| "total_messages": total_messages, |
| "total_tool_calls": total_tool_calls, |
| "total_input_tokens": total_input, |
| "total_output_tokens": total_output, |
| "total_cache_read_tokens": total_cache_read, |
| "total_cache_write_tokens": total_cache_write, |
| "total_tokens": total_tokens, |
| "estimated_cost": total_cost, |
| "actual_cost": actual_cost, |
| "total_hours": total_hours, |
| "avg_session_duration": avg_duration, |
| "avg_messages_per_session": total_messages / len(sessions) if sessions else 0, |
| "avg_tokens_per_session": total_tokens / len(sessions) if sessions else 0, |
| "user_messages": message_stats.get("user_messages") or 0, |
| "assistant_messages": message_stats.get("assistant_messages") or 0, |
| "tool_messages": message_stats.get("tool_messages") or 0, |
| "date_range_start": date_range_start, |
| "date_range_end": date_range_end, |
| "models_with_pricing": sorted(models_with_pricing), |
| "models_without_pricing": sorted(models_without_pricing), |
| "unknown_cost_sessions": unknown_cost_sessions, |
| "included_cost_sessions": included_cost_sessions, |
| } |
|
|
| def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]: |
| """Break down usage by model.""" |
| model_data = defaultdict(lambda: { |
| "sessions": 0, "input_tokens": 0, "output_tokens": 0, |
| "cache_read_tokens": 0, "cache_write_tokens": 0, |
| "total_tokens": 0, "tool_calls": 0, "cost": 0.0, |
| }) |
|
|
| for s in sessions: |
| model = s.get("model") or "unknown" |
| |
| display_model = model.split("/")[-1] if "/" in model else model |
| d = model_data[display_model] |
| d["sessions"] += 1 |
| inp = s.get("input_tokens") or 0 |
| out = s.get("output_tokens") or 0 |
| cache_read = s.get("cache_read_tokens") or 0 |
| cache_write = s.get("cache_write_tokens") or 0 |
| d["input_tokens"] += inp |
| d["output_tokens"] += out |
| d["cache_read_tokens"] += cache_read |
| d["cache_write_tokens"] += cache_write |
| d["total_tokens"] += inp + out + cache_read + cache_write |
| d["tool_calls"] += s.get("tool_call_count") or 0 |
| estimate, status = _estimate_cost(s) |
| d["cost"] += estimate |
| d["has_pricing"] = _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url")) |
| d["cost_status"] = status |
|
|
| result = [ |
| {"model": model, **data} |
| for model, data in model_data.items() |
| ] |
| |
| result.sort(key=lambda x: (x["total_tokens"], x["sessions"]), reverse=True) |
| return result |
|
|
| def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]: |
| """Break down usage by platform/source.""" |
| platform_data = defaultdict(lambda: { |
| "sessions": 0, "messages": 0, "input_tokens": 0, |
| "output_tokens": 0, "cache_read_tokens": 0, |
| "cache_write_tokens": 0, "total_tokens": 0, "tool_calls": 0, |
| }) |
|
|
| for s in sessions: |
| source = s.get("source") or "unknown" |
| d = platform_data[source] |
| d["sessions"] += 1 |
| d["messages"] += s.get("message_count") or 0 |
| inp = s.get("input_tokens") or 0 |
| out = s.get("output_tokens") or 0 |
| cache_read = s.get("cache_read_tokens") or 0 |
| cache_write = s.get("cache_write_tokens") or 0 |
| d["input_tokens"] += inp |
| d["output_tokens"] += out |
| d["cache_read_tokens"] += cache_read |
| d["cache_write_tokens"] += cache_write |
| d["total_tokens"] += inp + out + cache_read + cache_write |
| d["tool_calls"] += s.get("tool_call_count") or 0 |
|
|
| result = [ |
| {"platform": platform, **data} |
| for platform, data in platform_data.items() |
| ] |
| result.sort(key=lambda x: x["sessions"], reverse=True) |
| return result |
|
|
| def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]: |
| """Process tool usage data into a ranked list with percentages.""" |
| total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0 |
| result = [] |
| for t in tool_usage: |
| pct = (t["count"] / total_calls * 100) if total_calls else 0 |
| result.append({ |
| "tool": t["tool_name"], |
| "count": t["count"], |
| "percentage": pct, |
| }) |
| return result |
|
|
| def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict: |
| """Analyze activity patterns by day of week and hour.""" |
| day_counts = Counter() |
| hour_counts = Counter() |
| daily_counts = Counter() |
|
|
| for s in sessions: |
| ts = s.get("started_at") |
| if not ts: |
| continue |
| dt = datetime.fromtimestamp(ts) |
| day_counts[dt.weekday()] += 1 |
| hour_counts[dt.hour] += 1 |
| daily_counts[dt.strftime("%Y-%m-%d")] += 1 |
|
|
| day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] |
| day_breakdown = [ |
| {"day": day_names[i], "count": day_counts.get(i, 0)} |
| for i in range(7) |
| ] |
|
|
| hour_breakdown = [ |
| {"hour": i, "count": hour_counts.get(i, 0)} |
| for i in range(24) |
| ] |
|
|
| |
| busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None |
| busiest_hour = max(hour_breakdown, key=lambda x: x["count"]) if hour_breakdown else None |
|
|
| |
| active_days = len(daily_counts) |
|
|
| |
| if daily_counts: |
| all_dates = sorted(daily_counts.keys()) |
| current_streak = 1 |
| max_streak = 1 |
| for i in range(1, len(all_dates)): |
| d1 = datetime.strptime(all_dates[i - 1], "%Y-%m-%d") |
| d2 = datetime.strptime(all_dates[i], "%Y-%m-%d") |
| if (d2 - d1).days == 1: |
| current_streak += 1 |
| max_streak = max(max_streak, current_streak) |
| else: |
| current_streak = 1 |
| else: |
| max_streak = 0 |
|
|
| return { |
| "by_day": day_breakdown, |
| "by_hour": hour_breakdown, |
| "busiest_day": busiest_day, |
| "busiest_hour": busiest_hour, |
| "active_days": active_days, |
| "max_streak": max_streak, |
| } |
|
|
| def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]: |
| """Find notable sessions (longest, most messages, most tokens).""" |
| top = [] |
|
|
| |
| sessions_with_duration = [ |
| s for s in sessions |
| if s.get("started_at") and s.get("ended_at") |
| ] |
| if sessions_with_duration: |
| longest = max( |
| sessions_with_duration, |
| key=lambda s: (s["ended_at"] - s["started_at"]), |
| ) |
| dur = longest["ended_at"] - longest["started_at"] |
| top.append({ |
| "label": "Longest session", |
| "session_id": longest["id"][:16], |
| "value": _format_duration(dur), |
| "date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"), |
| }) |
|
|
| |
| most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0) |
| if (most_msgs.get("message_count") or 0) > 0: |
| top.append({ |
| "label": "Most messages", |
| "session_id": most_msgs["id"][:16], |
| "value": f"{most_msgs['message_count']} msgs", |
| "date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?", |
| }) |
|
|
| |
| most_tokens = max( |
| sessions, |
| key=lambda s: (s.get("input_tokens") or 0) + (s.get("output_tokens") or 0), |
| ) |
| token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0) |
| if token_total > 0: |
| top.append({ |
| "label": "Most tokens", |
| "session_id": most_tokens["id"][:16], |
| "value": f"{token_total:,} tokens", |
| "date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?", |
| }) |
|
|
| |
| most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0) |
| if (most_tools.get("tool_call_count") or 0) > 0: |
| top.append({ |
| "label": "Most tool calls", |
| "session_id": most_tools["id"][:16], |
| "value": f"{most_tools['tool_call_count']} calls", |
| "date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?", |
| }) |
|
|
| return top |
|
|
| |
| |
| |
|
|
    def format_terminal(self, report: Dict) -> str:
        """Format the insights report for terminal display (CLI).

        Args:
            report: Report dict produced by generate().

        Returns:
            A multi-line string ready for print(); for an empty report, a
            single-line "no sessions" notice.

        NOTE(review): several string literals below contain mojibake
        ("β", "π…") from an encoding round-trip — presumably box-drawing
        characters and emoji in the original source. Restore the intended
        UTF-8 glyphs from version control; do not assume the current bytes
        are intentional.
        """
        if report.get("empty"):
            days = report.get("days", 30)
            src = f" (source: {report['source_filter']})" if report.get("source_filter") else ""
            return f" No sessions found in the last {days} days{src}."

        lines = []
        o = report["overview"]
        days = report["days"]
        src_filter = report.get("source_filter")

        # Header box with a centered period label.
        lines.append("")
        lines.append(" ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
        lines.append(" β                    π Hermes Insights                     β")
        period_label = f"Last {days} days"
        if src_filter:
            period_label += f" ({src_filter})"
        # Center the label inside the box interior (58 columns wide).
        padding = 58 - len(period_label) - 2
        left_pad = padding // 2
        right_pad = padding - left_pad
        lines.append(f" β{' ' * left_pad} {period_label} {' ' * right_pad}β")
        lines.append(" ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
        lines.append("")

        # Concrete date range actually covered by the data.
        if o.get("date_range_start") and o.get("date_range_end"):
            start_str = datetime.fromtimestamp(o["date_range_start"]).strftime("%b %d, %Y")
            end_str = datetime.fromtimestamp(o["date_range_end"]).strftime("%b %d, %Y")
            lines.append(f" Period: {start_str} β {end_str}")
            lines.append("")

        # Overview block.
        lines.append(" π Overview")
        lines.append(" " + "β" * 56)
        lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
        lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
        lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
        cost_str = f"${o['estimated_cost']:.2f}"
        if o.get("models_without_pricing"):
            # Asterisk flags that some models had no pricing data.
            cost_str += " *"
        lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: {cost_str}")
        if o["total_hours"] > 0:
            lines.append(f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}")
        lines.append(f" Avg msgs/session: {o['avg_messages_per_session']:.1f}")
        lines.append("")

        # Per-model table; cost shows N/A when pricing is unknown.
        if report["models"]:
            lines.append(" π€ Models Used")
            lines.append(" " + "β" * 56)
            lines.append(f" {'Model':<30} {'Sessions':>8} {'Tokens':>12} {'Cost':>8}")
            for m in report["models"]:
                model_name = m["model"][:28]
                if m.get("has_pricing"):
                    cost_cell = f"${m['cost']:>6.2f}"
                else:
                    cost_cell = " N/A"
                lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} {cost_cell}")
            if o.get("models_without_pricing"):
                lines.append(f" * Cost N/A for custom/self-hosted models")
            lines.append("")

        # Platform table — skipped when everything is plain CLI usage.
        if len(report["platforms"]) > 1 or (report["platforms"] and report["platforms"][0]["platform"] != "cli"):
            lines.append(" π± Platforms")
            lines.append(" " + "β" * 56)
            lines.append(f" {'Platform':<14} {'Sessions':>8} {'Messages':>10} {'Tokens':>14}")
            for p in report["platforms"]:
                lines.append(f" {p['platform']:<14} {p['sessions']:>8} {p['messages']:>10,} {p['total_tokens']:>14,}")
            lines.append("")

        # Tool usage, capped at 15 rows.
        if report["tools"]:
            lines.append(" π§ Top Tools")
            lines.append(" " + "β" * 56)
            lines.append(f" {'Tool':<28} {'Calls':>8} {'%':>8}")
            for t in report["tools"][:15]:
                lines.append(f" {t['tool']:<28} {t['count']:>8,} {t['percentage']:>7.1f}%")
            if len(report["tools"]) > 15:
                lines.append(f" ... and {len(report['tools']) - 15} more tools")
            lines.append("")

        # Activity patterns: weekday bar chart, peak hours, streaks.
        act = report.get("activity", {})
        if act.get("by_day"):
            lines.append(" π Activity Patterns")
            lines.append(" " + "β" * 56)

            day_values = [d["count"] for d in act["by_day"]]
            bars = _bar_chart(day_values, max_width=15)
            for i, d in enumerate(act["by_day"]):
                bar = bars[i]
                lines.append(f" {d['day']} {bar:<15} {d['count']}")

            lines.append("")

            # Up to five busiest non-empty hours, in descending order.
            busy_hours = sorted(act["by_hour"], key=lambda x: x["count"], reverse=True)
            busy_hours = [h for h in busy_hours if h["count"] > 0][:5]
            if busy_hours:
                hour_strs = []
                for h in busy_hours:
                    hr = h["hour"]
                    ampm = "AM" if hr < 12 else "PM"
                    display_hr = hr % 12 or 12  # 0 -> 12AM, 12 -> 12PM
                    hour_strs.append(f"{display_hr}{ampm} ({h['count']})")
                lines.append(f" Peak hours: {', '.join(hour_strs)}")

            if act.get("active_days"):
                lines.append(f" Active days: {act['active_days']}")
            if act.get("max_streak") and act["max_streak"] > 1:
                lines.append(f" Best streak: {act['max_streak']} consecutive days")
            lines.append("")

        # Notable sessions (longest, most messages/tokens/tool calls).
        if report.get("top_sessions"):
            lines.append(" π Notable Sessions")
            lines.append(" " + "β" * 56)
            for ts in report["top_sessions"]:
                lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
            lines.append("")

        return "\n".join(lines)
|
|
    def format_gateway(self, report: Dict) -> str:
        """Format the insights report for gateway/messaging (shorter, Markdown).

        Args:
            report: Report dict produced by generate().

        Returns:
            A Markdown-style string suitable for chat platforms.

        NOTE(review): the "π…" emoji literals below are mojibake from an
        encoding round-trip — restore the original UTF-8 emoji from version
        control before shipping.
        """
        if report.get("empty"):
            days = report.get("days", 30)
            return f"No sessions found in the last {days} days."

        lines = []
        o = report["overview"]
        days = report["days"]

        lines.append(f"π **Hermes Insights** β Last {days} days\n")

        # Headline numbers.
        lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
        lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
        cost_note = ""
        if o.get("models_without_pricing"):
            cost_note = " _(excludes custom/self-hosted models)_"
        lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}{cost_note}")
        if o["total_hours"] > 0:
            lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
        lines.append("")

        # Top five models.
        if report["models"]:
            lines.append("**π€ Models:**")
            for m in report["models"][:5]:
                cost_str = f"${m['cost']:.2f}" if m.get("has_pricing") else "N/A"
                lines.append(f" {m['model'][:25]} β {m['sessions']} sessions, {m['total_tokens']:,} tokens, {cost_str}")
            lines.append("")

        # Platform breakdown only when more than one platform was used.
        if len(report["platforms"]) > 1:
            lines.append("**π± Platforms:**")
            for p in report["platforms"]:
                lines.append(f" {p['platform']} β {p['sessions']} sessions, {p['messages']:,} msgs")
            lines.append("")

        # Top eight tools.
        if report["tools"]:
            lines.append("**π§ Top Tools:**")
            for t in report["tools"][:8]:
                lines.append(f" {t['tool']} β {t['count']:,} calls ({t['percentage']:.1f}%)")
            lines.append("")

        # Activity summary.
        act = report.get("activity", {})
        if act.get("busiest_day") and act.get("busiest_hour"):
            hr = act["busiest_hour"]["hour"]
            ampm = "AM" if hr < 12 else "PM"
            display_hr = hr % 12 or 12  # 0 -> 12AM, 12 -> 12PM
            lines.append(f"**π Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
        if act.get("active_days"):
            # NOTE(review): stray trailing comma in this call — harmless.
            lines.append(f"**Active days:** {act['active_days']}", )
        if act.get("max_streak", 0) > 1:
            lines.append(f"**Best streak:** {act['max_streak']} consecutive days")

        return "\n".join(lines)
|
|