使用ConsumeKafka处理器创建更大的NiFi流文件

问题描述:

我创建了一个简单的NiFi管道,它从Kafka主题(使用ConsumeKafka)中读取数据流并将其写入HDFS(使用PutHDFS)。目前,我看到很多在HDFS上创建的小文件。一个新文件每秒创建一次,有些只有一个或两个记录。使用ConsumeKafka处理器创建更大的NiFi流文件

我要更少,更大的文件写入到HDFS。

我有以下设置在ConsumeKafka

Message Demarcator = <new line> 
Max Poll Records = 10000 
Max Uncommitted Time = 20s 

在我用水槽代替Nifi过去,它有batchSizebatchDurationMillis,这让我调整HDFS文件有多大。 Nifi中的ConsumeKafka好像缺少batchDurationMillis等价物。

是什么在NiFi的解决方案吗?

使用消息Demarcator和最大轮询记录是正确的做法,让每流文件多个消息。您可能希望通过从0秒,这意味着运行尽可能快地调整运行计划(调度选项卡上),要像1秒或任何让你感觉攫取更多的数据,以减慢ConsumeKafka处理器。

即使有了上述情况,您仍然可能希望在PutHDFS之前插入MergeContent处理器,并根据大小合并流文件,以便在写入HDFS之前可以等到数据量适当。

如何使用MergeContent将取决于数据要合并的类型......如果你有Avro中,有一个Avro的特定的合并策略。如果你有JSON,你可以一个接一个地合并它们,或者你可以用一个页眉,页脚和分隔符来包装它们来创建一个有效的JSON数组。

+0

谢谢,运行计划正是我需要的改变。 –