Kafka 2.0的简单Producer和Consumer实现
系统环境
在kafka单节点运行环境下,尝试使用java创建Kafka的Producer和Consumer进行测试,具体的代码环境如下:
- OS:Ubuntu 16.4
- Kafka:2.11_2.0.0
- Zookeeper:使用Kafka中自带的Zookeeper进行启动
- JDK: 1.8
项目使用maven,其中pom.xml的相关内容如下:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<!--<scope>test</scope>-->
</dependency>
简单Producer的实现
简单Producer的代码如下:
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class TestProducer {
private static Properties kafkaProps;
private static void initKafka() {
kafkaProps = new Properties();
// broker url
kafkaProps.put("bootstrap.servers", "localhost:9092"); //,192.168.216.139:9092,192.168.216.140:9092
// request need to validate
kafkaProps.put("acks", "all");
// request failed to try
kafkaProps.put("retries", 0);
// memory cache size
kafkaProps.put("batch.size", 16384);
//
kafkaProps.put("linger.ms", 1);
kafkaProps.put("buffer.memory", 33554432);
// define the way of key and value serializer
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
public static void main(String[] args) {
initKafka();
Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
System.out.println("Message sent successfully!");
producer.close();
}
}
代码中需要注意的是:bootstrap.servers的配置项,在默认kafka的单节点配置时,不能使用IP,而是使用localhost进行连接,否则会连接异常。
此处对代码中用到的几个参数进行解释:
- bootstrap.servers:用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2;
- acks:生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。
- retries:生产者发送失败后,重试的次数 batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。
- linger.ms:默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。
- batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。
- buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。
- key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。
简单Consumer的实现
简单Consumer的代码如下:
package kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class TestConsumer {
private static Properties kafkaProps = new Properties();
private static void kafkaInit() {
kafkaProps.put("bootstrap.servers", "localhost:9092");
// group id for each consumer
kafkaProps.put("group.id", "test");
// if value legal, auto add offset
kafkaProps.put("enable.auto.commit", "true");
// set how long time to udpate the offset value
kafkaProps.put("auto.commit.interval.ms", "1000");
// set session response time
kafkaProps.put("session.timeout.ms", "30000");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
public static void main(String[] args) {
kafkaInit();
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(kafkaProps);
kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
System.out.println("Subscribed to topic:" + "my-topic");
int i = 0;
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100); // ?
for (ConsumerRecord<String, String> record : records) {
// print the offset, key and value for the consumer records
System.out.printf("Offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
}
运行情况
Producer的成功运行后,部分输出如下:
可以看到,在producer的initKafka的相关配置项的值出现在ProducerConfig values中。
Consumer成功运行后,可以看到在producer中send的相关key和value值,在consumer的输出中出现:
遇到的问题
在producer运行时,出现如下错误:
在提示的参考URL页面中,可以找到相关问题的说明:
具体的解决方法为,修改pom.xml文件:在pom.xml文件中加入slf4j的相关引用,并将slf4j-log4j12引用中:
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<!--<scope>test</scope>-->
</dependency>
修改完成后,重新运行producer程序,可以正常运行。