Example: Reading Kafka Data with Flink

1. Goal

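This example integrates Kafka with Flink: Flink consumes messages from Kafka in real time, computes each machine's average free memory over a fixed time window (2 seconds in the code below), and both prints the result and writes it to a local file.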

2. Environment

Apache Kafka 0.11.0.0

Apache Flink 1.3.2

Maven 3.5.3

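This example runs locally on Windows. The code was developed in IDEA and tested locally rather than submitted to a Flink cluster. The referenced blog, by contrast, runs it on a Flink cluster started in local mode.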

Complete pom file for the Maven project:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flinktest</groupId>
    <artifactId>flinktest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>



            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>kafkatoflink.KafkaMessageStreaming</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus.util</pattern>
                                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                    <excludes>
                                        <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                        <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!--<dependency>-->
            <!--<groupId>org.apache.flink</groupId>-->
            <!--<artifactId>flink-table_2.10</artifactId>-->
            <!--<version>1.3.2</version>-->
        <!--</dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>
</project>

3. Create the Project

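Create a Maven project in IDEA using the pom above, and place the classes from the next section in the package kafkatoflink.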

4. Code

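The code consists of two parts:

  • MessageSplitter, MessageWaterEmitter, and KafkaMessageStreaming: the Flink
    streaming classes that process the Kafka messages in real time
  • KafkaProducerTest and MemoryUsageExtrator: the classes that generate the Kafka test messages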

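In this example the Kafka message format is fixed: timestamp,hostname,free memory in bytes. The hostname is hard-coded to machine-1, while the timestamp and the free memory are read when each message is sent. Because only one Kafka producer is started, simulating messages from a single machine, the final statistics cover only machine-1. The complete code follows.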

KafkaMessageStreaming:

    package kafkatoflink;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;
    
    import java.util.Properties;
    
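    /**
     * @Description    KafkaMessageStreaming is the Flink entry class and wraps the Kafka message
     *                 processing logic. For every 2-second window it computes the result,
     *                 prints it, and writes it to a local file.
     * @Author         0262000099 Hengtai Nie
     * @CreateDate     2018/9/21 16:51
     */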
    public class KafkaMessageStreaming {
    
      public static void main(String[] args) throws Exception {
    
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
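        // Important: enable checkpointing (here every 5000 ms). With checkpointing on,
        // the Kafka consumer commits its offsets when a checkpoint completes.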
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.47.85.158:9092");
        props.setProperty("group.id", "flink-group");
    
        // The Kafka topic could instead be passed in as args[0], e.g. "test-0921".
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("test-0921", new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());
    
        DataStream<Tuple2<String, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(2))
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                  public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {
                    long sum = 0L;
                    int count = 0;
                    for (Tuple2<String, Long> record: input) {
                      sum += record.f1;
                      count++;
                    }
                    // Emit a new tuple (host, average) instead of mutating an input record.
                    String host = input.iterator().next().f0;
                    out.collect(new Tuple2<>(host, sum / count));
                  }
                });
    
        // Print the results
        keyedStream.print();
        // Also save the results to a file
        // (the output path could instead be passed in as args[1])
        keyedStream.writeAsText("E:\\FlinkTest\\KafkaFlinkTest");
        env.execute("Kafka-Flink Test");
      } 
    }
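
To make the windowing step more concrete, here is a minimal plain-Java sketch of what keyBy(0), timeWindow(Time.seconds(2)) and the averaging function compute together. It does not use Flink at all. The class WindowAverageSketch and its methods are hypothetical names introduced only for illustration, and the sketch assumes non-negative timestamps and tumbling windows aligned to the epoch with no offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustration only, not part of the original project: groups messages of one
// host into tumbling event-time windows and averages the free memory per window.
final class WindowAverageSketch {

  // Start of the tumbling window that contains the timestamp
  // (assumes timestampMs >= 0 and no window offset).
  static long windowStart(long timestampMs, long windowSizeMs) {
    return timestampMs - (timestampMs % windowSizeMs);
  }

  // Lines use the "timestamp,host,freeMemory" format from this article.
  // Returns a map from window start to the average free memory for the given host.
  static Map<Long, Long> averagePerWindow(String[] lines, String host, long windowSizeMs) {
    Map<Long, long[]> sumAndCount = new TreeMap<>();
    for (String line : lines) {
      String[] parts = line.split(",");
      if (parts.length != 3 || !parts[1].equals(host)) {
        continue; // skip malformed lines and other hosts
      }
      long start = windowStart(Long.parseLong(parts[0]), windowSizeMs);
      long[] acc = sumAndCount.computeIfAbsent(start, k -> new long[2]);
      acc[0] += Long.parseLong(parts[2]); // running sum
      acc[1]++;                           // running count
    }
    Map<Long, Long> averages = new TreeMap<>();
    for (Map.Entry<Long, long[]> e : sumAndCount.entrySet()) {
      averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
    }
    return averages;
  }

  public static void main(String[] args) {
    String[] lines = {
      "1000,machine-1,100",
      "1900,machine-1,300",
      "2100,machine-1,500",
    };
    // The first two messages fall into the window starting at 0, the third into the one starting at 2000.
    System.out.println(averagePerWindow(lines, "machine-1", 2000L));
  }
}
```

Like the job above, the sketch uses integer division, so averages are truncated to whole bytes.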

KafkaProducerTest:

package kafkatoflink;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
* @Description    KafkaProducerTest sends the Kafka test messages
* @Author         0262000099 Hengtai Nie
* @CreateDate     2018/9/21 11:29
*/
public class KafkaProducerTest {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "10.47.85.158:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    int totalMessageCount = 100;
    for (int i = 0; i < totalMessageCount; i++) {
      String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
      producer.send(new ProducerRecord<>("test-0921", value), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception != null) {
            System.out.println("Failed to send message with exception " + exception);
          }
        }
      });
      Thread.sleep(100L);
    }
    producer.close();
  }

  private static long currentMemSize() {
    return MemoryUsageExtrator.currentFreeMemorySizeInBytes();
  }
}

MemoryUsageExtrator:

        package kafkatoflink;  
        
        import com.sun.management.OperatingSystemMXBean;
        
        import java.lang.management.ManagementFactory;
        
        /**
        * @Description    MemoryUsageExtrator is a simple utility class that returns the current free physical memory in bytes
        * @Author         0262000099 Hengtai Nie
        * @CreateDate     2018/9/21 11:28
        */
        public class MemoryUsageExtrator {
        
          private static OperatingSystemMXBean mxBean =
                  (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        
          /**
           * Get current free memory size in bytes
           * @return  free RAM size
           */
          public static long currentFreeMemorySizeInBytes() {
            return mxBean.getFreePhysicalMemorySize();
          }
        }

MessageSplitter:

package kafkatoflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
* @Description    MessageSplitter splits each Kafka message on "," and extracts the hostname and the free memory value
* @Author         0262000099 Hengtai Nie
* @CreateDate     2018/9/21 11:27
*/
public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
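    if (value != null) {
      String[] parts = value.split(",");
      // Expect exactly "timestamp,host,freeMemory"; skip anything else
      // so that parts[2] can never be out of bounds.
      if (parts.length == 3) {
        out.collect(new Tuple2<>(parts[1], Long.parseLong(parts[2])));
      }
    }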
  }
}
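
The split that MessageSplitter performs can be tried in isolation. The following is a minimal sketch without any Flink dependency. MessageParseSketch and parse are hypothetical names used only for this illustration, and treating non-numeric memory values as malformed is an assumption of the sketch, not something the original class does.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

// Illustration only: parses one "timestamp,host,freeMemory" message
// into a (host, freeMemory) pair.
final class MessageParseSketch {

  // Returns (host, freeMemoryBytes), or an empty Optional if the line is malformed.
  static Optional<Map.Entry<String, Long>> parse(String line) {
    if (line == null) {
      return Optional.empty();
    }
    String[] parts = line.split(",");
    if (parts.length != 3) {
      return Optional.empty();
    }
    try {
      Map.Entry<String, Long> entry =
          new SimpleEntry<>(parts[1], Long.parseLong(parts[2].trim()));
      return Optional.of(entry);
    } catch (NumberFormatException e) {
      // Assumption of this sketch: a non-numeric memory value is treated as malformed.
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    // Sample value built the same way KafkaProducerTest builds its messages.
    String sample = String.format("%d,%s,%d", 1537520000000L, "machine-1", 4096L);
    System.out.println(parse(sample));
    System.out.println(parse("not-a-message"));
  }
}
```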

MessageWaterEmitter:

package kafkatoflink;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
* @Description    MessageWaterEmitter derives the Flink watermark from each Kafka message
* @Author         0262000099 Hengtai Nie
* @CreateDate     2018/9/21 11:26
*/
public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {

  public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
    if (lastElement != null && lastElement.contains(",")) {
      String[] parts = lastElement.split(",");
      return new Watermark(Long.parseLong(parts[0]));
    }
    return null;
  }

  public long extractTimestamp(String element, long previousElementTimestamp) {
    if (element != null && element.contains(",")) {
      String[] parts = element.split(",");
      return Long.parseLong(parts[0]);
    }
    return 0L;
  }
}
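
MessageWaterEmitter emits a watermark equal to the timestamp of every message it sees. As a rough illustration of what that means for the 2-second windows, the plain-Java sketch below assumes the usual event-time rule that a window covering [start, end) fires once the watermark reaches end - 1, and that watermarks never move backwards. WatermarkFiringSketch and firingIndex are hypothetical names, and the sketch does not call Flink.

```java
// Illustration only: simulates when the window containing the first message
// would fire if every message's timestamp is emitted as a watermark.
final class WatermarkFiringSketch {

  // Returns the index of the first message whose watermark closes the window
  // that contains timestamps[0], or -1 if that window never closes.
  // Assumes a non-empty array of non-negative timestamps in arrival order.
  static int firingIndex(long[] timestamps, long windowSizeMs) {
    long windowStart = timestamps[0] - (timestamps[0] % windowSizeMs);
    long windowEnd = windowStart + windowSizeMs; // exclusive end
    long watermark = Long.MIN_VALUE;
    for (int i = 0; i < timestamps.length; i++) {
      // An out-of-order message does not lower the watermark.
      watermark = Math.max(watermark, timestamps[i]);
      if (watermark >= windowEnd - 1) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // With 2-second windows, the window [0, 2000) closes when the message at 2100 arrives.
    System.out.println(firingIndex(new long[] {1000L, 1500L, 2100L}, 2000L));
    // Without a later message the window stays open.
    System.out.println(firingIndex(new long[] {1000L, 1500L}, 2000L));
  }
}
```

One consequence of using the message timestamp itself as the watermark is that there is no tolerance for out-of-order messages: a message that arrives after its window has already fired would be treated as late.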

5. Reference Blog

https://www.cnblogs.com/huxi2b/p/7219792.html