问题

RabbitMQ 消费端手动确认(ack)消息时,RabbitMQ 服务端日志报 unknown delivery tag 1,应用日志报:
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
在这里插入图片描述
在这里插入图片描述

代码

/**
 * RabbitMQ topology used by the illegal-alarm consumer.
 *
 * <p>Declares a dead-letter queue/exchange pair, the business queue
 * {@code illegalAlarmQueue} whose dead-letter arguments point at that pair,
 * the business exchange with its binding, and a JSON message converter.
 */
@Configuration
@Slf4j
public class RabbitConfig {

    private static final String DEAD_LETTER_QUEUE = "deadLetterQueue";
    private static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";
    private static final String DEAD_LETTER_KEY = "deadLetterKey";

    private static final String ILLEGAL_ALARM_QUEUE = "illegalAlarmQueue";
    private static final String MASTER_EXCHANGE = "masterExchange";
    private static final String ILLEGAL_ALARM_KEY = "illegalAlarmKey";

    /** Queue that receives dead-lettered messages. */
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /** Direct exchange that dead-lettered messages are published to. */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /** Binds the dead-letter queue to the dead-letter exchange. */
    @Bean
    public Binding deadLetterBinding() {
        Queue queue = deadLetterQueue();
        DirectExchange exchange = deadLetterExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_KEY);
    }

    /**
     * Durable business queue; its arguments name the dead-letter exchange and
     * routing key declared above.
     */
    @Bean
    public Queue illegalAlarmQueue() {
        return QueueBuilder.durable(ILLEGAL_ALARM_QUEUE)
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_KEY)
                .build();
    }

    /** Direct exchange that business messages are published to. */
    @Bean
    public DirectExchange masterExchange() {
        return new DirectExchange(MASTER_EXCHANGE);
    }

    /** Binds the business queue to the business exchange. */
    @Bean
    public Binding illegalAlarmBinding() {
        Queue queue = illegalAlarmQueue();
        DirectExchange exchange = masterExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ILLEGAL_ALARM_KEY);
    }

    /** Converter that maps message bodies to and from JSON via Jackson. */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

 /**
  * Consumes one message from {@code illegalAlarmQueue}, persists the alarm and
  * its picture file records in a single transaction, and settles the delivery
  * manually.
  *
  * <p>Settlement rules:
  * <ul>
  *   <li>both saves succeed: {@code basicAck};</li>
  *   <li>the save reports failure or throws: the transaction is rolled back and
  *       the delivery is rejected with {@code basicNack(requeue=false)};</li>
  *   <li>the channel is already closed: nothing is sent, because a delivery
  *       cannot be settled on a closed channel.</li>
  * </ul>
  *
  * <p>This method assumes the listener container runs in MANUAL acknowledge
  * mode; if the container also acknowledges the delivery, the broker reports
  * {@code PRECONDITION_FAILED - unknown delivery tag}.
  *
  * @param message the raw AMQP message whose body is converted to an {@code IllegalAlarm}
  * @param channel the channel the message was delivered on, used to ack/nack
  * @throws IOException if rejecting the delivery fails
  */
 @RabbitListener(queues = "illegalAlarmQueue")
    public void receiveMessageIllegalAlarmQueue(Message message, Channel channel) throws IOException {
        IllegalAlarm illegalAlarm = (IllegalAlarm) jsonMessageConverter.fromMessage(message);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if (!channel.isOpen()) {
                // A closed channel cannot settle the delivery, so skip the work entirely.
                return;
            }
            List<String> paths = new ArrayList<>(3);
            paths.add(illegalAlarm.getTempPicturePath1());
            paths.add(illegalAlarm.getTempPicturePath2());
            paths.add(illegalAlarm.getTempPicturePath3());
            List<File> fileEntities = createFileEntities(paths);

            // Programmatic transaction: the alarm and its file records are saved together.
            Boolean saved = transactionTemplate.execute(status -> {
                try {
                    IllegalAlarm illegalAlarmTemp = illegalAlarmServiceI.saveIllegalAlarm(illegalAlarm);
                    int res = fileServiceI.saveBatch(fileEntities);
                    if (illegalAlarmTemp != null && res > 0) {
                        log.info("保存完成,id:{}",illegalAlarmTemp.getId());
                        return true;
                    }
                    // One of the saves did not take effect: do not commit a partial result.
                    status.setRollbackOnly();
                    return false;
                } catch (Exception e) {
                    // Trailing throwable argument keeps the stack trace in the log.
                    log.error("保存记录出现错误,异常信息:{}", e.getMessage(), e);
                    status.setRollbackOnly();
                    return false;
                }
            });

            // execute() may return null; Boolean.TRUE.equals avoids an unboxing NPE.
            if (Boolean.TRUE.equals(saved)) {
                channel.basicAck(deliveryTag, false);
            } else {
                // Never leave the delivery unsettled: reject it without requeueing.
                channel.basicNack(deliveryTag, false, false);
            }
        } catch (Exception e) {
            log.error("{}消费消息错误,异常信息:{}", illegalAlarm.getId(), e.getMessage(), e);
            log.error("详情:{}", JSON.toJSONString(illegalAlarm));
            if (channel.isOpen()) {
                channel.basicNack(deliveryTag, false, false);
            }
        }
    }
# Broker connection settings only; no listener acknowledge mode is set in this file.
spring: 
  rabbitmq:
    host: 192.168.5.100
    port: 5672
    username: guest
    password: guest

以上是我的配置文件。

原因

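原因是同一条消息被确认了两次(double ack):手动签收模式的配置没有生效,监听器实际使用的是注解注入的默认 SimpleRabbitListenerContainerFactory,而它默认使用自动签收。于是容器自动确认了一次,代码中又调用 basicAck 手动确认了一次;第二次确认时该 delivery tag 已不存在,broker 便返回 406 PRECONDITION_FAILED 并关闭 channel。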

解决

在这里插入图片描述
在这里插入图片描述

配置类

    /**
     * Listener container factory that uses manual acknowledgement.
     *
     * @param connectionFactory the connection factory assigned to the containers
     * @param messageConverter  the converter assigned to the containers
     * @return a factory configured with {@link AcknowledgeMode#MANUAL}
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        // Manual mode: the listener code settles each delivery itself.
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        containerFactory.setMessageConverter(messageConverter);
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }
 spring:
  rabbitmq:
    host: 192.168.5.188
    port: 5672
    username: guest
    password: guest
    listener:
      # The "simple" section configures the simple listener container (Spring Boot's
      # default listener type); the "direct" section only applies when
      # spring.rabbitmq.listener.type is set to direct.
      simple:
        acknowledge-mode: manual # the consumer acknowledges messages manually

参考

rabbitmq reply-text=PRECONDITION_FAILED

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐