Spark: mapPartitions vs. map

Prologue

Spark offers two element-wise transformation APIs: map applies a function to each record individually, while mapPartitions applies a function once to each partition's iterator. The difference matters whenever per-call setup, such as opening a database connection, is expensive.
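Both snippets below reference a JavaSparkContext named sc without defining it. A minimal sketch of the setup they assume, including the imports the examples rely on, might look like this (the class name, app name, and local[*] master are placeholder assumptions for running the demo locally):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

public class MapVsMapPartitions {
	// app name and master URL are assumptions for a local demo
	private static final SparkConf conf = new SparkConf()
			.setAppName("MapVsMapPartitions")
			.setMaster("local[*]");
	private static final JavaSparkContext sc = new JavaSparkContext(conf);
}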

map

/**
 * map invokes call() once per record, so the "connection" here is
 * opened and closed once for every element in the RDD.
 */
public static void mapMethod() {
	JavaRDD<String> rdd = sc.parallelize(Arrays.asList("hello", "spark", "hadoop", "李丰", "帅", "弓", "瑞", "杰", "青"), 3);

	JavaRDD<String> map = rdd.map(new Function<String, String>() {

		private static final long serialVersionUID = 1L;

		@Override
		public String call(String v1) throws Exception {
			System.out.println("open database connection...");
			System.out.println("insert record...          " + v1);
			System.out.println("close database connection...");
			return v1;
		}
	});
	map.collect();
}
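With nine records, running mapMethod prints the open/insert/close trio nine times: the connection cost is paid per element, regardless of how the data is partitioned. (The exact interleaving of the output is not deterministic when partitions execute in parallel.)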


mapPartitions

/**
 * mapPartitions invokes call() once per partition, so only one connection
 * is opened per partition (three in total here); when per-call setup is
 * expensive, this performs noticeably better than map.
 */
public static void mapPartitionsMethod() {
	JavaRDD<String> rdd = sc.parallelize(Arrays.asList("hello", "spark", "hadoop", "李丰", "帅", "弓", "瑞", "杰", "青"), 3);
	JavaRDD<String> mapPartitions = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Iterator<String> call(Iterator<String> t) throws Exception {
			ArrayList<String> list = new ArrayList<String>();
			System.out.println("open database connection...");
			while (t.hasNext()) {
				String next = t.next();
				list.add(next);
				System.out.println("insert record...          " + next);
			}
			System.out.println("close database connection...");
			return list.iterator();
		}
	});
	mapPartitions.collect();
}
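One caveat: the version above buffers the entire partition into an ArrayList before returning, which can exhaust memory on large partitions. A minimal sketch of a streaming alternative (the anonymous iterator below is my own illustration, not a Spark class) keeps the one-connection-per-partition pattern while passing records through lazily:

/**
 * Streaming variant of mapPartitions: still one "connection" per partition,
 * but records flow through one at a time instead of being buffered in a list.
 */
public static void mapPartitionsLazyMethod() {
	JavaRDD<String> rdd = sc.parallelize(Arrays.asList("hello", "spark", "hadoop"), 3);
	JavaRDD<String> mapped = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Iterator<String> call(final Iterator<String> t) throws Exception {
			System.out.println("open database connection...");
			return new Iterator<String>() {
				private boolean closed = false;

				@Override
				public boolean hasNext() {
					if (!t.hasNext()) {
						if (!closed) {
							// all records in this partition consumed; release the connection
							System.out.println("close database connection...");
							closed = true;
						}
						return false;
					}
					return true;
				}

				@Override
				public String next() {
					String next = t.next();
					System.out.println("insert record...          " + next);
					return next;
				}
			};
		}
	});
	mapped.collect();
}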
