RocketMQ的使用

RocketMQ的使用

引言

讲解一下RocketMQ的使用。

消费方式-集群和广播

/**

  • 生产者

*/

public class SenderTest {

//演示消息同步发送

public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

//生产者

DefaultMQProducer producer = new DefaultMQProducer("hello-producerGroup");

//设置name server地址

producer.setNamesrvAddr("127.0.0.1:9876");

//设置队列数量为2,有4个,根据情况设置

producer.setDefaultTopicQueueNums(2);

//启动

producer.start();

for (int i = 0 ; i < 16 ; i++){

Message message = new Message();

//消息主题

message.setTopic("hello-topic");

//消息标签

message.setTags("sms");

//添加内容

message.setBody(("我是消息").getBytes(CharsetUtil.UTF_8));

//执行发送

SendResult result = producer.send(message);

//打印结果

System.out.println(result);

// }

producer.shutdown();

}

}

//消息发送者

public class ConsumerTest {

public static void main(String[] args) {

try {

// 实例化消息生产者Producer

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer

("hello-consumergroup");

// 设置NameServer的地址

consumer.setNamesrvAddr("127.0.0.1:9876");

//设置消费模式

//consumer.setMessageModel(MessageModel.CLUSTERING); //默认是集群

consumer.setMessageModel(MessageModel.BROADCASTING); //默认是集群

//从最开始的位置开始消费

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息

//和发送者保持一致才能搜到消息

consumer.subscribe("hello-topic", "sms");

// 注册回调实现类来处理从broker拉取回来的消息

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeConcurrentlyContext context) {

msgs.forEach(message->{

System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));

});

//System.out.printf("%s 成功搜到消息: %s %n", Thread.currentThread().getName(), msgs);

// 标记该消息已经被成功消费 ack机制

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

// 启动Producer实例

consumer.start();

}catch (Exception e){

e.printStackTrace();

}

}

}

运行两个consumer,分别测试集群和广播消费模式

普通消息(重要)

同步发送

同步消息是发送者发送消息,需要等待结果的返回,才能继续发送第二条消息,这是一种阻塞式模型,虽然消息可靠性高,但是阻塞导致性能低。API : SendResult result = producer.send(message); 代码示例:

发送者代码

public class Producer {

//演示消息同步发送

public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

//生产者

DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup");

//设置name server地址

producer.setNamesrvAddr("127.0.0.1:9876");

//设置队列数量为2,默认为4,根据情况设置

producer.setDefaultTopicQueueNums(2);

//启动

producer.start();

for (int i = 0 ; i < 16 ; i++){

Message message = new Message();

//消息主题

message.setTopic("syn-topic");

//消息标签

message.setTags("sms");

//添加内容

message.setBody((i+"我是消息").getBytes(CharsetUtil.UTF_8));

//执行发送

SendResult result = producer.send(message);

//打印结果

System.out.println(result);

}

producer.shutdown();

} }

同步发送使用 SendResult result = producer.send(message); 方法即可,马上可以拿到返回值。SendResult 结果如下

SendResult [

sendStatus=SEND_OK, msgId=C0A8006516B018B4AAC270EF9D940000,offsetMsgId=C0A8006500002A9F0000000000008E1C,

messageQueue=MessageQueue [topic=syn-topic, brokerName=LAPTOP-20VLGCRC, queueId=3], queueOffset=0]

————————————————

SendStatus : 状态OK

msgId: 发送者生成的ID

OffsetMsgId : 由Broker生成的消息ID

MessageQueue :队列信息

异步发送

异步消息是发送者发送消息,无需等待发送结果就可以再发送第二条消息,它是通过回调的方式来获取到消息的发送结果,消息可靠性高,性能也高。API : producer.send(message,SendCallback) 示例代码:

. . . 省略. . .

producer.send(

//创建消息对象

new Message("asyn-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8)),

//添加发送回调

new SendCallback() {

//发送成功结果处理

@Override

public void onSuccess(SendResult sendResult) {

System.out.println(sendResult);

}

//发送异常结果处理

@Override

public void onException(Throwable throwable) {

System.out.println("发送异常:"+throwable.getMessage());

}

}

);

SendCallback 是消息发送结果回调。如果:sendResult.getSendStatus() == SendStatus.SEND_OK 表示成功

单向发送

这种方式指的是发送者发送消息后无需等待Broker的结果返回,Broker也不会返回结果,该方式性能最高,但是消息可靠性低。API : producer.sendOneway(message) 示例代码:

... 省略...Message message = new Message("asyn-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8));
producer.sendOneway(message);

sendOneway 单向发送是没有返回结果值的。

总结

下面对三种发送方式做一个对比

可靠性最高: 同步发送 > 异步发送 > 单向发送

性能最高:单向发送 > 异步发送 > 同步发送

使用场景建议如下

如果是比较重要的不可丢失的消息,且对时效性要去不高建议使用同步发送,如转账消息

如果是不重要的可失败的消息,比如日志消息,建议使用单向发送

如果对时效性要求比较高,且消息不能丢失,可以尝试使用异步发送

顺序消息

在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。

按照发送的顺序进行消费就是顺序消息,遵循(FIFO), 默认生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。

全局有序

全局有序是一个topic下的所有消息都要保证顺序,如果要保证消息全局顺序消费,就需要保证使用一个队列存放消息,一个消费者从这一个队列消费消息就能保证顺序,即:单线程执行,可以通过 producer.setDefaultTopicQueueNums(1);来指定队列数量。

1.发送者

可以通过代码指定创建1个队列即可

producer.setDefaultTopicQueueNums(1);

2.消费者

使用一个线程,一次只拉取一个消息 , 使用 MessageListenerOrderly 有序的消费消息。

//最大线程1个

defaultMQPushConsumer.setConsumeThreadMax(1);

defaultMQPushConsumer.setConsumeThreadMin(1);

//同时只拉取一个消息

defaultMQPushConsumer.setPullBatchSize(1);

...省略...

defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {

@Override

public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {

list.forEach(message->{

System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));

});

return ConsumeOrderlyStatus.SUCCESS;

}

});

...省略...

部分有序

还有一种就是分区有序或者部分有序,部分顺序消息只要保证某一组消息被顺序消费,即:只需要保证一个队列中的消息有序消费即可。

比如:保证同一个订单ID的生成、付款、发货消息按照顺序消费即可实现原理:

把同一个订单ID的消息放入同一个MessageQueue

保证这个MessageQueue只有一个消费者不被并发处理 ,这个需要使用到 MessageQueueSelector 来保证同一个订单的消息在同一个队列

1.生产者

//演示消息同步发送

public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

//生产者

DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup");

//设置name server地址

producer.setNamesrvAddr("127.0.0.1:9876");

//发送消息超时时间

producer.setSendMsgTimeout(1000);

//启动

producer.start();

for (long i = 0 ; i < 4 ; i++){

Order order = new Order(i,"订单"+i,"创建");

//添加内容

byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);

Message message = new Message("topic-order","product-order",bytes);

message.setKeys("key-"+i);

//执行发送

SendResult result = producer.send(message, new MessageQueueSelector() {

@Override

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

Long id = (Long) arg;

//使用取模算法确定id存放到哪个队列

int index =(int) (id % mqs.size());

//index就是要存放的队列的索引

return mqs.get(index);

}

//把订单ID作为参数,作为选择器的基础数据

},order.getId());

System.out.println(result);

//====================================================================

order.setStatus("支付");

//添加内容

bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);

message = new Message("topic-order","product-order",bytes);

message.setKeys("key-"+i);

//执行发送

result = producer.send(message,new MessageQueueSelector() {

@Override

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

Long id = (Long) arg;

//使用取模算法确定id存放到哪个队列

int index =(int) (id % mqs.size());

//index就是要存放的队列的索引

return mqs.get(index);

}

},order.getId());

System.out.println(result);

//====================================================================

order.setStatus("发货");

//添加内容

bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);

message = new Message("topic-order","product-order",bytes);

message.setKeys("key-"+i);

//执行发送

result = producer.send(message,new MessageQueueSelector() {

@Override

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

Long id = (Long) arg;

//使用取模算法确定id存放到哪个队列

int index =(int) (id % mqs.size());

//index就是要存放的队列的索引

return mqs.get(index);

}

},order.getId());

System.out.println(result);

//打印结果

}

producer.shutdown();

}

2.消费者

public class Consumer {

public static void main(String[] args) throws MQClientException {

//创建消费者

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syn-consumerGroup");

//设置name server 地址

defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

//从开始位置消费

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//订阅

defaultMQPushConsumer.subscribe("topic-order","product-order");

defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {

@Override

public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {

list.forEach(message->{

System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));

});

return ConsumeOrderlyStatus.SUCCESS;

}

});

defaultMQPushConsumer.start();

}

}

延迟消息

我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。

延迟消息即:把消息写到Broker后需要延迟一定时间才能被消费 , 在RocketMQ中消息的延迟时间不能任意指定,而是由特定的等级(1 到 18)来指定,分别有:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

可以通过修改配置来增加级别,比如在mq安装目录的 broker.conf 文件中增加

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 2d 这个时候总共就有19个level。

RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;如果有就走下面的流程

修改消息Topic的名字为SCHEDULE_TOPIC_XXXX

根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId

Commit Log Offset:记录在CommitLog中的位置

Size:记录消息的大小

Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。

将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。

在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic。

延迟消息实战

只需要一处改动,发送者通过 message.setDelayTimeLevel(3); 设置延迟级别即可

1.消息发送者

public class Producer {

//演示消息同步发送

public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

//生产者

DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup-delay");

//设置name server地址

producer.setNamesrvAddr("127.0.0.1:9876");

//启动

producer.start();

for (long i = 0 ; i < 4 ; i++){

Order order = new Order(i,"订单"+i,"创建");

//添加内容

byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);

Message message = new Message("topic-order-delay","product-order-delay",bytes);

//延迟级别 3,代表 10s延迟

message.setDelayTimeLevel(3);

message.setKeys("key-"+i);

//执行发送

SendResult result = producer.send(message);

System.out.println("发送时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

System.out.println(result);

}

producer.shutdown();

}

}

2.消息消费者

public class Consumer {

public static void main(String[] args) throws MQClientException {

//创建消费者

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syn-consumerGroup-delay");

//设置name server 地址

defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

//从开始位置消费

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//订阅

defaultMQPushConsumer.subscribe("topic-order-delay","product-order-delay");

defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

list.forEach(message->{

System.out.println("消费时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));

});

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

defaultMQPushConsumer.start();

}

}

事务消息

事务消息概述

如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。

分布式事物的解决方案有很多,如:2PC,TCC,最终一致性,最大努力通知等等。这里要介绍的是基于RocketMQ事务消息的最终一致性方案,下面举个例子。

用户注册成功,向用户数据库保存用户信息,同时通过远程调用积分服务为用户赠送积分,模型如下:

我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择2PC强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型

事务流程

用户服务(事务发起方)往MQ中发送一个事务消息,

MQ返回结果是否发送成功

用户服务受到消息发送成功结果,保存用户数据,提交本地事务

积分服务拿到MQ中的事务消息

积分服务保存积分到数据库

事务消息原理

事务流程中的最大的难点就是如何保证事务消息发送和本地事务的原子性,即:第一步和第二步要么都成功,要么都失败,不能说消息发送成功了,结果用户保存失败了,那么积分服务可能会增加成功,就导致数据不一致。RocketMQ已经帮我们处理好这个问题。它的工作原理如下:

事务发起方,即用户服务会先向broker发送一个prepare“半事务消息”(一个并不完整的消息)到RMQ_SYS_TRANS_HALF_TOPIC的queue中, 该消息对消费者不可见。

MQ会返回一个ACK确认消息发送成功或者失败

消息发送成功,用户服务执行保存用户操作,提交本地事务,并根据本地事务的执行结果来决定半消息的提交状态为提交或者回滚

本地事务提交成功,事务发起方即用户服务会向broker再次发起“结束半事务”消息请求,commit或者rollback指令

broker端收到请求后,首先从RMQ_SYS_TRANS_HALF_TOPIC的queue中查出该消息,设置为完成状态。如果消息状态为提交,则把半消息从RMQ_SYS_TRANS_HALF_TOPIC队列中复制到这个消息原始topic的queue中去(之后这条消息就能被正常消费了);如果消息状态为回滚,则什么也不做。

Producer发送的半消息结束请求是oneway的,也就是发送后就不管了,只靠这个是无法保证半消息一定被提交的(比如未执行第4步),rocketMq提供了一个兜底方案,这个方案叫消息反查机制,Broker启动时,会启动一个TransactionalMessageCheckService任务,该任务会定时从半消息队列中读出所有超时未完成的半消息,针对每条未完成的消息,Broker会给对应的Producer发送一个消息反查请求,根据反查结果来决定这个半消息是需要提交还是回滚,或者后面继续来反查

consumer(本例中指积分系统)消费消息,执行本地数据变更,提交本地事务

事务消息实战

我们需要做什么

编写本地事务检查监听TransactionListener ,一是执行本地事务逻辑,二是返回本地事务执行状态

发消息时生产者需要设置producer.setTransactionListener 事务监听

1.事务监听器

public class MyTransactionCheckListener implements TransactionListener {

@Override

public LocalTransactionState executeLocalTransaction(Message message, Object o) {

//执行业务,保存本地事务

//保存成功

return LocalTransactionState.ROLLBACK_MESSAGE;

}

@Override

public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

//这里查询本地事务状态

return LocalTransactionState.COMMIT_MESSAGE;

}

}

2.消息生产者

public class TransationSender {

public static void main(String[] args) throws MQClientException {

TransactionMQProducer producer = new TransactionMQProducer("tran-product-group");

producer.setNamesrvAddr("127.0.0.1:9876");

//线程池

ExecutorService excutorService = Executors.newFixedThreadPool(20);

producer.setExecutorService(excutorService);

producer.setTransactionListener(new MyTransactionCheckListener());

//设置事务消息监听

producer.start();

for(int i = 0 ; i < 10 ; i++){

String orderId = UUID.randomUUID().toString();

String tags = "Tag";

Message message = new Message("topic-tran", "tag", orderId, ("下单:"+i).getBytes(CharsetUtil.UTF_8));

TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);

System.out.println(transactionSendResult);

}

producer.shutdown();

}

}

3.消息消费者

正常消费

public class TransationConsumer {

public static void main(String[] args) throws MQClientException {

//创建消费者

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("trans-consumer-group");

//设置name server 地址

defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

//订阅

defaultMQPushConsumer.subscribe("topic-tran", "tag");

defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

list.forEach(message->{

System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));

});

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

defaultMQPushConsumer.start();

}

}

消息过滤

消息过滤概述

消息过滤包括 tags过滤法sql过滤,消费者在消费消息的时候可以通过:Consumer.subscribe(topic,tags) 来指定要消费的消息,如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。或者使用“*”来消费某Topic主题下的所有tags消息。如:

Consumer.subscribe(“topic”,”taga || tagb || tagc”)

除此之外RocketMQ还支持使用SQL进行消息过滤,这种方式可以实现对消息的复杂过滤。SQL过滤表达式中支持多种常量类型与运算符。

支持的常量类型:

数值:比如:123,3.1415

字符:必须用单引号包裹起来,比如:‘abc’

布尔:TRUE 或 FALSE

NULL:特殊的常量,表示空

支持的运算符有:

数值比较:>,>=,<,<=,BETWEEN,=

字符比较:=,<>,IN

逻辑运算 :AND,OR,NOT

NULL判断:IS NULL 或者 IS NOT NULL

不过,只有使用PUSH模式的消费者才能使用SQL过滤。API如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件conf/broker.conf中添加如下属性,以开启该功能:enablePropertyFilter = true

消息过滤实战

1.发送者

发送消息时,你能通过putUserProperty来设置消息的属性

...省略...

//添加内容

byte[] bytes = (JSON.toJSONString("消息")).getBytes(CharsetUtil.UTF_8);

Message message = new Message("topic-order-filter","product-order-filter",bytes);

//添加一个用户属性,用来作为过滤条件

message.putUserProperty("sex",(i % 2)+"");

message.setKeys("key-"+i);

//执行发送

SendResult result = producer.send(message);

...省略...

2.消费者

通过:consumer.subscribe(“topic”, MessageSelector.bySql(" sql 条件")); 来过滤SQL

...省略...

//订阅

//defaultMQPushConsumer.subscribe("topic-order-filter","product-order-filter || tagb");

//只是消费 sex为 0的消息

defaultMQPushConsumer.subscribe("topic-order-filter", MessageSelector.bySql("sex = 0"));

defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

list.forEach(message->{

System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));

});

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

...省略...

消息重试

对发送失败的消息进行重新发送叫消息重试,producer和consumer都有消息重试机制。

生产者重试

RocketMQ默认支持消息重试机制,消息重试具有如下特点

对于同步和异步消息支持消息重试,对于oneway单向消息不支持重试

普通消息具有消息重试,顺序消息不支持消息重试

消息重试可能会造成消息重复,所以消费者一定要做好幂等处理

消息发送失败有三种情况:同步发送失败、异步发送失败、消息刷盘失败

同步发送失败策略:

对于普通消息,消息发送默认采用round-robin策略来选择所发送到的队列。如果发送失败,默认重试2 次。但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker。当然,若只有一个Broker其 也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue。相关设置如下:

// 设置重试发送的次数,默认为2次

producer.setRetryTimesWhenSendFailed(3);

// 设置发送超时时限为5s,默认3s

producer.setSendMsgTimeout(5000);

如果超过重试次数,则抛出异常,由Producer去保证消息不丢。当然当生产者出现 RemotingException、MQClientException和MQBrokerException时,Producer会自动重投消息。

同时,Broker还具有失败隔离功能,使Producer尽量选择未发生过发送失败的Broker作为目标 Broker。其可以保证其它消息尽量不发送到问题Broker,为了提升消息发送效率,降低消息发送耗时。

异步发送失败策略:

异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保 证消息不丢。 相关设置如下:

//指定异步发送失败后不进行重试发送

producer.setRetryTimesWhenSendAsyncFailed(0)

消息刷盘失败策略:

消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是 SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。不过,对于重要消息可以通过在Broker 的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。

发送端重试实例:

//同步发送消息,如果5秒内没有发送成功,则重试5次

DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");

producer.setRetryTimesWhenSendFailed(5);

producer.send(msg,5000L);

消费者重试

顺序消息重试

对于顺序消息消费失败默认会进行每隔1000毫秒进行重试,由于要保证消息是顺序消费,所以重试会导致后面的消息阻塞。可以通过下面的设置来修改重试间隔时间:

//每隔100毫秒重试

consumer.setSuspendCurrentQueueTimeMillis(100);

[注意]顺序消息没有发送失败重试机制,但具有消费失败重试机制 ,顺序消息重试是无止境的,为了防止消息一直重试阻塞,务必要做好监控工作。

无顺消息重试

对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回 状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息

重试时间间隔

对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如: 1s 5s 10s …2h ,如果16次都重试失败,消息进入死信队列

可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

// 修改消费重试次数

consumer.setMaxReconsumeTimes(10);

重试队列

对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而 是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队

列就是重试队列。 当出现需要进行重试消费的消息时,Broker会为每个消费组都设置一个Topic名称为%RETRY%consumerGroup@consumerGroup 的重试队列。

消费端重试实例:

//最大重试次数,默认16

defaultMQPushConsumer.setMaxReconsumeTimes(10);

defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

for(MessageExt message : list){

System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));

if(message.getReconsumeTimes() > 3){

//如果重试次数大于3,可以把消息持久化到数据库,然后另外使用一个定时任务去定时重试。

//甚至进行人工干预

//消息持久化到数据库,这里就没必要重试了,返回success

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

//出现异常,进行重试

try {

throw new Exception("出现异常了...");

} catch (Exception e) {

e.printStackTrace();

//稍后重试

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

};

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

死信队列

死信队列概述

消息多次消费失败,达到最大重试次数,消息不会被丢弃而是进入死信队列(Dead-Letter Queue,DLQ),死信队列中的消息被称为死信消息(Dead-Letter Message,DLM)。

死信队列具有如下特征

死信队列中的消息无法再消费,死信队列对应Topic的权限为2,只有写权限,所以死信队列没有办法读取。

3天之后死信队列分钟的消息被删除,和普通消息一样

死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,其中每个队列都是死信队列

如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列

如果出现死信队列,说明程序除了问题,程序员应该及时的排除,进行BUG的处理。我们应该在消费者重试次数达到一定程度就对消息进行持久化,方便后续的处理。或额外定时重试。

最后

感谢原作者的精彩作品,我受益匪浅

原文链接:https://blog.csdn.net/weixin_48133130/article/details/134126430

LICENSED UNDER CC BY-NC-SA 4.0