mqtt.lua 的代码列表

下面的代码列表 mqtt.lua 提供了使用协议扩展在 Citrix ADC 上实现 MQTT 协议的代码。代码只定义了 TCP 客户端数据回调函数-client .on_data ()。对于服务器数据,它不添加回调函数,服务器到客户端采用快速本机路径。对于客户端数据,该代码解析 CONNECT MQTT 协议消息并提取 ClientiID。然后,它使用用户_token 的 ClientiID 值,该值用于通过将 LB虚拟服务器的 LB 方法设置为 USER_TOKEN 来平衡基于 ClientiID 的连接的所有客户端流量。它也将 ClientiID 用于 user_session 值,通过将 LB虚拟服务器的持久性类型设置为 USERSISE,可用于 LB 持久性。该代码使用 ns.send()执行 LB 并发送初始数据。它使用 ns.pipe () API 将其余客户端流量直接发送到服务器连接,绕过对扩展回调处理程序的调用。

--[[

  MQTT event handler for TCP client data

    ctxt - TCP client side App processing context.

    data - TCP Data stream received.

    - parse the client ID from the connect message - the first message should be connect

    - send the data to LB with ClientID as user token and session

    - pipe the subsequent data to LB directly. This way the subsequent MQTT traffic will

      bypass the tcp client on_data handler

    - if a parse error is seen, throw an error so the connection is reset

--]]

function client.on_data(ctxt, payload)

    local data = payload.data

    local data_len = data:len()

    local offset = 1

    local byte = nil

    local utf8_str_len = 0

    local msg_type = 0

    local multiplier = 1

    local max_multiplier = 128 * 128 * 128

    local rem_length = 0

    local clientID = nil

    -- check if MQTT fixed header is present (fixed header length is atleast 2 bytes)

    if (data_len < 2) then

        goto need_more_data

    end

    byte = data:byte(offset)

    offset = offset + 1

       -- check for connect packet - type value 1

    msg_type = bit32.rshift(byte, 4)

    if (msg_type ~= 1) then

        error("Missing MQTT Connect packet.")

    end

    -- parse the remaining length

    repeat

        if (multiplier > max_multiplier) then

           error("MQTT CONNECT packet parse error - invalid Remaining Length.")

       end

       if (data_len < offset) then

          goto need_more_data

       end

       byte = data:byte(offset)

       offset = offset + 1

       rem_length = rem_length + (bit32.band(byte, 0x7F) * multiplier)

       multiplier = multiplier * 128

    until (bit32.band(byte, 0x80) == 0)

    -- protocol name

    -- check if protocol name length is present

    if (data_len < offset + 1) then

       goto need_more_data

    end

    -- protocol name length MSB

    byte = data:byte(offset)

    offset = offset + 1

    utf8_str_len = byte * 256

    -- length LSB

    byte = data:byte(offset)

    offset = offset + 1

    utf8_str_len = utf8_str_len + byte

       -- skip the variable header for connect message

    -- the four required fields (protocol name, protocol level, connect flags, keep alive)

    offset = offset + utf8_str_len + 4

    -- parse the client ID

    --

    -- check if client ID len is present

    if (data_len < offset + 1) then

       goto need_more_data

    end

       -- client ID length MSB

    byte = data:byte(offset)

    offset = offset + 1

    utf8_str_len = byte * 256

    -- length LSB

    byte = data:byte(offset)

    offset = offset + 1

    utf8_str_len = utf8_str_len + byte

    if (data_len < (offset + utf8_str_len - 1)) then

        goto need_more_data

    end

    clientID = data:sub(offset, offset + utf8_str_len - 1)

    -- send the data so far to lb, user_token is set to do LB based on clientID

    -- user_session is set to clientID as well (it will be used to persist session)

    ns.send(ctxt.output, "DATA", {data = data,

                               user_token = clientID,

                               user_session = clientID})

       -- pipe the subsequent traffic to the lb - to bypass the extension handler

    ns.pipe(ctxt.input, ctxt.output)

    goto parse_done

    ::need_more_data::

    ctxt:hold(data)

    ::parse_done::

    return

end
<!--NeedCopy-->
mqtt.lua 的代码列表