File size: 11,939 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""
Service for handling trace operations with external platforms like Langfuse
"""

import os
import json
import time
import logging
import random
import uuid
from typing import Dict, List, Any, Optional
from datetime import datetime

# Module-level logger shared by everything in this file; the channel name is a
# fixed dotted path rather than ``__name__``.
logger = logging.getLogger("agent_monitoring_server.services.platform.trace")

class TraceService:
    """Service for trace operations with external platforms.

    All methods are static and return plain dictionaries. Every result carries
    a ``status`` key (``"success"`` or ``"error"``) and an ISO-formatted
    ``timestamp``; error results additionally carry a ``message``.
    """

    @staticmethod
    def _error(message: str) -> Dict[str, Any]:
        """Build the error payload shape shared by every method of this class."""
        return {
            "status": "error",
            "message": message,
            "timestamp": datetime.now().isoformat()
        }

    @staticmethod
    def _parse_iso_timestamp(value: Optional[str], field_name: str) -> Optional[datetime]:
        """Parse an ISO-8601 string, returning None when it is missing or invalid.

        A trailing ``Z`` (UTC designator) is rewritten to ``+00:00`` first,
        because ``datetime.fromisoformat`` only accepts it from Python 3.11 on.
        Invalid values are logged under ``field_name`` and ignored.
        """
        if not value:
            return None
        normalized = value
        if isinstance(value, str) and value.endswith("Z"):
            normalized = value[:-1] + "+00:00"
        try:
            return datetime.fromisoformat(normalized)
        except ValueError:
            logger.warning(f"Invalid {field_name} format: {value}")
            return None

    @staticmethod
    def _page_for_offset(offset: int, limit: int) -> int:
        """Convert a zero-based offset into a one-based page number.

        Negative offsets are treated as 0 so the page is never below 1.
        """
        return (max(offset, 0) // limit) + 1

    @staticmethod
    def _summarize_trace(trace: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce one serialized trace dictionary to the metadata fields we expose."""
        summary = {
            "id": trace.get("id", "unknown"),
            "name": trace.get("name", "Unnamed Trace"),
            "timestamp": trace.get("timestamp"),
            "status": trace.get("status", "unknown"),
            "level": trace.get("level"),
            "metadata": trace.get("metadata", {})
        }

        # Prefer the model of the first observation when observations are
        # present; a non-dictionary first observation yields no model at all.
        observations = trace.get("observations")
        if isinstance(observations, list) and observations:
            first_obs = observations[0]
            summary["model"] = first_obs.get("model") if isinstance(first_obs, dict) else None
        else:
            summary["model"] = trace.get("model")

        summary["user"] = trace.get("userId")
        summary["session"] = trace.get("sessionId")

        # Anything other than a list of tags is normalized to an empty list.
        tags = trace.get("tags")
        summary["tags"] = tags if isinstance(tags, list) else []

        return summary

    @staticmethod
    def get_trace_metadata(
        limit: int = 20,
        offset: int = 0,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get metadata for traces without downloading full details.

        Server-side failures (messages mentioning "500", "Internal Server
        Error" or "Memory limit") are retried up to three times with
        exponential backoff plus jitter; any other failure is returned
        immediately.

        Args:
            limit: Page size. Values above 50 are reduced to 50; values of 0
                or less fall back to 20.
            offset: Zero-based offset, converted to a one-based page number.
                Negative values are treated as 0.
            start_date: Optional ISO-8601 lower bound; ignored (with a
                warning) when it cannot be parsed.
            end_date: Optional ISO-8601 upper bound; ignored (with a warning)
                when it cannot be parsed.

        Returns:
            On success: ``status``, ``traces`` (list of metadata dicts),
            ``count``, ``timestamp`` and ``has_more``. On failure, or when no
            trace metadata was produced: ``status``, ``message`` and
            ``timestamp``.
        """
        # Get credentials from the project configuration
        from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST

        if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
            return TraceService._error(
                "Langfuse credentials not found. Please connect to Langfuse first."
            )

        # Enforce reasonable limits
        if limit > 50:
            logger.warning(f"Requested limit {limit} exceeds maximum of 50, using 50 instead")
            limit = 50
        elif limit <= 0:
            logger.warning(f"Invalid limit {limit}, using default of 20")
            limit = 20

        # Langfuse uses page-based pagination (pages start at 1), not offsets
        page = TraceService._page_for_offset(offset, limit)

        logger.info(f"Fetching trace metadata (limit={limit}, page={page})")

        # Optional timestamp filters; unparseable values are dropped
        from_timestamp = TraceService._parse_iso_timestamp(start_date, "start_date")
        to_timestamp = TraceService._parse_iso_timestamp(end_date, "end_date")

        max_retries = 3
        retry_count = 0
        client = None  # created on the first attempt and reused by retries
        traces_data = []
        trace_metadata = []

        while retry_count <= max_retries:
            try:
                if client is None:
                    from langfuse import Langfuse
                    client = Langfuse(
                        secret_key=LANGFUSE_SECRET_KEY,
                        public_key=LANGFUSE_PUBLIC_KEY,
                        host=LANGFUSE_HOST
                    )

                # Fetch traces using page-based pagination
                traces_response = client.fetch_traces(
                    limit=limit,
                    page=page,
                    from_timestamp=from_timestamp,
                    to_timestamp=to_timestamp
                )

                # Convert response to serializable format
                from utils.fetch_langfuse_logs import convert_to_serializable
                traces_data = []

                if hasattr(traces_response, 'data'):
                    traces_data = convert_to_serializable(traces_response.data)
                elif hasattr(traces_response, 'model_dump'):
                    traces_dict = convert_to_serializable(traces_response.model_dump())
                    if isinstance(traces_dict, dict) and 'data' in traces_dict:
                        traces_data = traces_dict['data']
                    else:
                        traces_data = [traces_dict]

                # Extract only the metadata we need; start from scratch so a
                # retried attempt never mixes in results of a failed one.
                trace_metadata = []
                for trace in traces_data:
                    if not isinstance(trace, dict):
                        logger.warning(f"Skipping non-dictionary trace: {trace}")
                        continue
                    trace_metadata.append(TraceService._summarize_trace(trace))

                # Success - break out of retry loop
                break

            except Exception as e:
                error_message = str(e)
                # The failure type is only recognizable from the message text
                is_server_error = (
                    "500" in error_message
                    or "Internal Server Error" in error_message
                    or "Memory limit" in error_message
                )

                if not is_server_error:
                    # For non-500 errors, don't retry
                    logger.error(f"Error fetching trace metadata: {error_message}")
                    return TraceService._error(
                        f"Error fetching trace metadata: {error_message}"
                    )

                retry_count += 1
                if retry_count > max_retries:
                    logger.error(f"Failed after {max_retries} retries: {error_message}")
                    return TraceService._error(
                        f"Langfuse server error after {max_retries} retries. The service may be experiencing high load."
                    )

                # Exponential backoff with jitter
                wait_time = (2 ** retry_count) + random.uniform(0, 1)
                logger.warning(f"Langfuse server error, retrying in {wait_time:.2f} seconds (attempt {retry_count}/{max_retries})")
                time.sleep(wait_time)

        # An empty page is reported as an error (kept for backward compatibility)
        if not trace_metadata:
            return TraceService._error("No trace data returned from Langfuse")

        # A page with at least `limit` entries suggests more traces may follow;
        # fewer entries means we've reached the end.
        has_more = len(traces_data) >= limit

        return {
            "status": "success",
            "traces": trace_metadata,
            "count": len(trace_metadata),
            "timestamp": datetime.now().isoformat(),
            "has_more": has_more
        }

    @staticmethod
    def get_trace_by_id(trace_id: str) -> Dict[str, Any]:
        """
        Get a specific trace by ID from Langfuse, with its observations attached.

        Args:
            trace_id: Identifier of the trace to fetch; must be non-empty.

        Returns:
            On success: ``status``, ``trace`` (the serialized trace including
            ``observations`` and ``trace_url``) and ``timestamp``. On any
            failure: ``status``, ``message`` and ``timestamp``. Exceptions are
            caught and reported through the error payload rather than raised.
        """
        try:
            # Get credentials from the project configuration
            from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST

            if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
                return TraceService._error(
                    "Langfuse credentials not found. Please connect to Langfuse first."
                )

            # Reject an empty ID up front instead of sending a malformed request
            if not trace_id:
                return TraceService._error("Trace ID is required")

            # Initialize Langfuse client
            from langfuse import Langfuse
            client = Langfuse(
                secret_key=LANGFUSE_SECRET_KEY,
                public_key=LANGFUSE_PUBLIC_KEY,
                host=LANGFUSE_HOST
            )

            # Fetch trace with all details
            logger.info(f"Fetching trace with ID: {trace_id}")
            trace_response = client.fetch_trace(trace_id)

            # Get observations for this trace; a single request of up to 100
            # observations is made and no further pages are fetched here.
            logger.info(f"Fetching observations for trace: {trace_id}")
            observations_response = client.fetch_observations(trace_id=trace_id, limit=100)

            # Convert response to serializable format
            from utils.fetch_langfuse_logs import convert_to_serializable

            observations_data = []

            # A trace that cannot be converted is a hard failure
            try:
                trace_data = convert_to_serializable(trace_response)
            except Exception as e:
                logger.error(f"Error converting trace response: {str(e)}")
                return TraceService._error(f"Error processing trace data: {str(e)}")

            # Observations are best-effort: on failure the trace is still
            # returned, just with an empty observation list.
            try:
                if hasattr(observations_response, 'data'):
                    observations_data = convert_to_serializable(observations_response.data)
                elif hasattr(observations_response, 'model_dump'):
                    observations_dict = convert_to_serializable(observations_response.model_dump())
                    if isinstance(observations_dict, dict) and 'data' in observations_dict:
                        observations_data = observations_dict['data']
            except Exception as e:
                logger.error(f"Error converting observations response: {str(e)}")

            if not trace_data:
                return TraceService._error(f"Trace with ID {trace_id} not found")

            # Add observations to trace data
            trace_data["observations"] = observations_data

            # Generate a trace URL. The project segment is the literal
            # placeholder "unknown".
            # NOTE(review): confirm this link resolves for the real project.
            trace_data["trace_url"] = f"{LANGFUSE_HOST}/project/unknown/traces/{trace_id}"

            return {
                "status": "success",
                "trace": trace_data,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Error fetching trace by ID: {str(e)}")
            return TraceService._error(f"Error fetching trace by ID: {str(e)}")