Client在可以发布和订阅消息之前,必须先连接到Broker。Client建立连接的流程如图4-1所示。
图4-1 Client建立连接的流程
1)Client向Broker发送一个CONNECT数据包;
2)Broker在收到Client的CONNECT数据包后,如果允许Client接入,则回复一个CONNACK包,该CONNACK包的返回码为0,表示MQTT协议连接建立成功;如果不允许Client接入,也回复一个CONNACK包,该CONNACK包的返回码为一个非0的值,用来标识接入失败的原因,然后断开底层的TCP连接。
CONNECT数据包的格式如下。
CONNECT数据包的固定头格式如图4-2所示。
图4-2 CONNECT数据包的固定头格式
固定头中的MQTT协议数据包类型字段的值为1,代表CONNECT数据包。
可变头由4部分组成,依次为:协议名称、协议版本、连接标识和Keepalive。
协议名称是一个UTF-8编码字符串。在MQTT协议数据包中,字符串会有2个字节的前缀,用于标识字符串的长度。
协议名称的值固定为“MQTT”,加上前缀一共6个字节,如图4-3所示。
图4-3 协议名称字段
如果协议名称不正确,Broker会断开与Client的连接。
协议版本长度为1个字节,是一个无符号整数,MQTT 3.1.1的版本号为4,如图4-4所示。
图4-4 协议版本号字段
连接标识长度为1个字节,字节中不同的位用于标识不同的连接选项,如图4-5所示。
图4-5 连接标识字段
每一个标识位的含义如下所示。
·用户名标识(User Name Flag):标识消息体中是否有用户名字段,长度为1bit,值为0或1;
·密码标识(Password Flag):标识消息体中是否有密码字段,长度为1bit,值为0或1;
·遗愿消息Retain标识(Will Retain):标识遗愿消息是否是Retain消息,长度为1bit,值为0或1;
·遗愿消息QoS标识(Will QoS):标识遗愿消息的QoS,长度为2bit,值为0、1或2。
·遗愿标识(Will Flag):标识是否使用遗愿消息,长度为1bit,值为0或1;
·会话清除标识(Clean Session):标识Client是否建立一个持久化的会话,长度为1bit,值为0或1。当Clean Session的标识设为0时,代表Client希望建立一个持久会话的连接,Broker将存储该Client订阅的主题和未接受的消息,否则Broker不会存储这些数据,并在建立连接时清除这个Client之前存在的持久会话所保存的数据。
可变头的最后2个字节代表连接的Keepalive,即连接保活设置,如图4-6所示。
图4-6 连接保活字段
Keepalive代表一个单位为秒的时间间隔,Client和Broker之间在这个时间间隔之内至少要有一次消息交互,否则Client和Broker会认为它们之间的连接已经断开,我们会在4.4节中进行详细讲解。
CONNECT数据包的消息体依次由5个字段组成:客户端标识符、遗愿主题、遗愿QoS、遗愿消息、用户名和密码。除了客户端标识符外,其他4个字段都是可选的,由可变头里对应的连接标识来决定是否包含在消息体中。
这些字段有一个2个字节的前缀,用来标识字段值的长度,如图4-7所示。
图4-7 MQTT协议的变长字段格式
·客户端标识符(Client Identifier):Client Identifier是用来标识Client身份的字段,在MQTT 3.1.1中,这个字段的长度是1~23个字节,而且只能包含数字和26个英文字母(包括大小写),Broker通过这个字段来区分不同的Client。连接时,Client应该保证它的Identifier是唯一的,通常我们可以使用UUID、唯一的设备硬件标识或者Android设备的DEVICE_ID等作为Client Identifier的取值来源。
MQTT协议中要求Client连接时必须带上Client Identifier,但也允许Broker在实现时接受Client Identifier为空的CONNECT数据包,这时Broker会为Client分配一个内部唯一的Identifier。如果你需要使用持久性会话,那就必须自己为Client设定一个唯一的Identifier。
·用户名(Username):如果可变头中的用户名标识为1,那么消息体中将包含用户名字段,Broker可以使用用户名和密码对接入的Client进行验证,只允许已授权的Client接入。注意,不同的Client需要使用不同的Client Identifier,但它们可以使用同样的用户名和密码进行连接。
·密码(Password):如果可变头中的密码标识为1,那么消息体中将包含密码字段。
·遗愿主题(Will Topic):如果可变头中的遗愿标识为1,那么消息体中将包含遗愿主题。当Client非正常地中断连接时,Broker将向指定的遗愿主题发布遗愿消息。
·遗愿消息(Will Message):如果可变头中的遗愿标识为1,那么消息体中将包含遗愿消息。当Client非正常地中断连接时,Broker将向指定的遗愿主题发布由该字段指定的内容。
当Broker收到Client的CONNECT数据包后,将检查并校验CONNECT数据包的内容,然后给Client回复一个CONNACK数据包。
CONNACK数据包的格式如下所示。
CONNACK数据包的固定头格式如图4-8所示。
图4-8 CONNACK的固定头
当固定头中的MQTT数据包的类型字段值为2时,则代表该数据包是CONNACK数据包。CONNACK数据包剩余长度固定为2。
CONNACK数据包的可变头为2个字节,由连接确认标识和连接返回码组成,如图4-9所示。
图4-9 CONNACK数据包的可变头
·连接确认标识:连接确认标识的前7位都是保留的,必须设为0,最后一位是会话存在标识(Session Present Flag),值为0或1。当Client在连接时设置Clean Session=1,则CONNACK中的Session Present Flag始终为0;当Client在连接时设置Clean Session=0,那就有两种情况——如果Broker保存了这个Client之前留下的持久性会话,那么CONNACK中的Session Present Flag值为1;如果Broker没有保存该Client的任何会话数据,那么CONNACK中的Session Present Flag值为0。
连接返回码(Connect Return Code):用于标识Client与Broker的连接是否建立成功。
连接返回码如表4-1所示。
表4-1 连接返回码
这里重点讲一下Return Code 4和Return Code 5。Return Code 4在MQTT协议中的含义是用户名(Username)或密码(Password)的格式不正确,但是在大部分的Broker实现中,在使用错误的用户名或密码时,得到的返回码也是4。所以,这里我们认为4代表错误的用户名或密码。Return Code 5一般在Broker不使用用户名和密码而使用IP地址或者Client Identifier进行验证的时候使用,用来标识Client没有通过验证。
Return Code 2代表Client Identifier格式不规范,比如长度超过23个字符、包含了不允许的字符等(部分Broker的实现在协议标准上做了扩展,比如允许超过23个字符的Client Identifer等)。
CONNACK数据包没有消息体。
当Client向Broker发送CONNECT数据包并获得Return Code为0的CONNACK包后,就代表连接建立成功,可以发布和订阅消息了。
接下来我们看一下MQTT协议的连接是如何关闭的。MQTT协议的连接关闭可以由Client或Broker二者任意一方发起。
Client主动关闭连接的流程非常简单,只需要向Broker发送一个DISCONNECT数据包就可以了。
DISCONNECT数据包固定头格式如图4-10所示。
图4-10 DISCONNECT数据包的固定头格式
固定头中的MQTT协议数据包类型字段的值为14,代表该数据包为DISCONNECT数据包。DISCONNECT的数据包剩余长度固定为0。
DISCONNECT数据包没有可变头和消息体。
在Client发送完DISCONNECT数据包之后,就可以关闭底层的TCP连接了,不需要等待Broker的回复,Broker也不会回复DISCONNECT数据包。
在这里,读者可能会有一个疑问,为什么需要在关闭TCP连接之前,发送一个与Broker没有交互的DISCONNECT数据包,而不是直接关闭底层的TCP连接?
这里涉及MQTT协议的一个特性,即Broker需要判断Client是否正常地断开连接。当Broker收到Client的DISCONNECT数据包的时候,会认为Client是正常地断开连接,那么它会丢弃当前连接指定的遗愿消息。如果Broker检测到Client的TCP连接丢失,但又没有收到DISCONNECT数据包,它会认为Client是非正常断开连接,就会向在连接的时候指定的遗愿主题发布遗愿消息。
MQTT协议规定Broker在没有收到Client的DISCONNECT数据包之前都应该保持和Client的连接,只有Broker在Keepalive的时间间隔里,没有收到Client的任何MQTT协议数据包时才会主动关闭连接。一些Broker的实现在MQTT协议上做了一些拓展,支持Client的连接管理,可以主动断开和某个Client的连接。
Broker主动关闭连接之前不需要向Client发送任何MQTT协议数据包,直接关闭底层的TCP连接就可以。
接下来,我们将用代码来展示各种情况下MQTT协议连接的建立以及断开。
在这里,我们使用Node.js的MQTT库,请确保已安装Node.js,并通过npm install mqtt--save安装了MQTT库。
这里使用一个公共的Broker:mqtt.eclipse.org。
首先引用MQTT库。
1. var mqtt = require('mqtt')
然后建立连接。
1. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 2. clientId: "mqtt_sample_id_1", 3. clean: false 4. })
这里通过clientId选项指定Client Identifier,并通过Clean选项设定Clean Session为false,代表我们要建立一个持久会话的连接。
接下来通过捕获connect事件将CONNACK数据包中的Return Code和Session Present Flag打印出来,然后断开连接。
1. client.on('connect', function (connack) { 2. console.log('return code: ${connack.returnCode}, sessionPresent: ${connack. sessionPresent}') 3. client.end()
完整的代码persistent_connection.js如下所示。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_sample_id_1", 4. clean: false 5. }) 6. 7. client.on('connect', function (connack) { 8. console.log('return code: ${connack.returnCode}, sessionPresent: ${connack. sessionPresent}') 9. client.end() 10. })
在终端上运行node persistent_connection.js会得到以下输出。
return code: 0, sessionPresent: false
连接成功,因为是客户端标识符为“mqtt_sample_id_1”的Client第一次建立连接,所以SessionPresent为false。
再次运行node persistent_connection.js,输出就会变成SessionPresent。
return code: 0, sessionPresent: true
因为之前已经创建了一个持久会话,所以这次再使用同样的客户端标识符进行连接,得到的SessionPresent为true,表示会话已经存在了。
我们只需要将clean选项设为true,就可以建立一个非持久会话的连接了。完整的代码non_persistent_connetion.js如下所示。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_sample_id_1", 4. clean: true 5. }) 6. 7. client.on('connect', function (connack) { 8. console.log('return code: ${connack.returnCode}, sessionPresent: ${connack. sessionPresent}') 9. client.end() 10. })
第4行代码将Clean Session设为true。
我们在终端上运行node persistent_connection.js会得到以下输出。
return code: 0, sessionPresent: false
无论运行多少次,SessionPresent都会为false。
接下来看一下如果两个Client使用相同的Client Identifier会发生什么事情。我们把代码稍微调整下,在连接成功时保持连接,然后捕获offline事件,在Client的连接被关闭时打印出来。
完整的代码identifcal.js如下所示。
1. var mqtt = require('mqtt') 2. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 3. clientId: "mqtt_identical_1", 4. }) 5. 6. client.on('connect', function (connack) { 7. console.log('return code: ${connack.returnCode}, sessionPresent: ${connack. sessionPresent}') 8. }) 9. 10. client.on('offline', function () { 11. console.log("client went offline") 12. })
从第10行代码开始,捕获Client的离线事件,并进行打印。
然后打开两个终端,分别运行node identifcal.js,这样就会看到在两个终端上不停地打印以下内容。
return code: 0, sessionPresent: false client went offline return code: 0, sessionPresent: false client went offline return code: 0, sessionPresent: false ......
在MQTT协议中,两个Client使用相同的Client Identifier进行连接时,如果第二个Client连接成功,Broker会关闭与第一个Client的连接。
由于我们使用的MQTT库实现了断线重连功能,因此当连接被Broker关闭时,Client会尝试重新连接,结果就是这两个Client交替地把对方顶下线,我们就会看到上面所示的打印输出。因此,在实际应用中,一定要保证每一个设备使用的Client Identifier都是唯一的。
如果你观察到一个Client不停地上线和下线,那么就很有可能是由于Client Identifier冲突造成的。
在本节中,我们学习了MQTT连接关闭的过程,并且学习了连接建立和关闭的相关代码。在4.2节,我们将学习订阅和发布的概念,进而实现消息在Client之间的传输。