Spark Streaming + Accumulo - serializing BatchWriterImpl

Problem description:

I am looking for a Spark Streaming + Accumulo connector and a complete usage example.

Currently I am trying to write Spark Streaming results to an Accumulo table, but I am getting a NotSerializableException for the BatchWriter. Can anyone point me to an example of how to serialize the BatchWriter? The code below is based on the Accumulo documentation.

Current code:

import org.apache.accumulo.core.client.{BatchWriterConfig, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Mutation, Value}
import com.google.common.primitives.Longs

// connect to Accumulo through ZooKeeper
val accumuloInstanceName = "accumulo"
val zooKeepers = "localhost:2181"
val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
val accumuloUser = programOptions.accumuloUser()
val accumuloPassword = programOptions.accumuloPassword()
val passwordToken = new PasswordToken(accumuloPassword)
val connector = instance.getConnector(accumuloUser, passwordToken)

// the BatchWriter is created once, on the driver
val accumuloBatchWriterConfig = new BatchWriterConfig
val accumuloBatchWriterMaxMemory = 32 * 1024 * 1024
accumuloBatchWriterConfig.setMaxMemory(accumuloBatchWriterMaxMemory)
val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig)

fullMergeResultFlatten.foreachRDD(recordRDD =>
  recordRDD.foreach(record => {
    // each Mutation is keyed by the record timestamp
    val mutation = new Mutation(Longs.toByteArray(record.timestamp))
    mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
    mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
    accumuloBatchWriter.addMutation(mutation)
  })
)

The following error occurs at runtime:

17/05/05 16:55:25 ERROR util.Utils: Exception encountered 
java.io.NotSerializableException: org.apache.accumulo.core.client.impl.BatchWriterImpl 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 

I imagine this is a fairly common scenario, but I haven't been able to find any simple Spark Streaming + Accumulo examples.

As elserj pointed out, serializing connection objects is generally not the right pattern. The exception happens because Spark must serialize the closure passed to recordRDD.foreach, and that closure captures accumuloBatchWriter, whose concrete class BatchWriterImpl is not serializable. The pattern I have seen instead is to open connections directly on the Spark workers using RDD.foreachPartition(). This is nice because it lets you create one connection per batch of work, rather than a new connection for every individual record, which is almost never efficient.

Example:

fullMergeResultFlatten.foreachRDD(recordRDD => {
  recordRDD.foreachPartition(partitionRecords => {
    // this connection logic is executed in the Spark workers, so the non-serializable objects stay there
    val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
    val connector = instance.getConnector(accumuloUser, new PasswordToken(accumuloPassword))
    val config = new BatchWriterConfig().setMaxMemory(32 * 1024 * 1024)
    val accumuloBatchWriter = connector.createBatchWriter("Data", config)
    partitionRecords.foreach(record => {
      val mutation = new Mutation(Longs.toByteArray(record.timestamp))
      mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
      mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
      accumuloBatchWriter.addMutation(mutation)
    })
    accumuloBatchWriter.close()
  })
})
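
If opening a fresh connection for every partition turns out to be too expensive, a variant I have seen is to cache a single Connector per executor JVM in a lazily initialized singleton object. The sketch below is only an illustration of that idea, not code from the original answer; the AccumuloConnector object name and the hard-coded connection parameters are assumptions:

import org.apache.accumulo.core.client.{Connector, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken

// Hypothetical helper: a Scala object is instantiated once per JVM, so each
// executor lazily builds exactly one Connector and reuses it across batches.
// Nothing in this object is ever serialized by Spark.
object AccumuloConnector {
  lazy val connector: Connector = {
    // connection parameters are hard-coded here purely for illustration
    val instance = new ZooKeeperInstance("accumulo", "localhost:2181")
    instance.getConnector("user", new PasswordToken("password"))
  }
}

With such a helper, the foreachPartition body would call AccumuloConnector.connector.createBatchWriter(...) so that only the short-lived BatchWriter is created and closed per partition.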

You cannot serialize the BatchWriter class. I don't have a suggestion for how to fix your code, but I can say that trying to serialize that class is not the right approach.