1. 新增依赖
        <!-- RocketMQ Java client library. No <version> is declared here; presumably it is
             supplied by a parent POM or a dependencyManagement section (TODO confirm). -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
        </dependency>
        <!-- Spring Boot starter for the RocketMQ v5 client, pinned to version 2.3.1. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
            <version>2.3.1</version>
        </dependency>

2. 新增配置,并在 MQ 管理台新增对应的 topic。
(原文此处有一张配图,未能载入。)

# RocketMQ client settings for the producer and the push consumer.
rocketmq:
  producer:
    # Endpoint the producer connects to; "xxxx" is a placeholder host to be replaced.
    endpoints: xxxx:8980
  push-consumer:
    # Endpoint the push consumer connects to; same placeholder host as the producer.
    endpoints: xxxx:8980
    # Consumer group name used by the push consumer.
    consumerGroup: consumer-group

3.生产者。

    /**
     * Sends a normal message whose payload is the UTF-8 encoding of the given text.
     *
     * @param messageValue text to send; encoded to bytes with UTF-8
     * @param topic        destination topic handed to the client template
     */
    private void sendMessage(String messageValue, String topic) {
        final Message<byte[]> outbound = MessageBuilder
                .withPayload(messageValue.getBytes(StandardCharsets.UTF_8))
                .build();
        rocketMQClientTemplate.send(topic, outbound);
    }

    /**
     * Serialises the given object to JSON with Gson, sends it synchronously as a normal
     * message and records the outcome through {@code saveSendLog}.
     *
     * <p>Any exception raised while sending is logged and swallowed, so the send log entry
     * is always written; a failed send is recorded with the sent flag {@code false} and an
     * empty message id.
     *
     * @param messageValue object to send; must not be {@code null} because its runtime class
     *                     is used for serialisation
     * @param topic        destination topic
     * @param <T>          type of the payload object
     */
    public <T> void sendMessage(T messageValue, String topic) {
        final String payloadJson = gson.toJson(messageValue, messageValue.getClass());
        boolean delivered = false;
        String receiptId = "";
        try {
            final SendReceipt receipt =
                    this.rocketMQClientTemplate.syncSendNormalMessage(topic, payloadJson);
            // The flag is set before the id is read, so a failure while reading the id
            // still leaves the message marked as sent.
            delivered = receipt != null;
            if (delivered) {
                receiptId = receipt.getMessageId().toString();
            }
        } catch (Exception e) {
            log.error("发送mq消息失败,", e);
        }

        if (delivered) {
            log.info("发送消息成功 topic:{}  内容:{}", topic, payloadJson);
        }
        saveSendLog(topic, payloadJson, delivered, receiptId);
    }


    /**
     * Synchronously sends a delayed message and records the outcome through
     * {@code saveSendLog}.
     *
     * <p>The send log entry is written whether or not the send succeeds. An exception thrown
     * while building or sending the message is not swallowed: the failure is recorded with
     * the sent flag {@code false} and an empty message id, and the exception then propagates
     * to the caller.
     *
     * @param topic   destination topic
     * @param message payload object; wrapped into a message for sending and serialised with
     *                Gson for the send log
     * @param timeout length of the delivery delay (despite the name, not a send timeout)
     * @param unit    unit in which {@code timeout} is expressed
     */
    public void sendDelay(String topic, Object message, long timeout, TemporalUnit unit) {
        boolean isSend = false;
        String messageId = "";
        try {
            SendReceipt sendResult = this.rocketMQClientTemplate.syncSendDelayMessage(
                    topic, MessageBuilder.withPayload(message).build(), Duration.of(timeout, unit));
            if (sendResult != null) {
                isSend = true;
                messageId = sendResult.getMessageId().toString();
                log.info("已同步发送延时消息 message = {}", message);
            }
        } finally {
            // Runs on the exceptional path too, so a failed send still leaves a log record.
            saveSendLog(topic, gson.toJson(message), isSend, messageId);
        }
    }
    /**
     * Synchronously sends the given messages one by one, in list order, as FIFO messages and
     * records the outcome of each through {@code saveSendLog}.
     *
     * <p>A send log entry is written for every attempted message, including one whose send
     * throws: that failure is recorded with the sent flag {@code false} and an empty message
     * id, the exception propagates to the caller, and the remaining messages are not sent.
     *
     * <p>Callers must guarantee single-node, single-threaded use for ordering to hold.
     *
     * @param topic    destination topic
     * @param orderId  ordering group name, passed as the last argument of
     *                 {@code syncSendFifoMessage}; also referenced by the {@code @RedisLock}
     *                 lock parameter
     * @param messages payload objects to send, in the order they must be delivered
     * @param <T>      type of the payload objects
     * @deprecated not recommended; prefer sending normal messages under a Redis lock.
     */
    @Deprecated
    @RedisLock(lockPrefix = "orderMsg:", lockParameter = "#orderId")
    public <T> void sendOrderly(String topic, String orderId, List<T> messages) {
        for (T message : messages) {
            boolean isSend = false;
            String messageId = "";
            try {
                SendReceipt sendResult = this.rocketMQClientTemplate.syncSendFifoMessage(
                        topic, MessageBuilder.withPayload(message).build(), orderId);
                if (sendResult != null) {
                    isSend = true;
                    messageId = sendResult.getMessageId().toString();
                    log.info("同步顺序发送消息完成:message = {}, sendResult = {}", message, sendResult);
                }
            } finally {
                // Runs on the exceptional path too, so a failed send still leaves a log record.
                saveSendLog(topic, gson.toJson(message), isSend, messageId);
            }
        }
    }

4.消费者。

/**
 * Example RocketMQ listener: decodes each message body as UTF-8 JSON, maps it to
 * {@link MqExample} with Gson and logs the result.
 *
 * <p>Three alternative {@code @RocketMQMessageListener} bindings (normal, delay and ordered
 * test topics) are left commented out below.
 */
@Service
@Slf4j
//@RocketMQMessageListener(consumerGroup = "questionnaire-plat", topic = "test-topic2", tag = "*")
//@RocketMQMessageListener(consumerGroup = "questionnaire-plat", topic = "test-delay", tag = "*")
//@RocketMQMessageListener(consumerGroup = "questionnaire-plat", topic = "test-order", tag = "*")
public class ExampleListener implements RocketMQListener {
    @Autowired
    Gson gson;

    /**
     * Handles one delivered message.
     *
     * <p>A message without a body is logged as a warning and acknowledged. Otherwise the body
     * is decoded, parsed into an {@link MqExample} and logged. Exceptions thrown while
     * decoding or parsing are not caught here.
     *
     * @param messageView the delivered message
     * @return always {@link ConsumeResult#SUCCESS} when the method completes normally
     */
    @Override
    public ConsumeResult consume(MessageView messageView) {
        ByteBuffer payload = messageView.getBody();
        if (payload == null) {
            log.warn("收到空消息 messageId:{}", messageView.getMessageId());
        } else {
            MqExample example = gson.fromJson(decodeUtf8(payload), MqExample.class);
            log.info("收到测试消息 [topic:{}][msgId:{}] 内容:{}",
                    messageView.getTopic(),
                    messageView.getMessageId(),
                    example);
            // Persistence is left disabled in this example: mqExampleService.save(example);
        }
        return ConsumeResult.SUCCESS;
    }

    /** Decodes the remaining bytes of the buffer into a String using UTF-8. */
    private static String decodeUtf8(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }
}
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐