092_UI_core / src /modules /dms /dms.service.ts
anotherath's picture
agent typing
de11f27
import {
Injectable,
Logger,
NotFoundException,
ForbiddenException,
ConflictException,
BadRequestException,
} from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { SupabaseService } from '../../database/supabase.service';
import { RedisService } from '../../redis/redis.service';
import { RedisKeys } from '../../redis/keys';
import { AgentService } from '../agent/agent.service';
import {
CreateDMConversationDto,
SendDMDto,
SendDMMessageDto,
QueryDMMessagesDto,
QueryDMConversationsDto,
} from './dto';
interface DMConversation {
id: string;
user1_id: string;
user2_id: string;
created_at: string;
}
interface DMConversationWithUser extends DMConversation {
other_user: {
id: string;
email: string;
username: string;
display_name: string;
avatar_url: string | null;
bio: string | null;
color: string | null;
status: string;
last_seen: string | null;
created_at: string;
updated_at: string;
};
last_message?: DMMessage | null;
unread_count: number;
is_new?: boolean;
}
interface DMMessage {
id: string;
conversation_id: string;
sender_id: string;
sender_username: string;
content: string;
is_read: boolean;
created_at: string;
deleted_at: string | null;
}
interface DMMessageWithSender extends DMMessage {
updated_at?: string;
reply_to?: string | null;
sender: {
id: string;
email: string;
username: string;
display_name: string;
avatar_url: string | null;
bio: string | null;
color: string | null;
status: string;
};
read_by_usernames?: string[];
}
@Injectable()
export class DMsService {
private readonly logger = new Logger(DMsService.name);
private readonly DM_CACHE_TTL = 1209600; // 2 weeks
private readonly RECENT_MESSAGES_LIMIT = 50;
constructor(
private readonly supabaseService: SupabaseService,
private readonly redisService: RedisService,
private readonly agentService: AgentService,
private readonly eventEmitter: EventEmitter2,
) {}
/**
* Get or create DM conversation
*/
async getOrCreateConversation(
userId: string,
dto: CreateDMConversationDto,
): Promise<DMConversationWithUser> {
const { userId: otherUserId } = dto;
// Prevent self-messaging
if (userId === otherUserId) {
throw new BadRequestException('Cannot create conversation with yourself');
}
// Check if other user exists and is not blocked
const { data: otherUser, error: userError } = await this.supabaseService
.from('profiles')
.select('id, email, username, display_name, avatar_url, bio, color, status, last_seen, created_at, updated_at')
.eq('id', otherUserId)
.single();
if (userError || !otherUser) {
throw new NotFoundException('User not found');
}
// Check if blocked
const isBlocked = await this.isBlocked(userId, otherUserId);
if (isBlocked) {
throw new ForbiddenException(
'You have blocked this user or they have blocked you',
);
}
// Check for existing conversation
const conversation = await this.findExistingConversation(
userId,
otherUserId,
);
let finalConversation: DMConversation;
let isNew = false;
if (!conversation) {
// Create new conversation
const { data: newConversation, error } = await this.supabaseService
.from('dm_conversations')
.insert({
user1_id: userId < otherUserId ? userId : otherUserId,
user2_id: userId < otherUserId ? otherUserId : userId,
})
.select()
.single();
if (error) {
// Handle race condition: another request may have created the conversation
if (error.code === '23505') {
this.logger.warn(
`Race condition detected for conversation between ${userId} and ${otherUserId}`,
);
const existingConversation = await this.findExistingConversation(
userId,
otherUserId,
);
if (existingConversation) {
finalConversation = existingConversation;
} else {
this.logger.error('Failed to find conversation after race condition');
throw new ConflictException('Failed to create conversation');
}
} else {
this.logger.error('Failed to create conversation:', error.message);
throw new ConflictException('Failed to create conversation');
}
} else {
finalConversation = newConversation;
isNew = true;
// Cache the conversation
await this.cacheConversation(finalConversation);
}
} else {
finalConversation = conversation;
}
// Get last message
const lastMessage = await this.getLastMessage(finalConversation.id);
// Get unread count
const unreadCount = await this.getUnreadCount(userId, finalConversation.id);
return {
id: finalConversation.id,
user1_id: finalConversation.user1_id,
user2_id: finalConversation.user2_id,
created_at: finalConversation.created_at,
other_user: {
id: otherUser.id,
email: otherUser.email,
username: otherUser.username,
display_name: otherUser.display_name,
avatar_url: otherUser.avatar_url,
bio: otherUser.bio,
color: otherUser.color,
status: otherUser.status,
last_seen: otherUser.last_seen,
created_at: otherUser.created_at,
updated_at: otherUser.updated_at,
},
last_message: lastMessage,
unread_count: unreadCount,
is_new: isNew,
};
}
/**
* Get user's DM conversations
*/
async getConversations(
userId: string,
query: QueryDMConversationsDto,
): Promise<{
conversations: DMConversationWithUser[];
total: number;
hasMore: boolean;
}> {
const { page = 1, limit = 20 } = query;
const offset = (page - 1) * limit;
// Get conversations where user is either user1 or user2
const {
data: conversations,
error,
count,
} = await this.supabaseService
.from('dm_conversations')
.select('*', { count: 'exact' })
.or(`user1_id.eq.${userId},user2_id.eq.${userId}`)
.order('created_at', { ascending: false })
.range(offset, offset + limit - 1);
if (error) {
this.logger.error('Failed to fetch conversations:', error.message);
throw new ConflictException('Failed to fetch conversations');
}
// Batch fetch all related data to avoid N+1 queries
const conversationIds = (conversations || []).map((c) => c.id);
const otherUserIds = (conversations || []).map((conv) =>
conv.user1_id === userId ? conv.user2_id : conv.user1_id,
);
// Batch fetch all related data in parallel to avoid N+1 queries
const [
profilesResult,
lastMessagesResult,
] = await Promise.all([
// Batch 1: Fetch all profiles
otherUserIds.length > 0
? this.supabaseService
.from('profiles')
.select('id, email, username, display_name, avatar_url, bio, color, status, last_seen, created_at, updated_at')
.in('id', otherUserIds)
: Promise.resolve({ data: [] }),
// Batch 2: Fetch last messages for all conversations
// Query each conversation's last message individually to ensure correctness
conversationIds.length > 0
? (async () => {
const messages: any[] = [];
const msgPromises = conversationIds.map(async (convId) => {
const { data, error } = await this.supabaseService
.from('dm_messages')
.select('id, conversation_id, sender_id, content, is_read, created_at, deleted_at')
.eq('conversation_id', convId)
.is('deleted_at', null)
.order('created_at', { ascending: false })
.limit(1)
.single();
if (error) {
this.logger.debug(`[DEBUG] getLastMsg error for ${convId}: ${error.message}`);
return null;
}
return data;
});
const results = await Promise.all(msgPromises);
return { data: results.filter((m) => m !== null) };
})()
: Promise.resolve({ data: [] }),
]);
const profileMap = new Map(
(profilesResult.data || []).map((p) => [p.id, p]),
);
// Keep only the latest message per conversation
const lastMessagesMap = new Map<string, any | null>();
for (const msg of lastMessagesResult.data || []) {
if (!lastMessagesMap.has(msg.conversation_id)) {
lastMessagesMap.set(msg.conversation_id, msg);
}
}
// Batch 3: Fetch unread counts for all conversations (parallel)
const unreadCountsMap = new Map<string, number>();
if (conversationIds.length > 0) {
const unreadPromises = conversationIds.map(async (convId) => {
const { count } = await this.supabaseService
.from('dm_messages')
.select('*', { count: 'exact', head: true })
.eq('conversation_id', convId)
.eq('is_read', false)
.neq('sender_id', userId);
return { convId, count: count || 0 };
});
const unreadResults = await Promise.all(unreadPromises);
for (const { convId, count } of unreadResults) {
unreadCountsMap.set(convId, count);
}
}
// Enhance conversations with batched data
const enhancedConversations = (conversations || []).map((conv) => {
const otherUserId =
conv.user1_id === userId ? conv.user2_id : conv.user1_id;
const otherUser = profileMap.get(otherUserId);
return {
...conv,
other_user: otherUser || {
id: otherUserId,
email: '',
username: 'unknown',
display_name: 'Unknown',
avatar_url: null,
bio: null,
color: null,
status: 'offline',
last_seen: null,
created_at: '',
updated_at: '',
},
last_message: lastMessagesMap.get(conv.id) || null,
unread_count: unreadCountsMap.get(conv.id) || 0,
};
});
return {
conversations: enhancedConversations,
total: count || 0,
hasMore: (count || 0) > offset + limit,
};
}
/**
* Get DM messages
*/
async getMessages(
conversationId: string,
userId: string,
username: string,
query: QueryDMMessagesDto,
): Promise<{
messages: Array<DMMessageWithSender & { read_by_usernames?: string[] }>;
total: number;
hasMore: boolean;
}> {
// Verify user is part of this conversation
const conversation = await this.getConversationById(conversationId, userId);
if (!conversation) {
throw new NotFoundException('Conversation not found');
}
const { page = 1, limit = 20, before, after } = query;
const offset = (page - 1) * limit;
// Build query
let dbQuery = this.supabaseService
.from('dm_messages')
.select(
`*, sender:profiles!dm_messages_sender_id_fkey ( id, email, username, display_name, avatar_url, bio, color, status )`,
{ count: 'exact' },
)
.eq('conversation_id', conversationId)
.is('deleted_at', null)
.order('created_at', { ascending: false })
.range(offset, offset + limit - 1);
// Apply cursor pagination
if (before) {
const beforeMessage = await this.getDMMessageById(before);
dbQuery = dbQuery.lt('created_at', beforeMessage.created_at);
}
if (after) {
const afterMessage = await this.getDMMessageById(after);
dbQuery = dbQuery.gt('created_at', afterMessage.created_at);
}
const { data: messages, error, count } = await dbQuery;
if (error) {
this.logger.error('Failed to fetch DM messages:', error.message);
throw new ConflictException('Failed to fetch messages');
}
// Mark messages as read
const unreadMessages = (messages || [])
.filter((m) => m.sender_id !== userId && !m.is_read)
.map((m) => m.id);
if (unreadMessages.length > 0) {
await this.markAsRead(conversationId, userId, username, unreadMessages);
}
// Enrich messages with read receipts from Redis
const enrichedMessages = await Promise.all(
(messages || []).map(async (msg) => {
const readBy = await this.getDMReadReceipts(msg.id);
return {
...this.transformMessageWithSender(msg),
read_by_usernames: readBy.length > 0 ? readBy : undefined,
};
}),
);
return {
messages: enrichedMessages,
total: count || 0,
hasMore: (count || 0) > offset + limit,
};
}
/**
* Send a system/agent DM without participant check.
* Used by AI agents, bots, and system notifications.
*/
async sendSystemDM(
conversationId: string,
senderId: string,
dto: SendDMDto,
): Promise<DMMessageWithSender> {
// Verify conversation exists (but don't check membership)
const { data: conversation, error: convError } = await this.supabaseService
.from('dm_conversations')
.select('*')
.eq('id', conversationId)
.single();
if (convError || !conversation) {
throw new NotFoundException('Conversation not found');
}
const receiverId =
conversation.user1_id === senderId
? conversation.user2_id
: conversation.user1_id;
// Create message
const { data: message, error } = await this.supabaseService
.from('dm_messages')
.insert({
conversation_id: conversationId,
sender_id: senderId,
content: dto.content,
is_read: false,
})
.select(
`*, sender:profiles!dm_messages_sender_id_fkey ( id, email, username, display_name, avatar_url, bio, color, status )`,
)
.single();
if (error) {
this.logger.error('Failed to send system DM:', error.message);
throw new ConflictException('Failed to send message');
}
// Pipeline Redis operations
const pipeline = this.redisService.pipeline();
const dmKey = RedisKeys.dm.messages(
conversation.user1_id,
conversation.user2_id,
);
// Cache message
pipeline.hset(`dmmsg:${message.id}`, {
id: message.id,
conversation_id: message.conversation_id,
sender_id: message.sender_id,
sender_username: message.sender?.username || '',
content: message.content,
created_at: message.created_at,
deleted_at: message.deleted_at || '',
});
pipeline.expire(`dmmsg:${message.id}`, this.DM_CACHE_TTL);
// Add to conversation messages list
pipeline.zadd(
dmKey,
new Date(message.created_at).getTime(),
message.id,
);
// Increment unread count for receiver
const unreadKey = RedisKeys.dm.unread(receiverId, senderId);
pipeline.incr(unreadKey);
await pipeline.exec();
// Create notification for receiver
await this.supabaseService.from('notifications').insert({
user_id: receiverId,
type: 'dm',
title: 'New direct message',
message: 'You have a new direct message',
data: { conversationId, senderId, messageId: message.id },
});
// Note: Realtime broadcast is handled by EventEmitter in handleAgentMention
this.logger.log(`System DM sent from ${senderId} to ${receiverId}`);
return this.transformMessageWithSender(message);
}
/**
* Send DM
*/
async sendMessage(
conversationId: string,
senderId: string,
dto: SendDMDto,
): Promise<DMMessageWithSender> {
// Verify conversation exists and user is part of it
const conversation = await this.getConversationById(conversationId, senderId);
if (!conversation) {
throw new NotFoundException('Conversation not found');
}
const receiverId =
conversation.user1_id === senderId
? conversation.user2_id
: conversation.user1_id;
// Check if blocked
const blocked = await this.isBlocked(senderId, receiverId);
if (blocked) {
throw new ForbiddenException(
'You have blocked this user or they have blocked you',
);
}
// Create message
const { data: message, error } = await this.supabaseService
.from('dm_messages')
.insert({
conversation_id: conversationId,
sender_id: senderId,
content: dto.content,
is_read: false,
})
.select(
`*, sender:profiles!dm_messages_sender_id_fkey ( id, email, username, display_name, avatar_url, bio, color, status )`,
)
.single();
if (error) {
this.logger.error('Failed to send DM:', error.message);
throw new ConflictException('Failed to send message');
}
// Pipeline Redis operations
const pipeline = this.redisService.pipeline();
const dmKey = RedisKeys.dm.messages(
conversation.user1_id,
conversation.user2_id,
);
// Cache message (without is_read — use read_by_usernames instead)
pipeline.hset(`dmmsg:${message.id}`, {
id: message.id,
conversation_id: message.conversation_id,
sender_id: message.sender_id,
sender_username: message.sender?.username || '',
content: message.content,
created_at: message.created_at,
deleted_at: message.deleted_at || '',
});
pipeline.expire(`dmmsg:${message.id}`, this.DM_CACHE_TTL);
// Add to conversation messages list
pipeline.zadd(
dmKey,
new Date(message.created_at).getTime(),
message.id,
);
// Increment unread count for receiver
const unreadKey = RedisKeys.dm.unread(receiverId, senderId);
pipeline.incr(unreadKey);
await pipeline.exec();
// Create notification for receiver
await this.supabaseService.from('notifications').insert({
user_id: receiverId,
type: 'dm',
title: 'New direct message',
message: 'You have a new direct message',
data: { conversationId, senderId, messageId: message.id },
});
this.logger.log(`DM sent from ${senderId} to ${receiverId}`);
// Check for bot mention and handle asynchronously (fire-and-forget)
if (this.agentService.containsMention(dto.content)) {
this.handleAgentMention(conversationId, senderId, dto.content, message.sender?.username)
.catch((err) => this.logger.error('Agent DM mention handling failed:', err));
}
return this.transformMessageWithSender(message);
}
/**
* Send DM message with auto-create conversation support
* This is a convenience method that combines getOrCreateConversation + sendMessage
*/
async sendDMMessage(
userId: string,
dto: SendDMMessageDto,
): Promise<{
conversationId: string;
isNewConversation: boolean;
conversation?: DMConversationWithUser;
message: DMMessageWithSender;
}> {
const { recipientId, conversationId, content, replyToId } = dto;
// Validate: must have at least one of recipientId or conversationId
if (!recipientId && !conversationId) {
throw new BadRequestException(
'Either recipientId or conversationId is required',
);
}
let finalConversationId: string;
let isNewConversation = false;
let conversation: DMConversationWithUser | undefined;
if (conversationId) {
// Mode: existing conversation
const existing = await this.getConversationById(conversationId, userId);
if (!existing) {
throw new NotFoundException('Conversation not found');
}
finalConversationId = conversationId;
isNewConversation = false;
} else {
// Mode: new conversation (auto-create)
const result = await this.getOrCreateConversation(userId, {
userId: recipientId!,
});
finalConversationId = result.id;
isNewConversation = result.is_new || false;
if (isNewConversation) {
conversation = result;
}
}
// Send the message
const message = await this.sendMessage(finalConversationId, userId, {
content,
replyToId,
});
return {
conversationId: finalConversationId,
isNewConversation,
...(conversation ? { conversation } : {}),
message,
};
}
/**
* Delete DM message (soft delete)
*/
async deleteMessage(messageId: string, userId: string): Promise<void> {
const message = await this.getDMMessageById(messageId);
// Verify ownership
if (message.sender_id !== userId) {
throw new ForbiddenException('You can only delete your own messages');
}
// Soft delete
const { error } = await this.supabaseService
.from('dm_messages')
.update({
deleted_at: new Date().toISOString(),
content: '[deleted]',
})
.eq('id', messageId);
if (error) {
this.logger.error('Failed to delete DM:', error.message);
throw new ConflictException('Failed to delete message');
}
this.logger.log(`DM ${messageId} deleted`);
}
/**
* Get unread count for a conversation
*/
async getUnreadCount(
userId: string,
conversationId: string,
): Promise<number> {
const conversation = await this.getConversationById(conversationId, userId);
if (!conversation) return 0;
const otherUserId =
conversation.user1_id === userId
? conversation.user2_id
: conversation.user1_id;
// Try Redis first
const unreadKey = RedisKeys.dm.unread(userId, otherUserId);
const cached = await this.redisService.get(unreadKey);
if (cached) {
return parseInt(cached, 10) || 0;
}
// Count from database
const { count, error } = await this.supabaseService
.from('dm_messages')
.select('*', { count: 'exact', head: true })
.eq('conversation_id', conversationId)
.eq('is_read', false)
.neq('sender_id', userId);
if (error) {
this.logger.error('Failed to get unread count:', error.message);
return 0;
}
// Cache the count
await this.redisService.set(
unreadKey,
String(count || 0),
this.DM_CACHE_TTL,
);
return count || 0;
}
/**
* Mark messages as read
*/
async markAsRead(
conversationId: string,
userId: string,
username: string,
messageIds?: string[],
): Promise<string[]> {
const conversation = await this.getConversationById(conversationId, userId);
if (!conversation) return [];
let query = this.supabaseService
.from('dm_messages')
.update({ is_read: true })
.eq('conversation_id', conversationId)
.neq('sender_id', userId)
.eq('is_read', false);
if (messageIds && messageIds.length > 0) {
query = query.in('id', messageIds);
}
const { data: updatedMessages, error } = await query.select('id');
if (error) {
this.logger.error('Failed to mark messages as read:', error.message);
return [];
}
// Add username to Redis read receipts for each message
const markedIds: string[] = [];
const now = new Date().toISOString();
if (updatedMessages && updatedMessages.length > 0) {
const pipeline = this.redisService.pipeline();
for (const msg of updatedMessages) {
const readKey = RedisKeys.dm.read(msg.id);
const readAtKey = RedisKeys.dm.readAt(msg.id);
pipeline.sadd(readKey, username);
pipeline.expire(readKey, this.DM_CACHE_TTL);
pipeline.hset(readAtKey, username, now);
pipeline.expire(readAtKey, this.DM_CACHE_TTL);
markedIds.push(msg.id);
}
await pipeline.exec();
}
// Clear unread count in Redis
const otherUserId =
conversation.user1_id === userId
? conversation.user2_id
: conversation.user1_id;
const unreadKey = RedisKeys.dm.unread(userId, otherUserId);
await this.redisService.del(unreadKey);
return markedIds;
}
/**
* Get DM message read receipts (usernames who read)
*/
async getDMReadReceipts(messageId: string): Promise<string[]> {
const readKey = RedisKeys.dm.read(messageId);
const readers = await this.redisService.smembers(readKey);
return readers || [];
}
/**
* Get DM message read receipts with timestamps
*/
async getDMReadReceiptsWithTiming(
messageId: string,
): Promise<Array<{ username: string; readAt: string }>> {
const readAtKey = RedisKeys.dm.readAt(messageId);
const readAtMap = await this.redisService.hgetall(readAtKey);
return Object.entries(readAtMap).map(([username, readAt]) => ({
username,
readAt,
}));
}
/**
* Get DM message read latency (time from send to read)
*/
async getDMReadLatency(
messageId: string,
): Promise<{
messageId: string;
createdAt: string | null;
readReceipts: Array<{ username: string; readAt: string; latencyMs: number }>;
averageLatencyMs: number | null;
}> {
// Get message created_at from DB
const { data: message, error } = await this.supabaseService
.from('dm_messages')
.select('id, created_at')
.eq('id', messageId)
.single();
if (error || !message) {
this.logger.warn(`Message not found for latency: ${messageId}`);
return {
messageId,
createdAt: null,
readReceipts: [],
averageLatencyMs: null,
};
}
const createdAt = new Date(message.created_at).getTime();
// Get read timestamps from Redis
const readAtKey = RedisKeys.dm.readAt(messageId);
const readAtMap = await this.redisService.hgetall(readAtKey);
const readReceipts = Object.entries(readAtMap).map(([username, readAt]) => {
const readAtMs = new Date(readAt).getTime();
const latencyMs = readAtMs - createdAt;
return {
username,
readAt,
latencyMs: Math.max(0, latencyMs),
};
});
const averageLatencyMs =
readReceipts.length > 0
? Math.round(
readReceipts.reduce((sum, r) => sum + r.latencyMs, 0) /
readReceipts.length,
)
: null;
return {
messageId,
createdAt: message.created_at,
readReceipts,
averageLatencyMs,
};
}
/**
* Block a user
*/
async blockUser(blockerId: string, blockedId: string): Promise<void> {
if (blockerId === blockedId) {
throw new BadRequestException('Cannot block yourself');
}
const { error } = await this.supabaseService.from('blocked_users').insert({
blocker_id: blockerId,
blocked_id: blockedId,
});
if (error) {
if (error.code === '23505') {
throw new ConflictException('User already blocked');
}
this.logger.error('Failed to block user:', error.message);
throw new ConflictException('Failed to block user');
}
this.logger.log(`User ${blockerId} blocked ${blockedId}`);
}
/**
* Unblock a user
*/
async unblockUser(blockerId: string, blockedId: string): Promise<void> {
const { error } = await this.supabaseService
.from('blocked_users')
.delete()
.eq('blocker_id', blockerId)
.eq('blocked_id', blockedId);
if (error) {
this.logger.error('Failed to unblock user:', error.message);
throw new ConflictException('Failed to unblock user');
}
this.logger.log(`User ${blockerId} unblocked ${blockedId}`);
}
/**
* Get blocked users
*/
async getBlockedUsers(userId: string): Promise<any[]> {
const { data: blocked, error } = await this.supabaseService
.from('blocked_users')
.select(
`
blocked_id,
created_at,
blocked:profiles!blocked_users_blocked_id_fkey (
id,
email,
display_name,
avatar_url,
bio,
color,
status
)
`,
)
.eq('blocker_id', userId);
if (error) {
this.logger.error('Failed to get blocked users:', error.message);
return [];
}
return (blocked || []).map((b: any) => ({
id: b.blocked?.id,
email: b.blocked?.email,
displayName: b.blocked?.display_name,
avatar: b.blocked?.avatar_url,
bio: b.blocked?.bio,
color: b.blocked?.color,
status: b.blocked?.status,
blockedAt: b.created_at,
}));
}
/**
* Find existing conversation between two users
*/
private async findExistingConversation(
user1Id: string,
user2Id: string,
): Promise<DMConversation | null> {
// Try cache first
const dmKey = RedisKeys.dm.byId(user1Id, user2Id);
const cached = await this.redisService.get(dmKey);
if (cached) {
return JSON.parse(cached);
}
// Query database using parameterized filters instead of string interpolation
const { data: conversation, error } = await this.supabaseService
.from('dm_conversations')
.select('*')
.or(
`and(user1_id.eq.${user1Id},user2_id.eq.${user2Id}),and(user1_id.eq.${user2Id},user2_id.eq.${user1Id})`,
)
.single();
if (error || !conversation) {
return null;
}
await this.cacheConversation(conversation);
return conversation;
}
/**
* Get conversation by ID
*/
async getConversationById(
conversationId: string,
userId?: string,
): Promise<DMConversation | null> {
// Try cache first
const cacheKey = `dm:conversation:${conversationId}`;
const cached = await this.redisService.get(cacheKey);
if (cached) {
const conversation: DMConversation = JSON.parse(cached);
if (!userId || conversation.user1_id === userId || conversation.user2_id === userId) {
return conversation;
}
}
const { data: conversation, error } = await this.supabaseService
.from('dm_conversations')
.select('*')
.eq('id', conversationId)
.single();
if (error || !conversation) {
return null;
}
// Verify user is member if userId provided
if (userId && conversation.user1_id !== userId && conversation.user2_id !== userId) {
return null;
}
// Cache result
await this.redisService.set(cacheKey, JSON.stringify(conversation), 300);
return conversation;
}
/**
* Get DM message by ID
*/
private async getDMMessageById(messageId: string): Promise<DMMessage> {
const { data: message, error } = await this.supabaseService
.from('dm_messages')
.select('*')
.eq('id', messageId)
.single();
if (error || !message) {
throw new NotFoundException('Message not found');
}
return message;
}
/**
* Get last message in conversation
*/
private async getLastMessage(
conversationId: string,
): Promise<DMMessage | null> {
const { data: message, error } = await this.supabaseService
.from('dm_messages')
.select('*')
.eq('conversation_id', conversationId)
.is('deleted_at', null)
.order('created_at', { ascending: false })
.limit(1)
.single();
if (error) {
return null;
}
return message;
}
/**
* Check if users have blocked each other
*/
private async isBlocked(user1Id: string, user2Id: string): Promise<boolean> {
const { data: block, error } = await this.supabaseService
.from('blocked_users')
.select('id')
.or(
`and(blocker_id.eq.${user1Id},blocked_id.eq.${user2Id}),and(blocker_id.eq.${user2Id},blocked_id.eq.${user1Id})`,
)
.maybeSingle();
if (error) {
this.logger.error('Failed to check block status:', error.message);
throw new ConflictException('Failed to check block status');
}
return !!block;
}
/**
* Cache conversation
*/
private async cacheConversation(conversation: DMConversation): Promise<void> {
const dmKey = RedisKeys.dm.byId(
conversation.user1_id,
conversation.user2_id,
);
await this.redisService.set(
dmKey,
JSON.stringify(conversation),
this.DM_CACHE_TTL,
);
}
/**
* Cache DM message
*/
private async cacheDMMessage(message: DMMessage): Promise<void> {
const key = `dmmsg:${message.id}`;
await this.redisService.hset(key, {
id: message.id,
conversation_id: message.conversation_id,
sender_id: message.sender_id,
sender_username: message.sender_username || '',
content: message.content,
created_at: message.created_at,
deleted_at: message.deleted_at || '',
});
await this.redisService.expire(key, this.DM_CACHE_TTL);
}
/**
* Transform message with sender
*/
private transformMessageWithSender(msg: any): DMMessageWithSender {
return {
id: msg.id,
conversation_id: msg.conversation_id,
sender_id: msg.sender_id,
sender_username: msg.sender_username || msg.sender?.username || '',
content: msg.content,
is_read: msg.is_read ?? (msg.read_by_usernames?.length > 0),
read_by_usernames: msg.read_by_usernames || [],
created_at: msg.created_at,
updated_at: msg.updated_at || msg.created_at,
deleted_at: msg.deleted_at,
reply_to: msg.reply_to || null,
sender: msg.sender || {
id: msg.sender_id,
email: '',
display_name: 'Unknown',
avatar_url: null,
bio: null,
color: null,
status: 'offline',
},
};
}
/**
* Handle agent mention in DM: call agent API and send reply
*/
private async handleAgentMention(
conversationId: string,
userId: string,
content: string,
username?: string,
): Promise<void> {
const botUserId = this.agentService.getBotUserId();
if (!botUserId) {
this.logger.warn('AGENT_BOT_USER_ID not configured, skipping DM mention');
return;
}
const prompt = this.agentService.extractPrompt(content);
// Emit "bot thinking" event immediately so FE shows indicator
this.eventEmitter.emit('bot.dm.thinking', {
conversationId,
botUserId,
});
const reply = await this.agentService.callAgent(conversationId, userId, prompt, username);
if (reply) {
const botMessage = await this.sendSystemDM(conversationId, botUserId, { content: reply });
this.logger.log(`Agent replied in DM conversation ${conversationId}`);
// Emit event for gateway to broadcast directly (same as user message)
this.eventEmitter.emit('bot.dm.reply', {
conversationId,
message: botMessage,
});
}
}
}