在创建主题时获取卡夫卡制作者中的错误错误,但在卡夫卡服务器上创建了主题

问题描述:

我正在使用卡夫卡制作者10.2.1创建主题并写入主题,当我创建主题时我得到以下错误,但主题被创建:在创建主题时获取卡夫卡制作者中的错误错误,但在卡夫卡服务器上创建了主题

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:774) 
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:494) 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440) 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360) 
    at kafka.AvroProducer.produce(AvroProducer.java:47) 
    at samples.TestMqttSource.messageReceived(TestMqttSource.java:89) 
    at mqtt.JsonConsumer.messageArrived(JsonConsumer.java:132) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:477) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:380) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:184) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
msg org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
loc org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
cause org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
excep java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 

所有的建议,非常感谢。

+0

生产者不能创建主题。管理客户端API可以做到这一点。创建主题是因为在代理上启用了主题自动创建(auto.create.topics.enable属性)(默认情况下)。你能显示代码吗? – ppatierno

+0

感谢您的评论。我找到了一个解决方案,我试图在下面的答案的评论中解释“问题”。 – Margit

不能使用KafkaProducer来创​​建一个话题(所以我不太清楚你如何设法创建主题,除非你通过不同的方法,这样做是以前,如卡夫卡管理shell脚本)。而是使用Kafka库提供的AdminUtils。

我最近取得了这两个要求,你会很惊讶它是多么容易实现。以下是一个简单的代码示例,向您展示如何通过AdminUtils创建主题,以及如何写入主题。

class Foo { 

    private String TOPIC = "testingTopic"; 
    private int NUM_OF_PARTITIONS = 10; 
    private int REPLICATION_FACTOR = 1; 

    public Foo() { 


     ZkClient zkClient = new ZkClient("localhost:2181", 15000, 10000, ZKStringSerializer$.MODULE$); 
     ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection("localhost:2181"), false); 

     if (!AdminUtils.topicExists(zkUtils, TOPIC)) { 
      try { 
       AdminUtils.createTopic(zkUtils, TOPIC, NUM_OF_PARTITIONS, REPLICATION_FACTOR, new Properties(), Enforced$.MODULE$); 

       Properties producerConfig = new Properties(); 

       producerConfig.put(ProducerConfig.BOOTSTRAP_SERVER_CONFIG, "localhost:9092"); 
       producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
       producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 

       KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig); 

       // This is just to show you how to write but you could be more elaborate 
       int i = 0; 

       while (i < 11) { 
        ProducerRecord<String, String> rec = new ProducerRecord<>(TOPIC, ("This is line number " + i)); 
        producer.send(rec); 
        i++; 
       } 

       producer.closer(); 
      } catch (AdminOperationException aoe) { 
       aoe.printStackTrace(); 
      } 
     } 

    } 

} 

请记住,如果您想删除主题,默认情况下在设置中禁用。启动Kafka时使用的配置文件(默认情况下为$ {kafka_home} /config/server.properties),如果它尚不存在并且设置为false或注释掉,则添加以下行:

delete.topic.enabled=true 

然后您必须重新启动服务器,并且可以通过Java或提供的命令行工具删除主题。

NB

它总是一个好主意,关闭生产者/消费者,当你与他们完成了,如图所示的代码示例。

+0

感谢您的答案和评论。我们的卡夫卡设置为从消息中自动生成主题。我发现了什么问题,我在Windows PC上运行客户端,并且在连接到Kafka时使用了Kafka服务器的IP地址,但我认为在创建主题的元数据中, kafkaserver名称被返回,因此我将Kafka服务器的IP地址和主机名添加到我的主机文件中,并且它可以正常工作。 – Margit

+0

啊,没关系,发生这种事情有点不寻常!但很高兴这一切工作 –