Spark Streaming in Java: reading from Kafka for real-time word count
1. Development tool: IntelliJ IDEA
2. sbt dependencies (build.sbt):
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "org.apache.spark" %%"spark-hive" % "2.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0"
// For a Spark Streaming + Kafka project, only the following dependencies are needed
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
3. Kafka can be consumed in two ways: the receiver-based approach and the direct approach. This test uses the direct approach; a later post will cover the differences between the two.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.*;
/**
 * Real-time word count based on the Kafka direct approach
 */
public class KafkaDirector {
    public static void main(String[] args) throws InterruptedException {
        // Create the SparkConf object
        SparkConf conf = new SparkConf()
                .setAppName("KafkaDirectWordCount")
                .setMaster("local[2]");
        // Create the JavaStreamingContext with a 5-second batch interval
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        // Kafka broker list
        String brokers = "10.1.34.112:9092";
        // Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "g1");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("auto.offset.reset", "latest");
        // Kafka topics to subscribe to (multiple topics can be listed here)
        Collection<String> topics = Arrays.asList("mj");
        // Create the direct DStream
        JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(
                jsc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));
        // Split each record's value from the Kafka topic into words
        JavaDStream<String> linesSplit = lines.flatMap(new FlatMapFunction<ConsumerRecord<Object, Object>, String>() {
            @Override
            public Iterator<String> call(ConsumerRecord<Object, Object> line) throws Exception {
                return Arrays.asList(line.value().toString().split(" ")).iterator();
            }
        });
        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> word = linesSplit.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String everyWord) throws Exception {
                return new Tuple2<String, Integer>(everyWord, 1);
            }
        });
        // Sum the counts for each word
        JavaPairDStream<String, Integer> wordsCount = word.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Print each batch's counts to the console
        wordsCount.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
4. Testing
1) Create a topic
#bin/kafka-topics.sh --create --zookeeper 10.1.34.112:2181 --replication-factor 1 --partitions 1 --topic mj
2) Run the program
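Since the master is hard-coded to local[2], the program can be launched straight from IDEA. To run it outside the IDE, something along the following lines should work; the jar path is a placeholder for whatever sbt package produces for your project, and --packages pulls in the Kafka integration that is not bundled with Spark:
#spark-submit --class KafkaDirector --master "local[2]" --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.0 target/scala-2.11/<your-project>_2.11-0.1.jar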
3) Write some test data to the topic (see the example below)
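One option, assuming Kafka's console producer and the same broker address used in the code, is to type lines of space-separated words into:
#bin/kafka-console-producer.sh --broker-list 10.1.34.112:9092 --topic mj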
4) Check the console output
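For example, if "hello spark hello spark hello" is written to the topic within one 5-second batch, the output of print() for that batch would look roughly like this (the timestamp is only illustrative):
-------------------------------------------
Time: 1500000000000 ms
-------------------------------------------
(hello,3)
(spark,2)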