配料不是卡夫卡生产者正与斯卡拉

问题描述:

我写在斯卡拉生产者和我想要做配料。批量应该工作的方式是,它应该将队列中的消息保存到队列中,然后将所有消息一起发布到主题上。但不知何故,它不工作。当我开始发送消息时,它开始逐一发布消息。有谁知道如何在卡夫卡生产者中使用配料。配料不是卡夫卡生产者正与斯卡拉

val kafkaStringSerializer = "org.apache.kafka.common.serialization.StringSerializer" 
     val batchSize: java.lang.Integer = 163840 
     val props = new Properties() 
     props.put("key.serializer", kafkaStringSerializer) 
     props.put("value.serializer", kafkaStringSerializer) 
     props.put("batch.size", batchSize); 
     props.put("bootstrap.servers", "localhost:9092") 

     val producer = new KafkaProducer[String,String](props) 

     val TOPIC="topic" 
     val inlineMessage = "adsdasdddddssssssssssss" 

     for(i<- 1 to 10){ 
     val record: ProducerRecord[String, String] = new ProducerRecord(TOPIC, inlineMessage) 
     val futureResponse: Future[RecordMetadata] = producer.send(record) 
     futureResponse.isDone 
     println("Future Response ==========>" + futureResponse.get().serializedValueSize()) 
     } 

你必须设置linger.ms在你的道具

默认情况下,它是零,这意味着消息发送如有可能,立即。 您可以增加它(例如100),以便批次发生 - 这意味着更高的延迟时间,但吞吐量更高。

batch.size是最大值:如果在达到linger.ms之前达到此值,则数据将在不等待更多时间的情况下发送。

要查看实际发送的批次,您需要配置日志记录(批处理在后台线程上完成,并且您将无法查看生产者api完成的批次 - 您无法发送或接收批次只有通过批处理与经纪人发送记录,并接收其响应,通信是内部完成的)

首先,如果没有这样做,绑定一个log4j的属性文件(Dlog4j.configuration=file:path/to/log4j.properties

log4j.rootLogger=WARN, stderr 
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=TRACE, stderr 

log4j.appender.stderr=org.apache.log4j.ConsoleAppender 
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout 
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n 
log4j.appender.stderr.Target=System.err 

例如,我会收到

TRACE Sent produce request to 2: (type=ProduceRequest, magic=1, acks=1, timeout=30000, partitionRecords=({test-1=[(record=LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, crc=2237306008, CreateTime=1502444105996, key=0 bytes, value=2 bytes))), (record=LegacyRecordBatch(offset=1, Record(magic=1, attributes=0, compression=NONE, crc=3259548815, CreateTime=1502444106029, key=0 bytes, value=2 bytes)))]}), transactionalId='' (org.apache.kafka.clients.producer.internals.Sender) 

哪个批次2个的数据。批量将包含记录发送到同一个经纪人

然后,batch.size和linger.ms玩玩看的差别。请注意,记录包含一些开销,因此1000的批处理大小不会包含10个大小为100的消息

请注意,我没有找到说明所有记录器及其功能的文档(如log4j.logger.org。 apache.kafka.clients.producer.internals.Sender)。您可以启用rootLogger调试/跟踪,找到你想要的数据,或者explore the code

+0

我做到了。我有props.put(“linger.ms”,5000)。但仍然没有工作。现在我看到我的消息延迟了5秒。消息仍然是一个接一个,但延迟5秒。 – user1733735

+3

这些消息分批存储并批量提取,但仍作为单个消息呈现给消费者。不要指望读取单个消息并获取一批消息作为响应。这不是卡夫卡配料工作的方式。 –

+0

所以,如果我必须验证我的配料是否正确完成。我如何验证它? – user1733735

您在数据同步生产的卡夫卡服务器。意味着,你叫producer.sendfutureResponse.get的那一刻,它只会得到的数据存储在服务器卡夫卡后返回。

储存在一个单独的列表的响应,并调用futureResponse.getfor循环外。

在默认configuration,卡夫卡支持配料,看到linger.msbatch.size

List<Future<RecordMetadata>> responses = new ArrayList<>(); 
for (int i=1; i<=10; i++) { 
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, inlineMessage); 
    Future<RecordMetadata> response = producer.send(record); 
    responses.add(response); 
} 

for (Future<RecordMetadata> response : responses) { 
    response.get(); // verify whether the message is sent to the broker. 
}