// zurri/src/services/chatProtocol.ts
// Last commit by nexusbert: "fixed handlers" (881a982)
import axios, { AxiosError } from 'axios';
import FormData from 'form-data';
import { Agent } from '../entities/Agent';
import { ChatMessage, MessageRole } from '../entities/ChatMessage';
import { AppDataSource } from '../config/database';
/**
 * Input accepted by `ChatProtocolService.sendMessage`.
 */
export interface ChatRequest {
  /** Text of the user's message; may be empty when only files are sent. */
  message: string;
  /** Existing conversation to continue; a new id is generated when omitted. */
  conversationId?: string;
  /** Id of the user, stored alongside the persisted chat messages. */
  userId?: string;
  /** Free-form metadata merged into the metadata forwarded to the agent. */
  metadata?: Record<string, any>;
  /** Uploaded files, held in memory and forwarded to the agent endpoint. */
  files?: {
    /** Form field name the file was submitted under. */
    fieldname: string;
    /** File name as provided by the uploader. */
    originalname: string;
    /** MIME type, used to choose the multipart field name. */
    mimetype: string;
    /** Size in bytes, checked against the configured maximum. */
    size: number;
    /** Raw file contents. */
    buffer: Buffer;
    /** CID if uploaded to IPFS. */
    ipfsHash?: string;
    /** Direct URL if agent provides. */
    url?: string;
  }[];
}
/**
 * Result returned by `ChatProtocolService.sendMessage`.
 */
export interface ChatResponse {
  /** Assistant reply content (text, or a data URL when the agent returned an image). */
  response: string;
  /** Conversation id reported by the agent, or the one used for the request. */
  conversationId: string;
  /** Metadata returned by the agent, plus information about the uploaded files. */
  metadata?: Record<string, any>;
  /** Optional error description. */
  error?: string;
}
/**
 * Chat Protocol Service
 *
 * Handles communication with agents via a standardized chat interface:
 * forwards the user's message (and any in-memory file uploads) to the agent's
 * HTTP endpoint, normalizes the reply (JSON, plain text or binary image) and
 * persists both sides of the exchange as `ChatMessage` rows.
 */
export class ChatProtocolService {
  /**
   * Endpoint path fragments that mark an agent as accepting a bare file upload
   * (no message / conversationId / metadata fields).
   */
  private static readonly FILE_ONLY_ENDPOINT_HINTS = ['/remove-background', '/transcribe', '/edit'];

  /** Request timeout for agent calls, in milliseconds (AGENT_CHAT_TIMEOUT). */
  private readonly timeout: number;
  /** Maximum accepted size of one uploaded file, in bytes (MAX_FILE_SIZE). */
  private readonly maxFileSize: number;

  constructor() {
    this.timeout = ChatProtocolService.readIntEnv(process.env.AGENT_CHAT_TIMEOUT, 30000);
    this.maxFileSize = ChatProtocolService.readIntEnv(process.env.MAX_FILE_SIZE, 10485760); // 10MB default
  }

  /**
   * Parse a non-negative base-10 integer from an environment value.
   * Falls back to `fallback` when the value is missing or not a usable number,
   * so a typo in the environment cannot turn a limit into NaN (which would
   * silently disable every `>` comparison against it).
   */
  private static readIntEnv(raw: string | undefined, fallback: number): number {
    const parsed = Number.parseInt(raw || '', 10);
    return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback;
  }

  /**
   * Validate files (no IPFS upload - files stored temporarily in memory).
   * Files are kept in memory during request processing and sent directly to agent endpoints.
   *
   * @throws Error when any file is larger than the configured maximum size.
   */
  private validateFiles(files: ChatRequest['files']): void {
    if (!files || files.length === 0) {
      return;
    }
    for (const file of files) {
      if (file.size > this.maxFileSize) {
        throw new Error(
          `File ${file.originalname} exceeds maximum size of ${this.maxFileSize} bytes`
        );
      }
    }
  }

  /**
   * Send a chat message to an agent.
   *
   * Requests with files are sent as multipart/form-data, all others as JSON.
   * After a successful reply the user message and the assistant message are
   * saved (each tagged with the conversation id in its metadata) and the
   * agent's usage count is incremented.
   *
   * @param agent   Agent whose `endpoint` receives the request.
   * @param request Message, optional files and optional conversation context.
   * @returns The assistant content, the conversation id and reply metadata.
   * @throws Error 'Agent request timeout' when axios reports ECONNABORTED,
   *         'Agent returned error: <status> - <body>' for HTTP error replies,
   *         'Agent endpoint unreachable' when no reply was received, and
   *         validation errors for over-long messages or oversized files.
   */
  async sendMessage(
    agent: Agent,
    request: ChatRequest
  ): Promise<ChatResponse> {
    try {
      // Validate message length
      const maxLength = ChatProtocolService.readIntEnv(process.env.MAX_MESSAGE_LENGTH, 10000);
      if (request.message && request.message.length > maxLength) {
        throw new Error(`Message exceeds maximum length of ${maxLength} characters`);
      }

      const files = request.files ?? [];
      this.validateFiles(files);

      // Files are not uploaded anywhere; they only live in memory for this
      // request. The expiry timestamp is recorded in the stored metadata.
      const oneHourFromNow = new Date(Date.now() + 60 * 60 * 1000);
      const uploadedFiles = files.map(file => ({
        fieldname: file.fieldname,
        originalname: file.originalname,
        mimetype: file.mimetype,
        size: file.size,
        buffer: file.buffer,
        expiresAt: oneHourFromNow,
      }));
      // Buffer-free description of the uploads, safe to persist and forward.
      const fileSummaries = uploadedFiles.map(f => ({
        fieldname: f.fieldname,
        originalname: f.originalname,
        mimetype: f.mimetype,
        size: f.size,
        expiresAt: f.expiresAt.toISOString(),
      }));

      const conversationId = request.conversationId || this.generateConversationId();
      const metadata = {
        ...request.metadata,
        agentId: agent.id,
        timestamp: new Date().toISOString(),
        ...(fileSummaries.length > 0 ? { files: fileSummaries } : {}),
      };

      const hasFiles = files.length > 0;
      // Detect file-only requests (single file, no message, endpoint suggests file-only)
      const endpointLower = agent.endpoint?.toLowerCase() ?? '';
      const isFileOnly =
        files.length === 1 &&
        (!request.message || request.message.trim() === '') &&
        ChatProtocolService.FILE_ONLY_ENDPOINT_HINTS.some(hint => endpointLower.includes(hint));

      let body: unknown;
      let baseHeaders: Record<string, any>;
      if (hasFiles) {
        const formData = this.buildMultipartBody({
          files,
          fileOnly: isFileOnly,
          message: request.message,
          conversationId,
          metadata,
          systemPrompt: agent.promptTemplate,
        });
        body = formData;
        baseHeaders = formData.getHeaders();
      } else {
        body = {
          message: request.message || '',
          conversationId,
          metadata,
          ...(agent.promptTemplate ? { systemPrompt: agent.promptTemplate } : {}),
        };
        baseHeaders = { 'Content-Type': 'application/json' };
      }

      // The reply is always fetched as raw bytes so binary images survive intact;
      // text and JSON are decoded from the buffer afterwards.
      const response = await axios.post(agent.endpoint, body, {
        timeout: this.timeout,
        headers: {
          ...baseHeaders,
          // Agent-specific headers win over the defaults.
          ...(agent.metadata?.headers ?? {}),
        },
        responseType: 'arraybuffer',
      });

      const rawContentType =
        response.headers['content-type'] || response.headers['Content-Type'] || '';
      const contentType = typeof rawContentType === 'string' ? rawContentType : String(rawContentType);
      const agentResponse = ChatProtocolService.parseAgentBody(response.data, contentType);

      const finalConversationId = agentResponse.conversationId || conversationId;

      // Build content with file references
      const userContent = request.message || '';
      const filesInfo = uploadedFiles.length > 0
        ? `\n[Files: ${uploadedFiles.map(f => f.originalname).join(', ')}]`
        : '';

      // Save the user message; only file metadata is stored, never the buffers.
      await this.saveMessage({
        agentId: agent.id,
        userId: request.userId,
        role: MessageRole.USER,
        content: userContent + filesInfo,
        metadata: {
          ...metadata,
          // Stored so getConversationHistory can filter by conversation.
          conversationId: finalConversationId,
          files: fileSummaries,
        },
      });

      // NOTE(review): a non-string `response`/`message` value from the agent is
      // passed through unchanged, as before — confirm whether callers rely on it.
      const assistantContent =
        agentResponse.response || agentResponse.message || JSON.stringify(agentResponse);

      await this.saveMessage({
        agentId: agent.id,
        userId: request.userId,
        role: MessageRole.ASSISTANT,
        content: assistantContent,
        metadata: {
          ...agentResponse.metadata,
          conversationId: finalConversationId,
          files: agentResponse.files || [],
        },
      });

      // Update agent usage count
      agent.usageCount += 1;
      await AppDataSource.getRepository(Agent).save(agent);

      return {
        response: assistantContent,
        conversationId: finalConversationId,
        metadata: {
          ...agentResponse.metadata,
          // NOTE(review): these entries still carry the raw file buffers (kept
          // for backward compatibility) — confirm callers do not serialize them.
          files: uploadedFiles,
        },
      };
    } catch (error) {
      if (axios.isAxiosError(error)) {
        const translated = ChatProtocolService.translateAxiosError(error);
        if (translated) {
          throw translated;
        }
      }
      throw error;
    }
  }

  /**
   * Build the multipart body for a request that carries files.
   *
   * File-only requests contain just the file part. Regular requests also carry
   * `message` (when non-empty), `conversationId`, `metadata` (JSON string) and
   * `systemPrompt` (when set), followed by the file parts.
   */
  private buildMultipartBody(options: {
    files: NonNullable<ChatRequest['files']>;
    fileOnly: boolean;
    message?: string;
    conversationId: string;
    metadata: Record<string, any>;
    systemPrompt?: string;
  }): FormData {
    const formData = new FormData();
    if (!options.fileOnly) {
      if (options.message) {
        formData.append('message', options.message);
      }
      formData.append('conversationId', options.conversationId);
      formData.append('metadata', JSON.stringify(options.metadata));
      if (options.systemPrompt) {
        formData.append('systemPrompt', options.systemPrompt);
      }
    }
    const total = options.files.length;
    options.files.forEach((file, index) => {
      formData.append(
        ChatProtocolService.fieldNameFor(file.mimetype, index, total),
        file.buffer,
        { filename: file.originalname, contentType: file.mimetype }
      );
    });
    return formData;
  }

  /**
   * Choose the multipart field name for a file.
   *
   * - Single upload: `audio_file` for audio, otherwise `file`.
   * - Multiple uploads: `audio_file` / `image_file` / `file` by MIME type, with
   *   `_<index>` appended for every file after the first (index is the file's
   *   position in the whole upload list, not within its type).
   */
  private static fieldNameFor(mimetype: string, index: number, total: number): string {
    const isAudio = mimetype.startsWith('audio/');
    if (total === 1) {
      return isAudio ? 'audio_file' : 'file';
    }
    const base = isAudio ? 'audio_file' : mimetype.startsWith('image/') ? 'image_file' : 'file';
    return index > 0 ? `${base}_${index}` : base;
  }

  /** Coerce a response payload (Buffer, ArrayBuffer, typed array, string, other) to a Buffer. */
  private static toBuffer(data: unknown): Buffer {
    if (Buffer.isBuffer(data)) {
      return data;
    }
    if (data instanceof ArrayBuffer) {
      return Buffer.from(data);
    }
    if (ArrayBuffer.isView(data)) {
      return Buffer.from(data.buffer, data.byteOffset, data.byteLength);
    }
    if (typeof data === 'string') {
      return Buffer.from(data, 'utf-8');
    }
    return Buffer.from(data == null ? '' : JSON.stringify(data) ?? '', 'utf-8');
  }

  /**
   * Normalize an agent reply into an object.
   *
   * - `image/*` content becomes `{ response: <data URL> }`.
   * - A JSON object is returned as parsed.
   * - Any other JSON value (null, number, string, array, boolean) and any
   *   non-JSON text is wrapped as `{ response: <text> }`, so callers can read
   *   properties without a null/primitive check.
   */
  private static parseAgentBody(data: unknown, contentType: string): Record<string, any> {
    const buffer = ChatProtocolService.toBuffer(data);
    if (contentType.toLowerCase().startsWith('image/')) {
      const mimeType = contentType.split(';')[0].trim();
      return { response: `data:${mimeType};base64,${buffer.toString('base64')}` };
    }
    const text = buffer.toString('utf-8');
    let parsed: unknown;
    try {
      parsed = JSON.parse(text);
    } catch {
      // Not JSON, treat as plain text
      return { response: text };
    }
    if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
      return parsed as Record<string, any>;
    }
    return { response: typeof parsed === 'string' ? parsed : JSON.stringify(parsed) };
  }

  /**
   * Map an axios failure to the service's error messages.
   * Returns undefined when the error matches none of the known cases, in which
   * case the caller rethrows the original error.
   */
  private static translateAxiosError(error: AxiosError): Error | undefined {
    if (error.code === 'ECONNABORTED') {
      return new Error('Agent request timeout');
    }
    if (error.response) {
      // The body arrives as raw bytes (responseType 'arraybuffer'); decode it so
      // the message shows the agent's error text instead of a byte dump.
      const bodyText = ChatProtocolService.toBuffer(error.response.data).toString('utf-8');
      let body: unknown = bodyText;
      try {
        body = JSON.parse(bodyText);
      } catch {
        // Keep the plain text.
      }
      return new Error(
        `Agent returned error: ${error.response.status} - ${JSON.stringify(body)}`
      );
    }
    if (error.request) {
      return new Error('Agent endpoint unreachable');
    }
    return undefined;
  }

  /**
   * Save chat message to database
   */
  private async saveMessage(data: {
    agentId: string;
    userId?: string;
    role: MessageRole;
    content: string;
    metadata?: Record<string, any>;
  }): Promise<ChatMessage> {
    const messageRepository = AppDataSource.getRepository(ChatMessage);
    const message = messageRepository.create(data);
    return await messageRepository.save(message);
  }

  /**
   * Generate a conversation ID of the form `conv_<epoch ms>_<up to 9 base-36 chars>`.
   */
  private generateConversationId(): string {
    return `conv_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Get conversation history for a user and agent, oldest first.
   *
   * @param agentId        Agent the messages belong to.
   * @param userId         User the messages belong to.
   * @param conversationId When given, only messages whose metadata carries this
   *                       conversation id are returned.
   * @param limit          Maximum number of messages (default 50).
   */
  async getConversationHistory(
    agentId: string,
    userId: string,
    conversationId?: string,
    limit: number = 50
  ): Promise<ChatMessage[]> {
    const messageRepository = AppDataSource.getRepository(ChatMessage);
    const query = messageRepository
      .createQueryBuilder('message')
      .where('message.agentId = :agentId', { agentId })
      .andWhere('message.userId = :userId', { userId });
    if (conversationId) {
      // The JSON key must be a quoted string literal; unquoted it is read as a
      // column identifier. Assumes `metadata` is a JSON/JSONB column on a
      // database that supports `->>` (e.g. PostgreSQL) — TODO confirm.
      query.andWhere("message.metadata ->> 'conversationId' = :conversationId", {
        conversationId,
      });
    }
    return await query
      .orderBy('message.createdAt', 'ASC')
      .limit(limit)
      .getMany();
  }
}