Issue
I wrote kafka producer / consumer for my app:
Consumer config:
@EnableKafka
@Configuration
class KafkaConsumerConfig {
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
props[ConsumerConfig.GROUP_ID_CONFIG] = "group12345"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
return DefaultKafkaConsumerFactory(props)
}
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
return factory
}
}
Producer config:
@Configuration
class KafkaProducerConfig {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val configProps: MutableMap<String, Any> = HashMap()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configProps)
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
}
Topic config:
@Configuration
class KafkaTopicConfig {
@Bean
fun kafkaAdmin(): KafkaAdmin {
val configs: MutableMap<String, Any?> = HashMap()
configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
return KafkaAdmin(configs)
}
@Bean
fun topic1(): NewTopic {
return NewTopic("kafkaTest", 1, 1.toShort())
}
}
Kafka service:
@Service
class KafkaService(
private val kafkaTemplate: KafkaTemplate<String, String>
) {
fun send() {
kafkaTemplate.send("kafkaTest", "test message ${System.currentTimeMillis()}")
}
@KafkaListener(topics = ["kafkaTest"], groupId = "group12345")
fun listenGroupFoo(message: String) {
println("--> $message")
}
}
That's ALL classes in my app. When I trying to run app, I get this exception:
2021-10-11 17:20:13.319 WARN 8544 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Error connecting to node 34bcfcc207e0:9092 (id: 1001 rack: null)
java.net.UnknownHostException: 34bcfcc207e0
I have no idea, what is host 34bcfcc207e0
. It appears at start or thread.
What's wrong?
Solution
Kafka is not an HTTP service. Remove
http://
from all your stringsIf you're running Kafka in a Container, the default advertised listener is using its hostname (the container ID), and you need to change this to use an address you expect Connect to Kafka running in Docker
Answered By - OneCricketeer
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.