后端——springboot+kafka 定时拉取消息(方案一)
物联网系统,使用TDengine+kafka存储实时数据,上千台设备每3秒发送过来的数据存储到kafka,再从kafka拉取消息消费,把数据写入数据库。监听器会一直监听kafka topic,会频繁的消费消息、写入数据库,造成频繁的io,现在要求减少io,提高系统性能。接管springboot 监听器自动创建kafka消费者,,然后通过定时任务控制。
·
需求
物联网系统,使用TDengine+kafka存储实时数据,上千台设备每3秒发送过来的数据存储到kafka,再从kafka拉取消息消费,把数据写入数据库。使用@KafkaListener监听器会一直监听kafka topic,会频繁的消费消息、写入数据库,造成频繁的io,现在要求减少io,提高系统性能。
分析
接管springboot 监听器自动创建kafka消费者,手动创建kafka消费者,手动拉取消息,然后通过定时任务控制
配置类
/**
* kafka消费者配置
*
* @ClassName KafkaConsumerConfig
* @Author Chen
* @Date 2024/10/09 17:00
*/
@Slf4j
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
/**
* kafka侦听器容器工厂
*
* @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link String}, {@link String}>>
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 设置 consumerFactory
factory.setConsumerFactory(consumerFactory());
// 设置消费者组中的线程数量
factory.setConcurrency(3);
// 设置轮询超时
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
/**
* 消费者工厂
*
* @return {@link ConsumerFactory}<{@link Integer}, {@link String}>
*/
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
* 消费者配置
*
* @return {@link Map}<{@link String}, {@link Object}>
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 自动提交 offset 默认 true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 自动提交的频率 单位 ms
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
// 批量消费最大数量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "taos");
// session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
// 请求超时
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
// Key 反序列化类
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Value 反序列化类
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 当kafka中没有初始offset或offset超出范围时将自动重置offset
// earliest:重置为分区中最小的offset
// latest:重置为分区中最新的offset(消费分区中新产生的数据)
// none:只要有一个分区不存在已提交的offset,就抛出异常
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
/**
* kafka批量监听
*
* @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link Integer}, {@link String}>>
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 设置 consumerFactory
factory.setConsumerFactory(consumerFactory());
// 设置是否开启批量监听
factory.setBatchListener(true);
// 设置消费者组中的线程数量
factory.setConcurrency(3);
return factory;
}
// 手动创建kafka消费者
@Bean
public KafkaConsumer<String, String> manualTaosKafkaConsumer() {
// 初始化Kafka消费者
return new KafkaConsumer<>(consumerConfigs());
}
}
KafkaServiceImpl 业务逻辑
@Service
@Slf4j
public class KafkaServiceImpl implements KafkaService {
// topic为taos
private KafkaConsumer<String, String> manualTaosKafkaConsumer;
@Autowired
public KafkaServiceImpl(KafkaConsumer<String, String> manualTaosKafkaConsumer) {
this.manualTaosKafkaConsumer = manualTaosKafkaConsume;
}
@PostConstruct
public void init() {
// 订阅主题
manualTaosKafkaConsumer.subscribe(Arrays.asList("taos"));
}
@PreDestroy
public void shutdown() {
if (manualTaosKafkaConsumer != null) {
manualTaosKafkaConsumer.close();
}
}
@Override
public void pollMessagesFromTaosTopic() {
ConsumerRecords<String, String> records = manualTaosKafkaConsumer.poll(Duration.ofMillis(100));
// 手动提交偏移量(好像没用)
manualTaosKafkaConsumer.commitSync();
// 其他业务
}
}
定时任务
@Configuration
@Slf4j
public class KafkaScheduledTask {
@Autowired
private KafkaService kafkaService;
@Scheduled(cron = "0/1 * * * * ?")
public void kafkaConsumeTaosTopic() {
log.info("kakfa消费开始");
kafkaService.pollMessagesFromTaosTopic();
}
}
更多推荐
所有评论(0)