需求

物联网系统,使用TDengine+kafka存储实时数据,上千台设备每3秒发送过来的数据存储到kafka,再从kafka拉取消息消费,把数据写入数据库。使用@KafkaListener监听器会一直监听kafka topic,会频繁的消费消息、写入数据库,造成频繁的io,现在要求减少io,提高系统性能。

分析

接管springboot 监听器自动创建kafka消费者,手动创建kafka消费者手动拉取消息,然后通过定时任务控制

配置类

/**
 * kafka消费者配置
 *
 * @ClassName KafkaConsumerConfig
 * @Author Chen
 * @Date 2024/10/09 17:00
 */
@Slf4j
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
​
    /**
     * kafka侦听器容器工厂
     *
     * @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link String}, {@link String}>>
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置 consumerFactory
        factory.setConsumerFactory(consumerFactory());
        // 设置消费者组中的线程数量
        factory.setConcurrency(3);
        // 设置轮询超时
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
​
    /**
     * 消费者工厂
     *
     * @return {@link ConsumerFactory}<{@link Integer}, {@link String}>
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
​
    /**
     * 消费者配置
     *
     * @return {@link Map}<{@link String}, {@link Object}>
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 自动提交 offset 默认 true
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 自动提交的频率 单位 ms
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // 批量消费最大数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "taos");
        // session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        // 请求超时
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
        // Key 反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value 反序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 当kafka中没有初始offset或offset超出范围时将自动重置offset
        // earliest:重置为分区中最小的offset
        // latest:重置为分区中最新的offset(消费分区中新产生的数据)
        // none:只要有一个分区不存在已提交的offset,就抛出异常
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
​
        return props;
    }
​
    /**
     * kafka批量监听
     *
     * @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link Integer}, {@link String}>>
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
​
        // 设置 consumerFactory
        factory.setConsumerFactory(consumerFactory());
        // 设置是否开启批量监听
        factory.setBatchListener(true);
        // 设置消费者组中的线程数量
        factory.setConcurrency(3);
​
        return factory;
    }
​
    // 手动创建kafka消费者
    @Bean
    public KafkaConsumer<String, String> manualTaosKafkaConsumer() {
        // 初始化Kafka消费者
        return new KafkaConsumer<>(consumerConfigs());
    }
}

KafkaServiceImpl 业务逻辑

@Service
@Slf4j
public class KafkaServiceImpl implements KafkaService {
    
    // topic为taos
    private KafkaConsumer<String, String> manualTaosKafkaConsumer;
​
    @Autowired
    public KafkaServiceImpl(KafkaConsumer<String, String> manualTaosKafkaConsumer) {
        this.manualTaosKafkaConsumer = manualTaosKafkaConsume;
    }
​
    @PostConstruct
    public void init() {
        // 订阅主题
        manualTaosKafkaConsumer.subscribe(Arrays.asList("taos"));
    }
​
    @PreDestroy
    public void shutdown() {
        if (manualTaosKafkaConsumer != null) {
            manualTaosKafkaConsumer.close();
        }
    }
​
    @Override
    public void pollMessagesFromTaosTopic() {
        
        ConsumerRecords<String, String> records = manualTaosKafkaConsumer.poll(Duration.ofMillis(100));
        // 手动提交偏移量(好像没用)
        manualTaosKafkaConsumer.commitSync();
        // 其他业务
    }
​
}

定时任务

@Configuration
@Slf4j
public class KafkaScheduledTask {
​
    @Autowired
    private KafkaService kafkaService;
​
    @Scheduled(cron = "0/1 * * * * ?")
    public void kafkaConsumeTaosTopic() {
        log.info("kakfa消费开始");
        kafkaService.pollMessagesFromTaosTopic();
    }
}

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐