| import datetime | |
| import json | |
| import os | |
| from opik import Opik | |
| import parameters | |
| from collections import defaultdict | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime objects as ISO-8601 strings."""

    def default(self, obj):
        # Only datetimes get special treatment; anything else falls back to
        # the stock encoder (which raises TypeError for unknown types).
        if not isinstance(obj, datetime.datetime):
            return super().default(obj)
        return obj.isoformat()
def get_trace_content(opik, trace_id):
    """Fetch a single trace's content from Opik and return it as a plain dict.

    Returns None (after logging the failure) when the fetch or the dict
    conversion raises, so callers can simply filter out missing results.
    """
    try:
        fetched = opik.get_trace_content(trace_id)
        return fetched.dict()
    except Exception as e:
        print(f"Error getting trace content {trace_id}: {e}")
        return None
def get_span_content(opik, trace_id, span):
    """Fetch one span's content, tagged with its trace and span ids.

    Returns {"trace_id", "span_id", "content"} on success, or None (after
    logging the failure) when the fetch or dict conversion raises.
    """
    span_id = span.id
    try:
        fetched = opik.get_span_content(span_id)
        return {
            "trace_id": trace_id,
            "span_id": span_id,
            "content": fetched.dict(),
        }
    except Exception as e:
        print(f"Error getting span content {span_id}: {e}")
        return None
def get_traces_on_date(start_date_str, end_date_str, project_name, api_key, max_workers=10, workspace='verba-tech-ninja'):
    """Download trace and span contents for a date window from Opik.

    Args:
        start_date_str: Start date, ISO format 'YYYY-MM-DD'.
        end_date_str: End date, ISO format 'YYYY-MM-DD'; falsy selects a
            one-day window beginning at start_date_str.
        project_name: Opik project to search (used for both trace and span
            queries).
        api_key: Opik API key.
        max_workers: Thread-pool size for the parallel content fetches.
        workspace: Opik workspace name. Previously hard-coded; kept as a
            default so existing callers are unaffected.

    Returns:
        Tuple (all_traces_content, all_spans_content): lists of dicts, both
        empty when a fatal step fails. Results are also written to
        'all_traces_content.json' and 'all_spans_content.json'.
    """
    try:
        print("Step 1: Converting date strings")
        date = datetime.date.fromisoformat(start_date_str)
        start_date_str = date.isoformat() + "T00:00:00Z"
        if not end_date_str:
            # No explicit end: cover exactly one day from the start date.
            end_date = date + datetime.timedelta(days=1)
            end_date_str = end_date.isoformat() + "T00:00:00Z"
        else:
            end_date = datetime.date.fromisoformat(end_date_str)
            end_date_str = end_date.isoformat() + "T00:00:00Z"
        print(f"Start: {start_date_str} and end: {end_date_str}")
        filter_string = f'start_time >= "{start_date_str}" and end_time <= "{end_date_str}"'
        print("Filter string: ", filter_string)
        print("Step 2: Initializing Opik client")
        try:
            opik = Opik(api_key=api_key, project_name=project_name, workspace=workspace)
            print("Opik client initialized successfully")
        except Exception as e:
            print(f"Error initializing Opik client: {e}")
            return [], []
        print("Step 3: Searching traces")
        try:
            traces = opik.search_traces(filter_string=filter_string, project_name=project_name)
            print("Total searches: ", len(traces))
        except Exception as e:
            print(f"Error searching traces: {e}")
            return [], []
        print("Step 4: Processing traces in parallel")
        all_traces_content = []
        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_trace = {executor.submit(get_trace_content, opik, trace.id): trace for trace in traces}
                for future in as_completed(future_to_trace):
                    result = future.result()
                    if result:
                        all_traces_content.append(result)
            print(f"Completed processing {len(all_traces_content)} traces")
        except Exception as e:
            print(f"Error processing traces in parallel: {e}")
        print("Step 5: Processing spans in parallel")
        all_spans_content = []
        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_span = {}
                for i, trace in enumerate(traces):
                    try:
                        print(f"Searching spans for trace_id: {trace.id}:{i+1}/{len(traces)}")
                        # BUG FIX: span search previously used the global
                        # parameters.project instead of the project_name
                        # argument, silently querying the wrong project when
                        # the two differed.
                        spans = opik.search_spans(project_name=project_name, trace_id=trace.id)
                        print(f"Found {len(spans)} spans for trace_id: {trace.id}")
                        for span in spans:
                            future_to_span[executor.submit(get_span_content, opik, trace.id, span)] = span
                    except Exception as e:
                        print(f"Error searching spans for trace {trace.id}: {e}")
                for future in as_completed(future_to_span):
                    result = future.result()
                    if result:
                        all_spans_content.append(result)
            print(f"Completed processing {len(all_spans_content)} spans")
        except Exception as e:
            print(f"Error processing spans in parallel: {e}")
        print("Step 6: Saving to JSON files")
        traces_file = 'all_traces_content.json'
        spans_file = 'all_spans_content.json'
        try:
            if os.path.exists(traces_file):
                os.remove(traces_file)
                print(f"Removed existing {traces_file}")
            if os.path.exists(spans_file):
                os.remove(spans_file)
                print(f"Removed existing {spans_file}")
            print(f"Writing {len(all_traces_content)} traces to {traces_file}")
            with open(traces_file, 'w') as f:
                json.dump(all_traces_content, f, indent=2, cls=DateTimeEncoder)
            print(f"Saved traces to {traces_file}")
            print(f"Writing {len(all_spans_content)} spans to {spans_file}")
            with open(spans_file, 'w') as f:
                json.dump(all_spans_content, f, indent=2, cls=DateTimeEncoder)
            print(f"Saved spans to {spans_file}")
        except Exception as e:
            # Best-effort fallback so a serialization failure does not lose data.
            print(f"Error saving to JSON files: {e}")
            with open('partial_traces_content.json', 'w') as f:
                json.dump(all_traces_content, f, indent=2, cls=DateTimeEncoder)
            with open('partial_spans_content.json', 'w') as f:
                json.dump(all_spans_content, f, indent=2, cls=DateTimeEncoder)
            print("Saved partial data to partial_traces_content.json and partial_spans_content.json")
        print("Step 7: Returning results")
        return all_traces_content, all_spans_content
    except Exception as e:
        print(f"Main function error: {e}")
        return [], []
def find_errors_and_metrics(traces, spans):
    """Find spans whose output is empty/null while error info is present.

    Args:
        traces: Trace content dicts (currently unused; kept so the call
            signature stays compatible with existing callers).
        spans: Span dicts shaped like {"trace_id", "span_id", "content"},
            where content may carry "output" and "error_info" keys.

    Returns:
        Metrics dict {"total_errors": int, "error_types": {type: {"count",
        "instances"}}}; {} on unexpected failure. Error spans are also
        written to 'error_spans.json' (best effort).
    """
    try:
        print("Step 8: Analyzing outputs for errors")
        error_spans = []
        error_metrics = defaultdict(list)
        for span in spans:
            content = span['content']
            output = content.get("output")
            # Robustness: tolerate an explicit "error_info": None instead of
            # letting len(None) abort the whole analysis via the outer except.
            error_info = content.get("error_info") or {}
            # Unwrap nested {"output": ...} payloads.
            if isinstance(output, dict) and 'output' in output:
                output_value = output.get("output")
            else:
                output_value = output
            # BUG FIX: the empty-list check previously tested the raw
            # `output` instead of the unwrapped `output_value`, so nested
            # empty outputs like {"output": []} were never counted.
            is_empty = output_value is None or (isinstance(output_value, list) and len(output_value) == 0)
            if is_empty and len(error_info) > 0:
                error_type = error_info.get("exception_type", "unknown_error")
                error_spans.append({
                    "trace_id": span["trace_id"],
                    "span_id": span["span_id"],
                    "error_content": output,
                    "exception_type": error_type
                })
                error_metrics[error_type].append({"trace_id": span["trace_id"], "span_id": span["span_id"]})
        print(f"Found {len(error_spans)} outputs with errors (empty/null)")
        print("Step 9: Saving error spans")
        error_file = 'error_spans.json'
        try:
            if os.path.exists(error_file):
                os.remove(error_file)
                print(f"Removed existing {error_file}")
            print(f"Writing {len(error_spans)} error outputs to {error_file}")
            with open(error_file, 'w') as f:
                json.dump(error_spans, f, indent=2, cls=DateTimeEncoder)
            print(f"Saved error outputs to {error_file}")
        except Exception as e:
            # Saving is best-effort; the metrics below are still returned.
            print(f"Error saving error spans: {e}")
        print("Step 10: Calculating metrics")
        metrics = {
            "total_errors": len(error_spans),
            "error_types": {
                error_type: {
                    "count": len(entries),
                    "instances": [
                        {"trace_id": entry["trace_id"], "span_id": entry["span_id"]}
                        for entry in entries
                    ]
                }
                for error_type, entries in error_metrics.items()
            }
        }
        print(f"Metrics calculated: {len(metrics['error_types'])} error types")
        for error_type, data in metrics["error_types"].items():
            print(f"Error Type: {error_type}, Count: {data['count']}")
            for instance in data["instances"]:
                print(f"  Trace ID: {instance['trace_id']}, Span ID: {instance['span_id']}")
        return metrics
    except Exception as e:
        print(f"Error in find_errors_and_metrics: {e}")
        return {}
def process_dates(start_date, end_date, project):
    """Run the full pipeline for a date range and render an HTML report.

    Fetches traces/spans via get_traces_on_date (using parameters.api_key),
    computes error metrics, and returns a styled HTML summary string. On
    any failure an HTML error panel is returned instead of raising.
    """
    try:
        print("Pipeline Start: Processing dates")
        traces, spans = get_traces_on_date(start_date, end_date, project, parameters.api_key)
        metrics = find_errors_and_metrics(traces, spans)
        report_template = """
        <div style='font-family: Arial, sans-serif; padding: 20px; background-color: #f5f5f5; border-radius: 10px;'>
            <h2 style='color: #2c3e50; border-bottom: 2px solid #3498db; padding-bottom: 5px;'>Metrics Report</h2>
            <div style='margin: 15px 0;'>
                <span style='color: #e74c3c; font-weight: bold;'>Total Empty/Null Outputs Found: </span>
                <span style='color: #2980b9'>{empty_count}</span>
            </div>
            <div style='margin: 15px 0;'>
                <span style='color: #27ae60; font-weight: bold;'>Total Traces Found: </span>
                <span style='color: #2980b9'>{traces_count}</span>
                <br>
                <span style='color: #27ae60; font-weight: bold;'>Total Spans Processed: </span>
                <span style='color: #2980b9'>{spans_count}</span>
            </div>
            <h3 style='color: #8e44ad; margin-top: 20px;'>Error Metrics</h3>
            {error_section}
        </div>
        """
        error_types = metrics.get('error_types', {})
        if error_types:
            # Assemble the per-error-type cards, showing at most 5 instances each.
            pieces = ["<div style='background-color: #fff; padding: 15px; border-radius: 5px; box-shadow: 0 2px 5px rgba(0,0,0,0.1);'>"]
            for err_name, err_data in error_types.items():
                pieces.append(f"""
                <div style='margin: 10px 0;'>
                    <span style='color: #d35400; font-weight: bold;'>{err_name.replace('_', ' ').title()}: </span>
                    <span style='color: #2980b9'>{err_data['count']} occurrences</span>
                    <div style='margin-left: 20px;'>
                        <span style='color: #7f8c8d;'>Instances:</span>
                        <ul style='list-style-type: none; padding-left: 10px;'>
                """)
                for inst in err_data['instances'][:5]:
                    pieces.append(f"""
                        <li style='color: #34495e;'>
                            Trace ID: <span style='color: #16a085'>{inst['trace_id']}</span>,
                            Span ID: <span style='color: #16a085'>{inst['span_id']}</span>
                        </li>
                    """)
                if len(err_data['instances']) > 5:
                    pieces.append("<li style='color: #7f8c8d;'>... (and more)</li>")
                pieces.append("</ul></div></div>")
            pieces.append("</div>")
            error_section = "".join(pieces)
        else:
            error_section = """
            <div style='color: #27ae60; background-color: #ecf0f1; padding: 10px; border-radius: 5px;'>
                No empty or null outputs with exceptions detected.
            </div>
            """
        rendered = report_template.format(
            empty_count=metrics.get('total_errors', 0),
            traces_count=len(traces),
            spans_count=len(spans),
            error_section=error_section,
        )
        print("Pipeline End: Results formatted")
        return rendered
    except Exception as e:
        print(f"Error processing dates: {e}")
        return f"""
        <div style='font-family: Arial, sans-serif; padding: 20px; background-color: #f5f5f5; border-radius: 10px;'>
            <h2 style='color: #e74c3c;'>Error Occurred</h2>
            <p style='color: #c0392b;'>Error processing dates: {str(e)}</p>
        </div>
        """