Qdonnars Cursor commited on
Commit
3fb80f4
·
1 Parent(s): 67fee40

feat: Enhanced logging with session tracking and usage patterns

Browse files

- Track sessions with unique IDs
- Log call sequence (call#1, call#2, prev=...)
- Log response size in bytes
- Track indicators and geographic levels queried
- Detect patterns (list→details→query)
- Prepare for MCP optimization analysis

Co-authored-by: Cursor <cursoragent@cursor.com>

Files changed (1) hide show
  1. src/tools.py +165 -16
src/tools.py CHANGED
@@ -3,9 +3,12 @@
3
  import json
4
  import logging
5
  import time
 
 
 
6
  from datetime import datetime, timezone
7
  from functools import wraps
8
- from typing import Any, Callable
9
 
10
  from .api_client import get_client, CubeJsClient, CubeJsClientError
11
 
@@ -18,43 +21,189 @@ logging.basicConfig(
18
  logger = logging.getLogger("mcp_tools")
19
 
20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  def log_tool_call(func: Callable) -> Callable:
22
- """Decorator to log MCP tool calls with timing and parameters."""
23
  @wraps(func)
24
  async def wrapper(*args, **kwargs):
25
  tool_name = func.__name__
26
  start_time = time.time()
27
 
28
- # Log the call
29
- params = {k: v for k, v in kwargs.items() if v} # Only non-empty params
30
- logger.info(f"[CALL] {tool_name} | params={params}")
 
 
 
 
 
 
 
 
 
 
 
 
31
 
32
  try:
33
  result = await func(*args, **kwargs)
34
  elapsed_ms = int((time.time() - start_time) * 1000)
 
35
 
36
- # Parse result to get summary
 
 
37
  try:
38
  result_data = json.loads(result)
39
  if "error" in result_data:
40
- logger.warning(f"[ERROR] {tool_name} | {elapsed_ms}ms | error={result_data['error']}")
41
- elif "count" in result_data:
42
- logger.info(f"[OK] {tool_name} | {elapsed_ms}ms | count={result_data['count']}")
43
- elif "total_count" in result_data:
44
- logger.info(f"[OK] {tool_name} | {elapsed_ms}ms | count={result_data['total_count']}")
45
- elif "metadata" in result_data:
46
- ind_id = result_data.get("metadata", {}).get("id", "?")
47
- logger.info(f"[OK] {tool_name} | {elapsed_ms}ms | indicator_id={ind_id}")
48
  else:
49
- logger.info(f"[OK] {tool_name} | {elapsed_ms}ms")
 
 
 
 
 
 
 
 
 
50
  except json.JSONDecodeError:
51
- logger.info(f"[OK] {tool_name} | {elapsed_ms}ms | raw_response")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
 
53
  return result
54
 
55
  except Exception as e:
56
  elapsed_ms = int((time.time() - start_time) * 1000)
57
  logger.error(f"[EXCEPTION] {tool_name} | {elapsed_ms}ms | {type(e).__name__}: {e}")
 
58
  raise
59
 
60
  return wrapper
 
3
  import json
4
  import logging
5
  import time
6
+ import hashlib
7
+ from collections import defaultdict
8
+ from dataclasses import dataclass, field
9
  from datetime import datetime, timezone
10
  from functools import wraps
11
+ from typing import Any, Callable, Optional
12
 
13
  from .api_client import get_client, CubeJsClient, CubeJsClientError
14
 
 
21
  logger = logging.getLogger("mcp_tools")
22
 
23
 
24
+ # =============================================================================
25
+ # Session Tracker - Track usage patterns across calls
26
+ # =============================================================================
27
+ @dataclass
28
+ class SessionData:
29
+ """Track data for a single session."""
30
+ session_id: str
31
+ start_time: float = field(default_factory=time.time)
32
+ calls: list = field(default_factory=list)
33
+ last_call_time: float = 0
34
+ indicators_queried: set = field(default_factory=set)
35
+ levels_queried: set = field(default_factory=set)
36
+
37
+ def add_call(self, tool: str, params: dict, duration_ms: int,
38
+ result_count: int, response_size: int, status: str):
39
+ """Record a tool call."""
40
+ now = time.time()
41
+ time_since_last = int((now - self.last_call_time) * 1000) if self.last_call_time else 0
42
+
43
+ self.calls.append({
44
+ "tool": tool,
45
+ "params": params,
46
+ "duration_ms": duration_ms,
47
+ "result_count": result_count,
48
+ "response_size": response_size,
49
+ "status": status,
50
+ "time_since_last_ms": time_since_last,
51
+ })
52
+ self.last_call_time = now
53
+
54
+ # Track what's being queried
55
+ if "indicator_id" in params:
56
+ self.indicators_queried.add(params["indicator_id"])
57
+ if "geographic_level" in params:
58
+ self.levels_queried.add(params["geographic_level"])
59
+
60
+ def get_sequence(self) -> str:
61
+ """Get the sequence of tools called."""
62
+ return "→".join(c["tool"].replace("_indicators", "").replace("_indicator", "")
63
+ for c in self.calls)
64
+
65
+ def get_total_duration_ms(self) -> int:
66
+ """Total time spent in API calls."""
67
+ return sum(c["duration_ms"] for c in self.calls)
68
+
69
+
70
+ class UsageTracker:
71
+ """Track MCP usage patterns across sessions."""
72
+
73
+ # Session timeout in seconds (new session if no call for 5 minutes)
74
+ SESSION_TIMEOUT = 300
75
+
76
+ def __init__(self):
77
+ self.sessions: dict[str, SessionData] = {}
78
+ self.patterns: defaultdict[str, int] = defaultdict(int) # sequence -> count
79
+ self.tool_stats: defaultdict[str, dict] = defaultdict(
80
+ lambda: {"calls": 0, "total_ms": 0, "errors": 0}
81
+ )
82
+
83
+ def get_or_create_session(self, session_hint: str = "default") -> SessionData:
84
+ """Get existing session or create new one."""
85
+ # Simple session management based on hint (could be IP, user-agent hash, etc.)
86
+ session_id = hashlib.md5(session_hint.encode()).hexdigest()[:8]
87
+
88
+ now = time.time()
89
+
90
+ # Check if session exists and is not expired
91
+ if session_id in self.sessions:
92
+ session = self.sessions[session_id]
93
+ if session.last_call_time and (now - session.last_call_time) > self.SESSION_TIMEOUT:
94
+ # Session expired, log pattern and create new
95
+ self._finalize_session(session)
96
+ session = SessionData(session_id=session_id)
97
+ self.sessions[session_id] = session
98
+ logger.info(f"[SESSION] id={session_id} | new_session (previous expired)")
99
+ else:
100
+ session = SessionData(session_id=session_id)
101
+ self.sessions[session_id] = session
102
+ logger.info(f"[SESSION] id={session_id} | new_session")
103
+
104
+ return session
105
+
106
+ def _finalize_session(self, session: SessionData):
107
+ """Log session summary when it ends."""
108
+ if len(session.calls) > 1:
109
+ sequence = session.get_sequence()
110
+ self.patterns[sequence] += 1
111
+
112
+ logger.info(
113
+ f"[PATTERN] id={session.session_id} | "
114
+ f"sequence={sequence} | "
115
+ f"calls={len(session.calls)} | "
116
+ f"total_ms={session.get_total_duration_ms()} | "
117
+ f"indicators={list(session.indicators_queried)} | "
118
+ f"levels={list(session.levels_queried)}"
119
+ )
120
+
121
+ def log_stats_summary(self):
122
+ """Log accumulated statistics."""
123
+ if self.patterns:
124
+ top_patterns = sorted(self.patterns.items(), key=lambda x: -x[1])[:5]
125
+ logger.info(f"[STATS] top_patterns={top_patterns}")
126
+
127
+
128
+ # Global tracker instance
129
+ _tracker = UsageTracker()
130
+
131
+
132
  def log_tool_call(func: Callable) -> Callable:
133
+ """Decorator to log MCP tool calls with rich metrics."""
134
  @wraps(func)
135
  async def wrapper(*args, **kwargs):
136
  tool_name = func.__name__
137
  start_time = time.time()
138
 
139
+ # Get or create session
140
+ session = _tracker.get_or_create_session()
141
+
142
+ # Extract params (only non-empty)
143
+ params = {k: v for k, v in kwargs.items() if v}
144
+
145
+ # Build context info
146
+ call_num = len(session.calls) + 1
147
+ prev_tool = session.calls[-1]["tool"] if session.calls else None
148
+
149
+ # Log the call with context
150
+ context = f"call#{call_num}"
151
+ if prev_tool:
152
+ context += f" | prev={prev_tool}"
153
+ logger.info(f"[CALL] {tool_name} | {context} | params={params}")
154
 
155
  try:
156
  result = await func(*args, **kwargs)
157
  elapsed_ms = int((time.time() - start_time) * 1000)
158
+ response_size = len(result.encode('utf-8'))
159
 
160
+ # Parse result to get metrics
161
+ status = "ok"
162
+ result_count = 0
163
  try:
164
  result_data = json.loads(result)
165
  if "error" in result_data:
166
+ status = "error"
167
+ logger.warning(
168
+ f"[ERROR] {tool_name} | {elapsed_ms}ms | "
169
+ f"error={result_data['error'][:100]}"
170
+ )
 
 
 
171
  else:
172
+ result_count = (
173
+ result_data.get("count") or
174
+ result_data.get("total_count") or
175
+ len(result_data.get("data", [])) or
176
+ (1 if "metadata" in result_data else 0)
177
+ )
178
+ logger.info(
179
+ f"[OK] {tool_name} | {elapsed_ms}ms | "
180
+ f"count={result_count} | size={response_size}B"
181
+ )
182
  except json.JSONDecodeError:
183
+ logger.info(f"[OK] {tool_name} | {elapsed_ms}ms | size={response_size}B")
184
+
185
+ # Record in session
186
+ session.add_call(
187
+ tool=tool_name,
188
+ params=params,
189
+ duration_ms=elapsed_ms,
190
+ result_count=result_count,
191
+ response_size=response_size,
192
+ status=status,
193
+ )
194
+
195
+ # Update global stats
196
+ _tracker.tool_stats[tool_name]["calls"] += 1
197
+ _tracker.tool_stats[tool_name]["total_ms"] += elapsed_ms
198
+ if status == "error":
199
+ _tracker.tool_stats[tool_name]["errors"] += 1
200
 
201
  return result
202
 
203
  except Exception as e:
204
  elapsed_ms = int((time.time() - start_time) * 1000)
205
  logger.error(f"[EXCEPTION] {tool_name} | {elapsed_ms}ms | {type(e).__name__}: {e}")
206
+ _tracker.tool_stats[tool_name]["errors"] += 1
207
  raise
208
 
209
  return wrapper