Liste de codes pour mqtt.lua
La liste de codes ci-dessous, mqtt.lua, fournit le code permettant d’implémenter le protocole MQTT sur NetScaler à l’aide d’extensions de protocole. Seule la fonction de rappel des données du client TCP est définie dans le code, à savoir client.on_data (). Pour les données du serveur, il n’ajoute pas de fonction de rappel et le chemin natif rapide entre le serveur et le client emprunte le chemin natif rapide. Pour les données client, le code analyse le message du protocole CONNECT MQTT et extrait le ClientID. Il utilise ensuite la valeur ClientID pour user_token, qui est utilisée pour équilibrer la charge de tout le trafic client pour la connexion en fonction du ClientID en définissant la méthode LB pour le serveur virtuel LB comme USER_TOKEN. Il utilise également le ClientID pour la valeur user_session, qui peut être utilisée pour la persistance LB en définissant le type de persistance pour le serveur LB vserver comme USERSESSION. Le code utilise le ns.send () pour effectuer LB et envoyer les données initiales. Il utilise l’API ns.pipe () pour envoyer le reste du trafic client directement à la connexion au serveur, en contournant les appels au gestionnaire de rappel d’extension.
--[[
MQTT event handler for TCP client data
ctxt - TCP client side App processing context.
data - TCP Data stream received.
- parse the client ID from the connect message - the first message should be connect
- send the data to LB with ClientID as user token and session
- pipe the subsequent data to LB directly. This way the subsequent MQTT traffic will
bypass the tcp client on_data handler
- if a parse error is seen, throw an error so the connection is reset
--]]
function client.on_data(ctxt, payload)
local data = payload.data
local data_len = data:len()
local offset = 1
local byte = nil
local utf8_str_len = 0
local msg_type = 0
local multiplier = 1
local max_multiplier = 128 * 128 * 128
local rem_length = 0
local clientID = nil
-- check if MQTT fixed header is present (fixed header length is atleast 2 bytes)
if (data_len < 2) then
goto need_more_data
end
byte = data:byte(offset)
offset = offset + 1
-- check for connect packet - type value 1
msg_type = bit32.rshift(byte, 4)
if (msg_type ~= 1) then
error("Missing MQTT Connect packet.")
end
-- parse the remaining length
repeat
if (multiplier > max_multiplier) then
error("MQTT CONNECT packet parse error - invalid Remaining Length.")
end
if (data_len < offset) then
goto need_more_data
end
byte = data:byte(offset)
offset = offset + 1
rem_length = rem_length + (bit32.band(byte, 0x7F) * multiplier)
multiplier = multiplier * 128
until (bit32.band(byte, 0x80) == 0)
-- protocol name
-- check if protocol name length is present
if (data_len < offset + 1) then
goto need_more_data
end
-- protocol name length MSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = byte * 256
-- length LSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = utf8_str_len + byte
-- skip the variable header for connect message
-- the four required fields (protocol name, protocol level, connect flags, keep alive)
offset = offset + utf8_str_len + 4
-- parse the client ID
--
-- check if client ID len is present
if (data_len < offset + 1) then
goto need_more_data
end
-- client ID length MSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = byte * 256
-- length LSB
byte = data:byte(offset)
offset = offset + 1
utf8_str_len = utf8_str_len + byte
if (data_len < (offset + utf8_str_len - 1)) then
goto need_more_data
end
clientID = data:sub(offset, offset + utf8_str_len - 1)
-- send the data so far to lb, user_token is set to do LB based on clientID
-- user_session is set to clientID as well (it will be used to persist session)
ns.send(ctxt.output, "DATA", {data = data,
user_token = clientID,
user_session = clientID})
-- pipe the subsequent traffic to the lb - to bypass the extension handler
ns.pipe(ctxt.input, ctxt.output)
goto parse_done
::need_more_data::
ctxt:hold(data)
::parse_done::
return
end
<!--NeedCopy-->