普通消息,也叫并发消息,是发送效率最高、使用场景最多的一类消息。发送普通消息的代码如下:
同步发送消息时,根据HashKey将消息发送到指定的分区中,每个分区中的消息都是按照发送顺序保存的,即分区有序。如果 Topic 的分区被设置为 1,这个 Topic 的消息就是全局有序的。注意,顺序消息的发送必须是单线程,多线程将不再有序。顺序消息的消费和普通消息的消费方式不同,后面会详细讲解。
下面来看一下发送顺序消息的实现代码:
生产者发送消息后,消费者在指定时间才能消费消息,这类消息被称为延迟消息或定时消息。生产者发送延迟消息前需要设置延迟级别,目前开源版本支持18个延迟级别:Broker在接收用户发送的消息后,首先将消息保存到名为SCHEDULE_TOPIC_XXXX的Topic中。此时,消费者无法消费该延迟消息。然后,由Broker端的定时投递任务定时投递给消费者。
保存延迟消息的实现逻辑见org.apache.rocketmq.store.schedule.ScheduleMessageService 类。按照配置的延迟级别初始化多个任务,每秒执行一次。如果消息投递满足时间条件,那么将消息投递到原始的Topic中。消费者此时可以消费该延迟消息。
生产者代码中怎么设置延迟级别呢?相关代码如下:
事务消息的发送、消费流程和延迟消息类似,都是先发送到对消费者不可见的 Topic中。当事务消息被生产者提交后,会被二次投递到原始Topic中,此时消费者正常消费。事务消息的发送具体分为以下两个步骤。
第一步:用户发送一个Half消息到 Broker,Broker设置 queueOffset=0,即对消费者不可见。
第二步:用户本地事务处理成功,发送一个 Commit 消息到 Broker,Broker 修改queueOffset为正常值,达到重新投递的目的,此时消费者可以正常消费;如果本地事务处理失败,那么将发送一个Rollback消息给Broker,Broker将删除Half消息,如图2-5所示。
有读者可能会有疑问:如果生产者忘记了提交或回滚,那么 Broker怎么处理 Half消息呢?
Broker会定期回查生产者,确认生产者本地事务的执行状态,再决定是提交、回滚还是删除Half消息。
图2-5
下面介绍如何初始化事务消息生产者,代码如下:
下面是执行本地事务和发送事务消息代码的实现代码:
单向消息的生产者只管发送过程,不管发送结果。单项消息主要用于日志传输等消息允许丢失的场景,常用的发送代码如下:
批量消息发送能提高发送效率,提升系统吞吐量。批量消息发送有以下3点注意事项:
(1)消息最好小于1MB。
(2)同一批批量消息的Topic、waitStoreMsgOK属性必须一致。
(3)批量消息不支持延迟消息。
批量发送实现代码如下: