Flink slotSharingGroup / disableChain / startNewChain usage example
Using slot sharing groups or disableChaining(), we can split or isolate an operator chain, so that different parts of the job can be optimized for different requirements.
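A minimal sketch of the three knobs on a toy pipeline (the class name, element values, operator names, and the group name "group_01" are made up for illustration, not taken from the demo job below):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                // disableChaining(): isolate this operator from both its
                // upstream and its downstream neighbor.
                .map(String::toUpperCase).disableChaining().name("isolated-map")
                // startNewChain(): break the chain in front of this operator;
                // it may still chain with the operators that follow it.
                .map(s -> s + "!").startNewChain().name("new-chain-head")
                // slotSharingGroup(): operators in different groups never share
                // a task slot; downstream operators inherit the group.
                .filter(s -> !s.isEmpty()).slotSharingGroup("group_01").name("own-slot-group")
                .print();

        env.execute("chaining-demo");
    }
}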
package application;

import com.alibaba.fastjson.JSONObject;
import operator.*;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.OutputTag;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * todo Notes: split the stream with a side output, into a fast path for
 *      single-scene events and a composite path for multi-scene events or
 *      events that need HBase lookups.
 *
 * todo 1. Next steps: understand parallelism; add watermarks; add checkpointing; clean up the code.
 *
 * todo 2. Test slot groups via slotSharingGroup().
 *
 * todo 3. Try disableChaining() and startNewChain().
 *
 * todo 4. name() assigns a display name to an operator.
 */
public class StormToFlink_hbase_demo {

    private static org.slf4j.Logger logger = LoggerFactory.getLogger(StormToFlink_hbase_demo.class);

    public static void main(String[] args) throws Exception {
        // String fileUrl = "D:\\wxgz-local\\resources_yace\\";
        String fileUrl = "/zywa/job/storm/resources_new/";

        // todo 1. read data from Kafka
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // todo Kafka configuration properties
        args = new String[]{"--input-topic", "topn_test",
                "--bootstrap.servers", "node2.hadoop:9092,node3.hadoop:9092",
                "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181",
                "--group.id", "cc2"};
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties sendPros = parameterTool.getProperties();
        Properties pros = parameterTool.getProperties();

        // todo use the Kafka topic as the input
        DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
                pros.getProperty("input-topic"),
                new SimpleStringSchema(),
                pros).setStartFromLatest()
        ).setParallelism(4);

        // todo define a side-output tag for the composite stream
        final OutputTag<JSONObject> mutiOutputTag = new OutputTag<JSONObject>("mutiStream") {
        };

        // todo 2. filter out records that do not match the expected format
        // DataStream<JSONObject> jsonDstream = kafkaDstream.map(new MapOperator_01(fileUrl)).disableChaining().setParallelism(4);
        DataStream<JSONObject> jsonDstream = kafkaDstream.map(new MapOperator_01(fileUrl))
                .disableChaining().name("MapOperator_01").setParallelism(4);

        // SingleOutputStreamOperator<JSONObject> splitStream = jsonDstream.process(new ProcessOperator_01(mutiOutputTag)).startNewChain().setParallelism(4);
        SingleOutputStreamOperator<JSONObject> splitStream = jsonDstream.process(new ProcessOperator_01(mutiOutputTag))
                .disableChaining().setParallelism(4);

        // todo 3. the stream that needs HBase lookups
        DataStream<JSONObject> mutiStream = splitStream.getSideOutput(mutiOutputTag);
        mutiStream.print();

        // todo 4. single-condition stream first: match the scene expressions
        DataStream<JSONObject> filterDstream = splitStream.filter(new FilterOperator_01())
                .disableChaining().setParallelism(4);
        // DataStream<JSONObject> filterDstream = splitStream.filter(new FilterOperator_01()).slotSharingGroup("group_03").setParallelism(4);

        DataStream<JSONObject> mapDstream = filterDstream.map(new MapOperator_02())
                .name("MapOperator_02").setParallelism(4);

        // todo push matched events downstream
        mapDstream.filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject json) throws Exception {
                        // events to push carry a "Payload" field
                        return json.containsKey("Payload");
                    }
                }).setParallelism(4)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject s) throws Exception {
                        return s.toJSONString();
                    }
                }).setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test3",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);

        // todo events without a "Payload" field: window by appKey before dispatching to Kafka
        SingleOutputStreamOperator processDstream = mapDstream.filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject json) throws Exception {
                        return !json.containsKey("Payload");
                    }
                }).setParallelism(4)
                .keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_02())
                .setParallelism(4);

        // todo send to Kafka
        processDstream.addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test2",
                        new SimpleStringSchema(),
                        sendPros))
                .setParallelism(4);

        // todo match the complex (multi-scene) cases
        DataStream<JSONObject> mutiProcessDstream = mutiStream.keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_03())
                .setParallelism(4);

        // todo batch conditions
        DataStream<JSONObject> process = mutiProcessDstream.map(new MapOperator_03())
                .setParallelism(4)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) throws Exception {
                        return jsonObject.containsKey("tiaojian");
                    }
                }).setParallelism(4)
                .keyBy(value -> value.getJSONObject("logJson").getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_04())
                .setParallelism(4);

        // todo once a scene is matched, send to the push topic first
        process.map(new MapOperator_04())
                .setParallelism(4)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                }).setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test3",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);

        process.map(new MapOperator_05())
                .setParallelism(4)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                }).setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test2",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);

        env.execute("startExecute");
    }
}
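With the calls used above, the chain breaks at every disableChaining() boundary: the Kafka source, MapOperator_01, ProcessOperator_01, and FilterOperator_01 should each show up as separate tasks in the job graph instead of being fused into one. To inspect the chaining without deploying the job, you can also dump the execution plan before env.execute() and paste the JSON into the Flink plan visualizer (https://flink.apache.org/visualizer/); a one-line check:

        // Print the job graph as JSON; paste the output into the Flink plan
        // visualizer to see which operators were fused into a single chain.
        // Must be called before env.execute().
        System.out.println(env.getExecutionPlan());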
Web UI result: (screenshot of the resulting job graph in the Flink web UI)