Flink on Zeppelin: A First Try, Part 2
First, a quick plug for my friend Ji Ge's blog: https://me.****.net/weixin_47482194
His posts are well written, and several have been featured on the official site.
1. First, set up the integration with Hive.
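In Zeppelin, the Hive integration is configured on the Flink interpreter. A minimal sketch of the relevant interpreter properties (the property names come from Zeppelin's Flink interpreter documentation; the paths and version are illustrative values for my CDH setup, so adjust them to your cluster):

```
FLINK_HOME                    /wyyt/software/flink-1.11.0
HIVE_CONF_DIR                 /etc/hive/conf        # directory containing hive-site.xml
zeppelin.flink.enableHive     true
zeppelin.flink.hive.version   1.1.0
```

With these set, the Flink interpreter registers a Hive catalog on startup, so Zeppelin paragraphs can read and write Hive tables directly.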
2. Copy every JAR that Flink needs into Flink's lib directory. I'm running the CDH distribution; the basic set of packages is as follows:
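The screenshot of my lib directory doesn't reproduce here, but for reference, the Flink 1.11 Hive connector docs list roughly the following dependency set for Hive 1.1.x plus a Kafka SQL connector; exact artifact names and versions depend on your CDH build, so treat this as a sketch:

```
flink-connector-hive_2.11-1.11.0.jar
flink-sql-connector-kafka_2.11-1.11.0.jar
hive-metastore-1.1.0.jar
hive-exec-1.1.0.jar
libfb303-0.9.2.jar
```

The `libfb303` JAR is easy to miss: for Hive 1.x it is not bundled transitively, and its absence shows up as a confusing Thrift class-not-found error at runtime.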
If anything here is unclear, feel free to ask in the comments. These are pitfalls I've already stepped in myself.
3. Hands-on:
Normally we would write regular code against Hive, like this:
```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class SourceData2hive_ods {

    // Kafka source table (Flink dialect), with an event-time watermark derived from ts
    private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (" +
            " category_id STRING," +
            " user_id STRING," +
            " item_id STRING," +
            " behavior STRING," +
            " ts STRING," +
            " t_s AS TO_TIMESTAMP(FROM_UNIXTIME(cast(ts AS BIGINT), 'yyyy-MM-dd HH:mm:ss'))," +
            " WATERMARK FOR t_s AS t_s - INTERVAL '5' SECOND " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'ods_kafka'," +
            " 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'earliest-offset'" +
            ")";

    // Hive sink table (Hive dialect), partitioned by day and hour,
    // committing partitions on watermark progress (partition-time trigger)
    private static final String hiveSql = "CREATE TABLE ods_table (" +
            " category_id STRING," +
            " user_id STRING," +
            " item_id STRING," +
            " behavior STRING," +
            " ts STRING " +
            ") partitioned by (dt string, hr string) " +
            "stored as PARQUET " +
            "TBLPROPERTIES (" +
            " 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," +
            " 'sink.partition-commit.delay'='5 s'," +
            " 'sink.partition-commit.trigger'='partition-time'," +
            " 'sink.partition-commit.policy.kind'='metastore,success-file'" +
            ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String name = "myhive";
        String defaultDatabase = "flink";
        String hiveConfDir = "G:/Flink SQL开发文件"; // directory containing hive-site.xml
        String version = "1.1.0";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Create the Kafka source table using the default (Flink) dialect
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("drop table if exists kafkaTable");
        tEnv.executeSql(KAFKA_SQL);

        // Create the Hive sink table using the Hive dialect;
        // skip this step if the table already exists in Hive
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("drop table if exists ods_table");
        tEnv.executeSql(hiveSql);

        // Stream from Kafka into the partitioned Hive table
        String insertSql = "insert into ods_table SELECT category_id, user_id, item_id, behavior, ts, " +
                " DATE_FORMAT(t_s, 'yyyy-MM-dd'), DATE_FORMAT(t_s, 'HH') FROM kafkaTable";
        tEnv.executeSql(insertSql);
    }
}
```
Writing the same query in Zeppelin is much simpler:
Just use %flink to mark the paragraph as code mode... simple, right?
Create the table:
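The original screenshot doesn't reproduce here, so here is a sketch of what the Zeppelin paragraph might look like. It reuses the kafkaTable DDL from the Java code above; `%flink.ssql` is Zeppelin's streaming-SQL paragraph type:

```sql
%flink.ssql

DROP TABLE IF EXISTS kafkaTable;

CREATE TABLE kafkaTable (
  category_id STRING,
  user_id STRING,
  item_id STRING,
  behavior STRING,
  ts STRING,
  t_s AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(ts AS BIGINT), 'yyyy-MM-dd HH:mm:ss')),
  WATERMARK FOR t_s AS t_s - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_kafka',
  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
  'properties.group.id' = 'test1',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);
```

A follow-up paragraph such as `%flink.ssql(type=update)` with `SELECT * FROM kafkaTable` then renders a continuously updating result table right in the notebook, which is exactly what makes this so much lighter than the Java route.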
For the API and any unfamiliar keywords, see the documentation on GitHub:
github上的资料:https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md#paragraph-local-properties
To be continued...