How to handle the NullPointerException that occurs when writing Spark Streaming data into HBase, and how to convert the RDD type!!

Software versions used:

Spark 2.3.0

HBase 1.4.6

IDEA 2019.1

The business code is as follows:

package com.bd.spark

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.InputDStream

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf



object kafkaSparkStreaming_10version {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-spark-demo").setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:9092,salve1:9092,slave2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streamingkafka1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false:java.lang.Boolean)
    )

    val topics = Array("streaming_kafka_3")
    val stream: InputDStream[ConsumerRecord[String, String]]= KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val readyData = stream.map(record => record.value())
      .map(str => str.replace("\"{","{"))
      .map(str => str.replace("}\"", "}"))
      .map(str => str.replace("\\\"", "\""))
    readyData.print()

//    Write the data into HBase
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // JobConf copies the configuration it is given, so set the ZooKeeper properties before constructing it
    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "rawVideo")

    readyData.foreachRDD(rdd1 => if(!rdd1.isEmpty()) {
      val session = SparkSession.builder().getOrCreate()
      import session.implicits._
      val DF = session.read.json(session.createDataset(rdd1))
      DF.createOrReplaceTempView("temp")
//      session.sql("select num, cover from temp").show()
      val ans = session.sql(" select num, cover from temp").rdd


      val finalresult = ans.filter(!_.isNullAt(0)).map(x => {(x.getLong(0), x.getString(1))})  // filter(!_.isNullAt(0)) drops rows whose num field is null

      finalresult.map(line => {
        val put = new Put(Bytes.toBytes(line._1))
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(line._2))
        (new ImmutableBytesWritable, put)
      }).saveAsHadoopDataset(jobConf)
    }else{ println("=============================rdd isEmpty=========================================")})

    ssc.start()
    ssc.awaitTermination()
  }
}

The data produced by Spark Streaming is written into HBase, but some records contain null fields; writing them into HBase as-is raises

java.lang.NullPointerException: Value at index 0 is null
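
The exception comes from calling getLong on a Row whose field is null. A minimal sketch that reproduces it (the values are illustrative, not from the actual topic):

import org.apache.spark.sql.Row

val row = Row(null, "http://example.com/cover.jpg")  // num is null in the parsed JSON
row.isNullAt(0)   // true
row.getLong(0)    // throws java.lang.NullPointerException: Value at index 0 is null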

So: how do we handle the NullPointerException, and how do we convert the RDD type?

Following https://blog.****.net/sparkexpert/article/details/52537723, the null records have to be filtered out first, using the following code:

val finalresult = ans.filter(!_.isNullAt(0)).map(x => {(x.getLong(0), x.getString(1))})
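
An equivalent way to drop the null rows and fix the element type in a single pass is a flatMap over an Option (just a sketch, assuming ans is the RDD[Row] produced by the SQL query above):

val finalresult = ans.flatMap { row =>
  if (row.isNullAt(0)) None                        // skip rows whose num is null
  else Some((row.getLong(0), row.getString(1)))    // keep (num, cover) as (Long, String)
}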

If the above code is changed to

val finalresult = ans.map(x => if(!x.isNullAt(0)) {(x.getLong(0), x.getString(1))})

then finalresult has type RDD[Any] rather than RDD[(Long, String)] (the if without an else branch returns Unit for the null rows, so the common element type becomes Any), and the code below fails:

finalresult.map(line => {
  val put = new Put(Bytes.toBytes(line._1))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(line._2))
  (new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
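
Instead of patching the type afterwards, the null check and the conversion can also be expressed as a single collect over a partial function, which keeps the element type concrete and simply skips the null rows (a sketch under the same assumptions as above):

// collect(PartialFunction) filters and maps in one step; the result is RDD[(Long, String)]
val finalresult = ans.collect {
  case row if !row.isNullAt(0) => (row.getLong(0), row.getString(1))
}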

Following https://stackoverflow.com/questions/48471357/convert-rddlistanyref-to-rddliststring-date-string-string, first build the RDD

val myRdd = sc.makeRDD(List(
  List[AnyRef]("date 1", "blah2", (11: java.lang.Integer), "baz1"),
  List[AnyRef]("date 2", "blah3", (5: java.lang.Integer),  "baz2"),
  List[AnyRef]("date 3", "blah4", (1: java.lang.Integer),  "baz3")
))

then apply a map with pattern matching to get the types you need:

val unmessTypes = myRdd.map{
  case List(a: String, b: String, c: java.lang.Integer, d: String) => (a, b, (c: Int), d)
}
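
One caveat worth adding (my own note, not from the linked answer): map with a single case throws a scala.MatchError at runtime for any element that does not fit the pattern, so when malformed rows are possible, collect is the safer choice because it silently drops them:

val unmessTypes = myRdd.collect {
  case List(a: String, b: String, c: java.lang.Integer, d: String) => (a, b, (c: Int), d)
}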


Noting this down for future reference!!!