问题与对象不可序列化类:org.apache.hadoop.hbase.io.ImmutableBytesWritable错误

问题描述:

我想从Spark加载大量的数据到HBase。我正在使用saveAsNewAPIHadoopDataset方法。问题与对象不可序列化类:org.apache.hadoop.hbase.io.ImmutableBytesWritable错误

我正在创建ImmutableWritable并放置并保存,如下所示。

dataframe.mapPartitions { rows => 
     { 
     rows.map { eachRow => 
      { 
      val rowKey = Seq(eachRow.getAs[String]("uniqueId"), eachRow.getAs[String]("authTime")).mkString(",") 
      val put = new Put(Bytes.toBytes(rowKey)); 
      val fields = eachRow.schema.fields; 

      for (i <- 0 until fields.length) { 
       put.addColumn(userCF, Bytes.toBytes(fields(i).name), Bytes.toBytes(String.valueOf(eachRow.get(i)))) 
      } 

      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put) 
      } 
     } 
     } 
    }.saveAsNewAPIHadoopDataset(job.getConfiguration) 

我的数据是30GB的价值,它存在于60个文件的HDFS中。

当我一次提交10个文件的同一份工作时,每件事情都很顺利。

但是,当我一次提交所有的东西,它是给这个错误。这个错误真的令人沮丧,我尝试了所有可能的事情。但真正想知道是什么让数据在5GB时成功运行,以及是什么导致30GB数据时出错。

有没有人遇到过这样的问题。

+0

你可以把完整的堆栈跟踪在这里......? –

+0

当我为每个执行程序分配40G内存时,工作正在接近,但是每当我分配的内存更少时,我都面临着这个错误。这意味着,每当有一个洗牌,我得到这个错误。 – Srini

这是因为ImmutableBytesWritable不可序列化。当有洗牌时,apache spark会尝试序列化它以发送到另一个节点。如果你想尝试拿一些或收集在司机身上,也会发生同样的情况。

实际上只有两种方法。

  • 不要在洗牌时使用它。如果你只是需要把每个记录从磁盘放入数据库,那么看起来像洗牌是不需要的。确保它是。如果您需要在数据进入数据库之前对其进行预处理,请将其保存为其他可序列化格式,并在保存时将其转换为仅需的数据。
  • 使用另一个序列化程序。 Apache Spark带有Kryo(确保你使用的是spark 2.0.0-- Kryo已经在那里更新了,它修复了一些令人讨厌的并发错误)。为了使用它,你必须配置它。这并不难,但需要一些代码。
+0

嗨evgeni。是的,我尝试了kryo,并通过在类的列表中提供它来简化该类的可序列化。但克里给了我其他的序列化问题。精确地说,索引出界问题。所以我别无选择,只能放弃它..我希望新的API能够更好地使用kryo API。 – Srini

+0

@Srini,你用什么火花版本?如果在2.0.0之前,那么Kryo遇到了问题,您无法解决问题。可能就是这样。 – evgenii