ai_caller / streamlit_app.py
guifav's picture
Initial commit
d912ffc
import os
import time
import requests
import streamlit as st
import dotenv
from openai import AsyncClient
dotenv.load_dotenv(verbose=True)
st.set_page_config(page_title="Guilherme Favaron - AI Dialer", page_icon="๐Ÿ“ž", layout="wide")
def get_api_url():
"""Get the appropriate API URL based on the environment"""
# Porta fixa para desenvolvimento local
api_port = "8001" # Porta onde o FastAPI estรก rodando
if os.getenv('SPACE_ID'):
return f"http://localhost:{api_port}"
return f"http://localhost:{api_port}"
def make_request(method, endpoint, **kwargs):
"""Unified request handler with proper error handling"""
base_url = get_api_url()
# Make sure the endpoint starts with a slash
if not endpoint.startswith('/'):
endpoint = f"/{endpoint}"
url = f"{base_url}{endpoint}"
try:
response = requests.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
st.error(f"Error connecting to {endpoint}: {str(e)}")
if kwargs.get('debug', False):
st.error(f"Attempted URL: {url}")
return None
def fetch_all_transcripts():
"""Fetch all available call transcripts"""
result = make_request('GET', '/all_transcripts')
if result:
return result.get('transcripts', [])
return []
def fetch_recording_info(call_sid):
"""Fetch recording information for a specific call"""
result = make_request('GET', f'/call_recording/{call_sid}')
if result and 'recording_url' in result:
return {'url': result['recording_url']}
return None
def display_call_interface():
"""Display the phone number input interface"""
return st.text_input(
"Phone Number",
placeholder="+1234567890",
disabled=st.session_state.call_active
)
def init_session_state():
if 'call_active' not in st.session_state:
st.session_state.call_active = False
st.session_state.call_sid = None
st.session_state.transcript = []
st.session_state.system_message = os.getenv("SYSTEM_MESSAGE")
st.session_state.initial_message = os.getenv("INITIAL_MESSAGE")
st.session_state.all_transcripts = fetch_all_transcripts()
st.session_state.recording_info = None
st.session_state.call_selector = "Current Call"
def handle_call_start(phone_number):
with st.spinner(f"Calling {phone_number}..."):
payload = {
"to_number": phone_number,
"system_message": st.session_state.system_message,
"initial_message": st.session_state.initial_message
}
result = make_request('POST', '/start_call', json=payload, timeout=10)
if not result:
return
if call_sid := result.get('call_sid'):
st.session_state.call_sid = call_sid
st.session_state.transcript = []
st.success(f"Call initiated. SID: {call_sid}")
for _ in range(60):
time.sleep(1)
status_result = make_request('GET', f'/call_status/{call_sid}')
if not status_result:
continue
status = status_result.get('status')
if status == 'in-progress':
st.session_state.call_active = True
st.session_state.call_selector = "Current Call"
break
if status in ['completed', 'failed', 'busy', 'no-answer']:
st.error(f"Call ended: {status}")
break
else:
st.error("Timeout waiting for call to connect.")
else:
st.error(f"Failed to initiate call: {result}")
def handle_call_end():
result = make_request('POST', '/end_call', json={"call_sid": st.session_state.call_sid})
if result and result.get('status') == 'success':
st.success("Call ended successfully.")
st.session_state.call_active = False
st.session_state.call_sid = None
st.experimental_rerun()
def on_call_selector_change():
if st.session_state.call_selector != "Current Call":
selected_transcript = next(
(t for t in st.session_state.all_transcripts if f"Call {t['call_sid']}" == st.session_state.call_selector),
None
)
if selected_transcript:
st.session_state.recording_info = fetch_recording_info(selected_transcript['call_sid'])
else:
st.warning("No transcript found for the selected call.")
else:
st.session_state.recording_info = None
def update_call_info():
status_result = make_request('GET', f'/call_status/{st.session_state.call_sid}')
if not status_result:
return False
status = status_result.get('status')
if status not in ['in-progress', 'ringing']:
st.session_state.call_active = False
st.warning(f"Call ended: {status}")
return False
transcript_result = make_request('GET', f'/transcript/{st.session_state.call_sid}')
if not transcript_result:
return False
if transcript_result.get('call_ended', False):
st.session_state.call_active = False
st.info(f"Call ended. Status: {transcript_result.get('final_status', 'Unknown')}")
return False
st.session_state.transcript = transcript_result.get('transcript', [])
return True
def main():
init_session_state()
with st.sidebar:
st.markdown(
"<h2 style='text-align: center; font-size: 2.5em;'>๐Ÿ“ž Guilherme Favaron - AI Dialer</h2>",
unsafe_allow_html=True
)
st.markdown("---")
phone_number = display_call_interface()
st.session_state.system_message = st.text_area(
"System Message",
value=st.session_state.system_message,
disabled=st.session_state.call_active
)
st.session_state.initial_message = st.text_area(
"Initial Message",
value=st.session_state.initial_message,
disabled=st.session_state.call_active
)
start_call = st.button("Start Call", disabled=st.session_state.call_active)
end_call = st.button("End Call", disabled=not st.session_state.call_active)
if start_call and phone_number:
handle_call_start(phone_number)
elif start_call:
st.warning("Please enter a valid phone number.")
if end_call:
handle_call_end()
if st.session_state.call_active:
st.success("Call in progress")
st.markdown("---")
st.selectbox(
"Select a call",
options=["Current Call"] + [f"Call {t['call_sid']}" for t in st.session_state.all_transcripts],
key="call_selector",
index=0,
disabled=st.session_state.call_active,
on_change=on_call_selector_change
)
if st.button("Refresh Call List"):
transcripts = fetch_all_transcripts()
if transcripts is not None:
st.session_state.all_transcripts = transcripts
on_call_selector_change()
st.markdown("---")
with st.spinner("Loading recording and transcript..."):
if st.session_state.call_selector != "Current Call" and st.session_state.recording_info:
st.subheader("Call Recording")
audio_url = st.session_state.recording_info['url']
st.audio(audio_url, format="audio/mp3", start_time=0)
st.markdown("---")
if st.session_state.call_active and st.session_state.call_sid:
st.subheader(f"Transcript for Current Call {st.session_state.call_sid}")
for entry in st.session_state.transcript:
if entry['role'] == 'user':
st.markdown(f"๐Ÿ‘ค **User**: {entry['content']}")
elif entry['role'] == 'assistant':
st.markdown(f"๐Ÿค– **Assistant**: {entry['content']}")
elif st.session_state.call_selector != "Current Call":
selected_transcript = next(
(t for t in st.session_state.all_transcripts if f"Call {t['call_sid']}" == st.session_state.call_selector),
None
)
if selected_transcript:
st.subheader(f"Transcript for {st.session_state.call_selector}")
for entry in selected_transcript['transcript']:
if entry['role'] == 'user':
st.markdown(f"๐Ÿ‘ค **User**: {entry['content']}")
elif entry['role'] == 'assistant':
st.markdown(f"๐Ÿค– **Assistant**: {entry['content']}")
if st.session_state.call_active:
if update_call_info():
time.sleep(1)
st.experimental_rerun()
else:
st.session_state.call_active = False
st.session_state.call_sid = None
st.sidebar.info("Call has ended. You can start a new call if needed.")
st.experimental_rerun()
if __name__ == "__main__":
main()