Flink: reading Kafka data and writing it to HDFS in Parquet format
A common pattern in big-data pipelines: external systems push data into Kafka, Flink consumes it as the middle layer and applies the business logic, and the processed results then need to land in a database or a file system such as HDFS.
Since Spark is still the mainstream engine for batch analytics, the files written to HDFS should be readable from Spark; with Parquet that is a one-liner: spark.read.parquet(path).
The data entity:
public class Prti {

    private String passingTime;
    private String plateNo;

    public Prti() {
    }

    public String getPassingTime() { return passingTime; }
    public void setPassingTime(String passingTime) { this.passingTime = passingTime; }
    public String getPlateNo() { return plateNo; }
    public void setPlateNo(String plateNo) { this.plateNo = plateNo; }
}
public class FlinkReadKafkaToHdfs {

    private final static StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    private final static Properties properties = new Properties();

    /**
     * JSON format of the messages sent to Kafka:
     * {"passingTime":"1546676393000","plateNo":"1"}
     */
    public static void main(String[] args) throws Exception {
        init();
        readKafkaToHdfsByReflect(environment, properties);
    }
    private static void init() {
        environment.enableCheckpointing(5000);
        environment.setParallelism(1);
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Kafka broker IPs or hostnames, comma-separated if there is more than one
        properties.setProperty("bootstrap.servers", "192.168.0.10:9092");
        // only required for Kafka 0.8
        // properties.setProperty("zookeeper.connect", "192.168.0.10:2181");
        // group.id of the Flink consumer
        properties.setProperty("group.id", "test-consumer-group");
        // Option 1: point Flink at the Hadoop config files shipped with the job
        // properties.setProperty("fs.hdfs.hadoopconf", "...\\src\\main\\resources");
        // Option 2: just set the default file-system scheme
        properties.setProperty("fs.default-scheme", "hdfs://hostname:8020");
        properties.setProperty("kafka.topic", "test");
        properties.setProperty("hdfs.path", "hdfs://hostname/test");
        properties.setProperty("hdfs.path.date.format", "yyyy-MM-dd");
        properties.setProperty("hdfs.path.date.zone", "Asia/Shanghai");
        properties.setProperty("window.time.second", "60");
    }
    public static void readKafkaToHdfsByReflect(StreamExecutionEnvironment environment, Properties properties) throws Exception {
        String topic = properties.getProperty("kafka.topic");
        String path = properties.getProperty("hdfs.path");
        String pathFormat = properties.getProperty("hdfs.path.date.format");
        String zone = properties.getProperty("hdfs.path.date.zone");
        Long windowTime = Long.valueOf(properties.getProperty("window.time.second"));
        FlinkKafkaConsumer010<String> flinkKafkaConsumer010 = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), properties);
        KeyedStream<Prti, String> keyedStream = environment.addSource(flinkKafkaConsumer010)
                .map(FlinkReadKafkaToHdfs::transformData)
                .assignTimestampsAndWatermarks(new CustomWatermarks<Prti>())
                .keyBy(Prti::getPlateNo);
        DataStream<Prti> output = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
                .apply(new WindowFunction<Prti, Prti, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow timeWindow, Iterable<Prti> iterable, Collector<Prti> collector) throws Exception {
                        System.out.println("keyBy: " + key + ", window: " + timeWindow.toString());
                        iterable.forEach(collector::collect);
                    }
                });
        // Write to HDFS in Parquet format. Bulk formats roll their part files on every checkpoint,
        // so checkpointing must be enabled (see init()) for the files to be finalized.
        DateTimeBucketAssigner<Prti> bucketAssigner = new DateTimeBucketAssigner<>(pathFormat, ZoneId.of(zone));
        StreamingFileSink<Prti> streamingFileSink = StreamingFileSink
                .forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(Prti.class))
                .withBucketAssigner(bucketAssigner)
                .build();
        output.addSink(streamingFileSink).name("Hdfs Sink");
        environment.execute("PrtiData");
    }
    private static Prti transformData(String data) {
        if (data != null && !data.isEmpty()) {
            JSONObject value = JSON.parseObject(data);
            Prti prti = new Prti();
            // the keys must match the JSON format documented in the class comment above
            prti.setPlateNo(value.getString("plateNo"));
            prti.setPassingTime(value.getString("passingTime"));
            return prti;
        } else {
            return new Prti();
        }
    }
    private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<Prti> {

        private Long currentTime = 0L;

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Prti prti, long l) {
            return new Watermark(currentTime);
        }

        @Override
        public long extractTimestamp(Prti prti, long l) {
            Long passingTime = Long.valueOf(prti.getPassingTime());
            currentTime = Math.max(passingTime, currentTime);
            return passingTime;
        }
    }
}
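A note on the watermark logic: CustomWatermarks emits, with every record, a watermark equal to the largest passingTime seen so far, so as soon as one record pushes the watermark past a window's end, any later-arriving record belonging to that window is dropped as late. If some out-of-orderness should be tolerated, a bounded-delay extractor is the usual alternative. The following is only a sketch (the 10-second bound is an assumed value, not from the original post) that could replace CustomWatermarks inside the same class:

    // Sketch: periodic watermarks that trail the max timestamp by a fixed bound
    private static class BoundedPrtiTimestamps extends BoundedOutOfOrdernessTimestampExtractor<Prti> {

        BoundedPrtiTimestamps() {
            // allow events to arrive up to 10 seconds out of order (assumed value)
            super(Time.seconds(10));
        }

        @Override
        public long extractTimestamp(Prti prti) {
            return Long.valueOf(prti.getPassingTime());
        }
    }
    // usage: .assignTimestampsAndWatermarks(new BoundedPrtiTimestamps()) instead of new CustomWatermarks<Prti>()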
Sending test data into Kafka is omitted here ...
Then open a spark-shell and run spark.read.parquet("/test/<date directory>") to read the data back.
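The same check can also run as a standalone Spark job; the sketch below is only an illustration (class name and the exact HDFS path are assumptions, not from the original post):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadPrtiParquet {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ReadPrtiParquet")
                .getOrCreate();
        // read all date buckets written by the Flink job (path is an assumption)
        Dataset<Row> prti = spark.read().parquet("hdfs://hostname/test/*");
        prti.printSchema();
        prti.show();
        spark.stop();
    }
}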
Notes:
StreamingFileSink<Prti> streamingFileSink = StreamingFileSink
        .forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(Prti.class))
        .withBucketAssigner(bucketAssigner)
        .build();
Option 1, the simplest one and the one used in the job above; the Avro schema is derived from the POJO by reflection:
ParquetAvroWriters.forReflectRecord(Prti.class)
Option 2 puts stricter requirements on the entity class: it must be a SpecificRecord class generated from an Avro schema by the avro-maven-plugin (a usage sketch follows the plugin configuration below):
ParquetAvroWriters.forSpecificRecord(Prti.class)
Write a prti.avsc file with the following content:
{"namespace": "com.xxx.streaming.entity",
"type": "record",
"name": "Prti",
"fields": [
{"name": "passingTime", "type": "string"},
{"name": "plateNo", "type": "string"}
]
}
Here com.xxx.streaming.entity is the package that the generated entity class will be placed in.
Then add the plugin to the pom; its schema goal is bound to the generate-sources phase, so a normal Maven build (or mvn generate-sources) produces the class:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
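Once the plugin has generated com.xxx.streaming.entity.Prti (a SpecificRecord subclass), the sink is built exactly like the reflection variant. A minimal sketch under that assumption, reusing the path, pathFormat and zone values from readKafkaToHdfsByReflect:

        // Sketch: assumes Prti is now the Avro-generated SpecificRecord class
        StreamingFileSink<Prti> specificSink = StreamingFileSink
                .forBulkFormat(new Path(path), ParquetAvroWriters.forSpecificRecord(Prti.class))
                .withBucketAssigner(new DateTimeBucketAssigner<Prti>(pathFormat, ZoneId.of(zone)))
                .build();
        output.addSink(specificSink).name("Hdfs Sink (specific record)");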
Option 3:
ParquetAvroWriters.forGenericRecord(schema)
Here schema is an org.apache.avro.Schema parsed from the .avsc file, and the stream feeding the sink must produce GenericRecord elements rather than Prti POJOs.
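A minimal sketch of that variant, assuming prti.avsc is shipped as a classpath resource (the resource path and method name are assumptions); each Prti would then be mapped into a GenericData.Record before reaching the sink:

    // Sketch: build a GenericRecord Parquet sink from the prti.avsc schema
    private static StreamingFileSink<GenericRecord> buildGenericRecordSink(String path, String pathFormat, String zone) throws IOException {
        Schema schema = new Schema.Parser().parse(
                FlinkReadKafkaToHdfs.class.getResourceAsStream("/prti.avsc"));
        return StreamingFileSink
                .forBulkFormat(new Path(path), ParquetAvroWriters.forGenericRecord(schema))
                .withBucketAssigner(new DateTimeBucketAssigner<GenericRecord>(pathFormat, ZoneId.of(zone)))
                .build();
    }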
One more note on the POM dependencies: some readers reported that they could not find the core dependency by searching their Maven repository, but declaring it directly in the pom does resolve it; it is available in the central repository:
http://repo1.maven.org/maven2/org/apache/flink/flink-parquet/
The core dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>1.7.0</version>
</dependency>
The other dependencies (not tidied up; a few of them are not strictly needed):
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.0</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
Original post (Chinese): https://blog.****.net/u012798083/article/details/85852830