在创建主题时获取卡夫卡制作者中的错误错误,但在卡夫卡服务器上创建了主题
我正在使用卡夫卡制作者10.2.1创建主题并写入主题,当我创建主题时我得到以下错误,但主题被创建:在创建主题时获取卡夫卡制作者中的错误错误,但在卡夫卡服务器上创建了主题
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:774)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:494)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
at kafka.AvroProducer.produce(AvroProducer.java:47)
at samples.TestMqttSource.messageReceived(TestMqttSource.java:89)
at mqtt.JsonConsumer.messageArrived(JsonConsumer.java:132)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:477)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:380)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:184)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
msg org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
loc org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
cause org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
excep java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
所有的建议,非常感谢。
不能使用KafkaProducer来创建一个话题(所以我不太清楚你如何设法创建主题,除非你通过不同的方法,这样做是以前,如卡夫卡管理shell脚本)。而是使用Kafka库提供的AdminUtils。
我最近取得了这两个要求,你会很惊讶它是多么容易实现。以下是一个简单的代码示例,向您展示如何通过AdminUtils创建主题,以及如何写入主题。
class Foo {
private String TOPIC = "testingTopic";
private int NUM_OF_PARTITIONS = 10;
private int REPLICATION_FACTOR = 1;
public Foo() {
ZkClient zkClient = new ZkClient("localhost:2181", 15000, 10000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection("localhost:2181"), false);
if (!AdminUtils.topicExists(zkUtils, TOPIC)) {
try {
AdminUtils.createTopic(zkUtils, TOPIC, NUM_OF_PARTITIONS, REPLICATION_FACTOR, new Properties(), Enforced$.MODULE$);
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVER_CONFIG, "localhost:9092");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
// This is just to show you how to write but you could be more elaborate
int i = 0;
while (i < 11) {
ProducerRecord<String, String> rec = new ProducerRecord<>(TOPIC, ("This is line number " + i));
producer.send(rec);
i++;
}
producer.closer();
} catch (AdminOperationException aoe) {
aoe.printStackTrace();
}
}
}
}
请记住,如果您想删除主题,默认情况下在设置中禁用。启动Kafka时使用的配置文件(默认情况下为$ {kafka_home} /config/server.properties),如果它尚不存在并且设置为false或注释掉,则添加以下行:
delete.topic.enabled=true
然后您必须重新启动服务器,并且可以通过Java或提供的命令行工具删除主题。
NB
它总是一个好主意,关闭生产者/消费者,当你与他们完成了,如图所示的代码示例。
感谢您的答案和评论。我们的卡夫卡设置为从消息中自动生成主题。我发现了什么问题,我在Windows PC上运行客户端,并且在连接到Kafka时使用了Kafka服务器的IP地址,但我认为在创建主题的元数据中, kafkaserver名称被返回,因此我将Kafka服务器的IP地址和主机名添加到我的主机文件中,并且它可以正常工作。 – Margit
啊,没关系,发生这种事情有点不寻常!但很高兴这一切工作 –
生产者不能创建主题。管理客户端API可以做到这一点。创建主题是因为在代理上启用了主题自动创建(auto.create.topics.enable属性)(默认情况下)。你能显示代码吗? – ppatierno
感谢您的评论。我找到了一个解决方案,我试图在下面的答案的评论中解释“问题”。 – Margit