购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.1 建立到Broker的连接

Client在可以发布和订阅消息之前,必须先连接到Broker。Client建立连接的流程如图4-1所示。

图4-1 Client建立连接的流程

1)Client向Broker发送一个CONNECT数据包;

2)Broker在收到Client的CONNECT数据包后,如果允许Client接入,则回复一个CONNACK包,该CONNACK包的返回码为0,表示MQTT协议连接建立成功;如果不允许Client接入,也回复一个CONNACK包,该CONNACK包的返回码为一个非0的值,用来标识接入失败的原因,然后断开底层的TCP连接。

4.1.1 CONNECT数据包

CONNECT数据包的格式如下。

1.固定头

CONNECT数据包的固定头格式如图4-2所示。

图4-2 CONNECT数据包的固定头格式

固定头中的MQTT协议数据包类型字段的值为1,代表CONNECT数据包。

2.可变头

可变头由4部分组成,依次为:协议名称、协议版本、连接标识和Keepalive。

协议名称是一个UTF-8编码字符串。在MQTT协议数据包中,字符串会有2个字节的前缀,用于标识字符串的长度。

协议名称的值固定为“MQTT”,加上前缀一共6个字节,如图4-3所示。

图4-3 协议名称字段

如果协议名称不正确,Broker会断开与Client的连接。

协议版本长度为1个字节,是一个无符号整数,MQTT 3.1.1的版本号为4,如图4-4所示。

图4-4 协议版本号字段

连接标识长度为1个字节,字节中不同的位用于标识不同的连接选项,如图4-5所示。

图4-5 连接标识字段

每一个标识位的含义如下所示。

·用户名标识(User Name Flag):标识消息体中是否有用户名字段,长度为1bit,值为0或1;

·密码标识(Password Flag):标识消息体中是否有密码字段,长度为1bit,值为0或1;

·遗愿消息Retain标识(Will Retain):标识遗愿消息是否是Retain消息,长度为1bit,值为0或1;

·遗愿消息QoS标识(Will QoS):标识遗愿消息的QoS,长度为2bit,值为0、1或2。

·遗愿标识(Will Flag):标识是否使用遗愿消息,长度为1bit,值为0或1;

·会话清除标识(Clean Session):标识Client是否建立一个持久化的会话,长度为1bit,值为0或1。当Clean Session的标识设为0时,代表Client希望建立一个持久会话的连接,Broker将存储该Client订阅的主题和未接受的消息,否则Broker不会存储这些数据,并在建立连接时清除这个Client之前存在的持久会话所保存的数据。

可变头的最后2个字节代表连接的Keepalive,即连接保活设置,如图4-6所示。

图4-6 连接保活字段

Keepalive代表一个单位为秒的时间间隔,Client和Broker之间在这个时间间隔之内至少要有一次消息交互,否则Client和Broker会认为它们之间的连接已经断开,我们会在4.4节中进行详细讲解。

3.消息体

CONNECT数据包的消息体依次由5个字段组成:客户端标识符、遗愿主题、遗愿QoS、遗愿消息、用户名和密码。除了客户端标识符外,其他4个字段都是可选的,由可变头里对应的连接标识来决定是否包含在消息体中。

这些字段有一个2个字节的前缀,用来标识字段值的长度,如图4-7所示。

图4-7 MQTT协议的变长字段格式

·客户端标识符(Client Identifier):Client Identifier是用来标识Client身份的字段,在MQTT 3.1.1中,这个字段的长度是1~23个字节,而且只能包含数字和26个英文字母(包括大小写),Broker通过这个字段来区分不同的Client。连接时,Client应该保证它的Identifier是唯一的,通常我们可以使用UUID、唯一的设备硬件标识或者Android设备的DEVICE_ID等作为Client Identifier的取值来源。

MQTT协议中要求Client连接时必须带上Client Identifier,但也允许Broker在实现时接受Client Identifier为空的CONNECT数据包,这时Broker会为Client分配一个内部唯一的Identifier。如果你需要使用持久性会话,那就必须自己为Client设定一个唯一的Identifier。

·用户名(Username):如果可变头中的用户名标识为1,那么消息体中将包含用户名字段,Broker可以使用用户名和密码对接入的Client进行验证,只允许已授权的Client接入。注意,不同的Client需要使用不同的Client Identifier,但它们可以使用同样的用户名和密码进行连接。

·密码(Password):如果可变头中的密码标识为1,那么消息体中将包含密码字段。

·遗愿主题(Will Topic):如果可变头中的遗愿标识为1,那么消息体中将包含遗愿主题。当Client非正常地中断连接时,Broker将向指定的遗愿主题发布遗愿消息。

·遗愿消息(Will Message):如果可变头中的遗愿标识为1,那么消息体中将包含遗愿消息。当Client非正常地中断连接时,Broker将向指定的遗愿主题发布由该字段指定的内容。

4.1.2 CONNACK数据包

当Broker收到Client的CONNECT数据包后,将检查并校验CONNECT数据包的内容,然后给Client回复一个CONNACK数据包。

CONNACK数据包的格式如下所示。

1.固定头

CONNACK数据包的固定头格式如图4-8所示。

图4-8 CONNACK的固定头

当固定头中的MQTT数据包的类型字段值为2时,则代表该数据包是CONNACK数据包。CONNACK数据包剩余长度固定为2。

2.可变头

CONNACK数据包的可变头为2个字节,由连接确认标识和连接返回码组成,如图4-9所示。

图4-9 CONNACK数据包的可变头

·连接确认标识:连接确认标识的前7位都是保留的,必须设为0,最后一位是会话存在标识(Session Present Flag),值为0或1。当Client在连接时设置Clean Session=1,则CONNACK中的Session Present Flag始终为0;当Client在连接时设置Clean Session=0,那就有两种情况——如果Broker保存了这个Client之前留下的持久性会话,那么CONNACK中的Session Present Flag值为1;如果Broker没有保存该Client的任何会话数据,那么CONNACK中的Session Present Flag值为0。

连接返回码(Connect Return Code):用于标识Client与Broker的连接是否建立成功。

连接返回码如表4-1所示。

表4-1 连接返回码

这里重点讲一下Return Code 4和Return Code 5。Return Code 4在MQTT协议中的含义是用户名(Username)或密码(Password)的格式不正确,但是在大部分的Broker实现中,在使用错误的用户名或密码时,得到的返回码也是4。所以,这里我们认为4代表错误的用户名或密码。Return Code 5一般在Broker不使用用户名和密码而使用IP地址或者Client Identifier进行验证的时候使用,用来标识Client没有通过验证。

Return Code 2代表Client Identifier格式不规范,比如长度超过23个字符、包含了不允许的字符等(部分Broker的实现在协议标准上做了扩展,比如允许超过23个字符的Client Identifer等)。

3.消息体

CONNACK数据包没有消息体。

当Client向Broker发送CONNECT数据包并获得Return Code为0的CONNACK包后,就代表连接建立成功,可以发布和订阅消息了。

4.1.3 关闭连接

接下来我们看一下MQTT协议的连接是如何关闭的。MQTT协议的连接关闭可以由Client或Broker二者任意一方发起。

1.Client主动关闭连接

Client主动关闭连接的流程非常简单,只需要向Broker发送一个DISCONNECT数据包就可以了。

DISCONNECT数据包固定头格式如图4-10所示。

图4-10 DISCONNECT数据包的固定头格式

固定头中的MQTT协议数据包类型字段的值为14,代表该数据包为DISCONNECT数据包。DISCONNECT的数据包剩余长度固定为0。

DISCONNECT数据包没有可变头和消息体。

在Client发送完DISCONNECT数据包之后,就可以关闭底层的TCP连接了,不需要等待Broker的回复,Broker也不会回复DISCONNECT数据包。

在这里,读者可能会有一个疑问,为什么需要在关闭TCP连接之前,发送一个与Broker没有交互的DISCONNECT数据包,而不是直接关闭底层的TCP连接?

这里涉及MQTT协议的一个特性,即Broker需要判断Client是否正常地断开连接。当Broker收到Client的DISCONNECT数据包的时候,会认为Client是正常地断开连接,那么它会丢弃当前连接指定的遗愿消息。如果Broker检测到Client的TCP连接丢失,但又没有收到DISCONNECT数据包,它会认为Client是非正常断开连接,就会向在连接的时候指定的遗愿主题发布遗愿消息。

2.Broker主动关闭连接

MQTT协议规定Broker在没有收到Client的DISCONNECT数据包之前都应该保持和Client的连接,只有Broker在Keepalive的时间间隔里,没有收到Client的任何MQTT协议数据包时才会主动关闭连接。一些Broker的实现在MQTT协议上做了一些拓展,支持Client的连接管理,可以主动断开和某个Client的连接。

Broker主动关闭连接之前不需要向Client发送任何MQTT协议数据包,直接关闭底层的TCP连接就可以。

4.1.4 代码实践

接下来,我们将用代码来展示各种情况下MQTT协议连接的建立以及断开。

在这里,我们使用Node.js的MQTT库,请确保已安装Node.js,并通过npm install mqtt--save安装了MQTT库。

这里使用一个公共的Broker:mqtt.eclipse.org。

1.建立持久会话的连接

首先引用MQTT库。


1. var mqtt = require('mqtt')

然后建立连接。


1. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
2.   clientId: "mqtt_sample_id_1",
3.   clean: false
4. })

这里通过clientId选项指定Client Identifier,并通过Clean选项设定Clean Session为false,代表我们要建立一个持久会话的连接。

接下来通过捕获connect事件将CONNACK数据包中的Return Code和Session Present Flag打印出来,然后断开连接。


1. client.on('connect', function (connack) {
2.   console.log('return code: ${connack.returnCode}, sessionPresent: ${connack.
 sessionPresent}')
3.   client.end()

完整的代码persistent_connection.js如下所示。


 1. var mqtt = require('mqtt')
 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 3.   clientId: "mqtt_sample_id_1",
 4.   clean: false
 5. })
 6. 
 7. client.on('connect', function (connack) {
 8.   console.log('return code: ${connack.returnCode}, sessionPresent: ${connack.
 sessionPresent}')
 9.   client.end()
10. })

在终端上运行node persistent_connection.js会得到以下输出。


return code: 0, sessionPresent: false

连接成功,因为是客户端标识符为“mqtt_sample_id_1”的Client第一次建立连接,所以SessionPresent为false。

再次运行node persistent_connection.js,输出就会变成SessionPresent。


return code: 0, sessionPresent: true

因为之前已经创建了一个持久会话,所以这次再使用同样的客户端标识符进行连接,得到的SessionPresent为true,表示会话已经存在了。

2.建立非持久会话的连接

我们只需要将clean选项设为true,就可以建立一个非持久会话的连接了。完整的代码non_persistent_connetion.js如下所示。


 1. var mqtt = require('mqtt')
 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 3.   clientId: "mqtt_sample_id_1",
 4.   clean: true
 5. })
 6. 
 7. client.on('connect', function (connack) {
 8.   console.log('return code: ${connack.returnCode}, sessionPresent: ${connack.
 sessionPresent}')
 9.   client.end()
10. })

第4行代码将Clean Session设为true。

我们在终端上运行node persistent_connection.js会得到以下输出。


return code: 0, sessionPresent: false

无论运行多少次,SessionPresent都会为false。

3.使用相同的客户端标识符进行连接

接下来看一下如果两个Client使用相同的Client Identifier会发生什么事情。我们把代码稍微调整下,在连接成功时保持连接,然后捕获offline事件,在Client的连接被关闭时打印出来。

完整的代码identifcal.js如下所示。


 1. var mqtt = require('mqtt')
 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', {
 3.   clientId: "mqtt_identical_1",
 4. })
 5. 
 6. client.on('connect', function (connack) {
 7.   console.log('return code: ${connack.returnCode}, sessionPresent: ${connack.
 sessionPresent}')
 8. })
 9. 
10. client.on('offline', function () {
11.   console.log("client went offline")
12. })

从第10行代码开始,捕获Client的离线事件,并进行打印。

然后打开两个终端,分别运行node identifcal.js,这样就会看到在两个终端上不停地打印以下内容。


return code: 0, sessionPresent: false
client went offline
return code: 0, sessionPresent: false
client went offline
return code: 0, sessionPresent: false
......

在MQTT协议中,两个Client使用相同的Client Identifier进行连接时,如果第二个Client连接成功,Broker会关闭与第一个Client的连接。

由于我们使用的MQTT库实现了断线重连功能,因此当连接被Broker关闭时,Client会尝试重新连接,结果就是这两个Client交替地把对方顶下线,我们就会看到上面所示的打印输出。因此,在实际应用中,一定要保证每一个设备使用的Client Identifier都是唯一的。

如果你观察到一个Client不停地上线和下线,那么就很有可能是由于Client Identifier冲突造成的。

在本节中,我们学习了MQTT连接关闭的过程,并且学习了连接建立和关闭的相关代码。在4.2节,我们将学习订阅和发布的概念,进而实现消息在Client之间的传输。 Yj1fnpE/58m6KpPGXg9YIezkyCYRrJLCMplcpYy4GIgNrxEF7aoSn9aoWPITsJ2U

点击中间区域
呼出菜单
上一章
目录
下一章
×