Flink有状态计算入门示例
需求：从 socket 接收字符串，按空格分隔成单词，统计每个单词出现的次数。
直接看代码吧。
主函数:
FlinkStatefulCalcTest.java
package com.ccclubs.flink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Entry point of a minimal stateful Flink job: reads text lines from a socket, splits them into
 * words and prints the running count of every word.
 *
 * <p>Start a text server before running the job (for example {@code nc -lk 9999}) and type lines
 * into it.
 *
 * @author xianghu.wang
 * Created 2019/4/11.
 */
public class FlinkStatefulCalcTest {

    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed output in a single, easy-to-read sequence.
        env.setParallelism(1);

        // Receive newline-delimited text from the socket.
        DataStream<String> lines = env.socketTextStream("localhost", 9999, "\n");

        // Turn each line into (word, 1) pairs (not yet aggregated).
        lines.flatMap((String line, Collector<WordCount> out) -> {
                    for (String word : splitWords(line)) {
                        out.collect(new WordCount(word, 1));
                    }
                })
                // The lambda's output type is erased at compile time, so declare it explicitly.
                .returns(WordCount.class)
                // Partition by word: inside the keyed function the state only ever sees one key.
                .keyBy("word")
                // Stateful accumulation of the per-word totals.
                .flatMap(new WordCountFunction())
                // Print every updated total to stdout.
                .print();

        env.execute("stateful Window WordCount");
    }

    /**
     * Splits a line into words on runs of whitespace.
     *
     * <p>Leading/trailing whitespace (including a trailing {@code '\r'} from CRLF clients) is
     * removed first, so no empty-string "words" are produced; a blank line yields no words.
     *
     * @param line one line received from the socket
     * @return the non-empty words of the line, possibly an empty array
     */
    static String[] splitWords(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return new String[0];
        }
        return trimmed.split("\\s+");
    }
}
单词计数实体类:
WordCount.java
package com.ccclubs.flink;
/**
 * A (word, count) pair flowing through the word-count pipeline.
 *
 * <p>The public no-arg constructor and the getter/setter pairs are kept so that Flink can
 * analyze this class as a POJO (which field-name keys such as {@code "word"} rely on).
 *
 * @author xianghu.wang
 * Created 2019/4/11.
 */
public class WordCount {

    /** The word itself. */
    private String word;

    /** Number of occurrences of {@link #word} carried by this record. */
    private Integer count;

    /** Creates an instance with both fields left {@code null}. */
    public WordCount() {
        // intentionally empty
    }

    /**
     * Creates a word/count pair.
     *
     * @param word  the word
     * @param count how many times it occurred
     */
    public WordCount(String word, Integer count) {
        this.word = word;
        this.count = count;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public String getWord() {
        return word;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public Integer getCount() {
        return count;
    }

    /** Renders as {@code WordCount{word='<word>', count=<count>}}. */
    @Override
    public String toString() {
        return new StringBuilder("WordCount{")
                .append("word='").append(word).append('\'')
                .append(", count=").append(count)
                .append('}')
                .toString();
    }
}
状态类:
WordCountState.java
package com.ccclubs.flink;
import java.io.Serializable;
/**
 * State object holding the accumulated count for one word.
 *
 * <p>It follows Flink's POJO rules: public class, public no-arg constructor, and private fields
 * with getters/setters. Without the no-arg constructor Flink would fall back to treating it as a
 * generic type serialized by Kryo.
 *
 * @author xianghu.wang
 * Created 2019/4/11.
 */
public class WordCountState implements Serializable {

    /** Explicit version id, since the class declares itself {@link Serializable}. */
    private static final long serialVersionUID = 1L;

    /** The word this state belongs to. */
    private String word;

    /** Number of occurrences of the word. */
    private Integer count;

    /** Creates an empty state; required by Flink's POJO type analysis. */
    public WordCountState() {
    }

    /**
     * Creates a state for the given word with an initial count.
     *
     * @param word  the word
     * @param count its initial count
     */
    public WordCountState(String word, Integer count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }
}
重点：有状态计算函数
WordCountFunction.java
package com.ccclubs.flink;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
 * Stateful computation that keeps a running count per word.
 *
 * <p>The type parameters of {@code RichFlatMapFunction<WordCount, WordCount>} are the input and
 * output element types respectively; both being {@link WordCount} here is a coincidence of this
 * example.
 *
 * @author xianghu.wang
 * Created 2019/4/11.
 */
public class WordCountFunction extends RichFlatMapFunction<WordCount, WordCount> {

    /** Per-word count state; {@code null} until the first element for the current key arrives. */
    private transient ValueState<WordCountState> countState;

    /**
     * Adds the incoming count to the stored total for this word and emits the updated total.
     *
     * @param in  incoming (word, count) pair; a {@code null} count is treated as 0
     * @param out receives one {@link WordCount} carrying the word and its new total
     * @throws Exception if reading or updating the state fails
     */
    @Override
    public void flatMap(WordCount in, Collector<WordCount> out) throws Exception {
        int increment = in.getCount() == null ? 0 : in.getCount();

        WordCountState state = countState.value();
        if (state == null) {
            // First element for this key: the descriptor has no default value, so create it here.
            state = new WordCountState(in.getWord(), increment);
        } else {
            int previous = state.getCount() == null ? 0 : state.getCount();
            state.setCount(previous + increment);
        }

        out.collect(new WordCount(in.getWord(), state.getCount()));
        // Write the (possibly mutated) object back so the new total is persisted in the state.
        countState.update(state);
    }

    /**
     * Registers the {@link ValueState} that holds the count.
     *
     * <p>No default value is given to the descriptor; as officially recommended, the "no value yet"
     * case is handled in user code ({@link #flatMap}).
     *
     * @param config operator configuration (unused)
     */
    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<WordCountState> descriptor =
                new ValueStateDescriptor<>(
                        // the state name
                        "countState",
                        // type information
                        TypeInformation.of(new TypeHint<WordCountState>() {
                        }));
        countState = getRuntimeContext().getState(descriptor);
    }
}
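运行结果（先用 `nc -lk 9999` 启动 socket 服务，再输入一行 `hello world hello`，控制台依次输出）：
WordCount{word='hello', count=1}
WordCount{word='world', count=1}
WordCount{word='hello', count=2}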
参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html
注:转载请注明出处