// inbound-calls/app.js — added by saikumaraili (commit 6d147f9)
import Fastify from 'fastify';
import WebSocket from 'ws';
import fs from 'fs';
import dotenv from 'dotenv';
import fastifyFormBody from '@fastify/formbody';
import fastifyWs from '@fastify/websocket';
import fetch from 'node-fetch';
// Load environment variables from .env file
dotenv.config();

// Retrieve the OpenAI and Gemini API keys from environment variables
const { OPENAI_API_KEY, GOOGLE_API_KEY } = process.env;
if (!OPENAI_API_KEY || !GOOGLE_API_KEY) {
    console.error('Missing OpenAI or Google API key. Please set them in the .env file.');
    process.exit(1);
}

// Initialize Fastify and register form-body parsing plus WebSocket support.
const fastify = Fastify();
fastify.register(fastifyFormBody);
fastify.register(fastifyWs);

// Constants
// System prompt steering the realtime model's receptionist behavior.
const SYSTEM_MESSAGE = 'You are an AI receptionist for Barts Automotive. Your job is to politely engage with the client and obtain their name, availability, and service/work required. Ask one question at a time. Do not ask for other contact information, and do not check availability, assume we are free. Ensure the conversation remains friendly and professional, and guide the user to provide these details naturally. If necessary, ask follow-up questions to gather the required information.';
const VOICE = 'alloy';
const PORT = process.env.PORT || 5050;
// Make.com webhook target. Configurable via the WEBHOOK_URL env var instead
// of a hard-coded placeholder that had to be edited in source; the original
// placeholder remains the fallback so existing behavior is unchanged.
const WEBHOOK_URL = process.env.WEBHOOK_URL ?? "<input your webhook URL here>";

// Session management: per-call state keyed by Twilio call SID.
const sessions = new Map();

// List of Event Types to log to the console
const LOG_EVENT_TYPES = [
    'response.content.done',
    'rate_limits.updated',
    'response.done',
    'input_audio_buffer.committed',
    'input_audio_buffer.speech_stopped',
    'input_audio_buffer.speech_started',
    'session.created',
    'response.text.done',
    'conversation.item.input_audio_transcription.completed'
];
// Root route: health-check endpoint confirming the server is up.
// Fastify serializes the returned object to JSON, same as reply.send().
fastify.get('/', async () => ({ message: 'Twilio Media Stream Server is running!' }));
// Route for Twilio to handle incoming calls: answer with a spoken greeting,
// then connect the call's audio to our /media-stream WebSocket endpoint.
fastify.all('/incoming-call', async (request, reply) => {
    console.log('Incoming call');
    // Fix: greeting previously misspelled "Automotive" as "Automative"
    // (SYSTEM_MESSAGE already uses "Barts Automotive").
    const twimlResponse = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say>Hi, you have called Bart's Automotive Centre. How can we help?</Say>
<Connect>
<Stream url="wss://${request.headers.host}/media-stream" />
</Connect>
</Response>`;
    reply.type('text/xml').send(twimlResponse);
});
// WebSocket route for media-stream: bridges Twilio's media stream to the
// OpenAI Realtime API, relays audio in both directions, and accumulates a
// per-call transcript that is processed when the caller hangs up.
fastify.register(async (fastify) => {
    fastify.get('/media-stream', { websocket: true }, (connection, req) => {
        // NOTE(review): this treats `connection` as the raw ws socket
        // (send/on directly), which matches @fastify/websocket v8+ — confirm
        // against the installed package version.
        console.log('Client connected');

        // Key sessions on Twilio's call SID when present so a reconnect for
        // the same call reuses its transcript; fall back to a timestamp id.
        const sessionId = req.headers['x-twilio-call-sid'] || `session_${Date.now()}`;
        const session = sessions.get(sessionId) || { transcript: '', streamSid: null };
        sessions.set(sessionId, session);

        // Upstream connection to the OpenAI Realtime API.
        const openAiWs = new WebSocket('wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01', {
            headers: {
                Authorization: `Bearer ${OPENAI_API_KEY}`,
                "OpenAI-Beta": "realtime=v1"
            }
        });

        // Configure the realtime session: G.711 u-law audio in/out (Twilio's
        // telephony codec), server-side voice activity detection, and Whisper
        // transcription of the caller's audio.
        const sendSessionUpdate = () => {
            const sessionUpdate = {
                type: 'session.update',
                session: {
                    turn_detection: { type: 'server_vad' },
                    input_audio_format: 'g711_ulaw',
                    output_audio_format: 'g711_ulaw',
                    voice: VOICE,
                    instructions: SYSTEM_MESSAGE,
                    modalities: ["text", "audio"],
                    temperature: 0.8,
                    input_audio_transcription: {
                        "model": "whisper-1"
                    }
                }
            };
            console.log('Sending session update:', JSON.stringify(sessionUpdate));
            openAiWs.send(JSON.stringify(sessionUpdate));
        };

        // Open event for the OpenAI WebSocket; the short delay gives the
        // server a moment to finish creating the session before we configure it.
        openAiWs.on('open', () => {
            console.log('Connected to the OpenAI Realtime API');
            setTimeout(sendSessionUpdate, 250);
        });

        // Listen for messages from the OpenAI WebSocket.
        openAiWs.on('message', (data) => {
            try {
                const response = JSON.parse(data);
                if (LOG_EVENT_TYPES.includes(response.type)) {
                    console.log(`Received event: ${response.type}`, response);
                }
                // Caller-speech transcription: append to the session transcript.
                // Guard with ?? so a missing transcript field doesn't throw.
                if (response.type === 'conversation.item.input_audio_transcription.completed') {
                    const userMessage = (response.transcript ?? '').trim();
                    session.transcript += `User: ${userMessage}\n`;
                    console.log(`User (${sessionId}): ${userMessage}`);
                }
                if (response.type === 'session.updated') {
                    console.log('Session updated successfully:', response);
                }
                // Forward model audio back to Twilio. response.delta is already
                // base64-encoded, so pass it through directly — the previous
                // Buffer.from(delta,'base64').toString('base64') round-trip
                // was a no-op.
                if (response.type === 'response.audio.delta' && response.delta) {
                    const audioDelta = {
                        event: 'media',
                        streamSid: session.streamSid,
                        media: { payload: response.delta }
                    };
                    connection.send(JSON.stringify(audioDelta));
                }
            } catch (error) {
                console.error('Error processing OpenAI message:', error, 'Raw message:', data);
            }
        });

        // Handle incoming messages from Twilio.
        connection.on('message', (message) => {
            try {
                const data = JSON.parse(message);
                switch (data.event) {
                    case 'media':
                        // Caller audio frame: forward to OpenAI only while the
                        // upstream socket is open.
                        if (openAiWs.readyState === WebSocket.OPEN) {
                            const audioAppend = {
                                type: 'input_audio_buffer.append',
                                audio: data.media.payload
                            };
                            openAiWs.send(JSON.stringify(audioAppend));
                        }
                        break;
                    case 'start':
                        // Remember the stream SID so outbound media frames can
                        // reference this stream.
                        session.streamSid = data.start.streamSid;
                        console.log('Incoming stream has started', session.streamSid);
                        break;
                    default:
                        console.log('Received non-media event:', data.event);
                        break;
                }
            } catch (error) {
                console.error('Error parsing message:', error, 'Message:', message);
            }
        });

        // Handle Twilio connection close: close the upstream socket, log the
        // transcript, run the post-call pipeline, and drop the session state.
        connection.on('close', async () => {
            if (openAiWs.readyState === WebSocket.OPEN) openAiWs.close();
            console.log(`Client disconnected (${sessionId}).`);
            console.log('Full Transcript:');
            console.log(session.transcript);
            await processTranscriptAndSend(session.transcript, sessionId);
            // Clean up the session
            sessions.delete(sessionId);
        });

        // Handle OpenAI WebSocket close and errors.
        openAiWs.on('close', () => {
            console.log('Disconnected from the OpenAI Realtime API');
        });
        openAiWs.on('error', (error) => {
            console.error('Error in the OpenAI WebSocket:', error);
        });
    });
});
// Start the HTTP/WebSocket server; exit non-zero if the port cannot be bound.
// (Top-level await is available since this file is an ES module.)
try {
    await fastify.listen({ port: PORT });
    console.log(`Server is listening on port ${PORT}`);
} catch (err) {
    console.error(err);
    process.exit(1);
}
// Function to make a Gemini API completion call over the accumulated call
// transcript.
//
// @param {string} transcript - "User: ..." lines captured during the call.
// @returns {Promise<string|null>} The model's text reply, or null when the
//   response body has an unexpected shape.
// @throws On network failure or a non-OK HTTP status (previously a non-OK
//   status fell through to the generic "unexpected structure" path).
async function makeGeminiCompletion(transcript) {
    console.log('Starting Gemini API call...');
    try {
        // Build the endpoint URL with URL/searchParams rather than string
        // concatenation; the key goes in a query parameter per the
        // Generative Language API convention.
        const url = new URL('https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro:generateContent');
        url.searchParams.set('key', GOOGLE_API_KEY);
        const response = await fetch(url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({
                contents: [{
                    role: "user",
                    parts: [{ text: `${SYSTEM_MESSAGE}\n\n${transcript}` }]
                }],
                generationConfig: {
                    temperature: 0.8,
                    topP: 1,
                    topK: 32,
                    maxOutputTokens: 4096,
                },
                tools: []
            })
        });
        console.log('Gemini API response status:', response.status);
        if (!response.ok) {
            throw new Error(`Gemini API returned HTTP ${response.status}`);
        }
        const data = await response.json();
        console.log('Full Gemini API response:', JSON.stringify(data, null, 2));
        // Walk the candidates structure defensively with optional chaining.
        const text = data.candidates?.[0]?.content?.parts?.[0]?.text;
        if (text === undefined) {
            console.error('Unexpected response structure from Gemini API');
            return null;
        }
        return text; // Return the text response
    } catch (error) {
        console.error('Error making Gemini completion call:', error);
        throw error;
    }
}
// POST a JSON payload to the configured Make.com webhook.
// Best-effort: outcomes are logged and failures are never thrown to callers.
async function sendToWebhook(payload) {
    console.log('Sending data to webhook:', JSON.stringify(payload, null, 2));
    try {
        const res = await fetch(WEBHOOK_URL, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(payload)
        });
        console.log('Webhook response status:', res.status);
        if (!res.ok) {
            console.error('Failed to send data to webhook:', res.statusText);
            return;
        }
        console.log('Data successfully sent to webhook.');
    } catch (error) {
        console.error('Error sending data to webhook:', error);
    }
}
// Pull structured customer fields out of Gemini's free-text reply.
// For each "Label:" marker, captures the remainder of that line (trimmed);
// a field whose label is absent comes back as an empty string.
// NOTE(review): the exact labels depend on how Gemini formats its reply —
// verify against real responses.
function extractCustomerDetails(text) {
    // Grab everything after the first occurrence of `label` up to end-of-line.
    const grab = (label) => {
        const match = text.match(new RegExp(`${label}([^\\n]*)`));
        return match ? match[1].trim() : "";
    };
    return {
        customerName: grab("Name:"),
        customerAvailability: grab("Availability:"),
        specialNotes: grab("Notes:")
    };
}
// End-of-call pipeline: ask Gemini to summarize the transcript, extract
// structured customer details from the reply, and push them to the webhook.
// Errors are logged rather than propagated — this runs from a close handler.
async function processTranscriptAndSend(transcript, sessionId = null) {
    console.log(`Starting transcript processing for session ${sessionId}...`);
    try {
        const geminiResponse = await makeGeminiCompletion(transcript);
        if (!geminiResponse) {
            console.error('Gemini API did not return a valid response.');
            return;
        }
        const customerDetails = extractCustomerDetails(geminiResponse);
        console.log('Extracted customer details:', customerDetails);
        await sendToWebhook(customerDetails);
        console.log('Sent customer details to webhook.');
    } catch (error) {
        console.error('Error in processTranscriptAndSend:', error);
    }
}