公司的一些服务在启动时候通过@KafkaListener
注册了一堆消费者,最近发现这些服务启动时候时间非常长,消耗了大量的时间在注册kafka消费者上。
解决方案
通过@KafkaListener
注解的autoStartup
属性可以控制消费者的启动时间,将autoStartup
设置为false
,在服务启动完成后再手动启动消费者。
1
2
3
4
|
@KafkaListener(topics = "topic1", autoStartup = "false")
public void listen(ConsumerRecord<?, ?> record) {
// do something
}
|
在服务启动完成后,通过KafkaListenerEndpointRegistry
手动启动消费者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@Component
public class KafkaConsumerStarter implements ApplicationRunner {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Override
public void run(ApplicationArguments args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(this::startKafkaListeners, 5, TimeUnit.SECONDS);
}
private void startKafkaListeners() {
kafkaListenerEndpointRegistry.getListenerContainers()
.forEach(MessageListenerContainer::start);
}
}
|
而且通过这样处理也不会出现消费者已经开始消费了,但服务中某些资源还没有初始化完成,造成消费kafka消息时候出现异常。