const render = props.render; // dependencies const Logger = props.deps.Logger; const EventEmitter = props.deps.EventEmitter; const accountId = props.accountId; const wsAddress = props.wsAddress ?? 'ws://0.0.0.0:6376/ws'; const maxRetries = props.maxRetries ?? 5; const reconnectDelay = props.reconnectDelay ?? 1000; const maxDelay = props.maxDelay ?? 30000; const getAuthToken = props.getAuthToken; let logger$ = Logger('meroship', {}, props.debug ?? false).fallback(); const ConnectionStatus = { CONNECTED: 'Connected', CONNECTING: 'Connecting...', RECONNECTING: 'Reconnecting...', DISCONNECTED: 'Disconnected', }; const retryCountRef = useRef(0); const shouldReconnectRef = useRef(true); const pingIntervalRef = useRef(null); const wsTransport = useRef(null); const connectionStatus = useRef(ConnectionStatus.DISCONNECTED); const wsState$ = (ws) => { switch (ws.readyState) { case 0: return 'CONNECTING'; case 1: return 'OPEN'; case 2: return 'CLOSING'; case 3: return 'CLOSED'; default: return 'UNKNOWN'; } }; const wsApiEvents = useRef(EventEmitter()); const wsApiNotifications = useRef(EventEmitter()); const setupWebSocket = () => { const commandEvents = EventEmitter(); const ws = new WebSocket(wsAddress); connectionStatus.current = ConnectionStatus.CONNECTING; let retryTimeout = null; wsTransport.current = { send: (command, args, callback, reschedule, logger) => { let { log: log$, error: error$ } = logger$(logger).span( 'wsTransport.send', { command, args, }, ); if (ws.readyState === 1 /* OPEN */) { let id; for (;;) { id = Math.trunc(Math.random() * (Math.pow(2, 53) - 1)); try { commandEvents.once( id, ([err, result]) => { if (callback) if (err) return callback(err); else callback(null, result); }, true, ); break; } catch { error$('request ID collision, retrying', { id }); } } ws.send(JSON.stringify({ id, command, args })); if (pingIntervalRef.current) clearInterval(pingIntervalRef.current); pingIntervalRef.current = setInterval(ping, 15_000); log$('sent', { id, command, args }); } else { if (typeof reschedule === 'undefined') reschedule = (f) => f(); if (typeof reschedule === 'function') { wsApiEvents.current.once('connected', () => setTimeout(reschedule, null, () => wsTransport.current.send( command, args, callback, reschedule, logger, ), ), ); log$('scheduled', { command, args, wsState: wsState$(ws), connectionStatus: connectionStatus.current, }); } if (ws.readyState !== 0 /* CONNECTING */) { if ( connectionStatus.current === ConnectionStatus.RECONNECTING && retryTimeout ) { log$('expediting reconnection to send', { command, args, wsState: wsState$(ws), connectionStatus: connectionStatus.current, }); clearTimeout(retryTimeout); retryTimeout = null; } else { log$('forcefully reviving WS connection to send', { command, args, wsState: wsState$(ws), connectionStatus: connectionStatus.current, }); wsApiEvents.current.emit('disconnected', { delay: 0, maxRetries, trial: 0, }); } setupWebSocket(); } } }, close: (code, reason, logger) => { logger$(logger).span('wsTransport.close', { code, reason, }); // shouldReconnectRef.current = false; ws.close(code, reason); }, }; function ping(logger) { const pingLogger = logger$(logger).span('ping'); const { log: log$ } = pingLogger; wsTransport.current.send( 'Ping', [], (err) => { if (err) return log$('ping failed', { err }); if (connectionStatus.current !== ConnectionStatus.CONNECTED) { connectionStatus.current = ConnectionStatus.CONNECTED; wsTransport.current.send( 'Init', { account_id: accountId }, (err) => { if (err) return log$('connection initialization failed:', err); wsApiEvents.current.emit('connected'); }, (_f) => { /* do not reschedule if the connection is not open */ }, pingLogger, ); } }, (_f) => { /* do not reschedule if the connection is not open */ }, pingLogger, ); } ws.onopen = (event) => { const onOpenLogger = logger$().span('WebSocket.onopen', { event }); const { error: error$ } = onOpenLogger; wsTransport.current.send( 'Gateway::Headers', { 'x-api-key': { getAuthToken }.getAuthToken?.(), }, (err) => { if ( err && !( err.type === 'ParseError' && typeof err.data === 'string' && // if there is no gateway infront of meroship, this error is expected, ignore it err.data.startsWith('unknown variant `Gateway::Headers`') ) ) return error$('gateway auth header application failed:', err); retryCountRef.current = 0; ping(onOpenLogger); }, ); }; ws.onmessage = (event) => { const { log: log$ } = logger$().span('WebSocket.onmessage', { event }); let message = JSON.parse(event.data); if (message) { if ('id' in message) { if ('result' in message) { log$('received', { id: message.id, result: message.result }); commandEvents.emit(message.id, [null, message.result]); return; } else if ('error' in message) { log$('received', { id: message.id, error: message.error }); commandEvents.emit(message.id, [message.error]); return; } } else if ('event' in message) { log$('received', { event: message.event }); wsApiNotifications.current.emit(message.event.type, message.event.data); return; } } log$('unknown message', event.data); }; let _transport = wsTransport.current; ws.onclose = (event) => { const { log: log$ } = logger$().span('WebSocket.onclose', { event }); // this event was delayed, and a new connection has been established, safe to ignore let expiredTransport = _transport !== wsTransport.current; log$('WebSocket closed', { wsState: wsState$(ws), expiredTransport, connectionStatus: connectionStatus.current, }); if (expiredTransport) return; if (shouldReconnectRef.current && retryCountRef.current < maxRetries) { let delay = Math.min( reconnectDelay * Math.pow(2, retryCountRef.current), maxDelay, ) + Math.random() * 1500; log$('reconnecting', { delay, trial: retryCountRef.current, maxRetries, }); wsApiEvents.current.emit('disconnected', { delay, maxRetries, trial: retryCountRef.current, }); retryCountRef.current++; retryTimeout = setTimeout(() => { retryTimeout = null; setupWebSocket(); }, delay); connectionStatus.current = ConnectionStatus.RECONNECTING; } else { clearInterval(pingIntervalRef.current); log$('disconnected', { delay: 0, trial: retryCountRef.current, maxRetries, }); wsApiEvents.current.emit('disconnected', { delay: 0, maxRetries, trial: retryCountRef.current, }); connectionStatus.current = ConnectionStatus.DISCONNECTED; } }; }; const wsApi = { send: (command, args, callback, reschedule) => { if (wsTransport.current) { wsTransport.current.send(command, args, callback, reschedule); return wsApi; } throw 'WebSocket has not been initialized'; }, connect: () => { if (connectionStatus.current === ConnectionStatus.DISCONNECTED) { retryCountRef.current = 0; setupWebSocket(); } else if (connectionStatus.current === ConnectionStatus.CONNECTED) { // throw "Already connected"; } return wsApi; }, disconnect: () => { if (wsTransport.current) { if (connectionStatus.current === ConnectionStatus.DISCONNECTED) { // throw "Already disconnected.."; } else { shouldReconnectRef.current = false; wsTransport.current.close(); } } else { throw 'WebSocket has not been initialized'; } return wsApi; }, on: (event, listener) => { wsApiEvents.current.on(event, listener); return wsApi; }, once: (event, listener) => { wsApiEvents.current.once(event, listener); return wsApi; }, prependListener: (event, listener) => { wsApiEvents.current.prependListener(event, listener); return wsApi; }, prependOnceListener: (event, listener) => { wsApiEvents.current.prependOnceListener(event, listener); return wsApi; }, off: (event, listener) => { wsApiEvents.current.off(event, listener); return wsApi; }, notifications: { on: (event, listener) => { wsApiNotifications.current.on(event, listener); return wsApi.notifications; }, once: (event, listener) => { wsApiNotifications.current.once(event, listener); return wsApi.notifications; }, prependListener: (event, listener) => { wsApiNotifications.current.prependListener(event, listener); return wsApi.notifications; }, prependOnceListener: (event, listener) => { wsApiNotifications.current.prependOnceListener(event, listener); return wsApi.notifications; }, off: (event, listener) => { wsApiNotifications.current.off(event, listener); return wsApi.notifications; }, }, methods: { subscribe: (args, callback, reschedule) => { return wsApi.send('Subscribe', args, callback, reschedule); }, unsubscribe: (args, callback, reschedule) => { return wsApi.send('Unsubscribe', args, callback, reschedule); }, submitTx: (signedTx, callback, reschedule) => { return wsApi.send('SubmitTransaction', [signedTx], callback, reschedule); }, getAccountsStatus: (accounts, callback, reschedule) => { return wsApi.send('Status', { accounts }, callback, reschedule); }, }, }; if (render) { return render({ wsApi, EventEmitter }); }