卡夫卡生产者配置metadata.broker.list与url

问题描述:

我已经建立了一个卡夫卡服务器与3个经纪人。我想从我的计算机向这三位经纪人发送消息,但我已经为ngix中的abc.com/kafka1/ abc.com/kafka2/ abc.com/kafka3/等网址配置了每位经纪人。卡夫卡生产者配置metadata.broker.list与url

如何在metadata.broker.list属性中使用这些网址?我的代码如下。

package com.xxx.x.kafka.producer; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 


    class TestProducer { 
     public static void main(String[] args) { 
      long events = Long.parseLong(args[0]); 
      Random rnd = new Random(); 

      Properties props = new Properties(); 
      props.put("metadata.broker.list", "abc.com/kafka1/:80,abc.com/kafka2/:80,abc.com/kafka3/:80"); 

      props.put("serializer.class", "kafka.serializer.StringEncoder"); 
      props.put("partitioner.class", "com.knx.adx.kafka.producer.SimplePartitioner"); 
      props.put("request.required.acks", "1"); 

      ProducerConfig config = new ProducerConfig(props); 

      Producer<String, String> producer = new Producer<String, String>(config); 

      for (long nEnvents = 0; nEnvents < events; nEnvents++) { 
       long runtime = new Date().getTime(); 
       String ip = "192.168.2." + rnd.nextInt(255); 
       String msg = runtime + ",www.example.com" + ip; 
       KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); 
       producer.send(data); 
      } 
      producer.close(); 
     } 
    } 

这是我运行我的代码时得到的错误。

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) 
    at kafka.producer.Producer.send(Producer.scala:77) 
    at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
    at com.knx.adx.kafka.producer.TestProducer.main(TestProducer.java:35) 

line error producer.send(data); 

我会说abc.com/kafka1/:80不是正确的语法。我认为正确的应该是abc.com:9092。

metadata.broker.list属性中使用的url和port应该由您在Kafka broker server.properties文件中设置的内容(或者您在启动时设置的任何名称)确定。

重要的价值观是:

# The port the socket server listens on 
port=xxx 
# Hostname the broker will bind to. If not set, the server will bind to all interfaces 
#host.name=localhost 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the 
# value for "host.name" if configured. Otherwise, it will use the value returned from 
# java.net.InetAddress.getCanonicalHostName(). 
# advertised.host.name=<hostname routable by clients> 

端口默认是9092,所以如果你用80

希望这有助于检查这一项。

+0

在服务器中它使用端口9092,但ngix将端口80转发到外部。 – giaosudau

+0

abc.com/kafka1/:80语法不正确。检查abc.com:80/kafka1/ – jordi

配置变量metadata.broker.list需要host1:port1,host2:port2而不是URL。尝试为每个代理配置不同的子域名,如kafka1.abc.com:80,kafka2.abc.com:80,kafka3.abc.com:80,并将这些子域指向适当的主机。请参阅Kafka Configuration的生产者配置部分

这是用于引导的,生产者只会用它来获取元数据(主题,分区和副本)。发送实际数据的套接字连接将基于元数据中返回的代理信息建立。格式为host1:port1,host2:port2,列表可以是代理的子集或指向代理子集的VIP。