Code listing for mqtt.lua
The code listing below, mqtt.lua, gives the code to implement the MQTT protocol on Citrix ADC using protocol extensions. The code only has the TCP client data callback function defined - client.on_data(). For server data, it does not add a callback function and the server to client takes the fast native path. For client data, the code parses the CONNECT MQTT protocol message and extracts the ClientID. It then uses the ClientID for user_token value, which is used to load balance all the client traffic for the connection based on the ClientID by setting LB method for the LB vserver as USER_TOKEN. It uses the ClientID also for user_session value, which can be used for LB persistence by setting persistence type for the LB vserver as USERSESSION. The code uses the ns.send() to do LB and send the initial data. It uses the ns.pipe() API to send the rest of the client traffic directly to server connection, bypassing calls to extension callback handler.
--[[
MQTT event handler for TCP client data
ctxt - TCP client side App processing context.
data - TCP Data stream received.
- parse the client ID from the connect message - the first message should be connect
- send the data to LB with ClientID as user token and session
- pipe the subsequent data to LB directly. This way the subsequent MQTT traffic will
bypass the tcp client on_data handler
- if a parse error is seen, throw an error so the connection is reset
--]]
function client.on_data(ctxt, payload)
local data = payload.data
local data_len = data:len()
local offset = 1
local byte = nil
local utf8_str_len = 0
local msg_type = 0
local multiplier = 1
local max_multiplier = 128 * 128 * 128
local rem_length = 0
local clientID = nil
-- check if MQTT fixed header is present (fixed header length is atleast 2 bytes)
if (data_len < 2) then
goto need_more_data
end
byte = data:byte(offset)
offset = offset + 1
-- check for connect packet - type value 1
msg_type = bit32.rshift(byte, 4)
if (msg_type ~= 1) then
error("Missing MQTT Connect packet.")
end
-- parse the remaining length
repeat
if (multiplier > max_multiplier) then
error("MQTT CONNECT packet parse error - invalid Remaining Length.")
end
if (data_len < offset) then
goto need_more_data
end
byte = data:byte(offset)
offset = offset + 1
rem_length = rem_length + (bit32.band(byte, 0x7F) * multiplier)
multiplier = multiplier * 128
until (bit32.band(byte, 0x80) == 0)
-- protocol name
-- check if protocol name length is present
if (data_len < offset + 1) then
goto need_more_data
end
-- protocol name length MSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = byte * 256
-- length LSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = utf8_str_len + byte
-- skip the variable header for connect message
-- the four required fields (protocol name, protocol level, connect flags, keep alive)
offset = offset + utf8_str_len + 4
-- parse the client ID
--
-- check if client ID len is present
if (data_len < offset + 1) then
goto need_more_data
end
-- client ID length MSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = byte * 256
-- length LSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = utf8_str_len + byte
if (data_len < (offset + utf8_str_len - 1)) then
goto need_more_data
end
clientID = data:sub(offset, offset + utf8_str_len - 1)
-- send the data so far to lb, user_token is set to do LB based on clientID
-- user_session is set to clientID as well (it will be used to persist session)
ns.send(ctxt.output, "DATA", {data = data,
user_token = clientID,
user_session = clientID})
-- pipe the subsequent traffic to the lb - to bypass the extension handler
ns.pipe(ctxt.input, ctxt.output)
goto parse_done
::need_more_data::
ctxt:hold(data)
::parse_done::
return
end
<!--NeedCopy-->