import os import sys import time import uuid import requests import streamlit as st import dotenv from streamlit_card import card from annotated_text import annotated_text import json # Add project root to path so models/services are importable when running from ui/ _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _PROJECT_ROOT not in sys.path: sys.path.insert(0, _PROJECT_ROOT) # Load environment variables dotenv.load_dotenv(verbose=True) # Default system prompt template from shared module. # HF Space should include `shared/`; if not, fall back to embedded constant. try: from shared.prompt_templates import DEFAULT_SYSTEM_PROMPT_TEMPLATE except ModuleNotFoundError: DEFAULT_SYSTEM_PROMPT_TEMPLATE = """\ You are an AI calling agent working under a campaign. Campaign Goal: {goal} --- CAMPAIGN KNOWLEDGE BASE --- {campaign_knowledge_base} --- CONTACT --- Name: {contact_name} {personalized_context_block} Use both knowledge sources to conduct a natural, goal-oriented conversation. Prioritize personalized context over general knowledge. Speak like a human, be concise, and adapt based on user responses. Do not hallucinate. If unsure, ask. --- OUTPUT FORMAT --- Respond with the spoken reply first (what the caller should hear), then on a new line emit a metadata block exactly like this and nothing after it: <<>>{{"intent": "...", "next_action": "..."}}<<>> Intent values: greeting | qualifying | objecting | booking | ending | other. 
Next_action values: ask_question | confirm | book_demo | schedule_followup | hang_up | transfer | other.\ """ # Page configuration with custom theme st.set_page_config( page_title="VoiceGenius AI Calling", page_icon="📞", layout="wide", initial_sidebar_state="expanded" ) # Apply custom CSS st.markdown(""" """, unsafe_allow_html=True) # Initialize session state variables if 'call_active' not in st.session_state: st.session_state.call_active = False st.session_state.call_sid = None st.session_state.transcript = [] st.session_state.system_message = os.getenv("SYSTEM_MESSAGE") or "You are a helpful AI assistant making a phone call. Be respectful, concise, and helpful." st.session_state.initial_message = os.getenv("INITIAL_MESSAGE") or "Hello, this is an AI assistant calling. How can I help you today?" st.session_state.all_transcripts = [] st.session_state.recording_info = None st.session_state.call_selector = "Current Call" st.session_state.total_calls = 0 st.session_state.successful_calls = 0 st.session_state.call_duration = 0 st.session_state.selected_template = None st.session_state.should_reset_selector = False # Campaign session state if 'app_mode' not in st.session_state: st.session_state.app_mode = "Single Call" if 'selected_campaign_id' not in st.session_state: st.session_state.selected_campaign_id = None if 'campaigns_cache' not in st.session_state: st.session_state.campaigns_cache = [] if 'voice_profile_id' not in st.session_state: st.session_state.voice_profile_id = None if 'voice_profiles_cache' not in st.session_state: st.session_state.voice_profiles_cache = [] if 'start_call_request_id' not in st.session_state: st.session_state.start_call_request_id = None if 'start_call_in_flight' not in st.session_state: st.session_state.start_call_in_flight = False # Predefined prompt templates prompt_templates = { "medical": { "system": { "en": "You are a medical appointment scheduling assistant for Dr. Smith's office. 
You should collect patient information, reason for appointment, and preferred times. You cannot give medical advice.", "es": "Eres un asistente de programación de citas médicas para la oficina del Dr. Smith. Debes recopilar información del paciente, motivo de la cita y horarios preferidos. No puedes dar consejos médicos." }, "initial": { "en": "Hello, this is the virtual assistant from Dr. Smith's office.", "es": "Hola, soy el asistente virtual de la oficina del Dr. Smith" } }, "finance": { "system": { "en": "You are a financial services assistant for GrowWealth Advisors. You can discuss appointment scheduling and general services offered, but cannot give specific investment advice during this call.", "es": "Eres un asistente de servicios financieros para GrowWealth Advisors. Puedes discutir la programación de citas y los servicios generales ofrecidos, pero no puedes dar consejos específicos de inversión durante esta llamada." }, "initial": { "en": "Hello, this is the virtual assistant from GrowWealth Financial Advisors.", "es": "Hola, soy el asistente virtual de GrowWealth Financial Advisors." } }, "sports": { "system": { "en": "You are a membership coordinator for SportsFit Gym. You should provide information about membership options, facility hours, and classes offered. Be enthusiastic and encouraging.", "es": "Eres un coordinador de membresías para el gimnasio SportsFit. Debes proporcionar información sobre las opciones de membresía, horarios de las instalaciones y clases ofrecidas. Sé entusiasta y alentador." }, "initial": { "en": "Hi there! This is the virtual assistant from SportsFit Gym.", "es": "¡Hola! Soy el asistente virtual del gimnasio SportsFit." } }, "customer_service": { "system": { "en": "You are a customer service representative following up on a recent purchase. You should check satisfaction levels, address any concerns, and offer assistance if needed.", "es": "Eres un representante de servicio al cliente dando seguimiento a una compra reciente. 
def fetch_all_transcripts():
    """Fetch every stored call transcript and refresh the call counters.

    Sends a GET to ``https://$SERVER/all_transcripts`` and updates two values
    in ``st.session_state``: ``total_calls`` (number of transcripts) and
    ``successful_calls`` (transcripts that contain at least one message whose
    role is ``"user"``).

    Returns:
        The list stored under the ``transcripts`` key of the response, or
        ``[]`` when the request fails or the body is not a JSON object. Request
        and decoding failures are reported with ``st.error``.
    """
    try:
        # requests applies no timeout by default; a stalled server would
        # otherwise block the whole script run.
        response = requests.get(
            f"https://{os.getenv('SERVER')}/all_transcripts",
            timeout=10,
        )
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # error does not derive from RequestException.
        st.error(f"Error fetching call list: {str(e)}")
        return []

    transcripts = payload.get("transcripts") if isinstance(payload, dict) else None
    transcripts = transcripts or []

    st.session_state.total_calls = len(transcripts)
    st.session_state.successful_calls = sum(
        1
        for t in transcripts
        if any(m.get("role") == "user" for m in (t.get("transcript") or []))
    )
    return transcripts


def format_duration(seconds):
    """Format a duration as ``"<s>s"`` or ``"<m>m <s>s"``.

    Args:
        seconds: Duration in seconds. Fractions are truncated; ``None`` and
            negative values are treated as zero.

    Returns:
        ``"45s"`` for values under a minute, otherwise e.g. ``"2m 5s"``.
    """
    total = max(0, int(seconds or 0))
    if total < 60:
        return f"{total}s"
    minutes, remaining_seconds = divmod(total, 60)
    return f"{minutes}m {remaining_seconds}s"


def apply_template(template_name):
    """Copy a predefined prompt template into the session state.

    Writes ``system_message``, ``initial_message`` and ``selected_template``
    to ``st.session_state``. The text is taken in the currently selected call
    language, falling back to the English variant of the template.

    Args:
        template_name: Key into the module-level ``prompt_templates`` mapping.

    Returns:
        ``True`` when the template exists and was applied, ``False`` otherwise.
    """
    template = prompt_templates.get(template_name)
    if template is None:
        return False

    # language_options / selected_language are module globals that are only
    # assigned while the "Single Call" sidebar is rendered; default to English
    # instead of raising NameError when they are absent.
    try:
        lang_code = language_options[selected_language]
    except (NameError, KeyError):
        lang_code = "en"

    system_prompts = template["system"]
    opening_lines = template["initial"]
    st.session_state.system_message = system_prompts.get(lang_code, system_prompts["en"])
    st.session_state.initial_message = opening_lines.get(lang_code, opening_lines["en"])
    st.session_state.selected_template = template_name
    return True


def fetch_recording_info(call_sid):
    """Look up the recording of a call.

    First asks ``https://$SERVER/call_recording/<call_sid>`` for a
    ``recording_url``, then fetches that URL and reads ``media_url`` and
    ``duration`` from its JSON body.

    Args:
        call_sid: Identifier of the call whose recording is wanted.

    Returns:
        ``{'url': '<media_url>.mp3', 'duration': <duration or 0>}``, or
        ``None`` when there is no recording, the media lookup does not answer
        with HTTP 200, the media body has no ``media_url``, or a request fails
        (request failures are reported with ``st.error``).
    """
    try:
        response = requests.get(
            f"https://{os.getenv('SERVER')}/call_recording/{call_sid}",
            timeout=10,
        )
        media_url = response.json().get('recording_url')
        if not media_url:
            return None

        media_response = requests.get(media_url, timeout=10)
        if media_response.status_code != 200:
            return None
        media_data = media_response.json()
    except (requests.RequestException, ValueError) as e:
        st.error(f"Error fetching recording info: {str(e)}")
        return None

    base_url = media_data.get('media_url')
    if not base_url:
        # Avoid handing the audio player a bogus "None.mp3" URL.
        return None
    return {
        'url': f"{base_url}.mp3",
        'duration': media_data.get('duration', 0),
    }
def _extract_call_sid(selector_label):
    """Extract call_sid from selector label like '[AI] Call CA...' or 'Call CA...'."""
    # Strip the "[AI]" / "[MANUAL]" style prefix if present.
    _, bracket_sep, after_prefix = selector_label.partition("] Call ")
    if bracket_sep:
        return after_prefix
    if selector_label.startswith("Call "):
        return selector_label[len("Call "):]
    return selector_label


def on_call_selector_change():
    """Refresh ``st.session_state.recording_info`` for the selected call.

    Used as the ``on_change`` callback of the call-history selectbox.
    Selecting "Current Call" clears the recording info; selecting a past call
    loads its recording through ``fetch_recording_info``. When the selected
    call is not among ``st.session_state.all_transcripts`` a warning is shown
    and the recording info is cleared so a previous call's recording is not
    displayed for it.
    """
    selection = st.session_state.call_selector
    if selection == "Current Call":
        st.session_state.recording_info = None
        return

    sid = _extract_call_sid(selection)
    selected_transcript = next(
        (t for t in st.session_state.all_transcripts if t.get('call_sid') == sid),
        None,
    )
    if selected_transcript is None:
        st.session_state.recording_info = None
        st.warning("No transcript found for the selected call.")
        return
    st.session_state.recording_info = fetch_recording_info(sid)


# ---------------------------------------------------------------------------
# Campaign API helpers
# ---------------------------------------------------------------------------

def _BASE():
    """Return the backend base URL built from the ``SERVER`` environment variable."""
    return f"https://{os.getenv('SERVER')}"


def api_list_voice_profiles(usable_only=False):
    """List voice profiles from ``GET /voices``.

    Args:
        usable_only: Sent to the server as the ``usable_only`` query flag
            (``"true"`` / ``"false"``).

    Returns:
        The list under the ``voices`` key, or ``[]`` when the request fails or
        the body has an unexpected shape, so the page can still render.
    """
    try:
        r = requests.get(
            f"{_BASE()}/voices",
            params={"usable_only": str(usable_only).lower()},
            timeout=15,
        )
        payload = r.json()
    except (requests.RequestException, ValueError):
        return []
    if not isinstance(payload, dict):
        return []
    return payload.get("voices") or []


def api_create_voice_clone(display_name, description, labels_json, consent_confirmed, files):
    """Create a cloned voice via ``POST /voices/clones`` (multipart upload).

    Args:
        display_name: Name shown for the new voice.
        description: Free-text description.
        labels_json: Labels as a JSON string; forwarded unchanged.
        consent_confirmed: Sent as ``"true"`` / ``"false"``.
        files: Uploaded sample files exposing ``name``, ``getvalue()`` and
            ``type``; a missing ``type`` is sent as ``audio/mpeg``.

    Returns:
        The server's JSON response, or ``{"error": <message>}`` on HTTP
        status >= 400 or on any exception.
    """
    try:
        multipart_files = [
            ("files", (f.name, f.getvalue(), f.type or "audio/mpeg"))
            for f in (files or [])
        ]
        data = {
            "display_name": display_name,
            "description": description,
            "labels": labels_json,
            "consent_confirmed": str(bool(consent_confirmed)).lower(),
        }
        r = requests.post(
            f"{_BASE()}/voices/clones",
            data=data,
            files=multipart_files,
            timeout=90,
        )
        res = r.json()
        if r.status_code >= 400:
            return {"error": res.get("error", f"Server error ({r.status_code})")}
        return res
    except Exception as e:
        # UI boundary: report any failure to the caller instead of raising.
        return {"error": str(e)}


def api_register_voice(display_name, provider_voice_id, description, labels):
    """Register an existing provider voice via ``POST /voices/register``.

    ``consent_confirmed`` is always sent as ``True``; ``labels`` defaults to an
    empty object when falsy.

    Returns:
        The server's JSON response, or ``{"error": <message>}`` on HTTP
        status >= 400 or on any exception.
    """
    try:
        r = requests.post(
            f"{_BASE()}/voices/register",
            json={
                "display_name": display_name,
                "provider_voice_id": provider_voice_id,
                "description": description,
                "labels": labels or {},
                "consent_confirmed": True,
            },
            timeout=30,
        )
        res = r.json()
        if r.status_code >= 400:
            return {"error": res.get("error", f"Server error ({r.status_code})")}
        return res
    except Exception as e:
        return {"error": str(e)}


def api_delete_voice_profile(voice_profile_id, delete_remote=False):
    """Delete a voice profile via ``DELETE /voices/<id>``.

    Args:
        voice_profile_id: Identifier of the profile to delete.
        delete_remote: Sent as the ``delete_remote`` query flag.

    Returns:
        The server's JSON response. On HTTP status >= 400 the response is
        returned with an ``error`` entry added when the server supplied none,
        so callers that test ``res.get("error")`` notice the failure. Any
        exception yields ``{"error": <message>}``.
    """
    try:
        r = requests.delete(
            f"{_BASE()}/voices/{voice_profile_id}",
            params={"delete_remote": str(bool(delete_remote)).lower()},
            timeout=30,
        )
        res = r.json()
        if r.status_code >= 400 and not res.get("error"):
            res["error"] = f"Server error ({r.status_code})"
        return res
    except Exception as e:
        return {"error": str(e)}


def api_list_campaigns():
    """List campaigns from ``GET /campaigns``.

    Returns:
        The list under the ``campaigns`` key, or ``[]`` when the request fails
        or the body has an unexpected shape.
    """
    try:
        r = requests.get(f"{_BASE()}/campaigns", timeout=10)
        payload = r.json()
    except (requests.RequestException, ValueError):
        return []
    if not isinstance(payload, dict):
        return []
    return payload.get("campaigns") or []


def api_get_campaign(campaign_id):
    """Fetch one campaign from ``GET /campaigns/<id>``.

    Returns:
        The response as a dict, or ``{}`` when the request fails or the body is
        not a JSON object.
    """
    try:
        r = requests.get(f"{_BASE()}/campaigns/{campaign_id}", timeout=10)
        payload = r.json()
    except (requests.RequestException, ValueError):
        return {}
    return payload if isinstance(payload, dict) else {}


def api_create_campaign(data):
    """Create a campaign via ``POST /campaigns`` with ``data`` as the JSON body.

    Returns:
        The server's JSON response, or ``{"error": <message>}`` on HTTP
        status >= 400 or on any exception.
    """
    try:
        r = requests.post(f"{_BASE()}/campaigns", json=data, timeout=10)
        result = r.json()
        if r.status_code >= 400:
            return {"error": result.get("error", f"Server error ({r.status_code})")}
        return result
    except Exception as e:
        return {"error": str(e)}


def api_update_campaign(campaign_id, data):
    """Update a campaign via ``PATCH /campaigns/<id>`` with ``data`` as JSON.

    Returns:
        The server's JSON response, or ``{"error": <message>}`` on HTTP
        status >= 400 or on any exception.
    """
    try:
        r = requests.patch(f"{_BASE()}/campaigns/{campaign_id}", json=data, timeout=10)
        result = r.json()
        if r.status_code >= 400:
            return {"error": result.get("error", f"Server error ({r.status_code})")}
        return result
    except Exception as e:
        return {"error": str(e)}
def api_delete_campaign(campaign_id):
    """Delete a campaign via ``DELETE /campaigns/<id>``.

    Returns:
        The server's JSON response. On HTTP status >= 400 an ``error`` entry is
        added when the server supplied none. Any exception yields
        ``{"error": <message>}``.
    """
    try:
        r = requests.delete(f"{_BASE()}/campaigns/{campaign_id}", timeout=10)
        res = r.json()
        if r.status_code >= 400 and not res.get("error"):
            res["error"] = f"Server error ({r.status_code})"
        return res
    except Exception as e:
        # UI boundary: report any failure to the caller instead of raising.
        return {"error": str(e)}


def api_list_contacts(campaign_id, status=None):
    """List a campaign's contacts from ``GET /campaigns/<id>/contacts``.

    Args:
        campaign_id: Campaign whose contacts are listed.
        status: Optional status filter, sent as the ``status`` query parameter.

    Returns:
        The list under the ``contacts`` key, or ``[]`` when the request fails
        or the body has an unexpected shape.
    """
    try:
        r = requests.get(
            f"{_BASE()}/campaigns/{campaign_id}/contacts",
            # Let requests build (and URL-encode) the query string.
            params={"status": status} if status else None,
            timeout=10,
        )
        payload = r.json()
    except (requests.RequestException, ValueError):
        return []
    if not isinstance(payload, dict):
        return []
    return payload.get("contacts") or []


def api_upload_contacts(campaign_id, file_bytes, filename):
    """Upload a contacts file via ``POST /campaigns/<id>/contacts/upload``.

    Args:
        campaign_id: Target campaign.
        file_bytes: Raw file content.
        filename: Name sent with the multipart ``file`` field.

    Returns:
        The server's JSON response. On HTTP status >= 400 an ``error`` entry is
        added when the server supplied none. Any exception yields
        ``{"error": <message>}``.
    """
    try:
        r = requests.post(
            f"{_BASE()}/campaigns/{campaign_id}/contacts/upload",
            files={"file": (filename, file_bytes)},
            timeout=30,
        )
        res = r.json()
        if r.status_code >= 400 and not res.get("error"):
            res["error"] = f"Server error ({r.status_code})"
        return res
    except Exception as e:
        return {"error": str(e)}


def api_call_contact(campaign_id, contact_id):
    """Start a call to one contact via ``POST .../contacts/<contact_id>/call``.

    Returns:
        The server's JSON response. On HTTP status >= 400 an ``error`` entry is
        added when the server supplied none. Any exception yields
        ``{"error": <message>}``.
    """
    try:
        r = requests.post(
            f"{_BASE()}/campaigns/{campaign_id}/contacts/{contact_id}/call",
            timeout=10,
        )
        res = r.json()
        if r.status_code >= 400 and not res.get("error"):
            res["error"] = f"Server error ({r.status_code})"
        return res
    except Exception as e:
        return {"error": str(e)}


def api_start_campaign(campaign_id, max_concurrent=1):
    """Start a campaign via ``POST /campaigns/<id>/start``.

    Args:
        campaign_id: Campaign to start.
        max_concurrent: Sent as ``max_concurrent`` in the JSON body.

    Returns:
        The server's JSON response. On HTTP status >= 400 an ``error`` entry is
        added when the server supplied none. Any exception yields
        ``{"error": <message>}``.
    """
    try:
        r = requests.post(
            f"{_BASE()}/campaigns/{campaign_id}/start",
            json={"max_concurrent": max_concurrent},
            timeout=10,
        )
        res = r.json()
        if r.status_code >= 400 and not res.get("error"):
            res["error"] = f"Server error ({r.status_code})"
        return res
    except Exception as e:
        return {"error": str(e)}


def api_stop_campaign(campaign_id):
    """Stop a campaign via ``POST /campaigns/<id>/stop``.

    Returns:
        The server's JSON response. On HTTP status >= 400 an ``error`` entry is
        added when the server supplied none. Any exception yields
        ``{"error": <message>}``.
    """
    try:
        r = requests.post(f"{_BASE()}/campaigns/{campaign_id}/stop", timeout=10)
        res = r.json()
        if r.status_code >= 400 and not res.get("error"):
            res["error"] = f"Server error ({r.status_code})"
        return res
    except Exception as e:
        return {"error": str(e)}


def api_list_calls(campaign_id):
    """List a campaign's calls from ``GET /campaigns/<id>/calls``.

    Returns:
        The list under the ``calls`` key, or ``[]`` when the request fails or
        the body has an unexpected shape.
    """
    try:
        r = requests.get(f"{_BASE()}/campaigns/{campaign_id}/calls", timeout=10)
        payload = r.json()
    except (requests.RequestException, ValueError):
        return []
    if not isinstance(payload, dict):
        return []
    return payload.get("calls") or []


def api_get_call_detail(campaign_id, call_id):
    """Fetch one call from ``GET /campaigns/<id>/calls/<call_id>``.

    Returns:
        The response as a dict, or ``{}`` when the request fails or the body is
        not a JSON object.
    """
    try:
        r = requests.get(
            f"{_BASE()}/campaigns/{campaign_id}/calls/{call_id}",
            timeout=10,
        )
        payload = r.json()
    except (requests.RequestException, ValueError):
        return {}
    return payload if isinstance(payload, dict) else {}


def _sample_csv() -> bytes:
    """Return the bytes of the downloadable sample contacts CSV."""
    return (
        b"name,phone,personalized_context,voice_profile_name\n"
        b"Jane Doe,+14155551234,Prefers morning calls. Interested in pro tier.,\n"
        b"John Roe,+14085550000,,\n"
    )
def _voice_status_badge(status: str) -> str:
    """Map a voice profile status to the short label shown in the UI.

    Unknown statuses are shown unchanged; an empty or ``None`` status is shown
    as ``"unknown"``.
    """
    status_map = {
        "active": "active",
        "pending_verification": "pending",
        "failed": "failed",
        "creating": "creating",
    }
    return status_map.get(status, status or "unknown")


def _parse_labels_json(text):
    """Parse the content of a "Labels JSON" text box.

    Args:
        text: Raw text; blank input is treated as ``"{}"``.

    Returns:
        ``(labels, None)`` when the text is a JSON object, otherwise
        ``(None, message)`` describing why it was rejected.
    """
    try:
        labels = json.loads((text or "").strip() or "{}")
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        return None, str(exc)
    if not isinstance(labels, dict):
        return None, "Labels must be a JSON object"
    return labels, None


def render_voice_library_panel():
    """Render the Voice Library panel and return the known voice profiles.

    The panel offers three expanders: create a clone from uploaded samples,
    register an existing ElevenLabs voice id, and manage (delete) existing
    profiles. Successful create/register/delete actions trigger ``st.rerun()``.

    Returns:
        The list returned by ``api_list_voice_profiles(usable_only=False)``,
        or ``[]`` when there are no profiles.
    """
    st.subheader("Voice Library")
    st.caption("Create or register ElevenLabs voices for single calls and campaigns.")

    with st.expander("Create Clone (IVC)", expanded=False):
        clone_name = st.text_input("Display Name", key="clone_name")
        clone_desc = st.text_area("Description", key="clone_desc", height=80)
        clone_labels_text = st.text_input("Labels JSON", value="{}", key="clone_labels_json")
        clone_files = st.file_uploader(
            "Upload sample audio files",
            type=["mp3", "wav", "m4a", "flac", "ogg"],
            accept_multiple_files=True,
            key="clone_files_uploader",
        )
        consent = st.checkbox(
            "I confirm I have consent/rights to clone and use this voice.",
            key="clone_consent_checkbox",
        )
        if st.button("Create Clone Voice", key="create_clone_voice_btn", use_container_width=True):
            # Validate labels locally, as the register form does, instead of
            # sending malformed JSON to the server.
            _, labels_error = _parse_labels_json(clone_labels_text)
            if not clone_name.strip():
                st.error("Display name is required.")
            elif not clone_files:
                st.error("Upload at least one sample file.")
            elif not consent:
                st.error("Consent confirmation is required.")
            elif labels_error:
                st.error(f"Invalid labels JSON: {labels_error}")
            else:
                result = api_create_voice_clone(
                    display_name=clone_name.strip(),
                    description=clone_desc.strip(),
                    labels_json=clone_labels_text.strip() or "{}",
                    consent_confirmed=consent,
                    files=clone_files,
                )
                if result.get("error"):
                    st.error(result["error"])
                else:
                    st.success(f"Voice clone created: {result.get('display_name')}")
                    st.rerun()

    with st.expander("Register Existing Voice ID", expanded=False):
        reg_name = st.text_input("Display Name", key="register_voice_name")
        reg_voice_id = st.text_input("ElevenLabs Voice ID", key="register_provider_voice_id")
        # 68 px is the smallest height st.text_area accepts in current
        # Streamlit releases (the previous value of 60 is rejected there).
        reg_desc = st.text_area("Description", key="register_voice_desc", height=68)
        reg_labels_text = st.text_input("Labels JSON", value="{}", key="register_labels_json")
        if st.button("Register Voice", key="register_voice_btn", use_container_width=True):
            labels, labels_error = _parse_labels_json(reg_labels_text)
            if not reg_name.strip():
                st.error("Display name is required.")
            elif not reg_voice_id.strip():
                st.error("ElevenLabs Voice ID is required.")
            elif labels_error:
                st.error(f"Invalid labels JSON: {labels_error}")
            else:
                res = api_register_voice(
                    reg_name.strip(),
                    reg_voice_id.strip(),
                    reg_desc.strip(),
                    labels,
                )
                if res.get("error"):
                    st.error(res["error"])
                else:
                    st.success(f"Voice registered: {res.get('display_name')}")
                    st.rerun()

    voices = api_list_voice_profiles(usable_only=False)
    if not voices:
        st.info("No voice profiles yet.")
        return []

    with st.expander(f"Manage Voices ({len(voices)})", expanded=False):
        for v in voices:
            provider_voice = v.get("provider_voice_id") or "-"
            status = _voice_status_badge(v.get("status", "unknown"))
            st.markdown(
                f"**{v.get('display_name','Unnamed')}** \n"
                f"`{v.get('id')}` \n"
                f"status: **{status}** \n"
                f"provider_voice_id: `{provider_voice}`"
            )
            col1, col2 = st.columns(2)
            delete_remote = col1.checkbox("Delete remote too", key=f"del_remote_{v['id']}")
            if col2.button("Delete", key=f"del_voice_{v['id']}", use_container_width=True):
                res = api_delete_voice_profile(v["id"], delete_remote=delete_remote)
                if res.get("error"):
                    st.error(res["error"])
                else:
                    st.success("Voice profile deleted.")
                    st.rerun()
            st.divider()
    return voices


# ---------------------------------------------------------------------------
# Campaign UI renderers
# ---------------------------------------------------------------------------

def render_campaigns_sidebar(campaigns):
    """Render the campaign picker, contact counters and start/stop controls.

    The chosen campaign is written to
    ``st.session_state.selected_campaign_id`` (``None`` for "+ New Campaign").

    Args:
        campaigns: Campaign dicts; ``id`` and ``name`` are read from each, and
            ``contact_counts`` / ``is_running`` from the selected one.
    """
    new_label = "+ New Campaign"
    st.subheader("Campaign")

    # Option label -> campaign id. A label that would collide with an earlier
    # campaign (or with the "new" entry) gets a short id suffix, so campaigns
    # sharing a name no longer all resolve to the first one.
    label_to_id = {}
    for c in campaigns:
        label = c["name"]
        if label == new_label or label in label_to_id:
            label = f"{c['name']} ({str(c['id'])[:6]})"
        label_to_id[label] = c["id"]
    campaign_names = [new_label, *label_to_id]

    current_name = new_label
    previously_selected = st.session_state.selected_campaign_id
    if previously_selected:
        current_name = next(
            (label for label, known_id in label_to_id.items() if known_id == previously_selected),
            new_label,
        )
    default_idx = campaign_names.index(current_name)

    chosen = st.selectbox(
        "Select campaign",
        campaign_names,
        index=default_idx,
        key="campaign_selector_sb",
    )
    st.session_state.selected_campaign_id = label_to_id.get(chosen)

    selected_id = st.session_state.selected_campaign_id
    if not selected_id:
        return

    c_detail = next((x for x in campaigns if x["id"] == selected_id), {})
    counts = c_detail.get("contact_counts") or {}
    pending = counts.get("pending", 0)
    done = counts.get("completed", 0)
    failed = counts.get("failed", 0) + counts.get("no-answer", 0)

    col1, col2, col3 = st.columns(3)
    col1.metric("Pending", pending)
    col2.metric("Done", done)
    col3.metric("Failed", failed)

    if c_detail.get("is_running", False):
        st.markdown("**Status:** Running")
        if st.button("Stop Campaign", key="stop_camp_btn", use_container_width=True):
            res = api_stop_campaign(selected_id)
            if res.get("error"):
                # Rerun only on success, as "Start Campaign" does, so the
                # error stays on screen.
                st.error(res["error"])
            else:
                st.success("Stop requested.")
                st.rerun()
    else:
        if st.button(
            "Start Campaign",
            key="start_camp_btn",
            disabled=pending == 0,
            use_container_width=True,
        ):
            res = api_start_campaign(selected_id)
            if res.get("error"):
                st.error(res["error"])
            else:
                st.success(f"Started — {res.get('count', 0)} contacts queued.")
                st.rerun()
campaign_names, index=default_idx, key="campaign_selector_sb") if chosen == "+ New Campaign": st.session_state.selected_campaign_id = None else: c = next((x for x in campaigns if x["name"] == chosen), None) if c: st.session_state.selected_campaign_id = c["id"] if st.session_state.selected_campaign_id: c_detail = next((x for x in campaigns if x["id"] == st.session_state.selected_campaign_id), {}) counts = c_detail.get("contact_counts", {}) pending = counts.get("pending", 0) done = counts.get("completed", 0) failed = counts.get("failed", 0) + counts.get("no-answer", 0) col1, col2, col3 = st.columns(3) col1.metric("Pending", pending) col2.metric("Done", done) col3.metric("Failed", failed) is_running = c_detail.get("is_running", False) if is_running: st.markdown("**Status:** Running") if st.button("Stop Campaign", key="stop_camp_btn", use_container_width=True): res = api_stop_campaign(st.session_state.selected_campaign_id) st.success("Stop requested.") if not res.get("error") else st.error(res["error"]) st.rerun() else: if st.button("Start Campaign", key="start_camp_btn", disabled=pending == 0, use_container_width=True): res = api_start_campaign(st.session_state.selected_campaign_id) if res.get("error"): st.error(res["error"]) else: st.success(f"Started — {res.get('count', 0)} contacts queued.") st.rerun() def render_campaigns_main(campaigns, voice_profiles): cid = st.session_state.selected_campaign_id campaign = next((c for c in campaigns if c["id"] == cid), None) if cid else None usable_voice_profiles = [ v for v in voice_profiles if v.get("status") == "active" and v.get("provider_voice_id") ] vp_label_to_id = {"None": None} for v in usable_voice_profiles: vp_label_to_id[f"{v['display_name']} ({v['id'][:6]})"] = v["id"] vp_labels = list(vp_label_to_id.keys()) st.markdown("

Campaign Manager

", unsafe_allow_html=True) if not cid: # ---- Create new campaign form ---- st.subheader("Create New Campaign") with st.form("new_campaign_form"): name = st.text_input("Campaign Name *", placeholder="Q2 Demo Outreach") goal = st.text_input("Campaign Goal *", placeholder="Book a product demo") knowledge_base = st.text_area( "Knowledge Base", placeholder="Paste product info, FAQs, pricing, talking points…", height=200, ) col1, col2 = st.columns(2) with col1: lang = st.selectbox("Language", ["en", "es"]) model = st.selectbox("AI Model", ["openai", "anthropic"]) with col2: voice_id = st.selectbox("Voice", list(voice_options.get(lang, voice_options["en"]).values())) default_voice_profile_label = st.selectbox( "Default Cloned Voice (optional)", options=vp_labels, index=0, help="Campaign-level default cloned voice. Contact override can still replace this.", ) initial_msg = st.text_input("Initial Message (use {name})", value="Hi {name}, this is an AI assistant calling. How are you today?") customize_prompt = st.checkbox("Customize system prompt template") prompt_tmpl = st.text_area("System Prompt Template", value=DEFAULT_SYSTEM_PROMPT_TEMPLATE, height=300) if customize_prompt else DEFAULT_SYSTEM_PROMPT_TEMPLATE submitted = st.form_submit_button("Create Campaign") if submitted: if not name or not goal: st.error("Campaign Name and Goal are required.") else: res = api_create_campaign({ "name": name, "goal": goal, "knowledge_base": knowledge_base, "system_prompt_template": prompt_tmpl, "initial_message_template": initial_msg, "language": lang, "model": model, "voice_id": voice_id, "default_voice_profile_id": vp_label_to_id[default_voice_profile_label], }) if res.get("error"): st.error(res["error"]) else: st.session_state.selected_campaign_id = res["id"] st.success(f"Campaign '{res['name']}' created!") st.rerun() return # ---- Campaign detail tabs ---- tab_settings, tab_contacts, tab_calls = st.tabs(["Settings", "Contacts", "Call History"]) # ---- Settings tab ---- with 
tab_settings: st.subheader(f"Campaign: {campaign['name']}") with st.form("edit_campaign_form"): name = st.text_input("Name", value=campaign["name"]) goal = st.text_input("Goal", value=campaign["goal"]) knowledge_base = st.text_area("Knowledge Base", value=campaign.get("knowledge_base", ""), height=250) col1, col2 = st.columns(2) with col1: lang = st.selectbox("Language", ["en", "es"], index=0 if campaign.get("language", "en") == "en" else 1) model = st.selectbox("AI Model", ["openai", "anthropic"], index=0 if campaign.get("model", "openai") == "openai" else 1) with col2: voice_map = voice_options.get(lang, voice_options["en"]) voice_vals = list(voice_map.values()) cur_voice = campaign.get("voice_id") or voice_vals[0] voice_idx = voice_vals.index(cur_voice) if cur_voice in voice_vals else 0 voice_id = st.selectbox("Voice", voice_vals, index=voice_idx) current_default_vp = campaign.get("default_voice_profile_id") current_default_label = "None" for label, pid in vp_label_to_id.items(): if pid == current_default_vp: current_default_label = label break default_voice_profile_label = st.selectbox( "Default Cloned Voice (optional)", options=vp_labels, index=vp_labels.index(current_default_label) if current_default_label in vp_labels else 0, ) initial_msg = st.text_input("Initial Message (use {name})", value=campaign.get("initial_message_template", "Hi {name}, how are you?")) customize = st.checkbox("Edit system prompt template") prompt_tmpl = st.text_area("System Prompt Template", value=campaign.get("system_prompt_template", DEFAULT_SYSTEM_PROMPT_TEMPLATE), height=300) if customize else campaign.get("system_prompt_template", DEFAULT_SYSTEM_PROMPT_TEMPLATE) col_save, col_del = st.columns([3, 1]) save = col_save.form_submit_button("Save Changes") delete = col_del.form_submit_button("Delete Campaign", type="secondary") if save: res = api_update_campaign(cid, { "name": name, "goal": goal, "knowledge_base": knowledge_base, "language": lang, "model": model, "voice_id": voice_id, 
"default_voice_profile_id": vp_label_to_id[default_voice_profile_label], "initial_message_template": initial_msg, "system_prompt_template": prompt_tmpl, }) st.success("Saved.") if not res.get("error") else st.error(res["error"]) st.rerun() if delete: api_delete_campaign(cid) st.session_state.selected_campaign_id = None st.success("Campaign deleted.") st.rerun() # ---- Contacts tab ---- with tab_contacts: st.subheader("Upload Contacts") col_dl, col_up = st.columns([1, 3]) with col_dl: st.download_button( "Download Sample CSV", data=_sample_csv(), file_name="contacts_template.csv", mime="text/csv", ) with col_up: uploaded = st.file_uploader("CSV or Excel file", type=["csv", "xlsx", "xls"], key="contact_uploader") if uploaded: res = api_upload_contacts(cid, uploaded.read(), uploaded.name) if res.get("error"): st.error(res["error"]) else: st.success(f"Inserted: {res['inserted']} | Updated: {res['updated']} | Skipped: {res['skipped']}") if res.get("errors"): with st.expander(f"Upload errors ({len(res['errors'])})"): for e in res["errors"]: st.warning(f"Row {e['row']}, {e['field']}: {e['message']}") st.rerun() st.divider() st.subheader("Add Single Contact") with st.form("add_contact_form", clear_on_submit=True): c1, c2 = st.columns(2) c_name = c1.text_input("Name *") c_phone = c2.text_input("Phone *", placeholder="+14155551234") c_ctx = st.text_area("Personalized Context (optional)", height=80) c_voice_profile_label = st.selectbox( "Per-contact Cloned Voice Override", options=vp_labels, index=0, help="Optional. 
Overrides campaign default cloned voice for this contact.", ) add_submitted = st.form_submit_button("Add Contact") if add_submitted: if not c_name or not c_phone: st.error("Name and phone are required.") else: res = requests.post(f"{_BASE()}/campaigns/{cid}/contacts", json={ "name": c_name, "phone": c_phone, "personalized_context": c_ctx or None, "voice_profile_id": vp_label_to_id[c_voice_profile_label], }, timeout=10).json() st.success(f"Added {res.get('name', c_name)}.") if not res.get("error") else st.error(res["error"]) st.rerun() st.divider() st.subheader("Contact List") contacts = api_list_contacts(cid) if not contacts: st.info("No contacts yet. Upload a CSV or add one above.") else: import pandas as pd df = pd.DataFrame([{ "Name": c["name"], "Phone": c["phone"], "Status": c["status"], "Context": (c.get("personalized_context") or "")[:60], "id": c["id"], } for c in contacts]) # Status color badges status_colors = { "pending": "gray", "queued": "blue", "in_progress": "orange", "completed": "green", "failed": "red", "no-answer": "orange", } for _, row in df.iterrows(): color = status_colors.get(row["Status"], "gray") contact = next((c for c in contacts if c["id"] == row["id"]), None) contact_voice_profile_id = (contact or {}).get("voice_profile_id") contact_voice_label = "None" for label, pid in vp_label_to_id.items(): if pid == contact_voice_profile_id: contact_voice_label = label break col_info, col_voice, col_btn = st.columns([4, 2, 1]) with col_info: st.markdown( f"**{row['Name']}**   `{row['Phone']}`   " f" {row['Status']}" + (f"
{row['Context']}" if row["Context"] else ""), unsafe_allow_html=True, ) with col_voice: selected_contact_voice = st.selectbox( "Voice override", options=vp_labels, index=vp_labels.index(contact_voice_label) if contact_voice_label in vp_labels else 0, key=f"contact_voice_profile_{row['id']}", label_visibility="collapsed", ) if st.button("Save Voice", key=f"save_contact_voice_{row['id']}", use_container_width=True): save_res = requests.patch( f"{_BASE()}/campaigns/{cid}/contacts/{row['id']}", json={"voice_profile_id": vp_label_to_id[selected_contact_voice]}, timeout=10, ).json() if save_res.get("error"): st.error(save_res["error"]) else: st.success("Voice override updated.") st.rerun() with col_btn: if st.button("Call", key=f"call_{row['id']}"): res = api_call_contact(cid, row["id"]) if res.get("error"): st.error(res["error"]) else: st.success(f"Call started: {res.get('call_sid', '')[:12]}…") st.rerun() # ---- Call History tab ---- with tab_calls: st.subheader("Call History") if st.button("Refresh", key="refresh_calls"): st.rerun() calls = api_list_calls(cid) if not calls: st.info("No calls yet.") else: for call in calls: status = call.get("final_status") or "in_progress" dot = {"completed": "🟢", "failed": "🔴", "no-answer": "🟠"}.get(status, "🔵") header = ( f"{call['contact_name']} {call['contact_phone']} — " f"{dot} {status} | {(call.get('started_at') or '')[:16]}" ) with st.expander(header, expanded=False): detail = api_get_call_detail(cid, call["id"]) transcript = detail.get("transcript", []) meta_events = detail.get("meta_events", []) if transcript: st.markdown("**Transcript**") for entry in transcript: if entry["role"] == "user": st.markdown(f"""
Caller: {entry['content']}
""", unsafe_allow_html=True) elif entry["role"] == "assistant": st.markdown(f"""
AI: {entry['content']}
""", unsafe_allow_html=True) else: st.info("Transcript not yet available.") if meta_events: st.markdown("**Conversation Analytics (intent / next_action)**") import pandas as pd st.dataframe(pd.DataFrame(meta_events), use_container_width=True) # Sidebar content with st.sidebar: st.markdown(f"""

VoiceGenius

AI-Powered Phone Calls

""", unsafe_allow_html=True) st.divider() # Call stats display col1, col2 = st.columns(2) with col1: st.markdown("""

📊

Total Calls

{}

""".format(st.session_state.total_calls), unsafe_allow_html=True) with col2: st.markdown("""

Completed

{}

""".format(st.session_state.successful_calls), unsafe_allow_html=True)  # end of the markdown block opened with """

# ---------------------------------------------------------------------------
# Helpers for the single-call workflow
# ---------------------------------------------------------------------------

# Upper bound (seconds) applied to every backend HTTP request so that a
# stalled server cannot block the Streamlit script run indefinitely.
_REQUEST_TIMEOUT_SECONDS = 10

# Number of one-second polls performed while waiting for a call to connect.
_CONNECT_POLL_ATTEMPTS = 60

# Statuses that end the wait for a connection without the call becoming active.
_TERMINAL_CALL_STATUSES = ('completed', 'failed', 'busy', 'no-answer')

# Initial values for the voice parameter controls (also used by "Reset to Defaults").
_DEFAULT_VOICE_SETTINGS = {
    "stability": 0.5,
    "similarity_boost": 0.75,
    "style": 0.0,
    "use_speaker_boost": True,
    "speed": 1.0
}

# Transcript roles that are rendered, mapped to the label shown before the text.
_TRANSCRIPT_SPEAKERS = {'user': 'Caller', 'assistant': 'AI'}

# (template key, card title, card description, button label, button key)
_PROMPT_TEMPLATE_CARDS = (
    ("medical", "🩺 Medical Office",
     "Schedule appointments and collect patient information",
     "Select Medical", "medical_btn"),
    ("finance", "💹 Financial Services",
     "Schedule consultations and discuss available services",
     "Select Finance", "finance_btn"),
    ("sports", "🏋️ Sports & Fitness",
     "Discuss gym memberships and class schedules",
     "Select Sports", "sports_btn"),
    ("customer_service", "✨ Customer Service",
     "Follow up on purchases and customer satisfaction",
     "Select Customer Service", "custom_btn"),
)


def _server_url(path):
    """Return the backend URL for *path*, built from the SERVER environment variable."""
    return f"https://{os.getenv('SERVER')}{path}"


def _reset_start_call_state():
    """Clear the in-flight flag and the request id so another call can be started."""
    st.session_state.start_call_in_flight = False
    st.session_state.start_call_request_id = None


def _mark_start_call_in_flight():
    """Ensure a request id exists for this start attempt and flag it as in flight."""
    if not st.session_state.start_call_request_id:
        st.session_state.start_call_request_id = uuid.uuid4().hex
    st.session_state.start_call_in_flight = True


def _fetch_call_status(call_sid):
    """Return the ``status`` field of the backend's call-status response.

    Raises:
        requests.RequestException: if the HTTP request fails or times out.
        ValueError: if the response body is not valid JSON.
    """
    response = requests.get(
        _server_url(f"/call_status/{call_sid}"),
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    return response.json().get('status')


def _await_call_connection(call_sid, waiting_label, connected_message):
    """Poll the call status once per second until it connects, ends, or times out.

    Args:
        call_sid: identifier returned by the backend when the call was created.
        waiting_label: text shown (with an elapsed-seconds suffix) while polling.
        connected_message: text shown once the status becomes ``in-progress``.

    On connection the session is marked active and the script is rerun; on a
    terminal status or after the last poll the start-call state is reset.
    """
    st.session_state.call_sid = call_sid
    st.session_state.transcript = []
    progress_bar = st.progress(0)
    connection_status = st.empty()

    for i in range(_CONNECT_POLL_ATTEMPTS):
        # The bar fills over the first 30 polls and then stays full.
        progress_bar.progress(min(i / 30, 1.0))
        connection_status.info(f"{waiting_label} ({i+1}s)")
        time.sleep(1)
        status = _fetch_call_status(call_sid)

        if status == 'in-progress':
            progress_bar.progress(1.0)
            connection_status.success(connected_message)
            st.session_state.call_active = True
            st.session_state.start_call_in_flight = False
            st.session_state.should_reset_selector = True
            time.sleep(1)
            st.rerun()
            return

        if status in _TERMINAL_CALL_STATUSES:
            progress_bar.empty()
            connection_status.error(f"Call ended: {status}")
            _reset_start_call_state()
            return

    progress_bar.empty()
    connection_status.error("Timeout waiting for call to connect.")
    _reset_start_call_state()


def _start_call(endpoint, payload, spinner_text, waiting_label,
                connected_message, failure_prefix):
    """POST *payload* to *endpoint* and wait for the resulting call to connect.

    Any request failure or undecodable response is reported with ``st.error``
    and resets the start-call state, so the Start button is never left
    disabled by a failed attempt.
    """
    with st.spinner(spinner_text):
        try:
            response = requests.post(
                _server_url(endpoint),
                json=payload,
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
            call_data = response.json()
            if call_sid := call_data.get('call_sid'):
                _await_call_connection(call_sid, waiting_label, connected_message)
            else:
                st.error(f"{failure_prefix}: {call_data}")
                _reset_start_call_state()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            st.error(f"Error: {str(e)}")
            _reset_start_call_state()


def _render_transcript_entries(entries):
    """Render user/assistant transcript entries; other roles are skipped.

    The entry text comes from the backend and is rendered with
    ``unsafe_allow_html=True``, so it is HTML-escaped before interpolation.
    """
    import html

    for entry in entries:
        speaker = _TRANSCRIPT_SPEAKERS.get(entry['role'])
        if speaker is None:
            continue
        content = html.escape(str(entry['content']))
        st.markdown(f"""
{speaker}: {content}
""", unsafe_allow_html=True)


def update_call_info():
    """Refresh the active call's status and transcript from the backend.

    Returns:
        True when the call is still ringing or in progress and the transcript
        was refreshed; False when the call has ended or the backend could not
        be queried (a message is shown in both cases).
    """
    call_sid = st.session_state.call_sid
    try:
        status = _fetch_call_status(call_sid)
        if status not in ['in-progress', 'ringing']:
            st.session_state.call_active = False
            st.warning(f"Call ended: {status}")
            return False

        transcript_data = requests.get(
            _server_url(f"/transcript/{call_sid}"),
            timeout=_REQUEST_TIMEOUT_SECONDS,
        ).json()
        if transcript_data.get('call_ended', False):
            st.session_state.call_active = False
            st.info(f"Call ended. Status: {transcript_data.get('final_status', 'Unknown')}")
            return False

        st.session_state.transcript = transcript_data.get('transcript', [])
        return True
    except (requests.RequestException, ValueError) as e:
        st.error(f"Error updating call info: {str(e)}")
        return False


# ---------------------------------------------------------------------------
# Sidebar
# ---------------------------------------------------------------------------
# NOTE(review): the statement completed on the first line of this section
# starts outside it, presumably inside a `with st.sidebar:` block. Entering
# st.sidebar again here appends the widgets below to that same container.
with st.sidebar:
    st.divider()

    # Top-level mode selector
    app_mode = st.radio(
        "Mode",
        options=["Single Call", "Campaigns"],
        index=0 if st.session_state.app_mode == "Single Call" else 1,
        horizontal=True,
        key="app_mode_radio",
    )
    st.session_state.app_mode = app_mode
    st.divider()

    st.session_state.voice_profiles_cache = render_voice_library_panel()
    st.divider()

    # Default values (overridden in Single Call branch)
    call_mode = "AI Call"
    phone_number = ""
    operator_number = ""
    start_call = False
    end_call = False

    if app_mode == "Single Call":
        # Call mode selection
        call_mode = st.radio(
            "Call Mode",
            options=["AI Call", "Manual Call"],
            index=0,
            help="AI Call: AI agent handles the conversation. Manual Call: You speak directly via your phone.",
            horizontal=True
        )
        st.divider()

        # Phone number input (target to call)
        phone_number = st.text_input(
            "Target Phone Number",
            placeholder="+1XXXXXXXXXX",
            value=os.getenv("YOUR_NUMBER") or "",
            help="Enter the phone number to call in international format"
        )

        # Operator phone number (only for manual call mode)
        if call_mode == "Manual Call":
            operator_number = st.text_input(
                "Your Phone Number (Operator)",
                placeholder="+1XXXXXXXXXX",
                value=os.getenv("OPERATOR_NUMBER") or "",
                help="Your phone number — Twilio will call you first, then connect you to the target"
            )
        else:
            operator_number = ""

        # Language selection
        language_options = {
            "English": "en",
            "Spanish": "es"
        }
        selected_language = st.selectbox(
            "Call Language",
            options=list(language_options.keys()),
            index=0,
            help="Select the language for the conversation"
        )
        language_code = language_options[selected_language]
        selected_lang_code = language_code

        # AI-only settings (hidden in Manual Call mode)
        # NOTE(review): the extent of this branch (voice settings included,
        # trailing divider excluded) was inferred from the comment above — confirm.
        if call_mode == "AI Call":
            model_options = {
                "OpenAI GPT-4o": "openai",
                "Anthropic Claude": "anthropic"
            }
            selected_model = st.selectbox(
                "AI Model",
                options=list(model_options.keys()),
                index=0,
                help="Select the AI language model to use"
            )
            model_code = model_options[selected_model]
            # The selection is stored on every run, whether or not it existed before.
            st.session_state.model_selection = model_code

            voice_source = st.radio(
                "Voice Source",
                options=["Preset Voice", "Voice Library"],
                index=0,
                horizontal=True,
                help="Preset Voice uses built-in mapped voices. Voice Library uses your ElevenLabs clones."
            )

            use_voice_library = voice_source != "Preset Voice"
            usable_profiles = api_list_voice_profiles(usable_only=True) if use_voice_library else []
            if use_voice_library and not usable_profiles:
                st.warning("No active voice profiles found. Create one in Voice Library.")

            if usable_profiles:
                voice_profile_map = {
                    f"{v.get('display_name')} ({v.get('id')[:6]})": v.get("id")
                    for v in usable_profiles
                }
                voice_profile_labels = list(voice_profile_map.keys())
                selected_voice_profile_label = st.selectbox(
                    "Cloned Voice",
                    options=voice_profile_labels,
                    index=0,
                    help="Use a voice from your voice library for this call.",
                )
                st.session_state.voice_profile_id = voice_profile_map[selected_voice_profile_label]
                st.session_state.voice_id = None
            else:
                # Preset voices are used directly, or as the fallback when the
                # voice library has no usable profile.
                available_voices = voice_options.get(selected_lang_code, voice_options["en"])
                if use_voice_library:
                    selected_voice_name = st.selectbox(
                        "Fallback Preset Voice",
                        options=list(available_voices.keys()),
                        index=0,
                    )
                else:
                    selected_voice_name = st.selectbox(
                        "Voice",
                        options=list(available_voices.keys()),
                        index=0,
                        help="Select the voice for the AI assistant"
                    )
                selected_voice_id = available_voices[selected_voice_name]
                st.session_state.voice_id = selected_voice_id
                st.session_state.voice_profile_id = None

            st.divider()

            with st.sidebar.expander("Voice Settings ", expanded=False):
                st.markdown("### Customize Voice Parameters")

                if 'voice_settings' not in st.session_state:
                    st.session_state.voice_settings = dict(_DEFAULT_VOICE_SETTINGS)
                voice_settings = st.session_state.voice_settings

                voice_settings["stability"] = st.slider(
                    "Stability",
                    min_value=0.0,
                    max_value=1.0,
                    value=voice_settings.get("stability", 0.5),
                    step=0.05,
                    help="Higher values make the voice more consistent between re-generations but can reduce expressiveness."
                )
                voice_settings["similarity_boost"] = st.slider(
                    "Similarity Boost",
                    min_value=0.0,
                    max_value=1.0,
                    value=voice_settings.get("similarity_boost", 0.75),
                    step=0.05,
                    help="Higher values make the voice more similar to the original voice but can reduce quality."
                )
                voice_settings["style"] = st.slider(
                    "Style",
                    min_value=0.0,
                    max_value=1.0,
                    value=voice_settings.get("style", 0.0),
                    step=0.05,
                    help="Higher values amplify unique speaking style of the cloned voice."
                )
                voice_settings["speed"] = st.slider(
                    "Speed",
                    min_value=0.7,
                    max_value=1.2,
                    value=voice_settings.get("speed", 1.0),
                    step=0.01,
                    help="Adjust the speaking speed of the voice."
                )
                voice_settings["use_speaker_boost"] = st.checkbox(
                    "Speaker Boost",
                    value=voice_settings.get("use_speaker_boost", True),
                    help="Improves voice clarity and target speaker similarity."
                )

                if st.button("Reset to Defaults"):
                    st.session_state.voice_settings = dict(_DEFAULT_VOICE_SETTINGS)
                    st.rerun()

        st.divider()

        # Status indicator
        # NOTE(review): status_class is not referenced by the markup visible
        # here — confirm whether it is still needed.
        status_class = "status-active" if st.session_state.call_active else "status-inactive"
        status_text = "Call in progress" if st.session_state.call_active else "Ready to call"
        st.markdown(f"""
{status_text}
""", unsafe_allow_html=True)

        # Call controls
        call_col1, call_col2 = st.columns(2)
        with call_col1:
            start_call = st.button(
                "📞 Start Call",
                disabled=st.session_state.call_active or st.session_state.start_call_in_flight,
                use_container_width=True
            )
        with call_col2:
            end_call = st.button(
                "🔴 End Call",
                disabled=not st.session_state.call_active,
                use_container_width=True
            )

        st.divider()

        # Call history
        st.subheader("Call History")
        st.session_state.all_transcripts = fetch_all_transcripts()
        call_history_options = ["Current Call"] + [
            f"[{t.get('call_type', 'ai').upper()}] Call {t['call_sid']}"
            for t in st.session_state.all_transcripts
        ]

        st.selectbox(
            "Select a call",
            options=call_history_options,
            key="call_selector",
            index=0,
            disabled=st.session_state.call_active,
            on_change=on_call_selector_change
        )

        if st.button("🔄 Refresh Calls", use_container_width=True):
            try:
                st.session_state.all_transcripts = fetch_all_transcripts()
                on_call_selector_change()
            except requests.RequestException as e:
                st.error(f"Error fetching call list: {str(e)}")
    else:
        # Campaigns mode sidebar
        st.session_state.campaigns_cache = api_list_campaigns()
        render_campaigns_sidebar(st.session_state.campaigns_cache)

# ---------------------------------------------------------------------------
# Main content area — campaigns mode gets its own full page
# ---------------------------------------------------------------------------
if st.session_state.app_mode == "Campaigns":
    render_campaigns_main(st.session_state.campaigns_cache, st.session_state.voice_profiles_cache)
    st.stop()

st.markdown("\n\nAI Voice Calling Assistant\n\n", unsafe_allow_html=True)

if call_mode == "AI Call":
    # Prompt template selection (AI mode only)
    st.subheader("Select Prompt Template")
    prompt_cols = st.columns(4)

    # One card per column: a description followed by the button that applies it.
    for column, (template_key, title, description, button_label, button_key) in zip(
        prompt_cols, _PROMPT_TEMPLATE_CARDS
    ):
        with column:
            st.markdown(f"\n\n{title}\n\n{description}\n\n", unsafe_allow_html=True)
            if st.button(button_label, key=button_key, use_container_width=True):
                apply_template(template_key)

    st.divider()

    # Custom prompt inputs in an expandable section
    with st.expander("Customize AI Instructions", expanded=st.session_state.selected_template is None):
        st.session_state.system_message = st.text_area(
            "System Instructions (AI's role and guidelines)",
            value=st.session_state.system_message,
            disabled=st.session_state.call_active,
            height=100
        )
        st.session_state.initial_message = st.text_area(
            "Initial Message (First thing the AI will say)",
            value=st.session_state.initial_message,
            disabled=st.session_state.call_active,
            height=100
        )

    st.divider()
else:
    # Manual Call mode info
    st.info("**Manual Call Mode** — Twilio will call your phone first. When you pick up, it connects you to the target number. You talk directly — no AI involved.")
    st.divider()

# ---------------------------------------------------------------------------
# Handle call actions
# ---------------------------------------------------------------------------
if start_call and phone_number:
    if call_mode == "Manual Call":
        # Manual Call: bridge operator phone to target number
        if not operator_number:
            st.warning("Please enter your phone number (Operator) for Manual Call mode.")
        else:
            _mark_start_call_in_flight()
            _start_call(
                "/start_manual_call",
                {
                    "to_number": phone_number,
                    "operator_number": operator_number,
                },
                spinner_text=f"📞 Calling your phone {operator_number}, then connecting to {phone_number}...",
                waiting_label="Ringing your phone...",
                connected_message="Connected! You are now talking to the target number.",
                failure_prefix="Failed to initiate manual call",
            )
    else:
        # AI Call: the agent is configured from the sidebar and prompt settings
        _mark_start_call_in_flight()
        start_call_payload = {
            "to_number": phone_number,
            "system_message": st.session_state.system_message,
            "initial_message": st.session_state.initial_message,
            "language": language_code,
            "model": st.session_state.model_selection,
            "voice_settings": json.dumps(st.session_state.voice_settings),
            "request_id": st.session_state.start_call_request_id,
        }
        # Only the voice identifier that is set is sent to the backend.
        if st.session_state.voice_id:
            start_call_payload["voice_id"] = st.session_state.voice_id
        if st.session_state.voice_profile_id:
            start_call_payload["voice_profile_id"] = st.session_state.voice_profile_id

        _start_call(
            "/start_call",
            start_call_payload,
            spinner_text=f"📞 Calling {phone_number}...",
            waiting_label="Establishing connection...",
            connected_message="Call connected!",
            failure_prefix="Failed to initiate call",
        )
elif start_call:
    st.warning("Please enter a valid phone number.")

if end_call:
    try:
        with st.spinner("Ending call..."):
            response = requests.post(
                _server_url("/end_call"),
                json={"call_sid": st.session_state.call_sid},
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code == 200:
                st.success("Call ended successfully.")
                st.session_state.call_active = False
                st.session_state.call_sid = None
                _reset_start_call_state()
                time.sleep(1)
                st.rerun()
            else:
                st.error(f"Failed to end call: {response.text}")
    except requests.RequestException as e:
        st.error(f"Error ending call: {str(e)}")

# ---------------------------------------------------------------------------
# Display call transcript or recording
# ---------------------------------------------------------------------------
st.subheader("Call Transcript")

# Call Recording display
if st.session_state.call_selector != "Current Call" and st.session_state.recording_info:
    st.markdown(f"""

📞 Call Recording

Duration: {format_duration(st.session_state.recording_info['duration'])}

""", unsafe_allow_html=True)
    st.audio(st.session_state.recording_info['url'], format="audio/mp3", start_time=0)

# Display transcript in chat-like format
transcript_container = st.container()
with transcript_container:
    if st.session_state.call_active and st.session_state.call_sid:
        _render_transcript_entries(st.session_state.transcript)
    elif st.session_state.call_selector != "Current Call":
        _sid = _extract_call_sid(st.session_state.call_selector)
        if transcript := next(
            (t for t in st.session_state.all_transcripts if t['call_sid'] == _sid),
            None,
        ):
            _render_transcript_entries(transcript['transcript'])
    else:
        st.info("No call transcript available. Start a call or select a previous call from the sidebar.")

# Live call updates: while a call is active the script reruns once per second.
if st.session_state.call_active:
    if update_call_info():
        time.sleep(1)
        st.rerun()
    else:
        st.session_state.call_active = False
        st.session_state.call_sid = None
        _reset_start_call_state()
        st.info("Call has ended. You can start a new call if needed.")
        time.sleep(1)
        st.rerun()

# Footer
st.markdown("""

VoiceGenius AI Calling Platform • v2.0.3

""", unsafe_allow_html=True)