



本节我们将学习MQTT的Retain消息和LWT(Last Will and Testament)。
让我们来考虑一下这个场景。你有一个温度传感器,它每3个小时向一个主题发布当前温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候才能收到温度消息?
没错,和你想的一样,它必须等到3个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。
那该怎么解决这个问题呢?
这时候就轮到Retained消息出场解决这个问题了。Retained消息是指在PUBLISH数据包中将Retain标识设为1的消息,Broker收到这样的PUBLISH数据包以后,将为该主题保存这个消息,当一个新的订阅者订阅该主题时,Broker会马上将这个消息发送给订阅者。
Retain消息有如下特点:
·一个Topic只能有一条Retained消息,发布新的Retained消息将覆盖旧的Retained消息;
·如果订阅者使用通配符订阅主题,那他会收到所有匹配主题的Retained消息;
·只有新的订阅者才会收到Retained消息,如果订阅者重复订阅一个主题,那么在每次订阅的时候都会被当作新的订阅者,然后收到Retained消息。
当Retained消息发送到订阅者时,PUBLISH数据包中Retain的标识仍然是1。订阅者可以判断这个消息是否是Retained消息,以做出相应的处理。
Retained消息和持久会话没有任何关系。Retained消息是Broker为每一个主题单独存储的,而持久会话是Broker为每一个Client单独存储的。
如果你想删除某个主题的Retained消息,只要向这个主题发布一个Payload长度为0的Retained消息就可以了。
那么,开头我们提到的那个场景的解决方案就很简单了,温度传感器每3个小时向相应的主题发布包含当前温度的Retained消息,那么无论新的订阅者什么时候订阅这个主题,他都能收到温度传感器上一次发布的数据。
下面编写一个发布Retained消息的发布端,和一个接收消息的订阅端,订阅端在接收消息的时候将消息的Retain标识和内容打印出来。
发布端的代码publish_retained.js如下所示。
1. var mqtt = require('mqtt')
2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
3. clientId: "mqtt_sample_publisher_1",
4. clean: false
5. })
6.
7. client.on('connect', function (connack) {
8. if(connack.returnCode == 0){
9. client.publish("home/2ndfloor/201/temperature", JSON.stringify({current:
25}), {qos: 0, retain: 1}, function (err) {
10. if(err == undefined) {
11. console.log("Publish finished")
12. client.end()
13. }else{
14. console.log("Publish failed")
15. }
16. })
17. }else{
18. console.log('Connection failed: ${connack.returnCode}')
19. }
20. })
第9行代码在发布时指定Retain标识为1。
订阅端的代码subscribe_retained.js如下所示。
1. var mqtt = require('mqtt')
2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
3. clientId: "mqtt_sample_subscriber_id_chapter_8",
4. clean: false
5. })
6.
7. client.on('connect', function (connack) {
8. if(connack.returnCode == 0) {
9. if (connack.sessionPresent == false) {
10. console.log("subscribing")
11. client.subscribe("home/2ndfloor/201/temperature", {
12. qos: 0
13. }, function (err, granted) {
14. if (err != undefined) {
15. console.log("subscribe failed")
16. } else {
17. console.log('subscribe succeeded with ${granted[0].topic}, qos:
${granted[0].qos}')
18. }
19. })
20. }
21. }else {
22. console.log('Connection failed: ${connack.returnCode}')
23. }
24. })
25.
26. client.on("message", function (_, message, packet) {
27. var jsonPayload = JSON.parse(message.toString())
28. console.log('retained: ${packet.retain}, temperature: ${jsonPayload.
current}')
29. })
30.
第9行代码判断了CONNACK数据包中的Session Present标识,只有在第一次建立会话的时候才进行订阅,重复多次运行订阅端代码也只会触发一次订阅。
第28行代码是在收到消息的时候打印出消息的Retain标识。
我们首先运行“node publish_retained.js”,再运行“node subscribe_retained.js”,会得到如下输出:
retained: true, temperature: 25
当订阅端第一次订阅该主题的时候,Broker会将为该主题保存的Retained消息转发给订阅端,所以在Publisher发布之后订阅者再订阅主题也能收到Retained消息。
然后我们再运行一次“node publish_retained.js”,在运行subscribe_retained.js的终端会有如下输出:
retained: false, temperature: 25
由于此时订阅端已经订阅了该主题,Broker收到Retained消息以后,只保存该消息,然后按照正常的转发逻辑转发给订阅端,因此对于订阅端来说,这只是一个普通的MQTT协议消息,所以Retain标识为0。
接着点击“Ctrl+C”,关闭subscribe_retained.js,重新运行,此时因为Session已经存在,订阅端不会再重新订阅这个主题,终端不会有任何输出。由此可见,Retained消息只对新订阅的订阅者有效。
LWT全称为Last Will and Testament,也就是我们在连接Broker时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。
顾名思义,当Broker检测到Client非正常地断开连接时,就会向Client的遗愿主题中发布一条消息。遗愿的相关设置是在建立连接时,在CONNECT数据包里面指定的。
·Will Flag:是否使用LWT。
·Will Topic:遗愿主题名,不可使用通配符。
·Will QoS:发布遗愿消息时使用的QoS等级。
·Will Retain:遗愿消息的Retain标识。
·Will Message:遗愿消息内容。
Broker在以下情况下认为Client是非正常断开连接的:
1)Broker检测到底层I/O异常;
2)Client未能在Keepalive的间隔内和Broker之间进行消息交互;
3)Client在关闭底层TCP连接前没有发送DISCONNECT数据包;
4)Broker因为协议错误关闭了和Client的连接,比如Client发送了一个格式错误的MQTT协议数据包。
如果Client通过发布DISCONNECT数据包断开连接,这属于正常断开连接,不会触发LWT的机制。同时,Broker还会丢掉这个Client在连接时指定的LWT参数。
通常,如果我们关心设备,比如传感器的连接状态,则可以使用LWT。在接下来的代码实践中,我们会使用LWT和Retained消息实现对一个Client的连接状态监控。
实现Client连接状态监控的原理很简单:
1)Client在连接时指定Will Topic为“client/status”,遗愿消息为“offline”,Will Retain=1;
2)Client在连接成功后向同一个主题“client/status”发布一个内容为“online”的Retained消息。
那么,订阅者在任何时候订阅“client/status”,都会获取Client当前的连接状态。
Client.js的代码如下所示。
1. var mqtt = require('mqtt')
2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
3. clientId: "mqtt_sample_publisher_chapter_8",
4. clean: false,
5. will:{
6. topic : 'client/status',
7. qos: 1,
8. retain: true,
9. payload: JSON.stringify({status: 'offline'})
10. }
11. })
12.
13. client.on('connect', function (connack) {
14. if(connack.returnCode == 0){
15. client.publish("client/status", JSON.stringify({status: 'online'}),
{qos: 1, retain: 1})
16. }else{
17. console.log('Connection failed: ${connack.returnCode}')
18. }
19. })
代码的第5~9行对Client的LWT进行了设置。
在第15行,Client在连接到Broker之后会向指定的主题发布一条消息。
用于监控Client连接状态的monitor.js代码如下所示。
1. var mqtt = require('mqtt')
2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
3. clientId: "mqtt_sample_subscriber_id_chapter_8_2",
4. clean: false
5. })
6.
7. client.on('connect', function () {
8. client.subscribe("client/status", {qos: 1})
9. })
10.
11. client.on("message", function (_, message) {
12. var jsonPayload = JSON.parse(message.toString())
13. console.log('client is ${jsonPayload.status}')
14. })
首先运行“node client.js”,然后运行“node monitor.js”,我们会得到以下输出:
client is online
在运行client.js的终端上,点击“Ctrl+C”终止client.js,之后在运行monitor.js的终端上会得到以下输出:
client is offline
重新运行“node client.js”,在运行monitor.js的终端上会得到以下输出:
client is online
点击“Ctrl+C”终止monitor.js,然后重新运行“node monitor.js”,会得到以下输出:
client is online
这样,我们就完美地监控了Client的连接状态。
本节我们学习了Retained消息和LWT,并利用这两个特性完成了对Client连接状态进行监控。接下来,我们将学习Keepalive和在移动端的连接保活。