RocketMQ学习笔记

2020-05-14   171 次阅读


RocketMQ介绍

MQ的应用场景

MQ全称Message Queue,即消息队列,开发中消息队列通常有如下应用场景。
a. 任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间
b. 应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦
c. 日志收集
进行统一业务日志收集,公分析系统进行数据分析,消息队列作为日志数据的中转站

RocketMQ介绍

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力。(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)。并且其内部通过Java语言开发,便于阅读与修改。
http://rocketmq.apache.org/

消息队列技术选型对比

市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,Redis。
本项目选用RocketMQ的一个主要原因如下 :
1、支持事务消息
2、支持延迟消息
3、天然支持集群、负载均衡
4、支持指定次数和时间间隔的失败消息重发
详细的技术选型对比如下:

RabbitMQ:
优点:
1.支持AMQP协议
2.基于erlang语言开发,高并发性能较好
3.工作模式较为灵活
4.支持延迟消息
5.提供较为友好的后台管理页面
6.单机部署,1~2WTPS
缺点:
1.不支持水平扩容
2.不支持事务
3.消息吞吐量三者最差
4.当产生消息堆积,性能下降明显
5.消息重发机制需要手动设置
6.不支持消息重复消费

RocketMQ:
优点:
1.高可用,高吞吐量,海量消息堆积,低延迟性能上,都表现出色
2.api与架构设计更加贴切业务场景
3.支持顺序消息
4.支持事务消息
5.支持消息过滤
6.支持重复消费
7.支持延迟消息
8.支持消息跟踪
9.天然支持集群、负载均衡
10.支持指定次数和时间间隔的失败消息重发
11.单机部署,5~10WTPS
缺点:
1.生态圈相较Kafka有所不如
2.消息吞吐量与消息堆积能力也不如Kafka
3.不支持主从自动切换
4.只支持Java

Kafka:
优点:
1.高可用,高吞吐量,低延迟性能上,都表现出色
2.使用人数多,技术生态圈完善
3.支持顺序消息
4.支持多种客户端
5.支持重复消费
缺点:
1.依赖分区,消费者数量受限于分区数
2.单机消息过多时,性能下降明显
3.不支持事务消息
4.不支持指定次数和时间间隔的失败消息重发

RocketMQ基础

组成结构

RocketMQ组成结构图如下:
1.png
交互过程如下:
1、Broker定时发送自身状态到NameServer
2、Producer请求NameServer获取Broker的地址
3、Producer将消息发送到Broker中的消息列表
4、Consumer订阅Broker中的消息队列,通过拉去消息,或由Broker将消息推送至Consumer

图解:
1、Producer Cluster 消息生产者群
- 负责发送消息,一般由业务系统负责产生消息

2、Consumer Cluster 消费者群

  • 负责消费信息,一般是后台系统负责异步消费
  • 两种消费模式:
    1. Push Consumer,服务端想消费端推送消息
    2. Pull Consumer,消费端向服务定时拉去消息

3、NameServer 名称服务器

  • 集群架构中的组织协调员,相当于注册中心,手机broker的工作情况,不负责消息的处理

4、Broker 消息服务器

  • 是RocketMQ的核心,负责消息的接受,存储,发送等
  • 需要定时发送自身状态到NameServer,默认10秒发送一次,超时2分钟会认为该broker失效

安装RocketMQ

环境要求

  1. 64位JDK 1.8+;
  2. Maven 3.2.x;
  3. 64位操作系统系统,本文档在Windows上安装。

下载

下载地址:http://archive.apache.org/dist/rocketmq/4.5.0/rocketmq-all-4.5.0-bin-release.zip
使用版本:4.5.0
下载后解压到一个没有空格和中文的目录。

启动

1、 配置
开发环境不需要太多的内存,这里调小一点.
Broker默认磁盘空间利用率达到85%就不再接收,这里在开发环境可以提高磁盘空间利用率报警阀值为98%。

调整默认的内存大小参数
cd bin/
vim runserver.cmd
set "JAVA_OPT=%JAVA_OPT% ‐server ‐Xms512m ‐Xmx512m ‐Xmn512m ‐XX:MetaspaceSize=128m ‐
XX:MaxMetaspaceSize=320m"
cd bin/
vim runbroker.cmd
set "JAVA_OPT=%JAVA_OPT% ‐server ‐Drocketmq.broker.diskSpaceWarningLevelRatio=0.98 ‐Xms512m ‐
Xmx512m ‐Xmn512m"

2、启动NameServer
进入cmd命令窗口,执行bin/mqnamesrv.cmd

f:
cd F:\devenv\rocketmq‐all‐4.5.0‐bin‐release\bin
mqnamesrv.cmd

3、启动broker
进入cmd命令窗口,执行bin/mqbroker.cmd -n 127.0.0.1:9876

f:
cd F:\devenv\rocketmq‐all‐4.5.0‐bin‐release\bin
mqbroker.cmd ‐n 127.0.0.1:9876

-n:指定NameServer的地址

4、测试
a、发送消息
进入cmd命令窗口,执行:

set NAMESRV_ADDR=127.0.0.1:9876
cd F:\devenv\rocketmq‐all‐4.5.0‐bin‐release\bin
tools.cmd org.apache.rocketmq.example.quickstart.Producer

b、接受信息
进入cmd命令窗口,执行:

set NAMESRV_ADDR=127.0.0.1:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer

5、安装管理端
RocketMQ提供了UI管理工具,名为rocketmq-console,项目地址:https://github.com/apache/rocketmqexternals/tree/master/rocketmq-console

a、下载源代码
见资料文件夹rocketmq-console.zip
解压rocketmq-console.zip
b、修改配置
修改rocketmq-console\src\main\resources\application.properties
server.port=9877 // 服务端口号
rocketmq.config.namesrvAddr=127.0.0.1:9876 // 名称服务地址
rocketmq.config.dataPath=/tmp/rocketmq-console/data // mq数据路径
c、打包
进入rocketmq-console目录下
cmd下执行:

mvn clean package ‐Dmaven.test.skip=true

d、启动 进入/rocketmq-console/target
java -jar rocketmq-console-ng-1.0.1.jar --server.port=9877 --rocketmq.config.namesrvAddr=127.0.0.1:9876
e、访问 http://127.0.0.1:9877

快速入门

三种消息发送方式

RocketMQ支持三种消息发送方式:
a、同步消息(sync message)
    produce向broker发送消息,执行API时同步等待,知道broker服务器返回发送结果
b、异步消息(async message)
     producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
c、单向消息(oneway message)
    producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。

消息结构

RocketMQ的消息包括基础属性和扩展属性两部分:
a、基础属性
1)topic : 主题相当于消息的一级分类,具有相同topic的消息将发送至该topic下的消息队列中,比方说一个电商
系统可以分为商品消息、订单消息、物流消息等,就可以在broker中创建商品主题、订单主题等,所有商品的消息
发送至该主题下的消息队列中。
2)消息体:即消息的内容 ,可以的字符串、对象等类型(可系列化)。消息的最大长度 是4M。
3) 消息 Flag:消息的一个标记,RocketMQ不处理,留给业务系统使用。
b、扩展属性
1)tag :相当于消息的二级分类,用于消费消息时进行过滤,可为空 。
2)keys: Message 索引键,在运维中可以根据这些 key 快速检索到消息, 可为空 。 3)waitStoreMsgOK :消息
发送时是否等消息存储完成后再返回 。
Message 的基础属性主要包括消息所属主题 topic , 消息 Flag(RocketMQ 不做处理)、 扩展属性、消息体 。

生产者工程

1、创建生产者工程
工程结构如下:
1.png

2、创建rocketmq-java
创建rocketmq-java 父工程,pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rocketmq-java</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-java</name>
    <description>Demo project for Spring Boot</description>
    <packaging>pom</packaging>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3、创建rocketmq-provider 生产者工程
pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example</groupId>
        <artifactId>rocketmq-java</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rocketmq-provider</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-provider</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

4、新建rocketmq-provider 工程 application.yml文件:

server:
  port: 8010
  servlet:
    context-path: /rocketmq-provider
spring:
  application:
    name: rocketmq-provider
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: demo-producer-group

5、发送同步信息

@Component
public class ProducerSimple {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送同步信息
     * @param topic
     * @param msg
     */
    public void sendSyncMsg(String topic, String msg){
        rocketMQTemplate.syncSend(topic, msg);
    }

}

6、编写测试类


@SpringBootTest
class RocketmqProviderApplicationTests {

    @Autowired
    private ProducerSimple producerSimple;
    @Test
    void testSendSyncMsg() {
        producerSimple.sendSyncMsg("my-topic","第一条同步数据");
    }
}

7、启动NameServer、Broker、管理端
8、执行testSendSyncMsg方法
9、观察控制台和管理端
控制台出现end... 表示消息发送成功。
进入管理端,查询消息。
1.png

#####消费者工程
1、创建消费者工程
新建工程 rocketmq-consumer , pom如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example</groupId>
        <artifactId>rocketmq-java</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rocketmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2、配置application.yml文件

server:
  port: 8899
  servlet:
    context-path: /rabbitmq-consumer

spring:
  application:
    name: rabbitmq-consumer
rocketmq:
  name-server: 127.0.0.1:9876

3、消费消息
编写消费监听类:

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "demo-consumer-group")
public class ConsumerSimple implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("==========接受对象是字符串对象==========");
        System.out.println("接受的RocketMQ信息:  " + s);
    }
}

监听消息队列 需要指定:
topic:监听的主题
consumerGroup:消费组,相同消费组的消费者共同消费该主题的消息,它们组成一个集群。

消息发送过程

通过快速入门对消息的发送和接收有一个粗略的认识,下边分析具体的消息发送过程,如下图:
1.png

消息发送流程如下:
1、Producer从NameServer中获取主题路由信息
Broker将自己的状态上报给NameServer,NameServer中存储了每个Broker及主题、消息队列的信息。
Producer根据 topic从NameServer查询所有消息队列,查询到的结果例如:

[
{"brokerName":"Broker‐1","queueId":0},
{"brokerName":"Broker‐1","queueId":1},
{"brokerName":"Broker‐2","queueId":0},
{"brokerName":"Broker‐2","queueId":1}
]

Producer按选择算法从以上队列中选择一个进行消息发送,如果发送消息失败则在下次选择的时候 会规避掉失败
的broker。
2、构建消息,发送消息
发送消息前进行校验,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等(topic、消息
体,生产组等)。
如果该topic下还没有队列则自动创建,默认一个topic下自动创建4个写队列,4个读队列 。
为什么要多个队列 ?
1)高可用
当某个队列不可用时其它队列顶上。
2)提高并发
发送消息是选择队列进行发送,提高发送消息的并发能力。
消息消费时每个消费者可以监听多个队列,提高消费消息的并发能力。
生产组有什么用?
在事务消息中broker需要回查producer,同一个生产组的producer组成一个集群,提高并发能力。
3、监听队列,消费消息
一个消费组可以包括多个消费者,一个消费组可以订阅多个主题。
一个队列同时只允许一个消费者消费,一个消费者可以消费多个队列中的消息。
消费组有两种消费模式:
1)集群模式
一个消费组内的消费者组成一个集群,主题下的一条消息只能被一个消费者消费。
2)广播模式
主题下的一条消息能被消费组下的所有消费者消费。
消费者和broker之间通过推模式和拉模式接收消息,推模式即broker推送给消费者,拉模式是消费者主动从broker
查询消息。

三种消息发送方式

RocketMQ 支持 3 种消息发送方式 ,即同步消息(sync message ) 、异步消息(async message)、单向消息(oneway message) 。

同步消息

参见快速入门的测试程序。

异步消息

producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消
息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
在ProducerSimple中编写发送异步消息的方法

/**
     * 异步发送信息
     * @param topic
     * @param msg
     */
    public void sendAsyncMsg(String topic, String msg){
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                //成功调用
                System.out.println("发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                //失败调用
                System.out.println("发送失败");
            }
        });
    }

测试:

 @Test
    void testSendAsyncMsg() throws InterruptedException {
        producerSimple.sendAsyncMsg("my-topic","第一条异步数据");
        Thread.sleep(3000);
    }

单向消息

producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。

 /**
     * 发送单向消息
     * @param topic
     * @param msg
     */
    public void  sendOneWayMsg(String topic, String msg){
        rocketMQTemplate.sendOneWay(topic, msg);
    }

自定义消息格式

前边我们发送的消息内容格式都是字符串,在生产开发中消息内容格式是复杂的,本小节介绍如何对消息格式进行自定义。
JSON是互联网开发中非常常用的数据格式,它具有格式标准,扩展方便的特点,将消息的格式使用JSON进行定义
可以提高消息内容的扩展性,RocketMQ支持传递JSON数据格式。
在生产端和消费端定义模型类:

@Data
@NoArgsConstructor
@ToString
public class OrderExt implements Serializable {

    private String id;
    private Date createTime;
    private Long money;
    private String title;

}

生产端:

    /**
     * 消息体为对象(自定义传送的消息体)
     * @param topic
     * @param orderExt
     */
    public void sendMsgByJson(String topic, OrderExt orderExt){
        //同步消息,将Object转为json
        rocketMQTemplate.convertAndSend(topic, orderExt);
        System.out.println("send msg:  " + orderExt.toString());
    }

消费端:

    @Test
    void testSendObj(){
        OrderExt orderExt = new OrderExt();
        orderExt.setId("11111");
        orderExt.setMoney(122L);
        orderExt.setTitle("测试对象");
        orderExt.setCreateTime(new Date());
        producerSimple.sendMsgByJson("my-topic-obj", orderExt);
    }

上例实现了RocketMQ传输JSON消息的过程,消费端在接收到JSON手动将JSON转成对象,也可以自动转换成对
象,代码如下:
定义新的监听类,RocketMQListener泛型指定要转换的对象类型。

@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener<OrderExt> {

    /**
     * 接受到信息调用此方法
     * @param orderExt
     */
    @Override
    public void onMessage(OrderExt orderExt) {
        System.out.println("============接受对象信息============");
        System.out.println(orderExt.toString());
    }
}

延迟消息

1、介绍
延迟消息也叫做定时消息,比如在电商项目的交易系统中,当用户下单之后超过一段时间之后仍然没有支付,此时就需要将该订单关l闭。要实现该功能的话,可以在用户创建订单时就发送一条包含订单内容的延迟消息,该消息在一段时间之后投递给消息消费者,当消息消费者接收到该消息后,判断该订单的支付状态,如果处于未支付状态,则将该订单关闭。
RocketMQ的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel()设置与时间相对应的延迟级别即可。

2、同步消息延迟
生产端:

    /**
     * 同步延迟
     * @param topic
     * @param orderExt
     */
    public void sendMsgByJsonDelay(String topic, OrderExt orderExt){
        //发送同步消息,消息内容将OrderExt转为json(这里Message使用的是SPring的Message)
        Message<OrderExt> messageBuilder = MessageBuilder.withPayload(orderExt).build();
        //指定发送超时时间和延迟等级
        rocketMQTemplate.syncSend(topic, messageBuilder, 2000, 3);

        System.out.println("send msg:  " + orderExt.toString());
    }

消费端:
同自定义消息格式章节。
测试:


    /**
     * 发送延迟信息
     */
    @Test
    void sendMsgByJsonDelay(){
        OrderExt orderExt = new OrderExt();
        orderExt.setId(UUID.randomUUID().toString());
        orderExt.setMoney(122L);
        orderExt.setTitle("测试对象");
        orderExt.setCreateTime(new Date());
        producerSimple.sendMsgByJsonDelay("my-topic-obj", orderExt);
        System.out.println("end.........");
    }

3、异步消息延迟
生产端:

 /**
     * 异步延迟
     */
    public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt) throws JsonProcessingException, RemotingException, MQClientException, InterruptedException {
        //异步延迟发送数据,其中使用的Message使用的是RocketMQ自带的消息体
        String json = rocketMQTemplate.getObjectMapper().writeValueAsString(orderExt);
        org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topic, json.getBytes(Charset.forName("utf-8")));
        //设置延迟等级
        message.setDelayTimeLevel(3);

        //发送异步消息
        rocketMQTemplate.getProducer().send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步延迟发送消息成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步延迟发送消息失败");
            }
        });

        System.out.println("send msg:  " + orderExt.toString());
    }

消费端:
同自定义消息格式章节。

测试

 @Test
    void sendAsyncMsgByJsonDelay() throws InterruptedException, RemotingException, MQClientException, JsonProcessingException {
        OrderExt orderExt = new OrderExt();
        orderExt.setId(UUID.randomUUID().toString());
        orderExt.setMoney(122L);
        orderExt.setTitle("测试对象");
        orderExt.setCreateTime(new Date());
        producerSimple.sendAsyncMsgByJsonDelay("my-topic-obj", orderExt);
        Thread.sleep(3000);
        System.out.println("end.........");
    }

消费重试

1、什么是消费重试
当消息发送到Broker成功,在被消费者消费时如果消费者没有正常消费,此时消息会重试消费。消费重试存在两种场景:
1)消息没有被消费者接收,比如消费者与broker存在网络异常。此种情况消息会一直被消费重试。
2)当消息已经被消费者成功接收,但是在进行消息处理时出现异常,消费端无法向Broker返回成功,这种情况下RocketMQ会不断重试。本小节重点讨论第二个场景。
针对第二种消费重试的场景,borker是怎么知道重试呢?
消费者在消费消息成功会向broker返回成功状态,否则会不断进行消费重试。

2、处理策略
当消息在消费时出现异常,此时消息被不断重试消费。RocketMQ会一直重试消费吗?
答案是不会!
消息会按照延迟消息的延迟时间等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)从第3级开始重试,每试一次如果还不成功则延迟等级加1。比如:一条消息消费失败,等待10s(第3级)进行重试,如果还没有被成功消费则延迟等级加1,即按第4级别延迟等待,等30s继续进行重试,如此进行下去,直到重试16次。
当重试了16次还未被成功消费将会投递到死信队列,到达死信队列的消息将不再被消费。
实际生产中的处理策略是什么呢?
实际生产中不会让消息重试这么多次,通常在重试一定的次数后将消息写入数据库,由另外单独的程序或人工去处
理。
项目使用的Spring整合RocketMQ的方式,消费者实现RocketMQListener的onMessage方法,在此方法中实现处
理策略的示例代码如下:

public class ConsumerSimple implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
//取出当前重试次数
int reconsumeTimes = messageExt.getReconsumeTimes();
//当大于一定的次数后将消息写入数据库,由单独的程序或人工去处理
if(reconsumeTimes >=2){
//将消息写入数据库,之后正常返回
return ;
}
throw new RuntimeException(String.format("第%s次处理失败..",reconsumeTimes));
}
}

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

一个萌新程序猿