Sorting a JavaPairRDD by key in Apache Spark
Problem description:
How can I sort a JavaPairRDD by its key when the key type is Tuple2<Integer, Integer>?
I want to sort my JavaPairRDD by its key, so I wrote this comparator:
JavaPairRDD<Tuple2<Integer, Integer>, Integer> Rresult = result.sortByKey(new Comparator<Tuple2<Integer, Integer>>() {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        if (o1._1() == o2._1())
            return o1._2() - o2._2();
        return o1._1() - o2._1();
    }
}, true);
This sorts by the first element of the key tuple and, when those are equal, by the second element.
But I get the following stack trace:
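The intended ordering (first element, then the second element as a tie-breaker) can be checked outside Spark. The sketch below is a minimal plain-Java illustration under stated assumptions: `IntPair` is a hypothetical stand-in for `scala.Tuple2<Integer, Integer>` so that it runs without Spark or Scala on the classpath, and `compareLex` is a hypothetical helper name. It also shows why `==` on boxed `Integer` values is fragile: it compares references, and only values in the `Integer` cache (at least -128 to 127) are guaranteed to be the same object. Subtraction (`a - b`) can additionally overflow for large magnitudes, which `Integer.compare` avoids.

```java
import java.util.ArrayList;
import java.util.List;

public class TupleOrderDemo {
    // Hypothetical stand-in for scala.Tuple2<Integer, Integer>.
    static final class IntPair {
        final Integer first;
        final Integer second;

        IntPair(Integer first, Integer second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return "(" + first + "," + second + ")";
        }
    }

    // Lexicographic order: by first element, ties broken by the second.
    // Integer.compare avoids both reference comparison (==) and a - b overflow.
    static int compareLex(IntPair a, IntPair b) {
        int byFirst = Integer.compare(a.first, b.first);
        return byFirst != 0 ? byFirst : Integer.compare(a.second, b.second);
    }

    // Returns a sorted copy, leaving the input list untouched.
    static List<IntPair> sorted(List<IntPair> input) {
        List<IntPair> copy = new ArrayList<>(input);
        copy.sort(TupleOrderDemo::compareLex);
        return copy;
    }

    public static void main(String[] args) {
        List<IntPair> keys = new ArrayList<>();
        keys.add(new IntPair(2, 444));
        keys.add(new IntPair(3, 333));
        keys.add(new IntPair(1, 111));
        keys.add(new IntPair(2, 222));
        System.out.println(sorted(keys)); // [(1,111), (2,222), (2,444), (3,333)]

        // Outside the Integer cache, == on boxed values compares references.
        Integer x = 1000, y = 1000;
        System.out.println(x == y);      // usually false, but not guaranteed
        System.out.println(x.equals(y)); // true
    }
}
```

The same comparison logic carries over to `Tuple2` by replacing `first`/`second` with `_1()`/`_2()`.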
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
.. scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1083)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at java.io.ObjectStrea
Answer:
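How are you creating the JavaPairRDD? Please check that before applying the sort. You will also get a "Task not serializable" exception if you pass a new anonymous Comparator directly to the sortByKey method. Instead, implement Comparator and Serializable in a separate class and pass an instance of it to sortByKey. Below is a sample for your reference.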
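Spark has to ship the comparator to the executors as part of the sort, and by default it serializes task closures with Java serialization, so the comparator object must survive `ObjectOutputStream.writeObject`. The sketch below mimics only that step with plain Java, no Spark involved: an anonymous `Comparator` is rejected with `NotSerializableException`, while a named class that also implements `Serializable` is accepted. The class and method names (`ComparatorSerializationCheck`, `isJavaSerializable`, `NamedComparator`) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class ComparatorSerializationCheck {
    // A named class that opts in to Java serialization.
    static class NamedComparator implements Comparator<Integer>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Integer a, Integer b) {
            return Integer.compare(a, b);
        }
    }

    // An anonymous class: it implements Comparator only, not Serializable.
    static Comparator<Integer> anonymousComparator() {
        return new Comparator<Integer>() {
            @Override
            public int compare(Integer a, Integer b) {
                return Integer.compare(a, b);
            }
        };
    }

    // Returns true if Java serialization accepts the object.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(anonymousComparator())); // false
        System.out.println(isJavaSerializable(new NamedComparator()));  // true
    }
}
```

If the anonymous comparator is created inside an instance method, it also captures a reference to the enclosing object, which would then have to be serializable as well; a top-level or static nested class avoids that.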
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

// Serializable and Comparator are used by TupleComparator (defined in the same file).
public class SparkSortSample {
    public static void main(String[] args) {
        // SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSortSample")
                .master("local[1]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Sample data: ((k1, k2), value)
        List<Tuple2<Tuple2<Integer, Integer>, Integer>> inputList = new ArrayList<>();
        inputList.add(new Tuple2<>(new Tuple2<>(2, 444), 4444));
        inputList.add(new Tuple2<>(new Tuple2<>(3, 333), 3333));
        inputList.add(new Tuple2<>(new Tuple2<>(1, 111), 1111));
        inputList.add(new Tuple2<>(new Tuple2<>(2, 222), 2222));

        // JavaPairRDD
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> javaPairRdd = jsc.parallelizePairs(inputList);

        // Sorted RDD, using a named, serializable comparator
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> sortedPairRDD = javaPairRdd.sortByKey(new TupleComparator(), true);

        // collect() brings the sorted pairs back to the driver so they print in order
        sortedPairRDD.collect().forEach(pair -> System.out.println("sort = " + pair));

        // close() also stops the underlying SparkContext
        jsc.close();
    }
}
Here is the TupleComparator class, which implements both the Comparator and Serializable interfaces.
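// Needs java.io.Serializable, java.util.Comparator and scala.Tuple2 imported at the top of the file.
class TupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        // Compare by the first element, breaking ties with the second.
        // Integer.compare avoids == on boxed Integers (a reference comparison)
        // and the overflow that a - b can cause.
        int byFirst = Integer.compare(o1._1(), o2._1());
        if (byFirst != 0)
            return byFirst;
        return Integer.compare(o1._2(), o2._2());
    }
}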
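A possible alternative to a named class, not the approach used above, is a lambda cast to the intersection type `Comparator<...> & Serializable`, which makes the compiler generate a serializable lambda. The sketch below checks this in plain Java by round-tripping such a comparator through `ObjectOutputStream`/`ObjectInputStream`; `IntPair` is again a hypothetical stand-in for `scala.Tuple2<Integer, Integer>`, and `LEX`/`roundTrip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class SerializableLambdaComparator {
    // Hypothetical stand-in for scala.Tuple2<Integer, Integer>.
    static final class IntPair {
        final Integer first;
        final Integer second;

        IntPair(Integer first, Integer second) {
            this.first = first;
            this.second = second;
        }
    }

    // The intersection cast makes the generated lambda class Serializable.
    static final Comparator<IntPair> LEX =
            (Comparator<IntPair> & Serializable) (a, b) -> {
                int byFirst = Integer.compare(a.first, b.first);
                return byFirst != 0 ? byFirst : Integer.compare(a.second, b.second);
            };

    // Serializes and deserializes the comparator, as a remote task would.
    @SuppressWarnings("unchecked")
    static Comparator<IntPair> roundTrip(Comparator<IntPair> c)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(c);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Comparator<IntPair>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Comparator<IntPair> copy = roundTrip(LEX);
        System.out.println(copy.compare(new IntPair(2, 222), new IntPair(2, 444)) < 0); // true
    }
}
```

With Spark on the classpath, the same kind of cast, written for `Tuple2<Integer, Integer>` with `_1()`/`_2()`, could presumably be passed to `sortByKey` in place of `new TupleComparator()`; the named class remains the more explicit option.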