引言
这篇文章使用SpringBoot整合RocketMQ。
环境搭建
导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
启动类
@SpringBootApplication
public class ApplicationStart {
public static void main(String[] args) {
SpringApplication.run(ApplicationStart.class);
}
}
配置文件
rocketmq:
name-server: 127.0.0.1:9876
是否开启自动配置
producer:
enable-msg-trace: true
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
group: "service-pay-producer"
# 消息最大长度 默认 1024 * 4 (4M)
max-message-size: 4096
# 发送消息超时时间,默认 3000
send-message-timeout: 3000
# 发送消息失败重试次数,默认2
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
发送消息
生产者
@Service
public class RocketMQProducer{
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;
/**
* 发送普通消息
* @return
*/
public SendResult sendMsg(String msgBody){
SendResult result = rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());
return result;
}
/**
* 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑
*/
public void sendAsyncMsg(String msgBody){
rocketMQTemplate.asyncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
}
@Override
public void onException(Throwable e) {
// 处理消息发送异常逻辑
}
});
}
/**
* 发送延时消息<br/>
* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>
*/
public void sendDelayMsg(String msgBody, Integer delayLevel){
rocketMQTemplate.syncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(),messageTimeOut,delayLevel);
}
/**
* 发送带tag的消息,直接在topic后面加上":tag"
*/
public void sendTagMsg(String msgBody){
rocketMQTemplate.syncSend("queue_test_topic:tag1",MessageBuilder.withPayload(msgBody).build());
}
}
消费者
实现:RocketMQListener 接口消费消息,通过@RocketMQMessageListener指定consumerGroup,topic,和tags
/**
rocketmq 消息监听,@RocketMQMessageListener中的selectorExpression为tag,默认为*
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "queue_test_topic",selectorExpression="*",consumerGroup = "queue_group_test")
public class RocketMQMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String msg = new String(body, CharsetUtil.UTF_8);
log.info("接收到消息:{}", msg);
}
}
测试
@Controller
public class ProducerController {
@Autowired
private RocketMQProducer rocketMQProducer;
@RequestMapping("/send")
@ResponseBody
public SendResult send(String msg) {
//formats: topicName:tags
return rocketMQProducer.sendMsg(msg);
}
}
事务消息
生产者
生产者增加事务消息发送方法
/**
发送事务消息
*/
public SendResult sendTransMsg(String msgBody){
//封装消息
Message<String> message = MessageBuilder.withPayload(msgBody).build();
//发送事务消息
return rocketMQTemplate.sendMessageInTransaction(
//这里和事务监听器里面的事务组保持一致
"tx-producer-group",
//topic:tag
"queue_test_topic:trans-tags",message ,null);
}
事务监听器
通过 @RocketMQTransactionListener(txProducerGroup = “tx-producer-group”) 注解标记,监听器需要实现RocketMQLocalTransactionListener 接口 , txProducerGroup 是事务组的名字。
@Component
@RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
public class MyTransactionCheckListener implements RocketMQLocalTransactionListener {
public static final Logger LOGGER = LoggerFactory.getLogger(MyTransactionCheckListener.class);
Random random = new Random();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {
//执行业务,保存本地事务
//保存成功
if(random.nextInt() % 2 == 0){
LOGGER.info("本地事务提交成功...");
return RocketMQLocalTransactionState.COMMIT;
}
LOGGER.info("本地事务提交未知...");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message msg) {
//这里查询本地事务状态
if(random.nextInt() % 2 == 0){
LOGGER.info("本地事务回查...COMMIT");
return RocketMQLocalTransactionState.COMMIT;
}
LOGGER.info("本地事务回查...ROLLBACK");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
消费者
正常编写即可
测试
@RequestMapping("/sendTrans")
@ResponseBody
public SendResult sendTrans(String msg) {
//formats: topicName:tags
return rocketMQProducer.sendTransMsg(msg);
}
最后
感谢原作者的精彩作品,我受益匪浅
原文链接:https://blog.csdn.net/weixin_48133130/article/details/134126430