apache druid 实时加载kafka 中的数据(一)

简介

apache druid 是分布式列存储的 OLAP 框架。还是一个时间序列数据库。本篇文章主要是druid 在kafka 加载数据的配置。由于druid 升级情况太快,本人的环境还是在0.13,主要改动方面还是UI,新的版本在UI方面更适合新手入门。

文章如有帮助,请关注微信公共号。 apache druid 实时加载kafka 中的数据(一)

apache druid 实时加载kafka 中的数据(一)

最终使用druid时,是0.9版本,当时在kafka加载数据推荐的方式是两种

  • Tranquility

  • kafka index service

Tranquility

是用于将流实时推送到Druid的工具包。是一个独立,需要单独下载。

apache druid 实时加载kafka 中的数据(一)

** 其特点**

无缝地处理分区,复制,服务发现和架构过渡,而无需停机。集成了http server,Samza,Spark ,Storm,Flink 等工具。

可以自由的控制向druid,主动发送数据。

** 劣势**

本身具有时间窗,超过时间窗的数据直接丢弃。

版本落后,由于没有官方组织维护,目前版本只是兼容值0.9.2,后面druid升级后,Tranquility未及时升级,有些新的api 无法适配。

kafka index service

这是druid 自身携带的扩展插件,使用时,需要在common.runtime.properties 文件中的属性 druid.extensions.loadList 添加druid-kafka-indexing-service。

**  其特点**

支持实时查询按时间分segment,非实追加到对应时间的segment 。

通过算法把Peon分配到 不同的【 Middle Managers】上实现分布式

加大对应kafka的topic的partition数量 加大taskCount的值,产生更多的Peon

创建 supervisor

apache druid 实时加载kafka 中的数据(一)

apache druid 实时加载kafka 中的数据(一)

上面是一个完整的supervisor的内容,主要包含type,dataSchema,tuningConfig,ioConfig 四个部分

  • type

标记类型,supervisor  的类型 就是kafka.

  • dataSchema

数据库的配置,主要包含dataSource,parser,metricsSpec,granularitySpec

dataSource

druid的数据库名称。

parser

配置与解析数据。简单理解就是kafka中的数据与druid存储之间的关系映射。主要包含以下配置

timestampSpec

配置处于的位置 dataSchema->parser->timestampSpec

druid 本身是时间序列数据库,故此时间就是数据的主键。由于druid 在 0.9后,已经不支持设置时区了,时间都是采用的utc格式。druid查询时,可以设置时区。包括一些roll-up操作都是按照utc时间进行。如有必须需改动源码。

dimensionsSpec

位置:dataSchema->parser->dimensionsSpec

维度。数据库需要存储的字段,需要与kafka中的对应。

dimensions

是一个数组类型,默认字段的类型都是string

可以设置字段的类型,例如{ "type": "long", "name": "userId" }

metricsSpec

位置:dataSchema->metricsSpec

度量。此值roll-up 启用是才有意义。

`{      "name": "theta_customer_id",

"type": "thetaSketch",

"fieldName": "customer_id"
           } `

name: druid中字段的名称。

type:指标类型。thetaSketch 去重。还支持doubleSum,longSum,doubleMin,doubleMax 等聚合类型。

fieldName:kafka中 属性的名称

granularitySpec

位置:      dataSchema->granularitySpec

segmentGranularity: Segment粒度(SegmentGranularity)表示每一个实时索引任务中产生的Segment所涵盖的时间范围。

queryGranularity:查询粒度。例如 {"queryGranularity":"DAY"} 查询的最小粒度就是DAY,经过roll-up后,维度完全一样的数据,一天范围内将聚合为一条数据。

  • tuningConfig

调优相关的配置。

配置一个segment大小。

调整压缩算法。

  • ioConfig

    消费者的配置。对于kafak index service 就是kafka 消费者一个配置。

    下面的实例,配置了kafka的topic,启动的任务数量,任务执行的时间,kafka的地址。

    completionTimeout:这个值将发布任务声明为失败并终止之前等待的时间。如果设置得太低,您的任务可能永远不会发布。任务的发布时间大约在taskDuration过去之后开始。默认是30M,为防止任务未发布,调整为与任务时间一致(PT3600S)

"ioConfig": {        

"topic": "com.test",       

 "replicas": 1,         "taskCount": 1,     

   "taskDuration": "PT3600S",     

   "consumerProperties": {        

   "bootstrap.servers": "10.0.0.1:9096,10.0.0.1:9096"    

   },     

   "completionTimeout": "PT3600S"  

 }

提交supervisor

提交至overlord节点。

apache druid 实时加载kafka 中的数据(一)

新版中出现界面配置

第一种,根据界面的配置向导来加载kafka数据

访问:8888  端口

apache druid 实时加载kafka 中的数据(一)

apache druid 实时加载kafka 中的数据(一)

apache druid 实时加载kafka 中的数据(一)

一直按照向导配置,就可以自动生成supervisor的配置 很方便。

第二种,通过页面 提供的Submit supervisor提交 相应的json文件

apache druid 实时加载kafka 中的数据(一)

总结

简单介绍了下supervisor  重点配置的具体含义,由于篇幅问题,详细的配置还需要去官网文档中查看。本文的目的就是通过个人使用 kafka index service时一些新得,帮助新手能快速跑通第一个druid实例。

文章如有帮助,请关注微信公共号。 apache druid 实时加载kafka 中的数据(一)