| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | use std::cmp::max; |
| | use std::collections::BTreeSet; |
| | use std::fmt; |
| | use std::num::ParseIntError; |
| | use std::str::FromStr; |
| | use std::time::{Duration, UNIX_EPOCH}; |
| |
|
| | use anyhow::{Context as _, Result, ensure}; |
| | use async_channel::Receiver; |
| | use serde::{Deserialize, Serialize}; |
| | use tokio::time::timeout; |
| |
|
| | use crate::chat::{ChatId, ChatIdBlocked, send_msg}; |
| | use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH}; |
| | use crate::contact::ContactId; |
| | use crate::context::Context; |
| | use crate::download::MIN_DELETE_SERVER_AFTER; |
| | use crate::events::EventType; |
| | use crate::log::{LogExt, warn}; |
| | use crate::message::{Message, MessageState, MsgId, Viewtype}; |
| | use crate::mimeparser::SystemMessage; |
| | use crate::stock_str; |
| | use crate::tools::{SystemTime, duration_to_str, time}; |
| | use crate::{location, stats}; |
| |
|
| | |
| | #[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize, Default)] |
| | pub enum Timer { |
| | |
| | #[default] |
| | Disabled, |
| |
|
| | |
| | Enabled { |
| | |
| | |
| | |
| | duration: u32, |
| | }, |
| | } |
| |
|
| | impl Timer { |
| | |
| | |
| | |
| | pub fn to_u32(self) -> u32 { |
| | match self { |
| | Self::Disabled => 0, |
| | Self::Enabled { duration } => duration, |
| | } |
| | } |
| |
|
| | |
| | |
| | |
| | pub fn from_u32(duration: u32) -> Self { |
| | if duration == 0 { |
| | Self::Disabled |
| | } else { |
| | Self::Enabled { duration } |
| | } |
| | } |
| | } |
| |
|
| | impl fmt::Display for Timer { |
| | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| | write!(f, "{}", self.to_u32()) |
| | } |
| | } |
| |
|
| | impl FromStr for Timer { |
| | type Err = ParseIntError; |
| |
|
| | fn from_str(input: &str) -> Result<Timer, ParseIntError> { |
| | input.parse::<u32>().map(Self::from_u32) |
| | } |
| | } |
| |
|
| | impl rusqlite::types::ToSql for Timer { |
| | fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { |
| | let val = rusqlite::types::Value::Integer(match self { |
| | Self::Disabled => 0, |
| | Self::Enabled { duration } => i64::from(*duration), |
| | }); |
| | let out = rusqlite::types::ToSqlOutput::Owned(val); |
| | Ok(out) |
| | } |
| | } |
| |
|
| | impl rusqlite::types::FromSql for Timer { |
| | fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> { |
| | i64::column_result(value).and_then(|value| { |
| | if value == 0 { |
| | Ok(Self::Disabled) |
| | } else if let Ok(duration) = u32::try_from(value) { |
| | Ok(Self::Enabled { duration }) |
| | } else { |
| | Err(rusqlite::types::FromSqlError::OutOfRange(value)) |
| | } |
| | }) |
| | } |
| | } |
| |
|
| | impl ChatId { |
| | |
| | pub async fn get_ephemeral_timer(self, context: &Context) -> Result<Timer> { |
| | let timer = context |
| | .sql |
| | .query_get_value( |
| | "SELECT IFNULL(ephemeral_timer, 0) FROM chats WHERE id=?", |
| | (self,), |
| | ) |
| | .await? |
| | .with_context(|| format!("Chat {self} not found"))?; |
| | Ok(timer) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | pub(crate) async fn inner_set_ephemeral_timer( |
| | self, |
| | context: &Context, |
| | timer: Timer, |
| | ) -> Result<()> { |
| | ensure!(!self.is_special(), "Invalid chat ID"); |
| |
|
| | context |
| | .sql |
| | .execute( |
| | "UPDATE chats |
| | SET ephemeral_timer=? |
| | WHERE id=?;", |
| | (timer, self), |
| | ) |
| | .await?; |
| |
|
| | context.emit_event(EventType::ChatEphemeralTimerModified { |
| | chat_id: self, |
| | timer, |
| | }); |
| | Ok(()) |
| | } |
| |
|
| | |
| | |
| | |
| | pub async fn set_ephemeral_timer(self, context: &Context, timer: Timer) -> Result<()> { |
| | if timer == self.get_ephemeral_timer(context).await? { |
| | return Ok(()); |
| | } |
| | self.inner_set_ephemeral_timer(context, timer).await?; |
| |
|
| | if self.is_promoted(context).await? { |
| | let mut msg = Message::new_text( |
| | stock_ephemeral_timer_changed(context, timer, ContactId::SELF).await, |
| | ); |
| | msg.param.set_cmd(SystemMessage::EphemeralTimerChanged); |
| | if let Err(err) = send_msg(context, self, &mut msg).await { |
| | error!( |
| | context, |
| | "Failed to send a message about ephemeral message timer change: {:?}", err |
| | ); |
| | } |
| | } |
| | Ok(()) |
| | } |
| | } |
| |
|
| | |
| | pub(crate) async fn stock_ephemeral_timer_changed( |
| | context: &Context, |
| | timer: Timer, |
| | from_id: ContactId, |
| | ) -> String { |
| | match timer { |
| | Timer::Disabled => stock_str::msg_ephemeral_timer_disabled(context, from_id).await, |
| | Timer::Enabled { duration } => match duration { |
| | 0..=60 => { |
| | stock_str::msg_ephemeral_timer_enabled(context, &timer.to_string(), from_id).await |
| | } |
| | 61..=3599 => { |
| | stock_str::msg_ephemeral_timer_minutes( |
| | context, |
| | &format!("{}", (f64::from(duration) / 6.0).round() / 10.0), |
| | from_id, |
| | ) |
| | .await |
| | } |
| | 3600 => stock_str::msg_ephemeral_timer_hour(context, from_id).await, |
| | 3601..=86399 => { |
| | stock_str::msg_ephemeral_timer_hours( |
| | context, |
| | &format!("{}", (f64::from(duration) / 360.0).round() / 10.0), |
| | from_id, |
| | ) |
| | .await |
| | } |
| | 86400 => stock_str::msg_ephemeral_timer_day(context, from_id).await, |
| | 86401..=604_799 => { |
| | stock_str::msg_ephemeral_timer_days( |
| | context, |
| | &format!("{}", (f64::from(duration) / 8640.0).round() / 10.0), |
| | from_id, |
| | ) |
| | .await |
| | } |
| | 604_800 => stock_str::msg_ephemeral_timer_week(context, from_id).await, |
| | 31_536_000..=31_708_800 => stock_str::msg_ephemeral_timer_year(context, from_id).await, |
| | _ => { |
| | stock_str::msg_ephemeral_timer_weeks( |
| | context, |
| | &format!("{}", (f64::from(duration) / 60480.0).round() / 10.0), |
| | from_id, |
| | ) |
| | .await |
| | } |
| | }, |
| | } |
| | } |
| |
|
| | impl MsgId { |
| | |
| | pub(crate) async fn ephemeral_timer(self, context: &Context) -> Result<Timer> { |
| | let res = match context |
| | .sql |
| | .query_get_value("SELECT ephemeral_timer FROM msgs WHERE id=?", (self,)) |
| | .await? |
| | { |
| | None | Some(0) => Timer::Disabled, |
| | Some(duration) => Timer::Enabled { duration }, |
| | }; |
| | Ok(res) |
| | } |
| |
|
| | |
| | pub(crate) async fn start_ephemeral_timer(self, context: &Context) -> Result<()> { |
| | if let Timer::Enabled { duration } = self.ephemeral_timer(context).await? { |
| | let ephemeral_timestamp = time().saturating_add(duration.into()); |
| |
|
| | context |
| | .sql |
| | .execute( |
| | "UPDATE msgs SET ephemeral_timestamp = ? \ |
| | WHERE (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?) \ |
| | AND id = ?", |
| | (ephemeral_timestamp, ephemeral_timestamp, self), |
| | ) |
| | .await?; |
| | context.scheduler.interrupt_ephemeral_task().await; |
| | } |
| | Ok(()) |
| | } |
| | } |
| |
|
| | pub(crate) async fn start_ephemeral_timers_msgids( |
| | context: &Context, |
| | msg_ids: &[MsgId], |
| | ) -> Result<()> { |
| | let now = time(); |
| | let should_interrupt = |
| | context |
| | .sql |
| | .transaction(move |transaction| { |
| | let mut should_interrupt = false; |
| | let mut stmt = |
| | transaction.prepare( |
| | "UPDATE msgs SET ephemeral_timestamp = ?1 + ephemeral_timer |
| | WHERE (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?1 + ephemeral_timer) AND ephemeral_timer > 0 |
| | AND id=?2")?; |
| | for msg_id in msg_ids { |
| | should_interrupt |= stmt.execute((now, msg_id))? > 0; |
| | } |
| | Ok(should_interrupt) |
| | }).await?; |
| | if should_interrupt { |
| | context.scheduler.interrupt_ephemeral_task().await; |
| | } |
| | Ok(()) |
| | } |
| |
|
| | |
| | |
| | |
| | pub(crate) async fn start_chat_ephemeral_timers(context: &Context, chat_id: ChatId) -> Result<()> { |
| | let now = time(); |
| | let should_interrupt = context |
| | .sql |
| | .execute( |
| | "UPDATE msgs SET ephemeral_timestamp = ?1 + ephemeral_timer |
| | WHERE chat_id = ?2 |
| | AND ephemeral_timer > 0 |
| | AND (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?1 + ephemeral_timer)", |
| | (now, chat_id), |
| | ) |
| | .await? |
| | > 0; |
| | if should_interrupt { |
| | context.scheduler.interrupt_ephemeral_task().await; |
| | } |
| | Ok(()) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | async fn select_expired_messages( |
| | context: &Context, |
| | now: i64, |
| | ) -> Result<Vec<(MsgId, ChatId, Viewtype, u32)>> { |
| | let mut rows = context |
| | .sql |
| | .query_map_vec( |
| | r#" |
| | SELECT id, chat_id, type, location_id |
| | FROM msgs |
| | WHERE |
| | ephemeral_timestamp != 0 |
| | AND ephemeral_timestamp <= ? |
| | AND chat_id != ? |
| | "#, |
| | (now, DC_CHAT_ID_TRASH), |
| | |row| { |
| | let id: MsgId = row.get("id")?; |
| | let chat_id: ChatId = row.get("chat_id")?; |
| | let viewtype: Viewtype = row |
| | .get("type") |
| | .context("Using default viewtype for ephemeral handling.") |
| | .log_err(context) |
| | .unwrap_or_default(); |
| | let location_id: u32 = row.get("location_id")?; |
| | Ok((id, chat_id, viewtype, location_id)) |
| | }, |
| | ) |
| | .await?; |
| |
|
| | if let Some(delete_device_after) = context.get_config_delete_device_after().await? { |
| | let self_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::SELF) |
| | .await? |
| | .map(|c| c.id) |
| | .unwrap_or_default(); |
| | let device_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::DEVICE) |
| | .await? |
| | .map(|c| c.id) |
| | .unwrap_or_default(); |
| |
|
| | let threshold_timestamp = now.saturating_sub(delete_device_after); |
| |
|
| | let rows_expired = context |
| | .sql |
| | .query_map_vec( |
| | r#" |
| | SELECT id, chat_id, type, location_id |
| | FROM msgs |
| | WHERE |
| | timestamp < ?1 |
| | AND timestamp_rcvd < ?1 |
| | AND chat_id > ? |
| | AND chat_id != ? |
| | AND chat_id != ? |
| | "#, |
| | ( |
| | threshold_timestamp, |
| | DC_CHAT_ID_LAST_SPECIAL, |
| | self_chat_id, |
| | device_chat_id, |
| | ), |
| | |row| { |
| | let id: MsgId = row.get("id")?; |
| | let chat_id: ChatId = row.get("chat_id")?; |
| | let viewtype: Viewtype = row |
| | .get("type") |
| | .context("Using default viewtype for delete-old handling.") |
| | .log_err(context) |
| | .unwrap_or_default(); |
| | let location_id: u32 = row.get("location_id")?; |
| | Ok((id, chat_id, viewtype, location_id)) |
| | }, |
| | ) |
| | .await?; |
| |
|
| | rows.extend(rows_expired); |
| | } |
| |
|
| | Ok(rows) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | pub(crate) async fn delete_expired_messages(context: &Context, now: i64) -> Result<()> { |
| | let rows = select_expired_messages(context, now).await?; |
| |
|
| | if !rows.is_empty() { |
| | info!(context, "Attempting to delete {} messages.", rows.len()); |
| |
|
| | let (msgs_changed, webxdc_deleted) = context |
| | .sql |
| | .transaction(|transaction| { |
| | let mut msgs_changed = Vec::with_capacity(rows.len()); |
| | let mut webxdc_deleted = Vec::new(); |
| | |
| | |
| | let mut del_msg_stmt = transaction.prepare( |
| | "INSERT OR REPLACE INTO msgs (id, rfc724_mid, timestamp, chat_id) |
| | SELECT ?1, rfc724_mid, timestamp, ? FROM msgs WHERE id=?1", |
| | )?; |
| | let mut del_location_stmt = |
| | transaction.prepare("DELETE FROM locations WHERE independent=1 AND id=?")?; |
| | for (msg_id, chat_id, viewtype, location_id) in rows { |
| | del_msg_stmt.execute((msg_id, DC_CHAT_ID_TRASH))?; |
| | if location_id > 0 { |
| | del_location_stmt.execute((location_id,))?; |
| | } |
| |
|
| | msgs_changed.push((chat_id, msg_id)); |
| | if viewtype == Viewtype::Webxdc { |
| | webxdc_deleted.push(msg_id) |
| | } |
| | } |
| | Ok((msgs_changed, webxdc_deleted)) |
| | }) |
| | .await?; |
| |
|
| | let mut modified_chat_ids = BTreeSet::new(); |
| |
|
| | for (chat_id, msg_id) in msgs_changed { |
| | context.emit_event(EventType::MsgDeleted { chat_id, msg_id }); |
| | modified_chat_ids.insert(chat_id); |
| | } |
| |
|
| | for modified_chat_id in modified_chat_ids { |
| | context.emit_msgs_changed_without_msg_id(modified_chat_id); |
| | } |
| |
|
| | for msg_id in webxdc_deleted { |
| | context.emit_event(EventType::WebxdcInstanceDeleted { msg_id }); |
| | } |
| | } |
| |
|
| | Ok(()) |
| | } |
| |
|
| | |
| | |
| | async fn next_delete_device_after_timestamp(context: &Context) -> Result<Option<i64>> { |
| | if let Some(delete_device_after) = context.get_config_delete_device_after().await? { |
| | let self_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::SELF) |
| | .await? |
| | .map(|c| c.id) |
| | .unwrap_or_default(); |
| | let device_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::DEVICE) |
| | .await? |
| | .map(|c| c.id) |
| | .unwrap_or_default(); |
| |
|
| | let oldest_message_timestamp: Option<i64> = context |
| | .sql |
| | .query_get_value( |
| | r#" |
| | SELECT min(max(timestamp, timestamp_rcvd)) |
| | FROM msgs |
| | WHERE chat_id > ? |
| | AND chat_id != ? |
| | AND chat_id != ? |
| | HAVING count(*) > 0 |
| | "#, |
| | (DC_CHAT_ID_TRASH, self_chat_id, device_chat_id), |
| | ) |
| | .await?; |
| |
|
| | Ok(oldest_message_timestamp.map(|x| x.saturating_add(delete_device_after))) |
| | } else { |
| | Ok(None) |
| | } |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | async fn next_expiration_timestamp(context: &Context) -> Option<i64> { |
| | let ephemeral_timestamp: Option<i64> = match context |
| | .sql |
| | .query_get_value( |
| | r#" |
| | SELECT min(ephemeral_timestamp) |
| | FROM msgs |
| | WHERE ephemeral_timestamp != 0 |
| | AND chat_id != ? |
| | HAVING count(*) > 0 |
| | "#, |
| | (DC_CHAT_ID_TRASH,), |
| | ) |
| | .await |
| | { |
| | Err(err) => { |
| | warn!(context, "Can't calculate next ephemeral timeout: {}", err); |
| | None |
| | } |
| | Ok(ephemeral_timestamp) => ephemeral_timestamp, |
| | }; |
| |
|
| | let delete_device_after_timestamp: Option<i64> = |
| | match next_delete_device_after_timestamp(context).await { |
| | Err(err) => { |
| | warn!( |
| | context, |
| | "Can't calculate timestamp of the next message expiration: {}", err |
| | ); |
| | None |
| | } |
| | Ok(timestamp) => timestamp, |
| | }; |
| |
|
| | ephemeral_timestamp |
| | .into_iter() |
| | .chain(delete_device_after_timestamp) |
| | .min() |
| | } |
| |
|
| | pub(crate) async fn ephemeral_loop(context: &Context, interrupt_receiver: Receiver<()>) { |
| | loop { |
| | let ephemeral_timestamp = next_expiration_timestamp(context).await; |
| |
|
| | let now = SystemTime::now(); |
| | let until = if let Some(ephemeral_timestamp) = ephemeral_timestamp { |
| | UNIX_EPOCH |
| | + Duration::from_secs(ephemeral_timestamp.try_into().unwrap_or(u64::MAX)) |
| | + Duration::from_secs(1) |
| | } else { |
| | |
| | now + Duration::from_secs(86400) |
| | }; |
| |
|
| | if let Ok(duration) = until.duration_since(now) { |
| | info!( |
| | context, |
| | "Ephemeral loop waiting for deletion in {} or interrupt", |
| | duration_to_str(duration) |
| | ); |
| | match timeout(duration, interrupt_receiver.recv()).await { |
| | Ok(Ok(())) => { |
| | |
| | continue; |
| | } |
| | Ok(Err(err)) => { |
| | warn!( |
| | context, |
| | "Interrupt channel closed, ephemeral loop exits now: {err:#}." |
| | ); |
| | return; |
| | } |
| | Err(_err) => { |
| | |
| | } |
| | } |
| | } |
| |
|
| | |
| | stats::maybe_update_message_stats(context) |
| | .await |
| | .log_err(context) |
| | .ok(); |
| |
|
| | delete_expired_messages(context, time()) |
| | .await |
| | .log_err(context) |
| | .ok(); |
| |
|
| | location::delete_expired(context, time()) |
| | .await |
| | .log_err(context) |
| | .ok(); |
| | } |
| | } |
| |
|
| | |
| | pub(crate) async fn delete_expired_imap_messages(context: &Context) -> Result<()> { |
| | let now = time(); |
| |
|
| | let (threshold_timestamp, threshold_timestamp_extended) = |
| | match context.get_config_delete_server_after().await? { |
| | None => (0, 0), |
| | Some(delete_server_after) => ( |
| | match delete_server_after { |
| | |
| | 0 => i64::MAX, |
| | _ => now - delete_server_after, |
| | }, |
| | now - max(delete_server_after, MIN_DELETE_SERVER_AFTER), |
| | ), |
| | }; |
| | let target = context.get_delete_msgs_target().await?; |
| |
|
| | context |
| | .sql |
| | .execute( |
| | "UPDATE imap |
| | SET target=? |
| | WHERE rfc724_mid IN ( |
| | SELECT rfc724_mid FROM msgs |
| | WHERE ((download_state = 0 AND timestamp < ?) OR |
| | (download_state != 0 AND timestamp < ?) OR |
| | (ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?)) |
| | )", |
| | ( |
| | &target, |
| | threshold_timestamp, |
| | threshold_timestamp_extended, |
| | now, |
| | ), |
| | ) |
| | .await?; |
| |
|
| | Ok(()) |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | pub(crate) async fn start_ephemeral_timers(context: &Context) -> Result<()> { |
| | context |
| | .sql |
| | .execute( |
| | "UPDATE msgs \ |
| | SET ephemeral_timestamp = ? + ephemeral_timer \ |
| | WHERE ephemeral_timer > 0 \ |
| | AND ephemeral_timestamp = 0 \ |
| | AND state NOT IN (?, ?, ?)", |
| | ( |
| | time(), |
| | MessageState::InFresh, |
| | MessageState::InNoticed, |
| | MessageState::OutDraft, |
| | ), |
| | ) |
| | .await?; |
| |
|
| | Ok(()) |
| | } |
| |
|
| | #[cfg(test)] |
| | mod ephemeral_tests; |
| |
|