Flink slotSharingGroup / disableChaining / startNewChain usage example

Using slot sharing groups or disableChaining(), we can isolate or break apart the dependency chain between operators, so that different parts of the job can be optimized for different requirements (resource isolation, independent tasks, and so on). The minimal sketch below shows each of the three calls in isolation; the full demo follows.
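Here is a minimal, self-contained sketch of the three calls against the Flink 1.x DataStream API used in this article. The socket source, the port, and the group name "isolated" are illustrative assumptions, not part of the demo job below:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainControlSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.trim();
                    }
                })
                // disableChaining(): never chain this map with the source or with
                // downstream operators; it always runs as its own task
                .disableChaining()
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                // startNewChain(): break the chain to the upstream operator,
                // but still allow chaining with downstream operators
                .startNewChain()
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) {
                        return !value.isEmpty();
                    }
                })
                // slotSharingGroup(...): this operator and everything downstream
                // (which inherits the group) run in slots of the "isolated" group,
                // so they cannot share slots with operators in the default group
                .slotSharingGroup("isolated")
                .print();

        env.execute("chain-control-sketch");
    }
}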

 

package application;


import com.alibaba.fastjson.JSONObject;
import operator.*;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;


/**
 * Note: split the stream into a fast single-scenario stream and a composite
 * stream for multi-scenario records or records that need HBase lookups.
 *
 * todo 1: next steps: understand parallelism; add watermarks; add checkpointing; clean up the code
 *
 * todo 2: test slot groups via slotSharingGroup
 *
 * todo 3: usage of disableChaining / startNewChain
 *
 * todo 4: name() can be used to name an operator
 */
public class StormToFlink_hbase_demo {

    private static final Logger logger = LoggerFactory.getLogger(StormToFlink_hbase_demo.class);

    public static void main(String[] args) throws Exception {
//        String fileUrl = "D:\\wxgz-local\\resources_yace\\";
        String fileUrl = "/zywa/job/storm/resources_new/";


        // 1. read data from Kafka
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Kafka connection parameters (hardcoded for this demo)
        args = new String[]{"--input-topic", "topn_test", "--bootstrap.servers", "node2.hadoop:9092,node3.hadoop:9092",
                "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};


        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
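        // NOTE: event time is declared here, but no watermark assigner is registered
        // and every window below uses processing time (see todo 1 in the class comment)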
        Properties sendPros = parameterTool.getProperties();
        Properties pros = parameterTool.getProperties();

        // use the Kafka topic as the input source
        DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
                pros.getProperty("input-topic"),
                new SimpleStringSchema(),
                pros).setStartFromLatest()
        ).setParallelism(4);

        // define a side output for the multi-scenario stream
        final OutputTag<JSONObject> mutiOutputTag = new OutputTag<JSONObject>("mutiStream") {
        };



        // 2. filter out records that do not match the expected format
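        // disableChaining() runs MapOperator_01 in its own task: it is chained
        // neither with the Kafka source nor with the downstream process operator.
        // name() only sets the operator's display name in the Web UI (todo 4).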
//        DataStream<JSONObject> jsonDstream = kafkaDstream.map(new MapOperator_01(fileUrl)).disableChaining().setParallelism(4);
        DataStream<JSONObject> jsonDstream = kafkaDstream.map(new MapOperator_01(fileUrl)).disableChaining().name("MapOperator_01").setParallelism(4);


//        SingleOutputStreamOperator<JSONObject> splitStream = jsonDstream.process(new ProcessOperator_01(mutiOutputTag)).startNewChain().setParallelism(4);
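        // The commented-out variant above uses startNewChain(): it would only break
        // the chain to the upstream operator while still allowing chaining with
        // downstream ones; disableChaining() breaks the chain on both sides.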
        SingleOutputStreamOperator<JSONObject> splitStream = jsonDstream.process(new ProcessOperator_01(mutiOutputTag)).disableChaining().setParallelism(4);

        // 3. the stream that needs HBase lookups
        DataStream<JSONObject> mutiStream = splitStream.getSideOutput(mutiOutputTag);
        mutiStream.print();

        // 4. the single-condition stream first: match it against the scenario expressions
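        // The commented-out variant below uses slotSharingGroup("group_03"): this
        // operator and everything downstream (which inherits the group) would be
        // scheduled into slots of that group instead of sharing the default group's slots.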
        DataStream<JSONObject> filterDstream = splitStream.filter(new FilterOperator_01()).disableChaining().setParallelism(4);
//        DataStream<JSONObject> filterDstream = splitStream.filter(new FilterOperator_01()).slotSharingGroup("group_03").setParallelism(4);

        DataStream<JSONObject> mapDstream = filterDstream.map(new MapOperator_02()).name("MapOperator_02").setParallelism(4);


        // push the matched results downstream
        mapDstream.filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject json) throws Exception {
                // only records that carry a Payload are pushed directly
                return json.containsKey("Payload");
            }
        }).setParallelism(4)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject json) throws Exception {
                        return json.toJSONString();
                    }
                }).setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test3",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);

        // records without a Payload go to the windowed Kafka delivery path
        SingleOutputStreamOperator<String> processDstream = mapDstream.filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject json) throws Exception {
                return !json.containsKey("Payload");
            }
        }).setParallelism(4)
                .keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_02())
                .setParallelism(4);
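        // the 500 ms tumbling processing-time window above batches records per
        // appKey before ProcessOperator_02 emits them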


        // send to Kafka
        processDstream.addSink(new FlinkKafkaProducer010<String>(
                "dianyou_wx_test2",
                new SimpleStringSchema(),
                sendPros))
                .setParallelism(4);


        // match the complex (multi-condition) cases

        DataStream<JSONObject> mutiProcessDstream = mutiStream.keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_03())
                .setParallelism(4);


        // batch conditions
        DataStream<JSONObject> process = mutiProcessDstream.map(new MapOperator_03())
                .setParallelism(4)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) throws Exception {
                        // keep records that carry a matched condition ("tiaojian" = condition)
                        return jsonObject.containsKey("tiaojian");
                    }
                }).setParallelism(4)
                .keyBy(value -> value.getJSONObject("logJson").getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_04())
                .setParallelism(4);


        // when a scenario has already been matched, send it to the topic first

        process.map(new MapOperator_04())
                .setParallelism(4)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test3",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);


        process.map(new MapOperator_05())
                .setParallelism(4)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                }).setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test2",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);

        env.execute("startExecute");
    }


}
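A note on slot usage, derived from Flink's slot sharing rules rather than from the original post: with every operator at parallelism 4 in the default slot sharing group, the whole job fits in 4 slots. If the slotSharingGroup("group_03") variant were enabled instead, the operators from that point on would need their own 4 slots, for 8 in total, since slots are only shared within a group.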

Web UI result:

[Figure: the resulting job graph in the Flink Web UI]