fastapi-sse / sse-demo.js
wonder
add tampermonkey script demo
b38cf00
// ==UserScript==
// @name SSE EventSource Fetch
// @namespace Violentmonkey Scripts
// @match https://example.org/*
// @grant GM_xmlhttpRequest
// @run-at document-start
// ==/UserScript==
let createNanoEvents = () => ({
emit(event, ...args) {
for (
let callbacks = this.events[event] || [],
i = 0,
length = callbacks.length;
i < length;
i++
) {
callbacks[i](...args)
}
},
events: {},
on(event, cb) {
; (this.events[event] ||= []).push(cb)
return () => {
this.events[event] = this.events[event]?.filter(i => cb !== i)
}
}
});
class Streamer {
constructor() {
this._buffer = "";
this.metrics = { sseChunks: 0, sseBytes: 0 };
this._streamGen = 0;
this._activeRequest = null; // Store the GM_xmlhttpRequest object
this._events = createNanoEvents();
}
async startStreaming(streamUrl) {
const gen = ++this._streamGen;
this._buffer = "";
let lastLength = 0;
return new Promise((resolve, reject) => {
this._activeRequest = GM_xmlhttpRequest({
method: "GET",
url: streamUrl,
headers: {
"Accept": "text/event-stream",
"Cache-Control": "no-cache"
},
// IMPORTANT: We do NOT use responseType: "blob"
// We keep it as text to read it incrementally
onreadystatechange: (res) => {
if (gen !== this._streamGen) return;
// readyState 3 = Partial Data Received
if (res.readyState >= 3 && res.responseText) {
const newChunk = res.responseText.slice(lastLength);
if (newChunk.length > 0) {
lastLength = res.responseText.length;
this.metrics.sseChunks++;
this.metrics.sseBytes += newChunk.length;
this._parseChunk(newChunk);
}
}
},
onload: (res) => {
if (gen === this._streamGen) {
this._emit('debug', 'Stream complete');
resolve();
}
},
onerror: (err) => {
this._emit('error', err);
reject(err);
},
onabort: () => {
this._emit('debug', 'Stream aborted');
resolve();
}
});
});
}
abort() {
if (this._activeRequest) {
this._activeRequest.abort();
this._activeRequest = null;
this._streamGen++;
}
}
_parseChunk(chunk) {
this._buffer += chunk;
const lines = this._buffer.split(/\r?\n/);
this._buffer = lines.pop();
for (const line of lines) {
const trimmedLine = line.trim();
if (!trimmedLine) continue;
if (trimmedLine.startsWith("data: ")) {
const data = trimmedLine.slice(6);
if (data === "[DONE]") return;
this._emit('data', data);
} else if (trimmedLine.startsWith("event: ")) {
const data = trimmedLine.slice(7);
this._emit('event', data);
}
}
}
on(event, cb) { return this._events.on(event, cb); }
_emit(event, ...args) {
this._events.emit(event, ...args);
}
}
(async function () {
let counter = 0
let events = createNanoEvents();
events.on('increase', add => {
counter += add;
console.log("counter:", counter);
});
events.emit('increase', 1)
events.emit("increase", 10);
const url = "https://nxdev-org-fastapi-sse.hf.space/stream";
let s = new Streamer();
s.on("data", (data) => {
console.log(`[streamer] data:`, data);
})
s.on("event", (data) => {
console.log(`[streamer] event:`, data);
});
s.on("debug", (data) => {
console.log(`[streamer] :`, data);
});
s.startStreaming(url);
})()