Flink代码示例(1):消费Kafka Demo
Flink: Blink分支 1.5.1
https://github.com/apache/flink/tree/blink
Maven Dependency:
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>flink-json</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.5.1</version>
</dependency>
Kafka数据类型为Json,格式如下:
{
"schema": {
"type": "struct",
"fields": [{
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "int32",
"optional": true,
"field": "age"
}],
"optional": true,
"name": "postgres.public.test.Value",
"field": "before"
}, {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "int32",
"optional": true,
"field": "age"
}],
"optional": true,
"name": "postgres.public.test.Value",
"field": "after"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "int64",
"optional": true,
"field": "ts_usec"
}, {
"type": "int64",
"optional": true,
"field": "txId"
}, {
"type": "int64",
"optional": true,
"field": "lsn"
}, {
"type": "string",
"optional": true,
"field": "schema"
}, {
"type": "string",
"optional": true,
"field": "table"
}, {
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
}, {
"type": "boolean",
"optional": true,
"field": "last_snapshot_record"
}],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
}, {
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": true,
"field": "ts_ms"
}],
"optional": false,
"name": "postgres.public.test.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 452609,
"age": 3
},
"source": {
"version": "0.8.3.Final",
"name": "postgres",
"db": "postgres",
"ts_usec": 1558601374145332000,
"txId": 21371566,
"lsn": 418263044043,
"schema": "public",
"table": "test",
"snapshot": false,
"last_snapshot_record": null
},
"op": "c",
"ts_ms": 1558601374138
}
}
示例代码:
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._
object KafkaConsumerDemo {

  /**
   * Reads Debezium-style change events (JSON) from the Kafka topic
   * `postgres.public.test`, registers them as a table, and prints the
   * `after` row image of every non-delete event.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv = TableEnvironment.getTableEnvironment(env)

    // ---- Type of the Debezium "schema" column ------------------------------
    // Leaf descriptor: {"type", "optional", "field"}. Per the sample message
    // above, "optional" is a JSON boolean, so it is declared BOOLEAN here
    // (the original demo declared every field STRING, which does not match
    // the documented wire format).
    val leafField = Types.ROW(
      Array("type", "optional", "field"),
      Array[TypeInformation[_]](Types.STRING, Types.BOOLEAN, Types.STRING))
    val leafFieldArray =
      ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, leafField)

    // One entry of "schema.fields": a struct descriptor with nested fields.
    val schemaField = Types.ROW(
      Array("type", "fields", "optional", "name", "field"),
      Array[TypeInformation[_]](
        Types.STRING, leafFieldArray, Types.BOOLEAN, Types.STRING, Types.STRING))
    val schemaFieldArray =
      ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, schemaField)

    val schemaType = Types.ROW(
      Array("type", "fields", "optional", "name"),
      Array[TypeInformation[_]](
        Types.STRING, schemaFieldArray, Types.BOOLEAN, Types.STRING))

    // ---- Type of the Debezium "payload" column -----------------------------
    // "before"/"after" carry the row image; "id"/"age" are int32 in the
    // documented schema, hence INT rather than STRING.
    val rowImage = Types.ROW(
      Array[String]("id", "age"),
      Array[TypeInformation[_]](Types.INT, Types.INT))

    // Source metadata: ts_usec/txId/lsn are int64, snapshot flags are boolean.
    val sourceType = Types.ROW(
      Array[String]("version", "name", "db", "ts_usec", "txId", "lsn",
        "schema", "table", "snapshot", "last_snapshot_record"),
      Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING,
        Types.LONG, Types.LONG, Types.LONG, Types.STRING, Types.STRING,
        Types.BOOLEAN, Types.BOOLEAN))

    val payloadType = Types.ROW(
      Array[String]("before", "after", "source", "op", "ts_ms"),
      Array[TypeInformation[_]](
        rowImage, rowImage, sourceType, Types.STRING, Types.LONG))

    // Declare the Kafka source: JSON format whose parsing schema is derived
    // from the table schema declared below.
    tblEnv.connect(new Kafka()
        .version("0.10")
        .topic("postgres.public.test")
        .property("zookeeper.connect", "BigData-Dev-1:2181")
        .property("bootstrap.servers", "BigData-Dev-1:9092")
        .startFromLatest())
      .withFormat(new Json().deriveSchema())
      .withSchema(new Schema()
        .field("schema", schemaType)
        .field("payload", payloadType))
      .inAppendMode()
      .registerTableSource("test")

    // Project the post-change row image; delete events carry a null "after"
    // image and are filtered out by the IS NOT NULL predicate.
    val tableResult = tblEnv.sqlQuery("select after.id,after.age from test where after.id is not null")
    tblEnv.toAppendStream[Row](tableResult).print()
    env.execute()
  }
}
测试: