import tempfile
import warnings
from io import BytesIO

import gradio as gr
import matplotlib.pyplot as plt
import pandas as pd
import opik
from PIL import Image  # Pillow ships as a Gradio dependency

warnings.filterwarnings('ignore')

# --------------------------------------------------------
# CONFIG
# --------------------------------------------------------
OPIK_PROJECT_NAME = 'production-vf-ai'
OPIK_WORKSPACE_NAME = 'verba-tech-ninja'
OPIK_API_KEY = 'jYThN94NefoHKwEto3gPzhTSb'  # NOTE: prefer loading this from an environment variable

# --------------------------------------------------------
# INIT OPIK CLIENT
# --------------------------------------------------------
client = opik.Opik(
    api_key=OPIK_API_KEY,
    workspace=OPIK_WORKSPACE_NAME,
    project_name=OPIK_PROJECT_NAME
)

# --------------------------------------------------------
# FETCH TRACES
# --------------------------------------------------------
def fetch_traces(client_name, start_iso, end_iso):
    filter_string = (
        'name contains "analyse_transcript" '
        f'AND start_time >= "{start_iso}" '
        f'AND end_time <= "{end_iso}" '
        f'AND tags contains "{client_name}"'
    )
    traces = client.search_traces(
        project_name=OPIK_PROJECT_NAME,
        filter_string=filter_string,
        max_results=50000
    )
    return list(traces)

# --------------------------------------------------------
# FILTER TRACES
# --------------------------------------------------------
def filter_traces(traces):
    final = []
    for trace in traces:
        tags = trace.tags or []
        # Skip call-level and campaign-conversation traces.
        if "_call_" in tags or "[CAMPAIGN_CONVERSATION]" in tags:
            continue
        output = trace.output
        if not output:
            continue
        category = output.get("category")
        # Guard against an explicit null campaign_payload.
        use_case = (output.get("campaign_payload") or {}).get("use_case")
        # Keep only non-customer traces that carry no campaign use case.
        if category != "customer" and use_case is None:
            final.append(trace)
    return final

# --------------------------------------------------------
# PARSE SPANS
# --------------------------------------------------------
def extract_meta(trace):
    spans = client.search_spans(project_name=OPIK_PROJECT_NAME, trace_id=trace.id)
    out = []
    for s in spans:
        if s.name != "chat_completion_parse":
            continue
        metadata = s.metadata or {}
        usage = metadata.get("usage", {})
        out.append({
            "duration": s.duration / 1000,  # ms -> seconds
            "tier": metadata.get("service_tier", "default"),
            "model": metadata.get("model"),
            "tokens": usage.get("completion_tokens", 0),
            "error": bool(s.error_info)
        })
    return out

# --------------------------------------------------------
# RUN MAIN PIPELINE
# --------------------------------------------------------
def run_pipeline(client_name, start_dt, end_dt, metadata_fields):
    start_iso = start_dt + "Z"
    end_iso = end_dt + "Z"

    traces = fetch_traces(client_name, start_iso, end_iso)
    traces = filter_traces(traces)

    rows = []
    for t in traces:
        rows.extend(extract_meta(t))

    if not rows:
        # Must match the arity of the success path (five outputs).
        return {"message": "No data"}, None, None, None, None

    df = pd.DataFrame(rows)
    # Keep only the metadata fields selected in the UI.
    df_filtered = df[metadata_fields]

    # ---------------- Stats -----------------
    durations = df.loc[~df["error"], "duration"]  # latency of successful spans only
    tokens = df["tokens"]
    stats = {
        "total_spans": len(df),
        "errors": int(df["error"].sum()),
        "error_rate_%": round(100 * df["error"].mean(), 2),
        "mean_latency_sec": round(durations.mean(), 3) if len(durations) else None,
        "median_latency_sec": round(durations.median(), 3) if len(durations) else None,
        "p90_latency_sec": round(durations.quantile(0.9), 3) if len(durations) else None,
        "p95_latency_sec": round(durations.quantile(0.95), 3) if len(durations) else None,
        "min_latency": round(durations.min(), 3) if len(durations) else None,
        "max_latency": round(durations.max(), 3) if len(durations) else None,
        "avg_tokens": round(tokens.mean(), 2),
        "max_tokens": int(tokens.max())
    }
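    # Illustrative shape of the stats payload rendered by gr.JSON below
    # (hypothetical values, not real data):
    # {"total_spans": 1240, "errors": 3, "error_rate_%": 0.24,
    #  "mean_latency_sec": 1.812, "p95_latency_sec": 3.405,
    #  "avg_tokens": 512.4, "max_tokens": 2048, ...}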
    # ---------------- Charts -----------------
    fig1, ax1 = plt.subplots()
    ax1.hist(df["duration"], bins=30)
    ax1.set_title("Latency Distribution (seconds)")
    ax1.set_xlabel("Seconds")
    ax1.set_ylabel("Frequency")

    fig2, ax2 = plt.subplots()
    ax2.hist(df["tokens"], bins=25)
    ax2.set_title("Completion Token Distribution")
    ax2.set_xlabel("Tokens")
    ax2.set_ylabel("Frequency")

    # Render each figure to an in-memory PNG, then hand gr.Image a PIL image
    # (gr.Image accepts PIL images, numpy arrays, or file paths, not raw BytesIO).
    buf1, buf2 = BytesIO(), BytesIO()
    fig1.savefig(buf1, format="png")
    fig2.savefig(buf2, format="png")
    buf1.seek(0)
    buf2.seek(0)
    plt.close(fig1)
    plt.close(fig2)
    img1 = Image.open(buf1)
    img2 = Image.open(buf2)

    # CSV: gr.File expects a path, so write to a temp file and return its name.
    csv_file = tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, newline=""
    )
    df_filtered.to_csv(csv_file, index=False)
    csv_file.close()

    return stats, df_filtered, img1, img2, csv_file.name

# --------------------------------------------------------
# GRADIO UI
# --------------------------------------------------------
with gr.Blocks(title="Opik Analytics Dashboard") as demo:
    gr.Markdown("# 📊 **Opik Analytics Dashboard** (Gradio)")
    gr.Markdown("Analyze traces by client, date range, and metadata fields.")

    with gr.Row():
        client_name = gr.Dropdown(
            ["fusiongroup", "vita", "staragent", "testclient", "other"],
            label="Select Client",
            value="fusiongroup"
        )

    with gr.Row():
        start_dt = gr.Textbox(label="Start DateTime UTC (YYYY-MM-DDTHH:MM:SS)",
                              value="2025-11-17T00:00:00")
        end_dt = gr.Textbox(label="End DateTime UTC (YYYY-MM-DDTHH:MM:SS)",
                            value="2025-11-17T12:00:00")

    metadata_fields = gr.CheckboxGroup(
        ["duration", "tier", "tokens", "model", "error"],
        label="Select Metadata Fields",
        value=["duration", "tier", "tokens"]
    )

    run_btn = gr.Button("Run Analysis")

    stats_output = gr.JSON(label="📈 Summary Statistics")
    table_output = gr.DataFrame(label="📄 Raw Data")
    plot_latency = gr.Image(label="⏱ Latency Distribution")
    plot_tokens = gr.Image(label="🔢 Token Distribution")
    csv_download = gr.File(label="⬇ Download CSV")

    run_btn.click(
        fn=run_pipeline,
        inputs=[client_name, start_dt, end_dt, metadata_fields],
        outputs=[stats_output, table_output, plot_latency, plot_tokens, csv_download]
    )

demo.launch()
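# Minimal sketch of exercising the pipeline without the UI (assumes the same
# client names and ISO datetime format as the Gradio defaults above):
#
#   stats, table, latency_img, tokens_img, csv_path = run_pipeline(
#       "fusiongroup",
#       "2025-11-17T00:00:00",
#       "2025-11-17T12:00:00",
#       ["duration", "tier", "tokens"],
#   )
#   print(stats)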