Spark mapPartitions vs map
Preface
Spark offers two closely related transformation APIs: map and mapPartitions. map invokes its function once per element, while mapPartitions invokes it once per partition, handing the function an iterator over that partition's elements. The examples below simulate a database write to show how often per-record setup code runs under each API.
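Both snippets reference a shared JavaSparkContext field named sc. A minimal sketch of the surrounding driver class, assuming local mode (the class name and app name are illustrative, not from the original):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

public class MapVsMapPartitions {

    // Shared context used by both example methods below
    private static JavaSparkContext sc;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapVsMapPartitions")
                .setMaster("local[*]"); // run locally for the demo
        sc = new JavaSparkContext(conf);

        mapMethod();
        mapPartitionsMethod();

        sc.stop();
    }

    // mapMethod() and mapPartitionsMethod() from the sections below go here
}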
map

map runs the function once for every element, so the simulated connection below is opened and closed nine times for this nine-element RDD, regardless of how many partitions it has.
public static void mapMethod() {
    // Nine elements spread across three partitions
    JavaRDD<String> rdd = sc.parallelize(
            Arrays.asList("hello", "spark", "hadoop", "李丰", "帅", "弓", "瑞", "杰", "青"), 3);
    JavaRDD<String> map = rdd.map(new Function<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String call(String v1) throws Exception {
            // Simulated per-record work: the "connection" is opened
            // and closed once for EVERY element
            System.out.println("open database connection...");
            System.out.println("insert record... " + v1);
            System.out.println("close database connection...");
            return v1;
        }
    });
    map.collect(); // trigger the job; map is lazy until an action runs
}
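For reference, with Java 8 the same call can be written as a lambda, since Spark's Function is a single-method interface; a minimal sketch with identical behavior:

JavaRDD<String> map = rdd.map(v1 -> {
    System.out.println("open database connection...");
    System.out.println("insert record... " + v1);
    System.out.println("close database connection...");
    return v1;
});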
mapPartitions

mapPartitions hands the function an iterator over a whole partition, so setup and teardown run once per partition instead of once per element: with three partitions, the connection here is opened three times rather than nine.
/**
 * mapPartitions creates one connection per partition, so for
 * per-record I/O like this it performs better than map.
 */
public static void mapPartitionsMethod() {
    JavaRDD<String> rdd = sc.parallelize(
            Arrays.asList("hello", "spark", "hadoop", "李丰", "帅", "弓", "瑞", "杰", "青"), 3);
    JavaRDD<String> mapPartitions = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<String> call(Iterator<String> t) throws Exception {
            ArrayList<String> list = new ArrayList<String>();
            // Setup runs once per PARTITION, not once per element
            System.out.println("open database connection...");
            while (t.hasNext()) {
                String next = t.next();
                list.add(next);
                System.out.println("insert record... " + next);
            }
            System.out.println("close database connection...");
            return list.iterator();
        }
    });
    mapPartitions.collect(); // trigger the job
}
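One caveat the example glosses over: buffering the whole partition in an ArrayList holds every element in memory at once, which can hurt for large partitions. A minimal sketch of a lazier variant that wraps the input iterator instead of buffering (the wrapping iterator is hand-rolled for illustration; in production the connection teardown would belong in a try/finally or a connection pool rather than being tied to iterator exhaustion):

JavaRDD<String> lazy = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<String> call(final Iterator<String> t) throws Exception {
        System.out.println("open database connection...");
        // Return a wrapping iterator so records are processed as the
        // downstream consumer pulls them, without a list buffer
        return new Iterator<String>() {
            private boolean closed = false;

            @Override
            public boolean hasNext() {
                if (t.hasNext()) {
                    return true;
                }
                if (!closed) { // close exactly once, when the partition is exhausted
                    System.out.println("close database connection...");
                    closed = true;
                }
                return false;
            }

            @Override
            public String next() {
                String next = t.next();
                System.out.println("insert record... " + next);
                return next;
            }
        };
    }
});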