SpringBoot整合RocketMQ

SpringBoot整合RocketMQ

引言

这篇文章使用SpringBoot整合RocketMQ。

环境搭建

导入依赖

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.0.5.RELEASE</version>

</parent>

<dependencies>

<dependency>

<groupId>org.apache.rocketmq</groupId>

<artifactId>rocketmq-spring-boot-starter</artifactId>

<version>2.0.4</version>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

</dependency>

</dependencies>

启动类

@SpringBootApplication

public class ApplicationStart {

public static void main(String[] args) {

SpringApplication.run(ApplicationStart.class);

}

}

配置文件

rocketmq:

name-server: 127.0.0.1:9876

是否开启自动配置

producer:

enable-msg-trace: true

# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识

group: "service-pay-producer"

# 消息最大长度 默认 1024 * 4 (4M)

max-message-size: 4096

# 发送消息超时时间,默认 3000

send-message-timeout: 3000

# 发送消息失败重试次数,默认2

retry-times-when-send-failed: 2

retry-times-when-send-async-failed: 2

发送消息

生产者

@Service

public class RocketMQProducer{

@Autowired

private RocketMQTemplate rocketMQTemplate;

@Value("${rocketmq.producer.send-message-timeout}")

private Integer messageTimeOut;

/**

* 发送普通消息

* @return

*/

public SendResult sendMsg(String msgBody){

SendResult result = rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());

return result;

}

/**

* 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑

*/

public void sendAsyncMsg(String msgBody){

rocketMQTemplate.asyncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(), new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

// 处理消息发送成功逻辑

}

@Override

public void onException(Throwable e) {

// 处理消息发送异常逻辑

}

});

}

/**

* 发送延时消息<br/>

* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>

*/

public void sendDelayMsg(String msgBody, Integer delayLevel){

rocketMQTemplate.syncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(),messageTimeOut,delayLevel);

}

/**

* 发送带tag的消息,直接在topic后面加上":tag"

*/

public void sendTagMsg(String msgBody){

rocketMQTemplate.syncSend("queue_test_topic:tag1",MessageBuilder.withPayload(msgBody).build());

}

}

消费者

实现:RocketMQListener 接口消费消息,通过@RocketMQMessageListener指定consumerGroup,topic,和tags

/**

  • rocketmq 消息监听,@RocketMQMessageListener中的selectorExpression为tag,默认为*

*/

@Slf4j

@Component

@RocketMQMessageListener(topic = "queue_test_topic",selectorExpression="*",consumerGroup = "queue_group_test")

public class RocketMQMsgListener implements RocketMQListener<MessageExt> {

@Override

public void onMessage(MessageExt message) {

byte[] body = message.getBody();

String msg = new String(body, CharsetUtil.UTF_8);

log.info("接收到消息:{}", msg);

}

}

测试

@Controller

public class ProducerController {

@Autowired

private RocketMQProducer rocketMQProducer;

@RequestMapping("/send")

@ResponseBody

public SendResult send(String msg) {

//formats: topicName:tags

return rocketMQProducer.sendMsg(msg);

}

}

事务消息

生产者

生产者增加事务消息发送方法

/**

  • 发送事务消息

*/

public SendResult sendTransMsg(String msgBody){

//封装消息

Message<String> message = MessageBuilder.withPayload(msgBody).build();

//发送事务消息

return rocketMQTemplate.sendMessageInTransaction(

//这里和事务监听器里面的事务组保持一致

"tx-producer-group",

//topic:tag

"queue_test_topic:trans-tags",message ,null);

}

事务监听器

通过 @RocketMQTransactionListener(txProducerGroup = “tx-producer-group”) 注解标记,监听器需要实现RocketMQLocalTransactionListener 接口 , txProducerGroup 是事务组的名字。

@Component

@RocketMQTransactionListener(txProducerGroup = "tx-producer-group")

public class MyTransactionCheckListener implements RocketMQLocalTransactionListener {

public static final Logger LOGGER = LoggerFactory.getLogger(MyTransactionCheckListener.class);

Random random = new Random();

@Override

public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {

//执行业务,保存本地事务

//保存成功

if(random.nextInt() % 2 == 0){

LOGGER.info("本地事务提交成功...");

return RocketMQLocalTransactionState.COMMIT;

}

LOGGER.info("本地事务提交未知...");

return RocketMQLocalTransactionState.UNKNOWN;

}

@Override

public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message msg) {

//这里查询本地事务状态

if(random.nextInt() % 2 == 0){

LOGGER.info("本地事务回查...COMMIT");

return RocketMQLocalTransactionState.COMMIT;

}

LOGGER.info("本地事务回查...ROLLBACK");

return RocketMQLocalTransactionState.ROLLBACK;

}

}

消费者

正常编写即可

测试

@RequestMapping("/sendTrans")

@ResponseBody

public SendResult sendTrans(String msg) {

//formats: topicName:tags

return rocketMQProducer.sendTransMsg(msg);

}

最后

感谢原作者的精彩作品,我受益匪浅

原文链接:https://blog.csdn.net/weixin_48133130/article/details/134126430

LICENSED UNDER CC BY-NC-SA 4.0