
Friday, August 5, 2022

[FIXED] How to handle UnknownHostException when using @KafkaListener

apache-kafka, exception, java, spring-boot

Issue

During a network outage, the following WARN was logged repeatedly until it filled up all the disk space:

[WARN ] 2022-05-18 12:58:02.984 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] NetworkClient - [Consumer clientId=consumer-Test-1, groupId=Test] Error connecting to node kafka-broker.ew2.aws.dev:9092 (id: 3 rack: sample-az2)
java.net.UnknownHostException: node kafka-broker.ew2.aws.dev
        at java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[?:?]
        at java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[?:?]
        at java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[?:?]
        at java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[?:?]
        at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:979) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:246) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar!/:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.5.jar!/:2.8.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1511) ~[spring-kafka-2.8.5.jar!/:2.8.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1339) ~[spring-kafka-2.8.5.jar!/:2.8.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.8.5.jar!/:2.8.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]

How and where do I catch this exception?


Solution

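I went with the following solution: a custom error handler that stops the listener container when the exception's cause is an UnknownHostException, and hands every other exception to a DefaultErrorHandler.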

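import java.net.UnknownHostException;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

// `log` is assumed to be an SLF4J logger on the enclosing configuration class.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory(
    final ConsumerFactory<String, String> consumerFactory
) {
    final ConcurrentKafkaListenerContainerFactory<String, String> factory
        = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setCommonErrorHandler(new CommonContainerStoppingErrorHandler() {
        // Fallback for every exception that is not an UnknownHostException.
        final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler();

        @Override
        public void handleRemaining(
            final Exception thrownException,
            final List<ConsumerRecord<?, ?>> records,
            final Consumer<?, ?> consumer,
            final MessageListenerContainer container
        ) {
            if (thrownException.getCause() instanceof UnknownHostException) {
                // Stop the container so the failure is not retried and logged indefinitely.
                log.error("UnknownHostException occurred; stopping the container to prevent log flooding.");
                super.handleOtherException(thrownException, consumer, container, false);
            }
            else {
                // Delegate to the default handler's record-level handling.
                this.defaultErrorHandler.handleRemaining(thrownException, records, consumer, container);
            }
        }
    });
    return factory;
}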
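
The handler above only inspects the direct cause via getCause(). If the UnknownHostException may be wrapped more than one level deep, a helper that walks the whole cause chain could replace that check. The following is a minimal sketch using only the JDK; the class name CauseChain, the method name hasUnknownHostCause, and the depth limit of 32 are illustrative assumptions, not part of the original answer or of Spring Kafka.

```java
import java.net.UnknownHostException;

public class CauseChain {

    /**
     * Returns true if the given throwable, or any throwable in its cause
     * chain, is an UnknownHostException. Hypothetical helper for illustration.
     */
    public static boolean hasUnknownHostCause(Throwable t) {
        // Bound the walk (assumed limit) to guard against cyclic cause chains.
        for (int depth = 0; t != null && depth < 32; depth++, t = t.getCause()) {
            if (t instanceof UnknownHostException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // An UnknownHostException wrapped two levels deep.
        Exception nested = new IllegalStateException("listener failed",
            new RuntimeException("poll failed",
                new UnknownHostException("kafka-broker.ew2.aws.dev")));

        System.out.println(hasUnknownHostCause(nested));
        System.out.println(hasUnknownHostCause(new RuntimeException("other")));
    }
}
```

With such a helper, the condition in handleRemaining could become hasUnknownHostCause(thrownException) instead of the single-level instanceof test.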


Answered By - Stole
Answer Checked By - Katrina (PHPFixing Volunteer)