# app.py — Opik Analytics Dashboard (Gradio)
# Provenance: "Update app.py" by ash27kh, commit 05c14cd (verified)
import os
import tempfile
import warnings
from datetime import datetime
from io import BytesIO

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import opik
import pandas as pd

warnings.filterwarnings('ignore')
# --------------------------------------------------------
# CONFIG
# --------------------------------------------------------
OPIK_PROJECT_NAME = 'production-vf-ai'
OPIK_WORKSPACE_NAME = 'verba-tech-ninja'
# SECURITY: never commit API keys to source. The key previously hard-coded
# here was exposed and should be rotated; supply it via the OPIK_API_KEY
# environment variable instead.
OPIK_API_KEY = os.environ.get('OPIK_API_KEY', '')
# --------------------------------------------------------
# INIT OPIK CLIENT
# --------------------------------------------------------
# Module-level client shared by fetch_traces / extract_meta below.
client = opik.Opik(
    api_key=OPIK_API_KEY,
    workspace=OPIK_WORKSPACE_NAME,
    project_name=OPIK_PROJECT_NAME
)
# --------------------------------------------------------
# FETCH TRACES
# --------------------------------------------------------
def fetch_traces(client_name, start_iso, end_iso):
    """Query Opik for analyse_transcript traces for one client.

    Builds a filter over trace name, the [start_iso, end_iso] time window,
    and the client tag, then materializes the search results.

    Args:
        client_name: tag identifying the client (e.g. "fusiongroup").
        start_iso: inclusive lower bound, ISO-8601 UTC string (with "Z").
        end_iso: inclusive upper bound, ISO-8601 UTC string (with "Z").

    Returns:
        list of Opik trace objects matching the filter (up to 50000).
    """
    clauses = [
        'name contains "analyse_transcript"',
        f'start_time >= "{start_iso}"',
        f'end_time <= "{end_iso}"',
        f'tags contains "{client_name}"',
    ]
    results = client.search_traces(
        project_name=OPIK_PROJECT_NAME,
        filter_string=" AND ".join(clauses),
        max_results=50000,
    )
    return list(results)
# --------------------------------------------------------
# FILTER TRACES
# --------------------------------------------------------
def filter_traces(traces):
    """Keep only non-call, non-campaign traces with usable output.

    A trace survives when:
      * its tags list contains neither the exact tag "_call_" nor
        "[CAMPAIGN_CONVERSATION]",
      * it has a truthy ``output`` dict, and
      * the output is neither category "customer" nor carries a
        campaign_payload use_case.

    Args:
        traces: iterable of Opik trace objects (reads ``.tags`` and
            ``.output``).

    Returns:
        list of the traces that pass every filter, in input order.
    """
    skip_tags = {"_call_", "[CAMPAIGN_CONVERSATION]"}
    final = []
    for trace in traces:
        tags = trace.tags or []  # tags may be None on some traces
        # Exact-element membership, same semantics as `x in list`.
        if skip_tags.intersection(tags):
            continue
        output = trace.output
        if not output:
            continue
        category = output.get("category")
        # BUG FIX: "campaign_payload" may be present with value None, in
        # which case dict.get(..., {}) returns None and .get() would raise
        # AttributeError. `or {}` covers both missing and None.
        use_case = (output.get("campaign_payload") or {}).get("use_case")
        if category != "customer" and use_case is None:
            final.append(trace)
    return final
# --------------------------------------------------------
# PARSE SPANS
# --------------------------------------------------------
def extract_meta(trace):
    """Collect latency/usage rows from one trace's LLM spans.

    Fetches the spans of ``trace`` and keeps only those named
    "chat_completion_parse", emitting one row dict per span with keys:
    duration (seconds), tier, model, tokens (completion tokens), error.

    Args:
        trace: an Opik trace object (reads ``.id``).

    Returns:
        list of row dicts, one per matching span (possibly empty).
    """
    spans = client.search_spans(project_name=OPIK_PROJECT_NAME, trace_id=trace.id)
    out = []
    for s in spans:
        if s.name != "chat_completion_parse":
            continue
        # BUG FIX: span metadata may be absent (None), and the "usage" key
        # may be present with value None — guard both before .get().
        meta = s.metadata or {}
        usage = meta.get("usage") or {}
        out.append({
            # span duration is reported in milliseconds; expose seconds
            "duration": s.duration / 1000,
            "tier": meta.get("service_tier", "default"),
            "model": meta.get("model"),
            "tokens": usage.get("completion_tokens", 0),
            "error": bool(s.error_info),
        })
    return out
# --------------------------------------------------------
# RUN MAIN PIPELINE
# --------------------------------------------------------
def run_pipeline(client_name, start_dt, end_dt, metadata_fields):
    """End-to-end analysis: fetch, filter, aggregate, chart, export.

    Args:
        client_name: client tag to search for.
        start_dt / end_dt: naive UTC datetimes as "YYYY-MM-DDTHH:MM:SS"
            strings; a "Z" suffix is appended before querying.
        metadata_fields: list of column names to keep in the table/CSV.

    Returns:
        Tuple of 5 values matching the Gradio outputs:
        (stats dict, filtered DataFrame, latency-PNG path, tokens-PNG path,
        CSV file path). When no rows are found, ("No data", None, None,
        None, None).
    """
    start_iso = start_dt + "Z"
    end_iso = end_dt + "Z"
    traces = filter_traces(fetch_traces(client_name, start_iso, end_iso))
    rows = []
    for t in traces:
        rows.extend(extract_meta(t))
    if not rows:
        # BUG FIX: the .click() wiring declares 5 outputs; returning only
        # 4 values here made the empty-result path fail in Gradio.
        return "No data", None, None, None, None
    df = pd.DataFrame(rows)
    # Keep only the metadata columns the user selected.
    df_filtered = df[metadata_fields]
    # ---------------- Stats -----------------
    # Latency stats exclude errored spans; token stats include everything.
    durations = df.loc[~df["error"], "duration"]
    tokens = df["tokens"]
    stats = {
        "total_spans": len(df),
        "errors": int(df["error"].sum()),
        "error_rate_%": round(100 * df["error"].mean(), 2),
        "mean_latency_sec": round(durations.mean(), 3) if len(durations) else None,
        "median_latency_sec": round(durations.median(), 3) if len(durations) else None,
        "p90_latency_sec": round(durations.quantile(0.9), 3) if len(durations) else None,
        "p95_latency_sec": round(durations.quantile(0.95), 3) if len(durations) else None,
        "min_latency": round(durations.min(), 3) if len(durations) else None,
        "max_latency": round(durations.max(), 3) if len(durations) else None,
        "avg_tokens": round(tokens.mean(), 2),
        "max_tokens": int(tokens.max()),
    }
    # ---------------- Charts -----------------
    fig1, ax1 = plt.subplots()
    ax1.hist(df["duration"], bins=30)
    ax1.set_title("Latency Distribution (seconds)")
    ax1.set_xlabel("Seconds")
    ax1.set_ylabel("Frequency")
    fig2, ax2 = plt.subplots()
    ax2.hist(df["tokens"], bins=25)
    ax2.set_title("Completion Token Distribution")
    ax2.set_xlabel("Tokens")
    ax2.set_ylabel("Frequency")
    # BUG FIX: gr.Image and gr.File expect file paths (BytesIO / raw CSV
    # strings are not valid outputs), so persist artifacts to temp files.
    latency_png = tempfile.NamedTemporaryFile(suffix=".png", delete=False).name
    tokens_png = tempfile.NamedTemporaryFile(suffix=".png", delete=False).name
    fig1.savefig(latency_png, format="png")
    fig2.savefig(tokens_png, format="png")
    plt.close(fig1)
    plt.close(fig2)
    # CSV export of the user-selected columns
    csv_path = tempfile.NamedTemporaryFile(suffix=".csv", delete=False).name
    df_filtered.to_csv(csv_path, index=False)
    return stats, df_filtered, latency_png, tokens_png, csv_path
# --------------------------------------------------------
# GRADIO UI
# --------------------------------------------------------
# NOTE: Gradio renders components in declaration order, so the page layout
# depends on the exact sequence of statements below.
with gr.Blocks(title="Opik Analytics Dashboard") as demo:
    gr.Markdown("# 📊 **Opik Analytics Dashboard** (Gradio)")
    gr.Markdown("Analyze traces by client, date range, and metadata fields.")
    with gr.Row():
        # Client tag passed through to the trace search filter.
        client_name = gr.Dropdown(
            ["fusiongroup", "vita", "staragent", "testclient", "other"],
            label="Select Client",
            value="fusiongroup"
        )
    with gr.Row():
        # Naive UTC datetimes; a "Z" suffix is appended by run_pipeline.
        start_dt = gr.Textbox(label="Start DateTime UTC (YYYY-MM-DDTHH:MM:SS)", value="2025-11-17T00:00:00")
        end_dt = gr.Textbox(label="End DateTime UTC (YYYY-MM-DDTHH:MM:SS)", value="2025-11-17T12:00:00")
    # Columns to keep in the raw-data table and CSV export.
    metadata_fields = gr.CheckboxGroup(
        ["duration", "tier", "tokens", "model", "error"],
        label="Select Metadata Fields",
        value=["duration", "tier", "tokens"]
    )
    run_btn = gr.Button("Run Analysis")
    # Five outputs — run_pipeline must return exactly five values.
    stats_output = gr.JSON(label="📈 Summary Statistics")
    table_output = gr.DataFrame(label="📄 Raw Data")
    plot_latency = gr.Image(label="⏱ Latency Distribution")
    plot_tokens = gr.Image(label="🔢 Token Distribution")
    csv_download = gr.File(label="⬇ Download CSV")
    run_btn.click(
        fn=run_pipeline,
        inputs=[client_name, start_dt, end_dt, metadata_fields],
        outputs=[stats_output, table_output, plot_latency, plot_tokens, csv_download]
    )
# Launches the local Gradio server (blocking) when the module is run.
demo.launch()