rtmp / server.ts
wuhp's picture
Update server.ts
8cfcabf verified
import express from 'express';
import { createServer as createViteServer } from 'vite';
import { createProxyMiddleware } from 'http-proxy-middleware';
import path from 'path';
import crypto from 'crypto';
import { createRequire } from 'module';
import { fileURLToPath } from 'url';
import { spawn, execSync } from 'child_process';
import fs from 'fs';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const require = createRequire(import.meta.url);
const NodeMediaServer = require('node-media-server');
let constTunnelUrl: string | null = null;
let startingBore = false;
async function startBoreTunnel() {
if (startingBore) return;
startingBore = true;
const borePath = path.join(process.cwd(), 'bore');
if (!fs.existsSync(borePath)) {
console.log('[SYSTEM] Downloading bore TCP proxy...');
try {
const res = await fetch("https://github.com/ekzhang/bore/releases/download/v0.5.1/bore-v0.5.1-x86_64-unknown-linux-musl.tar.gz");
const buffer = await res.arrayBuffer();
fs.writeFileSync("bore.tar.gz", Buffer.from(buffer));
execSync("tar -xzf bore.tar.gz");
execSync("chmod +x bore");
fs.unlinkSync("bore.tar.gz");
} catch(e) {
console.error('[SYSTEM] Failed to download bore tunnel:', e);
return;
}
}
console.log('[SYSTEM] Starting bore TCP tunnel on port 1935...');
const cp = spawn(borePath, ['local', '1935', '--to', 'bore.pub']);
const handleData = (data: Buffer) => {
const text = data.toString();
const match = text.match(/listening at (bore\.pub:\d+)/);
if (match) {
constTunnelUrl = `rtmp://${match[1]}/live`;
console.log(`[SYSTEM] TCP Tunnel active at: ${constTunnelUrl}`);
}
};
cp.stdout.on('data', handleData);
cp.stderr.on('data', handleData);
cp.on('close', (code) => {
console.log(`[SYSTEM] Bore tunnel closed with code ${code}`);
constTunnelUrl = null;
setTimeout(() => { startingBore = false; startBoreTunnel(); }, 5000);
});
}
async function startServer() {
const app = express();
const PORT = process.env.PORT ? parseInt(process.env.PORT as string, 10) : 3000;
// Logging middleware
app.use((req, res, next) => {
console.log(`[REQ] ${req.method} ${req.url}`);
next();
});
// Start Node Media Server for RTMP Ingest (1935) and HTTP FLV (8123)
const nmsConfig = {
rtmp: {
port: 1935,
chunk_size: 60000,
gop_cache: true,
ping: 30,
ping_timeout: 60
},
http: {
port: 8123,
allow_origin: '*',
}
};
const nms = new NodeMediaServer(nmsConfig);
nms.run();
const activeStreams = new Set<string>();
nms.on('prePublish', (...args: any[]) => {
const session = args[0];
let streamPath = (typeof session === 'string') ? args[1] : (session?.streamPath || session?.StreamPath || session?.publishStreamPath);
if (args.length > 1 && typeof args[1] === 'string') {
streamPath = args[1];
}
console.log('[SYSTEM] RTMP stream prePublish payload:', streamPath);
if (streamPath) activeStreams.add(streamPath.toLowerCase());
});
nms.on('postPublish', (...args: any[]) => {
const session = args[0];
let streamPath = (typeof session === 'string') ? args[1] : (session?.streamPath || session?.StreamPath || session?.publishStreamPath);
if (args.length > 1 && typeof args[1] === 'string') {
streamPath = args[1];
}
console.log('[SYSTEM] RTMP stream started:', streamPath);
if (streamPath) activeStreams.add(streamPath.toLowerCase());
});
nms.on('donePublish', (...args: any[]) => {
const session = args[0];
let streamPath = (typeof session === 'string') ? args[1] : (session?.streamPath || session?.StreamPath || session?.publishStreamPath);
if (args.length > 1 && typeof args[1] === 'string') {
streamPath = args[1];
}
console.log('[SYSTEM] RTMP stream ended:', streamPath);
if (streamPath) activeStreams.delete(streamPath.toLowerCase());
});
// Start the TCP Tunnel proxy
startBoreTunnel();
// Proxy HTTP FLV requests from port 3000 to port 8123 where NMS is listening
// This allows the frontend web player to access the stream using HTTP-FLV over port 3000
app.use('/live', createProxyMiddleware({
target: 'http://127.0.0.1:8123',
changeOrigin: true,
ws: true
}));
// Our own stream checker API
app.get('/api/streams/:app/:key', (req, res) => {
const streamPath = `/${req.params.app}/${req.params.key}`.toLowerCase();
const isActive = activeStreams.has(streamPath);
console.log(`[REQ] GET /api/streams${streamPath} => ${isActive}`);
res.json({ active: isActive });
});
// API Route to get connection info
app.get('/api/config', async (req, res) => {
let rtmpUrl = '';
// Check if bore TCP tunnel successfully issued a public proxy address
if (constTunnelUrl) {
return res.json({ rtmpUrl: constTunnelUrl, isNgrok: true }); // using isNgrok flag to keep UI happy and colored emerald!
}
// Hugging Face Spaces injects SPACE_HOST
if (process.env.SPACE_HOST) {
rtmpUrl = `rtmp://${process.env.SPACE_HOST}:1935/live`;
} else {
const appUrl = process.env.APP_URL || 'localhost';
// Determine the external URL. Note: AI Studio only exposes Port 3000 (HTTP).
if (appUrl.startsWith('https://')) {
const hostname = new URL(appUrl).hostname;
rtmpUrl = `rtmp://${hostname}:1935/live`;
} else {
rtmpUrl = `rtmp://${appUrl.split(':')[0]}:1935/live`;
}
}
res.json({ rtmpUrl });
});
// Vite middleware for development
if (process.env.NODE_ENV !== 'production') {
const vite = await createViteServer({
server: { middlewareMode: true },
appType: 'spa',
});
app.use(vite.middlewares);
} else {
// production mode
// __dirname is the directory where server.js is located (which should be 'dist' thanks to esbuild)
const distPath = __dirname;
console.log('[SYSTEM] Static dist path:', distPath);
// explicitly serve assets directory
app.use('/assets', express.static(path.join(distPath, 'assets'), {
setHeaders: (res, filePath) => {
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin');
res.setHeader('Cache-Control', 'public, max-age=31536000, immutable');
}
}));
// serve the rest of dist
app.use(express.static(distPath, {
setHeaders: (res, filePath) => {
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin');
if (filePath.endsWith('.html')) {
res.setHeader('Cache-Control', 'no-cache, no-store, must-revalidate');
res.setHeader('Pragma', 'no-cache');
res.setHeader('Expires', '0');
}
}
}));
// Add health diagnostic endpoint
app.get('/api/health', (req, res) => {
res.json({ status: 'ok', dir: __dirname, time: new Date().toISOString() });
});
app.get('/assets/*', (req, res) => {
res.status(404).send('Not Found');
});
app.get('*', (req, res) => {
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin');
res.setHeader('Cache-Control', 'no-cache, no-store, must-revalidate');
res.setHeader('Pragma', 'no-cache');
res.setHeader('Expires', '0');
res.sendFile(path.join(distPath, 'index.html'));
});
}
const server = app.listen(PORT, '0.0.0.0', () => {
console.log(`Express server running on http://localhost:${PORT}`);
});
// Graceful shutdown to release ports
process.on('SIGTERM', () => {
console.log('SIGTERM received, shutting down...');
nms.stop();
server.close();
process.exit(0);
});
}
startServer();