Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228)

This commit is contained in:
Emelia Smith 2023-07-28 12:06:29 +02:00 committed by Claire
parent fea5640374
commit 8018d478ab
1 changed files with 17 additions and 9 deletions

View File

@ -228,9 +228,15 @@ const startWorker = async (workerId) => {
callbacks.forEach(callback => callback(json));
};
/**
* @callback SubscriptionListener
* @param {ReturnType<parseJSON>} json of the message
* @returns void
*/
/**
* @param {string} channel
* @param {function(string): void} callback
* @param {SubscriptionListener} callback
*/
const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`);
@ -247,7 +253,7 @@ const startWorker = async (workerId) => {
/**
* @param {string} channel
* @param {function(Object<string, any>): void} callback
* @param {SubscriptionListener} callback
*/
const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`);
@ -625,9 +631,9 @@ const startWorker = async (workerId) => {
* @param {string[]} ids
* @param {any} req
* @param {function(string, string): void} output
* @param {function(string[], function(string): void): void} attachCloseHandler
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
* @param {boolean=} needsFiltering
* @returns {function(object): void}
* @returns {SubscriptionListener}
*/
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
const accountId = req.accountId || req.remoteAddress;
@ -645,6 +651,7 @@ const startWorker = async (workerId) => {
// The listener used to process each message off the redis subscription,
// message here is an object with an `event` and `payload` property. Some
// events also include a queued_at value, but this is being removed shortly.
/** @type {SubscriptionListener} */
const listener = message => {
const { event, payload } = message;
@ -837,7 +844,7 @@ const startWorker = async (workerId) => {
subscribe(`${redisPrefix}${id}`, listener);
});
if (attachCloseHandler) {
if (typeof attachCloseHandler === 'function') {
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
}
@ -874,12 +881,13 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {function(): void} [closeHandler]
* @return {function(string[]): void}
* @returns {function(string[], SubscriptionListener): void}
*/
const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
req.on('close', () => {
ids.forEach(id => {
unsubscribe(id);
unsubscribe(id, listener);
});
if (closeHandler) {
@ -1118,7 +1126,7 @@ const startWorker = async (workerId) => {
* @typedef WebSocketSession
* @property {any} socket
* @property {any} request
* @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
* @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
*/
/**