Code-Auflistung für mqtt.lua
Die nachstehende Code-Auflistung, mqtt.lua, enthält den Code zum Implementieren des MQTT-Protokolls auf Citrix ADC unter Verwendung von Protokollerweiterungen. Der Code hat nur die TCP-Clientdaten-Callback-Funktion definiert - client.on_data (). Für Serverdaten wird keine Callback-Funktion hinzugefügt, und der Server zum Client nimmt den schnellen nativen Pfad. Bei Clientdaten analysiert der Code die CONNECT MQTT Protokollmeldung und extrahiert die ClientID. Es verwendet dann den ClientID für user_token Wert, der verwendet wird, um den gesamten Clientdatenverkehr für die Verbindung basierend auf der ClientID zu laden, indem die LB-Methode für den LB-vserver als USER_TOKEN festgelegt wird. Es verwendet die ClientID auch für user_session Wert, der für die LB-Persistenz verwendet werden kann, indem Persistenztyp für den LB-vserver als USERSESSION festgelegt wird. Der Code verwendet die ns.send (), um LB zu tun und die Anfangsdaten zu senden. Es verwendet die ns.pipe () API, um den Rest des Clientdatenverkehrs direkt an die Serververbindung zu senden, wobei Aufrufe an den Extension-Callback-Handler umgangen werden.
--[[
MQTT event handler for TCP client data
ctxt - TCP client side App processing context.
data - TCP Data stream received.
- parse the client ID from the connect message - the first message should be connect
- send the data to LB with ClientID as user token and session
- pipe the subsequent data to LB directly. This way the subsequent MQTT traffic will
bypass the tcp client on_data handler
- if a parse error is seen, throw an error so the connection is reset
--]]
function client.on_data(ctxt, payload)
local data = payload.data
local data_len = data:len()
local offset = 1
local byte = nil
local utf8_str_len = 0
local msg_type = 0
local multiplier = 1
local max_multiplier = 128 * 128 * 128
local rem_length = 0
local clientID = nil
-- check if MQTT fixed header is present (fixed header length is atleast 2 bytes)
if (data_len < 2) then
goto need_more_data
end
byte = data:byte(offset)
offset = offset + 1
-- check for connect packet - type value 1
msg_type = bit32.rshift(byte, 4)
if (msg_type ~= 1) then
error("Missing MQTT Connect packet.")
end
-- parse the remaining length
repeat
if (multiplier > max_multiplier) then
error("MQTT CONNECT packet parse error - invalid Remaining Length.")
end
if (data_len < offset) then
goto need_more_data
end
byte = data:byte(offset)
offset = offset + 1
rem_length = rem_length + (bit32.band(byte, 0x7F) * multiplier)
multiplier = multiplier * 128
until (bit32.band(byte, 0x80) == 0)
-- protocol name
-- check if protocol name length is present
if (data_len < offset + 1) then
goto need_more_data
end
-- protocol name length MSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = byte * 256
-- length LSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = utf8_str_len + byte
-- skip the variable header for connect message
-- the four required fields (protocol name, protocol level, connect flags, keep alive)
offset = offset + utf8_str_len + 4
-- parse the client ID
--
-- check if client ID len is present
if (data_len < offset + 1) then
goto need_more_data
end
-- client ID length MSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = byte * 256
-- length LSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = utf8_str_len + byte
if (data_len < (offset + utf8_str_len - 1)) then
goto need_more_data
end
clientID = data:sub(offset, offset + utf8_str_len - 1)
-- send the data so far to lb, user_token is set to do LB based on clientID
-- user_session is set to clientID as well (it will be used to persist session)
ns.send(ctxt.output, "DATA", {data = data,
user_token = clientID,
user_session = clientID})
-- pipe the subsequent traffic to the lb - to bypass the extension handler
ns.pipe(ctxt.input, ctxt.output)
goto parse_done
::need_more_data::
ctxt:hold(data)
::parse_done::
return
end
<!--NeedCopy-->