| | |
| | |
| | |
| | |
| | use crate::chat::ChatIdBlocked; |
| | use crate::chat::{Chat, ChatId, send_msg}; |
| | use crate::config::Config; |
| | use crate::constants::{Blocked, Chattype}; |
| | use crate::contact::ContactId; |
| | use crate::context::{Context, WeakContext}; |
| | use crate::events::EventType; |
| | use crate::headerdef::HeaderDef; |
| | use crate::log::warn; |
| | use crate::message::{Message, MsgId, Viewtype}; |
| | use crate::mimeparser::{MimeMessage, SystemMessage}; |
| | use crate::net::dns::lookup_host_with_cache; |
| | use crate::param::Param; |
| | use crate::tools::{normalize_text, time}; |
| | use anyhow::{Context as _, Result, ensure}; |
| | use deltachat_derive::{FromSql, ToSql}; |
| | use num_traits::FromPrimitive; |
| | use sdp::SessionDescription; |
| | use serde::Serialize; |
| | use std::io::Cursor; |
| | use std::str::FromStr; |
| | use std::time::Duration; |
| | use tokio::task; |
| | use tokio::time::sleep; |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | const RINGING_SECONDS: i64 = 120; |
| |
|
| | |
| |
|
| | const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg; |
| | const CALL_ENDED_TIMESTAMP: Param = Param::Arg4; |
| |
|
| | const STUN_PORT: u16 = 3478; |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2; |
| |
|
| | |
| | #[derive(Debug, Default)] |
| | pub struct CallInfo { |
| | |
| | pub place_call_info: String, |
| |
|
| | |
| | pub accept_call_info: String, |
| |
|
| | |
| | |
| | pub msg: Message, |
| | } |
| |
|
| | impl CallInfo { |
| | |
| | pub fn is_incoming(&self) -> bool { |
| | self.msg.from_id != ContactId::SELF |
| | } |
| |
|
| | |
| | pub fn is_stale(&self) -> bool { |
| | (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0 |
| | } |
| |
|
| | fn remaining_ring_seconds(&self) -> i64 { |
| | let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time(); |
| | remaining_seconds.clamp(0, RINGING_SECONDS) |
| | } |
| |
|
| | async fn update_text(&self, context: &Context, text: &str) -> Result<()> { |
| | context |
| | .sql |
| | .execute( |
| | "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?", |
| | (text, normalize_text(text), self.msg.id), |
| | ) |
| | .await?; |
| | Ok(()) |
| | } |
| |
|
| | async fn update_text_duration(&self, context: &Context) -> Result<()> { |
| | let minutes = self.duration_seconds() / 60; |
| | let duration = match minutes { |
| | 0 => "<1 minute".to_string(), |
| | 1 => "1 minute".to_string(), |
| | n => format!("{n} minutes"), |
| | }; |
| |
|
| | if self.is_incoming() { |
| | self.update_text(context, &format!("Incoming call\n{duration}")) |
| | .await?; |
| | } else { |
| | self.update_text(context, &format!("Outgoing call\n{duration}")) |
| | .await?; |
| | } |
| | Ok(()) |
| | } |
| |
|
| | |
| | |
| | async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> { |
| | self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time()); |
| | self.msg.update_param(context).await?; |
| | Ok(()) |
| | } |
| |
|
| | |
| | pub fn is_accepted(&self) -> bool { |
| | self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | pub fn is_canceled(&self) -> bool { |
| | self.msg.param.exists(CALL_CANCELED_TIMESTAMP) |
| | } |
| |
|
| | async fn mark_as_ended(&mut self, context: &Context) -> Result<()> { |
| | self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time()); |
| | self.msg.update_param(context).await?; |
| | Ok(()) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> { |
| | let now = time(); |
| | self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now); |
| | self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now); |
| | self.msg.update_param(context).await?; |
| | Ok(()) |
| | } |
| |
|
| | |
| | pub fn is_ended(&self) -> bool { |
| | self.msg.param.exists(CALL_ENDED_TIMESTAMP) |
| | } |
| |
|
| | |
| | pub fn duration_seconds(&self) -> i64 { |
| | if let (Some(start), Some(end)) = ( |
| | self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP), |
| | self.msg.param.get_i64(CALL_ENDED_TIMESTAMP), |
| | ) { |
| | let seconds = end - start; |
| | if seconds <= 0 { |
| | return 1; |
| | } |
| | return seconds; |
| | } |
| | 0 |
| | } |
| | } |
| |
|
| | impl Context { |
| | |
| | pub async fn place_outgoing_call( |
| | &self, |
| | chat_id: ChatId, |
| | place_call_info: String, |
| | ) -> Result<MsgId> { |
| | let chat = Chat::load_from_db(self, chat_id).await?; |
| | ensure!( |
| | chat.typ == Chattype::Single, |
| | "Can only place calls in 1:1 chats" |
| | ); |
| | ensure!(!chat.is_self_talk(), "Cannot call self"); |
| |
|
| | let mut call = Message { |
| | viewtype: Viewtype::Call, |
| | text: "Outgoing call".into(), |
| | ..Default::default() |
| | }; |
| | call.param.set(Param::WebrtcRoom, &place_call_info); |
| | call.id = send_msg(self, chat_id, &mut call).await?; |
| |
|
| | let wait = RINGING_SECONDS; |
| | let context = self.get_weak_context(); |
| | task::spawn(Context::emit_end_call_if_unaccepted( |
| | context, |
| | wait.try_into()?, |
| | call.id, |
| | )); |
| |
|
| | Ok(call.id) |
| | } |
| |
|
| | |
| | pub async fn accept_incoming_call( |
| | &self, |
| | call_id: MsgId, |
| | accept_call_info: String, |
| | ) -> Result<()> { |
| | let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| { |
| | format!("accept_incoming_call is called with {call_id} which does not refer to a call") |
| | })?; |
| | ensure!(call.is_incoming()); |
| | if call.is_accepted() || call.is_ended() { |
| | info!(self, "Call already accepted/ended"); |
| | return Ok(()); |
| | } |
| |
|
| | call.mark_as_accepted(self).await?; |
| | let chat = Chat::load_from_db(self, call.msg.chat_id).await?; |
| | if chat.is_contact_request() { |
| | chat.id.accept(self).await?; |
| | } |
| |
|
| | |
| | let mut msg = Message { |
| | viewtype: Viewtype::Text, |
| | text: "[Call accepted]".into(), |
| | ..Default::default() |
| | }; |
| | msg.param.set_cmd(SystemMessage::CallAccepted); |
| | msg.hidden = true; |
| | msg.param |
| | .set(Param::WebrtcAccepted, accept_call_info.to_string()); |
| | msg.set_quote(self, Some(&call.msg)).await?; |
| | msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?; |
| | self.emit_event(EventType::IncomingCallAccepted { |
| | msg_id: call.msg.id, |
| | chat_id: call.msg.chat_id, |
| | }); |
| | self.emit_msgs_changed(call.msg.chat_id, call_id); |
| | Ok(()) |
| | } |
| |
|
| | |
| | pub async fn end_call(&self, call_id: MsgId) -> Result<()> { |
| | let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| { |
| | format!("end_call is called with {call_id} which does not refer to a call") |
| | })?; |
| | if call.is_ended() { |
| | info!(self, "Call already ended"); |
| | return Ok(()); |
| | } |
| |
|
| | if !call.is_accepted() { |
| | if call.is_incoming() { |
| | call.mark_as_ended(self).await?; |
| | call.update_text(self, "Declined call").await?; |
| | } else { |
| | call.mark_as_canceled(self).await?; |
| | call.update_text(self, "Canceled call").await?; |
| | } |
| | } else { |
| | call.mark_as_ended(self).await?; |
| | call.update_text_duration(self).await?; |
| | } |
| |
|
| | let mut msg = Message { |
| | viewtype: Viewtype::Text, |
| | text: "[Call ended]".into(), |
| | ..Default::default() |
| | }; |
| | msg.param.set_cmd(SystemMessage::CallEnded); |
| | msg.hidden = true; |
| | msg.set_quote(self, Some(&call.msg)).await?; |
| | msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?; |
| |
|
| | self.emit_event(EventType::CallEnded { |
| | msg_id: call.msg.id, |
| | chat_id: call.msg.chat_id, |
| | }); |
| | self.emit_msgs_changed(call.msg.chat_id, call_id); |
| | Ok(()) |
| | } |
| |
|
| | async fn emit_end_call_if_unaccepted( |
| | context: WeakContext, |
| | wait: u64, |
| | call_id: MsgId, |
| | ) -> Result<()> { |
| | sleep(Duration::from_secs(wait)).await; |
| | let context = context.upgrade()?; |
| | let Some(mut call) = context.load_call_by_id(call_id).await? else { |
| | warn!( |
| | context, |
| | "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call." |
| | ); |
| | return Ok(()); |
| | }; |
| | if !call.is_accepted() && !call.is_ended() { |
| | if call.is_incoming() { |
| | call.mark_as_canceled(&context).await?; |
| | call.update_text(&context, "Missed call").await?; |
| | } else { |
| | call.mark_as_ended(&context).await?; |
| | call.update_text(&context, "Canceled call").await?; |
| | } |
| | context.emit_msgs_changed(call.msg.chat_id, call_id); |
| | context.emit_event(EventType::CallEnded { |
| | msg_id: call.msg.id, |
| | chat_id: call.msg.chat_id, |
| | }); |
| | } |
| | Ok(()) |
| | } |
| |
|
| | pub(crate) async fn handle_call_msg( |
| | &self, |
| | call_id: MsgId, |
| | mime_message: &MimeMessage, |
| | from_id: ContactId, |
| | ) -> Result<()> { |
| | if mime_message.is_call() { |
| | let Some(call) = self.load_call_by_id(call_id).await? else { |
| | warn!(self, "{call_id} does not refer to a call message"); |
| | return Ok(()); |
| | }; |
| |
|
| | if call.is_incoming() { |
| | if call.is_stale() { |
| | call.update_text(self, "Missed call").await?; |
| | self.emit_incoming_msg(call.msg.chat_id, call_id); |
| | } else { |
| | call.update_text(self, "Incoming call").await?; |
| | self.emit_msgs_changed(call.msg.chat_id, call_id); |
| | let has_video = match sdp_has_video(&call.place_call_info) { |
| | Ok(has_video) => has_video, |
| | Err(err) => { |
| | warn!(self, "Failed to determine if SDP offer has video: {err:#}."); |
| | false |
| | } |
| | }; |
| | let can_call_me = match who_can_call_me(self).await? { |
| | WhoCanCallMe::Contacts => ChatIdBlocked::lookup_by_contact(self, from_id) |
| | .await? |
| | .is_some_and(|chat_id_blocked| { |
| | match chat_id_blocked.blocked { |
| | Blocked::Not => true, |
| | Blocked::Yes | Blocked::Request => { |
| | |
| | |
| | |
| | |
| | |
| | false |
| | } |
| | } |
| | }), |
| | WhoCanCallMe::Everybody => ChatIdBlocked::lookup_by_contact(self, from_id) |
| | .await? |
| | .is_none_or(|chat_id_blocked| chat_id_blocked.blocked != Blocked::Yes), |
| | WhoCanCallMe::Nobody => false, |
| | }; |
| | if can_call_me { |
| | self.emit_event(EventType::IncomingCall { |
| | msg_id: call.msg.id, |
| | chat_id: call.msg.chat_id, |
| | place_call_info: call.place_call_info.to_string(), |
| | has_video, |
| | }); |
| | } |
| | let wait = call.remaining_ring_seconds(); |
| | let context = self.get_weak_context(); |
| | task::spawn(Context::emit_end_call_if_unaccepted( |
| | context, |
| | wait.try_into()?, |
| | call.msg.id, |
| | )); |
| | } |
| | } else { |
| | call.update_text(self, "Outgoing call").await?; |
| | self.emit_msgs_changed(call.msg.chat_id, call_id); |
| | } |
| | } else { |
| | match mime_message.is_system_message { |
| | SystemMessage::CallAccepted => { |
| | let Some(mut call) = self.load_call_by_id(call_id).await? else { |
| | warn!(self, "{call_id} does not refer to a call message"); |
| | return Ok(()); |
| | }; |
| |
|
| | if call.is_ended() || call.is_accepted() { |
| | info!(self, "CallAccepted received for accepted/ended call"); |
| | return Ok(()); |
| | } |
| |
|
| | call.mark_as_accepted(self).await?; |
| | self.emit_msgs_changed(call.msg.chat_id, call_id); |
| | if call.is_incoming() { |
| | self.emit_event(EventType::IncomingCallAccepted { |
| | msg_id: call.msg.id, |
| | chat_id: call.msg.chat_id, |
| | }); |
| | } else { |
| | let accept_call_info = mime_message |
| | .get_header(HeaderDef::ChatWebrtcAccepted) |
| | .unwrap_or_default(); |
| | self.emit_event(EventType::OutgoingCallAccepted { |
| | msg_id: call.msg.id, |
| | chat_id: call.msg.chat_id, |
| | accept_call_info: accept_call_info.to_string(), |
| | }); |
| | } |
| | } |
| | SystemMessage::CallEnded => { |
| | let Some(mut call) = self.load_call_by_id(call_id).await? else { |
| | warn!(self, "{call_id} does not refer to a call message"); |
| | return Ok(()); |
| | }; |
| |
|
| | if call.is_ended() { |
| | |
| | info!(self, "CallEnded received for ended call"); |
| | return Ok(()); |
| | } |
| |
|
| | if !call.is_accepted() { |
| | if call.is_incoming() { |
| | if from_id == ContactId::SELF { |
| | call.mark_as_ended(self).await?; |
| | call.update_text(self, "Declined call").await?; |
| | } else { |
| | call.mark_as_canceled(self).await?; |
| | call.update_text(self, "Missed call").await?; |
| | } |
| | } else { |
| | |
| | if from_id == ContactId::SELF { |
| | call.mark_as_canceled(self).await?; |
| | call.update_text(self, "Canceled call").await?; |
| | } else { |
| | call.mark_as_ended(self).await?; |
| | call.update_text(self, "Declined call").await?; |
| | } |
| | } |
| | } else { |
| | call.mark_as_ended(self).await?; |
| | call.update_text_duration(self).await?; |
| | } |
| |
|
| | self.emit_msgs_changed(call.msg.chat_id, call_id); |
| | self.emit_event(EventType::CallEnded { |
| | msg_id: call.msg.id, |
| | chat_id: call.msg.chat_id, |
| | }); |
| | } |
| | _ => {} |
| | } |
| | } |
| | Ok(()) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> { |
| | let call = Message::load_from_db(self, call_id).await?; |
| | Ok(self.load_call_by_message(call)) |
| | } |
| |
|
| | |
| | |
| | |
| | fn load_call_by_message(&self, call: Message) -> Option<CallInfo> { |
| | if call.viewtype != Viewtype::Call { |
| | |
| | |
| | |
| | return None; |
| | } |
| |
|
| | Some(CallInfo { |
| | place_call_info: call |
| | .param |
| | .get(Param::WebrtcRoom) |
| | .unwrap_or_default() |
| | .to_string(), |
| | accept_call_info: call |
| | .param |
| | .get(Param::WebrtcAccepted) |
| | .unwrap_or_default() |
| | .to_string(), |
| | msg: call, |
| | }) |
| | } |
| | } |
| |
|
| | |
| | pub fn sdp_has_video(sdp: &str) -> Result<bool> { |
| | let mut cursor = Cursor::new(sdp); |
| | let session_description = |
| | SessionDescription::unmarshal(&mut cursor).context("Failed to parse SDP")?; |
| | for media_description in &session_description.media_descriptions { |
| | if media_description.media_name.media == "video" { |
| | return Ok(true); |
| | } |
| | } |
| | Ok(false) |
| | } |
| |
|
| | |
| | #[derive(Debug, PartialEq, Eq)] |
| | pub enum CallState { |
| | |
| | |
| | |
| | |
| | |
| | Alerting, |
| |
|
| | |
| | Active, |
| |
|
| | |
| | |
| | Completed { |
| | |
| | duration: i64, |
| | }, |
| |
|
| | |
| | |
| | Missed, |
| |
|
| | |
| | |
| | |
| | Declined, |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | Canceled, |
| | } |
| |
|
| | |
| | |
| | |
| | pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> { |
| | let call = context |
| | .load_call_by_id(msg_id) |
| | .await? |
| | .with_context(|| format!("{msg_id} is not a call message"))?; |
| | let state = if call.is_incoming() { |
| | if call.is_accepted() { |
| | if call.is_ended() { |
| | CallState::Completed { |
| | duration: call.duration_seconds(), |
| | } |
| | } else { |
| | CallState::Active |
| | } |
| | } else if call.is_canceled() { |
| | |
| | |
| | CallState::Missed |
| | } else if call.is_ended() { |
| | CallState::Declined |
| | } else if call.is_stale() { |
| | CallState::Missed |
| | } else { |
| | CallState::Alerting |
| | } |
| | } else if call.is_accepted() { |
| | if call.is_ended() { |
| | CallState::Completed { |
| | duration: call.duration_seconds(), |
| | } |
| | } else { |
| | CallState::Active |
| | } |
| | } else if call.is_canceled() { |
| | CallState::Canceled |
| | } else if call.is_ended() || call.is_stale() { |
| | CallState::Declined |
| | } else { |
| | CallState::Alerting |
| | }; |
| | Ok(state) |
| | } |
| |
|
| | |
| | #[derive(Serialize, Debug, Clone, PartialEq)] |
| | struct IceServer { |
| | |
| | pub urls: Vec<String>, |
| |
|
| | |
| | pub username: Option<String>, |
| |
|
| | |
| | pub credential: Option<String>, |
| | } |
| |
|
| | |
| | async fn create_ice_servers( |
| | context: &Context, |
| | hostname: &str, |
| | port: u16, |
| | username: &str, |
| | password: &str, |
| | ) -> Result<String> { |
| | |
| | let load_cache = false; |
| | let urls: Vec<String> = lookup_host_with_cache(context, hostname, port, "", load_cache) |
| | .await? |
| | .into_iter() |
| | .map(|addr| format!("turn:{addr}")) |
| | .collect(); |
| |
|
| | let ice_server = IceServer { |
| | urls, |
| | username: Some(username.to_string()), |
| | credential: Some(password.to_string()), |
| | }; |
| |
|
| | let json = serde_json::to_string(&[ice_server])?; |
| | Ok(json) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | pub(crate) async fn create_ice_servers_from_metadata( |
| | context: &Context, |
| | metadata: &str, |
| | ) -> Result<(i64, String)> { |
| | let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?; |
| | let (port, rest) = rest.split_once(':').context("Missing port")?; |
| | let port = u16::from_str(port).context("Failed to parse the port")?; |
| | let (ts, password) = rest.split_once(':').context("Missing timestamp")?; |
| | let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?; |
| | let ice_servers = create_ice_servers(context, hostname, port, ts, password).await?; |
| | Ok((expiration_timestamp, ice_servers)) |
| | } |
| |
|
| | |
| | pub(crate) async fn create_fallback_ice_servers(context: &Context) -> Result<String> { |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | let hostname = "nine.testrun.org"; |
| | |
| | let load_cache = false; |
| | let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache) |
| | .await? |
| | .into_iter() |
| | .map(|addr| format!("stun:{addr}")) |
| | .collect(); |
| | let stun_server = IceServer { |
| | urls, |
| | username: None, |
| | credential: None, |
| | }; |
| |
|
| | let hostname = "turn.delta.chat"; |
| | |
| | let load_cache = false; |
| | let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache) |
| | .await? |
| | .into_iter() |
| | .map(|addr| format!("turn:{addr}")) |
| | .collect(); |
| | let turn_server = IceServer { |
| | urls, |
| | username: Some("public".to_string()), |
| | credential: Some("o4tR7yG4rG2slhXqRUf9zgmHz".to_string()), |
| | }; |
| |
|
| | let json = serde_json::to_string(&[stun_server, turn_server])?; |
| | Ok(json) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | pub async fn ice_servers(context: &Context) -> Result<String> { |
| | if let Some(ref metadata) = *context.metadata.read().await { |
| | Ok(metadata.ice_servers.clone()) |
| | } else { |
| | Ok("[]".to_string()) |
| | } |
| | } |
| |
|
| | |
| | #[derive( |
| | Debug, Default, Display, Clone, Copy, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql, |
| | )] |
| | #[repr(u8)] |
| | pub enum WhoCanCallMe { |
| | |
| | |
| | |
| | Everybody = 0, |
| |
|
| | |
| | #[default] |
| | Contacts = 1, |
| |
|
| | |
| | Nobody = 2, |
| | } |
| |
|
| | |
| | async fn who_can_call_me(context: &Context) -> Result<WhoCanCallMe> { |
| | let who_can_call_me = |
| | WhoCanCallMe::from_i32(context.get_config_int(Config::WhoCanCallMe).await?) |
| | .unwrap_or_default(); |
| | Ok(who_can_call_me) |
| | } |
| |
|
| | #[cfg(test)] |
| | mod calls_tests; |
| |
|