Serverio išsiųsti įvykiai 101: vadovas

Serverio išsiųsti įvykiai (SSE), kaip rodo pavadinimas, yra būdas susisiekti su klientu palaikant nuolatinį ryšį, kai serveris siunčia tekstinius pranešimus klientui, kai tik jie yra. Jie yra panašūs į žiniatinklio lizdus, tačiau, skirtingai nei interneto lizdai, ryšys yra vienakryptis, ty tik serveris gali siųsti pranešimus, o klientas tik klauso.
Kitas pagrindinis skirtumas tarp SSE ir žiniatinklio lizdų yra tas, kad žiniatinklio lizdai naudoja savo ws://
websocket protokolą, o SSE naudoja HTTP protokolą. Be to, SSE gali perduoti duomenis tik į text/event-stream
formatu.
Įvykių siuntimas iš serverio
Pagrindiniame nodejs (express) serveryje galite apibrėžti galutinį tašką, leidžiantį prenumeruoti iš klientų ir saugoti juos unikaliame rinkinyje.
const clients = new Set();
const addSubscription = (client) => {
clients.add(client);
console.log(`Client ${client} connected`);
}
const removeSubscription = (client) => {
clients.delete(client);
console.log(`Client ${client} disconnected`);
}
app.get("/subscribe", (req, res) => {
const client = new URLSearchParams(req.query).get("id") || crypto.randomUUID();
addSubscription(client);
// ...
req.on('close', () => {
removeSubscription(client);
})
})
Kai prenumerata bus pridėta ir išsaugota rinkinyje, turite nustatyti šias atsakymų antraštes su būsenos kodu 200
kad klientas žinotų, kad tai yra teksto / įvykio srautas, nuolatinis ryšys.
app.get("/subscribe", (req, res) => {
const client = new URLSearchParams(req.query).get("id") || crypto.randomUUID();
addSubscription(client);
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Connection": "keep-alive",
"Cache-Control": "no-cache"
});
req.on('close', () => {
removeSubscription(client);
})
})
Dabar, kai ryšys nustatytas, galite siųsti pranešimus klientui EventStream formatu. Štai viskas, dabar galite klausytis šių įvykių srautų naudodami EventSource API, apie kurią daugiau pakalbėsiu vėliau šiame įraše.
app.get("/subscribe", (req, res) => {
const client = new URLSearchParams(req.query).get("id") || crypto.randomUUID();
addSubscription(client);
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Connection": "keep-alive",
"Cache-Control": "no-cache"
});
res.write(`data: ${message}\n\n`);
req.on('close', () => {
removeSubscription(client);
})
})
Taip pat galite reguliariais intervalais siųsti ping klientui naudodami setInterval
.
setInterval(() => res.write(`data: ping\n\n`), 5000);
Visa tai gerai, bet ką daryti, jei norite siųsti pranešimus, kai kas nors nutinka serveryje arba duomenų bazėje? Norėdami tai padaryti, turite naudoti įvykių skleidėjus nodejs, kad suaktyvintumėte konkretų įvykį ir užfiksuotumėte tą įvykį mūsų užklausų tvarkyklėje, kad išsiųstumėte pranešimą klientui.
Įvykių skleidėjai
Įvykių teikėjai yra publikavimo / prenumeratos architektūros tipas, kuriame turite prenumeratorių, prenumeruojančių konkrečius „pavadintus“ įvykius, ir skelbėjus (leidėjus), kurie paskelbia / išleidžia įvykį pagal tam tikrą veiksmą.
Štai paprastas įvykių emiterio nodejs pavyzdys:
import { EventEmitter } from 'events';
class UpdateEvents extends EventEmitter {
constructor () {
super();
}
new (data) {
this.emit('new', data);
}
}
const updates = new UpdateEvents();
export default {
updates,
newUpdate: (data) => updates.new(data)
}
The new
metodas UpdateEvents
klasė yra įvykių emiterio metodas, kuris skleidžia pavadintą įvykį new
. Tai ir pakurstė įvykį. Tada sukuriame egzempliorių UpdateEvents
klasę ir eksportuokite ją, kad būtų galima klausytis new
renginys. Galite klausytis įvykio bet kurioje programos kodo vietoje naudodami:
updates.on('new', (data) => {
// do something with the data
})
Tai tikrai naudinga jūsų SSE galutiniam taškui. Pavyzdžiui, jei norite siųsti įvykius iš operacijos / įvykio kitoje programos dalyje ir nebūtinai užklausų tvarkyklėje, galite suaktyvinti įvykį iš skirtingų kodo vietų ir klausytis jo SSE galutiniame taške.
// in some other part of the application
newUpdate({ message: "Hello World" })
// ----------------------------------
// in the SSE endpoint
app.get("/subscribe", (req, res) => {
const client = new URLSearchParams(req.query).get("id") || crypto.randomUUID();
addSubscription(client);
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Connection": "keep-alive",
"Cache-Control": "no-cache"
})
updates.on('new', (data) => {
res.write(`data: ${message}\n\n`);
})
req.on('close', () => {
removeSubscription(client);
})
})
SSE įvykių prenumerata iš klientų
SSE įvykiai fiksuojami naudojant EventSource
žiniatinklio API. Jums tereikia perduoti SSE galutinio taško URL adresą EventSource
API. Negalite perduoti savo tinkintų antraščių į „EventSource“, todėl turite pasikliauti užklausos parametrais, kad pateiktumėte papildomą kontekstą apie klientą.
const url = new URL(SSE_ENDPOINT, YOUR_API_BASE_URL)
const event = new EventSource(`${url.href}?id=${crypto.randomUUID()}`)
Tada klausykite pranešimų, kuriuos siunčia serveris, naudodami onmessage
renginys.
const url = new URL(SSE_ENDPOINT, YOUR_API_BASE_URL)
const event = new EventSource(`${url.href}?id=${crypto.randomUUID()}`)
event.onmessage = (e) => {
console.log(e.data);
}
event.onopen = (e) => {
console.log('connection opened');
}
event.onerror = (e) => {
console.log(e);
}
Kas nutinka, kai nutrūksta ryšys su serveriu? Tokiu atveju naršyklė bando automatiškai prisijungti iš naujo per tam tikrą laiko tarpą, vadinamą retry
intervalas. Numatytasis kartojimo intervalas naršyklėje yra ~3 sekundės. Tačiau galite nurodyti savo kartojimo intervalą, išsiųsdami reikšmę (milisekundėmis) a retry
lauką su serverio išsiųstu pranešimu.
// server
res.write(`data: ${message}\n`);
res.write(`retry: ${retryInterval}\n\n`); // in milliseconds
Pro patarimas – Šiame web.dev straipsnyje sužinokite, kaip tinkamai siųsti pranešimus naudojant EventStream formatą.
Jei nenorite pasikliauti naršyklės teikiamu automatiniu pakartotiniu prisijungimu arba jei jis jums neveikia, galite patys įdiegti pasirinktinį pakartotinio bandymo mechanizmą. Leiskite man parodyti, kaip.
let retryInterval = 6000;
function listenToEvents(retryAfter) {
let isListening = false;
const interval = setInterval(() => {
if (!isListening) {
isListening = true;
const url = new URL(SSE_ENDPOINT, YOUR_API_BASE_URL);
const event = new EventSource(`${url.href}?id=${crypto.randomUUID()}`);
event.onmessage = (e) => {
const payload = JSON.parse(e.data);
// do something with the payload
payload.retry && (retryInterval = payload.retry);
}
event.onerror = (e) => {
clearInterval(interval);
event.close();
listenToEvents(retryInterval);
}
}
}, retryAfter);
}
listenToEvents(1000); // initially, establish the connection in 1 second
Visų pirma, turite nustatyti intervalą, kuris nuolat tikrins, ar ryšys vis dar gyvas, ar ne. Ir intervalas gali būti nustatytas į pasirinktinę vertę arba į pakartotinio bandymo vertę, kurią gaunate iš serverio. Šis intervalas bus suvyniotas į funkciją, pavadintą listenToEvents
kuri priims a retryInterval
parametrą ir inicijuokite vietinį kintamąjį pavadinimu isListening
.
Šis intervalas ir toliau kurs naujus įvykių šaltinio objektus, jei isListening
kintamasis yra false
. Nustatyta false
pagal numatytuosius nustatymus, bet nustatyta true
užmezgant ryšį, todėl pirmajame intervalo rate bus sukurtas tik vienas įvykių šaltinio objektas.
Jei ryšys nutrūksta, onerror
įvykis bus paleistas uždarant įvykį, išvalant esamą intervalą ir iškviečiant funkciją listenToEvents
rekursyviai.
Išvada
Šiame vadove turite sužinoti apie serverio siunčiamus įvykius, įvykių skleidėjus ir EventSource
API. Serverio išsiųsti įvykiai yra beveik panašūs į žiniatinklio lizdus su tam tikrais pagrindiniais skirtumais. Jei norite sužinoti daugiau apie žiniatinklio lizdus, peržiūrėkite WebSockets 101 vadovą.