弗林克,卡夫卡和动物园管理员与URI

问题描述:

我想从我的本地机器连接到卡夫卡:弗林克,卡夫卡和动物园管理员与URI

kafkaParams.setProperty("bootstrap.servers", Defaults.BROKER_URL) 
kafkaParams.setProperty("metadata.broker.list", Defaults.BROKER_URL) 
kafkaParams.setProperty("group.id", "group_id") 
kafkaParams.setProperty("auto.offset.reset", "earliest") 

完全正常的,但我的BROKER_URI定义如下my-server.com:1234/my/subdirectory

我发现这种现象被称为chroot路径。

它引发以下错误:Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: my-server.com:1234/my/subdirectory

如何解决这个问题?

这是我的依赖关系:

val flinkVersion = "1.0.3" 

"org.apache.flink" %% "flink-scala" % flinkVersion % "provided", 
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", 
"org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion, 

只是尽量不带路径上下文和斜线host:port格式。如果你有一个以上的服务器,这将是一个列表host1:port1,host2:port2

参考:http://kafka.apache.org/documentation.html

+0

这提供了以下错误:'异常线程“main” org.apache.kafka.common.errors.TimeoutException:超时过期而获取主题元数据' –

+0

这表明配置的格式没问题。接下来要看的是如果在kafka实例上运行iptables或防火墙。你可以在你的客户端中telnet kafka实例吗? –

+0

有趣的是,我可以使用Kafka控制台消费者进行连接:'./kafka-console-consumer.sh --zookeeper my-server.com:1234/my/subdirectory --topic my-topic --from-beginning'工作得很好。和Telnet也工作正常:'telnet my-server.com 1234' –

bootstrap.servers应该是一个逗号分隔的列表如下所示:address1:port1,address2:port2,...,addressn:portn。如果您只有一个Kafka经纪人,您应该输入类似localhost:9092(除非您将Kafka配置为在另一个端口上运行)。

你可以参考this post from dataArtisans了解更多关于如何让Flink和Kafka一起工作的细节。

笨。 Zookeeper!=卡夫卡。正如你在代码中看到的那样,我使用了两次相同的URL,但事实证明它们应该是不同的。

我想从我的本地机器连接到卡夫卡:

kafkaParams.setProperty("bootstrap.servers", Defaults.KAFKA_URL) 
kafkaParams.setProperty("metadata.broker.list", Defaults.ZOOKEEPER_URL) 
kafkaParams.setProperty("group.id", "group_id") 
kafkaParams.setProperty("auto.offset.reset", "earliest") 
+1

;)你会请发布正确的代码? –

+0

Ofcourse :-),它被添加 –