Flink: apply function on a window

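Problem description:

I am working on a Flink project. The main idea is to read a data stream of JSON records (network logs), correlate them, and generate a new JSON that combines information from the different JSONs.

So far, I can read the JSONs, generate a KeyedStream (keyed by the machine that produced the log), and then generate a windowed stream with 5-second windows.

The next step I want to perform is to call apply on the window and merge the information from each JSON. I am a bit confused about how to do that.

The code I have at the moment is the following: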

DataStream<Tuple2<String,JSONObject>> MetaAlert = events 
       .flatMap(new JSONParser()) 
       .keyBy(0) 
       .timeWindow(Time.seconds(5)) 
       .apply(new generateMetaAlert()); 




public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> { 

     @Override 
     public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2, 
       Collector<Tuple2<String, JSONObject>> arg3) throws Exception { 


     } 
}

The .apply(new generateMetaAlert()) part fails with the following error:

The method apply(WindowFunction<Tuple2<String,JSONObject>,R,Tuple,TimeWindow>) is not applicable for the arguments (MetaAlertGenerator.generateMetaAlert)

Is there any other code structure you would suggest instead of the one I wrote?

Thanks in advance for your help.


I think this is a duplicate of [this question](https://stackoverflow.com/questions/47033981/probleme-with-apply-function-windowfunction-in-flink). If the answer there solves your problem, please check it and close this one.

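When you call keyBy with a field position (that is, without a KeySelector), the key type in your custom WindowFunction (the third type parameter) must be Tuple, because the compiler cannot determine the actual type of your key. The window type (the fourth type parameter) must likewise be TimeWindow rather than Window, since that is what timeWindow produces. The following code compiles without errors (I had to add stub classes and empty method bodies to fill the gaps):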

public class Test { 

    public Test() { 

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     DataStream<String> events = env.readTextFile("datastream.log"); 

     DataStream<Tuple2<String, JSONObject>> MetaAlert 
       = events 
       .flatMap(new JSONParser()) 
       .keyBy(0) 
       .timeWindow(Time.seconds(5)) 
       .apply(new GenerateMetaAlert()); 

    } 

    public class JSONObject { 
    } 

    public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> { 
     @Override 
     public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> { 
     @Override 
     public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

} 
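
To make the type rule above concrete, here is a small plain-Java sketch. It does not use Flink at all: MiniWindowFunction, MiniWindowedStream and MiniTuple are hypothetical stand-ins that only mimic the shape of the generics, and Long stands in for the window type. It shows that the key type declared by the function has to be exactly the key type of the stream, and how the String value can still be read from a tuple key (Flink's own Tuple offers a getField(int) method for the same purpose).

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for a window function (NOT the Flink interface).
interface MiniWindowFunction<IN, OUT, KEY, W> {
    OUT apply(KEY key, W window, List<IN> input);
}

// Stand-in for a tuple key whose field types are unknown at compile time.
class MiniTuple {
    private final Object[] fields;
    MiniTuple(Object... fields) { this.fields = fields; }
    @SuppressWarnings("unchecked")
    <T> T getField(int pos) { return (T) fields[pos]; }
}

// Stand-in for a windowed stream with key type K and window type W.
class MiniWindowedStream<T, K, W> {
    private final K key;
    private final W window;
    private final List<T> elements;

    MiniWindowedStream(K key, W window, List<T> elements) {
        this.key = key;
        this.window = window;
        this.elements = elements;
    }

    // The function's KEY and W must be exactly this stream's K and W.
    <R> R apply(MiniWindowFunction<T, R, K, W> function) {
        return function.apply(key, window, elements);
    }
}

class KeyTypeSketch {
    // Key type MiniTuple matches the stream below, so apply(...) accepts it.
    static class CountByTupleKey implements MiniWindowFunction<String, String, MiniTuple, Long> {
        @Override
        public String apply(MiniTuple key, Long window, List<String> input) {
            String machine = key.getField(0); // read the String out of the tuple key
            return machine + ":" + input.size();
        }
    }

    // Key type String does not match a MiniTuple-keyed stream.
    static class CountByStringKey implements MiniWindowFunction<String, String, String, Long> {
        @Override
        public String apply(String key, Long window, List<String> input) {
            return key + ":" + input.size();
        }
    }

    static String run() {
        MiniWindowedStream<String, MiniTuple, Long> stream =
                new MiniWindowedStream<>(new MiniTuple("host-1"), 0L, Arrays.asList("a", "b"));
        // stream.apply(new CountByStringKey()); // rejected by the compiler: key type mismatch
        return stream.apply(new CountByTupleKey());
    }
}
```

Calling KeyTypeSketch.run() applies the tuple-keyed function; uncommenting the CountByStringKey line reproduces the same kind of "not applicable" error as in the question.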

Alternatively, you can pass the window function as an anonymous class. With keyBy(0) the key parameter is still a Tuple:

DataStream<Tuple2<String, JSONObject>> MetaAlert 
     = events 
     .flatMap(new JSONParser()) 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() { 
      @Override 
      public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 
       // Your code here 
      } 
     }); 

Finally, if you want to keep the named class but also keep the key type as String, you can implement a KeySelector:

public class Test { 

    public Test() { 

     DataStream<Tuple2<String, JSONObject>> MetaAlert 
       = events 
       .flatMap(new JSONParser()) 
       .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() { 
        @Override 
        public String getKey(Tuple2<String, JSONObject> json) throws Exception { 
         return json.f0; 
        } 
       }) 
       .timeWindow(Time.seconds(5)) 
       .apply(new GenerateMetaAlert()); 
    } 

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> { 
     @Override 
     public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

} 
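
As for the empty apply body, the merge step itself might look roughly like the following plain-Java sketch. It is only an illustration under assumptions: Map<String, Object> stands in for JSONObject (the question does not say which JSON library is used), the field names "machine" and "eventCount" are made up, and "later events overwrite earlier values" is just one possible merge policy.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the merge step; Map<String, Object> stands in for JSONObject.
class WindowMergeSketch {

    // Combines all events of one key and one window into a single record.
    // Assumed policy: a later event overwrites earlier values of the same field.
    static Map<String, Object> mergeWindow(String key, Iterable<Map<String, Object>> events) {
        Map<String, Object> merged = new LinkedHashMap<>();
        int count = 0;
        for (Map<String, Object> event : events) {
            merged.putAll(event);
            count++;
        }
        merged.put("machine", key);      // hypothetical field name
        merged.put("eventCount", count); // hypothetical field name
        return merged;
    }
}
```

Inside GenerateMetaAlert.apply, the same loop would run over the iterable argument, and the merged object would be emitted with collector.collect(...) as a new Tuple2 of the key and the merged JSON.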

Thanks, that works!