Kafka Connect:如何使用hdfs sink连接器将来自Kafka主题的protobuf数据发送到HDFS?

问题描述:

我有一个制作人正在为主题制作protobuf消息。我有一个消费者应用程序,它将protobuf消息反序列化。但hdfs sink连接器直接从Kafka主题中获取消息。 etc/schema-registry/connect-avro-standalone.properties中的键和值转换器将被设置为?什么是最好的方法来做到这一点?提前致谢!Kafka Connect:如何使用hdfs sink连接器将来自Kafka主题的protobuf数据发送到HDFS?

Kafka Connect旨在将卡夫卡中序列化格式的问题从converters的概念中分离出来。正如您似乎已经发现的那样,您需要将key.convertervalue.converter类调​​整为支持protobufs的实现。这些类通常作为一个普通的Kafka Deserializer实现,然后执行从序列化特定的运行时格式(例如protobufs中的Message)到Kafka Connect的运行时API(它没有任何关联的序列化格式 - 它只是一个一组Java类型和一个类来定义模式)。

我不知道现有的实现。实现这一点的主要挑战是protobufs是自描述的(也就是说,可以在不访问原始模式的情况下对其进行反序列化),但由于其字段只是整数ID,所以如果没有以下要求,您可能无法得到有用的模式信息:a)需要特定模式可用于转换器,例如通过配置(这会使迁移架构更加复杂)或者b)为您的数据提供模式注册表服务+封装格式,使您可以动态查找模式。

+2

我有一个勉强工作的实现。我使用'avro-protobuf'扩展了'AvroConter'类的'Deserializer'。据我所知,Kafka Connect希望限制支持的格式数量,即JSON和Avro,因此我不会按原样发布它。然后,我不想复制整个'avro-converter'并将其重命名为'protobuf-converter'。什么是贡献该项目的最佳方法? –

+2

Kafka Connect绝对不希望限制支持的格式数量。恰恰相反,我们包括转换器,并为通用连接器提供数据API,以支持不同序列化格式的插入。 protobuf的实现绝对有价值,我建议发布它。 尽管我们希望获得相当完整的实现,但我们愿意将其与AvroConverter一起纳入我们的存储库。我看到的最大问题是,为了获得有用的实现,我期望您需要类似于模式注册表的东西。 –