无法连接到卡夫卡生产商的远程动物园管理员
问题描述:
我一直在玩Apache Kafka几天,这里是我的问题, 如果我在“快速入门”部分中设置了本地测试在网站上,一切都很好,卡夫卡生产者/消费者,zookeeper服务器和kafka经纪人完美地工作。无法连接到卡夫卡生产商的远程动物园管理员
现在,如果我在远程服务器上运行(我们称之为节点2): - 动物园管理员 - 端口2181 - 卡夫卡经纪人 - 端口9092 - 卡夫卡消费者
然后,如果我从我的本地计算机上运行: - kafka生产者
假设node2上没有防火墙。 连接结束超时。
以下是错误日志:
/etc/java/jdk1.6.0_41/bin/java -Didea.launcher.port=7533 -Didea.launcher.bin.path=/home/kevin/Documents/idea-IU-123.169/bin -Dfile.encoding=UTF-8 -classpath /etc/java/jdk1.6.0_41/lib/dt.jar:/etc/java/jdk1.6.0_41/lib/tools.jar:/etc/java/jdk1.6.0_41/lib/jconsole.jar:/etc/java/jdk1.6.0_41/lib/htmlconverter.jar:/etc/java/jdk1.6.0_41/lib/sa-jdi.jar:/home/kevin/Desktop/kafka-0.7.2/examples/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-compiler.jar:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-library.jar:/home/kevin/Desktop/kafka-0.7.2/core/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Documents/idea-IU-123.169/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain kafka.examples.KafkaConsumerProducerDemo
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
Exception in thread "Thread-0" java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
at kafka.producer.SyncProducer.send(SyncProducer.scala:125)
at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ProducerPool.send(ProducerPool.scala:100)
at kafka.producer.Producer.zkSend(Producer.scala:137)
at kafka.producer.Producer.send(Producer.scala:99)
at kafka.javaapi.producer.Producer.send(Producer.scala:103)
at kafka.examples.Producer.run(Producer.java:53)
Process finished with exit code 0
,这里是我的制片代码:
import java.util.Properties;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
public class Producer extends Thread{
private final kafka.javaapi.producer.Producer<String, String> producer;
private final String topic;
private final Properties props = new Properties();
public Producer(String topic)
{
props.put("zk.connect", "node2:2181");
props.put("connect.timeout.ms", "5000");
props.put("socket.timeout.ms", "30000");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "sync");
props.put("conpression.codec", "0");
producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
this.topic = topic;
}
public void run() {
String messageStr = new String("Message_test");
producer.send(new ProducerData<String, String>(topic, messageStr));
}
}
**所以我也测试通过
切换props.put("zk.connect", "node2:2181");
props.put("broker.list", "0:node2:9082");
而且在这种情况下,我能够成功连接。**
答
见项目#3 http://kafka.apache.org/faq.html
的解决方法是明确设置主机属性卡夫卡
的server.properties您可以验证这一点通过使用Zookeeper。如果您使用的是kafka 0.7 *,请打开ZkCli控制台并获取/ brokers/ids/0,并且您应该获得所有经纪人元数据。确保IP地址/这里的主机名匹配的ZK连接字符串使用的是在生产代码 -
props.put("zk.connect", "node2:2181");
以我为例,我是用我的本地机器连接到Ubuntu的虚拟机上运行的生产商(同一个盒子,不同的IP),并且此解决方法有所帮助。
你能解决这个问题吗? – 2013-04-03 19:42:27