"use strict"; var __rest = (this && this.__rest) || function (s, e) { var t = {}; for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p) && e.indexOf(p) < 0) t[p] = s[p]; if (s != null && typeof Object.getOwnPropertySymbols === "function") for (var i = 0, p = Object.getOwnPropertySymbols(s); i < p.length; i++) { if (e.indexOf(p[i]) < 0 && Object.prototype.propertyIsEnumerable.call(s, p[i])) t[p[i]] = s[p[i]]; } return t; }; Object.defineProperty(exports, "__esModule", { value: true }); exports.ClusterAdapterWithHeartbeat = exports.ClusterAdapter = exports.MessageType = void 0; const in_memory_adapter_1 = require("./in-memory-adapter"); const debug_1 = require("debug"); const crypto_1 = require("crypto"); const debug = (0, debug_1.debug)("socket.io-adapter"); const EMITTER_UID = "emitter"; const DEFAULT_TIMEOUT = 5000; function randomId() { return (0, crypto_1.randomBytes)(8).toString("hex"); } var MessageType; (function (MessageType) { MessageType[MessageType["INITIAL_HEARTBEAT"] = 1] = "INITIAL_HEARTBEAT"; MessageType[MessageType["HEARTBEAT"] = 2] = "HEARTBEAT"; MessageType[MessageType["BROADCAST"] = 3] = "BROADCAST"; MessageType[MessageType["SOCKETS_JOIN"] = 4] = "SOCKETS_JOIN"; MessageType[MessageType["SOCKETS_LEAVE"] = 5] = "SOCKETS_LEAVE"; MessageType[MessageType["DISCONNECT_SOCKETS"] = 6] = "DISCONNECT_SOCKETS"; MessageType[MessageType["FETCH_SOCKETS"] = 7] = "FETCH_SOCKETS"; MessageType[MessageType["FETCH_SOCKETS_RESPONSE"] = 8] = "FETCH_SOCKETS_RESPONSE"; MessageType[MessageType["SERVER_SIDE_EMIT"] = 9] = "SERVER_SIDE_EMIT"; MessageType[MessageType["SERVER_SIDE_EMIT_RESPONSE"] = 10] = "SERVER_SIDE_EMIT_RESPONSE"; MessageType[MessageType["BROADCAST_CLIENT_COUNT"] = 11] = "BROADCAST_CLIENT_COUNT"; MessageType[MessageType["BROADCAST_ACK"] = 12] = "BROADCAST_ACK"; MessageType[MessageType["ADAPTER_CLOSE"] = 13] = "ADAPTER_CLOSE"; })(MessageType = exports.MessageType || (exports.MessageType = {})); function encodeOptions(opts) { return { rooms: [...opts.rooms], except: [...opts.except], flags: opts.flags, }; } function decodeOptions(opts) { return { rooms: new Set(opts.rooms), except: new Set(opts.except), flags: opts.flags, }; } /** * A cluster-ready adapter. Any extending class must: * * - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse} * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse} */ class ClusterAdapter extends in_memory_adapter_1.Adapter { constructor(nsp) { super(nsp); this.requests = new Map(); this.ackRequests = new Map(); this.uid = randomId(); } /** * Called when receiving a message from another member of the cluster. * * @param message * @param offset * @protected */ onMessage(message, offset) { if (message.uid === this.uid) { return debug("[%s] ignore message from self", this.uid); } debug("[%s] new event of type %d from %s", this.uid, message.type, message.uid); switch (message.type) { case MessageType.BROADCAST: { const withAck = message.data.requestId !== undefined; if (withAck) { super.broadcastWithAck(message.data.packet, decodeOptions(message.data.opts), (clientCount) => { debug("[%s] waiting for %d client acknowledgements", this.uid, clientCount); this.publishResponse(message.uid, { type: MessageType.BROADCAST_CLIENT_COUNT, data: { requestId: message.data.requestId, clientCount, }, }); }, (arg) => { debug("[%s] received acknowledgement with value %j", this.uid, arg); this.publishResponse(message.uid, { type: MessageType.BROADCAST_ACK, data: { requestId: message.data.requestId, packet: arg, }, }); }); } else { const packet = message.data.packet; const opts = decodeOptions(message.data.opts); this.addOffsetIfNecessary(packet, opts, offset); super.broadcast(packet, opts); } break; } case MessageType.SOCKETS_JOIN: super.addSockets(decodeOptions(message.data.opts), message.data.rooms); break; case MessageType.SOCKETS_LEAVE: super.delSockets(decodeOptions(message.data.opts), message.data.rooms); break; case MessageType.DISCONNECT_SOCKETS: super.disconnectSockets(decodeOptions(message.data.opts), message.data.close); break; case MessageType.FETCH_SOCKETS: { debug("[%s] calling fetchSockets with opts %j", this.uid, message.data.opts); super .fetchSockets(decodeOptions(message.data.opts)) .then((localSockets) => { this.publishResponse(message.uid, { type: MessageType.FETCH_SOCKETS_RESPONSE, data: { requestId: message.data.requestId, sockets: localSockets.map((socket) => { // remove sessionStore from handshake, as it may contain circular references const _a = socket.handshake, { sessionStore } = _a, handshake = __rest(_a, ["sessionStore"]); return { id: socket.id, handshake, rooms: [...socket.rooms], data: socket.data, }; }), }, }); }); break; } case MessageType.SERVER_SIDE_EMIT: { const packet = message.data.packet; const withAck = message.data.requestId !== undefined; if (!withAck) { this.nsp._onServerSideEmit(packet); return; } let called = false; const callback = (arg) => { // only one argument is expected if (called) { return; } called = true; debug("[%s] calling acknowledgement with %j", this.uid, arg); this.publishResponse(message.uid, { type: MessageType.SERVER_SIDE_EMIT_RESPONSE, data: { requestId: message.data.requestId, packet: arg, }, }); }; this.nsp._onServerSideEmit([...packet, callback]); break; } // @ts-ignore case MessageType.BROADCAST_CLIENT_COUNT: // @ts-ignore case MessageType.BROADCAST_ACK: // @ts-ignore case MessageType.FETCH_SOCKETS_RESPONSE: // @ts-ignore case MessageType.SERVER_SIDE_EMIT_RESPONSE: // extending classes may not make a distinction between a ClusterMessage and a ClusterResponse payload and may // always call the onMessage() method this.onResponse(message); break; default: debug("[%s] unknown message type: %s", this.uid, message.type); } } /** * Called when receiving a response from another member of the cluster. * * @param response * @protected */ onResponse(response) { var _a, _b; const requestId = response.data.requestId; debug("[%s] received response %s to request %s", this.uid, response.type, requestId); switch (response.type) { case MessageType.BROADCAST_CLIENT_COUNT: { (_a = this.ackRequests .get(requestId)) === null || _a === void 0 ? void 0 : _a.clientCountCallback(response.data.clientCount); break; } case MessageType.BROADCAST_ACK: { (_b = this.ackRequests.get(requestId)) === null || _b === void 0 ? void 0 : _b.ack(response.data.packet); break; } case MessageType.FETCH_SOCKETS_RESPONSE: { const request = this.requests.get(requestId); if (!request) { return; } request.current++; response.data.sockets.forEach((socket) => request.responses.push(socket)); if (request.current === request.expected) { clearTimeout(request.timeout); request.resolve(request.responses); this.requests.delete(requestId); } break; } case MessageType.SERVER_SIDE_EMIT_RESPONSE: { const request = this.requests.get(requestId); if (!request) { return; } request.current++; request.responses.push(response.data.packet); if (request.current === request.expected) { clearTimeout(request.timeout); request.resolve(null, request.responses); this.requests.delete(requestId); } break; } default: // @ts-ignore debug("[%s] unknown response type: %s", this.uid, response.type); } } async broadcast(packet, opts) { var _a; const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local; if (!onlyLocal) { try { const offset = await this.publishAndReturnOffset({ type: MessageType.BROADCAST, data: { packet, opts: encodeOptions(opts), }, }); this.addOffsetIfNecessary(packet, opts, offset); } catch (e) { return debug("[%s] error while broadcasting message: %s", this.uid, e.message); } } super.broadcast(packet, opts); } /** * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it * reconnects after a temporary disconnection. * * @param packet * @param opts * @param offset * @private */ addOffsetIfNecessary(packet, opts, offset) { var _a; if (!this.nsp.server.opts.connectionStateRecovery) { return; } const isEventPacket = packet.type === 2; // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and // restored on another server upon reconnection const withoutAcknowledgement = packet.id === undefined; const notVolatile = ((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.volatile) === undefined; if (isEventPacket && withoutAcknowledgement && notVolatile) { packet.data.push(offset); } } broadcastWithAck(packet, opts, clientCountCallback, ack) { var _a; const onlyLocal = (_a = opts === null || opts === void 0 ? void 0 : opts.flags) === null || _a === void 0 ? void 0 : _a.local; if (!onlyLocal) { const requestId = randomId(); this.ackRequests.set(requestId, { clientCountCallback, ack, }); this.publish({ type: MessageType.BROADCAST, data: { packet, requestId, opts: encodeOptions(opts), }, }); // we have no way to know at this level whether the server has received an acknowledgement from each client, so we // will simply clean up the ackRequests map after the given delay setTimeout(() => { this.ackRequests.delete(requestId); }, opts.flags.timeout); } super.broadcastWithAck(packet, opts, clientCountCallback, ack); } async addSockets(opts, rooms) { var _a; const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local; if (!onlyLocal) { try { await this.publishAndReturnOffset({ type: MessageType.SOCKETS_JOIN, data: { opts: encodeOptions(opts), rooms, }, }); } catch (e) { debug("[%s] error while publishing message: %s", this.uid, e.message); } } super.addSockets(opts, rooms); } async delSockets(opts, rooms) { var _a; const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local; if (!onlyLocal) { try { await this.publishAndReturnOffset({ type: MessageType.SOCKETS_LEAVE, data: { opts: encodeOptions(opts), rooms, }, }); } catch (e) { debug("[%s] error while publishing message: %s", this.uid, e.message); } } super.delSockets(opts, rooms); } async disconnectSockets(opts, close) { var _a; const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local; if (!onlyLocal) { try { await this.publishAndReturnOffset({ type: MessageType.DISCONNECT_SOCKETS, data: { opts: encodeOptions(opts), close, }, }); } catch (e) { debug("[%s] error while publishing message: %s", this.uid, e.message); } } super.disconnectSockets(opts, close); } async fetchSockets(opts) { var _a; const [localSockets, serverCount] = await Promise.all([ super.fetchSockets(opts), this.serverCount(), ]); const expectedResponseCount = serverCount - 1; if (((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local) || expectedResponseCount <= 0) { return localSockets; } const requestId = randomId(); return new Promise((resolve, reject) => { const timeout = setTimeout(() => { const storedRequest = this.requests.get(requestId); if (storedRequest) { reject(new Error(`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`)); this.requests.delete(requestId); } }, opts.flags.timeout || DEFAULT_TIMEOUT); const storedRequest = { type: MessageType.FETCH_SOCKETS, resolve, timeout, current: 0, expected: expectedResponseCount, responses: localSockets, }; this.requests.set(requestId, storedRequest); this.publish({ type: MessageType.FETCH_SOCKETS, data: { opts: encodeOptions(opts), requestId, }, }); }); } async serverSideEmit(packet) { const withAck = typeof packet[packet.length - 1] === "function"; if (!withAck) { return this.publish({ type: MessageType.SERVER_SIDE_EMIT, data: { packet, }, }); } const ack = packet.pop(); const expectedResponseCount = (await this.serverCount()) - 1; debug('[%s] waiting for %d responses to "serverSideEmit" request', this.uid, expectedResponseCount); if (expectedResponseCount <= 0) { return ack(null, []); } const requestId = randomId(); const timeout = setTimeout(() => { const storedRequest = this.requests.get(requestId); if (storedRequest) { ack(new Error(`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`), storedRequest.responses); this.requests.delete(requestId); } }, DEFAULT_TIMEOUT); const storedRequest = { type: MessageType.SERVER_SIDE_EMIT, resolve: ack, timeout, current: 0, expected: expectedResponseCount, responses: [], }; this.requests.set(requestId, storedRequest); this.publish({ type: MessageType.SERVER_SIDE_EMIT, data: { requestId, packet, }, }); } publish(message) { this.publishAndReturnOffset(message).catch((err) => { debug("[%s] error while publishing message: %s", this.uid, err); }); } publishAndReturnOffset(message) { message.uid = this.uid; message.nsp = this.nsp.name; return this.doPublish(message); } publishResponse(requesterUid, response) { response.uid = this.uid; response.nsp = this.nsp.name; this.doPublishResponse(requesterUid, response).catch((err) => { debug("[%s] error while publishing response: %s", this.uid, err); }); } } exports.ClusterAdapter = ClusterAdapter; class ClusterAdapterWithHeartbeat extends ClusterAdapter { constructor(nsp, opts) { super(nsp); this.nodesMap = new Map(); // uid => timestamp of last message this.customRequests = new Map(); this._opts = Object.assign({ heartbeatInterval: 5000, heartbeatTimeout: 10000, }, opts); this.cleanupTimer = setInterval(() => { const now = Date.now(); this.nodesMap.forEach((lastSeen, uid) => { const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout; if (nodeSeemsDown) { debug("[%s] node %s seems down", this.uid, uid); this.removeNode(uid); } }); }, 1000); } init() { this.publish({ type: MessageType.INITIAL_HEARTBEAT, }); } scheduleHeartbeat() { if (this.heartbeatTimer) { this.heartbeatTimer.refresh(); } else { this.heartbeatTimer = setTimeout(() => { this.publish({ type: MessageType.HEARTBEAT, }); }, this._opts.heartbeatInterval); } } close() { this.publish({ type: MessageType.ADAPTER_CLOSE, }); clearTimeout(this.heartbeatTimer); if (this.cleanupTimer) { clearInterval(this.cleanupTimer); } } onMessage(message, offset) { if (message.uid === this.uid) { return debug("[%s] ignore message from self", this.uid); } if (message.uid && message.uid !== EMITTER_UID) { // we track the UID of each sender, in order to know how many servers there are in the cluster this.nodesMap.set(message.uid, Date.now()); } debug("[%s] new event of type %d from %s", this.uid, message.type, message.uid); switch (message.type) { case MessageType.INITIAL_HEARTBEAT: this.publish({ type: MessageType.HEARTBEAT, }); break; case MessageType.HEARTBEAT: // nothing to do break; case MessageType.ADAPTER_CLOSE: this.removeNode(message.uid); break; default: super.onMessage(message, offset); } } serverCount() { return Promise.resolve(1 + this.nodesMap.size); } publish(message) { this.scheduleHeartbeat(); return super.publish(message); } async serverSideEmit(packet) { const withAck = typeof packet[packet.length - 1] === "function"; if (!withAck) { return this.publish({ type: MessageType.SERVER_SIDE_EMIT, data: { packet, }, }); } const ack = packet.pop(); const expectedResponseCount = this.nodesMap.size; debug('[%s] waiting for %d responses to "serverSideEmit" request', this.uid, expectedResponseCount); if (expectedResponseCount <= 0) { return ack(null, []); } const requestId = randomId(); const timeout = setTimeout(() => { const storedRequest = this.customRequests.get(requestId); if (storedRequest) { ack(new Error(`timeout reached: missing ${storedRequest.missingUids.size} responses`), storedRequest.responses); this.customRequests.delete(requestId); } }, DEFAULT_TIMEOUT); const storedRequest = { type: MessageType.SERVER_SIDE_EMIT, resolve: ack, timeout, missingUids: new Set([...this.nodesMap.keys()]), responses: [], }; this.customRequests.set(requestId, storedRequest); this.publish({ type: MessageType.SERVER_SIDE_EMIT, data: { requestId, packet, }, }); } async fetchSockets(opts) { var _a; const [localSockets, serverCount] = await Promise.all([ super.fetchSockets({ rooms: opts.rooms, except: opts.except, flags: { local: true, }, }), this.serverCount(), ]); const expectedResponseCount = serverCount - 1; if (((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local) || expectedResponseCount <= 0) { return localSockets; } const requestId = randomId(); return new Promise((resolve, reject) => { const timeout = setTimeout(() => { const storedRequest = this.customRequests.get(requestId); if (storedRequest) { reject(new Error(`timeout reached: missing ${storedRequest.missingUids.size} responses`)); this.customRequests.delete(requestId); } }, opts.flags.timeout || DEFAULT_TIMEOUT); const storedRequest = { type: MessageType.FETCH_SOCKETS, resolve, timeout, missingUids: new Set([...this.nodesMap.keys()]), responses: localSockets, }; this.customRequests.set(requestId, storedRequest); this.publish({ type: MessageType.FETCH_SOCKETS, data: { opts: encodeOptions(opts), requestId, }, }); }); } onResponse(response) { const requestId = response.data.requestId; debug("[%s] received response %s to request %s", this.uid, response.type, requestId); switch (response.type) { case MessageType.FETCH_SOCKETS_RESPONSE: { const request = this.customRequests.get(requestId); if (!request) { return; } response.data.sockets.forEach((socket) => request.responses.push(socket)); request.missingUids.delete(response.uid); if (request.missingUids.size === 0) { clearTimeout(request.timeout); request.resolve(request.responses); this.customRequests.delete(requestId); } break; } case MessageType.SERVER_SIDE_EMIT_RESPONSE: { const request = this.customRequests.get(requestId); if (!request) { return; } request.responses.push(response.data.packet); request.missingUids.delete(response.uid); if (request.missingUids.size === 0) { clearTimeout(request.timeout); request.resolve(null, request.responses); this.customRequests.delete(requestId); } break; } default: super.onResponse(response); } } removeNode(uid) { this.customRequests.forEach((request, requestId) => { request.missingUids.delete(uid); if (request.missingUids.size === 0) { clearTimeout(request.timeout); if (request.type === MessageType.FETCH_SOCKETS) { request.resolve(request.responses); } else if (request.type === MessageType.SERVER_SIDE_EMIT) { request.resolve(null, request.responses); } this.customRequests.delete(requestId); } }); this.nodesMap.delete(uid); } } exports.ClusterAdapterWithHeartbeat = ClusterAdapterWithHeartbeat;