前文多次提到了QoS(Quality of Service),CONNECT数据包、PUBLISH数据包、SUBSCRIBE数据包中都有QoS的标识。那么MQTT协议提供的QoS是什么呢?
作为最初用来在网络带宽窄、信号不稳定的环境下传输数据的协议,MQTT协议设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,MQTT协议还提供了3种不同层次的QoS。
·QoS0:At most once,至多一次。
·QoS1:At least once,至少一次。
·QoS2:Exactly once,确保只有一次。
这三个层次都是什么意思呢?QoS是消息的发送方(Sender)和接收方(Receiver)之间达成的一个协议。
·QoS0:表示Sender发送一条消息,Receiver最多能收到一次,也就是说Sender尽力向Receiver发送消息,如果发送失败,则放弃。
·QoS1:表示Sender发送一条消息,Receiver至少能收到一次,也就是说Sender向Receiver发送消息,如果发送失败,Sender会继续重试,直到Receiver收到消息为止,但是因为重传的原因,Receiver可能会收到重复的消息。
·QoS2:表示Sender发送一条消息,Receiver确保能收到且只收到一次,也就是说Sender尽力向Receiver发送消息,如果发送失败,会继续重试,直到Receiver收到消息为止,同时保证Receiver不会因为消息重传而收到重复的消息。
注意,QoS是Sender和Receiver之间达成的协议,不是Publisher和Subscriber之间达成的协议。也就是说,Publisher发布一条QoS1的消息,只能保证Broker至少收到一次;而对应的Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscribe的时候和Broker协商的QoS等级。
接下来,看一下QoS0、QoS1和QoS2的机制,并讨论一下什么是QoS降级。
QoS0是最简单的一个QoS等级。在QoS0等级下,Sender和Receiver之间一次消息的传递流程如图4-27所示。
图4-27 QoS0等级下Sender和Receiver之间的消息传递流程
Sender向Receiver发送一个包含消息数据的PUBLISH数据包,然后不管结果如何,丢弃已发送的PUBLISH数据包,这样一条消息即可完成发送。
QoS1要保证消息至少到达Receiver一次,所以这里有一个应答的机制。在Qos1等级下,Sender和Receiver之间一次消息的传递流程如图4-28所示。
图4-28 QoS1等级下Sender和Receiver之间的消息传递流程
1)Sender向Receiver发送一个带有消息数据的PUBLISH数据包,并在本地保存这个PUBLISH数据包。
2)Receiver在收到PUBLISH数据包后,向Sender发送一个PUBACK数据包,PUBACK数据包没有消息体,只在可变头中有一个包标识(Packet Identifier),和它收到的PUBLISH数据包中的Packet Identifier一致。
3)Sender在收到PUBACK数据包之后,根据PUBACK数据包中的Packet Identifier找到本地保存的PUBLISH数据包,然后丢弃,这样一次消息发送完成。
4)如果Sender在一段时间内没有收到PUBLISH数据包对应的PUBACK数据包,那么它会将PUBLISH数据包中的DUP标识设为1(代表的是重新发送PUBLISH数据包),然后重新发送该PUBLISH数据包。重复这个流程,直到Sender收到PUBACK数据包,然后执行第3步。
当QoS为1或2时,PUBLISH数据包的可变头中将包含包标识符字段。包标识符长度为两个字节,如图4-29所示。
图4-29 包标识符
包标识符用来唯一标识一个MQTT协议数据包,但它并不要求全局唯一,只要保证在一次完整的消息传递过程中是唯一的就可以。
PUBACK数据包的固定头格式如图4-30所示。
图4-30 PUBACK数据包的固定头格式
固定头中的MQTT协议数据包类型字段的值为4,代表的是PUBACK数据包。PUBACK数据包的剩余长度字段值固定为2。
PUBACK数据包的可变头只包含一个2字节的包标识符,如图4-31所示。
图4-31 包标识符
PUBACK数据包没有消息体。
QoS0和QoS1是相对简单的QoS等级,QoS2不仅要确保Receiver能收到Sender发送的消息,还要保证消息不重复。所以,它的重传和应答机制就要更复杂,同时开销也是最大的。
在QoS2等级下,Sender和Receiver之间一次消息的传递流程如图4-32所示。
图4-32 QoS2等级下Sender和Receiver之间的消息传递流程
QoS2使用2套请求/应答流程(一个4段的握手)来确保Receiver收到来自Sender的消息,且不重复。
1)Sender发送QoS值为2的PUBLISH数据包,假设该数据包中Packet Identifier为P,并在本地保存该PUBLISH数据包。
2)Receiver收到PUBLISH数据包后,在本地保存PUBLISH数据包的Packet Identifier为P,并回复Sender一个PUBREC数据包。PUBREC数据包可变头中的Packet Identifier为P,没有消息体。
3)当Sender收到PUBREC数据包后,它就可以安全地丢掉初始的Packet Identifier为P的PUBLISH数据包,同时保存该PUBREC数据包,并回复Receiver一个PUBREL数据包。PUBREL数据包可变头中的Packet Identifier为P,没有消息体;如果Sender在一定时间内没有收到PUBREC数据包,它会把PUBLISH数据包的DUP标识设为1,重新发送该PUBLISH数据包。
4)当Receiver收到PUBREL数据包时,它可以丢弃掉保存的PUBLISH数据包的Packet Identifier P,并回复Sender一个PUBCOMP数据包。PUBCOMP数据包可变头中的Packet Identifier为P,没有消息体。
5)当Sender收到PUBCOMP数据包,它会认为数据包传输已完成,会丢掉对应的PUBREC数据包。如果Sender在一定时间内没有收到PUBCOMP数据包,则会重新发送PUBREL数据包。
我们可以看到,在QoS2中,想要完成一次消息的传递,Sender和Receiver之间至少要发送4个数据包,所以说,QoS2是最安全,也是最慢的一种QoS等级。
PUBREC数据包的固定头格式如图4-33所示。
图4-33 PUBREC数据包的固定头格式
固定头中的MQTT协议数据包类型字段的值为5,代表该数据包是PUBREC数据包。PUBREC数据包的剩余长度字段值固定为2。
PUBREC数据包的可变头只包含一个2字节的包标识符,如图4-34所示。
图4-34 PUBREC数据包可变头中的包标识符
PUBREC数据包没有消息体。
PUBREL数据包的固定头格式如图4-35所示。
图4-35 PUBREL数据包的固定头格式
固定头中的MQTT协议数据包类型字段的值为6,代表该数据包是PUBREL数据包。PUBREL数据包的剩余长度字段值固定为2。
PUBREL数据包的可变头只包含一个2字节的包标识符,如图4-36所示。
图4-36 PUBREL数据包可变头中的包标识符
PUBREL数据包没有消息体。
PUBCOM数据包的固定头格式如图4-37所示。
图4-37 PUBCOM数据包的固定头格式
固定头中的MQTT协议数据包类型字段的值为7,代表该数据包是PUBCOM数据包。PUBCOM数据包的剩余长度字段值固定为2。
PUBCOM数据包的可变头只包含一个2字节的包标识符,如图4-38所示。
图4-38 PUBCOM数据包可变头中的包标识符
PUBCOM数据包没有消息体。
本节会实现一个发布端和一个订阅端,它们可以通过命令行参数来指定发布和订阅的QoS,同时,通过捕获packetsend和packetreceive事件,将发送和接收到的MQTT协议数据包的类型打印出来。
发布端的代码publish_with_qos.js如下所示。
1. var args = require('yargs').argv; 2. var mqtt = require('mqtt') 3. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 4. clientId: "mqtt_sample_publisher_2", 5. clean: false 6. }) 7. 8. client.on('connect', function (connack) { 9. if (connack.returnCode == 0) { 10. client.on('packetsend', function (packet) { 11. console.log('send: ${packet.cmd}') 12. }) 13. client.on('packetreceive', function (packet) { 14. console.log('receive: ${packet.cmd}') 15. }) 16. client.publish("home/sample_topic", JSON.stringify({data: 'test'}), {qos: args.qos}) 17. } else { 18. console.log('Connection failed: ${connack.returnCode}') 19. } 20. })
第10~12行代码捕获packetsend事件,然后将发送的MQTT协议数据包的类型打印出来。
第13~15行代码捕获packetreceive事件,然后将收到的MQTT协议数据包的类型打印出来。
第16行代码使用命令行中指定的QoS来发布消息。
订阅端的代码subscribe_with_qos.js如下所示。
1. var args = require('yargs').argv; 2. var mqtt = require('mqtt') 3. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 4. clientId: "mqtt_sample_subscriber_id_2", 5. clean: false 6. }) 7. 8. 9. client.on('connect', function (connack) { 10. if (connack.returnCode == 0) { 11. client.subscribe("home/sample_topic", {qos: args.qos}, function () { 12. client.on('packetsend', function (packet) { 13. console.log('send: ${packet.cmd}') 14. }) 15. client.on('packetreceive', function (packet) { 16. console.log('receive: ${packet.cmd}') 17. }) 18. }) 19. } else { 20. console.log('Connection failed: ${connack.returnCode}') 21. } 22. })
第11行代码使用命令行中指定的QoS进行订阅。
第12~14行代码捕获packetsend事件,然后将发送的MQTT协议数据包的类型打印出来。
第15~17行代码捕获packetreceive事件,然后将收到的MQTT协议数据包的类型打印出来。
在subscribe_with_qos.js中,Client每次连接到Broker都会按照参数指定的QoS重新订阅主题,订阅成功以后才开始捕获接收和发送的数据包,所以Client从连接后到重新订阅前收到的离线消息都不会被打印出来。
我们可以通过node publish_with_qos.js--qos=xxx和node subscribe_with_qos.js--qos=xxx来运行这两段node.js代码。
接下来,用不同的参数组合来运行这两段node.js代码,看看输出分别是什么。
需要先运行subscribe_with_qos.js再运行publish_with_qos.js,确保接收到的消息可以打印出来。
运行“node publish_with_qos.js--qos=0”输出为:
send: publish
运行“node subscribe_with_qos.js--qos=0”输出为:
receive: publish
Publisher到Broker,Broker到Subscriber都是用的QoS0。
运行“node publish_with_qos.js--qos=1”输出为:
send: publish receive: puback
运行“node subscribe_with_qos.js--qos=1”输出为:
receive: publish send: puback
Publisher到Broker,Broker到Subscriber都是用的QoS1。
运行“node publish_with_qos.js--qos=0”输出为:
send: publish
运行“node subscribe_with_qos.js--qos=1”输出为:
receive: publish
这里就有点奇怪了,很明显Broker到Subscriber使用的是QoS0,和Subscriber订阅时指定的QoS不一样。原因我们在4.3.6中会进行详细解释。
运行“node publish_with_qos.js--qos=1”输出为:
send: publish receive: puback
运行“node subscribe_with_qos.js--qos=0”输出为:
receive: publish
和设定的一样,Publisher到Broker使用QoS1,Broker到Subscriber使用QoS0。Publisher使用QoS1发布消息,但是消息到Subscriber却是QoS0。也就是说,Subscriber有可能无法收到消息,这种现象被称为QoS的降级(QoS Degrade)。
运行“node publish_with_qos.js--qos=2”输出为:
send: publish receive: pubrec send: pubrel receive: pubcomp
运行“node subscribe_with_qos.js--qos=2”输出为:
receive: publish send: pubrec receive: pubrel send: pubcomp
可以看到,Publisher到Broker,Broker到Subscriber都是用的QoS2。
在上节的代码实践中,我们已经发现了在某些情况下,Broker在实际发送消息到订阅者时使用的QoS和订阅者在订阅主题时指定的QoS不一样。
这里有一个很重要的计算方法:在MQTT协议中,从Broker到Subscriber这段消息传递的实际QoS等级等于Publisher发布消息时指定的QoS等级和Subscriber在订阅时与Broker协商的QoS等级中最小的那一个。
Actual Subscribe QoS=MIN(Publish QoS,Subscribe QoS)
这也就解释了“publish qos=0,subscribe qos=1”的情况下Subscriber的实际QoS为0,以及“publish qos=1,subscribe qos=0”时出现QoS降级的原因。
同样,如果Publish QoS为1,Subscribe QoS为2,或者Publish QoS为2,Subscribe QoS为1,那么实际Subscribe接收消息的QoS仍然为1。
理解了实际Subscriber QoS的计算方法,你才能更好地设计系统中Publisher和Subscriber使用的QoS。例如,如果你希望Subscriber至少收到一次Publisher的消息,那么你要确保Publisher和Subscriber都使用不小于1的QoS。
如果Client想接收离线消息,就必须在连接到Broker的时候指定使用持久会话(Clean Session=0),这样Broker才会存储Client在离线期间没有确认接收的QoS大于1的消息。
以下情况可以选择QoS0:
·Client和Broker之间的网络连接非常稳定,例如一个通过有线网络连接到Broker的测试用Client;
·可以接受丢失部分消息,比如一个传感器以非常短的间隔发布状态数据,那丢失一些数据也可以接受;
·不需要离线消息。
以下情况可以选择QoS1:
·应用需要接收所有的消息,而且可以接受并处理重复的消息;
·无法接受QoS2带来的额外开销,QoS1发送消息的速度比QoS2快很多。
以下情况下可以选择QoS2:
·应用必须接收所有的消息,而且应用在重复的消息下无法正常工作,同时你可以接受QoS2带来的额外开销。
实际上,QoS1是应用最广泛的QoS等级。QoS1表示发送消息的速度很快,而且能够保证消息的可靠性。虽然使用QoS1等级可能会收到重复的消息,但是在应用程序中处理重复消息,通常并不是件难事。在5.1节中,我们会看到如何在应用程序中对消息进行去重。
本节学习了MQTT协议在3种不同的QoS等级下消息传递的流程,并用代码进行验证。同时,我们也讨论了如何根据实际的应用场景来选择不同的QoS等级。在4.4节,我们将学习MQTT协议的另外两个特性——Retained消息和LWT。