在生产环境下,特别是物联网这种无人值守的设备比较多的情况下,我们都希望设备能够自动从错误中恢复过来,比如在网络故障恢复以后,设备能够自动重新连接Broker。本节我们就来学习MQTT协议的Keepalive机制,以及连接保活的方式。
在4.4节中,我们提到Broker需要知道Client是否非正常地断开了和它的连接,以发送遗愿消息。实际上,Client也需要能够很快地检测到它和Broker的连接断开,以便重新连接。
MQTT协议是基于TCP协议的一个应用层协议,理论上TCP协议在连接断开时会通知上层应用,但是TCP协议有一个半打开连接的问题(Half-open Connection)。这里不会深入分析TCP协议,需要记住的是,在这种状态下,一端的TCP协议连接已经失效,但是另外一端并不知情,它认为连接依然是打开的,需要很长时间才能感知到对端连接已经断开,这种情况在使用移动网络或者卫星网络的时候尤为常见。
仅仅依赖TCP层的连接状态监测是不够的,于是MQTT协议设计了一套Keepalive机制。回忆一下,在建立连接的时候,我们可以传递一个Keepalive参数,它的单位为秒。MQTT协议约定:在1.5×Keepalive的时间间隔内,如果Broker没有收到来自Client的任何数据包,那么Broker认为它和Client之间的连接已经断开;同样,在这段时间间隔内,如果Client没有收到来自Broker的任何数据包,那么Client也认为它和Broker之间的连接已经断开。
MQTT协议中设计了一对PINGREQ/PINGRESP数据包,当Broker和Client之间没有任何数据包传输时,我们可以通过PINGREQ/PINGRESP数据包满足Keepalive的约定和连接状态的侦测。
当Client在一个Keepalive时间间隔内没有向Broker发送任何数据包,比如PUBLISH数据包和SUBSCRIBE数据包时,它应该向Broker发送PINGREQ数据包,PINGREQ数据包的格式如下所示。
PINGREQ数据包的固定头如图4-39所示。
图4-39 PINGREQ数据包的固定头
固定头中的MQTT协议数据包类型字段的值为12,代表该数据包是PINGREQ数据包。PINGREQ数据包的剩余长度字段值固定为0。
PINGREQ数据包没有可变头。
PINGREQ数据包没有消息体。
当Broker收到来自Client的PINGREQ数据包时,它应该回复Client一个PINGRESP数据包,PINGRESP数据包的格式如下所示。
PINGRESP数据包的固定头格式如图4-40所示。
图4-40 PINGRESP数据包的固定头
固定头中的MQTT协议数据包类型字段的值为13,代表该数据包是PINGRESP数据包。PINGRESP数据包的剩余长度字段值固定为0。
PINGRESP数据包没有可变头。
PINGRESP数据包没有消息体。
Keepalive机制还有以下几点需要注意:
1)如果在一个Keepalive时间间隔内,Client和Broker有过数据包传输,比如PUBLISH数据包,那Client就没有必要再使用PINGREQ数据包了,在网络资源比较紧张的情况下这点很重要;
2)Keepalive的值是由Client指定的,不同的Client可以指定不同的值;
3)Keepalive的最大值为18个小时12分15秒;
4)Keepalive的值如果设为0的话,代表不使用Keepalive机制。
首先我们编写一段简单的Client代码,它会把发送和接收到的MQTT协议数据包的类别打印出来。
完整的代码Keepalive.js如下所示。
1. var mqtt = require('mqtt') 2. var dateTime = require('node-datetime'); 3. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 4. clientId: "mqtt_sample_id_chapter_9", 5. clean: false, 6. Keepalive: 5 7. }) 8. 9. client.on('connect', function () { 10. client.on('packetsend', function (packet) { 11. console.log('${dateTime.create().format('H:M:S')}: send ${packet.cmd}') 12. }) 13. 14. client.on('packetreceive', function (packet) { 15. console.log('${dateTime.create().format('H:M:S')}: receive ${packet.cmd}') 16. }) 17. })
代码第6行把Keepalive的值设为5秒。
运行“node Keepalive.js”,我们会得到以下输出:
19:42:44: send pingreq 19:42:44: receive pingresp 19:42:49: send pingreq 19:42:49: receive pingresp 19:42:54: send pingreq 19:42:54: receive pingresp ......
可以看到,每隔5秒就会有一个PINGREQ/PINGRESP数据包的交互。
然后再编写一段Client代码,这个Client每隔4秒发布一条消息,完整的代码Keepalive_with_publish.js如下所示。
1. var mqtt = require('mqtt') 2. var dateTime = require('node-datetime'); 3. var client = mqtt.connect('mqtt://mqtt.eclipse.org', { 4. clientId: "mqtt_sample_id_chapter_9", 5. clean: false, 6. Keepalive: 5 7. }) 8. 9. client.on('connect', function () { 10. client.on('packetsend', function (packet) { 11. console.log('${dateTime.create().format('H:M:S')}: send ${packet.cmd}') 12. }) 13. 14. client.on('packetreceive', function (packet) { 15. console.log('${dateTime.create().format('H:M:S')}: receive ${packet.cmd}') 16. }) 17. 18. setInterval(function () { 19. client.publish("foo/bar", "test") 20. }, 4 * 1000) 21. })
代码的第6行把Keepalive的值设为5秒。
代码的第18~20行,设置了定时器,每隔4秒做一个publish。
运行“node Keepalive_with_publish.js”,会得到以下输出:
19:54:37: send publish 19:54:41: send publish 19:54:45: send publish ......
正如之前所讲的那样,如果在一个Keepalive的时间间隔内,Client和Broker之间传输过数据包,那么就不会触发PINGREQ/PINGRESP数据包。
Client的连接保活逻辑很简单,在检测到连接断开时再重新进行连接就可以了。大多数语言的MQTT Client都支持这个功能,并默认打开。不过如果是移动设备,比如在Android系统或者iOS系统的智能手机上使用MQTT Client,那情况就有所不同了。通常在移动端使用MQTT协议的时候会碰到一个问题:App被切入后台后,怎样才能保持与MQTT协议的连接并继续接收消息?接下来,我们就通过Android系统和iOS系统分别来讲一下。
在Android系统上,我们可以在一个Service中创建和保持MQTT协议连接,这样即使App被切入后台,这个Service还在运行,MQTT协议的连接还存在,就能接收消息。参考代码如下所示。
1. public class MQTTService extends Service{ 2. ...... 3. @Override 4. public int onStartCommand(Intent intent, int flags, int startId) { 5. ...... 6. mqttClient.connect(...) 7. ...... 8. } 9. ...... 10. }
接收到MQTT消息后,我们可以通过一些方式,比如广播通知App处理这些消息。
iOS系统的连接保活机制与Android系统的不同,在App被切入后台时,你没有办法在后台运行App的任何代码,所以无法通过MQTT协议的连接来获取消息。(当然,iOS系统提供了几种可以后台运行的方式,比如Download、Audio等,但如果你的App假借这些方式运行后台程序,是过不了审核的,所以这里只讨论正常情况)。
在iOS系统中的App切入后台后,正确接收MQTT协议消息的方式是:
1)Publisher发布一条或多条消息;
2)Publisher通过某种渠道(比如HTTP API)告知App的应用服务器,然后服务器通过苹果的APNs向对应的iOS订阅者推送一条消息;
3)用户点击推送,App进入前台;
4)App重新建立和Broker的连接;
5)App收到Publisher刚刚发送的一条或多条消息。
App端的代码如下所示。
1. -(void)application:(UIApplication *)app didReceiveRemoteNotification:(NSDic- tionary *)userInfo { 2. if([app applicationState] == UIApplicationStateInactive) { 3. [mqttClient connect] 4. } 5. }...
实际上,当下国内主流的Android系统都有后台清理功能,App被切入后台后,它的服务,即使是前台服务(Foreground Service)也会很快地被杀掉,除非App被厂商或者用户加入白名单。所以在Android系统上最好还是利用厂商的推送通道,比如华为推送、小米推送等,即在App被切入后台时采用和iOS系统上一样的机制来接收MQTT协议的消息。
本节学习了MQTT协议的Keepalive机制,并了解了如何在移动端保持与MQTT协议的连接。到此为止,MQTT 3.1.1版本的所有特性就已经介绍完了,在4.6节中,我将讲解MQTT 5.0版本的一些新特性。