main
Raw Download raw file
  1/*
  2 * Websock: high-performance buffering wrapper
  3 * Copyright (C) 2019 The noVNC authors
  4 * Licensed under MPL 2.0 (see LICENSE.txt)
  5 *
  6 * Websock is similar to the standard WebSocket / RTCDataChannel object
  7 * but with extra buffer handling.
  8 *
  9 * Websock has built-in receive queue buffering; the message event
 10 * does not contain actual data but is simply a notification that
 11 * there is new data available. Several rQ* methods are available to
 12 * read binary data off of the receive queue.
 13 */
 14
 15import * as Log from './util/logging.js';
 16
 17// this has performance issues in some versions Chromium, and
 18// doesn't gain a tremendous amount of performance increase in Firefox
 19// at the moment.  It may be valuable to turn it on in the future.
 20const MAX_RQ_GROW_SIZE = 40 * 1024 * 1024;  // 40 MiB
 21
 22// Constants pulled from RTCDataChannelState enum
 23// https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/readyState#RTCDataChannelState_enum
 24const DataChannel = {
 25    CONNECTING: "connecting",
 26    OPEN: "open",
 27    CLOSING: "closing",
 28    CLOSED: "closed"
 29};
 30
 31const ReadyStates = {
 32    CONNECTING: [WebSocket.CONNECTING, DataChannel.CONNECTING],
 33    OPEN: [WebSocket.OPEN, DataChannel.OPEN],
 34    CLOSING: [WebSocket.CLOSING, DataChannel.CLOSING],
 35    CLOSED: [WebSocket.CLOSED, DataChannel.CLOSED],
 36};
 37
 38// Properties a raw channel must have, WebSocket and RTCDataChannel are two examples
 39const rawChannelProps = [
 40    "send",
 41    "close",
 42    "binaryType",
 43    "onerror",
 44    "onmessage",
 45    "onopen",
 46    "protocol",
 47    "readyState",
 48];
 49
 50export default class Websock {
 51    constructor() {
 52        this._websocket = null;  // WebSocket or RTCDataChannel object
 53
 54        this._rQi = 0;           // Receive queue index
 55        this._rQlen = 0;         // Next write position in the receive queue
 56        this._rQbufferSize = 1024 * 1024 * 4; // Receive queue buffer size (4 MiB)
 57        // called in init: this._rQ = new Uint8Array(this._rQbufferSize);
 58        this._rQ = null; // Receive queue
 59
 60        this._sQbufferSize = 1024 * 10;  // 10 KiB
 61        // called in init: this._sQ = new Uint8Array(this._sQbufferSize);
 62        this._sQlen = 0;
 63        this._sQ = null;  // Send queue
 64
 65        this._eventHandlers = {
 66            message: () => {},
 67            open: () => {},
 68            close: () => {},
 69            error: () => {}
 70        };
 71    }
 72
 73    // Getters and setters
 74
 75    get readyState() {
 76        let subState;
 77
 78        if (this._websocket === null) {
 79            return "unused";
 80        }
 81
 82        subState = this._websocket.readyState;
 83
 84        if (ReadyStates.CONNECTING.includes(subState)) {
 85            return "connecting";
 86        } else if (ReadyStates.OPEN.includes(subState)) {
 87            return "open";
 88        } else if (ReadyStates.CLOSING.includes(subState)) {
 89            return "closing";
 90        } else if (ReadyStates.CLOSED.includes(subState)) {
 91            return "closed";
 92        }
 93
 94        return "unknown";
 95    }
 96
 97    // Receive queue
 98    rQpeek8() {
 99        return this._rQ[this._rQi];
100    }
101
102    rQskipBytes(bytes) {
103        this._rQi += bytes;
104    }
105
106    rQshift8() {
107        return this._rQshift(1);
108    }
109
110    rQshift16() {
111        return this._rQshift(2);
112    }
113
114    rQshift32() {
115        return this._rQshift(4);
116    }
117
118    // TODO(directxman12): test performance with these vs a DataView
119    _rQshift(bytes) {
120        let res = 0;
121        for (let byte = bytes - 1; byte >= 0; byte--) {
122            res += this._rQ[this._rQi++] << (byte * 8);
123        }
124        return res >>> 0;
125    }
126
127    rQlen() {
128        return this._rQlen - this._rQi;
129    }
130
131    rQshiftStr(len) {
132        let str = "";
133        // Handle large arrays in steps to avoid long strings on the stack
134        for (let i = 0; i < len; i += 4096) {
135            let part = this.rQshiftBytes(Math.min(4096, len - i), false);
136            str += String.fromCharCode.apply(null, part);
137        }
138        return str;
139    }
140
141    rQshiftBytes(len, copy=true) {
142        this._rQi += len;
143        if (copy) {
144            return this._rQ.slice(this._rQi - len, this._rQi);
145        } else {
146            return this._rQ.subarray(this._rQi - len, this._rQi);
147        }
148    }
149
150    rQshiftTo(target, len) {
151        // TODO: make this just use set with views when using a ArrayBuffer to store the rQ
152        target.set(new Uint8Array(this._rQ.buffer, this._rQi, len));
153        this._rQi += len;
154    }
155
156    rQpeekBytes(len, copy=true) {
157        if (copy) {
158            return this._rQ.slice(this._rQi, this._rQi + len);
159        } else {
160            return this._rQ.subarray(this._rQi, this._rQi + len);
161        }
162    }
163
164    // Check to see if we must wait for 'num' bytes (default to FBU.bytes)
165    // to be available in the receive queue. Return true if we need to
166    // wait (and possibly print a debug message), otherwise false.
167    rQwait(msg, num, goback) {
168        if (this._rQlen - this._rQi < num) {
169            if (goback) {
170                if (this._rQi < goback) {
171                    throw new Error("rQwait cannot backup " + goback + " bytes");
172                }
173                this._rQi -= goback;
174            }
175            return true; // true means need more data
176        }
177        return false;
178    }
179
180    // Send queue
181
182    sQpush8(num) {
183        this._sQensureSpace(1);
184        this._sQ[this._sQlen++] = num;
185    }
186
187    sQpush16(num) {
188        this._sQensureSpace(2);
189        this._sQ[this._sQlen++] = (num >> 8) & 0xff;
190        this._sQ[this._sQlen++] = (num >> 0) & 0xff;
191    }
192
193    sQpush32(num) {
194        this._sQensureSpace(4);
195        this._sQ[this._sQlen++] = (num >> 24) & 0xff;
196        this._sQ[this._sQlen++] = (num >> 16) & 0xff;
197        this._sQ[this._sQlen++] = (num >>  8) & 0xff;
198        this._sQ[this._sQlen++] = (num >>  0) & 0xff;
199    }
200
201    sQpushString(str) {
202        let bytes = str.split('').map(chr => chr.charCodeAt(0));
203        this.sQpushBytes(new Uint8Array(bytes));
204    }
205
206    sQpushBytes(bytes) {
207        for (let offset = 0;offset < bytes.length;) {
208            this._sQensureSpace(1);
209
210            let chunkSize = this._sQbufferSize - this._sQlen;
211            if (chunkSize > bytes.length - offset) {
212                chunkSize = bytes.length - offset;
213            }
214
215            this._sQ.set(bytes.subarray(offset, offset + chunkSize), this._sQlen);
216            this._sQlen += chunkSize;
217            offset += chunkSize;
218        }
219    }
220
221    flush() {
222        if (this._sQlen > 0 && this.readyState === 'open') {
223            this._websocket.send(new Uint8Array(this._sQ.buffer, 0, this._sQlen));
224            this._sQlen = 0;
225        }
226    }
227
228    _sQensureSpace(bytes) {
229        if (this._sQbufferSize - this._sQlen < bytes) {
230            this.flush();
231        }
232    }
233
234    // Event handlers
235    off(evt) {
236        this._eventHandlers[evt] = () => {};
237    }
238
239    on(evt, handler) {
240        this._eventHandlers[evt] = handler;
241    }
242
243    _allocateBuffers() {
244        this._rQ = new Uint8Array(this._rQbufferSize);
245        this._sQ = new Uint8Array(this._sQbufferSize);
246    }
247
248    init() {
249        this._allocateBuffers();
250        this._rQi = 0;
251        this._websocket = null;
252    }
253
254    open(uri, protocols) {
255        this.attach(new WebSocket(uri, protocols));
256    }
257
258    attach(rawChannel) {
259        this.init();
260
261        // Must get object and class methods to be compatible with the tests.
262        const channelProps = [...Object.keys(rawChannel), ...Object.getOwnPropertyNames(Object.getPrototypeOf(rawChannel))];
263        for (let i = 0; i < rawChannelProps.length; i++) {
264            const prop = rawChannelProps[i];
265            if (channelProps.indexOf(prop) < 0) {
266                throw new Error('Raw channel missing property: ' + prop);
267            }
268        }
269
270        this._websocket = rawChannel;
271        this._websocket.binaryType = "arraybuffer";
272        this._websocket.onmessage = this._recvMessage.bind(this);
273
274        this._websocket.onopen = () => {
275            Log.Debug('>> WebSock.onopen');
276            if (this._websocket.protocol) {
277                Log.Info("Server choose sub-protocol: " + this._websocket.protocol);
278            }
279
280            this._eventHandlers.open();
281            Log.Debug("<< WebSock.onopen");
282        };
283
284        this._websocket.onclose = (e) => {
285            Log.Debug(">> WebSock.onclose");
286            this._eventHandlers.close(e);
287            Log.Debug("<< WebSock.onclose");
288        };
289
290        this._websocket.onerror = (e) => {
291            Log.Debug(">> WebSock.onerror: " + e);
292            this._eventHandlers.error(e);
293            Log.Debug("<< WebSock.onerror: " + e);
294        };
295    }
296
297    close() {
298        if (this._websocket) {
299            if (this.readyState === 'connecting' ||
300                this.readyState === 'open') {
301                Log.Info("Closing WebSocket connection");
302                this._websocket.close();
303            }
304
305            this._websocket.onmessage = () => {};
306        }
307    }
308
309    // private methods
310
311    // We want to move all the unread data to the start of the queue,
312    // e.g. compacting.
313    // The function also expands the receive que if needed, and for
314    // performance reasons we combine these two actions to avoid
315    // unnecessary copying.
316    _expandCompactRQ(minFit) {
317        // if we're using less than 1/8th of the buffer even with the incoming bytes, compact in place
318        // instead of resizing
319        const requiredBufferSize =  (this._rQlen - this._rQi + minFit) * 8;
320        const resizeNeeded = this._rQbufferSize < requiredBufferSize;
321
322        if (resizeNeeded) {
323            // Make sure we always *at least* double the buffer size, and have at least space for 8x
324            // the current amount of data
325            this._rQbufferSize = Math.max(this._rQbufferSize * 2, requiredBufferSize);
326        }
327
328        // we don't want to grow unboundedly
329        if (this._rQbufferSize > MAX_RQ_GROW_SIZE) {
330            this._rQbufferSize = MAX_RQ_GROW_SIZE;
331            if (this._rQbufferSize - (this._rQlen - this._rQi) < minFit) {
332                throw new Error("Receive queue buffer exceeded " + MAX_RQ_GROW_SIZE + " bytes, and the new message could not fit");
333            }
334        }
335
336        if (resizeNeeded) {
337            const oldRQbuffer = this._rQ.buffer;
338            this._rQ = new Uint8Array(this._rQbufferSize);
339            this._rQ.set(new Uint8Array(oldRQbuffer, this._rQi, this._rQlen - this._rQi));
340        } else {
341            this._rQ.copyWithin(0, this._rQi, this._rQlen);
342        }
343
344        this._rQlen = this._rQlen - this._rQi;
345        this._rQi = 0;
346    }
347
348    // push arraybuffer values onto the end of the receive que
349    _recvMessage(e) {
350        if (this._rQlen == this._rQi) {
351            // All data has now been processed, this means we
352            // can reset the receive queue.
353            this._rQlen = 0;
354            this._rQi = 0;
355        }
356        const u8 = new Uint8Array(e.data);
357        if (u8.length > this._rQbufferSize - this._rQlen) {
358            this._expandCompactRQ(u8.length);
359        }
360        this._rQ.set(u8, this._rQlen);
361        this._rQlen += u8.length;
362
363        if (this._rQlen - this._rQi > 0) {
364            this._eventHandlers.message();
365        } else {
366            Log.Debug("Ignoring empty message");
367        }
368    }
369}