购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

3.2 消费者启动机制

RocketMQ客户端中有两个独立的消费者实现类:org.apache.rocketmq.client.consumer.DefaultMQPullConsumer和org.apache.rocketmq.client.consumer.DefaultMQPushConsumer。下面将分别进行介绍。

1.DefaultMQPullConsumer

该消费者使用时需要用户主动从 Broker 中 Pull 消息和消费消息,提交消费位点。DefaultMQPullConsumer的类图继承关系如图3-6所示。

图3-6

可以看到,DefaultMQPullConsumer 实现时包含消费者的操作和属性配置,这是一个典型的类对象设计。下面我们介绍一些核心属性和方法。

以下是一些核心属性:

namesrvAddr: 继承自 ClientConfig,表示 RocketMQ 集群的 Namesrv 地址,如果是多个,则用分号分开。比如:127.0.0.1:9876;127.0.0.2:9876。

clientIP: 使用客户端的程序所在机器的IP地址,目前支持IPV4和IPV6,同时排除了本地环回地址(127.0.xxx.xxx)和私有内网地址(192.168.xxx.xxx)。如果在 Docker 中运行,获取的IP地址是容器所在的IP地址,而非宿主主机的IP地址。

instanceName: 实例名,顾名思义每个实例都需要取不一样的名字。假如要在同一个机器上部署多个程序进程,那么每个进程的实例名都必须不相同,否则程序会启动失败。

vipChannelEnabled: 这是一个 boolean 值,表示是否开启 VIP 通道。VIP 通道和非VIP通道的区别是使用不同的端口号进行通信。

clientCallbackExecutorThreads: 客户端回调线程数。该线程数等于 Netty 通信层回调线程的个数,默认值为Runtime.getRuntime().availableProcessors(),表示当前有效的CPU个数。

pollNameServerInterval :获取Topic路由信息间隔,单位为ms,默认为30 000ms。

heartbeatBrokerInterval: 客户端和Broker心跳间隔,单位为ms,默认为30 000ms。

persistConsumerOffsetInterval :持久化消费位点时间间隔,单位为 ms,默认为5000ms。

defaultMQPullConsumerImpl: 默认Pull消费者的具体实现。

consumerGroup: 消费者组名字。

brokerSuspendMaxTimeMillis :在长轮询模式下,Broker的最大挂起请求时间,建议不要修改此值。

consumerTimeoutMillisWhenSuspend: 在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMillis大,不建议修改。

consumerPullTimeoutMillis: 消费者Pull消息时Socket的超时时间。

messageModel: 消费模式,现在支持集群模式消费和广播模式消费。

messageQueueListener :消息路由信息变化时回调处理监听器,一般在重新平衡时会被调用。

offsetStore: 位点存储模块。集群模式位点会持久化到Broker中,广播模式持久化到本地文件中,位点存储模块有两个实现类:RemoteBrokerOffsetStore 和LocalFileOffsetStore。

allocateMessageQueueStrategy: 消费Queue分配策略管理器。

maxReconsumeTimes: 最大重试次数,可以配置。

下面介绍一些核心方法。由于生产者和消费者都继承了MQAdmin接口,所以管理相关的接口都是一样的,不再赘述。

registerMessageQueueListener(): 注册队列变化监听器,当队列发生变化时会被监听到。

pull(): 从Broker中Pull消息,如果有PullCallback参数,则表示异步拉取。

pullBlockIfNotFound(): 长轮询方式拉取。如果没有拉取到消息,那么Broker会将请求Hold住一段时间。

updateConsumeOffset(final MessageQueue mq,final long offset): 更新某一个Queue的消费位点。

fetchConsumeOffset(final MessageQueue mq,final boolean fromStore): 查找某个Queue的消费位点。

sendMessageBack(MessageExt msg,int delayLevel,String brokerName,String consumerGroup): 如果消费发送失败,则可以将消息重新发回给 Broker,这个消费者组延迟一段时间后可以再消费(也就是重试)。

fetchSubscribeMessageQueues(final String topic): 获取一个Topic的全部Queue信息。

2.DefaultMQPushConsumer

DefaultMQPushConsumer的大部分属性、方法和 DefaultMQPullConsumer是一样的。下面介绍一下DefaultMQPushConsumer的核心属性和方法。

defaultMQPushConsumerImpl: 默认的Push消费者具体实现类。

consumeFromWhere :一个枚举,表示从什么位点开始消费。

(1)CONSUME_FROM_LAST_OFFSET:从上次消费的位点开始消费,相当于断点继续。

(2) CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:RoketMQ 4.2.0不支持,处理同CONSUME_FROM_LAST_OFFSET。

(3) CONSUME_FROM_MIN_OFFSET:RoketMQ 4.2.0 不支持,处理同CONSUME_FROM_LAST_OFFSET。

(4) CONSUME_FROM_MAX_OFFSET:RoketMQ 4.2.0 不支持,处理同CONSUME_FROM_LAST_OFFSET。

(5)CONSUME_FROM_FIRST_OFFSET:从ConsumeQueue的最小位点开始消费。

(6)CONSUME_FROM_TIMESTAMP:从指定时间开始消费。

consumeTimestamp: 表示从哪一时刻开始消费,格式为 yyyyMMDDHHmmss,默认为半小时前。当 consumeFromWhere=CONSUME_FROM_TIMESTAMP 时,consumeTimestamp设置的值才生效。

allocateMessageQueueStrategy: 消费者订阅topic-queue策略。

subscription: 订阅关系,表示当前消费者订阅了哪些Topic的哪些Tag。

messageListener: 消息Push回调监听器。

consumeThreadMin: 最小消费线程数,必须小于consumeThreadMax。

consumeThreadMax :最大线程数,必须大于consumeThreadMin。

adjustThreadPoolNumsThreshold: 动态调整消费线程池的线程数大小,开源版本不支持该功能。

consumeConcurrentlyMaxSpan :并发消息的最大位点差。如果 Pull消息的位点差超过该值,拉取变慢。

pullThresholdForQueue :一个 Queue 能缓存的最大消息数。超过该值则采取拉取流控措施。

pullThresholdSizeForQueue :一个Queue最大能缓存的消息字节数,单位是MB。

pullThresholdForTopic :一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施。该字段默认值是-1,该值根据 pullThresholdForQueue 的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue。

pullThresholdSizeForTopic :一个Topic最大能缓存的消息字节数,单位是MB。默认为-1,结合 pullThresholdSizeForQueue 配置项生效,该配置项的优先级低于pullThresholdSizeForQueue。

pullInterval :拉取间隔,单位为ms。

consumeMessageBatchMaxSize: 消费者每次批量消费时,最多消费多少条消息。

pullBatchSize :一次最多拉取多少条消息。

postSubscriptionWhenPull :每次拉取消息时是否更新订阅关系,该方法的返回值默认为False。

maxReconsumeTimes :最大重试次数,该函数返回值默认为-1,表示默认最大重试次数为16。

suspendCurrentQueueTimeMillis: 为短轮询场景设置的挂起时间,比如顺序消息场景。

consumeTimeout :消费超时时间,单位为min,默认值为15min。

上面主要讲了RocketMQ默认的两种消费者的核心属性和方法,下面来看一下它们是如何启动的。

DefaultMQPullConsumer的启动流程如图3-7所示。

业务代码通常使用构造函数初始化一个DefaultMQPullConsumer实例,设置各种参数,比如Namesrv地址、消费者组名等。然后调用start()方法启动defaultMQPullConsumerImpl实例。我们这里主要讲 defaultMQPullConsumerImpl.start()方法中的启动过程,具体步骤如下:

第一步:最初创建defaultMQPullConsumerImpl时的状态为ServiceState.CREATE_JUST,然后设置消费者的默认启动状态为失败。

图3-7

第二步:检查消费者的配置比,如消费者组名、消费类型、Queue分配策略等参数是否符合规范;将订阅关系数据发给Rebalance服务对象。

第三步:校验消费者实例名,如果是默认的名字,则更改为当前的程序进程id。

第四步:获取一个 MQClientInstance,如果 MQClientInstance 已经初始化,则直接返回已初始化的实例。这是核心对象,每个clientId缓存一个实例。

第五步:设置Rebalance对象消费者组、消费类型、Queue分配策略、MQClientInstance等参数。

第六步:对 Broker API 的封装类 pullAPIWrapper进行初始化,同时注册消息,过滤filter。

第七步:初始化位点管理器,并加载位点信息。位点管理器分为本地管理和远程管理两种,集群消费时消费位点保存在 Broker 中,由远程管理器管理;广播消费时位点存储在本地,由本地管理器管理。

第八步:本地注册消费者实例,如果注册成功,则表示消费者启动成功。

第九步:启动MQClientInstance实例。具体启动过程见2.2节。

DefaultMQPushConsumer的启动过程如图3-8所示。

DefaultMQPushConsumer的启动过程与DefaultMQPullConsumer的启动过程类似,用户也是通过构造函数初始化,依次调用 DefaultMQPushConsumer 的 start 方法和其内部实现类DefaultMQPushConsumerImpl的start()方法,开启整个启动过程的。

DefaultMQPushConsumer 的启动过程分为11个步骤,前7个步骤与DefaultMQPullConsumer的步骤类似,不再赘述。

第八步:初始化消费服务并启动。之所以用户“感觉”消息是 Broker 主动推送给自己的,是因为DefaultMQPushConsumer通过Pull服务将消息拉取到本地,再通过Callback的 形 式,将本地消息Push给用户的消费代码。DefaultMQPushConsumer 与DefaultMQPullConsumer获取消息的方式一样,本质上都是拉取。

图3-8

消费服务分为两种,即并行消费服务和顺序消费服务,对应的实现类分别是org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService和org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService。DefaultMQPushConsumer根据用户监听器继承的不同接口初始化不同的消费服务程序,具体的实现代码如下:

第九步:启动MQClientInstance实例。具体启动过程见2.1.3节。

第十步:更新本地订阅关系和路由信息;通过 Broker 检查是否支持消费者的过滤类型;向集群中的所有Broker发送消费者组的心跳信息。

第十一步:立即执行一次Rebalance,Rebalance过程我们在下一节中详细讲解。 cKTc6EYocjZ7narpM5gGh1+hKuVDVPi6/mIcdZVYmEOKYL0b5XGyPXG0Nlj2RRAw

点击中间区域
呼出菜单
上一章
目录
下一章
×