问题与对象不可序列化类:org.apache.hadoop.hbase.io.ImmutableBytesWritable错误
问题描述:
我想从Spark加载大量的数据到HBase。我正在使用saveAsNewAPIHadoopDataset方法。问题与对象不可序列化类:org.apache.hadoop.hbase.io.ImmutableBytesWritable错误
我正在创建ImmutableWritable并放置并保存,如下所示。
dataframe.mapPartitions { rows =>
{
rows.map { eachRow =>
{
val rowKey = Seq(eachRow.getAs[String]("uniqueId"), eachRow.getAs[String]("authTime")).mkString(",")
val put = new Put(Bytes.toBytes(rowKey));
val fields = eachRow.schema.fields;
for (i <- 0 until fields.length) {
put.addColumn(userCF, Bytes.toBytes(fields(i).name), Bytes.toBytes(String.valueOf(eachRow.get(i))))
}
(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
}
}
}
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
我的数据是30GB的价值,它存在于60个文件的HDFS中。
当我一次提交10个文件的同一份工作时,每件事情都很顺利。
但是,当我一次提交所有的东西,它是给这个错误。这个错误真的令人沮丧,我尝试了所有可能的事情。但真正想知道是什么让数据在5GB时成功运行,以及是什么导致30GB数据时出错。
有没有人遇到过这样的问题。
答
这是因为ImmutableBytesWritable不可序列化。当有洗牌时,apache spark会尝试序列化它以发送到另一个节点。如果你想尝试拿一些或收集在司机身上,也会发生同样的情况。
实际上只有两种方法。
- 不要在洗牌时使用它。如果你只是需要把每个记录从磁盘放入数据库,那么看起来像洗牌是不需要的。确保它是。如果您需要在数据进入数据库之前对其进行预处理,请将其保存为其他可序列化格式,并在保存时将其转换为仅需的数据。
- 使用另一个序列化程序。 Apache Spark带有Kryo(确保你使用的是spark 2.0.0-- Kryo已经在那里更新了,它修复了一些令人讨厌的并发错误)。为了使用它,你必须配置它。这并不难,但需要一些代码。
你可以把完整的堆栈跟踪在这里......? –
当我为每个执行程序分配40G内存时,工作正在接近,但是每当我分配的内存更少时,我都面临着这个错误。这意味着,每当有一个洗牌,我得到这个错误。 – Srini