# Call Recording Dashboard — Gradio app for browsing call recordings stored in
# MongoDB, with audio files served from Google Cloud Storage via signed URLs.
| import datetime as dt | |
| import json | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any, Dict, Iterator, List, Optional, Tuple | |
| import uuid | |
| from urllib.parse import urlparse, unquote | |
| import gradio as gr | |
| from gradio_flatpickr_calendar import Calendar | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| from google.cloud import storage | |
| from google.oauth2 import service_account | |
| import requests | |
| from pymongo import MongoClient | |
| from pymongo.collection import Collection | |
| from pymongo.errors import PyMongoError | |
# --- Configuration (all sourced from environment variables) ---
MONGODB_URI = os.environ.get("MONGODB_URI")
MONGODB_DB_NAME = os.environ.get("MONGODB_DB_NAME", "ira_rumik")
CALLS_COLLECTION_NAME = os.environ.get("MONGODB_CALLS_COLLECTION", "calls")
USERS_COLLECTION_NAME = os.environ.get("MONGODB_USERS_COLLECTION", "users")
# Default number of calls loaded per query; the UI slider can override it.
DEFAULT_LIMIT = int(os.environ.get("CALL_DASHBOARD_LIMIT", "50"))
# Lifetime of generated GCS signed URLs, in minutes.
SIGNED_URL_EXPIRY_MINUTES = int(os.environ.get("SIGNED_URL_EXPIRY_MINUTES", "60"))
# Bucket assumed for recording URLs that are not recognizable GCS URLs.
FALLBACK_BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME")

# Fail fast at import time: the dashboard cannot work without a database.
if not MONGODB_URI:
    raise RuntimeError("MONGODB_URI environment variable is required")
def _build_storage_client() -> storage.Client:
    """Create a storage client using either raw JSON or a credential file.

    Falls back to application-default credentials when neither
    GCP_SERVICE_ACCOUNT_JSON nor GCP_SERVICE_ACCOUNT_FILE yields credentials.
    """
    credentials: Optional[service_account.Credentials] = None
    raw_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
    if raw_json:
        info = json.loads(raw_json)
        credentials = service_account.Credentials.from_service_account_info(info)
    else:
        credential_path = os.environ.get("GCP_SERVICE_ACCOUNT_FILE")
        if credential_path and os.path.exists(credential_path):
            with open(credential_path, "r", encoding="utf-8") as fh:
                credentials = service_account.Credentials.from_service_account_info(json.load(fh))
    return storage.Client(credentials=credentials)
def _build_mongo_collections() -> Tuple[Collection, Collection]:
    """Connect to MongoDB and return handles to the calls and users collections."""
    # Short server-selection timeout so a bad URI fails quickly at startup.
    mongo_client = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=5000)
    db = mongo_client[MONGODB_DB_NAME]
    return db[CALLS_COLLECTION_NAME], db[USERS_COLLECTION_NAME]
# Module-level singletons shared by all requests/sessions.
storage_client = _build_storage_client()
calls_collection, users_collection = _build_mongo_collections()
| def _format_datetime(value: Optional[dt.datetime]) -> str: | |
| if not value: | |
| return "-" | |
| if value.tzinfo is None: | |
| value = value.replace(tzinfo=dt.timezone.utc) | |
| return value.astimezone(dt.timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") | |
| def _format_duration(duration_seconds: Optional[int]) -> str: | |
| if not duration_seconds and duration_seconds != 0: | |
| return "-" | |
| minutes, seconds = divmod(int(duration_seconds), 60) | |
| hours, minutes = divmod(minutes, 60) | |
| if hours: | |
| return f"{hours:d}h {minutes:02d}m {seconds:02d}s" | |
| if minutes: | |
| return f"{minutes:d}m {seconds:02d}s" | |
| return f"{seconds:d}s" | |
| def _parse_date_input(value: Optional[str]) -> Optional[dt.date]: | |
| if not value: | |
| return None | |
| if isinstance(value, dt.date): | |
| return value | |
| if isinstance(value, dt.datetime): | |
| return value.date() | |
| if isinstance(value, (int, float)): | |
| try: | |
| return dt.datetime.fromtimestamp(value).date() | |
| except (OverflowError, OSError, ValueError): | |
| return None | |
| if isinstance(value, dict): | |
| candidate = value.get("value") or value.get("date") or value.get("datetime") | |
| return _parse_date_input(candidate) | |
| if isinstance(value, str): | |
| trimmed = value.strip() | |
| if not trimmed: | |
| return None | |
| if "T" in trimmed: | |
| trimmed = trimmed.split("T", 1)[0] | |
| try: | |
| return dt.date.fromisoformat(trimmed) | |
| except ValueError: | |
| return None | |
| return None | |
| def _has_date_input(value: Any) -> bool: | |
| if value is None: | |
| return False | |
| if isinstance(value, dict): | |
| candidate = value.get("value") or value.get("date") or value.get("datetime") | |
| return bool(candidate) | |
| if isinstance(value, str): | |
| return bool(value.strip()) | |
| return True | |
# Preset labels offered by the "Quick date range" radio control.
DATE_PRESETS = (
    "Custom",
    "Today",
    "Last 7 days",
    "Last 30 days",
    "This month",
    "Last month",
    "Clear",
)
# Preset applied on first page load.
DEFAULT_DATE_PRESET = "Last 7 days"
| def _date_range_for_preset(preset: str) -> Tuple[Optional[str], Optional[str]]: | |
| today = dt.date.today() | |
| if preset == "Today": | |
| start = end = today | |
| elif preset == "Last 7 days": | |
| start = today - dt.timedelta(days=6) | |
| end = today | |
| elif preset == "Last 30 days": | |
| start = today - dt.timedelta(days=29) | |
| end = today | |
| elif preset == "This month": | |
| start = today.replace(day=1) | |
| end = today | |
| elif preset == "Last month": | |
| first_this_month = today.replace(day=1) | |
| last_month_end = first_this_month - dt.timedelta(days=1) | |
| start = last_month_end.replace(day=1) | |
| end = last_month_end | |
| elif preset == "Clear": | |
| return None, None | |
| else: | |
| return None, None | |
| return start.isoformat(), end.isoformat() | |
# Initial values for the two calendar widgets, matching the default preset.
DEFAULT_START_DATE_VALUE, DEFAULT_END_DATE_VALUE = _date_range_for_preset(DEFAULT_DATE_PRESET)
def _date_updates_for_preset(preset: str) -> Tuple[Any, Any]:
    """Return gr.update payloads for the start/end calendars for *preset*.

    'Custom' leaves both calendars untouched; 'Clear' (and unknown presets)
    empties them; every other preset fills in the computed range.
    """
    if preset == "Custom":
        return gr.update(), gr.update()
    start_value, end_value = _date_range_for_preset(preset)
    # When the preset resolves to no range, start/end are both None, which
    # clears the widgets — same payload either way.
    return gr.update(value=start_value), gr.update(value=end_value)
| def _parse_gcs_target(url: str) -> Optional[Tuple[str, str]]: | |
| """Extract bucket and blob name from a GCS or HTTPS storage URL.""" | |
| if not url: | |
| return None | |
| if url.startswith("gs://"): | |
| bucket_and_blob = url[5:] | |
| parts = bucket_and_blob.split("/", 1) | |
| if len(parts) == 2: | |
| return parts[0], parts[1] | |
| return None | |
| parsed = urlparse(url) | |
| if parsed.netloc not in {"storage.googleapis.com", "storage.cloud.google.com"}: | |
| if FALLBACK_BUCKET_NAME: | |
| # Treat URL path as object name under fallback bucket. | |
| blob_name = parsed.path.lstrip("/") | |
| return FALLBACK_BUCKET_NAME, blob_name | |
| return None | |
| path = parsed.path.lstrip("/") | |
| parts = path.split("/", 1) | |
| if len(parts) != 2: | |
| return None | |
| bucket_name, blob_name = parts | |
| # storage.cloud.google.com paths are URL-encoded. | |
| return bucket_name, unquote(blob_name) | |
def _signed_recording_url(recording_url: str) -> Optional[str]:
    """Return a time-limited v4 signed URL for a recording, or None.

    None means the recording URL could not be resolved to a GCS object.
    """
    target = _parse_gcs_target(recording_url)
    if not target:
        return None
    bucket_name, blob_name = target
    blob = storage_client.bucket(bucket_name).blob(blob_name)
    return blob.generate_signed_url(
        version="v4",
        expiration=dt.timedelta(minutes=SIGNED_URL_EXPIRY_MINUTES),
    )
def _download_audio(call_id: str, download_url: str, blob_name: str) -> str:
    """Download the audio file via signed URL to a temp path and return that path."""
    # Keep the original extension so audio players can sniff the format.
    suffix = Path(blob_name).suffix or ".wav"
    target_dir = Path(tempfile.gettempdir()) / "call-dashboard"
    target_dir.mkdir(parents=True, exist_ok=True)
    # Unique filename per download so concurrent sessions never collide.
    target_path = target_dir / f"{call_id}_{uuid.uuid4().hex}{suffix}"
    with requests.get(download_url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(target_path, "wb") as out:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    out.write(chunk)
    return str(target_path)
# Maps call_id -> local temp-file path of the most recently downloaded audio.
_AUDIO_CACHE: Dict[str, str] = {}
class _SelectionTracker:
    """Per-session selection guard to avoid cross-session interference."""

    def __init__(self) -> None:
        # Token of the most recent row selection; a stale handler compares
        # against this and bails out instead of overwriting newer UI state.
        self.token: str = ""
def _build_user_map(user_ids: List[str]) -> Dict[str, Dict[str, Any]]:
    """Look up name/email for the given user-id strings.

    Malformed or empty ids are skipped rather than failing the whole lookup.
    """
    object_ids: List[ObjectId] = []
    for raw_id in user_ids:
        if not raw_id:
            continue
        try:
            object_ids.append(ObjectId(raw_id))
        except (InvalidId, TypeError):
            continue
    if not object_ids:
        return {}
    cursor = users_collection.find(
        {"_id": {"$in": object_ids}},
        {"name": 1, "email": 1},
    )
    return {
        str(doc["_id"]): {"name": doc.get("name", ""), "email": doc.get("email", "")}
        for doc in cursor
    }
def fetch_calls(
    user_search: Optional[str],
    start_date: Optional[str],
    end_date: Optional[str],
    limit: Optional[int],
) -> Tuple[List[List[Any]], List[Dict[str, Any]], str]:
    """Query MongoDB for calls matching the filters.

    Returns (table_rows, metadata, status_message). The two lists are
    index-aligned so that row selection in the UI can look up full call
    details via the same index.
    """
    # Clamp the requested row count to a sane window.
    limit = int(limit or DEFAULT_LIMIT)
    limit = max(1, min(limit, 500))
    query: Dict[str, Any] = {}
    user_hint_map: Dict[str, Dict[str, Any]] = {}
    if user_search:
        # Case-insensitive substring match on user name or email.
        regex = {"$regex": user_search, "$options": "i"}
        matched_users = list(
            users_collection.find(
                {"$or": [{"name": regex}, {"email": regex}]},
                {"name": 1, "email": 1},
            ).limit(200)
        )
        matched_user_ids = [str(doc["_id"]) for doc in matched_users]
        if not matched_user_ids:
            return [], [], "No calls found for that user search."
        # NOTE(review): assumes calls store userId as a string, not ObjectId
        # — confirm against the calls collection schema.
        query["userId"] = {"$in": matched_user_ids}
        user_hint_map = {str(doc["_id"]): {"name": doc.get("name", ""), "email": doc.get("email", "")} for doc in matched_users}
    date_filters: Dict[str, Any] = {}
    start_date_value = _parse_date_input(start_date)
    end_date_value = _parse_date_input(end_date)
    # Distinguish "no input" from "input that failed to parse".
    if _has_date_input(start_date) and start_date_value is None:
        return [], [], "Start date must be a valid date."
    if _has_date_input(end_date) and end_date_value is None:
        return [], [], "End date must be a valid date."
    if start_date_value:
        # Inclusive lower bound: start of the selected day.
        start = dt.datetime.combine(start_date_value, dt.time.min)
        date_filters["$gte"] = start
    if end_date_value:
        # Inclusive upper bound: end of the selected day.
        end = dt.datetime.combine(end_date_value, dt.time.max)
        date_filters["$lte"] = end
    if date_filters:
        query["startTime"] = date_filters
    try:
        cursor = (
            calls_collection.find(query)
            .sort("startTime", -1)
            .limit(limit)
        )
        documents = list(cursor)
    except PyMongoError as exc:
        return [], [], f"Database error: {exc}"
    # Resolve user details for ids the search above did not already cover.
    user_ids = {str(doc.get("userId")) for doc in documents if doc.get("userId")}
    missing_user_ids = [uid for uid in user_ids if uid not in user_hint_map]
    user_hint_map.update(_build_user_map(list(missing_user_ids)))
    rows: List[List[Any]] = []
    metadata: List[Dict[str, Any]] = []
    for doc in documents:
        user_id_value = doc.get("userId")
        user_id = str(user_id_value) if user_id_value else ""
        user_details = user_hint_map.get(user_id, {})
        user_email = (user_details.get("email") or "").lower()
        # Hide internal/test accounts from the dashboard entirely.
        if user_email.endswith("@rumik.ai") or user_email == "vatsal.bharti@gmail.com":
            continue
        start_time = doc.get("startTime")
        created_at = doc.get("createdAt")
        topics = doc.get("topicsDiscussed") or []
        topics = [str(topic) for topic in topics if topic]
        signed_url = None
        audio_status = "Unavailable"
        bucket_name = None
        blob_name = None
        try:
            target = _parse_gcs_target(doc.get("recordingUrl", ""))
            if target:
                bucket_name, blob_name = target
                signed_url = _signed_recording_url(doc.get("recordingUrl", ""))
                if signed_url:
                    audio_status = "Ready for download"
                else:
                    audio_status = "Available"
        except Exception:
            # Signing failures must not break the whole listing.
            signed_url = None
            audio_status = "Error generating link"
        metadata.append(
            {
                "call_id": str(doc.get("_id")),
                "user_id": user_id,
                "user_name": user_details.get("name") or "Unknown",
                "user_email": user_details.get("email") or "",
                "start_time_display": _format_datetime(start_time),
                "start_time_iso": start_time.isoformat() if isinstance(start_time, dt.datetime) else "",
                "duration_seconds": doc.get("durationSeconds"),
                "duration_display": _format_duration(doc.get("durationSeconds")),
                "summary": doc.get("summary") or "",
                "transcription": doc.get("transcription") or "",
                "topics": topics,
                "recording_url": doc.get("recordingUrl"),
                "signed_recording_url": signed_url,
                "gcs_bucket": bucket_name,
                "gcs_blob": blob_name,
                "audio_status": audio_status,
                "status": doc.get("status"),
                "created_at_display": _format_datetime(created_at),
                "created_at_iso": created_at.isoformat() if isinstance(created_at, dt.datetime) else "",
            }
        )
        # Table row shows a compact subset; full details live in metadata.
        rows.append(
            [
                metadata[-1]["user_name"],
                metadata[-1]["user_email"],
                metadata[-1]["start_time_display"],
                metadata[-1]["duration_display"],
                metadata[-1]["summary"][:120] + ("…" if len(metadata[-1]["summary"]) > 120 else ""),
            ]
        )
    if not rows:
        return [], [], "No calls found for the selected filters."
    return rows, metadata, f"Loaded {len(rows)} call(s)."
| def _build_summary_lines(call: Dict[str, Any], topics: str, audio_status: str, signed_url: Optional[str]) -> List[str]: | |
| lines = [ | |
| f"**User:** {call['user_name']} ({call['user_email'] or 'no email'}) \n", | |
| f"**Start:** {call['start_time_display']} \n", | |
| f"**Duration:** {call['duration_display']} \n", | |
| f"**Topics:** {topics} \n", | |
| f"**Audio status:** {audio_status}", | |
| ] | |
| if signed_url: | |
| lines.append(f"[🔗 Open recording in new tab]({signed_url}) \n") | |
| return lines | |
def on_select(
    calls: List[Dict[str, Any]],
    selection_tracker: Optional[_SelectionTracker],
    evt: gr.SelectData,
) -> Iterator[Tuple[Any, str, str, Any, _SelectionTracker]]:
    """Handle a table-row selection.

    Generator handler: first yields an immediate "loading" UI state, then a
    final state once the audio has been fetched (or failed). Each yield is
    (audio update, details markdown, transcription text, status banner,
    tracker state).
    """
    if selection_tracker is None:
        selection_tracker = _SelectionTracker()
    if not calls or evt is None or evt.index is None:
        yield (
            gr.update(value=None, autoplay=False),
            "Select a call to view details.",
            "",
            gr.update(value="🎧 Select a row to load audio. When the banner shows “Audio ready,” press Play."),
            selection_tracker,
        )
        return
    # Dataframe select events may report (row, col) or a bare row index.
    index = evt.index
    if isinstance(index, (list, tuple)):
        row_index = index[0]
    else:
        row_index = index
    try:
        row_index = int(row_index)
    except (TypeError, ValueError):
        yield (
            gr.update(value=None, autoplay=False),
            "Invalid selection.",
            "",
            gr.update(value="⚠️ Invalid selection."),
            selection_tracker,
        )
        return
    if row_index < 0 or row_index >= len(calls):
        yield (
            gr.update(value=None, autoplay=False),
            "Invalid selection.",
            "",
            gr.update(value="⚠️ Invalid selection."),
            selection_tracker,
        )
        return
    call = dict(calls[row_index])
    # Record a fresh token so a newer selection can invalidate this one while
    # the (slow) audio download below is still in flight.
    selection_token = uuid.uuid4().hex
    selection_tracker.token = selection_token
    topics = ", ".join(call["topics"]) if call["topics"] else "—"
    call_id = call.get("call_id", f"call_{row_index}")
    bucket_name = call.get("gcs_bucket")
    blob_name = call.get("gcs_blob")
    audio_path = None
    audio_status = call.get("audio_status", "Unavailable")
    signed_url = call.get("signed_recording_url")
    transcription_text = call["transcription"] or "No transcription available."
    # First yield: show call details immediately while the audio downloads.
    loading_summary = "".join(
        _build_summary_lines(
            call,
            topics,
            "Loading audio…",
            signed_url,
        )
    )
    yield (
        gr.update(value=None, autoplay=False),
        loading_summary,
        transcription_text,
        gr.update(value="⏳ Preparing audio… please wait for the “Audio ready” banner before pressing Play."),
        selection_tracker,
    )
    if bucket_name and blob_name and signed_url:
        # Serve from the local temp-file cache when possible.
        cached_path = _AUDIO_CACHE.get(call_id)
        if cached_path and Path(cached_path).exists():
            audio_path = cached_path
            audio_status = "Ready (loaded from cache) — press Play to listen."
        else:
            try:
                audio_path = _download_audio(call_id, signed_url, blob_name)
                # Best-effort cleanup of any stale temp file for this call.
                previous_path = _AUDIO_CACHE.get(call_id)
                if previous_path and Path(previous_path).exists():
                    try:
                        Path(previous_path).unlink()
                    except OSError:
                        pass
                _AUDIO_CACHE[call_id] = audio_path
                audio_status = "Ready (downloaded) — press Play to listen."
            except Exception as exc:
                audio_status = f"Error loading audio ({exc})"
    else:
        audio_status = "Recording unavailable"
    summary_md = "".join(_build_summary_lines(call, topics, audio_status, signed_url))
    audio_value = gr.update(value=audio_path if audio_path else None, autoplay=False)
    status_html = (
        "▶️ Audio ready — press Play to listen."
        if audio_path
        else "⚠️ Audio unavailable"
    )
    # A newer selection superseded this one during the download; drop the
    # result instead of overwriting that selection's UI state.
    if selection_tracker.token != selection_token:
        return
    yield audio_value, summary_md, transcription_text, gr.update(value=status_html), selection_tracker
# --- Gradio UI definition (built at import time) ---
with gr.Blocks(title="Call Recording Dashboard") as demo:
    gr.Markdown(
        """
# Call Recording Dashboard
Use the filters below to browse call recordings. Select a row to review the details and play the audio.
"""
    )
    # Filter controls: free-text user search plus a result-count cap.
    with gr.Row():
        user_search_box = gr.Textbox(label="User search (name or email)", placeholder="e.g. Sahil")
        limit_slider = gr.Slider(10, 200, value=min(DEFAULT_LIMIT, 100), step=10, label="Max calls to load")
    date_preset_selector = gr.Radio(
        choices=list(DATE_PRESETS),
        value=DEFAULT_DATE_PRESET,
        label="Quick date range",
        info="Choose a preset to auto-fill the calendars or switch to Custom to pick exact dates.",
    )
    with gr.Row():
        start_date_picker = Calendar(
            label="Start date",
            type="string",
            value=DEFAULT_START_DATE_VALUE,
            info="Click to open the calendar. Leave empty for no lower bound.",
        )
        end_date_picker = Calendar(
            label="End date",
            type="string",
            value=DEFAULT_END_DATE_VALUE,
            info="Click to open the calendar. Leave empty for no upper bound.",
        )
    load_button = gr.Button("Load calls", variant="primary")
    # Selecting a preset auto-fills both calendar widgets.
    date_preset_selector.change(
        _date_updates_for_preset,
        inputs=date_preset_selector,
        outputs=[start_date_picker, end_date_picker],
    )
    status_markdown = gr.Markdown("")
    calls_table = gr.Dataframe(
        headers=["User", "Email", "Start (UTC)", "Duration", "Summary (preview)"],
        datatype=["str", "str", "str", "str", "str"],
        interactive=False,
    )
    # Per-session state: the full call metadata list (index-aligned with the
    # table rows) plus the selection guard used by on_select.
    call_store = gr.State([])
    selection_tracker = gr.State(_SelectionTracker())
    with gr.Row():
        with gr.Column(scale=1):
            audio_player = gr.Audio(
                label="Recording",
                autoplay=False,
                interactive=False,
                show_download_button=True,
                sources=[],
                type="filepath",
            )
            playback_status = gr.HTML("🎧 Select a row to load audio.")
        with gr.Column(scale=1):
            call_details = gr.Markdown("Select a call to see the details.")
            transcription_box = gr.Textbox(
                label="Transcription",
                lines=12,
                interactive=False,
                show_label=True,
            )
    # Wire the filter button and the table-row selection to their handlers.
    load_button.click(
        fetch_calls,
        inputs=[user_search_box, start_date_picker, end_date_picker, limit_slider],
        outputs=[calls_table, call_store, status_markdown],
    )
    calls_table.select(
        on_select,
        inputs=[call_store, selection_tracker],
        outputs=[audio_player, call_details, transcription_box, playback_status, selection_tracker],
    )

if __name__ == "__main__":
    # Bind to all interfaces; PORT defaults to 7860 (Hugging Face Spaces convention).
    demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860")))