springboot接入rocketmq5(2.java代码)
2.新增配置,并在mq管理台新增topic。
·
- 新增依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>2.3.1</version>
</dependency>
2.新增配置,并在mq管理台新增topic。
rocketmq:
producer:
endpoints: xxxx:8980
push-consumer:
endpoints: xxxx:8980
consumerGroup: consumer-group
3.生产者。
/**
* 发送普通消息
*
* @param messageValue
* @param topic
*/
private void sendMessage(String messageValue, String topic) {
byte[] bytes = messageValue.getBytes(StandardCharsets.UTF_8);
Message<byte[]> message = MessageBuilder.withPayload(bytes).build();
rocketMQClientTemplate.send(topic, message);
}
public <T> void sendMessage(T messageValue, String topic) {
Class<?> clazz = messageValue.getClass();
String jsonObject = gson.toJson(messageValue, clazz);
boolean isSend = false;
String messageId="";
try {
SendReceipt sendResult = this.rocketMQClientTemplate.syncSendNormalMessage(topic, jsonObject);
if (sendResult != null) {
isSend = true;
messageId=sendResult.getMessageId().toString();
}
} catch (Exception e) {
log.error("发送mq消息失败,", e);
}
if (isSend) {
log.info("发送消息成功 topic:{} 内容:{}", topic, jsonObject);
}
saveSendLog(topic, jsonObject, isSend,messageId);
}
/**
* 发送延时消息,输入延时时间和单位
*
* @param topic topic
* @param message 消息体
* @param timeout 超时
* @param unit 单位
*/
public void sendDelay(String topic, Object message, long timeout, TemporalUnit unit) {
boolean isSend = false;
String messageId="";
SendReceipt sendResult = this.rocketMQClientTemplate.syncSendDelayMessage(topic, MessageBuilder.withPayload(message).build(), Duration.of(timeout, unit));
if (sendResult != null) {
isSend = true;
messageId=sendResult.getMessageId().toString();
log.info("已同步发送延时消息 message = {}", message);
}
saveSendLog(topic, gson.toJson(message), isSend,messageId);
}
/**
* 发送顺序消息,不建议使用,使用时需保证单节点单线程
* 建议直接使用redis锁发送普通消息实现
*
* @param topic topic
* @param topic orderGroupName 顺序组名
*/
@Deprecated
@RedisLock(lockPrefix = "orderMsg:", lockParameter = "#orderId")
public <T> void sendOrderly(String topic, String orderId, List<T> messages) {
for (int i = 0; i < messages.size(); i++) {
boolean isSend = false;
String messageId="";
SendReceipt sendResult = this.rocketMQClientTemplate.syncSendFifoMessage(topic, MessageBuilder.withPayload(messages.get(i)).build(), orderId);
if (sendResult != null) {
isSend = true;
messageId=sendResult.getMessageId().toString();
log.info("同步顺序发送消息完成:message = {}, sendResult = {}", messages.get(i), sendResult);
}
saveSendLog(topic, gson.toJson(messages.get(i)), isSend,messageId);
}
}
4.消费者。
@Service
@Slf4j
//@RocketMQMessageListener(consumerGroup = "questionnaire-plat", topic = "test-topic2", tag = "*")
//@RocketMQMessageListener(consumerGroup = "questionnaire-plat", topic = "test-delay", tag = "*")
//@RocketMQMessageListener(consumerGroup = "questionnaire-plat", topic = "test-order", tag = "*")
public class ExampleListener implements RocketMQListener {
@Autowired
Gson gson;
@Override
public ConsumeResult consume(MessageView messageView) {
// 从 MessageView 中获取 ByteBuffer
ByteBuffer byteBuffer = messageView.getBody();
if (byteBuffer == null) {
log.warn("收到空消息 messageId:{}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
}
// 转换 ByteBuffer 为字节数组
String messageBody = StandardCharsets.UTF_8.decode(byteBuffer).toString();
MqExample data = gson.fromJson(messageBody, MqExample.class);
log.info("收到测试消息 [topic:{}][msgId:{}] 内容:{}",
messageView.getTopic(),
messageView.getMessageId(),
data);
// mqExampleService.save(data);
return ConsumeResult.SUCCESS;
}
}
更多推荐
所有评论(0)