How to handle the NullPointerException thrown while Spark Streaming writes to HBase, and how to convert the RDD type!!
Software versions used:
Spark 2.3.0
HBase 1.4.6
IDEA 2019.1
The application code is as follows:
package com.bd.spark

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

object kafkaSparkStreaming_10version {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-spark-demo").setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streamingkafka1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("streaming_kafka_3")
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Strip the extra quoting/escaping so each record becomes a plain JSON string
    val readyData = stream.map(record => record.value())
      .map(str => str.replace("\"{", "{"))
      .map(str => str.replace("}\"", "}"))
      .map(str => str.replace("\\\"", "\""))
    readyData.print()

    // Write the data to HBase. The ZooKeeper settings must be applied to hbaseConf
    // before the JobConf is constructed, because JobConf copies the configuration
    // at construction time.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "rawVideo")

    readyData.foreachRDD(rdd1 => if (!rdd1.isEmpty()) {
      val session = SparkSession.builder().getOrCreate()
      import session.implicits._
      val DF = session.read.json(session.createDataset(rdd1))
      DF.createOrReplaceTempView("temp")
      // session.sql("select num, cover from temp").show()
      val ans = session.sql("select num, cover from temp").rdd
      // filter(!_.isNullAt(0)) drops rows whose first column is null
      val finalresult = ans.filter(!_.isNullAt(0)).map(x => (x.getLong(0), x.getString(1)))
      finalresult.map(line => {
        val put = new Put(Bytes.toBytes(line._1))
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(line._2))
        (new ImmutableBytesWritable, put)
      }).saveAsHadoopDataset(jobConf)
    } else {
      println("=============================rdd isEmpty=========================================")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
The data produced by Spark Streaming is written into HBase, but some records contain NULL values, and storing them as-is raises
java.lang.NullPointerException: Value at index 0 is null
Following https://blog.****.net/sparkexpert/article/details/52537723, the null rows have to be removed first, using the following code:
val finalresult = ans.filter(!_.isNullAt(0)).map(x => (x.getLong(0), x.getString(1)))
If the code above is instead written as
val finalresult = ans.map(x => if (!x.isNullAt(0)) (x.getLong(0), x.getString(1)))
then finalresult has the type RDD[Any] rather than RDD[(Long, String)]: an if expression without an else branch yields Unit when the condition is false, so the element type widens to Any. The code below then no longer compiles (Any has no _1 or _2 members):
finalresult.map(line => {
  val put = new Put(Bytes.toBytes(line._1))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(line._2))
  (new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
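If the null check is to live inside the transformation rather than in a separate filter, the element type can still be kept as (Long, String) by making every branch produce a value of that type. A minimal sketch (not from the original post; viaFlatMap and viaCollect are just illustrative names):

// Option + flatMap: valid rows become Some, null rows become None, and flatMap drops the Nones
val viaFlatMap = ans.flatMap(x => if (!x.isNullAt(0)) Some((x.getLong(0), x.getString(1))) else None)

// collect with a partial function: rows that fail the guard are simply skipped
val viaCollect = ans.collect {
  case x if !x.isNullAt(0) => (x.getLong(0), x.getString(1))
}

Either way the result is an RDD[(Long, String)], so the Put-building code above compiles unchanged.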
As for the RDD type conversion mentioned in the title: given an RDD whose elements are List[AnyRef], for example
val myRdd = sc.makeRDD(List(
  List[AnyRef]("date 1", "blah2", (11: java.lang.Integer), "baz1"),
  List[AnyRef]("date 2", "blah3", (5: java.lang.Integer), "baz2"),
  List[AnyRef]("date 3", "blah4", (1: java.lang.Integer), "baz3")
))
a map with pattern matching then converts it into the tuple type we need:
val unmessTypes = myRdd.map {
  case List(a: String, b: String, c: java.lang.Integer, d: String) => (a, b, (c: Int), d)
}
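One caveat with the map above: any list that does not fit the pattern throws a scala.MatchError at runtime. If such rows should simply be dropped instead, the same partial function can be passed to RDD.collect (a sketch; unmessTypesSafe is just an illustrative name):

val unmessTypesSafe = myRdd.collect {
  case List(a: String, b: String, c: java.lang.Integer, d: String) => (a, b, (c: Int), d)
}
// unmessTypesSafe: RDD[(String, String, Int, String)]; non-matching lists are skipped instead of failing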
Keeping this note here for future reference!!!