druid部署及使用
- 1. 安装使用的是imply版本
下载路径:https://imply.io/get-started
需要填写一些个人信息才可以下载
- 2. 集群规划
-------------------------------------------------------
192.168.112.40 | 192.168.112.41 | 192.168.112.42
-------------------------------------------------------
historical | coordinator | broker
middleManager | overlord | pivot
tranquility-kafka | |
-------------------------------------------------------
- 3. 修改配置文件common.runtime.properties
cd imply-2.6.11/conf/druid/_common
vim common.runtime.properties
#
# 修改druid.extensions.loadList,增加需要的扩展
# Extensions
#
druid.extensions.directory=dist/druid/extensions
druid.extensions.hadoopDependenciesDir=dist/druid/hadoop-dependencies
druid.extensions.loadList=["druid-parser-route","druid-kafka-indexing-service","druid-kafka-eight","druid-hdfs-storage","druid-histogram","druid-datasketches", "druid-lookups-cached-global","mysql-metadata-storage"]
#
# Logging
#
# Log all runtime properties on startup. Disable to avoid logging properties on startup:
druid.startup.logging.logProperties=true
#
# 修改zookeeper地址
# Zookeeper
#
druid.zk.service.host=192.168.112.40,192.168.112.41,192.168.112.42
druid.zk.paths.base=/druid
#
# 元数据保存地址,默认使用的是derby,改为mysql
# Metadata storage
#
# For Derby server on your Druid Coordinator (only viable in a cluster with a single Coordinator, no fail-over):
#druid.metadata.storage.type=derby
#druid.metadata.storage.connector.connectURI=jdbc:derby://master.example.com:1527/var/druid/metadata.db;create=true
#druid.metadata.storage.connector.host=master.example.com
#druid.metadata.storage.connector.port=1527
# For MySQL:
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://192.168.112.43:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
# For PostgreSQL:
#druid.metadata.storage.type=postgresql
#druid.metadata.storage.connector.connectURI=jdbc:postgresql://db.example.com:5432/druid
#druid.metadata.storage.connector.user=...
#druid.metadata.storage.connector.password=...
#
# 深度存储,默认的是local,改为HDFS
# Deep storage
#
# For local disk (only viable in a cluster if this is a network mount):
#druid.storage.type=local
#druid.storage.storageDirectory=var/druid/segments
# For HDFS:
druid.storage.type=hdfs
druid.storage.storageDirectory=/user/druid/segments
# For S3:
#druid.storage.type=s3
#druid.storage.bucket=your-bucket
#druid.storage.baseKey=druid/segments
#druid.s3.accessKey=...
#druid.s3.secretKey=...
#
# Indexing service logs
#
# For local disk (only viable in a cluster if this is a network mount):
#druid.indexer.logs.type=file
#druid.indexer.logs.directory=var/druid/indexing-logs
# For HDFS:
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/user/druid/indexing-logs
# For S3:
#druid.indexer.logs.type=s3
#druid.indexer.logs.s3Bucket=your-bucket
#druid.indexer.logs.s3Prefix=druid/indexing-logs
#
# Service discovery
#
druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator
#
# Monitoring
#
druid.monitoring.monitors=["io.druid.java.util.metrics.JvmMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=debug
深度存储使用的是HDFS,需要将HDFS的配置文件core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml拷贝到imply-2.6.11/conf/druid/_common目录下
- 4. 修改coordinator配置
cd imply-2.6.11/conf/druid/coordinator
vim runtime.properties
#部署机器ip地址
druid.service=druid/coordinator
druid.host=192.168.112.41
druid.port=8081
druid.coordinator.startDelay=PT30S
druid.coordinator.period=PT30S
- 5. 修改overlord配置
cd imply-2.6.11/conf/druid/overlord
vim runtime.properties
#部署机器ip地址
druid.service=druid/overlord
druid.host=192.168.112.41
druid.port=8090
druid.indexer.queue.startDelay=PT30S
druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata
- 6. 修改broker配置文件
cd imply-2.6.11/conf/druid/broker
vim runtime.properties
#部署机器ip地址
druid.service=druid/broker
druid.host=192.168.112.42
druid.port=8082
# HTTP server threads
# 查询节点与历史节点通信连接池大小
druid.broker.http.numConnections=5
# HTTP请求处理线程数
druid.server.http.numThreads=20
# Processing threads and buffers
# 每个处理线程中间结果缓存大小
druid.processing.buffer.sizeBytes=268435456
druid.processing.numMergeBuffers=2
# 处理线程个数
druid.processing.numThreads=7
druid.processing.tmpDir=var/druid/processing
# Query cache disabled -- push down caching and merging instead
druid.broker.cache.useCache=false
druid.broker.cache.populateCache=false
# SQL
druid.sql.enable=true
- 7. 修改middleManager配置文件
cd imply-2.6.11/conf/druid/middleManager
vim runtime.properties
# 部署机器ip地址
druid.service=druid/middlemanager
druid.host=192.168.112.40
druid.port=8091
# 修改capacity,即该middleManager最多可同时接受的任务数量,默认是3个
# Number of tasks per middleManager
druid.worker.capacity=3
# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task
druid.indexer.task.restoreTasksOnRestart=true
# http请求处理线程数
# HTTP server threads
druid.server.http.numThreads=40
# Processing threads and buffers
#每个处理线程中间结果缓存大小
druid.processing.buffer.sizeBytes=100000000
druid.processing.numMergeBuffers=2
#处理线程个数
druid.processing.numThreads=2
druid.processing.tmpDir=var/druid/processing
# MR任务临时存储目录,hadoop版本根据具体需求配置
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=/tmp/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"]
- 8. 修改historical配置文件
cd imply-2.6.11/conf/druid/historical
vim runtime.properties
#部署机器ip地址
druid.service=druid/historical
druid.host=192.168.112.40
druid.port=8083
# HTTP请求处理线程数
# HTTP server threads
druid.server.http.numThreads=40
# Processing threads and buffers
# 每个处理线程的中间结果缓存大小
druid.processing.buffer.sizeBytes=536870912
druid.processing.numMergeBuffers=2
#处理的线程数
druid.processing.numThreads=7
druid.processing.tmpDir=var/druid/processing
# Segment storage
# Segment本地加载路径与最大存储空间的大小
druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize"\:130000000000}]
#最大存储空间的大小,该值仅用作coordinator调配Segment加载数据的依据
druid.server.maxSize=130000000000
# Query cache
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=2000000000
- 9. 修改pivot配置文件
cd imply-2.6.11/conf/pivot
vim config.yaml
# The port on which the imply-ui server will listen on.
port: 9095
# runtime directory
varDir: var/pivot
# User management mode
# By default Imply will not show a login screen and anyone accessing it will automatically be treated as an 'admin'
# Uncomment the line below to enable user authentication, for more info see: https://docs.imply.io/on-premise/configure/config-api
#userMode: native-users
# 修改broker、overlord、coordinator地址
# The initial settings that will be loaded in, in this case a connection will be created for a Druid cluster that is running locally.
initialSettings:
connections:
- name: druid
type: druid
title: My Druid
host: 192.168.112.42:8082
coordinatorHosts: ["192.168.112.41:8081"]
overlordHosts: ["192.168.112.41:8090"]
# stateStore存储默认使用的是sqlite,改为mysql
# imply-ui must have a state store in order to function
# The state (data cubes, dashboards, etc) can be stored in two ways.
# Choose just one option and comment out the other.
#
# 1) Stored in a sqlite file, editable at runtime with Settings View. Not suitable for running in a cluster.
# 2) Stored in a database, editable at runtime with Settings View. Works well with a cluster of Imply servers.
#
#
# 1) File-backed (sqlite) state (not suitable for running in a cluster)
#
#stateStore:
# type: sqlite
# connection: var/pivot/pivot-settings.sqlite
#
# 2) Database-backed state 'mysql' (MySQL) or 'pg' (Postgres)
#
stateStore:
type: mysql
connection: 'mysql://druid:diurd@192.168.112.43:3306/pivot'
- 10. 修改配置文件data.conf
cd imply-2.6.11/conf/supervise
vim data.conf
:verify bin/verify-java
:verify bin/verify-version-check
historical bin/run-druid historical conf
middleManager bin/run-druid middleManager conf
# Uncomment to use Tranquility Server
#!p95 tranquility-server bin/tranquility server -configFile conf/tranquility/server.json
# 取消tranquility-kafka的注释,使用tranquility-kafka方式摄入流式数据
# Uncomment to use Tranquility Kafka
!p95 tranquility-kafka bin/tranquility kafka -configFile conf/tranquility/kafka.json
# Uncomment to use Tranquility Clarity metrics server
#!p95 tranquility-metrics-server java -Xms4g -Xmx4g -cp "dist/tranquility/lib/*" com.metamx.tranquility.distribution.DistributionMain server -configFile conf/tranquility/server-for-metrics.yaml
- 11. 服务安装与启动
nohup bin/supervise -c conf/supervise/quickstart.conf > quickstart.log &
bin/service --down
bin/service --restart ${service_name}
coordinator http://192.168.112.43:8081
broker http://192.168.112.43:8082/druid/v2
historical http://192.168.112.43:8083/druid/v2
overlord http://192.168.112.43:8090
pivot http://192.168.112.43:9095
tranquility-server http://192.168.112.43:8200/v1/post/datasource
192.168.112.40
historical
middleManager
tranquility-kafka
nohup bin/supervise -c conf/supervise/data.conf > data.log &
192.168.112.41
coordinator
overlord
nohup bin/supervise -c conf/supervise/master-no-zk.conf > master-no-zk.log &
192.168.112.42
broker
pivot
nohup bin/supervise -c conf/supervise/query.conf > query.log &
- 12. 使用tranquility-kafka方式摄入流式数据
cd imply-2.6.11/conf/tranquility
vim kafka.json
{
"dataSources": [
{
"spec": {
"dataSchema": {
"dataSource": "dh_ods_ods_dh_pay_server",
"parser": {
"type": "string",
"parseSpec": {
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["product_id","zid","account_id","pay_channel","record_time"],
"dimensionExclusions": ["timestamp","log","item_id","ip","log_level","sdk_name","pay_time","log_type","source_name","order_id"]
},
"format": "json"
}
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "MINUTE"
},
"metricsSpec": [
{
"name": "pay_amount_sum",
"type": "doubleSum",
"fieldName": "pay_amount"
}
]
},
"ioConfig": {
"type": "realtime"
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": "50000",
"intermediatePersistPeriod": "PT10M",
"windowPeriod": "PT10M"
}
},
"properties": {
"task.partitions": "1",
"task.replicants": "1",
"topicPattern": "dh_ods_ods_dh_pay_server"
}
},
{
"spec": {
"dataSchema": {
"dataSource": "dh_ods_ods_dh_pay_server_3",
"parser": {
"type": "string",
"parseSpec": {
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["product_id","zid","account_id","pay_channel","record_time"],
"dimensionExclusions": ["timestamp","log","item_id","ip","log_level","sdk_name","pay_time","log_type","source_name","order_id"]
},
"format": "json"
}
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "MINUTE"
},
"metricsSpec": [
{
"name": "pay_amount_sum",
"type": "doubleSum",
"fieldName": "pay_amount"
}
]
},
"ioConfig": {
"type": "realtime"
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": "50000",
"intermediatePersistPeriod": "PT10M",
"windowPeriod": "PT10M"
}
},
"properties": {
"task.partitions": "1",
"task.replicants": "1",
"topicPattern": "dh_ods_ods_dh_pay_server_3"
}
}
],
"properties": {
"zookeeper.connect": "192.168.112.40,192.168.112.41,192.168.112.42",
"druid.discovery.curator.path": "/druid/discovery",
"druid.selectors.indexing.serviceName": "druid/overlord",
"commit.periodMillis": "15000",
"consumer.numThreads": "4",
"kafka.zookeeper.connect": "192.168.112.40,192.168.112.41,192.168.112.42",
"kafka.group.id": "tranquility-kafka-dh-pay-server",
"serialization.format": "smile",
"druidBeam.taskLocator": "overlord"
}
}
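- 13. 使用Kafka indexing service的方式摄入流式数据
注意:此处提交的kafka.json应为Kafka supervisor spec(type为kafka),不是第12步中tranquility使用的conf/tranquility/kafka.json
curl -X POST -H 'Content-Type: application/json' -d @kafka.json http://192.168.112.41:8090/druid/indexer/v1/supervisor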
- 14. 通过overlord界面查看task情况
http://192.168.112.41:8090
- 15. 通过pivot界面查看数据
http://192.168.112.42:9095
- 16. 使用HTTP的方式查询数据
http://192.168.112.42:8082/druid/v2/sql
请求方式:POST
请求头:Content-Type:application/json
请求参数
{
"query":"SELECT * FROM dh_ods_ods_dh_pay_server WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY order by __time desc limit 10"
}