hemantvirmani commited on
Commit
38bdec7
·
1 Parent(s): a1e2111

Fixing langfuse issues

Browse files
Files changed (1) hide show
  1. langfuse_tracking.py +87 -83
langfuse_tracking.py CHANGED
@@ -15,9 +15,9 @@ from contextlib import contextmanager
15
 
16
  # Langfuse will be imported conditionally
17
  langfuse = None
 
18
  try:
19
- from langfuse import Langfuse
20
- from langfuse.decorators import observe, langfuse_context
21
  LANGFUSE_AVAILABLE = True
22
  except ImportError:
23
  LANGFUSE_AVAILABLE = False
@@ -41,7 +41,7 @@ class LangfuseTracker:
41
  if self._client is None and LANGFUSE_AVAILABLE:
42
  public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
43
  secret_key = os.getenv("LANGFUSE_SECRET_KEY")
44
- host = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
45
 
46
  if public_key and secret_key:
47
  self._client = Langfuse(
@@ -88,39 +88,42 @@ def track_agent_execution(agent_type: str):
88
  @functools.wraps(func)
89
  def wrapper(self, question: str, file_name: str = None, *args, **kwargs):
90
  # Add metadata to current observation
91
- langfuse_context.update_current_observation(
92
- metadata={
93
- "agent_type": agent_type,
94
- "has_file": file_name is not None,
95
- "file_name": file_name or "none",
96
- "question_length": len(question)
97
- },
98
- input={"question": question[:500], "file_name": file_name} # Limit question length
99
- )
 
100
 
101
  start_time = time.time()
102
  try:
103
  result = func(self, question, file_name, *args, **kwargs)
104
 
105
  # Update with output and success metrics
106
- langfuse_context.update_current_observation(
107
- output={"answer": str(result)[:500]}, # Limit answer length
108
- metadata={
109
- "execution_time_seconds": time.time() - start_time,
110
- "success": not result.startswith("Error:")
111
- }
112
- )
 
113
  return result
114
  except Exception as e:
115
  # Track errors
116
- langfuse_context.update_current_observation(
117
- level="ERROR",
118
- status_message=str(e),
119
- metadata={
120
- "execution_time_seconds": time.time() - start_time,
121
- "error": str(e)
122
- }
123
- )
 
124
  raise
125
 
126
  return wrapper
@@ -152,23 +155,25 @@ def track_llm_call(model_name: str):
152
  result = func(*args, **kwargs)
153
 
154
  # Update generation with model info
155
- langfuse_context.update_current_observation(
156
- model=model_name,
157
- metadata={
158
- "latency_seconds": time.time() - start_time,
159
- }
160
- )
 
161
 
162
  return result
163
  except Exception as e:
164
- langfuse_context.update_current_observation(
165
- level="ERROR",
166
- status_message=str(e),
167
- metadata={
168
- "latency_seconds": time.time() - start_time,
169
- "error": str(e)
170
- }
171
- )
 
172
  raise
173
 
174
  return wrapper
@@ -194,14 +199,15 @@ def track_tool_call(tool_name: str):
194
  @functools.wraps(func)
195
  def wrapper(*args, **kwargs):
196
  # Capture input parameters
197
- langfuse_context.update_current_observation(
198
- input={
199
- "tool": tool_name,
200
- "args": args[:3] if args else [], # Limit args
201
- "kwargs": {k: str(v)[:100] for k, v in list(kwargs.items())[:5]} # Limit kwargs
202
- },
203
- metadata={"tool_name": tool_name}
204
- )
 
205
 
206
  start_time = time.time()
207
  try:
@@ -209,27 +215,29 @@ def track_tool_call(tool_name: str):
209
 
210
  # Track output
211
  result_str = str(result)
212
- langfuse_context.update_current_observation(
213
- output={
214
- "result_preview": result_str[:500],
215
- "result_length": len(result_str)
216
- },
217
- metadata={
218
- "execution_time_seconds": time.time() - start_time,
219
- "success": True
220
- }
221
- )
 
222
 
223
  return result
224
  except Exception as e:
225
- langfuse_context.update_current_observation(
226
- level="ERROR",
227
- status_message=str(e),
228
- metadata={
229
- "execution_time_seconds": time.time() - start_time,
230
- "error": str(e)
231
- }
232
- )
 
233
  raise
234
 
235
  return wrapper
@@ -253,17 +261,17 @@ def track_session(session_name: str, metadata: Optional[Dict[str, Any]] = None):
253
  yield
254
  return
255
 
256
- trace = tracker.client.trace(
 
257
  name=session_name,
258
  metadata=metadata or {}
259
- )
260
-
261
- try:
262
- yield trace
263
- finally:
264
- # Flush to ensure data is sent
265
- if tracker.client:
266
- tracker.client.flush()
267
 
268
 
269
  @contextmanager
@@ -283,16 +291,12 @@ def track_question_processing(task_id: str, question: str):
283
  yield None
284
  return
285
 
286
- span = tracker.client.span(
287
  name=f"Question_{task_id[:8]}",
288
  input={"task_id": task_id, "question": question[:300]},
289
  metadata={"task_id": task_id}
290
- )
291
-
292
- try:
293
  yield span
294
- finally:
295
- span.end()
296
 
297
 
298
  # Convenience function for manual span creation
@@ -310,7 +314,7 @@ def create_span(name: str, input_data: Optional[Dict] = None, metadata: Optional
310
  if not tracker.enabled:
311
  return None
312
 
313
- return langfuse_context.span(
314
  name=name,
315
  input=input_data,
316
  metadata=metadata
 
15
 
16
  # Langfuse will be imported conditionally
17
  langfuse = None
18
+
19
  try:
20
+ from langfuse import Langfuse, observe
 
21
  LANGFUSE_AVAILABLE = True
22
  except ImportError:
23
  LANGFUSE_AVAILABLE = False
 
41
  if self._client is None and LANGFUSE_AVAILABLE:
42
  public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
43
  secret_key = os.getenv("LANGFUSE_SECRET_KEY")
44
+ host = os.getenv("LANGFUSE_HOST", "https://us.cloud.langfuse.com")
45
 
46
  if public_key and secret_key:
47
  self._client = Langfuse(
 
88
  @functools.wraps(func)
89
  def wrapper(self, question: str, file_name: str = None, *args, **kwargs):
90
  # Add metadata to current observation
91
+ if tracker.client:
92
+ tracker.client.update_current_span(
93
+ metadata={
94
+ "agent_type": agent_type,
95
+ "has_file": file_name is not None,
96
+ "file_name": file_name or "none",
97
+ "question_length": len(question)
98
+ },
99
+ input={"question": question[:500], "file_name": file_name} # Limit question length
100
+ )
101
 
102
  start_time = time.time()
103
  try:
104
  result = func(self, question, file_name, *args, **kwargs)
105
 
106
  # Update with output and success metrics
107
+ if tracker.client:
108
+ tracker.client.update_current_span(
109
+ output={"answer": str(result)[:500]}, # Limit answer length
110
+ metadata={
111
+ "execution_time_seconds": time.time() - start_time,
112
+ "success": not result.startswith("Error:")
113
+ }
114
+ )
115
  return result
116
  except Exception as e:
117
  # Track errors
118
+ if tracker.client:
119
+ tracker.client.update_current_span(
120
+ level="ERROR",
121
+ status_message=str(e),
122
+ metadata={
123
+ "execution_time_seconds": time.time() - start_time,
124
+ "error": str(e)
125
+ }
126
+ )
127
  raise
128
 
129
  return wrapper
 
155
  result = func(*args, **kwargs)
156
 
157
  # Update generation with model info
158
+ if tracker.client:
159
+ tracker.client.update_current_generation(
160
+ model=model_name,
161
+ metadata={
162
+ "latency_seconds": time.time() - start_time,
163
+ }
164
+ )
165
 
166
  return result
167
  except Exception as e:
168
+ if tracker.client:
169
+ tracker.client.update_current_generation(
170
+ level="ERROR",
171
+ status_message=str(e),
172
+ metadata={
173
+ "latency_seconds": time.time() - start_time,
174
+ "error": str(e)
175
+ }
176
+ )
177
  raise
178
 
179
  return wrapper
 
199
  @functools.wraps(func)
200
  def wrapper(*args, **kwargs):
201
  # Capture input parameters
202
+ if tracker.client:
203
+ tracker.client.update_current_span(
204
+ input={
205
+ "tool": tool_name,
206
+ "args": args[:3] if args else [], # Limit args
207
+ "kwargs": {k: str(v)[:100] for k, v in list(kwargs.items())[:5]} # Limit kwargs
208
+ },
209
+ metadata={"tool_name": tool_name}
210
+ )
211
 
212
  start_time = time.time()
213
  try:
 
215
 
216
  # Track output
217
  result_str = str(result)
218
+ if tracker.client:
219
+ tracker.client.update_current_span(
220
+ output={
221
+ "result_preview": result_str[:500],
222
+ "result_length": len(result_str)
223
+ },
224
+ metadata={
225
+ "execution_time_seconds": time.time() - start_time,
226
+ "success": True
227
+ }
228
+ )
229
 
230
  return result
231
  except Exception as e:
232
+ if tracker.client:
233
+ tracker.client.update_current_span(
234
+ level="ERROR",
235
+ status_message=str(e),
236
+ metadata={
237
+ "execution_time_seconds": time.time() - start_time,
238
+ "error": str(e)
239
+ }
240
+ )
241
  raise
242
 
243
  return wrapper
 
261
  yield
262
  return
263
 
264
+ # Use start_as_current_span to create a root span/trace for the session
265
+ with tracker.client.start_as_current_span(
266
  name=session_name,
267
  metadata=metadata or {}
268
+ ) as span:
269
+ try:
270
+ yield span
271
+ finally:
272
+ # Flush to ensure data is sent
273
+ if tracker.client:
274
+ tracker.client.flush()
 
275
 
276
 
277
  @contextmanager
 
291
  yield None
292
  return
293
 
294
+ with tracker.client.start_as_current_span(
295
  name=f"Question_{task_id[:8]}",
296
  input={"task_id": task_id, "question": question[:300]},
297
  metadata={"task_id": task_id}
298
+ ) as span:
 
 
299
  yield span
 
 
300
 
301
 
302
  # Convenience function for manual span creation
 
314
  if not tracker.enabled:
315
  return None
316
 
317
+ return tracker.client.start_span(
318
  name=name,
319
  input=input_data,
320
  metadata=metadata