跳到主要内容

使用 RocketMQ

添加 RocketMQ starter

https://github.com/apache/rocketmq-spring

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>

配置 RocketMQ

rocketmq:
  name-server: 192.168.1.191:9876
  producer:
    namespace: ${spring.profiles.active}
    group: smart-homework-producer
  consumer:
    namespace: ${spring.profiles.active}
    group: smart-homework-consumer

简单封装下 RocketMQTemplate

/**
 * Holder for the RocketMQ name constants used by the application.
 *
 * <p>The constants are compile-time {@code String}s, so they can be used inside annotation
 * attributes.
 */
public interface Rmq {

  /** Message tag names. */
  interface Tags {
    /** Tag name {@code "mq_tags"}. */
    String MQ_TAGS = "mq_tags";
  }

  /** Topic names. */
  interface Topic {
    /** Topic name {@code "mq-topic"}. */
    String MQ_TOPIC = "mq-topic";
  }
}
/**
 * A send target made of a topic and a tag expression.
 *
 * <p>{@link #getDestination()} joins both parts as {@code topic:tags}.
 */
public interface Destination {

  /**
   * Variant whose topic is fixed to {@code Rmq.Topic.MQ_TOPIC}; only {@link #getTags()} remains
   * abstract, so an instance can be written as a lambda that returns the tags.
   */
  interface MqDestination extends Destination {
    @Override
    default String getTopic() {
      return Rmq.Topic.MQ_TOPIC;
    }
  }

  /** Destination on the fixed topic with tags {@code Rmq.Tags.MQ_TAGS}. */
  MqDestination MQ_DESTINATION = () -> Rmq.Tags.MQ_TAGS;

  /**
   * @return the topic part of the destination
   */
  String getTopic();

  /**
   * @return the tags part of the destination
   */
  String getTags();

  /**
   * Builds the combined destination string.
   *
   * @return {@code getTopic()}, a colon, then {@code getTags()}
   */
  default String getDestination() {
    return getTopic() + ":" + getTags();
  }
}
MessageTemplate.java
/**
 * Small wrapper around {@link RocketMQTemplate} that sends a payload asynchronously to a
 * {@link Destination} and logs the outcome of the send.
 */
@Slf4j
public class MessageTemplate {

  private final RocketMQTemplate rocketMQTemplate;

  /**
   * @param rocketMQTemplate template used to perform the actual send
   */
  public MessageTemplate(RocketMQTemplate rocketMQTemplate) {
    this.rocketMQTemplate = rocketMQTemplate;
  }

  /**
   * Wraps {@code payload} in a message and hands it to {@code asyncSend}.
   *
   * <p>Two headers are always set: {@code MessageConst.PROPERTY_KEYS} receives the string form of
   * the payload, and {@code MessageConst.PROPERTY_DELAY_TIME_LEVEL} is set to {@code 1}.
   *
   * @param destination target whose {@code getDestination()} value is passed to the template
   * @param payload message payload
   * @param <T> payload type
   */
  @SneakyThrows
  public <T> void sendMsg(Destination destination, T payload) {
    MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
    builder.setHeader(MessageConst.PROPERTY_KEYS, Objects.toString(payload));
    builder.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 1);
    Message<T> message = builder.build();

    rocketMQTemplate.asyncSend(
        destination.getDestination(), message, loggingCallback(destination, message));
  }

  /** Creates the callback that logs the result of one asynchronous send. */
  private static <T> SendCallback loggingCallback(Destination destination, Message<T> message) {
    return new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {
        log.info(
            "消息发送成功,destination:{},message:{}, msgId:{}",
            destination.getDestination(),
            message.getPayload(),
            sendResult.getMsgId());
      }

      @Override
      public void onException(Throwable throwable) {
        // The trailing throwable has no placeholder, so the logger records it as the cause.
        log.error(
            "消息发送失败,destination:{},message:{}",
            destination.getDestination(),
            message.getPayload(),
            throwable);
      }
    };
  }
}
RocketMQConfig.java
/** Spring configuration that registers the {@link MessageTemplate} wrapper as a bean. */
@Slf4j
@Configuration
public class RocketMQConfig {

  /**
   * Creates the {@link MessageTemplate} bean.
   *
   * <p>{@code MessageTemplate} is declared without type parameters, so it is referenced here as a
   * plain type; wildcard or diamond type arguments on it do not compile.
   *
   * @param rocketMQTemplate template the wrapper delegates to
   * @return a new {@link MessageTemplate} backed by {@code rocketMQTemplate}
   */
  @Bean
  public MessageTemplate messageTemplate(RocketMQTemplate rocketMQTemplate) {
    return new MessageTemplate(rocketMQTemplate);
  }
}

生产者发送消息

Producer.java
/** Example producer that publishes an id to {@code Destination.MQ_DESTINATION}. */
@Service
public class Producer {

  private final MessageTemplate messageTemplate;

  /**
   * @param messageTemplate template used to send messages
   */
  public Producer(MessageTemplate messageTemplate) {
    this.messageTemplate = messageTemplate;
  }

  /**
   * Sends {@code id} as the message payload.
   *
   * @param id value to publish
   */
  public void produce(Long id) {
    messageTemplate.sendMsg(Destination.MQ_DESTINATION, id);
  }
}

消费者消费消息

MessageConsumer.java
@Slf4j
public abstract class MessageConsumer<T> implements RocketMQListener<MessageExt> {

@Resource private ObjectMapper objectMapper;
@Resource private ApplicationProperties properties;

public abstract void consume(T message);

@Override
@SneakyThrows
public void onMessage(MessageExt messageExt) {
T value =
objectMapper.readValue(
new String(messageExt.getBody(), StandardCharsets.UTF_8), new TypeReference<>() {});
int reconsumeTimes = messageExt.getReconsumeTimes();
String msgLog =
" topic:%s,tags:%s,msgId:%s,message:%s"
.formatted(messageExt.getTopic(), messageExt.getTags(), messageExt.getMsgId(), value);
String prefix = reconsumeTimes > 0 ? "第 %s 次重试".formatted(reconsumeTimes) : "";
try {
log.info("开始{}消费消息:{}", prefix, msgLog);
consume(value);
log.info("{}消费消息成功:{}", prefix, msgLog);
} catch (Exception exception) {
if (reconsumeTimes >= getMaxReconsumeTimes()) {
try {
log.info("开始处理消费失败的消息:{}", msgLog);
afterRetryFailed(value, exception);
log.info("成功处理消费失败的消息:{}", msgLog);
} catch (Exception e) {
// 保存数据库、发失败消息、告警等。
log.error("处理消费失败的消息失败:{}", msgLog, exception);
}
} else {
throw exception;
}
}
}

/**
* 重试多次后处理消息
*
* @param message 消息
* @param exception 失败的异常信息
*/
public void afterRetryFailed(T message, Exception exception) {}

protected int getMaxReconsumeTimes() {
// 可以统一配置重试消费次数,每个消费者也可以自定义次数
Integer maxReconsumeTimes = properties.getRocketmq().getMaxReconsumeTimes();
return maxReconsumeTimes == null ? -1 : Math.max(maxReconsumeTimes, -1);
}
}
Consumer.java
/**
 * Example listener for {@code Rmq.Topic.MQ_TOPIC} filtered by {@code Rmq.Tags.MQ_TAGS}, receiving
 * {@code Long} payloads.
 */
@Slf4j
@Service
@RequiredArgsConstructor
@RocketMQMessageListener(
    consumerGroup = CG.MQ_CONSUMER_GROUP,
    topic = Rmq.Topic.MQ_TOPIC,
    selectorExpression = Rmq.Tags.MQ_TAGS)
public class Consumer extends MessageConsumer<Long> {

  /**
   * Logs the received id.
   *
   * @param id payload of the consumed message
   */
  // NOTE(review): consume is invoked from MessageConsumer.onMessage on the same instance; confirm
  // that the transactional annotation is actually applied on that call path.
  @Override
  @Transactional(propagation = Propagation.SUPPORTS)
  public void consume(Long id) {
    log.info("消费消息:{}", id);
  }
}