import {
  Injectable,
  Logger,
  NotFoundException,
  ForbiddenException,
  ConflictException,
  BadRequestException,
} from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { SupabaseService } from '../../database/supabase.service';
import { RedisService } from '../../redis/redis.service';
import { RedisKeys } from '../../redis/keys';
import { AgentService } from '../agent/agent.service';
import {
  CreateDMConversationDto,
  SendDMDto,
  SendDMMessageDto,
  QueryDMMessagesDto,
  QueryDMConversationsDto,
} from './dto';

/** Row of the `dm_conversations` table. */
interface DMConversation {
  id: string;
  user1_id: string;
  user2_id: string;
  created_at: string;
}

/** Conversation enriched with the other participant, last message and unread count. */
interface DMConversationWithUser extends DMConversation {
  other_user: {
    id: string;
    email: string;
    username: string;
    display_name: string;
    avatar_url: string | null;
    bio: string | null;
    color: string | null;
    status: string;
    last_seen: string | null;
    created_at: string;
    updated_at: string;
  };
  last_message?: DMMessage | null;
  unread_count: number;
  is_new?: boolean;
}

/** Row of the `dm_messages` table as used by this service. */
interface DMMessage {
  id: string;
  conversation_id: string;
  sender_id: string;
  sender_username: string;
  content: string;
  is_read: boolean;
  created_at: string;
  deleted_at: string | null;
}

/** Message joined with its sender profile and optional read receipts. */
interface DMMessageWithSender extends DMMessage {
  updated_at?: string;
  reply_to?: string | null;
  sender: {
    id: string;
    email: string;
    username: string;
    display_name: string;
    avatar_url: string | null;
    bio: string | null;
    color: string | null;
    status: string;
  };
  read_by_usernames?: string[];
}

/** Shape returned by `getBlockedUsers`; profile fields are absent when the join yields nothing. */
interface BlockedUserSummary {
  id?: string;
  email?: string;
  displayName?: string;
  avatar?: string | null;
  bio?: string | null;
  color?: string | null;
  status?: string;
  blockedAt: string;
}

type DMOtherUser = DMConversationWithUser['other_user'];

/** Profile columns needed to populate `other_user`. */
const PROFILE_COLUMNS =
  'id, email, username, display_name, avatar_url, bio, color, status, last_seen, created_at, updated_at';

/** Message columns plus the embedded sender profile. */
const MESSAGE_WITH_SENDER_SELECT =
  '*, sender:profiles!dm_messages_sender_id_fkey ( id, email, username, display_name, avatar_url, bio, color, status )';

/** Columns returned for the `last_message` preview in conversation lists. */
const LAST_MESSAGE_LIST_COLUMNS =
  'id, conversation_id, sender_id, content, is_read, created_at, deleted_at';

/** Runtime check that a value parsed from the cache has the conversation fields this service reads. */
function isDMConversation(value: unknown): value is DMConversation {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.id === 'string' &&
    typeof candidate.user1_id === 'string' &&
    typeof candidate.user2_id === 'string'
  );
}

/**
 * Direct-message service: conversations, messages, read receipts and blocking.
 *
 * Rows are stored through `SupabaseService`; Redis holds conversation and
 * message caches, unread counters and read receipts.
 */
@Injectable()
export class DMsService {
  private readonly logger = new Logger(DMsService.name);
  private readonly DM_CACHE_TTL = 1209600; // 2 weeks
  private readonly CONVERSATION_BY_ID_CACHE_TTL = 300; // seconds
  private readonly RECENT_MESSAGES_LIMIT = 50;

  constructor(
    private readonly supabaseService: SupabaseService,
    private readonly redisService: RedisService,
    private readonly agentService: AgentService,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  /**
   * Get the conversation between `userId` and `dto.userId`, creating it if needed.
   *
   * @param userId - The requesting user.
   * @param dto - Carries the other participant's id as `userId`.
   * @returns The conversation with the other user's profile, last message,
   *   unread count and `is_new` (true only when this call inserted the row).
   * @throws BadRequestException when both ids are equal.
   * @throws NotFoundException when the other user's profile cannot be loaded.
   * @throws ForbiddenException when either user has blocked the other.
   * @throws ConflictException when the conversation cannot be created.
   */
  async getOrCreateConversation(
    userId: string,
    dto: CreateDMConversationDto,
  ): Promise<DMConversationWithUser> {
    const { userId: otherUserId } = dto;

    // Prevent self-messaging
    if (userId === otherUserId) {
      throw new BadRequestException('Cannot create conversation with yourself');
    }

    // The other user must exist
    const { data: otherUser, error: userError } = await this.supabaseService
      .from('profiles')
      .select(PROFILE_COLUMNS)
      .eq('id', otherUserId)
      .single();

    if (userError || !otherUser) {
      throw new NotFoundException('User not found');
    }

    if (await this.isBlocked(userId, otherUserId)) {
      throw new ForbiddenException(
        'You have blocked this user or they have blocked you',
      );
    }

    let conversation = await this.findExistingConversation(userId, otherUserId);
    let isNew = false;

    if (!conversation) {
      const created = await this.createConversation(userId, otherUserId);
      conversation = created.conversation;
      isNew = created.isNew;
    }

    // Independent lookups, so run them concurrently.
    const [lastMessage, unreadCount] = await Promise.all([
      this.getLastMessage(conversation.id),
      this.getUnreadCount(userId, conversation.id),
    ]);

    return {
      id: conversation.id,
      user1_id: conversation.user1_id,
      user2_id: conversation.user2_id,
      created_at: conversation.created_at,
      other_user: {
        id: otherUser.id,
        email: otherUser.email,
        username: otherUser.username,
        display_name: otherUser.display_name,
        avatar_url: otherUser.avatar_url,
        bio: otherUser.bio,
        color: otherUser.color,
        status: otherUser.status,
        last_seen: otherUser.last_seen,
        created_at: otherUser.created_at,
        updated_at: otherUser.updated_at,
      },
      last_message: lastMessage,
      unread_count: unreadCount,
      is_new: isNew,
    };
  }

  /**
   * List the user's conversations, newest first by conversation `created_at`.
   *
   * @param userId - The requesting user.
   * @param query - `page` (default 1) and `limit` (default 20).
   * @returns One page of enriched conversations, the exact total and `hasMore`.
   * @throws ConflictException when the conversation query fails.
   */
  async getConversations(
    userId: string,
    query: QueryDMConversationsDto,
  ): Promise<{
    conversations: DMConversationWithUser[];
    total: number;
    hasMore: boolean;
  }> {
    const { page = 1, limit = 20 } = query;
    const offset = (page - 1) * limit;

    // Conversations where the user is either participant
    const { data, error, count } = await this.supabaseService
      .from('dm_conversations')
      .select('*', { count: 'exact' })
      .or(`user1_id.eq.${userId},user2_id.eq.${userId}`)
      .order('created_at', { ascending: false })
      .range(offset, offset + limit - 1);

    if (error) {
      this.logger.error('Failed to fetch conversations:', error.message);
      throw new ConflictException('Failed to fetch conversations');
    }

    const rows: DMConversation[] = data || [];
    const total = count || 0;
    const hasMore = total > offset + limit;

    if (rows.length === 0) {
      return { conversations: [], total, hasMore };
    }

    const otherUserIds = Array.from(
      new Set(rows.map((conv) => this.otherParticipant(conv, userId))),
    );

    // Profiles come back in one query. Last message and unread count are one
    // query per conversation, all issued concurrently.
    const [profilesResult, lastMessages, unreadCounts] = await Promise.all([
      this.supabaseService
        .from('profiles')
        .select(PROFILE_COLUMNS)
        .in('id', otherUserIds),
      Promise.all(
        rows.map((conv) =>
          this.getLastMessage(conv.id, LAST_MESSAGE_LIST_COLUMNS),
        ),
      ),
      Promise.all(rows.map((conv) => this.countUnreadInDb(conv.id, userId))),
    ]);

    if (profilesResult.error) {
      // Non-fatal: affected conversations fall back to the placeholder profile.
      this.logger.warn(
        `Failed to fetch profiles for conversations: ${profilesResult.error.message}`,
      );
    }

    const profileMap = new Map<string, DMOtherUser>();
    for (const profile of profilesResult.data || []) {
      profileMap.set(profile.id, profile);
    }

    const conversations: DMConversationWithUser[] = rows.map((conv, index) => {
      const otherUserId = this.otherParticipant(conv, userId);
      return {
        ...conv,
        other_user: profileMap.get(otherUserId) || {
          id: otherUserId,
          email: '',
          username: 'unknown',
          display_name: 'Unknown',
          avatar_url: null,
          bio: null,
          color: null,
          status: 'offline',
          last_seen: null,
          created_at: '',
          updated_at: '',
        },
        last_message: lastMessages[index] || null,
        unread_count: unreadCounts[index] || 0,
      };
    });

    return { conversations, total, hasMore };
  }

  /**
   * Fetch a page of messages (newest first) and mark the fetched incoming
   * messages as read for `userId`.
   *
   * @param conversationId - Conversation to read from.
   * @param userId - Requesting user; must be a participant.
   * @param username - Recorded in the Redis read receipts.
   * @param query - `page`/`limit` plus optional `before`/`after` message-id cursors.
   * @returns Messages with `read_by_usernames` (omitted when empty), total and `hasMore`.
   * @throws NotFoundException when the conversation is missing, the user is
   *   not a participant, or a cursor message does not exist.
   * @throws ConflictException when the message query fails.
   */
  async getMessages(
    conversationId: string,
    userId: string,
    username: string,
    query: QueryDMMessagesDto,
  ): Promise<{
    messages: DMMessageWithSender[];
    total: number;
    hasMore: boolean;
  }> {
    // Verify user is part of this conversation
    const conversation = await this.getConversationById(conversationId, userId);
    if (!conversation) {
      throw new NotFoundException('Conversation not found');
    }

    const { page = 1, limit = 20, before, after } = query;
    const offset = (page - 1) * limit;

    // Filters first, then ordering and range.
    let dbQuery = this.supabaseService
      .from('dm_messages')
      .select(MESSAGE_WITH_SENDER_SELECT, { count: 'exact' })
      .eq('conversation_id', conversationId)
      .is('deleted_at', null);

    // Cursor pagination relative to another message's timestamp
    if (before) {
      const beforeMessage = await this.getDMMessageById(before);
      dbQuery = dbQuery.lt('created_at', beforeMessage.created_at);
    }
    if (after) {
      const afterMessage = await this.getDMMessageById(after);
      dbQuery = dbQuery.gt('created_at', afterMessage.created_at);
    }

    const {
      data: messages,
      error,
      count,
    } = await dbQuery
      .order('created_at', { ascending: false })
      .range(offset, offset + limit - 1);

    if (error) {
      this.logger.error('Failed to fetch DM messages:', error.message);
      throw new ConflictException('Failed to fetch messages');
    }

    const rows = messages || [];

    // Mark incoming unread messages on this page as read
    const unreadIds: string[] = rows
      .filter((m) => m.sender_id !== userId && !m.is_read)
      .map((m) => m.id);

    if (unreadIds.length > 0) {
      await this.markAsRead(conversationId, userId, username, unreadIds);
    }

    // Attach read receipts from Redis (fetched after marking, so they include this read)
    const enrichedMessages = await Promise.all(
      rows.map(async (msg) => {
        const readBy = await this.getDMReadReceipts(msg.id);
        return {
          ...this.transformMessageWithSender(msg),
          read_by_usernames: readBy.length > 0 ? readBy : undefined,
        };
      }),
    );

    const total = count || 0;
    return {
      messages: enrichedMessages,
      total,
      hasMore: total > offset + limit,
    };
  }

  /**
   * Send a system/agent DM without participant or block checks.
   * Used by AI agents, bots, and system notifications.
   *
   * @param conversationId - Target conversation; must exist.
   * @param senderId - Id recorded as the sender (need not be a participant).
   * @param dto - Message payload; only `content` is used.
   * @returns The stored message with its sender profile.
   * @throws NotFoundException when the conversation does not exist.
   * @throws ConflictException when the insert fails.
   */
  async sendSystemDM(
    conversationId: string,
    senderId: string,
    dto: SendDMDto,
  ): Promise<DMMessageWithSender> {
    // Existence check only: no userId means no membership check.
    const conversation = await this.getConversationById(conversationId);
    if (!conversation) {
      throw new NotFoundException('Conversation not found');
    }

    const { message, receiverId } = await this.deliverMessage(
      conversation,
      senderId,
      dto.content,
      'Failed to send system DM:',
    );

    // Note: Realtime broadcast is handled by EventEmitter in handleAgentMention
    this.logger.log(`System DM sent from ${senderId} to ${receiverId}`);

    return message;
  }

  /**
   * Send a DM from a participant of the conversation.
   *
   * If the content mentions the agent, the reply is produced asynchronously
   * and does not delay or fail this call.
   *
   * NOTE(review): `dto.replyToId` is accepted by callers but is not persisted
   * here; confirm the target column name before wiring it into the insert.
   *
   * @param conversationId - Target conversation.
   * @param senderId - Sending user; must be a participant.
   * @param dto - Message payload; only `content` is used.
   * @returns The stored message with its sender profile.
   * @throws NotFoundException when the conversation is missing or the sender is not a participant.
   * @throws ForbiddenException when either participant has blocked the other.
   * @throws ConflictException when the insert fails.
   */
  async sendMessage(
    conversationId: string,
    senderId: string,
    dto: SendDMDto,
  ): Promise<DMMessageWithSender> {
    // Verify conversation exists and user is part of it
    const conversation = await this.getConversationById(conversationId, senderId);
    if (!conversation) {
      throw new NotFoundException('Conversation not found');
    }

    const otherUserId = this.otherParticipant(conversation, senderId);
    if (await this.isBlocked(senderId, otherUserId)) {
      throw new ForbiddenException(
        'You have blocked this user or they have blocked you',
      );
    }

    const { message, receiverId, senderUsername } = await this.deliverMessage(
      conversation,
      senderId,
      dto.content,
      'Failed to send DM:',
    );

    this.logger.log(`DM sent from ${senderId} to ${receiverId}`);

    // Bot mention is handled fire-and-forget; failures are logged only.
    if (this.agentService.containsMention(dto.content)) {
      this.handleAgentMention(
        conversationId,
        senderId,
        dto.content,
        senderUsername,
      ).catch((err) =>
        this.logger.error('Agent DM mention handling failed:', err),
      );
    }

    return message;
  }

  /**
   * Send a DM, creating the conversation on the fly when only a recipient is given.
   * Convenience wrapper around `getOrCreateConversation` + `sendMessage`.
   *
   * When both ids are supplied, `conversationId` takes precedence.
   *
   * @param userId - Sending user.
   * @param dto - `content`, plus `conversationId` and/or `recipientId`.
   * @returns The conversation id, whether it was created by this call, the
   *   enriched conversation (only when newly created) and the stored message.
   * @throws BadRequestException when neither `recipientId` nor `conversationId` is given.
   * @throws NotFoundException when `conversationId` is unknown to this user.
   */
  async sendDMMessage(
    userId: string,
    dto: SendDMMessageDto,
  ): Promise<{
    conversationId: string;
    isNewConversation: boolean;
    conversation?: DMConversationWithUser;
    message: DMMessageWithSender;
  }> {
    const { recipientId, conversationId, content, replyToId } = dto;

    let finalConversationId: string;
    let isNewConversation = false;
    let conversation: DMConversationWithUser | undefined;

    if (conversationId) {
      // Mode: existing conversation
      const existing = await this.getConversationById(conversationId, userId);
      if (!existing) {
        throw new NotFoundException('Conversation not found');
      }
      finalConversationId = conversationId;
    } else if (recipientId) {
      // Mode: new conversation (auto-create)
      const result = await this.getOrCreateConversation(userId, {
        userId: recipientId,
      });
      finalConversationId = result.id;
      isNewConversation = result.is_new || false;
      if (isNewConversation) {
        conversation = result;
      }
    } else {
      throw new BadRequestException(
        'Either recipientId or conversationId is required',
      );
    }

    const message = await this.sendMessage(finalConversationId, userId, {
      content,
      replyToId,
    });

    return {
      conversationId: finalConversationId,
      isNewConversation,
      ...(conversation ? { conversation } : {}),
      message,
    };
  }

  /**
   * Soft-delete a message: sets `deleted_at` and replaces the content.
   *
   * @param messageId - Message to delete.
   * @param userId - Requesting user; must be the sender.
   * @throws NotFoundException when the message does not exist.
   * @throws ForbiddenException when the user is not the sender.
   * @throws ConflictException when the update fails.
   */
  async deleteMessage(messageId: string, userId: string): Promise<void> {
    const message = await this.getDMMessageById(messageId);

    // Verify ownership
    if (message.sender_id !== userId) {
      throw new ForbiddenException('You can only delete your own messages');
    }

    const { error } = await this.supabaseService
      .from('dm_messages')
      .update({
        deleted_at: new Date().toISOString(),
        content: '[deleted]',
      })
      .eq('id', messageId);

    if (error) {
      this.logger.error('Failed to delete DM:', error.message);
      throw new ConflictException('Failed to delete message');
    }

    // Drop the cached copy so the original content is no longer served from
    // Redis. Best effort: the database row is already updated.
    try {
      await this.redisService.del(`dmmsg:${messageId}`);
    } catch (err) {
      this.logger.warn(
        `Failed to invalidate cache for DM ${messageId}: ${
          err instanceof Error ? err.message : String(err)
        }`,
      );
    }

    this.logger.log(`DM ${messageId} deleted`);
  }

  /**
   * Get the number of unread incoming messages for `userId` in a conversation.
   * Reads the Redis counter first and falls back to counting in the database.
   *
   * @returns The unread count; 0 when the user is not a participant or the count query fails.
   */
  async getUnreadCount(
    userId: string,
    conversationId: string,
  ): Promise<number> {
    const conversation = await this.getConversationById(conversationId, userId);
    if (!conversation) return 0;

    const otherUserId = this.otherParticipant(conversation, userId);

    // Try Redis first
    const unreadKey = RedisKeys.dm.unread(userId, otherUserId);
    const cached = await this.redisService.get(unreadKey);
    if (cached) {
      return parseInt(cached, 10) || 0;
    }

    // Count from database
    const { count, error } = await this.supabaseService
      .from('dm_messages')
      .select('*', { count: 'exact', head: true })
      .eq('conversation_id', conversationId)
      .eq('is_read', false)
      .neq('sender_id', userId);

    if (error) {
      this.logger.error('Failed to get unread count:', error.message);
      return 0;
    }

    const unread = count || 0;
    await this.redisService.set(unreadKey, String(unread), this.DM_CACHE_TTL);

    return unread;
  }

  /**
   * Mark incoming unread messages as read and record read receipts in Redis.
   *
   * @param conversationId - Conversation containing the messages.
   * @param userId - Reader; must be a participant.
   * @param username - Stored in the read-receipt set and timestamp hash.
   * @param messageIds - Optional subset; when omitted or empty, all incoming unread messages are marked.
   * @returns Ids of the messages that were actually updated; empty when the
   *   user is not a participant or the update fails.
   */
  async markAsRead(
    conversationId: string,
    userId: string,
    username: string,
    messageIds?: string[],
  ): Promise<string[]> {
    const conversation = await this.getConversationById(conversationId, userId);
    if (!conversation) return [];

    let query = this.supabaseService
      .from('dm_messages')
      .update({ is_read: true })
      .eq('conversation_id', conversationId)
      .neq('sender_id', userId)
      .eq('is_read', false);

    if (messageIds && messageIds.length > 0) {
      query = query.in('id', messageIds);
    }

    const { data: updatedMessages, error } = await query.select('id');

    if (error) {
      this.logger.error('Failed to mark messages as read:', error.message);
      return [];
    }

    // Add username to Redis read receipts for each message
    const markedIds: string[] = [];
    const now = new Date().toISOString();

    if (updatedMessages && updatedMessages.length > 0) {
      const pipeline = this.redisService.pipeline();
      for (const msg of updatedMessages) {
        const readKey = RedisKeys.dm.read(msg.id);
        const readAtKey = RedisKeys.dm.readAt(msg.id);
        pipeline.sadd(readKey, username);
        pipeline.expire(readKey, this.DM_CACHE_TTL);
        pipeline.hset(readAtKey, username, now);
        pipeline.expire(readAtKey, this.DM_CACHE_TTL);
        markedIds.push(msg.id);
      }
      await pipeline.exec();
    }

    // Drop the cached counter so the next read recounts from the database
    const otherUserId = this.otherParticipant(conversation, userId);
    await this.redisService.del(RedisKeys.dm.unread(userId, otherUserId));

    return markedIds;
  }

  /**
   * Get the usernames recorded as having read a message.
   */
  async getDMReadReceipts(messageId: string): Promise<string[]> {
    const readers = await this.redisService.smembers(
      RedisKeys.dm.read(messageId),
    );
    return readers || [];
  }

  /**
   * Get read receipts with the timestamp each user read the message.
   */
  async getDMReadReceiptsWithTiming(
    messageId: string,
  ): Promise<Array<{ username: string; readAt: string }>> {
    const readAtMap = await this.redisService.hgetall(
      RedisKeys.dm.readAt(messageId),
    );
    return Object.entries(readAtMap ?? {}).map(([username, readAt]) => ({
      username,
      readAt,
    }));
  }

  /**
   * Get read latency for a message (time from send to each read).
   *
   * @returns Per-reader latency (clamped at 0) and the rounded average; all
   *   empty/null when the message is not found. Receipts whose timestamp
   *   cannot be parsed are skipped so they cannot turn the average into NaN.
   */
  async getDMReadLatency(
    messageId: string,
  ): Promise<{
    messageId: string;
    createdAt: string | null;
    readReceipts: Array<{ username: string; readAt: string; latencyMs: number }>;
    averageLatencyMs: number | null;
  }> {
    const { data: message, error } = await this.supabaseService
      .from('dm_messages')
      .select('id, created_at')
      .eq('id', messageId)
      .single();

    if (error || !message) {
      this.logger.warn(`Message not found for latency: ${messageId}`);
      return {
        messageId,
        createdAt: null,
        readReceipts: [],
        averageLatencyMs: null,
      };
    }

    const createdAtMs = new Date(message.created_at).getTime();

    const readAtMap = await this.redisService.hgetall(
      RedisKeys.dm.readAt(messageId),
    );

    const readReceipts: Array<{
      username: string;
      readAt: string;
      latencyMs: number;
    }> = [];
    let totalLatencyMs = 0;

    for (const [username, readAt] of Object.entries(readAtMap ?? {})) {
      const latencyMs = new Date(readAt).getTime() - createdAtMs;
      if (Number.isNaN(latencyMs)) {
        continue;
      }
      const clamped = Math.max(0, latencyMs);
      totalLatencyMs += clamped;
      readReceipts.push({ username, readAt, latencyMs: clamped });
    }

    const averageLatencyMs =
      readReceipts.length > 0
        ? Math.round(totalLatencyMs / readReceipts.length)
        : null;

    return {
      messageId,
      createdAt: message.created_at,
      readReceipts,
      averageLatencyMs,
    };
  }

  /**
   * Block a user.
   *
   * @throws BadRequestException when blocking oneself.
   * @throws ConflictException when already blocked (unique violation) or the insert fails.
   */
  async blockUser(blockerId: string, blockedId: string): Promise<void> {
    if (blockerId === blockedId) {
      throw new BadRequestException('Cannot block yourself');
    }

    const { error } = await this.supabaseService.from('blocked_users').insert({
      blocker_id: blockerId,
      blocked_id: blockedId,
    });

    if (error) {
      // 23505 = PostgreSQL unique_violation
      if (error.code === '23505') {
        throw new ConflictException('User already blocked');
      }
      this.logger.error('Failed to block user:', error.message);
      throw new ConflictException('Failed to block user');
    }

    this.logger.log(`User ${blockerId} blocked ${blockedId}`);
  }

  /**
   * Unblock a user.
   *
   * @throws ConflictException when the delete fails.
   */
  async unblockUser(blockerId: string, blockedId: string): Promise<void> {
    const { error } = await this.supabaseService
      .from('blocked_users')
      .delete()
      .eq('blocker_id', blockerId)
      .eq('blocked_id', blockedId);

    if (error) {
      this.logger.error('Failed to unblock user:', error.message);
      throw new ConflictException('Failed to unblock user');
    }

    this.logger.log(`User ${blockerId} unblocked ${blockedId}`);
  }

  /**
   * List the users blocked by `userId`.
   *
   * @returns Blocked profiles with the time of blocking; empty when the query fails.
   */
  async getBlockedUsers(userId: string): Promise<BlockedUserSummary[]> {
    const { data: blocked, error } = await this.supabaseService
      .from('blocked_users')
      .select(
        `
        blocked_id,
        created_at,
        blocked:profiles!blocked_users_blocked_id_fkey (
          id, email, display_name, avatar_url, bio, color, status
        )
      `,
      )
      .eq('blocker_id', userId);

    if (error) {
      this.logger.error('Failed to get blocked users:', error.message);
      return [];
    }

    return (blocked || []).map((b: any) => ({
      id: b.blocked?.id,
      email: b.blocked?.email,
      displayName: b.blocked?.display_name,
      avatar: b.blocked?.avatar_url,
      bio: b.blocked?.bio,
      color: b.blocked?.color,
      status: b.blocked?.status,
      blockedAt: b.created_at,
    }));
  }

  /**
   * Get a conversation by id, optionally enforcing membership.
   *
   * @param conversationId - Conversation to load.
   * @param userId - When given, the conversation is returned only if this user is a participant.
   * @returns The conversation, or `null` when missing or the user is not a participant.
   */
  async getConversationById(
    conversationId: string,
    userId?: string,
  ): Promise<DMConversation | null> {
    const isMember = (conv: DMConversation): boolean =>
      !userId || conv.user1_id === userId || conv.user2_id === userId;

    // Try cache first
    const cacheKey = `dm:conversation:${conversationId}`;
    const cached = await this.redisService.get(cacheKey);
    if (cached) {
      const fromCache = this.parseCachedConversation(cached, cacheKey);
      if (fromCache) {
        // Participants are fixed at insert time, so the cached row is enough
        // to answer the membership question without a database round trip.
        return isMember(fromCache) ? fromCache : null;
      }
    }

    const { data: conversation, error } = await this.supabaseService
      .from('dm_conversations')
      .select('*')
      .eq('id', conversationId)
      .single();

    if (error || !conversation) {
      return null;
    }

    if (!isMember(conversation)) {
      return null;
    }

    await this.redisService.set(
      cacheKey,
      JSON.stringify(conversation),
      this.CONVERSATION_BY_ID_CACHE_TTL,
    );

    return conversation;
  }

  /**
   * Insert a conversation row for two users, recovering from a concurrent insert.
   *
   * The smaller id is stored as `user1_id`. On a unique violation (23505) the
   * existing row is looked up and returned with `isNew: false`.
   */
  private async createConversation(
    userId: string,
    otherUserId: string,
  ): Promise<{ conversation: DMConversation; isNew: boolean }> {
    const [firstId, secondId] =
      userId < otherUserId ? [userId, otherUserId] : [otherUserId, userId];

    const { data: created, error } = await this.supabaseService
      .from('dm_conversations')
      .insert({ user1_id: firstId, user2_id: secondId })
      .select()
      .single();

    if (!error) {
      await this.cacheConversation(created);
      return { conversation: created, isNew: true };
    }

    if (error.code !== '23505') {
      this.logger.error('Failed to create conversation:', error.message);
      throw new ConflictException('Failed to create conversation');
    }

    // Another request created the conversation between our lookup and insert.
    this.logger.warn(
      `Race condition detected for conversation between ${userId} and ${otherUserId}`,
    );
    const existing = await this.findExistingConversation(userId, otherUserId);
    if (!existing) {
      this.logger.error('Failed to find conversation after race condition');
      throw new ConflictException('Failed to create conversation');
    }
    return { conversation: existing, isNew: false };
  }

  /**
   * Insert a message, update the Redis caches/counters and notify the receiver.
   * Shared by `sendMessage` and `sendSystemDM`; performs no permission checks.
   *
   * @param failureLogPrefix - Log message used when the insert fails.
   * @returns The transformed message, the receiver id and the sender's
   *   username from the joined profile (undefined when the join is empty).
   * @throws ConflictException when the insert fails.
   */
  private async deliverMessage(
    conversation: DMConversation,
    senderId: string,
    content: string,
    failureLogPrefix: string,
  ): Promise<{
    message: DMMessageWithSender;
    receiverId: string;
    senderUsername: string | undefined;
  }> {
    const receiverId = this.otherParticipant(conversation, senderId);

    const { data: row, error } = await this.supabaseService
      .from('dm_messages')
      .insert({
        conversation_id: conversation.id,
        sender_id: senderId,
        content,
        is_read: false,
      })
      .select(MESSAGE_WITH_SENDER_SELECT)
      .single();

    if (error || !row) {
      this.logger.error(failureLogPrefix, error?.message);
      throw new ConflictException('Failed to send message');
    }

    const senderUsername: string | undefined = row.sender?.username;

    // Pipeline Redis operations
    const pipeline = this.redisService.pipeline();
    const messageKey = `dmmsg:${row.id}`;

    // Cache message (without is_read — use read_by_usernames instead)
    pipeline.hset(messageKey, {
      id: row.id,
      conversation_id: row.conversation_id,
      sender_id: row.sender_id,
      sender_username: senderUsername || '',
      content: row.content,
      created_at: row.created_at,
      deleted_at: row.deleted_at || '',
    });
    pipeline.expire(messageKey, this.DM_CACHE_TTL);

    // Add to the conversation's message index, scored by creation time
    pipeline.zadd(
      RedisKeys.dm.messages(conversation.user1_id, conversation.user2_id),
      new Date(row.created_at).getTime(),
      row.id,
    );

    // Increment unread count for receiver
    pipeline.incr(RedisKeys.dm.unread(receiverId, senderId));

    await pipeline.exec();

    // Create notification for receiver; a failure here must not fail the send.
    const { error: notificationError } = await this.supabaseService
      .from('notifications')
      .insert({
        user_id: receiverId,
        type: 'dm',
        title: 'New direct message',
        message: 'You have a new direct message',
        data: {
          conversationId: conversation.id,
          senderId,
          messageId: row.id,
        },
      });

    if (notificationError) {
      this.logger.warn(
        `Failed to create DM notification for ${receiverId}: ${notificationError.message}`,
      );
    }

    return {
      message: this.transformMessageWithSender(row),
      receiverId,
      senderUsername,
    };
  }

  /**
   * Find the existing conversation between two users, in either column order.
   *
   * @returns The conversation, or `null` when none exists or the lookup fails.
   */
  private async findExistingConversation(
    user1Id: string,
    user2Id: string,
  ): Promise<DMConversation | null> {
    // Try cache first
    const dmKey = RedisKeys.dm.byId(user1Id, user2Id);
    const cached = await this.redisService.get(dmKey);
    if (cached) {
      const fromCache = this.parseCachedConversation(cached, dmKey);
      if (fromCache) {
        return fromCache;
      }
    }

    // The ids are interpolated into the filter string, so callers must only
    // pass validated ids.
    const { data: conversation, error } = await this.supabaseService
      .from('dm_conversations')
      .select('*')
      .or(
        `and(user1_id.eq.${user1Id},user2_id.eq.${user2Id}),and(user1_id.eq.${user2Id},user2_id.eq.${user1Id})`,
      )
      .single();

    if (error || !conversation) {
      return null;
    }

    await this.cacheConversation(conversation);

    return conversation;
  }

  /**
   * Parse a cached conversation, returning `null` (and logging) when the entry
   * is not valid JSON or lacks the id fields, so callers can fall back to the database.
   */
  private parseCachedConversation(
    raw: string,
    cacheKey: string,
  ): DMConversation | null {
    try {
      const parsed: unknown = JSON.parse(raw);
      if (isDMConversation(parsed)) {
        return parsed;
      }
    } catch {
      // handled below
    }
    this.logger.warn(`Ignoring malformed conversation cache entry: ${cacheKey}`);
    return null;
  }

  /**
   * Return the participant of `conversation` that is not `userId`.
   * If `userId` is not `user1_id`, `user1_id` is returned.
   */
  private otherParticipant(
    conversation: DMConversation,
    userId: string,
  ): string {
    return conversation.user1_id === userId
      ? conversation.user2_id
      : conversation.user1_id;
  }

  /**
   * Count unread incoming messages for `userId` directly in the database.
   * Returns 0 (and logs) when the query fails.
   */
  private async countUnreadInDb(
    conversationId: string,
    userId: string,
  ): Promise<number> {
    const { count, error } = await this.supabaseService
      .from('dm_messages')
      .select('*', { count: 'exact', head: true })
      .eq('conversation_id', conversationId)
      .eq('is_read', false)
      .neq('sender_id', userId);

    if (error) {
      this.logger.warn(
        `Failed to count unread messages for ${conversationId}: ${error.message}`,
      );
      return 0;
    }
    return count || 0;
  }

  /**
   * Get a DM message by id.
   *
   * @throws NotFoundException when the message does not exist or the query fails.
   */
  private async getDMMessageById(messageId: string): Promise<DMMessage> {
    const { data: message, error } = await this.supabaseService
      .from('dm_messages')
      .select('*')
      .eq('id', messageId)
      .single();

    if (error || !message) {
      throw new NotFoundException('Message not found');
    }

    return message;
  }

  /**
   * Get the newest non-deleted message in a conversation.
   *
   * @param columns - Columns to select; defaults to all.
   * @returns The message, or `null` when the conversation has no messages or the query fails.
   */
  private async getLastMessage(
    conversationId: string,
    columns = '*',
  ): Promise<DMMessage | null> {
    // maybeSingle: zero rows is a normal outcome here, not an error.
    const { data: message, error } = await this.supabaseService
      .from('dm_messages')
      .select(columns)
      .eq('conversation_id', conversationId)
      .is('deleted_at', null)
      .order('created_at', { ascending: false })
      .limit(1)
      .maybeSingle();

    if (error) {
      this.logger.warn(
        `Failed to fetch last message for ${conversationId}: ${error.message}`,
      );
      return null;
    }

    return message ?? null;
  }

  /**
   * Check whether either user has blocked the other.
   *
   * @throws ConflictException when the block lookup fails.
   */
  private async isBlocked(user1Id: string, user2Id: string): Promise<boolean> {
    const { data: block, error } = await this.supabaseService
      .from('blocked_users')
      .select('id')
      .or(
        `and(blocker_id.eq.${user1Id},blocked_id.eq.${user2Id}),and(blocker_id.eq.${user2Id},blocked_id.eq.${user1Id})`,
      )
      .maybeSingle();

    if (error) {
      this.logger.error('Failed to check block status:', error.message);
      throw new ConflictException('Failed to check block status');
    }

    return !!block;
  }

  /**
   * Cache a conversation under its participant-pair key.
   */
  private async cacheConversation(conversation: DMConversation): Promise<void> {
    const dmKey = RedisKeys.dm.byId(
      conversation.user1_id,
      conversation.user2_id,
    );
    await this.redisService.set(
      dmKey,
      JSON.stringify(conversation),
      this.DM_CACHE_TTL,
    );
  }

  /**
   * Cache a DM message as a Redis hash with the standard TTL.
   */
  private async cacheDMMessage(message: DMMessage): Promise<void> {
    const key = `dmmsg:${message.id}`;
    await this.redisService.hset(key, {
      id: message.id,
      conversation_id: message.conversation_id,
      sender_id: message.sender_id,
      sender_username: message.sender_username || '',
      content: message.content,
      created_at: message.created_at,
      deleted_at: message.deleted_at || '',
    });
    await this.redisService.expire(key, this.DM_CACHE_TTL);
  }

  /**
   * Normalise a raw message row (optionally joined with `sender`) into
   * `DMMessageWithSender`, filling a placeholder sender when the join is empty.
   */
  private transformMessageWithSender(msg: any): DMMessageWithSender {
    return {
      id: msg.id,
      conversation_id: msg.conversation_id,
      sender_id: msg.sender_id,
      sender_username: msg.sender_username || msg.sender?.username || '',
      content: msg.content,
      is_read: msg.is_read ?? (msg.read_by_usernames?.length > 0),
      read_by_usernames: msg.read_by_usernames || [],
      created_at: msg.created_at,
      updated_at: msg.updated_at || msg.created_at,
      deleted_at: msg.deleted_at,
      reply_to: msg.reply_to || null,
      sender: msg.sender || {
        id: msg.sender_id,
        email: '',
        // `username` is required by DMMessageWithSender['sender'].
        username: msg.sender_username || 'unknown',
        display_name: 'Unknown',
        avatar_url: null,
        bio: null,
        color: null,
        status: 'offline',
      },
    };
  }

  /**
   * Handle an agent mention in a DM: call the agent and post its reply.
   *
   * Emits `bot.dm.thinking` before calling the agent and `bot.dm.reply` once
   * the reply message is stored. Does nothing when no bot user id is configured.
   */
  private async handleAgentMention(
    conversationId: string,
    userId: string,
    content: string,
    username?: string,
  ): Promise<void> {
    const botUserId = this.agentService.getBotUserId();
    if (!botUserId) {
      this.logger.warn('AGENT_BOT_USER_ID not configured, skipping DM mention');
      return;
    }

    const prompt = this.agentService.extractPrompt(content);

    // Emit "bot thinking" event immediately so FE shows indicator
    this.eventEmitter.emit('bot.dm.thinking', {
      conversationId,
      botUserId,
    });

    const reply = await this.agentService.callAgent(
      conversationId,
      userId,
      prompt,
      username,
    );
    if (!reply) {
      return;
    }

    const botMessage = await this.sendSystemDM(conversationId, botUserId, {
      content: reply,
    });
    this.logger.log(`Agent replied in DM conversation ${conversationId}`);

    // Emit event for gateway to broadcast directly (same as user message)
    this.eventEmitter.emit('bot.dm.reply', {
      conversationId,
      message: botMessage,
    });
  }
}