mqtt.lua 的代码列表
下面的代码清单 mqtt.lua 提供了使用协议扩展在 NetScaler 上实现 MQTT 协议的代码。该代码仅定义了 TCP 客户端数据回调函数 - client.on_data()。对于服务器数据,它不添加回调函数,服务器到客户端采用快速的本地路径。对于客户端数据,该代码会解析 CONNECT MQTT 协议消息并提取 clientID。然后,它使用 clienTID 作为 user_token 值,该值用于通过将 LB 虚拟服务器的 LB 方法设置为 USER_TOKEN 来对基于 clientID 的连接的所有客户端流量进行负载平衡。它还使用 clientID 作为 user_session 值,通过将 LB 虚拟服务器的持久性类型设置为 USERSESSION,该值可用于负载均衡持久化。该代码使用 ns.send () 执行 LB 并发送初始数据。它使用 ns.pipe () API 将其余客户端流量直接发送到服务器连接,绕过对扩展回调处理程序的调用。
--[[
MQTT event handler for TCP client data
ctxt - TCP client side App processing context.
data - TCP Data stream received.
- parse the client ID from the connect message - the first message should be connect
- send the data to LB with ClientID as user token and session
- pipe the subsequent data to LB directly. This way the subsequent MQTT traffic will
bypass the tcp client on_data handler
- if a parse error is seen, throw an error so the connection is reset
--]]
function client.on_data(ctxt, payload)
local data = payload.data
local data_len = data:len()
local offset = 1
local byte = nil
local utf8_str_len = 0
local msg_type = 0
local multiplier = 1
local max_multiplier = 128 * 128 * 128
local rem_length = 0
local clientID = nil
-- check if MQTT fixed header is present (fixed header length is atleast 2 bytes)
if (data_len < 2) then
goto need_more_data
end
byte = data:byte(offset)
offset = offset + 1
-- check for connect packet - type value 1
msg_type = bit32.rshift(byte, 4)
if (msg_type ~= 1) then
error("Missing MQTT Connect packet.")
end
-- parse the remaining length
repeat
if (multiplier > max_multiplier) then
error("MQTT CONNECT packet parse error - invalid Remaining Length.")
end
if (data_len < offset) then
goto need_more_data
end
byte = data:byte(offset)
offset = offset + 1
rem_length = rem_length + (bit32.band(byte, 0x7F) * multiplier)
multiplier = multiplier * 128
until (bit32.band(byte, 0x80) == 0)
-- protocol name
-- check if protocol name length is present
if (data_len < offset + 1) then
goto need_more_data
end
-- protocol name length MSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = byte * 256
-- length LSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = utf8_str_len + byte
-- skip the variable header for connect message
-- the four required fields (protocol name, protocol level, connect flags, keep alive)
offset = offset + utf8_str_len + 4
-- parse the client ID
--
-- check if client ID len is present
if (data_len < offset + 1) then
goto need_more_data
end
-- client ID length MSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = byte * 256
-- length LSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = utf8_str_len + byte
if (data_len < (offset + utf8_str_len - 1)) then
goto need_more_data
end
clientID = data:sub(offset, offset + utf8_str_len - 1)
-- send the data so far to lb, user_token is set to do LB based on clientID
-- user_session is set to clientID as well (it will be used to persist session)
ns.send(ctxt.output, "DATA", {data = data,
user_token = clientID,
user_session = clientID})
-- pipe the subsequent traffic to the lb - to bypass the extension handler
ns.pipe(ctxt.input, ctxt.output)
goto parse_done
::need_more_data::
ctxt:hold(data)
::parse_done::
return
end
<!--NeedCopy-->
mqtt.lua 的代码列表
已复制!
失败!