使用ConsumeKafka处理器创建更大的NiFi流文件
问题描述:
我创建了一个简单的NiFi管道,它从Kafka主题(使用ConsumeKafka
)中读取数据流并将其写入HDFS(使用PutHDFS
)。目前,我看到很多在HDFS上创建的小文件。一个新文件每秒创建一次,有些只有一个或两个记录。使用ConsumeKafka处理器创建更大的NiFi流文件
我要更少,更大的文件写入到HDFS。
我有以下设置在ConsumeKafka
:
Message Demarcator = <new line>
Max Poll Records = 10000
Max Uncommitted Time = 20s
在我用水槽代替Nifi过去,它有batchSize
和batchDurationMillis
,这让我调整HDFS文件有多大。 Nifi中的ConsumeKafka
好像缺少batchDurationMillis
等价物。
是什么在NiFi的解决方案吗?
答
使用消息Demarcator和最大轮询记录是正确的做法,让每流文件多个消息。您可能希望通过从0秒,这意味着运行尽可能快地调整运行计划(调度选项卡上),要像1秒或任何让你感觉攫取更多的数据,以减慢ConsumeKafka处理器。
即使有了上述情况,您仍然可能希望在PutHDFS之前插入MergeContent处理器,并根据大小合并流文件,以便在写入HDFS之前可以等到数据量适当。
如何使用MergeContent将取决于数据要合并的类型......如果你有Avro中,有一个Avro的特定的合并策略。如果你有JSON,你可以一个接一个地合并它们,或者你可以用一个页眉,页脚和分隔符来包装它们来创建一个有效的JSON数组。
谢谢,运行计划正是我需要的改变。 –