Implementing a Kafka Producer and Consumer in Java
It has been a while since my last post. I have been learning a lot lately but have had no time to write, so let me catch up with a Kafka post covering its most basic usage. Big data really is hard: even a simple feature like this one rests on a lot of prior setup. This post assumes you already have a virtual machine running, with ZooKeeper from the Hadoop ecosystem configured, Kafka installed and configured, and a working knowledge of Maven and Spring Boot; the code is not something you can simply copy and paste.
Make sure your virtual machine can be pinged. hmaster is the hostname I mapped to my VM's IP in the hosts file; replace it with your own.
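For reference, the hosts entry might look like the line below; the IP address is a placeholder, so use your VM's actual address:
192.168.56.101   hmaster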
Open two shell windows and verify that a console producer and a console consumer can pass messages through the same topic.
Start Kafka
bin/kafka-server-start.sh config/server.properties
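Note that ZooKeeper must already be running before the broker starts. If you use the ZooKeeper bundled with the Kafka distribution (an assumption; an external ZooKeeper from your Hadoop setup works just as well), it can be started with:
bin/zookeeper-server-start.sh config/zookeeper.properties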
Using Kafka (single node, single broker). The commands below use the hostname hadoop000; as with hmaster, substitute your own.
Create a topic (via ZooKeeper):
• kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic
List all topics:
• kafka-topics.sh --list --zookeeper hadoop000:2181
Send messages (via the broker list):
• kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic
Consume messages (via ZooKeeper):
• kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning
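To double-check where the topic's partition and replica were placed, you can also describe it with the same zookeeper flag as above:
• kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic hello_topic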
Maven dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
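Only kafka-clients is actually required to compile and run the two classes below; kafka_2.11 is the full Scala broker artifact and can be omitted for a pure client application.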
ProducerDemo
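A minimal producer sketch against the same 0.9 client API: it sends 100 string messages to the test topic on hmaster:9092, which the consumer below subscribes to. The acks/batch/linger settings follow the stock example from the Kafka documentation and can be tuned freely.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker address; hmaster must resolve to your VM (see the hosts note above).
        properties.put("bootstrap.servers", "hmaster:9092");
        // Wait for the full in-sync replica set to acknowledge each write.
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        try {
            for (int i = 0; i < 100; i++) {
                // Publish to the same "test" topic the consumer subscribes to.
                producer.send(new ProducerRecord<>("test", Integer.toString(i), "message-" + i));
            }
        } finally {
            // Flush buffered records and release network resources.
            producer.close();
        }
    }
}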
ConsumerDemo
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker address; must match the producer's bootstrap.servers.
        properties.put("bootstrap.servers", "hmaster:9092");
        // Consumers sharing a group.id split the topic's partitions among themselves.
        properties.put("group.id", "group-1");
        // Commit offsets automatically every second.
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        // Start from the earliest offset when the group has no committed offset yet.
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            // Block for up to 100 ms waiting for new records.
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
        }
    }
}
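To try the pair end to end, start ConsumerDemo first, then run ProducerDemo (or the console producer above); each received message is printed with its offset. Note that the Java demos use the topic test rather than hello_topic; create it the same way as shown earlier, or rely on the broker's automatic topic creation, which is enabled by default. Assuming the exec-maven-plugin is configured in your pom (an assumption about your build setup), both classes can be launched from Maven:
mvn compile exec:java -Dexec.mainClass=ConsumerDemo
mvn compile exec:java -Dexec.mainClass=ProducerDemo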