aboutsummaryrefslogblamecommitdiffstatshomepage
path: root/node_modules/undici/lib/websocket/receiver.js
blob: bdd2031b418af13284d34150159a16235a4650fb (plain) (tree)























































































































































































































































































































































                                                                                                                                                           
'use strict'

const { Writable } = require('stream')
const diagnosticsChannel = require('diagnostics_channel')
const { parserStates, opcodes, states, emptyBuffer } = require('./constants')
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util')
const { WebsocketFrameSend } = require('./frame')

// This code was influenced by ws released under the MIT license.
// Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
// Copyright (c) 2013 Arnout Kazemier and contributors
// Copyright (c) 2016 Luigi Pinca and contributors

const channels = {}
channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
channels.pong = diagnosticsChannel.channel('undici:websocket:pong')

class ByteParser extends Writable {
  #buffers = []
  #byteOffset = 0

  #state = parserStates.INFO

  #info = {}
  #fragments = []

  constructor (ws) {
    super()

    this.ws = ws
  }

  /**
   * @param {Buffer} chunk
   * @param {() => void} callback
   */
  _write (chunk, _, callback) {
    this.#buffers.push(chunk)
    this.#byteOffset += chunk.length

    this.run(callback)
  }

  /**
   * Runs whenever a new chunk is received.
   * Callback is called whenever there are no more chunks buffering,
   * or not enough bytes are buffered to parse.
   */
  run (callback) {
    while (true) {
      if (this.#state === parserStates.INFO) {
        // If there aren't enough bytes to parse the payload length, etc.
        if (this.#byteOffset < 2) {
          return callback()
        }

        const buffer = this.consume(2)

        this.#info.fin = (buffer[0] & 0x80) !== 0
        this.#info.opcode = buffer[0] & 0x0F

        // If we receive a fragmented message, we use the type of the first
        // frame to parse the full message as binary/text, when it's terminated
        this.#info.originalOpcode ??= this.#info.opcode

        this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION

        if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
          // Only text and binary frames can be fragmented
          failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
          return
        }

        const payloadLength = buffer[1] & 0x7F

        if (payloadLength <= 125) {
          this.#info.payloadLength = payloadLength
          this.#state = parserStates.READ_DATA
        } else if (payloadLength === 126) {
          this.#state = parserStates.PAYLOADLENGTH_16
        } else if (payloadLength === 127) {
          this.#state = parserStates.PAYLOADLENGTH_64
        }

        if (this.#info.fragmented && payloadLength > 125) {
          // A fragmented frame can't be fragmented itself
          failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
          return
        } else if (
          (this.#info.opcode === opcodes.PING ||
            this.#info.opcode === opcodes.PONG ||
            this.#info.opcode === opcodes.CLOSE) &&
          payloadLength > 125
        ) {
          // Control frames can have a payload length of 125 bytes MAX
          failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
          return
        } else if (this.#info.opcode === opcodes.CLOSE) {
          if (payloadLength === 1) {
            failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
            return
          }

          const body = this.consume(payloadLength)

          this.#info.closeInfo = this.parseCloseBody(false, body)

          if (!this.ws[kSentClose]) {
            // If an endpoint receives a Close frame and did not previously send a
            // Close frame, the endpoint MUST send a Close frame in response.  (When
            // sending a Close frame in response, the endpoint typically echos the
            // status code it received.)
            const body = Buffer.allocUnsafe(2)
            body.writeUInt16BE(this.#info.closeInfo.code, 0)
            const closeFrame = new WebsocketFrameSend(body)

            this.ws[kResponse].socket.write(
              closeFrame.createFrame(opcodes.CLOSE),
              (err) => {
                if (!err) {
                  this.ws[kSentClose] = true
                }
              }
            )
          }

          // Upon either sending or receiving a Close control frame, it is said
          // that _The WebSocket Closing Handshake is Started_ and that the
          // WebSocket connection is in the CLOSING state.
          this.ws[kReadyState] = states.CLOSING
          this.ws[kReceivedClose] = true

          this.end()

          return
        } else if (this.#info.opcode === opcodes.PING) {
          // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
          // response, unless it already received a Close frame.
          // A Pong frame sent in response to a Ping frame must have identical
          // "Application data"

          const body = this.consume(payloadLength)

          if (!this.ws[kReceivedClose]) {
            const frame = new WebsocketFrameSend(body)

            this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

            if (channels.ping.hasSubscribers) {
              channels.ping.publish({
                payload: body
              })
            }
          }

          this.#state = parserStates.INFO

          if (this.#byteOffset > 0) {
            continue
          } else {
            callback()
            return
          }
        } else if (this.#info.opcode === opcodes.PONG) {
          // A Pong frame MAY be sent unsolicited.  This serves as a
          // unidirectional heartbeat.  A response to an unsolicited Pong frame is
          // not expected.

          const body = this.consume(payloadLength)

          if (channels.pong.hasSubscribers) {
            channels.pong.publish({
              payload: body
            })
          }

          if (this.#byteOffset > 0) {
            continue
          } else {
            callback()
            return
          }
        }
      } else if (this.#state === parserStates.PAYLOADLENGTH_16) {
        if (this.#byteOffset < 2) {
          return callback()
        }

        const buffer = this.consume(2)

        this.#info.payloadLength = buffer.readUInt16BE(0)
        this.#state = parserStates.READ_DATA
      } else if (this.#state === parserStates.PAYLOADLENGTH_64) {
        if (this.#byteOffset < 8) {
          return callback()
        }

        const buffer = this.consume(8)
        const upper = buffer.readUInt32BE(0)

        // 2^31 is the maxinimum bytes an arraybuffer can contain
        // on 32-bit systems. Although, on 64-bit systems, this is
        // 2^53-1 bytes.
        // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length
        // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
        // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
        if (upper > 2 ** 31 - 1) {
          failWebsocketConnection(this.ws, 'Received payload length > 2^31 bytes.')
          return
        }

        const lower = buffer.readUInt32BE(4)

        this.#info.payloadLength = (upper << 8) + lower
        this.#state = parserStates.READ_DATA
      } else if (this.#state === parserStates.READ_DATA) {
        if (this.#byteOffset < this.#info.payloadLength) {
          // If there is still more data in this chunk that needs to be read
          return callback()
        } else if (this.#byteOffset >= this.#info.payloadLength) {
          // If the server sent multiple frames in a single chunk

          const body = this.consume(this.#info.payloadLength)

          this.#fragments.push(body)

          // If the frame is unfragmented, or a fragmented frame was terminated,
          // a message was received
          if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) {
            const fullMessage = Buffer.concat(this.#fragments)

            websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)

            this.#info = {}
            this.#fragments.length = 0
          }

          this.#state = parserStates.INFO
        }
      }

      if (this.#byteOffset > 0) {
        continue
      } else {
        callback()
        break
      }
    }
  }

  /**
   * Take n bytes from the buffered Buffers
   * @param {number} n
   * @returns {Buffer|null}
   */
  consume (n) {
    if (n > this.#byteOffset) {
      return null
    } else if (n === 0) {
      return emptyBuffer
    }

    if (this.#buffers[0].length === n) {
      this.#byteOffset -= this.#buffers[0].length
      return this.#buffers.shift()
    }

    const buffer = Buffer.allocUnsafe(n)
    let offset = 0

    while (offset !== n) {
      const next = this.#buffers[0]
      const { length } = next

      if (length + offset === n) {
        buffer.set(this.#buffers.shift(), offset)
        break
      } else if (length + offset > n) {
        buffer.set(next.subarray(0, n - offset), offset)
        this.#buffers[0] = next.subarray(n - offset)
        break
      } else {
        buffer.set(this.#buffers.shift(), offset)
        offset += next.length
      }
    }

    this.#byteOffset -= n

    return buffer
  }

  parseCloseBody (onlyCode, data) {
    // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
    /** @type {number|undefined} */
    let code

    if (data.length >= 2) {
      // _The WebSocket Connection Close Code_ is
      // defined as the status code (Section 7.4) contained in the first Close
      // control frame received by the application
      code = data.readUInt16BE(0)
    }

    if (onlyCode) {
      if (!isValidStatusCode(code)) {
        return null
      }

      return { code }
    }

    // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6
    /** @type {Buffer} */
    let reason = data.subarray(2)

    // Remove BOM
    if (reason[0] === 0xEF && reason[1] === 0xBB && reason[2] === 0xBF) {
      reason = reason.subarray(3)
    }

    if (code !== undefined && !isValidStatusCode(code)) {
      return null
    }

    try {
      // TODO: optimize this
      reason = new TextDecoder('utf-8', { fatal: true }).decode(reason)
    } catch {
      return null
    }

    return { code, reason }
  }

  get closingInfo () {
    return this.#info.closeInfo
  }
}

module.exports = {
  ByteParser
}