import axios, { AxiosError } from 'axios'; import FormData from 'form-data'; import { Agent } from '../entities/Agent'; import { ChatMessage, MessageRole } from '../entities/ChatMessage'; import { AppDataSource } from '../config/database'; export interface ChatRequest { message: string; conversationId?: string; userId?: string; metadata?: Record; files?: Array<{ fieldname: string; originalname: string; mimetype: string; size: number; buffer: Buffer; ipfsHash?: string; // CID if uploaded to IPFS url?: string; // Direct URL if agent provides }>; } export interface ChatResponse { response: string; conversationId: string; metadata?: Record; error?: string; } /** * Chat Protocol Service * Handles communication with agents via standardized chat interface */ export class ChatProtocolService { private timeout: number; private maxFileSize: number; constructor() { this.timeout = parseInt(process.env.AGENT_CHAT_TIMEOUT || '30000'); this.maxFileSize = parseInt(process.env.MAX_FILE_SIZE || '10485760'); // 10MB default } /** * Validate files (no IPFS upload - files stored temporarily in memory) * Files are kept in memory during request processing and sent directly to agent endpoints */ private validateFiles(files: ChatRequest['files']): void { if (!files || files.length === 0) { return; } for (const file of files) { // Validate file size if (file.size > this.maxFileSize) { throw new Error( `File ${file.originalname} exceeds maximum size of ${this.maxFileSize} bytes` ); } } } /** * Send a chat message to an agent */ async sendMessage( agent: Agent, request: ChatRequest ): Promise { try { // Validate message length const maxLength = parseInt(process.env.MAX_MESSAGE_LENGTH || '10000'); if (request.message && request.message.length > maxLength) { throw new Error(`Message exceeds maximum length of ${maxLength} characters`); } // Store files temporarily (don't upload to IPFS) // Files will be available for 1 hour before cleanup let uploadedFiles: Array<{ fieldname: string; originalname: string; mimetype: string; size: number; buffer: Buffer; tempUrl?: string; // Temporary URL for chat viewing expiresAt: Date; // Cleanup after 1 hour }> = []; if (request.files && request.files.length > 0) { // Validate files first this.validateFiles(request.files); // Store files temporarily without IPFS upload // Files are kept in memory during request processing // Metadata stored in DB with 1-hour expiration timestamp const oneHourFromNow = new Date(Date.now() + 60 * 60 * 1000); uploadedFiles = request.files.map(file => ({ fieldname: file.fieldname, originalname: file.originalname, mimetype: file.mimetype, size: file.size, buffer: file.buffer, expiresAt: oneHourFromNow, })); } const conversationId = request.conversationId || this.generateConversationId(); // Build metadata with file information (temporary storage, no IPFS) const metadata = { ...request.metadata, agentId: agent.id, timestamp: new Date().toISOString(), ...(uploadedFiles.length > 0 && { files: uploadedFiles.map(f => ({ fieldname: f.fieldname, originalname: f.originalname, mimetype: f.mimetype, size: f.size, expiresAt: f.expiresAt.toISOString(), // Note: Files are stored temporarily, not on IPFS })), }), }; // Determine if we need to send as multipart/form-data or JSON const hasFiles = uploadedFiles.length > 0; // Detect file-only requests (single file, no message, endpoint suggests file-only) const isFileOnly = hasFiles && uploadedFiles.length === 1 && (!request.message || request.message.trim() === '') && (agent.endpoint?.toLowerCase().includes('/remove-background') || agent.endpoint?.toLowerCase().includes('/transcribe') || agent.endpoint?.toLowerCase().includes('/edit')); let response; if (hasFiles) { // Send as multipart/form-data with files const formData = new FormData(); // For file-only requests, only send the file (no message, conversationId, metadata, systemPrompt) if (isFileOnly) { const file = request.files![0]; const audioFiles = request.files!.filter(f => f.mimetype.startsWith('audio/')); // If single audio file, use 'audio_file' (common for transcription agents) if (audioFiles.length === 1) { formData.append('audio_file', file.buffer, { filename: file.originalname, contentType: file.mimetype, }); } else { // For other single files, use 'file' formData.append('file', file.buffer, { filename: file.originalname, contentType: file.mimetype, }); } } else { // Regular requests with files - include message, conversationId, metadata // Add message if (request.message) { formData.append('message', request.message); } formData.append('conversationId', conversationId); formData.append('metadata', JSON.stringify(metadata)); if (agent.promptTemplate) { formData.append('systemPrompt', agent.promptTemplate); } // Send actual files directly to agent endpoint (no IPFS upload) // Use smart field naming based on file type and count if (request.files && request.files.length > 0) { const audioFiles = request.files.filter(f => f.mimetype.startsWith('audio/')); // If single audio file, use 'audio_file' (common for transcription agents) if (audioFiles.length === 1 && request.files.length === 1) { const audioFile = request.files.find(f => f.mimetype.startsWith('audio/')); if (audioFile) { formData.append('audio_file', audioFile.buffer, { filename: audioFile.originalname, contentType: audioFile.mimetype, }); } } // If single file of any type, use 'file' else if (request.files.length === 1) { const file = request.files[0]; formData.append('file', file.buffer, { filename: file.originalname, contentType: file.mimetype, }); } // Multiple files - use 'files' array or indexed fields else { request.files.forEach((file, index) => { // Try common field names first if (file.mimetype.startsWith('audio/')) { formData.append(`audio_file${index > 0 ? `_${index}` : ''}`, file.buffer, { filename: file.originalname, contentType: file.mimetype, }); } else if (file.mimetype.startsWith('image/')) { formData.append(`image_file${index > 0 ? `_${index}` : ''}`, file.buffer, { filename: file.originalname, contentType: file.mimetype, }); } else { formData.append(`file${index > 0 ? `_${index}` : ''}`, file.buffer, { filename: file.originalname, contentType: file.mimetype, }); } }); } } } response = await axios.post(agent.endpoint, formData, { timeout: this.timeout, headers: { ...formData.getHeaders(), ...(agent.metadata?.headers && agent.metadata.headers), }, responseType: 'arraybuffer', }); } else { // Send as JSON (no files) const payload = { message: request.message || '', conversationId, metadata, ...(agent.promptTemplate && { systemPrompt: agent.promptTemplate }), }; response = await axios.post(agent.endpoint, payload, { timeout: this.timeout, headers: { 'Content-Type': 'application/json', ...(agent.metadata?.headers && agent.metadata.headers), }, responseType: 'arraybuffer', }); } // Check if response is a binary image const contentType = response.headers['content-type'] || response.headers['Content-Type'] || ''; let agentResponse: any; let imageBase64: string | null = null; if (contentType.startsWith('image/')) { // Response is a binary image - convert to base64 const buffer = Buffer.isBuffer(response.data) ? response.data : Buffer.from(response.data); imageBase64 = buffer.toString('base64'); const mimeType = contentType.split(';')[0].trim(); agentResponse = { response: `data:${mimeType};base64,${imageBase64}`, }; } else { // Try to parse as JSON or text try { const buffer = Buffer.isBuffer(response.data) ? response.data : Buffer.from(response.data); const textResponse = buffer.toString('utf-8'); try { agentResponse = JSON.parse(textResponse); } catch { // Not JSON, treat as plain text agentResponse = { response: textResponse, }; } } catch { // Fallback: treat as text const buffer = Buffer.isBuffer(response.data) ? response.data : Buffer.from(response.data); const textResponse = buffer.toString('utf-8'); agentResponse = { response: textResponse, }; } } const finalConversationId = agentResponse.conversationId || conversationId; // Build content with file references const userContent = request.message || ''; const filesInfo = uploadedFiles.length > 0 ? `\n[Files: ${uploadedFiles.map(f => f.originalname).join(', ')}]` : ''; // Save message to database with file metadata (files stored temporarily, not on IPFS) await this.saveMessage({ agentId: agent.id, userId: request.userId, role: MessageRole.USER, content: userContent + filesInfo, metadata: { ...metadata, files: uploadedFiles.map(f => ({ fieldname: f.fieldname, originalname: f.originalname, mimetype: f.mimetype, size: f.size, expiresAt: f.expiresAt.toISOString(), // Note: File buffers are not stored in DB, only metadata // Files are available temporarily in memory for 1 hour })), }, }); const assistantContent = agentResponse.response || agentResponse.message || (typeof agentResponse === 'string' ? agentResponse : JSON.stringify(agentResponse)); await this.saveMessage({ agentId: agent.id, userId: request.userId, role: MessageRole.ASSISTANT, content: assistantContent, metadata: { ...agentResponse.metadata, files: agentResponse.files || [], }, }); // Update agent usage count agent.usageCount += 1; await AppDataSource.getRepository(Agent).save(agent); return { response: assistantContent, conversationId: finalConversationId, metadata: { ...agentResponse.metadata, files: uploadedFiles, }, }; } catch (error) { const axiosError = error as AxiosError; if (axios.isAxiosError(axiosError)) { if (axiosError.code === 'ECONNABORTED') { throw new Error('Agent request timeout'); } if (axiosError.response) { throw new Error( `Agent returned error: ${axiosError.response.status} - ${JSON.stringify(axiosError.response.data)}` ); } if (axiosError.request) { throw new Error('Agent endpoint unreachable'); } } throw error; } } /** * Save chat message to database */ private async saveMessage(data: { agentId: string; userId?: string; role: MessageRole; content: string; metadata?: Record; }): Promise { const messageRepository = AppDataSource.getRepository(ChatMessage); const message = messageRepository.create(data); return await messageRepository.save(message); } /** * Generate a unique conversation ID */ private generateConversationId(): string { return `conv_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } /** * Get conversation history */ async getConversationHistory( agentId: string, userId: string, conversationId?: string, limit: number = 50 ): Promise { const messageRepository = AppDataSource.getRepository(ChatMessage); const query = messageRepository .createQueryBuilder('message') .where('message.agentId = :agentId', { agentId }) .andWhere('message.userId = :userId', { userId }) .orderBy('message.createdAt', 'ASC') .limit(limit); if (conversationId) { query.andWhere('message.metadata->>conversationId = :conversationId', { conversationId, }); } return await query.getMany(); } }