公司的一些服务在启动时候通过@KafkaListener注册了一堆消费者,最近发现这些服务启动时候时间非常长,消耗了大量的时间在注册kafka消费者上。

解决方案

通过@KafkaListener注解的autoStartup属性可以控制消费者的启动时间,将autoStartup设置为false,在服务启动完成后再手动启动消费者。

1
2
3
4
@KafkaListener(topics = "topic1", autoStartup = "false")
public void listen(ConsumerRecord<?, ?> record) {
    // do something
}

在服务启动完成后,通过KafkaListenerEndpointRegistry手动启动消费者。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Component
public class KafkaConsumerStarter implements ApplicationRunner {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Override
    public void run(ApplicationArguments args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(this::startKafkaListeners, 5, TimeUnit.SECONDS);
    }

    private void startKafkaListeners() {
        kafkaListenerEndpointRegistry.getListenerContainers()
            .forEach(MessageListenerContainer::start);
    }
}

而且通过这样处理也不会出现消费者已经开始消费了,但服务中某些资源还没有初始化完成,造成消费kafka消息时候出现异常。