代码:
package com.weichai.kafka;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
/**
 * Worker task that reads every message from one Kafka stream and prints it to standard output.
 *
 * @author lhy
 * @date 2018.10.09
 */
public class Consumer implements Runnable {
    /** Stream this worker iterates over. */
    private final KafkaStream<byte[], byte[]> stream;
    /** Number used to label this worker's console output. */
    private final int threadNumber;

    /**
     * Creates a worker bound to a single stream.
     *
     * @param stream       stream whose messages are printed by {@link #run()}
     * @param threadNumber number included in every line this worker prints
     */
    public Consumer(KafkaStream<byte[], byte[]> stream, int threadNumber) {
        this.stream = stream;
        this.threadNumber = threadNumber;
    }

    /**
     * Prints each message of the stream prefixed with the worker number, then prints a
     * shutdown line once the iterator reports that no more messages are available.
     */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> messages = stream.iterator();
        while (messages.hasNext()) {
            // Decode with an explicit charset so the output does not depend on the platform default.
            // NOTE(review): assumes producers publish UTF-8 encoded text - confirm against the producer.
            String text = new String(messages.next().message(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Thread " + threadNumber + ": " + text);
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}
package com.weichai.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
 * Consumes a Kafka topic with several worker threads: one {@link Consumer} task is submitted
 * to a fixed thread pool for every stream returned by the consumer connector.
 *
 * @author lhy
 * @date 2018.10.09
 */
public class ConsumerThread {
    /** Connector used to create the message streams; shut down in {@link #shutdown()}. */
    private final ConsumerConnector consumer;
    /** Topic whose streams are requested in {@link #run(int)}. */
    private final String topic;
    /** Worker pool; stays {@code null} until {@link #run(int)} has been called. */
    private ExecutorService executor;

    /**
     * Creates the consumer connector for the given ZooKeeper address and consumer group.
     *
     * @param a_zookeeper value used for the {@code zookeeper.connect} property
     * @param a_groupId   value used for the {@code group.id} property
     * @param a_topic     topic to consume
     */
    public ConsumerThread(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /**
     * Shuts down the connector and the worker pool, then waits up to five seconds for the
     * workers to finish. Safe to call even if {@link #run(int)} was never invoked.
     */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor == null) {
            // run() was never called, so there is no pool to stop or wait for.
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("消费者线程等待超时,直接退出!");
            }
        } catch (InterruptedException e) {
            System.out.println("系统异常中断,直接退出!");
            e.printStackTrace();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests {@code a_numThreads} streams for the topic and submits one {@link Consumer}
     * task per returned stream to a fixed pool of the same size.
     *
     * @param a_numThreads number of streams to request and size of the thread pool
     */
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(a_numThreads);
        int threadNumber = 0;
        for (KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Consumer(stream, threadNumber));
            threadNumber++;
        }
    }

    /**
     * Builds the consumer configuration from the connection settings plus fixed timing values.
     *
     * @param a_zookeeper value for {@code zookeeper.connect}
     * @param a_groupId   value for {@code group.id}
     * @return configuration passed to the connector factory
     */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        // NOTE(review): a 400 ms session timeout looks very short - confirm it suits the target cluster.
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Demo entry point: consumes the {@code SimpleNode} topic with five workers for five
     * seconds and then shuts everything down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String zooKeeper = "localhost:2181";
        String groupId = "0";
        String topic = "SimpleNode";
        int threads = 5; // number of worker threads to start
        ConsumerThread thread = new ConsumerThread(zooKeeper, groupId, topic);
        thread.run(threads);
        try {
            Thread.sleep(5000); // let the workers consume for 5 seconds before stopping
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible; shutdown below still releases the connector and pool.
            Thread.currentThread().interrupt();
        }
        thread.shutdown();
    }
}
运行截图:

此处重温了并发编程的内容:借助线程池(ExecutorService)开启5个线程并行消费Topic,从而提高Kafka消费Topic的吞吐效率(前提是该Topic的分区数不少于线程数,否则多出的线程不会分到数据)。