import datetime
import json
import os
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

from opik import Opik

import parameters


class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that writes date/time values as ISO-8601 strings."""

    def default(self, obj):
        """Return ``obj.isoformat()`` for date/time objects, else defer to the base.

        ``datetime.datetime`` is a subclass of ``datetime.date``, so the
        ``datetime.date`` check covers it as well.
        """
        if isinstance(obj, (datetime.date, datetime.time)):
            return obj.isoformat()
        return super().default(obj)


def get_trace_content(opik, trace_id):
    """Fetch one trace and return it as a dict, or ``None`` on failure.

    Args:
        opik: Client object exposing ``get_trace_content(trace_id)``.
        trace_id: Identifier of the trace to fetch.

    Returns:
        The result of ``.dict()`` on the fetched trace, or ``None`` if any
        exception was raised (the error is printed, not propagated).
    """
    try:
        trace_content = opik.get_trace_content(trace_id)
        return trace_content.dict()
    except Exception as e:
        print(f"Error getting trace content {trace_id}: {e}")
        return None


def get_span_content(opik, trace_id, span):
    """Fetch one span and wrap it with its trace and span identifiers.

    Args:
        opik: Client object exposing ``get_span_content(span_id)``.
        trace_id: Identifier of the trace the span belongs to.
        span: Object with an ``id`` attribute identifying the span.

    Returns:
        ``{"trace_id": ..., "span_id": ..., "content": <span>.dict()}``, or
        ``None`` if any exception was raised (the error is printed).
    """
    try:
        content = opik.get_span_content(span.id)
        return {"trace_id": trace_id, "span_id": span.id, "content": content.dict()}
    except Exception as e:
        print(f"Error getting span content {span.id}: {e}")
        return None


def get_traces_on_date(start_date_str, end_date_str, project_name, api_key, max_workers=10):
    """Download traces and their spans for a date window and dump them to JSON.

    The window runs from midnight (``T00:00:00Z``) of ``start_date_str`` to
    midnight of ``end_date_str``; when ``end_date_str`` is empty/``None`` the
    window ends at midnight of the day after the start date.

    Side effects: prints progress for every step and (re)writes
    ``all_traces_content.json`` and ``all_spans_content.json`` in the current
    working directory. If that write fails, a best-effort copy is written to
    ``partial_traces_content.json`` / ``partial_spans_content.json``.

    Args:
        start_date_str: Start date in ``YYYY-MM-DD`` (ISO) form.
        end_date_str: End date in ``YYYY-MM-DD`` form, or a falsy value.
        project_name: Project used for both the trace and the span search.
        api_key: API key passed to the ``Opik`` client.
        max_workers: Thread-pool size used for the content downloads.

    Returns:
        ``(all_traces_content, all_spans_content)`` - two lists of dicts.
        ``([], [])`` is returned when the client cannot be created, the trace
        search fails, or any otherwise-unhandled exception occurs.
    """
    try:
        print("Step 1: Converting date strings")
        start_date = datetime.date.fromisoformat(start_date_str)
        start_date_str = start_date.isoformat() + "T00:00:00Z"
        if not end_date_str:
            end_date = start_date + datetime.timedelta(days=1)
        else:
            end_date = datetime.date.fromisoformat(end_date_str)
        end_date_str = end_date.isoformat() + "T00:00:00Z"
        print(f"Start: {start_date_str} and end: {end_date_str}")
        filter_string = f'start_time >= "{start_date_str}" and end_time <= "{end_date_str}"'
        print("Filter string: ", filter_string)

        print("Step 2: Initializing Opik client")
        try:
            opik = Opik(api_key=api_key, project_name=project_name, workspace='verba-tech-ninja')
            print("Opik client initialized successfully")
        except Exception as e:
            print(f"Error initializing Opik client: {e}")
            return [], []

        print("Step 3: Searching traces")
        try:
            traces = opik.search_traces(filter_string=filter_string, project_name=project_name)
            print("Total searches: ", len(traces))
        except Exception as e:
            print(f"Error searching traces: {e}")
            return [], []

        print("Step 4: Processing traces in parallel")
        all_traces_content = []
        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                trace_futures = [
                    executor.submit(get_trace_content, opik, trace.id)
                    for trace in traces
                ]
                for future in as_completed(trace_futures):
                    result = future.result()
                    # get_trace_content returns None on failure; skip those.
                    if result:
                        all_traces_content.append(result)
            print(f"Completed processing {len(all_traces_content)} traces")
        except Exception as e:
            print(f"Error processing traces in parallel: {e}")

        print("Step 5: Processing spans in parallel")
        all_spans_content = []
        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                span_futures = []
                for i, trace in enumerate(traces):
                    try:
                        print(f"Searching spans for trace_id: {trace.id}:{i+1}/{len(traces)}")
                        # Search in the same project the traces came from; the
                        # caller's project_name may differ from parameters.project.
                        spans = opik.search_spans(project_name=project_name, trace_id=trace.id)
                        print(f"Found {len(spans)} spans for trace_id: {trace.id}")
                        for span in spans:
                            span_futures.append(
                                executor.submit(get_span_content, opik, trace.id, span)
                            )
                    except Exception as e:
                        # One failing trace must not stop the remaining ones.
                        print(f"Error searching spans for trace {trace.id}: {e}")
                for future in as_completed(span_futures):
                    result = future.result()
                    if result:
                        all_spans_content.append(result)
            print(f"Completed processing {len(all_spans_content)} spans")
        except Exception as e:
            print(f"Error processing spans in parallel: {e}")

        print("Step 6: Saving to JSON files")
        traces_file = 'all_traces_content.json'
        spans_file = 'all_spans_content.json'
        try:
            if os.path.exists(traces_file):
                os.remove(traces_file)
                print(f"Removed existing {traces_file}")
            if os.path.exists(spans_file):
                os.remove(spans_file)
                print(f"Removed existing {spans_file}")
            print(f"Writing {len(all_traces_content)} traces to {traces_file}")
            with open(traces_file, 'w') as f:
                json.dump(all_traces_content, f, indent=2, cls=DateTimeEncoder)
            print(f"Saved traces to {traces_file}")
            print(f"Writing {len(all_spans_content)} spans to {spans_file}")
            with open(spans_file, 'w') as f:
                json.dump(all_spans_content, f, indent=2, cls=DateTimeEncoder)
            print(f"Saved spans to {spans_file}")
        except Exception as e:
            print(f"Error saving to JSON files: {e}")
            # Best-effort fallback under different file names. A second failure
            # is reported but must not discard the data already in memory.
            try:
                with open('partial_traces_content.json', 'w') as f:
                    json.dump(all_traces_content, f, indent=2, cls=DateTimeEncoder)
                with open('partial_spans_content.json', 'w') as f:
                    json.dump(all_spans_content, f, indent=2, cls=DateTimeEncoder)
                print("Saved partial data to partial_traces_content.json and partial_spans_content.json")
            except Exception as partial_error:
                print(f"Error saving partial data: {partial_error}")

        print("Step 7: Returning results")
        return all_traces_content, all_spans_content
    except Exception as e:
        print(f"Main function error: {e}")
        return [], []


def find_errors_and_metrics(traces, spans):
    """Collect spans that failed with an empty output and summarise them by type.

    A span counts as an error when its output is ``None`` or an empty list
    (after unwrapping a ``{"output": ...}`` dict) and its ``error_info`` is
    non-empty. The matching spans are written to ``error_spans.json`` in the
    current working directory and a per-exception-type summary is printed.

    Args:
        traces: Not used by this function; kept for the existing call signature.
        spans: Iterable of dicts with ``trace_id``, ``span_id`` and ``content``
            keys, where ``content`` is a dict that may hold ``output`` and
            ``error_info``.

    Returns:
        ``{"total_errors": int, "error_types": {type: {"count": int,
        "instances": [{"trace_id": ..., "span_id": ...}, ...]}}}``, or ``{}``
        if an unexpected exception occurs.
    """
    try:
        print("Step 8: Analyzing outputs for errors")
        error_spans = []
        error_metrics = defaultdict(list)
        for span in spans:
            content = span['content']
            output = content.get("output")
            # The key may be present with a null value; treat that as "no error
            # info" instead of failing on len(None) and aborting the analysis.
            error_info = content.get("error_info") or {}
            if isinstance(output, dict) and 'output' in output:
                output_value = output.get("output")
            else:
                output_value = output
            # Check the unwrapped value so a nested empty list is detected too.
            output_is_empty = output_value is None or (
                isinstance(output_value, list) and not output_value
            )
            if output_is_empty and error_info:
                error_type = error_info.get("exception_type", "unknown_error")
                error_spans.append({
                    "trace_id": span["trace_id"],
                    "span_id": span["span_id"],
                    "error_content": output,
                    "exception_type": error_type,
                })
                error_metrics[error_type].append(
                    {"trace_id": span["trace_id"], "span_id": span["span_id"]}
                )
        print(f"Found {len(error_spans)} outputs with errors (empty/null)")

        print("Step 9: Saving error spans")
        error_file = 'error_spans.json'
        try:
            if os.path.exists(error_file):
                os.remove(error_file)
                print(f"Removed existing {error_file}")
            print(f"Writing {len(error_spans)} error outputs to {error_file}")
            with open(error_file, 'w') as f:
                json.dump(error_spans, f, indent=2, cls=DateTimeEncoder)
            print(f"Saved error outputs to {error_file}")
        except Exception as e:
            # Failing to persist the report must not prevent returning metrics.
            print(f"Error saving error spans: {e}")

        print("Step 10: Calculating metrics")
        metrics = {
            "total_errors": len(error_spans),
            "error_types": {
                error_type: {
                    "count": len(entries),
                    "instances": [
                        {"trace_id": entry["trace_id"], "span_id": entry["span_id"]}
                        for entry in entries
                    ],
                }
                for error_type, entries in error_metrics.items()
            },
        }
        print(f"Metrics calculated: {len(metrics['error_types'])} error types")
        for error_type, data in metrics["error_types"].items():
            print(f"Error Type: {error_type}, Count: {data['count']}")
            for instance in data["instances"]:
                print(f" Trace ID: {instance['trace_id']}, Span ID: {instance['span_id']}")
        return metrics
    except Exception as e:
        print(f"Error in find_errors_and_metrics: {e}")
        return {}


# NOTE(review): `process_dates(start_date, end_date, project)` begins here, but
# its body is cut off in the reviewed source: the triple-quoted template opened
# by `html_output = """` and everything after it appear to be missing. The
# visible fragment is preserved verbatim below; restore the full definition
# from the original file rather than reconstructing it.
#
# def process_dates(start_date, end_date, project): try: print("Pipeline Start: Processing dates") traces, spans = get_traces_on_date(start_date, end_date, project, parameters.api_key) metrics = find_errors_and_metrics(traces, spans) html_output = """
# Error processing dates: {str(e)}