卡夫卡生产者配置metadata.broker.list与url
问题描述:
我已经建立了一个卡夫卡服务器与3个经纪人。我想从我的计算机向这三位经纪人发送消息,但我已经为ngix
中的abc.com/kafka1/ abc.com/kafka2/ abc.com/kafka3/
等网址配置了每位经纪人。卡夫卡生产者配置metadata.broker.list与url
如何在metadata.broker.list
属性中使用这些网址?我的代码如下。
package com.xxx.x.kafka.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "abc.com/kafka1/:80,abc.com/kafka2/:80,abc.com/kafka3/:80");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "com.knx.adx.kafka.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEnvents = 0; nEnvents < events; nEnvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com" + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
这是我运行我的代码时得到的错误。
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.knx.adx.kafka.producer.TestProducer.main(TestProducer.java:35)
line error producer.send(data);
答
我会说abc.com/kafka1/:80不是正确的语法。我认为正确的应该是abc.com:9092。
metadata.broker.list属性中使用的url和port应该由您在Kafka broker server.properties文件中设置的内容(或者您在启动时设置的任何名称)确定。
重要的价值观是:
# The port the socket server listens on
port=xxx
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
# advertised.host.name=<hostname routable by clients>
端口默认是9092,所以如果你用80
希望这有助于检查这一项。
答
配置变量metadata.broker.list
需要host1:port1,host2:port2
而不是URL。尝试为每个代理配置不同的子域名,如kafka1.abc.com:80,kafka2.abc.com:80,kafka3.abc.com:80
,并将这些子域指向适当的主机。请参阅Kafka Configuration的生产者配置部分
这是用于引导的,生产者只会用它来获取元数据(主题,分区和副本)。发送实际数据的套接字连接将基于元数据中返回的代理信息建立。格式为host1:port1,host2:port2,列表可以是代理的子集或指向代理子集的VIP。
在服务器中它使用端口9092,但ngix将端口80转发到外部。 – giaosudau
abc.com/kafka1/:80语法不正确。检查abc.com:80/kafka1/ – jordi