上文介绍了MQTT基于订阅与发布的消息模型,MQTT协议的订阅与发布是基于主题的。一个典型的MQTT消息发送与接收的流程如图4-11所示。
1)ClientA连接到Broker。
2)ClientB连接到Broker,并订阅主题Topic1。
3)ClientA给Broker发送一个Publish数据包,主题为Topic1;
4)Broker收到ClientA的消息,发现ClientB订阅了Topic1,然后通过发送PUBLISH数据包的方式将消息转发到ClientB;
5)ClientB从Broker接收到该消息。
图4-11 MQTT消息的发送与接收流程
和传统的队列有点不同,如果ClientB在ClientA发布消息之后再订阅Topic1,那么ClientB就不会收到该消息。
MQTT协议通过订阅与发布模型对消息的发布者和订阅者进行解耦,发布者在发布消息时不需要订阅方也能连接到Broker,只要订阅方之前订阅过相应主题,那么它在连接到Broker之后就可以收到发布方在它离线期间发布的消息。为了方便起见,在本书中我们称这种消息为离线消息。
接收离线消息需要Client使用持久会话,且发布时消息的QoS不小于1。
在继续学习前,我们有必要搞清楚两组概念:发布者(Publisher)和订阅者(Subscriber),发送方(Sender)和接收方(Receiver)。弄清这两组概念,我们才能更好地理解订阅和发布的流程以及QoS的概念。
Publisher和Subscriber是相对于Topic来说的身份,如果一个Client向某个Topic发布消息,那么它就是Publisher;如果一个Client订阅了某个Topic,那么它就是Subscriber。在上面的例子中,ClientA是Publisher,ClientB是Subscriber。
Sender和Receiver是相对于消息传输方向的身份,仍然用前面的例子做解释。
·当ClientA发布消息时,它给Broker发送一条消息,那么ClientA是Sender,Broker是Receiver;
·当Broker转发消息给ClientB时,Broker是Sender,ClientB则是Receiver。
Publisher/Subscriber、Sender/Receiver这两组概念最大的区别是Publisher和Subscriber只可能是Client,而Sender/Receiver有可能是Client,也有可能是Broker。
解释清楚这两组不同的概念之后,我们接下来看一下PUBLISH数据包。
PUBLISH数据包用于在Sender和Receiver之间传输消息数据,也就是说,当Publisher要向某个Topic发布一条消息的时候,Publisher会向Broker发送一个PUBLISH数据包;当Broker要将一条消息转发给订阅了某条主题的Subscriber时,Broker也会向Subscriber发送一条PUBLISH数据包。PUBLISH数据包的格式如下所示。
PUBLISH数据包的固定格式如图4-12所示。
图4-12 PUBLISH数据包的固定头
固定头中的MQTT协议数据包类型字段的值为3,代表该数据包是PUBLISH数据包。PUBLISH数据包固定头中的标识位(Flag)中有如下3个字段。
·消息重复标识(DUP Flag):长度为1bit,值为0或1。当DUP Flag=1时,代表该消息是一条重发消息,因为Receiver没有确认收到之前的消息。这个标识只在QoS大于0的消息中使用。
·QoS:长度为2bit,值为0、1、2,代表PUBLISH消息的服务质量级别。
·Retain标识(Retain Flag):长度为1bit,值为0或1。当Retain标识在从Client发送到Broker的PUBLISH消息中被设为1时,Broker应该保存该消息,并且之后有任何新的Subscriber订阅PUBLISH消息中指定的主题时,都会先收到该消息,这种消息也被称为Retained消息;当Retain标识在从Broker发送到Client的PUBLISH消息中被设为1时,代表该消息是一条Retained消息。
PUBLISH数据包的可变头由两个字段组成——主题名和包标识符(Packet Identifier)。其中,Packet Identifier只会在QoS1和QoS2的PUBLISH数据包里出现,我们在4.3节再详细讲解。
主题名是一个UTF-8编码的字符串,它由两个前缀字节来辨识字符串的长度,如图4-13所示。
图4-13 主题名字段
由于只有2个字节标识主题名长度,所以主题名的最大长度为65535字节。
虽然主题名可以是长度从1~65535的任意字符串(可以包含空格),但是在实际项目中,我们最好还是遵循以下一些命名规则。
·主题名称应该包含层级,不同的层级用“/”划分,比如,2楼201房间的温度感应器可以用主题:“home/2ndfloor/201/temperature”表示。
·主题名称开头不要使用“/”,例如:“/home/2ndfloor/201/temperature”。
·不要在主题中使用空格。
·只使用ASCII字符。
·主题名称在可读的前提下尽量短一些。
·主题名称对大小写是敏感的,“Home”和“home”是两个不同的主题。
·可以将设备的唯一标识加到主题中,比如:“warehouse/shelf/shelf1_ID/status”。
·主题尽量精确,不要使用泛用的主题,例如在201房间中有3个传感器,温度传感器、亮度传感器和湿度传感器,那么你应该使用3个主题名称:“home/2ndfloor/201/temperature”“home/2ndfloor/201/brightness”和“home/2ndfloor/201/humidity”,而不是让3个传感器都使用“home/2ndfloor/201”这个主题名。
·以“$”开头的主题属于Broker预留的系统主题,通常用于发布Broker的内部统计信息,比如“$SYS/broker/clients/connected”。应用程序不要使用“$”开头的主题收发数据。
PUBLISH数据包的消息体就是该数据包要发送的数据,它可以是任意格式的数据,比如二进制数据、文本、JSON等。具体数据格式由应用程序定义。在实际生产中,我们可以使用JSON、Protocol Buffer等格式对数据进行编码。
消息体中数据的长度可以由固定头中的数据包剩余长度减去可变头的长度得到。
接下来写一小段代码,目的是向一个主题发布一条QoS为1的使用JSON编码的数据,然后退出。代码如下。
1. //publisher.js 2. 3. var mqtt = require('mqtt') 4. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 5. clientId: "mqtt_sample_publisher_1", 6. clean: false 7. }) 8. 9. client.on('connect', function (connack) { 10. if(connack.returnCode == 0){ 11. client.publish("home/2ndfloor/201/temperature", JSON.stringify({current: 25}), {qos: 1}, function (err) { 12. if(err == undefined) { 13. console.log("Publish finished") 14. client.end() 15. }else{ 16. console.log("Publish failed") 17. } 18. }) 19. }else{ 20. console.log('Connection failed: ${connack.returnCode}') 21. } 22. })
第11行代码表示向主题“home/2ndfloor/201/temperature”,发送一条QoS为1的消息,消息的内容是格式为JSON的字符串。
运行“node publisher.js”,会得到以下输出。
Publish finished
ClientB想要接收ClientA发布到某个主题的消息,就必须先向Broker订阅这个主题,订阅一个主题的流程如图4-14所示。
图4-14 Client的订阅流程
1)Client向Broker发送一个SUBSCRIBE数据包,其中包含Client想要订阅的主题以及其他参数。
2)Broker收到SUBSCRIBE数据包后,向Client发送一个SUBACK数据包作为应答。
接下来我们看一下数据包的具体内容。
SUBSCRIBE数据包的固定头格式如图4-15所示。
图4-15 SUBSCRIBE数据包的固定头格式
固定头中的MQTT协议数据包类型字段的值为8,代表该数据包是SUBSCRIBE数据包。
SUBSCRIBE数据包的可变头只包含一个两字节的包标识符,用来唯一标识一个数据包。数据包标识只需要保证从Sender到Receiver的一次消息交互中唯一即可。SUBSCRIBE数据包可变头的标识符格式如图4-16所示。
图4-16 SUBSCRIBE数据包可变头的包标识符格式
SUBSCRIBE数据包中的消息体由Client要订阅的主题列表构成。和PUBLISH数据包的主题名不同,SUBSCRIBE数据包中的主体名可以包含通配符,通配符包括单层通配符“+”和多层通配符“#”。使用包含通配符的主题名可以订阅满足匹配条件的所有主题。为了和PUBLISH数据包中的主题名进行区分,我们称SUBSCRIBE数据包中的主题名为主题过滤器(Topic Filter)。
单层通配符“+”:如之前所述,MQTT协议的主题名是具有层级概念的,不同的层级间用“/”分割,“+”可以用来指代任意一个层级。
例如:“home/2ndfloor/+/temperature”,可匹配:home/2ndfloor/201/temperature、home/2ndfloor/202/temperature;不可匹配:home/2ndfloor/201/livingroom/temperature、home/3ndfloor/301/temperature。
多层通配符“#”:“#”和“+”的区别在于,“#”可以用来指定任意多个层级,但是“#”必须是Topic Filter的最后一个字符,同时必须跟在“/”后面,除非Topic Filter只包含“#”这一个字符。
例如:“home/2ndfloor/#”,可匹配:home/2ndfloor、home/2ndfloor/201、home/2ndfloor/201/temperature、home/2ndfloor/202/temperature、home/2ndfloor/201/livingroom/temperature;不可匹配:home/3ndfloor/301/temperature。
“#”是一个合法的Topic Filter,代表所有的主题;而“home#”不是一个合法的Topic Filter,因为“#”号需要跟在“/”后面。
每一个Topic Filter必须是一个UTF-8编码的字符串,在这个字符串后面紧跟着1个字节,用于描述订阅该主题的QoS。Topic Filter的格式如图4-17所示。
图4-17 Topic Filter的格式
QoS字节的最后2位用于标识QoS值,值为0、1或2。
消息体的主题列表按照上面的格式依次拼接即可。
为了确认每一次的订阅,Broker在收到SUBSCRIBE数据包后都会回复一个SUBACK数据包作为应答。
SUBACK数据包的固定头如图4-18所示。
图4-18 SUBACK数据包的固定头
固定头中的MQTT协议数据包类型字段的值为9,代表该数据包是SUBACK数据包。
SUBACK数据包的可变头只包含一个两字节的包标识符,其格式如图4-19所示。
图4-19 SUBACK数据包的可变头
SUBACK数据包包含一组返回码,返回码的数量和顺序与SUBSCRIBE数据包的订阅列表对应,用于标识订阅类别中每一个订阅项的订阅结果。
SUBACK数据包中每一个返回码为一个字节,如图4-20所示。
图4-20 返回码字段
返回码列表按照图4-20的格式依次拼接而成。
返回码的对应值如表4-2所示。
表4-2 返回码的对应值
返回码0~2代表订阅成功,同时Broker授予Subscriber不同的QoS等级,这个等级可能会与Subscriber在SUBSCRIBE数据包中要求的不一样。
返回码128代表订阅失败,比如Client没有权限订阅某个主题,或者要求订阅的主题格式不正确等。
接下来,试着写一下订阅并处理消息的代码。订阅主题为4.2.2节中代码实现的publisher.js,然后通过捕获“message”事件获取接收的消息并进行打印。
通常,在建立和Broker的连接后我们就可以开始订阅了,但这里有一个小小的优化,如果你建立的是持久会话的连接,那么Broker有可能已经保存了之前连接时订阅的主题,这样就没必要再发起SUBSCRIBE请求了。这个小优化在网络带宽或者设备处理能力较差时尤为重要。
完整的代码subscriber.js如下。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_sample_subscriber_id_1", 4. clean: false 5. }) 6. 7. client.on('connect', function (connack) { 8. if(connack.returnCode == 0) { 9. if (connack.sessionPresent == false) { 10. console.log("subscribing") 11. client.subscribe("home/2ndfloor/201/temperature", { 12. qos: 1 13. }, function (err, granted) { 14. if (err != undefined) { 15. console.log("subscribe failed") 16. } else { 17. console.log('subscribe succeeded with ${granted[0].topic}, qos: ${granted[0].qos}') 18. } 19. }) 20. } 21. }else { 22. console.log('Connection failed: ${connack.returnCode}') 23. } 24. }) 25. 26. client.on("message", function (_, message, _) { 27. var jsonPayload = JSON.parse(message.toString()) 28. console.log('current temperature is ${jsonPayload.current}') 29. })
第9行代码通过判断CONNACK的SessionPresent标识,来决定是否发起订阅,如果Session已经存在,则不再发起订阅。
第11行代码指定订阅主题“home/2ndfloor/201/temperature”,订阅的QoS等级为1。
在终端上运行“node subscriber.js”会得到以下输出。
subscribing subscribe succeeded with home/2ndfloor/201/temperature, qos: 1
第一次运行上述代码的时候,Broker上面没有保存这个Client的会话,所以需要进行订阅,现在点击“Ctrl+C”终止这段代码运行,然后重新运行,因为Broker上已经保存了这个Client的会话,不需要再订阅,所以我们也不会看到订阅相关的输出。
在4.2.5节中,我们运行过publisher.js,向“home/2ndfloor/201/temperature”这个主题发布过一个消息,但是这发生在subscriber.js订阅该主题之前,所以现在Subscriber不会收到任何消息,我们需要再运行一次publish.js,然后在运行subscriber.js的终端上会得到如下输出。
current temperature is 25
这样,我们就通过MQTT协议完成了一次点对点的消息传递,同时也验证了建立持久会话连接之后,Broker会保存Client的订阅信息。
Subscriber也可以取消对某些主题的订阅。取消订阅的流程如图4-21所示。
图4-21 取消订阅流程
(1)Client向Broker发送一个UNSUBSCRIBE数据包,其中包含Client想要取消订阅的主题。
(2)Broker收到UNSUBSCRIBE数据包后,向Client发送一个UNSUBACK数据包作为应答。
接下来看一下数据包的具体内容。
UNSUBSCRIBE数据包的固定头格式如图4-22所示。
图4-22 UNSUBSCRIBE数据包的固定头格式
固定头中的MQTT协议数据包类型字段的值为10,代表该数据包是UNSUBSCRIBE数据包。
UNSUBSCRIBE数据包的可变头只包含一个2字节的包标识符,包标识符的格式如图4-23所示。
图4-23 UNSUBSCRIBE数据包可变头中的包标识符
UNSUBSCRIBE数据包包含要取消的主题过滤器(Topic Filter)列表,这些主题过滤和SUBSCRIBE数据包中的规则是一样的,不过不再包含QoS字段。其格式如图4-24所示。
图4-24 UNSUBSCRIBE数据包的消息体
UNSUBSCRIBE数据包的消息体的主题列表按照图4-24的格式依次拼接而成。
和订阅时不同,取消订阅时,主题名中的通配符并不起通配作用。取消订阅的主题名必须每个字符都和订阅时指定的主题名相同,这样才能被取消,举个例子。
订阅主题名为“home/2ndfloor/201/temperature”,取消订阅名为“home/+/201/temperature”,并不会取消之前的订阅。
同理,订阅的时候使用了通配符,取消订阅的时候也必须使用完全一样的主题名。
订阅主题名为“home/+/201/temperature”,取消订阅名为“home/+/201/temperature”,这样才能取消之前的订阅。
Broker在收到UNSUBSCRIBE数据包后,会回复给Client一个UNSUBACK数据包作为响应。
UNSUBACK数据包的固定头如图4-25所示。
图4-25 UNSUBACK数据包的固定头
固定头中的MQTT协议数据包类型字段的值为10,代表该数据包是UNSUBACK数据包。UNSUBACK数据包中的固定头的数据包剩余长度字段固定为2。
UNSUBACK数据包的可变头只包含一个2字节的包标识符,如图4-26所示。
图4-26 UNSUBACK可变头的包标识符
UNSUBACK数据包没有消息体。
下面要完成的代码很简单,只需要在建立连接后取消之前订阅的主题。
完整的代码unsubscribe.js如下所示。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_sample_subscriber_id_1", 4. clean: false 5. }) 6. 7. client.on('connect', function (connack) { 8. if (connack.returnCode == 0) { 9. console.log("unsubscribing") 10. client.unsubscribe("home/2ndfloor/201/temperature", function (err) { 11. if (err != undefined) { 12. console.log("unsubscribe failed") 13. } else { 14. console.log("unsubscribe succeeded") 15. } 16. client.end() 17. }) 18. } else { 19. console.log('Connection failed: ${connack.returnCode}') 20. } 21. })
在终端上运行“node unsubscribe.js”,会得到以下输出。
unsubscribing unsubscribe succeeded
这里取消了对“home/2ndfloor/201/temperature”的订阅,所以再次运行subscriber.js和publisher.js的时候,在运行subscribe.js的终端上就不会再有“home/2ndfloor/201/temperature”的打印信息了。如何使subscriber.js重新订阅这个主题呢?读者可以参考上文进行思考,然后自己动手实现。
在本节中,我们学习了MQTT协议发布、订阅消息的模型及其特性,并第一次实现了消息的点对点传输。接下来,我们将学习MQTT协议中的一个非常重要的特性——QoS等级。