购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.3 QoS及其最佳实践

前文多次提到了QoS(Quality of Service),CONNECT数据包、PUBLISH数据包、SUBSCRIBE数据包中都有QoS的标识。那么MQTT协议提供的QoS是什么呢?

4.3.1 MQTT协议中的QoS等级

作为最初用来在网络带宽窄、信号不稳定的环境下传输数据的协议,MQTT协议设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,MQTT协议还提供了3种不同层次的QoS。

·QoS0:At most once,至多一次。

·QoS1:At least once,至少一次。

·QoS2:Exactly once,确保只有一次。

这三个层次都是什么意思呢?QoS是消息的发送方(Sender)和接收方(Receiver)之间达成的一个协议。

·QoS0:表示Sender发送一条消息,Receiver最多能收到一次,也就是说Sender尽力向Receiver发送消息,如果发送失败,则放弃。

·QoS1:表示Sender发送一条消息,Receiver至少能收到一次,也就是说Sender向Receiver发送消息,如果发送失败,Sender会继续重试,直到Receiver收到消息为止,但是因为重传的原因,Receiver可能会收到重复的消息。

·QoS2:表示Sender发送一条消息,Receiver确保能收到且只收到一次,也就是说Sender尽力向Receiver发送消息,如果发送失败,会继续重试,直到Receiver收到消息为止,同时保证Receiver不会因为消息重传而收到重复的消息。

注意,QoS是Sender和Receiver之间达成的协议,不是Publisher和Subscriber之间达成的协议。也就是说,Publisher发布一条QoS1的消息,只能保证Broker至少收到一次;而对应的Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscribe的时候和Broker协商的QoS等级。

接下来,看一下QoS0、QoS1和QoS2的机制,并讨论一下什么是QoS降级。

4.3.2 QoS0

QoS0是最简单的一个QoS等级。在QoS0等级下,Sender和Receiver之间一次消息的传递流程如图4-27所示。

图4-27 QoS0等级下Sender和Receiver之间的消息传递流程

Sender向Receiver发送一个包含消息数据的PUBLISH数据包,然后不管结果如何,丢弃已发送的PUBLISH数据包,这样一条消息即可完成发送。

4.3.3 QoS1

QoS1要保证消息至少到达Receiver一次,所以这里有一个应答的机制。在Qos1等级下,Sender和Receiver之间一次消息的传递流程如图4-28所示。

图4-28 QoS1等级下Sender和Receiver之间的消息传递流程

1)Sender向Receiver发送一个带有消息数据的PUBLISH数据包,并在本地保存这个PUBLISH数据包。

2)Receiver在收到PUBLISH数据包后,向Sender发送一个PUBACK数据包,PUBACK数据包没有消息体,只在可变头中有一个包标识(Packet Identifier),和它收到的PUBLISH数据包中的Packet Identifier一致。

3)Sender在收到PUBACK数据包之后,根据PUBACK数据包中的Packet Identifier找到本地保存的PUBLISH数据包,然后丢弃,这样一次消息发送完成。

4)如果Sender在一段时间内没有收到PUBLISH数据包对应的PUBACK数据包,那么它会将PUBLISH数据包中的DUP标识设为1(代表的是重新发送PUBLISH数据包),然后重新发送该PUBLISH数据包。重复这个流程,直到Sender收到PUBACK数据包,然后执行第3步。

1.QoS>0时的PUBLISH数据包

当QoS为1或2时,PUBLISH数据包的可变头中将包含包标识符字段。包标识符长度为两个字节,如图4-29所示。

图4-29 包标识符

包标识符用来唯一标识一个MQTT协议数据包,但它并不要求全局唯一,只要保证在一次完整的消息传递过程中是唯一的就可以。

2.PUBACK数据包
(1)固定头

PUBACK数据包的固定头格式如图4-30所示。

图4-30 PUBACK数据包的固定头格式

固定头中的MQTT协议数据包类型字段的值为4,代表的是PUBACK数据包。PUBACK数据包的剩余长度字段值固定为2。

(2)可变头

PUBACK数据包的可变头只包含一个2字节的包标识符,如图4-31所示。

图4-31 包标识符

(3)消息体

PUBACK数据包没有消息体。

4.3.4 QoS2

QoS0和QoS1是相对简单的QoS等级,QoS2不仅要确保Receiver能收到Sender发送的消息,还要保证消息不重复。所以,它的重传和应答机制就要更复杂,同时开销也是最大的。

在QoS2等级下,Sender和Receiver之间一次消息的传递流程如图4-32所示。

图4-32 QoS2等级下Sender和Receiver之间的消息传递流程

QoS2使用2套请求/应答流程(一个4段的握手)来确保Receiver收到来自Sender的消息,且不重复。

1)Sender发送QoS值为2的PUBLISH数据包,假设该数据包中Packet Identifier为P,并在本地保存该PUBLISH数据包。

2)Receiver收到PUBLISH数据包后,在本地保存PUBLISH数据包的Packet Identifier为P,并回复Sender一个PUBREC数据包。PUBREC数据包可变头中的Packet Identifier为P,没有消息体。

3)当Sender收到PUBREC数据包后,它就可以安全地丢掉初始的Packet Identifier为P的PUBLISH数据包,同时保存该PUBREC数据包,并回复Receiver一个PUBREL数据包。PUBREL数据包可变头中的Packet Identifier为P,没有消息体;如果Sender在一定时间内没有收到PUBREC数据包,它会把PUBLISH数据包的DUP标识设为1,重新发送该PUBLISH数据包。

4)当Receiver收到PUBREL数据包时,它可以丢弃掉保存的PUBLISH数据包的Packet Identifier P,并回复Sender一个PUBCOMP数据包。PUBCOMP数据包可变头中的Packet Identifier为P,没有消息体。

5)当Sender收到PUBCOMP数据包,它会认为数据包传输已完成,会丢掉对应的PUBREC数据包。如果Sender在一定时间内没有收到PUBCOMP数据包,则会重新发送PUBREL数据包。

我们可以看到,在QoS2中,想要完成一次消息的传递,Sender和Receiver之间至少要发送4个数据包,所以说,QoS2是最安全,也是最慢的一种QoS等级。

1.PUBREC数据包
(1)固定头

PUBREC数据包的固定头格式如图4-33所示。

图4-33 PUBREC数据包的固定头格式

固定头中的MQTT协议数据包类型字段的值为5,代表该数据包是PUBREC数据包。PUBREC数据包的剩余长度字段值固定为2。

(2)可变头

PUBREC数据包的可变头只包含一个2字节的包标识符,如图4-34所示。

图4-34 PUBREC数据包可变头中的包标识符

(3)消息体

PUBREC数据包没有消息体。

2.PUBREL数据包
(1)固定头

PUBREL数据包的固定头格式如图4-35所示。

图4-35 PUBREL数据包的固定头格式

固定头中的MQTT协议数据包类型字段的值为6,代表该数据包是PUBREL数据包。PUBREL数据包的剩余长度字段值固定为2。

(2)可变头

PUBREL数据包的可变头只包含一个2字节的包标识符,如图4-36所示。

图4-36 PUBREL数据包可变头中的包标识符

(3)消息体

PUBREL数据包没有消息体。

3.PUBCOM数据包
(1)固定头

PUBCOM数据包的固定头格式如图4-37所示。

图4-37 PUBCOM数据包的固定头格式

固定头中的MQTT协议数据包类型字段的值为7,代表该数据包是PUBCOM数据包。PUBCOM数据包的剩余长度字段值固定为2。

(2)可变头

PUBCOM数据包的可变头只包含一个2字节的包标识符,如图4-38所示。

图4-38 PUBCOM数据包可变头中的包标识符

(3)消息体

PUBCOM数据包没有消息体。

4.3.5 代码实践:使用不同的QoS发布消息

本节会实现一个发布端和一个订阅端,它们可以通过命令行参数来指定发布和订阅的QoS,同时,通过捕获packetsend和packetreceive事件,将发送和接收到的MQTT协议数据包的类型打印出来。

发布端的代码publish_with_qos.js如下所示。


 1. var args = require('yargs').argv;
 2. var mqtt = require('mqtt')
 3. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 4.   clientId: "mqtt_sample_publisher_2",
 5.   clean: false
 6. })
 7. 
 8. client.on('connect', function (connack) {
 9.   if (connack.returnCode == 0) {
10.     client.on('packetsend', function (packet) {
11.       console.log('send: ${packet.cmd}')
12.     })
13.     client.on('packetreceive', function (packet) {
14.       console.log('receive: ${packet.cmd}')
15.     })
16.     client.publish("home/sample_topic", JSON.stringify({data: 'test'}), {qos: 
 args.qos})
17.   } else {
18.     console.log('Connection failed: ${connack.returnCode}')
19.   }
20. })

第10~12行代码捕获packetsend事件,然后将发送的MQTT协议数据包的类型打印出来。

第13~15行代码捕获packetreceive事件,然后将收到的MQTT协议数据包的类型打印出来。

第16行代码使用命令行中指定的QoS来发布消息。

订阅端的代码subscribe_with_qos.js如下所示。


 1. var args = require('yargs').argv;
 2. var mqtt = require('mqtt')
 3. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 4.   clientId: "mqtt_sample_subscriber_id_2",
 5.   clean: false
 6. })
 7. 
 8. 
 9. client.on('connect', function (connack) {
10.   if (connack.returnCode == 0) {
11.     client.subscribe("home/sample_topic", {qos: args.qos}, function () {
12.       client.on('packetsend', function (packet) {
13.         console.log('send: ${packet.cmd}')
14.       })
15.       client.on('packetreceive', function (packet) {
16.         console.log('receive: ${packet.cmd}')
17.       })
18.     })
19.   } else {
20.     console.log('Connection failed: ${connack.returnCode}')
21.   }
22. })

第11行代码使用命令行中指定的QoS进行订阅。

第12~14行代码捕获packetsend事件,然后将发送的MQTT协议数据包的类型打印出来。

第15~17行代码捕获packetreceive事件,然后将收到的MQTT协议数据包的类型打印出来。

在subscribe_with_qos.js中,Client每次连接到Broker都会按照参数指定的QoS重新订阅主题,订阅成功以后才开始捕获接收和发送的数据包,所以Client从连接后到重新订阅前收到的离线消息都不会被打印出来。

我们可以通过node publish_with_qos.js--qos=xxx和node subscribe_with_qos.js--qos=xxx来运行这两段node.js代码。

接下来,用不同的参数组合来运行这两段node.js代码,看看输出分别是什么。

需要先运行subscribe_with_qos.js再运行publish_with_qos.js,确保接收到的消息可以打印出来。

1.发布使用QoS0,订阅使用QoS0

运行“node publish_with_qos.js--qos=0”输出为:


send: publish

运行“node subscribe_with_qos.js--qos=0”输出为:


receive: publish

Publisher到Broker,Broker到Subscriber都是用的QoS0。

2.发布使用QoS1,订阅使用QoS1

运行“node publish_with_qos.js--qos=1”输出为:


send: publish
receive: puback

运行“node subscribe_with_qos.js--qos=1”输出为:


receive: publish
send: puback

Publisher到Broker,Broker到Subscriber都是用的QoS1。

3.发布使用QoS0,订阅使用QoS1

运行“node publish_with_qos.js--qos=0”输出为:


send: publish

运行“node subscribe_with_qos.js--qos=1”输出为:


receive: publish

这里就有点奇怪了,很明显Broker到Subscriber使用的是QoS0,和Subscriber订阅时指定的QoS不一样。原因我们在4.3.6中会进行详细解释。

4.发布使用QoS1,订阅使用QoS0

运行“node publish_with_qos.js--qos=1”输出为:


send: publish
receive: puback

运行“node subscribe_with_qos.js--qos=0”输出为:


receive: publish

和设定的一样,Publisher到Broker使用QoS1,Broker到Subscriber使用QoS0。Publisher使用QoS1发布消息,但是消息到Subscriber却是QoS0。也就是说,Subscriber有可能无法收到消息,这种现象被称为QoS的降级(QoS Degrade)。

5.发布使用QoS2,订阅使用QoS2

运行“node publish_with_qos.js--qos=2”输出为:


send: publish
receive: pubrec
send: pubrel
receive: pubcomp

运行“node subscribe_with_qos.js--qos=2”输出为:


receive: publish
send: pubrec
receive: pubrel
send: pubcomp

可以看到,Publisher到Broker,Broker到Subscriber都是用的QoS2。

4.3.6 实际的Subscribe QoS

在上节的代码实践中,我们已经发现了在某些情况下,Broker在实际发送消息到订阅者时使用的QoS和订阅者在订阅主题时指定的QoS不一样。

这里有一个很重要的计算方法:在MQTT协议中,从Broker到Subscriber这段消息传递的实际QoS等级等于Publisher发布消息时指定的QoS等级和Subscriber在订阅时与Broker协商的QoS等级中最小的那一个。

Actual Subscribe QoS=MIN(Publish QoS,Subscribe QoS)

这也就解释了“publish qos=0,subscribe qos=1”的情况下Subscriber的实际QoS为0,以及“publish qos=1,subscribe qos=0”时出现QoS降级的原因。

同样,如果Publish QoS为1,Subscribe QoS为2,或者Publish QoS为2,Subscribe QoS为1,那么实际Subscribe接收消息的QoS仍然为1。

理解了实际Subscriber QoS的计算方法,你才能更好地设计系统中Publisher和Subscriber使用的QoS。例如,如果你希望Subscriber至少收到一次Publisher的消息,那么你要确保Publisher和Subscriber都使用不小于1的QoS。

4.3.7 QoS的最佳实践

1.QoS与会话

如果Client想接收离线消息,就必须在连接到Broker的时候指定使用持久会话(Clean Session=0),这样Broker才会存储Client在离线期间没有确认接收的QoS大于1的消息。

2.如何选择QoS

以下情况可以选择QoS0:

·Client和Broker之间的网络连接非常稳定,例如一个通过有线网络连接到Broker的测试用Client;

·可以接受丢失部分消息,比如一个传感器以非常短的间隔发布状态数据,那丢失一些数据也可以接受;

·不需要离线消息。

以下情况可以选择QoS1:

·应用需要接收所有的消息,而且可以接受并处理重复的消息;

·无法接受QoS2带来的额外开销,QoS1发送消息的速度比QoS2快很多。

以下情况下可以选择QoS2:

·应用必须接收所有的消息,而且应用在重复的消息下无法正常工作,同时你可以接受QoS2带来的额外开销。

实际上,QoS1是应用最广泛的QoS等级。QoS1表示发送消息的速度很快,而且能够保证消息的可靠性。虽然使用QoS1等级可能会收到重复的消息,但是在应用程序中处理重复消息,通常并不是件难事。在5.1节中,我们会看到如何在应用程序中对消息进行去重。

本节学习了MQTT协议在3种不同的QoS等级下消息传递的流程,并用代码进行验证。同时,我们也讨论了如何根据实际的应用场景来选择不同的QoS等级。在4.4节,我们将学习MQTT协议的另外两个特性——Retained消息和LWT。 H/sW/x6wVusd6PecxVzsP/R6GqIwXwzX61ZUFVkhoC7NWp0d3hn3KcsFJ+baA8LS

点击中间区域
呼出菜单
上一章
目录
下一章
×