Object not serializable (org.apache.kafka.clients.consumer.ConsumerRecord) in Java Spark Kafka streaming
I am fairly sure I am only pushing String data, and I deserialize it as String as well; the record I push is also shown in the error. So why is it suddenly raising this kind of error, is there something I am missing?
Here is the code:
import java.util.HashMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;

public final class KafkaConsumerDirectStream {

    public static void main(String[] args) throws Exception {
        try {
            SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
            sparkConf.set("spark.streaming.concurrentJobs", "3");

            // Create the context with a 2-second batch size
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

            Map<String, Object> kafkaParams = new HashMap<>();
            // kafkaParams.put("metadata.broker.list", "x.xx.xxx.xxx:9091,
            // x.xx.xxx.xxx:9092, x.xx.xxx.xxx:9093");
            kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9091");
            kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaParams.put("group.id", "11_ubiq_12dj");
            kafkaParams.put("enable.auto.commit", true);
            kafkaParams.put("auto.commit.interval.ms", "1000");
            kafkaParams.put("session.timeout.ms", "30000");
            kafkaParams.put("auto.offset.reset", "earliest");

            Collection<String> topics = Arrays.asList("TopicQueue");

            JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
                    LocationStrategies.PreferBrokers(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

            // stream.print();

            stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
                @Override
                public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                    final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                        @Override
                        public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
                            // Print the offset range handled by this partition
                            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                            // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges);
                            System.out.println(
                                    o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
                        }
                    });
                }
            });

            jssc.start();
            jssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The following error is raised:
16/11/24 00:19:14 ERROR JobScheduler: Error running job streaming job 1479964754000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 30.0 (TID 1500) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = PartWithTopic02Queue, partition = 36, offset = 555, CreateTime = 1479964753779, checksum = 2582644462, serialized key size = -1, serialized value size = 6, key = null, value = Hello0))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at java.lang.Thread.getStackTrace(Thread.java:1117)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:785)
It seems that org.apache.spark.streaming.kafka010.* is not working properly here. Previously I used only org.apache.spark.streaming.kafka, and that worked fine for me.
The org.apache.kafka.clients.consumer.ConsumerRecord class is not serializable, so it cannot be used with RMI or anything similar.
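Note that the stack trace points at DStream.print(), which calls KafkaRDD.take() and therefore has to serialize whole ConsumerRecord objects as task results. A minimal sketch of the usual workaround (not necessarily the asker's final fix; it reuses the stream variable and the wildcard imports from the code above): map each record to its String value before calling any output action, so only serializable Strings cross the executor-to-driver boundary.

JavaDStream<String> values = stream.map(
        new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> record) {
                // Keep only the payload; ConsumerRecord itself is not serializable.
                return record.value();
            }
        });
values.print(); // Safe now: only Strings are shipped back to the driver.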
Before this, it was working perfectly fine. Maybe I made some change somewhere, and that is why it is not working... –
@CodeIdenti, what do you mean by "before"? Maybe you are serializing an empty object? – AntJavaDev
When there is no data in "TopicQueue", it works fine. But as soon as there is data in "TopicQueue", this error appears. –
You just need to add public final class KafkaConsumerDirectStream implements java.io.Serializable.
This works for me, even while using org.apache.spark.streaming.kafka010.*.
Hope this helps, thanks :-)
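For reference, a minimal sketch of that change applied to the class from the question (only the declaration changes; the body stays exactly as posted). The usual reasoning is that the anonymous VoidFunction inner classes capture a reference to the enclosing class, so the enclosing class must itself be serializable for those functions to be shipped to executors; whether this also covers the print() path is the answerer's report.

// Only the class declaration changes:
public final class KafkaConsumerDirectStream implements java.io.Serializable {
    // ... everything else unchanged from the question ...
}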
Please ignore the commented-out code –
The exception is raised while running –