Spaces:
Sleeping
Sleeping
| import { | |
| Injectable, | |
| Logger, | |
| NotFoundException, | |
| ForbiddenException, | |
| ConflictException, | |
| BadRequestException, | |
| } from '@nestjs/common'; | |
| import { EventEmitter2 } from '@nestjs/event-emitter'; | |
| import { SupabaseService } from '../../database/supabase.service'; | |
| import { RedisService } from '../../redis/redis.service'; | |
| import { RedisKeys } from '../../redis/keys'; | |
| import { AgentService } from '../agent/agent.service'; | |
| import { | |
| CreateDMConversationDto, | |
| SendDMDto, | |
| SendDMMessageDto, | |
| QueryDMMessagesDto, | |
| QueryDMConversationsDto, | |
| } from './dto'; | |
| interface DMConversation { | |
| id: string; | |
| user1_id: string; | |
| user2_id: string; | |
| created_at: string; | |
| } | |
| interface DMConversationWithUser extends DMConversation { | |
| other_user: { | |
| id: string; | |
| email: string; | |
| username: string; | |
| display_name: string; | |
| avatar_url: string | null; | |
| bio: string | null; | |
| color: string | null; | |
| status: string; | |
| last_seen: string | null; | |
| created_at: string; | |
| updated_at: string; | |
| }; | |
| last_message?: DMMessage | null; | |
| unread_count: number; | |
| is_new?: boolean; | |
| } | |
| interface DMMessage { | |
| id: string; | |
| conversation_id: string; | |
| sender_id: string; | |
| sender_username: string; | |
| content: string; | |
| is_read: boolean; | |
| created_at: string; | |
| deleted_at: string | null; | |
| } | |
| interface DMMessageWithSender extends DMMessage { | |
| updated_at?: string; | |
| reply_to?: string | null; | |
| sender: { | |
| id: string; | |
| email: string; | |
| username: string; | |
| display_name: string; | |
| avatar_url: string | null; | |
| bio: string | null; | |
| color: string | null; | |
| status: string; | |
| }; | |
| read_by_usernames?: string[]; | |
| } | |
| () | |
| export class DMsService { | |
| private readonly logger = new Logger(DMsService.name); | |
| private readonly DM_CACHE_TTL = 1209600; // 2 weeks | |
| private readonly RECENT_MESSAGES_LIMIT = 50; | |
| constructor( | |
| private readonly supabaseService: SupabaseService, | |
| private readonly redisService: RedisService, | |
| private readonly agentService: AgentService, | |
| private readonly eventEmitter: EventEmitter2, | |
| ) {} | |
| /** | |
| * Get or create DM conversation | |
| */ | |
| async getOrCreateConversation( | |
| userId: string, | |
| dto: CreateDMConversationDto, | |
| ): Promise<DMConversationWithUser> { | |
| const { userId: otherUserId } = dto; | |
| // Prevent self-messaging | |
| if (userId === otherUserId) { | |
| throw new BadRequestException('Cannot create conversation with yourself'); | |
| } | |
| // Check if other user exists and is not blocked | |
| const { data: otherUser, error: userError } = await this.supabaseService | |
| .from('profiles') | |
| .select('id, email, username, display_name, avatar_url, bio, color, status, last_seen, created_at, updated_at') | |
| .eq('id', otherUserId) | |
| .single(); | |
| if (userError || !otherUser) { | |
| throw new NotFoundException('User not found'); | |
| } | |
| // Check if blocked | |
| const isBlocked = await this.isBlocked(userId, otherUserId); | |
| if (isBlocked) { | |
| throw new ForbiddenException( | |
| 'You have blocked this user or they have blocked you', | |
| ); | |
| } | |
| // Check for existing conversation | |
| const conversation = await this.findExistingConversation( | |
| userId, | |
| otherUserId, | |
| ); | |
| let finalConversation: DMConversation; | |
| let isNew = false; | |
| if (!conversation) { | |
| // Create new conversation | |
| const { data: newConversation, error } = await this.supabaseService | |
| .from('dm_conversations') | |
| .insert({ | |
| user1_id: userId < otherUserId ? userId : otherUserId, | |
| user2_id: userId < otherUserId ? otherUserId : userId, | |
| }) | |
| .select() | |
| .single(); | |
| if (error) { | |
| // Handle race condition: another request may have created the conversation | |
| if (error.code === '23505') { | |
| this.logger.warn( | |
| `Race condition detected for conversation between ${userId} and ${otherUserId}`, | |
| ); | |
| const existingConversation = await this.findExistingConversation( | |
| userId, | |
| otherUserId, | |
| ); | |
| if (existingConversation) { | |
| finalConversation = existingConversation; | |
| } else { | |
| this.logger.error('Failed to find conversation after race condition'); | |
| throw new ConflictException('Failed to create conversation'); | |
| } | |
| } else { | |
| this.logger.error('Failed to create conversation:', error.message); | |
| throw new ConflictException('Failed to create conversation'); | |
| } | |
| } else { | |
| finalConversation = newConversation; | |
| isNew = true; | |
| // Cache the conversation | |
| await this.cacheConversation(finalConversation); | |
| } | |
| } else { | |
| finalConversation = conversation; | |
| } | |
| // Get last message | |
| const lastMessage = await this.getLastMessage(finalConversation.id); | |
| // Get unread count | |
| const unreadCount = await this.getUnreadCount(userId, finalConversation.id); | |
| return { | |
| id: finalConversation.id, | |
| user1_id: finalConversation.user1_id, | |
| user2_id: finalConversation.user2_id, | |
| created_at: finalConversation.created_at, | |
| other_user: { | |
| id: otherUser.id, | |
| email: otherUser.email, | |
| username: otherUser.username, | |
| display_name: otherUser.display_name, | |
| avatar_url: otherUser.avatar_url, | |
| bio: otherUser.bio, | |
| color: otherUser.color, | |
| status: otherUser.status, | |
| last_seen: otherUser.last_seen, | |
| created_at: otherUser.created_at, | |
| updated_at: otherUser.updated_at, | |
| }, | |
| last_message: lastMessage, | |
| unread_count: unreadCount, | |
| is_new: isNew, | |
| }; | |
| } | |
| /** | |
| * Get user's DM conversations | |
| */ | |
| async getConversations( | |
| userId: string, | |
| query: QueryDMConversationsDto, | |
| ): Promise<{ | |
| conversations: DMConversationWithUser[]; | |
| total: number; | |
| hasMore: boolean; | |
| }> { | |
| const { page = 1, limit = 20 } = query; | |
| const offset = (page - 1) * limit; | |
| // Get conversations where user is either user1 or user2 | |
| const { | |
| data: conversations, | |
| error, | |
| count, | |
| } = await this.supabaseService | |
| .from('dm_conversations') | |
| .select('*', { count: 'exact' }) | |
| .or(`user1_id.eq.${userId},user2_id.eq.${userId}`) | |
| .order('created_at', { ascending: false }) | |
| .range(offset, offset + limit - 1); | |
| if (error) { | |
| this.logger.error('Failed to fetch conversations:', error.message); | |
| throw new ConflictException('Failed to fetch conversations'); | |
| } | |
| // Batch fetch all related data to avoid N+1 queries | |
| const conversationIds = (conversations || []).map((c) => c.id); | |
| const otherUserIds = (conversations || []).map((conv) => | |
| conv.user1_id === userId ? conv.user2_id : conv.user1_id, | |
| ); | |
| // Batch fetch all related data in parallel to avoid N+1 queries | |
| const [ | |
| profilesResult, | |
| lastMessagesResult, | |
| ] = await Promise.all([ | |
| // Batch 1: Fetch all profiles | |
| otherUserIds.length > 0 | |
| ? this.supabaseService | |
| .from('profiles') | |
| .select('id, email, username, display_name, avatar_url, bio, color, status, last_seen, created_at, updated_at') | |
| .in('id', otherUserIds) | |
| : Promise.resolve({ data: [] }), | |
| // Batch 2: Fetch last messages for all conversations | |
| // Query each conversation's last message individually to ensure correctness | |
| conversationIds.length > 0 | |
| ? (async () => { | |
| const messages: any[] = []; | |
| const msgPromises = conversationIds.map(async (convId) => { | |
| const { data, error } = await this.supabaseService | |
| .from('dm_messages') | |
| .select('id, conversation_id, sender_id, content, is_read, created_at, deleted_at') | |
| .eq('conversation_id', convId) | |
| .is('deleted_at', null) | |
| .order('created_at', { ascending: false }) | |
| .limit(1) | |
| .single(); | |
| if (error) { | |
| this.logger.debug(`[DEBUG] getLastMsg error for ${convId}: ${error.message}`); | |
| return null; | |
| } | |
| return data; | |
| }); | |
| const results = await Promise.all(msgPromises); | |
| return { data: results.filter((m) => m !== null) }; | |
| })() | |
| : Promise.resolve({ data: [] }), | |
| ]); | |
| const profileMap = new Map( | |
| (profilesResult.data || []).map((p) => [p.id, p]), | |
| ); | |
| // Keep only the latest message per conversation | |
| const lastMessagesMap = new Map<string, any | null>(); | |
| for (const msg of lastMessagesResult.data || []) { | |
| if (!lastMessagesMap.has(msg.conversation_id)) { | |
| lastMessagesMap.set(msg.conversation_id, msg); | |
| } | |
| } | |
| // Batch 3: Fetch unread counts for all conversations (parallel) | |
| const unreadCountsMap = new Map<string, number>(); | |
| if (conversationIds.length > 0) { | |
| const unreadPromises = conversationIds.map(async (convId) => { | |
| const { count } = await this.supabaseService | |
| .from('dm_messages') | |
| .select('*', { count: 'exact', head: true }) | |
| .eq('conversation_id', convId) | |
| .eq('is_read', false) | |
| .neq('sender_id', userId); | |
| return { convId, count: count || 0 }; | |
| }); | |
| const unreadResults = await Promise.all(unreadPromises); | |
| for (const { convId, count } of unreadResults) { | |
| unreadCountsMap.set(convId, count); | |
| } | |
| } | |
| // Enhance conversations with batched data | |
| const enhancedConversations = (conversations || []).map((conv) => { | |
| const otherUserId = | |
| conv.user1_id === userId ? conv.user2_id : conv.user1_id; | |
| const otherUser = profileMap.get(otherUserId); | |
| return { | |
| ...conv, | |
| other_user: otherUser || { | |
| id: otherUserId, | |
| email: '', | |
| username: 'unknown', | |
| display_name: 'Unknown', | |
| avatar_url: null, | |
| bio: null, | |
| color: null, | |
| status: 'offline', | |
| last_seen: null, | |
| created_at: '', | |
| updated_at: '', | |
| }, | |
| last_message: lastMessagesMap.get(conv.id) || null, | |
| unread_count: unreadCountsMap.get(conv.id) || 0, | |
| }; | |
| }); | |
| return { | |
| conversations: enhancedConversations, | |
| total: count || 0, | |
| hasMore: (count || 0) > offset + limit, | |
| }; | |
| } | |
| /** | |
| * Get DM messages | |
| */ | |
| async getMessages( | |
| conversationId: string, | |
| userId: string, | |
| username: string, | |
| query: QueryDMMessagesDto, | |
| ): Promise<{ | |
| messages: Array<DMMessageWithSender & { read_by_usernames?: string[] }>; | |
| total: number; | |
| hasMore: boolean; | |
| }> { | |
| // Verify user is part of this conversation | |
| const conversation = await this.getConversationById(conversationId, userId); | |
| if (!conversation) { | |
| throw new NotFoundException('Conversation not found'); | |
| } | |
| const { page = 1, limit = 20, before, after } = query; | |
| const offset = (page - 1) * limit; | |
| // Build query | |
| let dbQuery = this.supabaseService | |
| .from('dm_messages') | |
| .select( | |
| `*, sender:profiles!dm_messages_sender_id_fkey ( id, email, username, display_name, avatar_url, bio, color, status )`, | |
| { count: 'exact' }, | |
| ) | |
| .eq('conversation_id', conversationId) | |
| .is('deleted_at', null) | |
| .order('created_at', { ascending: false }) | |
| .range(offset, offset + limit - 1); | |
| // Apply cursor pagination | |
| if (before) { | |
| const beforeMessage = await this.getDMMessageById(before); | |
| dbQuery = dbQuery.lt('created_at', beforeMessage.created_at); | |
| } | |
| if (after) { | |
| const afterMessage = await this.getDMMessageById(after); | |
| dbQuery = dbQuery.gt('created_at', afterMessage.created_at); | |
| } | |
| const { data: messages, error, count } = await dbQuery; | |
| if (error) { | |
| this.logger.error('Failed to fetch DM messages:', error.message); | |
| throw new ConflictException('Failed to fetch messages'); | |
| } | |
| // Mark messages as read | |
| const unreadMessages = (messages || []) | |
| .filter((m) => m.sender_id !== userId && !m.is_read) | |
| .map((m) => m.id); | |
| if (unreadMessages.length > 0) { | |
| await this.markAsRead(conversationId, userId, username, unreadMessages); | |
| } | |
| // Enrich messages with read receipts from Redis | |
| const enrichedMessages = await Promise.all( | |
| (messages || []).map(async (msg) => { | |
| const readBy = await this.getDMReadReceipts(msg.id); | |
| return { | |
| ...this.transformMessageWithSender(msg), | |
| read_by_usernames: readBy.length > 0 ? readBy : undefined, | |
| }; | |
| }), | |
| ); | |
| return { | |
| messages: enrichedMessages, | |
| total: count || 0, | |
| hasMore: (count || 0) > offset + limit, | |
| }; | |
| } | |
| /** | |
| * Send a system/agent DM without participant check. | |
| * Used by AI agents, bots, and system notifications. | |
| */ | |
| async sendSystemDM( | |
| conversationId: string, | |
| senderId: string, | |
| dto: SendDMDto, | |
| ): Promise<DMMessageWithSender> { | |
| // Verify conversation exists (but don't check membership) | |
| const { data: conversation, error: convError } = await this.supabaseService | |
| .from('dm_conversations') | |
| .select('*') | |
| .eq('id', conversationId) | |
| .single(); | |
| if (convError || !conversation) { | |
| throw new NotFoundException('Conversation not found'); | |
| } | |
| const receiverId = | |
| conversation.user1_id === senderId | |
| ? conversation.user2_id | |
| : conversation.user1_id; | |
| // Create message | |
| const { data: message, error } = await this.supabaseService | |
| .from('dm_messages') | |
| .insert({ | |
| conversation_id: conversationId, | |
| sender_id: senderId, | |
| content: dto.content, | |
| is_read: false, | |
| }) | |
| .select( | |
| `*, sender:profiles!dm_messages_sender_id_fkey ( id, email, username, display_name, avatar_url, bio, color, status )`, | |
| ) | |
| .single(); | |
| if (error) { | |
| this.logger.error('Failed to send system DM:', error.message); | |
| throw new ConflictException('Failed to send message'); | |
| } | |
| // Pipeline Redis operations | |
| const pipeline = this.redisService.pipeline(); | |
| const dmKey = RedisKeys.dm.messages( | |
| conversation.user1_id, | |
| conversation.user2_id, | |
| ); | |
| // Cache message | |
| pipeline.hset(`dmmsg:${message.id}`, { | |
| id: message.id, | |
| conversation_id: message.conversation_id, | |
| sender_id: message.sender_id, | |
| sender_username: message.sender?.username || '', | |
| content: message.content, | |
| created_at: message.created_at, | |
| deleted_at: message.deleted_at || '', | |
| }); | |
| pipeline.expire(`dmmsg:${message.id}`, this.DM_CACHE_TTL); | |
| // Add to conversation messages list | |
| pipeline.zadd( | |
| dmKey, | |
| new Date(message.created_at).getTime(), | |
| message.id, | |
| ); | |
| // Increment unread count for receiver | |
| const unreadKey = RedisKeys.dm.unread(receiverId, senderId); | |
| pipeline.incr(unreadKey); | |
| await pipeline.exec(); | |
| // Create notification for receiver | |
| await this.supabaseService.from('notifications').insert({ | |
| user_id: receiverId, | |
| type: 'dm', | |
| title: 'New direct message', | |
| message: 'You have a new direct message', | |
| data: { conversationId, senderId, messageId: message.id }, | |
| }); | |
| // Note: Realtime broadcast is handled by EventEmitter in handleAgentMention | |
| this.logger.log(`System DM sent from ${senderId} to ${receiverId}`); | |
| return this.transformMessageWithSender(message); | |
| } | |
| /** | |
| * Send DM | |
| */ | |
| async sendMessage( | |
| conversationId: string, | |
| senderId: string, | |
| dto: SendDMDto, | |
| ): Promise<DMMessageWithSender> { | |
| // Verify conversation exists and user is part of it | |
| const conversation = await this.getConversationById(conversationId, senderId); | |
| if (!conversation) { | |
| throw new NotFoundException('Conversation not found'); | |
| } | |
| const receiverId = | |
| conversation.user1_id === senderId | |
| ? conversation.user2_id | |
| : conversation.user1_id; | |
| // Check if blocked | |
| const blocked = await this.isBlocked(senderId, receiverId); | |
| if (blocked) { | |
| throw new ForbiddenException( | |
| 'You have blocked this user or they have blocked you', | |
| ); | |
| } | |
| // Create message | |
| const { data: message, error } = await this.supabaseService | |
| .from('dm_messages') | |
| .insert({ | |
| conversation_id: conversationId, | |
| sender_id: senderId, | |
| content: dto.content, | |
| is_read: false, | |
| }) | |
| .select( | |
| `*, sender:profiles!dm_messages_sender_id_fkey ( id, email, username, display_name, avatar_url, bio, color, status )`, | |
| ) | |
| .single(); | |
| if (error) { | |
| this.logger.error('Failed to send DM:', error.message); | |
| throw new ConflictException('Failed to send message'); | |
| } | |
| // Pipeline Redis operations | |
| const pipeline = this.redisService.pipeline(); | |
| const dmKey = RedisKeys.dm.messages( | |
| conversation.user1_id, | |
| conversation.user2_id, | |
| ); | |
| // Cache message (without is_read — use read_by_usernames instead) | |
| pipeline.hset(`dmmsg:${message.id}`, { | |
| id: message.id, | |
| conversation_id: message.conversation_id, | |
| sender_id: message.sender_id, | |
| sender_username: message.sender?.username || '', | |
| content: message.content, | |
| created_at: message.created_at, | |
| deleted_at: message.deleted_at || '', | |
| }); | |
| pipeline.expire(`dmmsg:${message.id}`, this.DM_CACHE_TTL); | |
| // Add to conversation messages list | |
| pipeline.zadd( | |
| dmKey, | |
| new Date(message.created_at).getTime(), | |
| message.id, | |
| ); | |
| // Increment unread count for receiver | |
| const unreadKey = RedisKeys.dm.unread(receiverId, senderId); | |
| pipeline.incr(unreadKey); | |
| await pipeline.exec(); | |
| // Create notification for receiver | |
| await this.supabaseService.from('notifications').insert({ | |
| user_id: receiverId, | |
| type: 'dm', | |
| title: 'New direct message', | |
| message: 'You have a new direct message', | |
| data: { conversationId, senderId, messageId: message.id }, | |
| }); | |
| this.logger.log(`DM sent from ${senderId} to ${receiverId}`); | |
| // Check for bot mention and handle asynchronously (fire-and-forget) | |
| if (this.agentService.containsMention(dto.content)) { | |
| this.handleAgentMention(conversationId, senderId, dto.content, message.sender?.username) | |
| .catch((err) => this.logger.error('Agent DM mention handling failed:', err)); | |
| } | |
| return this.transformMessageWithSender(message); | |
| } | |
| /** | |
| * Send DM message with auto-create conversation support | |
| * This is a convenience method that combines getOrCreateConversation + sendMessage | |
| */ | |
| async sendDMMessage( | |
| userId: string, | |
| dto: SendDMMessageDto, | |
| ): Promise<{ | |
| conversationId: string; | |
| isNewConversation: boolean; | |
| conversation?: DMConversationWithUser; | |
| message: DMMessageWithSender; | |
| }> { | |
| const { recipientId, conversationId, content, replyToId } = dto; | |
| // Validate: must have at least one of recipientId or conversationId | |
| if (!recipientId && !conversationId) { | |
| throw new BadRequestException( | |
| 'Either recipientId or conversationId is required', | |
| ); | |
| } | |
| let finalConversationId: string; | |
| let isNewConversation = false; | |
| let conversation: DMConversationWithUser | undefined; | |
| if (conversationId) { | |
| // Mode: existing conversation | |
| const existing = await this.getConversationById(conversationId, userId); | |
| if (!existing) { | |
| throw new NotFoundException('Conversation not found'); | |
| } | |
| finalConversationId = conversationId; | |
| isNewConversation = false; | |
| } else { | |
| // Mode: new conversation (auto-create) | |
| const result = await this.getOrCreateConversation(userId, { | |
| userId: recipientId!, | |
| }); | |
| finalConversationId = result.id; | |
| isNewConversation = result.is_new || false; | |
| if (isNewConversation) { | |
| conversation = result; | |
| } | |
| } | |
| // Send the message | |
| const message = await this.sendMessage(finalConversationId, userId, { | |
| content, | |
| replyToId, | |
| }); | |
| return { | |
| conversationId: finalConversationId, | |
| isNewConversation, | |
| ...(conversation ? { conversation } : {}), | |
| message, | |
| }; | |
| } | |
| /** | |
| * Delete DM message (soft delete) | |
| */ | |
| async deleteMessage(messageId: string, userId: string): Promise<void> { | |
| const message = await this.getDMMessageById(messageId); | |
| // Verify ownership | |
| if (message.sender_id !== userId) { | |
| throw new ForbiddenException('You can only delete your own messages'); | |
| } | |
| // Soft delete | |
| const { error } = await this.supabaseService | |
| .from('dm_messages') | |
| .update({ | |
| deleted_at: new Date().toISOString(), | |
| content: '[deleted]', | |
| }) | |
| .eq('id', messageId); | |
| if (error) { | |
| this.logger.error('Failed to delete DM:', error.message); | |
| throw new ConflictException('Failed to delete message'); | |
| } | |
| this.logger.log(`DM ${messageId} deleted`); | |
| } | |
| /** | |
| * Get unread count for a conversation | |
| */ | |
| async getUnreadCount( | |
| userId: string, | |
| conversationId: string, | |
| ): Promise<number> { | |
| const conversation = await this.getConversationById(conversationId, userId); | |
| if (!conversation) return 0; | |
| const otherUserId = | |
| conversation.user1_id === userId | |
| ? conversation.user2_id | |
| : conversation.user1_id; | |
| // Try Redis first | |
| const unreadKey = RedisKeys.dm.unread(userId, otherUserId); | |
| const cached = await this.redisService.get(unreadKey); | |
| if (cached) { | |
| return parseInt(cached, 10) || 0; | |
| } | |
| // Count from database | |
| const { count, error } = await this.supabaseService | |
| .from('dm_messages') | |
| .select('*', { count: 'exact', head: true }) | |
| .eq('conversation_id', conversationId) | |
| .eq('is_read', false) | |
| .neq('sender_id', userId); | |
| if (error) { | |
| this.logger.error('Failed to get unread count:', error.message); | |
| return 0; | |
| } | |
| // Cache the count | |
| await this.redisService.set( | |
| unreadKey, | |
| String(count || 0), | |
| this.DM_CACHE_TTL, | |
| ); | |
| return count || 0; | |
| } | |
| /** | |
| * Mark messages as read | |
| */ | |
| async markAsRead( | |
| conversationId: string, | |
| userId: string, | |
| username: string, | |
| messageIds?: string[], | |
| ): Promise<string[]> { | |
| const conversation = await this.getConversationById(conversationId, userId); | |
| if (!conversation) return []; | |
| let query = this.supabaseService | |
| .from('dm_messages') | |
| .update({ is_read: true }) | |
| .eq('conversation_id', conversationId) | |
| .neq('sender_id', userId) | |
| .eq('is_read', false); | |
| if (messageIds && messageIds.length > 0) { | |
| query = query.in('id', messageIds); | |
| } | |
| const { data: updatedMessages, error } = await query.select('id'); | |
| if (error) { | |
| this.logger.error('Failed to mark messages as read:', error.message); | |
| return []; | |
| } | |
| // Add username to Redis read receipts for each message | |
| const markedIds: string[] = []; | |
| const now = new Date().toISOString(); | |
| if (updatedMessages && updatedMessages.length > 0) { | |
| const pipeline = this.redisService.pipeline(); | |
| for (const msg of updatedMessages) { | |
| const readKey = RedisKeys.dm.read(msg.id); | |
| const readAtKey = RedisKeys.dm.readAt(msg.id); | |
| pipeline.sadd(readKey, username); | |
| pipeline.expire(readKey, this.DM_CACHE_TTL); | |
| pipeline.hset(readAtKey, username, now); | |
| pipeline.expire(readAtKey, this.DM_CACHE_TTL); | |
| markedIds.push(msg.id); | |
| } | |
| await pipeline.exec(); | |
| } | |
| // Clear unread count in Redis | |
| const otherUserId = | |
| conversation.user1_id === userId | |
| ? conversation.user2_id | |
| : conversation.user1_id; | |
| const unreadKey = RedisKeys.dm.unread(userId, otherUserId); | |
| await this.redisService.del(unreadKey); | |
| return markedIds; | |
| } | |
| /** | |
| * Get DM message read receipts (usernames who read) | |
| */ | |
| async getDMReadReceipts(messageId: string): Promise<string[]> { | |
| const readKey = RedisKeys.dm.read(messageId); | |
| const readers = await this.redisService.smembers(readKey); | |
| return readers || []; | |
| } | |
| /** | |
| * Get DM message read receipts with timestamps | |
| */ | |
| async getDMReadReceiptsWithTiming( | |
| messageId: string, | |
| ): Promise<Array<{ username: string; readAt: string }>> { | |
| const readAtKey = RedisKeys.dm.readAt(messageId); | |
| const readAtMap = await this.redisService.hgetall(readAtKey); | |
| return Object.entries(readAtMap).map(([username, readAt]) => ({ | |
| username, | |
| readAt, | |
| })); | |
| } | |
| /** | |
| * Get DM message read latency (time from send to read) | |
| */ | |
| async getDMReadLatency( | |
| messageId: string, | |
| ): Promise<{ | |
| messageId: string; | |
| createdAt: string | null; | |
| readReceipts: Array<{ username: string; readAt: string; latencyMs: number }>; | |
| averageLatencyMs: number | null; | |
| }> { | |
| // Get message created_at from DB | |
| const { data: message, error } = await this.supabaseService | |
| .from('dm_messages') | |
| .select('id, created_at') | |
| .eq('id', messageId) | |
| .single(); | |
| if (error || !message) { | |
| this.logger.warn(`Message not found for latency: ${messageId}`); | |
| return { | |
| messageId, | |
| createdAt: null, | |
| readReceipts: [], | |
| averageLatencyMs: null, | |
| }; | |
| } | |
| const createdAt = new Date(message.created_at).getTime(); | |
| // Get read timestamps from Redis | |
| const readAtKey = RedisKeys.dm.readAt(messageId); | |
| const readAtMap = await this.redisService.hgetall(readAtKey); | |
| const readReceipts = Object.entries(readAtMap).map(([username, readAt]) => { | |
| const readAtMs = new Date(readAt).getTime(); | |
| const latencyMs = readAtMs - createdAt; | |
| return { | |
| username, | |
| readAt, | |
| latencyMs: Math.max(0, latencyMs), | |
| }; | |
| }); | |
| const averageLatencyMs = | |
| readReceipts.length > 0 | |
| ? Math.round( | |
| readReceipts.reduce((sum, r) => sum + r.latencyMs, 0) / | |
| readReceipts.length, | |
| ) | |
| : null; | |
| return { | |
| messageId, | |
| createdAt: message.created_at, | |
| readReceipts, | |
| averageLatencyMs, | |
| }; | |
| } | |
| /** | |
| * Block a user | |
| */ | |
| async blockUser(blockerId: string, blockedId: string): Promise<void> { | |
| if (blockerId === blockedId) { | |
| throw new BadRequestException('Cannot block yourself'); | |
| } | |
| const { error } = await this.supabaseService.from('blocked_users').insert({ | |
| blocker_id: blockerId, | |
| blocked_id: blockedId, | |
| }); | |
| if (error) { | |
| if (error.code === '23505') { | |
| throw new ConflictException('User already blocked'); | |
| } | |
| this.logger.error('Failed to block user:', error.message); | |
| throw new ConflictException('Failed to block user'); | |
| } | |
| this.logger.log(`User ${blockerId} blocked ${blockedId}`); | |
| } | |
| /** | |
| * Unblock a user | |
| */ | |
| async unblockUser(blockerId: string, blockedId: string): Promise<void> { | |
| const { error } = await this.supabaseService | |
| .from('blocked_users') | |
| .delete() | |
| .eq('blocker_id', blockerId) | |
| .eq('blocked_id', blockedId); | |
| if (error) { | |
| this.logger.error('Failed to unblock user:', error.message); | |
| throw new ConflictException('Failed to unblock user'); | |
| } | |
| this.logger.log(`User ${blockerId} unblocked ${blockedId}`); | |
| } | |
| /** | |
| * Get blocked users | |
| */ | |
| async getBlockedUsers(userId: string): Promise<any[]> { | |
| const { data: blocked, error } = await this.supabaseService | |
| .from('blocked_users') | |
| .select( | |
| ` | |
| blocked_id, | |
| created_at, | |
| blocked:profiles!blocked_users_blocked_id_fkey ( | |
| id, | |
| email, | |
| display_name, | |
| avatar_url, | |
| bio, | |
| color, | |
| status | |
| ) | |
| `, | |
| ) | |
| .eq('blocker_id', userId); | |
| if (error) { | |
| this.logger.error('Failed to get blocked users:', error.message); | |
| return []; | |
| } | |
| return (blocked || []).map((b: any) => ({ | |
| id: b.blocked?.id, | |
| email: b.blocked?.email, | |
| displayName: b.blocked?.display_name, | |
| avatar: b.blocked?.avatar_url, | |
| bio: b.blocked?.bio, | |
| color: b.blocked?.color, | |
| status: b.blocked?.status, | |
| blockedAt: b.created_at, | |
| })); | |
| } | |
| /** | |
| * Find existing conversation between two users | |
| */ | |
| private async findExistingConversation( | |
| user1Id: string, | |
| user2Id: string, | |
| ): Promise<DMConversation | null> { | |
| // Try cache first | |
| const dmKey = RedisKeys.dm.byId(user1Id, user2Id); | |
| const cached = await this.redisService.get(dmKey); | |
| if (cached) { | |
| return JSON.parse(cached); | |
| } | |
| // Query database using parameterized filters instead of string interpolation | |
| const { data: conversation, error } = await this.supabaseService | |
| .from('dm_conversations') | |
| .select('*') | |
| .or( | |
| `and(user1_id.eq.${user1Id},user2_id.eq.${user2Id}),and(user1_id.eq.${user2Id},user2_id.eq.${user1Id})`, | |
| ) | |
| .single(); | |
| if (error || !conversation) { | |
| return null; | |
| } | |
| await this.cacheConversation(conversation); | |
| return conversation; | |
| } | |
| /** | |
| * Get conversation by ID | |
| */ | |
| async getConversationById( | |
| conversationId: string, | |
| userId?: string, | |
| ): Promise<DMConversation | null> { | |
| // Try cache first | |
| const cacheKey = `dm:conversation:${conversationId}`; | |
| const cached = await this.redisService.get(cacheKey); | |
| if (cached) { | |
| const conversation: DMConversation = JSON.parse(cached); | |
| if (!userId || conversation.user1_id === userId || conversation.user2_id === userId) { | |
| return conversation; | |
| } | |
| } | |
| const { data: conversation, error } = await this.supabaseService | |
| .from('dm_conversations') | |
| .select('*') | |
| .eq('id', conversationId) | |
| .single(); | |
| if (error || !conversation) { | |
| return null; | |
| } | |
| // Verify user is member if userId provided | |
| if (userId && conversation.user1_id !== userId && conversation.user2_id !== userId) { | |
| return null; | |
| } | |
| // Cache result | |
| await this.redisService.set(cacheKey, JSON.stringify(conversation), 300); | |
| return conversation; | |
| } | |
| /** | |
| * Get DM message by ID | |
| */ | |
| private async getDMMessageById(messageId: string): Promise<DMMessage> { | |
| const { data: message, error } = await this.supabaseService | |
| .from('dm_messages') | |
| .select('*') | |
| .eq('id', messageId) | |
| .single(); | |
| if (error || !message) { | |
| throw new NotFoundException('Message not found'); | |
| } | |
| return message; | |
| } | |
| /** | |
| * Get last message in conversation | |
| */ | |
| private async getLastMessage( | |
| conversationId: string, | |
| ): Promise<DMMessage | null> { | |
| const { data: message, error } = await this.supabaseService | |
| .from('dm_messages') | |
| .select('*') | |
| .eq('conversation_id', conversationId) | |
| .is('deleted_at', null) | |
| .order('created_at', { ascending: false }) | |
| .limit(1) | |
| .single(); | |
| if (error) { | |
| return null; | |
| } | |
| return message; | |
| } | |
| /** | |
| * Check if users have blocked each other | |
| */ | |
| private async isBlocked(user1Id: string, user2Id: string): Promise<boolean> { | |
| const { data: block, error } = await this.supabaseService | |
| .from('blocked_users') | |
| .select('id') | |
| .or( | |
| `and(blocker_id.eq.${user1Id},blocked_id.eq.${user2Id}),and(blocker_id.eq.${user2Id},blocked_id.eq.${user1Id})`, | |
| ) | |
| .maybeSingle(); | |
| if (error) { | |
| this.logger.error('Failed to check block status:', error.message); | |
| throw new ConflictException('Failed to check block status'); | |
| } | |
| return !!block; | |
| } | |
| /** | |
| * Cache conversation | |
| */ | |
| private async cacheConversation(conversation: DMConversation): Promise<void> { | |
| const dmKey = RedisKeys.dm.byId( | |
| conversation.user1_id, | |
| conversation.user2_id, | |
| ); | |
| await this.redisService.set( | |
| dmKey, | |
| JSON.stringify(conversation), | |
| this.DM_CACHE_TTL, | |
| ); | |
| } | |
| /** | |
| * Cache DM message | |
| */ | |
| private async cacheDMMessage(message: DMMessage): Promise<void> { | |
| const key = `dmmsg:${message.id}`; | |
| await this.redisService.hset(key, { | |
| id: message.id, | |
| conversation_id: message.conversation_id, | |
| sender_id: message.sender_id, | |
| sender_username: message.sender_username || '', | |
| content: message.content, | |
| created_at: message.created_at, | |
| deleted_at: message.deleted_at || '', | |
| }); | |
| await this.redisService.expire(key, this.DM_CACHE_TTL); | |
| } | |
| /** | |
| * Transform message with sender | |
| */ | |
| private transformMessageWithSender(msg: any): DMMessageWithSender { | |
| return { | |
| id: msg.id, | |
| conversation_id: msg.conversation_id, | |
| sender_id: msg.sender_id, | |
| sender_username: msg.sender_username || msg.sender?.username || '', | |
| content: msg.content, | |
| is_read: msg.is_read ?? (msg.read_by_usernames?.length > 0), | |
| read_by_usernames: msg.read_by_usernames || [], | |
| created_at: msg.created_at, | |
| updated_at: msg.updated_at || msg.created_at, | |
| deleted_at: msg.deleted_at, | |
| reply_to: msg.reply_to || null, | |
| sender: msg.sender || { | |
| id: msg.sender_id, | |
| email: '', | |
| display_name: 'Unknown', | |
| avatar_url: null, | |
| bio: null, | |
| color: null, | |
| status: 'offline', | |
| }, | |
| }; | |
| } | |
| /** | |
| * Handle agent mention in DM: call agent API and send reply | |
| */ | |
| private async handleAgentMention( | |
| conversationId: string, | |
| userId: string, | |
| content: string, | |
| username?: string, | |
| ): Promise<void> { | |
| const botUserId = this.agentService.getBotUserId(); | |
| if (!botUserId) { | |
| this.logger.warn('AGENT_BOT_USER_ID not configured, skipping DM mention'); | |
| return; | |
| } | |
| const prompt = this.agentService.extractPrompt(content); | |
| // Emit "bot thinking" event immediately so FE shows indicator | |
| this.eventEmitter.emit('bot.dm.thinking', { | |
| conversationId, | |
| botUserId, | |
| }); | |
| const reply = await this.agentService.callAgent(conversationId, userId, prompt, username); | |
| if (reply) { | |
| const botMessage = await this.sendSystemDM(conversationId, botUserId, { content: reply }); | |
| this.logger.log(`Agent replied in DM conversation ${conversationId}`); | |
| // Emit event for gateway to broadcast directly (same as user message) | |
| this.eventEmitter.emit('bot.dm.reply', { | |
| conversationId, | |
| message: botMessage, | |
| }); | |
| } | |
| } | |
| } | |