购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.4 Retained消息和LWT

本节我们将学习MQTT的Retain消息和LWT(Last Will and Testament)。

4.4.1 Retained消息

让我们来考虑一下这个场景。你有一个温度传感器,它每3个小时向一个主题发布当前温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候才能收到温度消息?

没错,和你想的一样,它必须等到3个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。

那该怎么解决这个问题呢?

这时候就轮到Retained消息出场解决这个问题了。Retained消息是指在PUBLISH数据包中将Retain标识设为1的消息,Broker收到这样的PUBLISH数据包以后,将为该主题保存这个消息,当一个新的订阅者订阅该主题时,Broker会马上将这个消息发送给订阅者。

Retain消息有如下特点:

·一个Topic只能有一条Retained消息,发布新的Retained消息将覆盖旧的Retained消息;

·如果订阅者使用通配符订阅主题,那他会收到所有匹配主题的Retained消息;

·只有新的订阅者才会收到Retained消息,如果订阅者重复订阅一个主题,那么在每次订阅的时候都会被当作新的订阅者,然后收到Retained消息。

当Retained消息发送到订阅者时,PUBLISH数据包中Retain的标识仍然是1。订阅者可以判断这个消息是否是Retained消息,以做出相应的处理。

Retained消息和持久会话没有任何关系。Retained消息是Broker为每一个主题单独存储的,而持久会话是Broker为每一个Client单独存储的。

如果你想删除某个主题的Retained消息,只要向这个主题发布一个Payload长度为0的Retained消息就可以了。

那么,开头我们提到的那个场景的解决方案就很简单了,温度传感器每3个小时向相应的主题发布包含当前温度的Retained消息,那么无论新的订阅者什么时候订阅这个主题,他都能收到温度传感器上一次发布的数据。

4.4.2 代码实践:发布和接收Retained消息

下面编写一个发布Retained消息的发布端,和一个接收消息的订阅端,订阅端在接收消息的时候将消息的Retain标识和内容打印出来。

发布端的代码publish_retained.js如下所示。


 1. var mqtt = require('mqtt')
 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 3.   clientId: "mqtt_sample_publisher_1",
 4.   clean: false
 5. })
 6. 
 7. client.on('connect', function (connack) {
 8.   if(connack.returnCode == 0){
 9.     client.publish("home/2ndfloor/201/temperature", JSON.stringify({current: 
 25}), {qos: 0, retain: 1}, function (err) {
10.       if(err == undefined) {
11.         console.log("Publish finished")
12.         client.end()
13.       }else{
14.         console.log("Publish failed")
15.       }
16.     })
17.   }else{
18.     console.log('Connection failed: ${connack.returnCode}')
19.   }
20. })

第9行代码在发布时指定Retain标识为1。

订阅端的代码subscribe_retained.js如下所示。


 1. var mqtt = require('mqtt')
 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 3.   clientId: "mqtt_sample_subscriber_id_chapter_8",
 4.   clean: false
 5. })
 6. 
 7. client.on('connect', function (connack) {
 8.   if(connack.returnCode == 0) {
 9.     if (connack.sessionPresent == false) {
10.       console.log("subscribing")
11.       client.subscribe("home/2ndfloor/201/temperature", {
12.         qos: 0
13.       }, function (err, granted) {
14.         if (err != undefined) {
15.           console.log("subscribe failed")
16.         } else {
17.           console.log('subscribe succeeded with ${granted[0].topic}, qos: 
 ${granted[0].qos}')
18.         }
19.       })
20.     }
21.   }else {
22.     console.log('Connection failed: ${connack.returnCode}')
23.   }
24. })
25. 
26. client.on("message", function (_, message, packet) {
27.   var jsonPayload = JSON.parse(message.toString())
28.   console.log('retained: ${packet.retain}, temperature: ${jsonPayload.
 current}')
29. })
30.

第9行代码判断了CONNACK数据包中的Session Present标识,只有在第一次建立会话的时候才进行订阅,重复多次运行订阅端代码也只会触发一次订阅。

第28行代码是在收到消息的时候打印出消息的Retain标识。

我们首先运行“node publish_retained.js”,再运行“node subscribe_retained.js”,会得到如下输出:


retained: true, temperature: 25

当订阅端第一次订阅该主题的时候,Broker会将为该主题保存的Retained消息转发给订阅端,所以在Publisher发布之后订阅者再订阅主题也能收到Retained消息。

然后我们再运行一次“node publish_retained.js”,在运行subscribe_retained.js的终端会有如下输出:


retained: false, temperature: 25

由于此时订阅端已经订阅了该主题,Broker收到Retained消息以后,只保存该消息,然后按照正常的转发逻辑转发给订阅端,因此对于订阅端来说,这只是一个普通的MQTT协议消息,所以Retain标识为0。

接着点击“Ctrl+C”,关闭subscribe_retained.js,重新运行,此时因为Session已经存在,订阅端不会再重新订阅这个主题,终端不会有任何输出。由此可见,Retained消息只对新订阅的订阅者有效。

4.4.3 LWT

LWT全称为Last Will and Testament,也就是我们在连接Broker时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。

顾名思义,当Broker检测到Client非正常地断开连接时,就会向Client的遗愿主题中发布一条消息。遗愿的相关设置是在建立连接时,在CONNECT数据包里面指定的。

·Will Flag:是否使用LWT。

·Will Topic:遗愿主题名,不可使用通配符。

·Will QoS:发布遗愿消息时使用的QoS等级。

·Will Retain:遗愿消息的Retain标识。

·Will Message:遗愿消息内容。

Broker在以下情况下认为Client是非正常断开连接的:

1)Broker检测到底层I/O异常;

2)Client未能在Keepalive的间隔内和Broker之间进行消息交互;

3)Client在关闭底层TCP连接前没有发送DISCONNECT数据包;

4)Broker因为协议错误关闭了和Client的连接,比如Client发送了一个格式错误的MQTT协议数据包。

如果Client通过发布DISCONNECT数据包断开连接,这属于正常断开连接,不会触发LWT的机制。同时,Broker还会丢掉这个Client在连接时指定的LWT参数。

通常,如果我们关心设备,比如传感器的连接状态,则可以使用LWT。在接下来的代码实践中,我们会使用LWT和Retained消息实现对一个Client的连接状态监控。

4.4.4 代码实践:监控Client连接状态

实现Client连接状态监控的原理很简单:

1)Client在连接时指定Will Topic为“client/status”,遗愿消息为“offline”,Will Retain=1;

2)Client在连接成功后向同一个主题“client/status”发布一个内容为“online”的Retained消息。

那么,订阅者在任何时候订阅“client/status”,都会获取Client当前的连接状态。

Client.js的代码如下所示。


 1. var mqtt = require('mqtt')
 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 3.   clientId: "mqtt_sample_publisher_chapter_8",
 4.   clean: false,
 5.   will:{
 6.     topic : 'client/status',
 7.     qos: 1,
 8.     retain: true,
 9.     payload: JSON.stringify({status: 'offline'})
10.   }
11. })
12. 
13. client.on('connect', function (connack) {
14.   if(connack.returnCode == 0){
15.     client.publish("client/status", JSON.stringify({status: 'online'}), 
 {qos: 1, retain: 1})
16.   }else{
17.     console.log('Connection failed: ${connack.returnCode}')
18.   }
19. })

代码的第5~9行对Client的LWT进行了设置。

在第15行,Client在连接到Broker之后会向指定的主题发布一条消息。

用于监控Client连接状态的monitor.js代码如下所示。


 1. var mqtt = require('mqtt')
 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 3.   clientId: "mqtt_sample_subscriber_id_chapter_8_2",
 4.   clean: false
 5. })
 6. 
 7. client.on('connect', function () {
 8.     client.subscribe("client/status", {qos: 1})
 9. })
10. 
11. client.on("message", function (_, message) {
12.   var jsonPayload = JSON.parse(message.toString())
13.   console.log('client is ${jsonPayload.status}')
14. })

首先运行“node client.js”,然后运行“node monitor.js”,我们会得到以下输出:


client is online

在运行client.js的终端上,点击“Ctrl+C”终止client.js,之后在运行monitor.js的终端上会得到以下输出:


client is offline

重新运行“node client.js”,在运行monitor.js的终端上会得到以下输出:


client is online

点击“Ctrl+C”终止monitor.js,然后重新运行“node monitor.js”,会得到以下输出:


client is online

这样,我们就完美地监控了Client的连接状态。

本节我们学习了Retained消息和LWT,并利用这两个特性完成了对Client连接状态进行监控。接下来,我们将学习Keepalive和在移动端的连接保活。 PGHznyts0Rfbj9+p3owGUR+wMqTtcVXECP3RDWVsfeMT4g1vDFEXpprla2LZAuhs

点击中间区域
呼出菜单
上一章
目录
下一章
×