1
0
Files
ws4kp-to-hls/src/streamHandler.js

413 lines
13 KiB
JavaScript

const puppeteer = require('puppeteer');
const { spawn } = require('child_process');
const fs = require('fs');
const { buildFFmpegArgs } = require('./ffmpegConfig');
const { setupPage, waitForPageFullyLoaded, hideLogo } = require('./pageLoader');
/**
 * Parses a query-string value as a strictly positive base-10 integer.
 * @param {*} value - Raw value (typically a string from req.query, or a numeric default)
 * @returns {number|null} The parsed integer, or null when the value is not a positive integer
 */
function parsePositiveInt(value) {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : null;
}

/**
 * Tells whether a request/response error only means that the peer went away.
 * @param {Error} error - Error emitted by the request or response
 * @returns {boolean} True for ECONNRESET and EPIPE
 */
function isClientDisconnectError(error) {
  return error?.code === 'ECONNRESET' || error?.code === 'EPIPE';
}

/**
 * Waits for back-pressure on a writable stream to clear.
 * Resolves (never rejects) on the first of: 'drain', 'error', or the timeout.
 * @param {import('stream').Writable} stream - Stream whose write() returned false
 * @param {number} timeoutMs - Upper bound on the wait, in milliseconds
 * @returns {Promise<void>}
 */
function waitForDrainOrTimeout(stream, timeoutMs) {
  return new Promise((resolve) => {
    if (!stream) {
      resolve();
      return;
    }
    const finish = () => {
      clearTimeout(timer);
      stream.removeListener('drain', finish);
      stream.removeListener('error', finish);
      resolve();
    };
    const timer = setTimeout(finish, timeoutMs);
    stream.once('drain', finish);
    stream.once('error', finish);
  });
}

/**
 * Main streaming handler - captures webpage and streams as HLS
 *
 * Screenshots of the page are written as JPEG frames to FFmpeg's stdin and
 * FFmpeg's stdout is piped to the response. Black frames are sent until the
 * page has finished loading. Replies with 400 for a missing/invalid URL, a
 * non-http(s) URL, or a non-positive-integer width/height/fps, and with 500
 * when setup fails before any response data was sent.
 *
 * @param {Express.Request} req - Express request object
 * @param {Express.Response} res - Express response object
 * @param {Object} [options] - Configuration options
 * @param {boolean} [options.useMusic=false] - Whether to include music
 * @param {string} [options.musicPath] - Path to music directory
 * @param {Promise<string>} [options.lateGeocodePromise=null] - Optional promise for late URL update
 * @returns {Promise<*>} Settles once an error response was sent or the stream has been set up;
 *   the capture loop itself keeps running in the background until cleanup
 */
async function streamHandler(req, res, { useMusic = false, musicPath, lateGeocodePromise = null } = {}) {
  const PAGE_REFRESH_INTERVAL_MS = 30 * 60 * 1000;
  const KEEPALIVE_INTERVAL_MS = 10000;
  const FFMPEG_KILL_GRACE_MS = 5000;
  const STDIN_DRAIN_TIMEOUT_MS = 800;
  const MAX_CONSECUTIVE_ERRORS = 5;

  const { url, width = 1920, height = 1080, fps = 30, hideLogo: hideLogoFlag = 'false' } = req.query;

  if (!url) {
    return res.status(400).send('URL parameter is required');
  }

  // Validate URL
  let protocol;
  try {
    ({ protocol } = new URL(url));
  } catch {
    return res.status(400).send('Invalid URL');
  }
  // The URL comes straight from the query string and is rendered by a local
  // browser, so schemes such as file: must not be reachable from the outside.
  if (protocol !== 'http:' && protocol !== 'https:') {
    return res.status(400).send('Only http and https URLs are supported');
  }

  // Parse the numeric parameters once; the same values feed Chromium, FFmpeg and the frame timer.
  const frameWidth = parsePositiveInt(width);
  const frameHeight = parsePositiveInt(height);
  const frameRate = parsePositiveInt(fps);
  if (frameWidth === null || frameHeight === null || frameRate === null) {
    return res.status(400).send('Invalid width, height, or fps');
  }

  const shouldHideLogo = hideLogoFlag === 'true';
  const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  let browser = null;
  let ffmpegProcess = null;
  let playlistFile = null;
  let page = null;
  let isCleaningUp = false;
  let pageRefreshInterval = null;
  let keepaliveInterval = null;

  // Declared before any event handler is registered so that a handler firing
  // during one of the awaits below can never hit an uninitialized binding.
  const cleanup = async () => {
    if (isCleaningUp) return;
    isCleaningUp = true;
    console.log('Cleaning up stream...');

    clearInterval(pageRefreshInterval);
    clearInterval(keepaliveInterval);

    const proc = ffmpegProcess;
    ffmpegProcess = null;
    // `killed` only records that a signal was sent, so liveness is judged by
    // exitCode/signalCode, and the SIGKILL fallback holds its own reference.
    if (proc && proc.exitCode === null && proc.signalCode === null) {
      try {
        proc.stdin.end();
      } catch (err) {
        // Best effort: stdin may already be destroyed.
      }
      proc.kill('SIGTERM');
      const killTimer = setTimeout(() => {
        if (proc.exitCode === null && proc.signalCode === null) {
          proc.kill('SIGKILL');
        }
      }, FFMPEG_KILL_GRACE_MS);
      proc.once('exit', () => clearTimeout(killTimer));
    }

    if (playlistFile) {
      try {
        fs.unlinkSync(playlistFile);
      } catch (err) {
        if (err.code !== 'ENOENT') {
          console.warn('Failed to remove playlist file:', err.message);
        }
      }
      playlistFile = null;
    }

    if (browser) {
      const closingBrowser = browser;
      browser = null;
      try {
        await closingBrowser.close();
      } catch (err) {
        console.warn('Browser close error:', err.message);
      }
    }

    if (!res.writableEnded) {
      res.end();
    }
  };

  // Fire-and-forget variant for event handlers, which cannot await.
  const requestCleanup = () => {
    cleanup().catch((err) => console.error('Cleanup error:', err));
  };

  try {
    // Set HLS headers
    res.setHeader('Content-Type', 'application/vnd.apple.mpegurl');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    // Build FFmpeg command and launch browser in parallel
    const ffmpegConfigPromise = buildFFmpegArgs({
      fps: frameRate,
      useMusic,
      musicPath,
    });
    const browserPromise = puppeteer.launch({
      headless: true,
      executablePath: process.env.PUPPETEER_EXECUTABLE_PATH || undefined,
      args: [
        '--no-sandbox',
        '--disable-setuid-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-extensions',
        '--disable-default-apps',
        '--disable-sync',
        '--disable-translate',
        '--disable-background-networking',
        '--disable-background-timer-throttling',
        '--no-first-run',
        '--mute-audio',
        '--disable-breakpad',
        '--disable-component-update',
        `--window-size=${frameWidth},${frameHeight}`,
        '--force-device-scale-factor=1',
      ],
      defaultViewport: { width: frameWidth, height: frameHeight, deviceScaleFactor: 1 },
    });

    // allSettled (not all): if one side fails, the other side's result is
    // still recorded so that cleanup can release it instead of leaking it.
    const [configResult, browserResult] = await Promise.allSettled([ffmpegConfigPromise, browserPromise]);
    if (browserResult.status === 'fulfilled') {
      browser = browserResult.value;
    }
    if (configResult.status === 'fulfilled') {
      playlistFile = configResult.value.playlistFile;
    }
    if (configResult.status === 'rejected') throw configResult.reason;
    if (browserResult.status === 'rejected') throw browserResult.reason;
    const ffmpegConfig = configResult.value;

    console.log('Starting stream with black frames...');

    // Start FFmpeg immediately
    ffmpegProcess = spawn('ffmpeg', ['-loglevel', 'error', '-hide_banner', ...ffmpegConfig.args], {
      stdio: ['pipe', 'pipe', 'pipe'],
    });
    // Stable reference for handlers and the capture loop; `ffmpegProcess` is nulled by cleanup.
    const encoder = ffmpegProcess;

    // Pipe FFmpeg output to response
    encoder.stdout.pipe(res);

    const reportedWarnings = [
      'Non-monotonous DTS',
      'Application provided invalid',
      'past duration',
      'Error while decoding stream',
      'Invalid data found',
    ];
    encoder.stderr.on('data', (data) => {
      const message = data.toString();
      // Log important warnings about audio discontinuities or errors
      if (reportedWarnings.some((needle) => message.includes(needle))) {
        console.warn(`FFmpeg warning: ${message.trim()}`);
      }
    });

    encoder.on('error', (error) => {
      console.error('FFmpeg error:', error);
      if (!res.headersSent) {
        res.status(500).send('Internal server error');
      }
      requestCleanup();
    });

    encoder.on('close', (code) => {
      if (isCleaningUp) return;
      if (code) {
        console.error(`FFmpeg exited with code ${code}`);
      }
      // Without an encoder there is nothing left to stream, whatever the exit status.
      requestCleanup();
    });

    encoder.stdin.on('error', (error) => {
      if (error.code !== 'EPIPE') {
        console.error('FFmpeg stdin error:', error);
      }
    });

    // Setup Puppeteer page
    page = await setupPage(browser, { width: frameWidth, height: frameHeight });

    // FFmpeg may have failed, or the client may have left, while the page was being set up.
    if (isCleaningUp) return;
    if (res.writableEnded || res.socket?.destroyed) {
      await cleanup();
      return;
    }

    // Black frame control
    let sendBlackFrames = true;

    // Loads targetUrl and then switches the stream from black frames to live
    // screenshots. Never rejects; a failed load still switches to live frames.
    const switchToLive = async (targetUrl, { errorLabel, successMessage = null, requireLoaded = false }) => {
      if (isCleaningUp || page.isClosed()) return;
      try {
        const loaded = await waitForPageFullyLoaded(page, targetUrl);
        // Only the direct-load path inspects the result; a falsy result keeps black frames on.
        if (requireLoaded && !loaded) return;
        if (successMessage) {
          console.log(successMessage);
        }
        sendBlackFrames = false;
        if (shouldHideLogo) {
          await hideLogo(page);
        }
      } catch (err) {
        console.error(`${errorLabel}:`, err?.message ?? err);
        sendBlackFrames = false;
      }
    };

    // Waits for the late geocoding result, then loads either the corrected or the initial URL.
    const handleLateGeocode = async () => {
      let updatedUrl;
      try {
        updatedUrl = await lateGeocodePromise;
      } catch {
        console.warn('Geocoding failed, waiting for fallback location to load');
        await switchToLive(url, { errorLabel: 'Fallback page load error' });
        return;
      }
      if (isCleaningUp || page.isClosed()) return;
      if (updatedUrl && updatedUrl !== url) {
        console.log('Updating to correct location...');
        sendBlackFrames = true;
        await switchToLive(updatedUrl, {
          errorLabel: 'Location update error',
          successMessage: 'Correct location fully loaded, switching to live frames',
        });
      } else {
        console.log('Using initial URL, waiting for page load to complete...');
        await switchToLive(url, { errorLabel: 'Initial page load error' });
      }
    };

    if (lateGeocodePromise) {
      // Handle late geocoding if provided; nothing is loaded until it settles.
      handleLateGeocode().catch((err) => console.error('Late geocode handling error:', err));
    } else {
      // Load initial page right away
      switchToLive(url, { errorLabel: 'Page load promise error', requireLoaded: true })
        .catch((err) => console.error('Page load promise error:', err));
    }

    // Periodic page refresh
    pageRefreshInterval = setInterval(async () => {
      if (isCleaningUp || page.isClosed()) return;
      try {
        console.log('Refreshing page for stability...');
        await page.reload({ waitUntil: 'domcontentloaded', timeout: 10000 });
        if (shouldHideLogo) {
          await hideLogo(page);
        }
      } catch (err) {
        console.error('Page refresh error:', err.message);
      }
    }, PAGE_REFRESH_INTERVAL_MS);

    // Frame capture
    const frameInterval = 1000 / frameRate;

    const createBlackFrame = () => {
      try {
        // Loaded lazily; on failure an empty buffer is returned, which the
        // capture loop skips instead of writing.
        const { createCanvas } = require('canvas');
        const canvas = createCanvas(frameWidth, frameHeight);
        const ctx = canvas.getContext('2d');
        ctx.fillStyle = '#000000';
        ctx.fillRect(0, 0, frameWidth, frameHeight);
        return canvas.toBuffer('image/jpeg', { quality: 0.5 });
      } catch (err) {
        console.error('Error creating black frame:', err);
        return Buffer.alloc(0);
      }
    };

    let blackFrameBuffer = null;

    const captureLoop = async () => {
      let consecutiveErrors = 0;
      while (!isCleaningUp && !encoder.killed) {
        const start = Date.now();
        try {
          if (page.isClosed()) {
            console.error('Page was closed unexpectedly');
            // No more frames can be produced, so end the stream instead of leaving it hanging.
            requestCleanup();
            break;
          }

          let frame;
          if (sendBlackFrames) {
            if (!blackFrameBuffer) {
              blackFrameBuffer = createBlackFrame();
            }
            frame = blackFrameBuffer;
          } else {
            frame = await page.screenshot({
              type: 'jpeg',
              quality: 80,
              optimizeForSpeed: true,
              fromSurface: true,
            });
          }

          if (frame && frame.length > 0 && !isCleaningUp && encoder.stdin.writable) {
            const canWrite = encoder.stdin.write(frame);
            if (!canWrite) {
              await waitForDrainOrTimeout(encoder.stdin, STDIN_DRAIN_TIMEOUT_MS);
            }
          }
          consecutiveErrors = 0;
        } catch (error) {
          if (isCleaningUp) break;
          consecutiveErrors++;
          console.error(`Capture error (${consecutiveErrors}/${MAX_CONSECUTIVE_ERRORS}):`, error.message || error);
          if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
            console.error('Too many consecutive errors, stopping stream');
            requestCleanup();
            break;
          }
          await delay(1000);
        }

        const wait = frameInterval - (Date.now() - start);
        if (wait > 0) {
          await delay(wait);
        }
      }
    };

    captureLoop().catch((err) => {
      console.error('Capture loop failed:', err);
      requestCleanup();
    });

    // Handle client disconnect
    let disconnectLogged = false;
    const logDisconnectOnce = () => {
      if (!disconnectLogged) {
        console.log('Client disconnected');
        disconnectLogged = true;
      }
    };
    const onConnectionError = (label) => (error) => {
      if (isClientDisconnectError(error)) {
        logDisconnectOnce();
      } else {
        console.error(label, error);
      }
      requestCleanup();
    };

    req.on('close', () => {
      logDisconnectOnce();
      requestCleanup();
    });
    // The response's own 'close' covers the case where the request side gives no signal.
    res.on('close', requestCleanup);
    req.on('error', onConnectionError('Request error:'));
    res.on('error', onConnectionError('Response error:'));

    // Keepalive monitoring
    keepaliveInterval = setInterval(() => {
      if (isCleaningUp || !ffmpegProcess || ffmpegProcess.killed) {
        clearInterval(keepaliveInterval);
        return;
      }
      if (res.writableEnded || res.socket?.destroyed) {
        console.log('Connection lost, cleaning up');
        clearInterval(keepaliveInterval);
        requestCleanup();
      }
    }, KEEPALIVE_INTERVAL_MS);
  } catch (error) {
    console.error('Error:', error);
    // Report the failure first: cleanup ends the response.
    if (!res.headersSent) {
      res.status(500).send('Internal server error');
    }
    try {
      await cleanup();
    } catch (cleanupError) {
      console.error('Cleanup error:', cleanupError);
    }
  }
}
// Public API of this module: the request handler defined above.
module.exports = { streamHandler };