//! # Handle calls. //! //! Internally, calls are bound a user-visible message initializing the call. //! This means, the "Call ID" is a "Message ID" - similar to Webxdc IDs. use crate::chat::ChatIdBlocked; use crate::chat::{Chat, ChatId, send_msg}; use crate::config::Config; use crate::constants::{Blocked, Chattype}; use crate::contact::ContactId; use crate::context::{Context, WeakContext}; use crate::events::EventType; use crate::headerdef::HeaderDef; use crate::log::warn; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::{MimeMessage, SystemMessage}; use crate::net::dns::lookup_host_with_cache; use crate::param::Param; use crate::tools::{normalize_text, time}; use anyhow::{Context as _, Result, ensure}; use deltachat_derive::{FromSql, ToSql}; use num_traits::FromPrimitive; use sdp::SessionDescription; use serde::Serialize; use std::io::Cursor; use std::str::FromStr; use std::time::Duration; use tokio::task; use tokio::time::sleep; /// How long callee's or caller's phone ring. /// /// For the callee, this is to prevent endless ringing /// in case the initial "call" is received, but then the caller went offline. /// Moreover, this prevents outdated calls to ring /// in case the initial "call" message arrives delayed. /// /// For the caller, this means they should also not wait longer, /// as the callee won't start the call afterwards. const RINGING_SECONDS: i64 = 120; // For persisting parameters in the call, we use Param::Arg* const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg; const CALL_ENDED_TIMESTAMP: Param = Param::Arg4; const STUN_PORT: u16 = 3478; /// Set if incoming call was ended explicitly /// by the other side before we accepted it. /// /// It is used to distinguish "ended" calls /// that are rejected by us from the calls /// canceled by the other side /// immediately after ringing started. const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2; /// Information about the status of a call. #[derive(Debug, Default)] pub struct CallInfo { /// User-defined text as given to place_outgoing_call() pub place_call_info: String, /// User-defined text as given to accept_incoming_call() pub accept_call_info: String, /// Message referring to the call. /// Data are persisted along with the message using Param::Arg* pub msg: Message, } impl CallInfo { /// Returns true if the call is an incoming call. pub fn is_incoming(&self) -> bool { self.msg.from_id != ContactId::SELF } /// Returns true if the call should not ring anymore. pub fn is_stale(&self) -> bool { (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0 } fn remaining_ring_seconds(&self) -> i64 { let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time(); remaining_seconds.clamp(0, RINGING_SECONDS) } async fn update_text(&self, context: &Context, text: &str) -> Result<()> { context .sql .execute( "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?", (text, normalize_text(text), self.msg.id), ) .await?; Ok(()) } async fn update_text_duration(&self, context: &Context) -> Result<()> { let minutes = self.duration_seconds() / 60; let duration = match minutes { 0 => "<1 minute".to_string(), 1 => "1 minute".to_string(), n => format!("{n} minutes"), }; if self.is_incoming() { self.update_text(context, &format!("Incoming call\n{duration}")) .await?; } else { self.update_text(context, &format!("Outgoing call\n{duration}")) .await?; } Ok(()) } /// Mark calls as accepted. /// This is needed for all devices where a stale-timer runs, to prevent accepted calls being terminated as stale. async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> { self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time()); self.msg.update_param(context).await?; Ok(()) } /// Returns true if the call is accepted. pub fn is_accepted(&self) -> bool { self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP) } /// Returns true if the call is missed /// because the caller canceled it /// explicitly before ringing stopped. /// /// For outgoing calls this means /// the receiver has rejected the call /// explicitly. pub fn is_canceled(&self) -> bool { self.msg.param.exists(CALL_CANCELED_TIMESTAMP) } async fn mark_as_ended(&mut self, context: &Context) -> Result<()> { self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time()); self.msg.update_param(context).await?; Ok(()) } /// Explicitly mark the call as canceled. /// /// For incoming calls this should be called /// when "call ended" message is received /// from the caller before we picked up the call. /// In this case the call becomes "missed" early /// before the ringing timeout. async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> { let now = time(); self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now); self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now); self.msg.update_param(context).await?; Ok(()) } /// Returns true if the call is ended. pub fn is_ended(&self) -> bool { self.msg.param.exists(CALL_ENDED_TIMESTAMP) } /// Returns call duration in seconds. pub fn duration_seconds(&self) -> i64 { if let (Some(start), Some(end)) = ( self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP), self.msg.param.get_i64(CALL_ENDED_TIMESTAMP), ) { let seconds = end - start; if seconds <= 0 { return 1; } return seconds; } 0 } } impl Context { /// Start an outgoing call. pub async fn place_outgoing_call( &self, chat_id: ChatId, place_call_info: String, ) -> Result { let chat = Chat::load_from_db(self, chat_id).await?; ensure!( chat.typ == Chattype::Single, "Can only place calls in 1:1 chats" ); ensure!(!chat.is_self_talk(), "Cannot call self"); let mut call = Message { viewtype: Viewtype::Call, text: "Outgoing call".into(), ..Default::default() }; call.param.set(Param::WebrtcRoom, &place_call_info); call.id = send_msg(self, chat_id, &mut call).await?; let wait = RINGING_SECONDS; let context = self.get_weak_context(); task::spawn(Context::emit_end_call_if_unaccepted( context, wait.try_into()?, call.id, )); Ok(call.id) } /// Accept an incoming call. pub async fn accept_incoming_call( &self, call_id: MsgId, accept_call_info: String, ) -> Result<()> { let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| { format!("accept_incoming_call is called with {call_id} which does not refer to a call") })?; ensure!(call.is_incoming()); if call.is_accepted() || call.is_ended() { info!(self, "Call already accepted/ended"); return Ok(()); } call.mark_as_accepted(self).await?; let chat = Chat::load_from_db(self, call.msg.chat_id).await?; if chat.is_contact_request() { chat.id.accept(self).await?; } // send an acceptance message around: to the caller as well as to the other devices of the callee let mut msg = Message { viewtype: Viewtype::Text, text: "[Call accepted]".into(), ..Default::default() }; msg.param.set_cmd(SystemMessage::CallAccepted); msg.hidden = true; msg.param .set(Param::WebrtcAccepted, accept_call_info.to_string()); msg.set_quote(self, Some(&call.msg)).await?; msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?; self.emit_event(EventType::IncomingCallAccepted { msg_id: call.msg.id, chat_id: call.msg.chat_id, }); self.emit_msgs_changed(call.msg.chat_id, call_id); Ok(()) } /// Cancel, decline or hangup an incoming or outgoing call. pub async fn end_call(&self, call_id: MsgId) -> Result<()> { let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| { format!("end_call is called with {call_id} which does not refer to a call") })?; if call.is_ended() { info!(self, "Call already ended"); return Ok(()); } if !call.is_accepted() { if call.is_incoming() { call.mark_as_ended(self).await?; call.update_text(self, "Declined call").await?; } else { call.mark_as_canceled(self).await?; call.update_text(self, "Canceled call").await?; } } else { call.mark_as_ended(self).await?; call.update_text_duration(self).await?; } let mut msg = Message { viewtype: Viewtype::Text, text: "[Call ended]".into(), ..Default::default() }; msg.param.set_cmd(SystemMessage::CallEnded); msg.hidden = true; msg.set_quote(self, Some(&call.msg)).await?; msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?; self.emit_event(EventType::CallEnded { msg_id: call.msg.id, chat_id: call.msg.chat_id, }); self.emit_msgs_changed(call.msg.chat_id, call_id); Ok(()) } async fn emit_end_call_if_unaccepted( context: WeakContext, wait: u64, call_id: MsgId, ) -> Result<()> { sleep(Duration::from_secs(wait)).await; let context = context.upgrade()?; let Some(mut call) = context.load_call_by_id(call_id).await? else { warn!( context, "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call." ); return Ok(()); }; if !call.is_accepted() && !call.is_ended() { if call.is_incoming() { call.mark_as_canceled(&context).await?; call.update_text(&context, "Missed call").await?; } else { call.mark_as_ended(&context).await?; call.update_text(&context, "Canceled call").await?; } context.emit_msgs_changed(call.msg.chat_id, call_id); context.emit_event(EventType::CallEnded { msg_id: call.msg.id, chat_id: call.msg.chat_id, }); } Ok(()) } pub(crate) async fn handle_call_msg( &self, call_id: MsgId, mime_message: &MimeMessage, from_id: ContactId, ) -> Result<()> { if mime_message.is_call() { let Some(call) = self.load_call_by_id(call_id).await? else { warn!(self, "{call_id} does not refer to a call message"); return Ok(()); }; if call.is_incoming() { if call.is_stale() { call.update_text(self, "Missed call").await?; self.emit_incoming_msg(call.msg.chat_id, call_id); // notify missed call } else { call.update_text(self, "Incoming call").await?; self.emit_msgs_changed(call.msg.chat_id, call_id); // ringing calls are not additionally notified let has_video = match sdp_has_video(&call.place_call_info) { Ok(has_video) => has_video, Err(err) => { warn!(self, "Failed to determine if SDP offer has video: {err:#}."); false } }; let can_call_me = match who_can_call_me(self).await? { WhoCanCallMe::Contacts => ChatIdBlocked::lookup_by_contact(self, from_id) .await? .is_some_and(|chat_id_blocked| { match chat_id_blocked.blocked { Blocked::Not => true, Blocked::Yes | Blocked::Request => { // Do not notify about incoming calls // from contact requests and blocked contacts. // // User can still access the call and accept it // via the chat in case of contact requests. false } } }), WhoCanCallMe::Everybody => ChatIdBlocked::lookup_by_contact(self, from_id) .await? .is_none_or(|chat_id_blocked| chat_id_blocked.blocked != Blocked::Yes), WhoCanCallMe::Nobody => false, }; if can_call_me { self.emit_event(EventType::IncomingCall { msg_id: call.msg.id, chat_id: call.msg.chat_id, place_call_info: call.place_call_info.to_string(), has_video, }); } let wait = call.remaining_ring_seconds(); let context = self.get_weak_context(); task::spawn(Context::emit_end_call_if_unaccepted( context, wait.try_into()?, call.msg.id, )); } } else { call.update_text(self, "Outgoing call").await?; self.emit_msgs_changed(call.msg.chat_id, call_id); } } else { match mime_message.is_system_message { SystemMessage::CallAccepted => { let Some(mut call) = self.load_call_by_id(call_id).await? else { warn!(self, "{call_id} does not refer to a call message"); return Ok(()); }; if call.is_ended() || call.is_accepted() { info!(self, "CallAccepted received for accepted/ended call"); return Ok(()); } call.mark_as_accepted(self).await?; self.emit_msgs_changed(call.msg.chat_id, call_id); if call.is_incoming() { self.emit_event(EventType::IncomingCallAccepted { msg_id: call.msg.id, chat_id: call.msg.chat_id, }); } else { let accept_call_info = mime_message .get_header(HeaderDef::ChatWebrtcAccepted) .unwrap_or_default(); self.emit_event(EventType::OutgoingCallAccepted { msg_id: call.msg.id, chat_id: call.msg.chat_id, accept_call_info: accept_call_info.to_string(), }); } } SystemMessage::CallEnded => { let Some(mut call) = self.load_call_by_id(call_id).await? else { warn!(self, "{call_id} does not refer to a call message"); return Ok(()); }; if call.is_ended() { // may happen eg. if a a message is missed info!(self, "CallEnded received for ended call"); return Ok(()); } if !call.is_accepted() { if call.is_incoming() { if from_id == ContactId::SELF { call.mark_as_ended(self).await?; call.update_text(self, "Declined call").await?; } else { call.mark_as_canceled(self).await?; call.update_text(self, "Missed call").await?; } } else { // outgoing if from_id == ContactId::SELF { call.mark_as_canceled(self).await?; call.update_text(self, "Canceled call").await?; } else { call.mark_as_ended(self).await?; call.update_text(self, "Declined call").await?; } } } else { call.mark_as_ended(self).await?; call.update_text_duration(self).await?; } self.emit_msgs_changed(call.msg.chat_id, call_id); self.emit_event(EventType::CallEnded { msg_id: call.msg.id, chat_id: call.msg.chat_id, }); } _ => {} } } Ok(()) } /// Loads information about the call given its ID. /// /// If the message referred to by ID is /// not a call message, returns `None`. pub async fn load_call_by_id(&self, call_id: MsgId) -> Result> { let call = Message::load_from_db(self, call_id).await?; Ok(self.load_call_by_message(call)) } // Loads information about the call given the `Message`. // // If the `Message` is not a call message, returns `None` fn load_call_by_message(&self, call: Message) -> Option { if call.viewtype != Viewtype::Call { // This can happen e.g. if a "call accepted" // or "call ended" message is received // with `In-Reply-To` referring to non-call message. return None; } Some(CallInfo { place_call_info: call .param .get(Param::WebrtcRoom) .unwrap_or_default() .to_string(), accept_call_info: call .param .get(Param::WebrtcAccepted) .unwrap_or_default() .to_string(), msg: call, }) } } /// Returns true if SDP offer has a video. pub fn sdp_has_video(sdp: &str) -> Result { let mut cursor = Cursor::new(sdp); let session_description = SessionDescription::unmarshal(&mut cursor).context("Failed to parse SDP")?; for media_description in &session_description.media_descriptions { if media_description.media_name.media == "video" { return Ok(true); } } Ok(false) } /// State of the call for display in the message bubble. #[derive(Debug, PartialEq, Eq)] pub enum CallState { /// Fresh incoming or outgoing call that is still ringing. /// /// There is no separate state for outgoing call /// that has been dialled but not ringing on the other side yet /// as we don't know whether the other side received our call. Alerting, /// Active call. Active, /// Completed call that was once active /// and then was terminated for any reason. Completed { /// Call duration in seconds. duration: i64, }, /// Incoming call that was not picked up within a timeout /// or was explicitly ended by the caller before we picked up. Missed, /// Incoming call that was explicitly ended on our side /// before picking up or outgoing call /// that was declined before the timeout. Declined, /// Outgoing call that has been canceled on our side /// before receiving a response. /// /// Incoming calls cannot be canceled, /// on the receiver side canceled calls /// usually result in missed calls. Canceled, } /// Returns call state given the message ID. /// /// Returns an error if the message is not a call message. pub async fn call_state(context: &Context, msg_id: MsgId) -> Result { let call = context .load_call_by_id(msg_id) .await? .with_context(|| format!("{msg_id} is not a call message"))?; let state = if call.is_incoming() { if call.is_accepted() { if call.is_ended() { CallState::Completed { duration: call.duration_seconds(), } } else { CallState::Active } } else if call.is_canceled() { // Call was explicitly canceled // by the caller before we picked it up. CallState::Missed } else if call.is_ended() { CallState::Declined } else if call.is_stale() { CallState::Missed } else { CallState::Alerting } } else if call.is_accepted() { if call.is_ended() { CallState::Completed { duration: call.duration_seconds(), } } else { CallState::Active } } else if call.is_canceled() { CallState::Canceled } else if call.is_ended() || call.is_stale() { CallState::Declined } else { CallState::Alerting }; Ok(state) } /// ICE server for JSON serialization. #[derive(Serialize, Debug, Clone, PartialEq)] struct IceServer { /// STUN or TURN URLs. pub urls: Vec, /// Username for TURN server authentication. pub username: Option, /// Password for logging into the server. pub credential: Option, } /// Creates JSON with ICE servers. async fn create_ice_servers( context: &Context, hostname: &str, port: u16, username: &str, password: &str, ) -> Result { // Do not use cache because there is no TLS. let load_cache = false; let urls: Vec = lookup_host_with_cache(context, hostname, port, "", load_cache) .await? .into_iter() .map(|addr| format!("turn:{addr}")) .collect(); let ice_server = IceServer { urls, username: Some(username.to_string()), credential: Some(password.to_string()), }; let json = serde_json::to_string(&[ice_server])?; Ok(json) } /// Creates JSON with ICE servers from a line received over IMAP METADATA. /// /// IMAP METADATA returns a line such as /// `example.com:3478:1758650868:8Dqkyyu11MVESBqjbIylmB06rv8=` /// /// 1758650868 is the username and expiration timestamp /// at the same time, /// while `8Dqkyyu11MVESBqjbIylmB06rv8=` /// is the password. pub(crate) async fn create_ice_servers_from_metadata( context: &Context, metadata: &str, ) -> Result<(i64, String)> { let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?; let (port, rest) = rest.split_once(':').context("Missing port")?; let port = u16::from_str(port).context("Failed to parse the port")?; let (ts, password) = rest.split_once(':').context("Missing timestamp")?; let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?; let ice_servers = create_ice_servers(context, hostname, port, ts, password).await?; Ok((expiration_timestamp, ice_servers)) } /// Creates JSON with ICE servers when no TURN servers are known. pub(crate) async fn create_fallback_ice_servers(context: &Context) -> Result { // Do not use public STUN server from https://stunprotocol.org/. // It changes the hostname every year // (e.g. stunserver2025.stunprotocol.org // which was previously stunserver2024.stunprotocol.org) // because of bandwidth costs: // let hostname = "nine.testrun.org"; // Do not use cache because there is no TLS. let load_cache = false; let urls: Vec = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache) .await? .into_iter() .map(|addr| format!("stun:{addr}")) .collect(); let stun_server = IceServer { urls, username: None, credential: None, }; let hostname = "turn.delta.chat"; // Do not use cache because there is no TLS. let load_cache = false; let urls: Vec = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache) .await? .into_iter() .map(|addr| format!("turn:{addr}")) .collect(); let turn_server = IceServer { urls, username: Some("public".to_string()), credential: Some("o4tR7yG4rG2slhXqRUf9zgmHz".to_string()), }; let json = serde_json::to_string(&[stun_server, turn_server])?; Ok(json) } /// Returns JSON with ICE servers. /// /// /// /// All returned servers are resolved to their IP addresses. /// The primary point of DNS lookup is that Delta Chat Desktop /// relies on the servers being specified by IP, /// because it itself cannot utilize DNS. See /// . pub async fn ice_servers(context: &Context) -> Result { if let Some(ref metadata) = *context.metadata.read().await { Ok(metadata.ice_servers.clone()) } else { Ok("[]".to_string()) } } /// "Who can call me" config options. #[derive( Debug, Default, Display, Clone, Copy, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql, )] #[repr(u8)] pub enum WhoCanCallMe { /// Everybody can call me if they are not blocked. /// /// This includes contact requests. Everybody = 0, /// Every contact who is not blocked and not a contact request, can call. #[default] Contacts = 1, /// Nobody can call me. Nobody = 2, } /// Returns currently configuration of the "who can call me" option. async fn who_can_call_me(context: &Context) -> Result { let who_can_call_me = WhoCanCallMe::from_i32(context.get_config_int(Config::WhoCanCallMe).await?) .unwrap_or_default(); Ok(who_can_call_me) } #[cfg(test)] mod calls_tests;