temparia's picture
reverse to audiowave new
0dc8914
import datetime as dt
import json
import os
import re
import tempfile
import uuid
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import urlparse, unquote

import gradio as gr
import requests
from bson import ObjectId
from bson.errors import InvalidId
from google.cloud import storage
from google.oauth2 import service_account
from gradio_flatpickr_calendar import Calendar
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.errors import PyMongoError
# Runtime configuration, read from the environment once at import time.
MONGODB_URI = os.getenv("MONGODB_URI")
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME", "ira_rumik")
CALLS_COLLECTION_NAME = os.getenv("MONGODB_CALLS_COLLECTION", "calls")
USERS_COLLECTION_NAME = os.getenv("MONGODB_USERS_COLLECTION", "users")
# Default number of calls to load, and lifetime of generated signed links.
DEFAULT_LIMIT = int(os.getenv("CALL_DASHBOARD_LIMIT", "50"))
SIGNED_URL_EXPIRY_MINUTES = int(os.getenv("SIGNED_URL_EXPIRY_MINUTES", "60"))
# Optional bucket used when a recording URL is not a recognised GCS URL.
FALLBACK_BUCKET_NAME = os.getenv("GCS_BUCKET_NAME")

# Fail fast: nothing in the dashboard works without a database connection string.
if MONGODB_URI is None or MONGODB_URI == "":
    raise RuntimeError("MONGODB_URI environment variable is required")
def _build_storage_client() -> storage.Client:
    """Return a Google Cloud Storage client.

    Credential source, in priority order:

    1. ``GCP_SERVICE_ACCOUNT_JSON`` -- the service-account key as inline JSON.
    2. ``GCP_SERVICE_ACCOUNT_FILE`` -- path to a key file, used only if it exists.
    3. Neither -- ``credentials=None`` is passed and the client library falls
       back to its default credential discovery.
    """
    credentials: Optional[service_account.Credentials] = None
    inline_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
    key_path = os.environ.get("GCP_SERVICE_ACCOUNT_FILE")
    if inline_json:
        credentials = service_account.Credentials.from_service_account_info(json.loads(inline_json))
    elif key_path and os.path.exists(key_path):
        with open(key_path, "r", encoding="utf-8") as key_file:
            key_info = json.load(key_file)
        credentials = service_account.Credentials.from_service_account_info(key_info)
    return storage.Client(credentials=credentials)
def _build_mongo_collections() -> Tuple[Collection, Collection]:
    """Open a MongoDB client and return ``(calls_collection, users_collection)``.

    The database and collection names come from the module-level configuration;
    ``serverSelectionTimeoutMS=5000`` bounds how long operations wait for a
    reachable server.
    """
    database = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=5000)[MONGODB_DB_NAME]
    calls = database[CALLS_COLLECTION_NAME]
    users = database[USERS_COLLECTION_NAME]
    return calls, users
# Module-level clients, constructed once at import time and shared by every
# request handler below (storage first, then MongoDB).
storage_client = _build_storage_client()
calls_collection, users_collection = _build_mongo_collections()
def _format_datetime(value: Optional[dt.datetime]) -> str:
if not value:
return "-"
if value.tzinfo is None:
value = value.replace(tzinfo=dt.timezone.utc)
return value.astimezone(dt.timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
def _format_duration(duration_seconds: Optional[int]) -> str:
if not duration_seconds and duration_seconds != 0:
return "-"
minutes, seconds = divmod(int(duration_seconds), 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours:d}h {minutes:02d}m {seconds:02d}s"
if minutes:
return f"{minutes:d}m {seconds:02d}s"
return f"{seconds:d}s"
def _parse_date_input(value: Optional[str]) -> Optional[dt.date]:
if not value:
return None
if isinstance(value, dt.date):
return value
if isinstance(value, dt.datetime):
return value.date()
if isinstance(value, (int, float)):
try:
return dt.datetime.fromtimestamp(value).date()
except (OverflowError, OSError, ValueError):
return None
if isinstance(value, dict):
candidate = value.get("value") or value.get("date") or value.get("datetime")
return _parse_date_input(candidate)
if isinstance(value, str):
trimmed = value.strip()
if not trimmed:
return None
if "T" in trimmed:
trimmed = trimmed.split("T", 1)[0]
try:
return dt.date.fromisoformat(trimmed)
except ValueError:
return None
return None
def _has_date_input(value: Any) -> bool:
if value is None:
return False
if isinstance(value, dict):
candidate = value.get("value") or value.get("date") or value.get("datetime")
return bool(candidate)
if isinstance(value, str):
return bool(value.strip())
return True
# Choices for the "Quick date range" selector, in display order. "Custom"
# keeps whatever dates are already in the calendars; "Clear" empties them.
DATE_PRESETS = ("Custom", "Today", "Last 7 days", "Last 30 days", "This month", "Last month", "Clear")
# Preset selected when the page first loads.
DEFAULT_DATE_PRESET = "Last 7 days"
def _date_range_for_preset(preset: str) -> Tuple[Optional[str], Optional[str]]:
today = dt.date.today()
if preset == "Today":
start = end = today
elif preset == "Last 7 days":
start = today - dt.timedelta(days=6)
end = today
elif preset == "Last 30 days":
start = today - dt.timedelta(days=29)
end = today
elif preset == "This month":
start = today.replace(day=1)
end = today
elif preset == "Last month":
first_this_month = today.replace(day=1)
last_month_end = first_this_month - dt.timedelta(days=1)
start = last_month_end.replace(day=1)
end = last_month_end
elif preset == "Clear":
return None, None
else:
return None, None
return start.isoformat(), end.isoformat()
# Initial calendar values: the range of DEFAULT_DATE_PRESET, computed once at
# import time. A long-running process therefore keeps the startup day's range
# as the initial value until the user re-selects a preset.
DEFAULT_START_DATE_VALUE, DEFAULT_END_DATE_VALUE = _date_range_for_preset(DEFAULT_DATE_PRESET)
def _date_updates_for_preset(preset: str) -> Tuple[Any, Any]:
    """Return Gradio updates for the (start, end) calendars after a preset is picked.

    "Custom" leaves both calendars untouched so the user can choose exact
    dates. Every other preset overwrites them with its range, which blanks
    them when the preset has no range (e.g. "Clear").
    """
    if preset == "Custom":
        return gr.update(), gr.update()
    first_day, last_day = _date_range_for_preset(preset)
    return gr.update(value=first_day), gr.update(value=last_day)
def _parse_gcs_target(url: str) -> Optional[Tuple[str, str]]:
"""Extract bucket and blob name from a GCS or HTTPS storage URL."""
if not url:
return None
if url.startswith("gs://"):
bucket_and_blob = url[5:]
parts = bucket_and_blob.split("/", 1)
if len(parts) == 2:
return parts[0], parts[1]
return None
parsed = urlparse(url)
if parsed.netloc not in {"storage.googleapis.com", "storage.cloud.google.com"}:
if FALLBACK_BUCKET_NAME:
# Treat URL path as object name under fallback bucket.
blob_name = parsed.path.lstrip("/")
return FALLBACK_BUCKET_NAME, blob_name
return None
path = parsed.path.lstrip("/")
parts = path.split("/", 1)
if len(parts) != 2:
return None
bucket_name, blob_name = parts
# storage.cloud.google.com paths are URL-encoded.
return bucket_name, unquote(blob_name)
def _signed_recording_url(recording_url: str) -> Optional[str]:
    """Return a V4 signed GET URL for a recording stored in GCS, or None.

    Returns None when ``_parse_gcs_target`` cannot map *recording_url* to a
    bucket and object. Errors raised by the storage library while signing
    propagate to the caller. NOTE(review): e.g. when the configured
    credentials cannot sign; confirm the exact exception types.
    """
    target = _parse_gcs_target(recording_url)
    if not target:
        return None
    bucket_name, blob_name = target
    # V4 signed URLs may not be valid for more than 7 days (the library rejects
    # longer expirations), and a zero or negative expiry would yield a link
    # that is already dead. Clamp the configured value into [1 min, 7 days].
    expiry_minutes = min(max(SIGNED_URL_EXPIRY_MINUTES, 1), 7 * 24 * 60)
    blob = storage_client.bucket(bucket_name).blob(blob_name)
    return blob.generate_signed_url(
        expiration=dt.timedelta(minutes=expiry_minutes),
        version="v4",
    )
def _download_audio(call_id: str, download_url: str, blob_name: str) -> str:
    """Stream a recording from *download_url* into a new temp file and return its path.

    Files are written under ``<system temp dir>/call-dashboard``. Each file is
    named ``<call_id>_<random hex><suffix>``, where the suffix is taken from
    *blob_name* and defaults to ``.wav`` when it has none.

    Raises:
        requests.HTTPError: The server answered with an error status.
        requests.RequestException / OSError: Network or disk failure.

    In every failure case the partially written file is removed before the
    exception propagates.
    """
    suffix = Path(blob_name).suffix or ".wav"
    temp_dir = Path(tempfile.gettempdir()) / "call-dashboard"
    temp_dir.mkdir(parents=True, exist_ok=True)
    # call_id comes from the database; keep only filename-safe characters so an
    # unexpected _id value (e.g. containing "/" or "..") cannot escape temp_dir.
    safe_id = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in str(call_id)) or "call"
    temp_path = temp_dir / f"{safe_id}_{uuid.uuid4().hex}{suffix}"
    try:
        with requests.get(download_url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(temp_path, "wb") as handle:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        handle.write(chunk)
    except BaseException:
        # Never leave a truncated recording behind for a later cache hit.
        temp_path.unlink(missing_ok=True)
        raise
    return str(temp_path)
_AUDIO_CACHE: Dict[str, str] = {}
class _SelectionTracker:
"""Per-session selection guard to avoid cross-session interference."""
def __init__(self) -> None:
self.token: str = ""
def _build_user_map(user_ids: List[str]) -> Dict[str, Dict[str, Any]]:
object_ids = []
for user_id in user_ids:
if not user_id:
continue
try:
object_ids.append(ObjectId(user_id))
except (InvalidId, TypeError):
continue
if not object_ids:
return {}
cursor = users_collection.find(
{"_id": {"$in": object_ids}},
{"name": 1, "email": 1},
)
return {str(doc["_id"]): {"name": doc.get("name", ""), "email": doc.get("email", "")} for doc in cursor}
# Internal/test accounts whose calls are hidden from the dashboard.
_INTERNAL_EMAIL_SUFFIXES = ("@rumik.ai",)
_INTERNAL_EMAILS = frozenset({"vatsal.bharti@gmail.com"})


def _is_internal_email(email: Optional[str]) -> bool:
    """Return True when *email* belongs to an internal/test account (case-insensitive)."""
    normalized = (email or "").lower()
    return normalized.endswith(_INTERNAL_EMAIL_SUFFIXES) or normalized in _INTERNAL_EMAILS


def _search_users(user_search: str) -> Dict[str, Dict[str, Any]]:
    """Find up to 200 users whose name or email contains *user_search*, case-insensitively.

    Returns ``{str(_id): {"name": ..., "email": ...}}``. Database errors
    propagate to the caller.
    """
    # Escape the text so it matches literally. A "." or "+" in an email must
    # not act as a regex operator, and input such as "(" must not produce an
    # invalid pattern that MongoDB rejects.
    pattern = {"$regex": re.escape(user_search), "$options": "i"}
    cursor = users_collection.find(
        {"$or": [{"name": pattern}, {"email": pattern}]},
        {"name": 1, "email": 1},
    ).limit(200)
    return {
        str(doc["_id"]): {"name": doc.get("name", ""), "email": doc.get("email", "")}
        for doc in cursor
    }


def _start_time_filter(start_date: Any, end_date: Any) -> Tuple[Dict[str, Any], Optional[str]]:
    """Turn the two calendar inputs into a MongoDB range on ``startTime``.

    Returns ``(range_filter, error_message)``. ``range_filter`` is empty when
    neither bound is set. ``error_message`` is set (and the filter is empty)
    when a bound was entered but is not a valid date, or when the start date
    is after the end date.
    """
    start_day = _parse_date_input(start_date)
    end_day = _parse_date_input(end_date)
    if _has_date_input(start_date) and start_day is None:
        return {}, "Start date must be a valid date."
    if _has_date_input(end_date) and end_day is None:
        return {}, "End date must be a valid date."
    # Whole-day bounds as naive datetimes (pymongo treats naive datetimes as UTC).
    lower = dt.datetime.combine(start_day, dt.time.min) if start_day else None
    upper = dt.datetime.combine(end_day, dt.time.max) if end_day else None
    if lower is not None and upper is not None and lower > upper:
        return {}, "Start date must be on or before the end date."
    range_filter: Dict[str, Any] = {}
    if lower is not None:
        range_filter["$gte"] = lower
    if upper is not None:
        range_filter["$lte"] = upper
    return range_filter, None


def _recording_fields(recording_url: Any) -> Dict[str, Any]:
    """Resolve a recording URL into GCS coordinates, a signed link and a status label.

    Signing is best-effort. Any failure is reported through ``audio_status``
    so that a single bad recording does not stop the rest of the page from
    loading.
    """
    fields: Dict[str, Any] = {
        "signed_recording_url": None,
        "gcs_bucket": None,
        "gcs_blob": None,
        "audio_status": "Unavailable",
    }
    try:
        target = _parse_gcs_target(recording_url or "")
        if target:
            fields["gcs_bucket"], fields["gcs_blob"] = target
            signed_url = _signed_recording_url(recording_url)
            fields["signed_recording_url"] = signed_url
            fields["audio_status"] = "Ready for download" if signed_url else "Available"
    except Exception:
        fields["signed_recording_url"] = None
        fields["audio_status"] = "Error generating link"
    return fields


def _call_metadata(doc: Dict[str, Any], user_id: str, user_details: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one call document plus its user details into the per-row dict kept in session state."""
    start_time = doc.get("startTime")
    created_at = doc.get("createdAt")
    summary = doc.get("summary") or ""
    if not isinstance(summary, str):
        # The table preview slices the summary, so it must be a string.
        summary = str(summary)
    call: Dict[str, Any] = {
        "call_id": str(doc.get("_id")),
        "user_id": user_id,
        "user_name": user_details.get("name") or "Unknown",
        "user_email": user_details.get("email") or "",
        "start_time_display": _format_datetime(start_time),
        "start_time_iso": start_time.isoformat() if isinstance(start_time, dt.datetime) else "",
        "duration_seconds": doc.get("durationSeconds"),
        "duration_display": _format_duration(doc.get("durationSeconds")),
        "summary": summary,
        "transcription": doc.get("transcription") or "",
        "topics": [str(topic) for topic in (doc.get("topicsDiscussed") or []) if topic],
        "recording_url": doc.get("recordingUrl"),
        "status": doc.get("status"),
        "created_at_display": _format_datetime(created_at),
        "created_at_iso": created_at.isoformat() if isinstance(created_at, dt.datetime) else "",
    }
    call.update(_recording_fields(doc.get("recordingUrl")))
    return call


def _table_row(call: Dict[str, Any], preview_chars: int = 120) -> List[Any]:
    """Build the visible table row for *call*, truncating the summary to *preview_chars*."""
    summary = call["summary"]
    preview = summary[:preview_chars] + ("…" if len(summary) > preview_chars else "")
    return [
        call["user_name"],
        call["user_email"],
        call["start_time_display"],
        call["duration_display"],
        preview,
    ]


def fetch_calls(
    user_search: Optional[str],
    start_date: Optional[str],
    end_date: Optional[str],
    limit: Optional[int],
) -> Tuple[List[List[Any]], List[Dict[str, Any]], str]:
    """Load the calls matching the dashboard filters, newest first.

    Args:
        user_search: Text matched literally and case-insensitively against
            user names and emails. Blank means all users.
        start_date: Calendar value for the inclusive lower bound on
            ``startTime``. Blank means unbounded.
        end_date: Calendar value for the inclusive upper bound. Blank means
            unbounded.
        limit: Maximum number of calls to fetch, clamped to 1..500. Uses
            DEFAULT_LIMIT when empty.

    Returns:
        ``(rows, metadata, status_message)``. ``rows`` are the visible table
        rows and ``metadata`` holds one dict per row in the same order.
        ``status_message`` is shown above the table. On validation or database
        errors both lists are empty and the message explains why. Calls from
        internal accounts are dropped after the query, so fewer than *limit*
        rows may be returned.
    """
    limit = max(1, min(int(limit or DEFAULT_LIMIT), 500))

    date_filter, date_error = _start_time_filter(start_date, end_date)
    if date_error:
        return [], [], date_error

    query: Dict[str, Any] = {}
    if date_filter:
        query["startTime"] = date_filter
    search_text = (user_search or "").strip()
    user_hint_map: Dict[str, Dict[str, Any]] = {}
    try:
        if search_text:
            user_hint_map = _search_users(search_text)
            if not user_hint_map:
                return [], [], "No calls found for that user search."
            matched_ids = list(user_hint_map)
            # Calls reference users by the string form of their _id; the read
            # path below also tolerates ObjectId-typed userId values, so match
            # both forms here.
            query["userId"] = {
                "$in": matched_ids + [ObjectId(uid) for uid in matched_ids if ObjectId.is_valid(uid)]
            }
        documents = list(calls_collection.find(query).sort("startTime", -1).limit(limit))
        referenced_ids = {str(doc["userId"]) for doc in documents if doc.get("userId")}
        user_hint_map.update(_build_user_map([uid for uid in referenced_ids if uid not in user_hint_map]))
    except PyMongoError as exc:
        return [], [], f"Database error: {exc}"

    rows: List[List[Any]] = []
    metadata: List[Dict[str, Any]] = []
    for doc in documents:
        user_id = str(doc["userId"]) if doc.get("userId") else ""
        user_details = user_hint_map.get(user_id, {})
        if _is_internal_email(user_details.get("email")):
            continue
        call = _call_metadata(doc, user_id, user_details)
        metadata.append(call)
        rows.append(_table_row(call))

    if not rows:
        return [], [], "No calls found for the selected filters."
    return rows, metadata, f"Loaded {len(rows)} call(s)."
def _build_summary_lines(call: Dict[str, Any], topics: str, audio_status: str, signed_url: Optional[str]) -> List[str]:
lines = [
f"**User:** {call['user_name']} ({call['user_email'] or 'no email'}) \n",
f"**Start:** {call['start_time_display']} \n",
f"**Duration:** {call['duration_display']} \n",
f"**Topics:** {topics} \n",
f"**Audio status:** {audio_status}",
]
if signed_url:
lines.append(f"[🔗 Open recording in new tab]({signed_url}) \n")
return lines
def _selection_placeholder(
    details: str,
    banner: str,
    tracker: _SelectionTracker,
) -> Tuple[Any, str, str, Any, _SelectionTracker]:
    """Outputs shown when there is no usable selection: empty player, empty transcript."""
    return (
        gr.update(value=None, autoplay=False),
        details,
        "",
        gr.update(value=banner),
        tracker,
    )


def _refresh_signed_url(call: Dict[str, Any]) -> Optional[str]:
    """Re-sign the call's recording URL, falling back to the one signed at load time.

    Table metadata lives in session state for as long as the page stays open.
    The URL signed by fetch_calls is valid only for SIGNED_URL_EXPIRY_MINUTES,
    so it may have expired by the time the row is clicked.
    """
    stored_url = call.get("signed_recording_url")
    recording_url = call.get("recording_url")
    if not recording_url:
        return stored_url
    try:
        return _signed_recording_url(recording_url) or stored_url
    except Exception:
        # Best-effort: the load-time URL may still be valid.
        return stored_url


def _load_call_audio(call_id: str, signed_url: str, blob_name: str) -> Tuple[Optional[str], str]:
    """Return ``(local_path, status_text)`` for a call's audio.

    A cached download is reused when its file still exists. ``local_path`` is
    None when the download failed, in which case ``status_text`` carries the
    error.
    """
    cached_path = _AUDIO_CACHE.get(call_id)
    if cached_path and Path(cached_path).exists():
        return cached_path, "Ready (loaded from cache) — press Play to listen."
    try:
        audio_path = _download_audio(call_id, signed_url, blob_name)
    except Exception as exc:
        return None, f"Error loading audio ({exc})"
    previous_path = _AUDIO_CACHE.get(call_id)
    if previous_path and Path(previous_path).exists():
        try:
            Path(previous_path).unlink()
        except OSError:
            pass
    _AUDIO_CACHE[call_id] = audio_path
    return audio_path, "Ready (downloaded) — press Play to listen."


def on_select(
    calls: List[Dict[str, Any]],
    selection_tracker: Optional[_SelectionTracker],
    evt: gr.SelectData,
) -> Iterator[Tuple[Any, str, str, Any, _SelectionTracker]]:
    """Show the details of the clicked table row and load its recording.

    This is a generator. It first yields a "loading" state: details and
    transcript filled in, player cleared. It then yields the final state with
    the audio file, unless a newer selection replaced this one while the
    audio was loading. Each yield is ``(audio_update, details_markdown,
    transcript, banner_update, selection_tracker)``.
    """
    if selection_tracker is None:
        selection_tracker = _SelectionTracker()
    if not calls or evt is None or evt.index is None:
        yield _selection_placeholder(
            "Select a call to view details.",
            "🎧 Select a row to load audio. When the banner shows “Audio ready,” press Play.",
            selection_tracker,
        )
        return

    index = evt.index
    if isinstance(index, (list, tuple)):
        # Cell clicks report [row, column]; only the row matters here.
        index = index[0] if index else None
    try:
        row_index = int(index)
    except (TypeError, ValueError):
        row_index = -1
    if not 0 <= row_index < len(calls):
        yield _selection_placeholder("Invalid selection.", "⚠️ Invalid selection.", selection_tracker)
        return

    call = dict(calls[row_index])
    selection_token = uuid.uuid4().hex
    selection_tracker.token = selection_token

    topics = ", ".join(call.get("topics") or []) or "—"
    call_id = call.get("call_id", f"call_{row_index}")
    blob_name = call.get("gcs_blob")
    has_gcs_object = bool(call.get("gcs_bucket") and blob_name)
    transcription_text = call.get("transcription") or "No transcription available."

    yield (
        gr.update(value=None, autoplay=False),
        "".join(_build_summary_lines(call, topics, "Loading audio…", call.get("signed_recording_url"))),
        transcription_text,
        gr.update(value="⏳ Preparing audio… please wait for the “Audio ready” banner before pressing Play."),
        selection_tracker,
    )

    signed_url = _refresh_signed_url(call) if has_gcs_object else call.get("signed_recording_url")
    audio_path: Optional[str] = None
    if has_gcs_object and signed_url:
        audio_path, audio_status = _load_call_audio(call_id, signed_url, blob_name)
    else:
        audio_status = "Recording unavailable"

    if selection_tracker.token != selection_token:
        # Another row was picked while this one loaded; don't overwrite it.
        return

    summary_md = "".join(_build_summary_lines(call, topics, audio_status, signed_url))
    banner = "▶️ Audio ready — press Play to listen." if audio_path else "⚠️ Audio unavailable"
    yield (
        gr.update(value=audio_path or None, autoplay=False),
        summary_md,
        transcription_text,
        gr.update(value=banner),
        selection_tracker,
    )
# Dashboard UI. Gradio renders components in creation order inside the
# Blocks/Row/Column contexts, so statement order here defines the page layout.
# NOTE(review): the original indentation of this block was lost. The nesting of
# the preset Radio and the "Load calls" button relative to the Rows before them
# is reconstructed; confirm it against the intended layout.
with gr.Blocks(title="Call Recording Dashboard") as demo:
    gr.Markdown(
        """
# Call Recording Dashboard
Use the filters below to browse call recordings. Select a row to review the details and play the audio.
"""
    )
    # Filter controls.
    with gr.Row():
        user_search_box = gr.Textbox(label="User search (name or email)", placeholder="e.g. Sahil")
        # Initial value is DEFAULT_LIMIT capped at 100; fetch_calls clamps the
        # value again. NOTE(review): if CALL_DASHBOARD_LIMIT is below 10, this
        # initial value falls under the slider's minimum.
        limit_slider = gr.Slider(10, 200, value=min(DEFAULT_LIMIT, 100), step=10, label="Max calls to load")
    date_preset_selector = gr.Radio(
        choices=list(DATE_PRESETS),
        value=DEFAULT_DATE_PRESET,
        label="Quick date range",
        info="Choose a preset to auto-fill the calendars or switch to Custom to pick exact dates.",
    )
    with gr.Row():
        # Both calendars start pre-filled with the default preset's range,
        # which was computed once at import time.
        start_date_picker = Calendar(
            label="Start date",
            type="string",
            value=DEFAULT_START_DATE_VALUE,
            info="Click to open the calendar. Leave empty for no lower bound.",
        )
        end_date_picker = Calendar(
            label="End date",
            type="string",
            value=DEFAULT_END_DATE_VALUE,
            info="Click to open the calendar. Leave empty for no upper bound.",
        )
    load_button = gr.Button("Load calls", variant="primary")
    # Picking a preset rewrites both calendars ("Custom" leaves them untouched).
    date_preset_selector.change(
        _date_updates_for_preset,
        inputs=date_preset_selector,
        outputs=[start_date_picker, end_date_picker],
    )
    # Results area: status line plus the read-only calls table.
    status_markdown = gr.Markdown("")
    calls_table = gr.Dataframe(
        headers=["User", "Email", "Start (UTC)", "Duration", "Summary (preview)"],
        datatype=["str", "str", "str", "str", "str"],
        interactive=False,
    )
    # Per-session state: the metadata dicts behind the table rows (same order),
    # and the tracker on_select uses to drop results of superseded selections.
    call_store = gr.State([])
    selection_tracker = gr.State(_SelectionTracker())
    # Detail panes: audio player and banner on the left, details and transcript on the right.
    with gr.Row():
        with gr.Column(scale=1):
            audio_player = gr.Audio(
                label="Recording",
                autoplay=False,
                interactive=False,
                show_download_button=True,
                sources=[],
                type="filepath",
            )
            playback_status = gr.HTML("🎧 Select a row to load audio.")
        with gr.Column(scale=1):
            call_details = gr.Markdown("Select a call to see the details.")
            transcription_box = gr.Textbox(
                label="Transcription",
                lines=12,
                interactive=False,
                show_label=True,
            )
    # fetch_calls returns (rows, metadata, status), matching these outputs.
    load_button.click(
        fetch_calls,
        inputs=[user_search_box, start_date_picker, end_date_picker, limit_slider],
        outputs=[calls_table, call_store, status_markdown],
    )
    # on_select is a generator: it streams a "loading" update, then the final one.
    calls_table.select(
        on_select,
        inputs=[call_store, selection_tracker],
        outputs=[audio_player, call_details, transcription_box, playback_status, selection_tracker],
    )
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT (default 7860).
    port = int(os.environ.get("PORT", "7860"))
    demo.launch(server_name="0.0.0.0", server_port=port)