本节我们将学习MQTT的Retain消息和LWT(Last Will and Testament)。
让我们来考虑一下这个场景。你有一个温度传感器,它每3个小时向一个主题发布当前温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候才能收到温度消息?
没错,和你想的一样,它必须等到3个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。
那该怎么解决这个问题呢?
这时候就轮到Retained消息出场解决这个问题了。Retained消息是指在PUBLISH数据包中将Retain标识设为1的消息,Broker收到这样的PUBLISH数据包以后,将为该主题保存这个消息,当一个新的订阅者订阅该主题时,Broker会马上将这个消息发送给订阅者。
Retain消息有如下特点:
·一个Topic只能有一条Retained消息,发布新的Retained消息将覆盖旧的Retained消息;
·如果订阅者使用通配符订阅主题,那他会收到所有匹配主题的Retained消息;
·只有新的订阅者才会收到Retained消息,如果订阅者重复订阅一个主题,那么在每次订阅的时候都会被当作新的订阅者,然后收到Retained消息。
当Retained消息发送到订阅者时,PUBLISH数据包中Retain的标识仍然是1。订阅者可以判断这个消息是否是Retained消息,以做出相应的处理。
Retained消息和持久会话没有任何关系。Retained消息是Broker为每一个主题单独存储的,而持久会话是Broker为每一个Client单独存储的。
如果你想删除某个主题的Retained消息,只要向这个主题发布一个Payload长度为0的Retained消息就可以了。
那么,开头我们提到的那个场景的解决方案就很简单了,温度传感器每3个小时向相应的主题发布包含当前温度的Retained消息,那么无论新的订阅者什么时候订阅这个主题,他都能收到温度传感器上一次发布的数据。
下面编写一个发布Retained消息的发布端,和一个接收消息的订阅端,订阅端在接收消息的时候将消息的Retain标识和内容打印出来。
发布端的代码publish_retained.js如下所示。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_sample_publisher_1", 4. clean: false 5. }) 6. 7. client.on('connect', function (connack) { 8. if(connack.returnCode == 0){ 9. client.publish("home/2ndfloor/201/temperature", JSON.stringify({current: 25}), {qos: 0, retain: 1}, function (err) { 10. if(err == undefined) { 11. console.log("Publish finished") 12. client.end() 13. }else{ 14. console.log("Publish failed") 15. } 16. }) 17. }else{ 18. console.log('Connection failed: ${connack.returnCode}') 19. } 20. })
第9行代码在发布时指定Retain标识为1。
订阅端的代码subscribe_retained.js如下所示。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_sample_subscriber_id_chapter_8", 4. clean: false 5. }) 6. 7. client.on('connect', function (connack) { 8. if(connack.returnCode == 0) { 9. if (connack.sessionPresent == false) { 10. console.log("subscribing") 11. client.subscribe("home/2ndfloor/201/temperature", { 12. qos: 0 13. }, function (err, granted) { 14. if (err != undefined) { 15. console.log("subscribe failed") 16. } else { 17. console.log('subscribe succeeded with ${granted[0].topic}, qos: ${granted[0].qos}') 18. } 19. }) 20. } 21. }else { 22. console.log('Connection failed: ${connack.returnCode}') 23. } 24. }) 25. 26. client.on("message", function (_, message, packet) { 27. var jsonPayload = JSON.parse(message.toString()) 28. console.log('retained: ${packet.retain}, temperature: ${jsonPayload. current}') 29. }) 30.
第9行代码判断了CONNACK数据包中的Session Present标识,只有在第一次建立会话的时候才进行订阅,重复多次运行订阅端代码也只会触发一次订阅。
第28行代码是在收到消息的时候打印出消息的Retain标识。
我们首先运行“node publish_retained.js”,再运行“node subscribe_retained.js”,会得到如下输出:
retained: true, temperature: 25
当订阅端第一次订阅该主题的时候,Broker会将为该主题保存的Retained消息转发给订阅端,所以在Publisher发布之后订阅者再订阅主题也能收到Retained消息。
然后我们再运行一次“node publish_retained.js”,在运行subscribe_retained.js的终端会有如下输出:
retained: false, temperature: 25
由于此时订阅端已经订阅了该主题,Broker收到Retained消息以后,只保存该消息,然后按照正常的转发逻辑转发给订阅端,因此对于订阅端来说,这只是一个普通的MQTT协议消息,所以Retain标识为0。
接着点击“Ctrl+C”,关闭subscribe_retained.js,重新运行,此时因为Session已经存在,订阅端不会再重新订阅这个主题,终端不会有任何输出。由此可见,Retained消息只对新订阅的订阅者有效。
LWT全称为Last Will and Testament,也就是我们在连接Broker时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。
顾名思义,当Broker检测到Client非正常地断开连接时,就会向Client的遗愿主题中发布一条消息。遗愿的相关设置是在建立连接时,在CONNECT数据包里面指定的。
·Will Flag:是否使用LWT。
·Will Topic:遗愿主题名,不可使用通配符。
·Will QoS:发布遗愿消息时使用的QoS等级。
·Will Retain:遗愿消息的Retain标识。
·Will Message:遗愿消息内容。
Broker在以下情况下认为Client是非正常断开连接的:
1)Broker检测到底层I/O异常;
2)Client未能在Keepalive的间隔内和Broker之间进行消息交互;
3)Client在关闭底层TCP连接前没有发送DISCONNECT数据包;
4)Broker因为协议错误关闭了和Client的连接,比如Client发送了一个格式错误的MQTT协议数据包。
如果Client通过发布DISCONNECT数据包断开连接,这属于正常断开连接,不会触发LWT的机制。同时,Broker还会丢掉这个Client在连接时指定的LWT参数。
通常,如果我们关心设备,比如传感器的连接状态,则可以使用LWT。在接下来的代码实践中,我们会使用LWT和Retained消息实现对一个Client的连接状态监控。
实现Client连接状态监控的原理很简单:
1)Client在连接时指定Will Topic为“client/status”,遗愿消息为“offline”,Will Retain=1;
2)Client在连接成功后向同一个主题“client/status”发布一个内容为“online”的Retained消息。
那么,订阅者在任何时候订阅“client/status”,都会获取Client当前的连接状态。
Client.js的代码如下所示。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_sample_publisher_chapter_8", 4. clean: false, 5. will:{ 6. topic : 'client/status', 7. qos: 1, 8. retain: true, 9. payload: JSON.stringify({status: 'offline'}) 10. } 11. }) 12. 13. client.on('connect', function (connack) { 14. if(connack.returnCode == 0){ 15. client.publish("client/status", JSON.stringify({status: 'online'}), {qos: 1, retain: 1}) 16. }else{ 17. console.log('Connection failed: ${connack.returnCode}') 18. } 19. })
代码的第5~9行对Client的LWT进行了设置。
在第15行,Client在连接到Broker之后会向指定的主题发布一条消息。
用于监控Client连接状态的monitor.js代码如下所示。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_sample_subscriber_id_chapter_8_2", 4. clean: false 5. }) 6. 7. client.on('connect', function () { 8. client.subscribe("client/status", {qos: 1}) 9. }) 10. 11. client.on("message", function (_, message) { 12. var jsonPayload = JSON.parse(message.toString()) 13. console.log('client is ${jsonPayload.status}') 14. })
首先运行“node client.js”,然后运行“node monitor.js”,我们会得到以下输出:
client is online
在运行client.js的终端上,点击“Ctrl+C”终止client.js,之后在运行monitor.js的终端上会得到以下输出:
client is offline
重新运行“node client.js”,在运行monitor.js的终端上会得到以下输出:
client is online
点击“Ctrl+C”终止monitor.js,然后重新运行“node monitor.js”,会得到以下输出:
client is online
这样,我们就完美地监控了Client的连接状态。
本节我们学习了Retained消息和LWT,并利用这两个特性完成了对Client连接状态进行监控。接下来,我们将学习Keepalive和在移动端的连接保活。