kafka集群java测试代码
参照了其他的帖子之后,最近我自己也在学kafka,自己写了一个demo,但是发现有好多地方都卡住了,写了这篇帖子就是希望各位看官在学习的时候能少走弯路,今天搞了我一天,我把我所遇到的问题列出来,供大家参考。
我是自己在虚拟机里面搭建好了kafka的集群,测试通过了,然后准备用java代码写一个demo程序来测试看能不能通过。废话不多说,开始吧。
很多帖子都是采用maven的形式,我这里是直接创建了一个java项目,src下面创建一个lib文件夹,把jar包导入进去。在这里jar包就是一个坑,我刚开始是以为只要一个kafka的jar就可以了的,但是后面代码都写完了,也没有报错,但是启动的时候死活都是各种报错,郁闷了半天,于是怀疑是不是少了什么东西,于是就开始各种百度,谷歌。结果也没有看出个所以然。后来还是在网上看到了一哥们写的,但是他所给的jar包不全,没有引用json的jar包。消费者或接受不到,这里补充一下。还有就是这里面的很多方法都过期了,对于过期的方法要慎用,因为以后万一升级,这些方法在以后的版本如果不支持了的话,现在写的代码就需要重新写,很麻烦。
java 代码:
生产者代码
package kafka;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
public class KafkaProducer extends Thread {
private String topic;
public KafkaProducer(String topic){
super();
this.topic = topic;
}
@Override
public void run() {
Producer producer = createProducer();
int i=0;
while(true){
KeyedMessage t=new KeyedMessage(topic, "message: " + i);
producer.send(t);
System.out.println("发送了: " + i);
try {
TimeUnit.SECONDS.sleep(1);
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private Producer createProducer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "172.16.12.252:2181,172.16.12.253:2181,172.16.12.254:2181");//声明zk
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list", "172.16.12.252:9092,172.16.12.253:9092,172.16.12.254:9092");// 声明kafka broker
return new Producer(new ProducerConfig(properties));
}
public static void main(String[] args) {
new KafkaProducer("test").start();// 使用kafka集群中创建好的主题 test
new KafkaConsumer("test").start();// 使用kafka集群中创建好的主题 test
}
}
消费者代码
package kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumer extends Thread {
private String topic;
public KafkaConsumer(String topic){
super();
this.topic = topic;
}
@Override
public void run() {
ConsumerConnector consumer = createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("接收到: " + message);
}
}
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "172.16.12.252:2181,172.16.12.253:2181,172.16.12.254:2181");//声明zk
properties.put("group.id", "group1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
}