Druid Deployment and Usage

  • 1. The installation uses the Imply distribution

Download link: https://imply.io/get-started
You need to fill in some personal information before you can download.

  • 2. Cluster layout

-------------------------------------------------------
192.168.112.40      | 192.168.112.41   | 192.168.112.42
-------------------------------------------------------
historical          | coordinator      | broker
middleManager       | overlord         | pivot
tranquility-kafka   |                  |
-------------------------------------------------------

  • 3. Edit the common.runtime.properties configuration file

cd imply-2.6.11/conf/druid/_common
vim common.runtime.properties
#
# Edit druid.extensions.loadList to add the required extensions
# Extensions
#

druid.extensions.directory=dist/druid/extensions
druid.extensions.hadoopDependenciesDir=dist/druid/hadoop-dependencies
druid.extensions.loadList=["druid-parser-route","druid-kafka-indexing-service","druid-kafka-eight","druid-hdfs-storage","druid-histogram","druid-datasketches", "druid-lookups-cached-global","mysql-metadata-storage"]

#
# Logging
#

# Log all runtime properties on startup. Disable to avoid logging properties on startup:
druid.startup.logging.logProperties=true

#
# Set the ZooKeeper addresses
# Zookeeper
#

druid.zk.service.host=192.168.112.40,192.168.112.41,192.168.112.42
druid.zk.paths.base=/druid

#
# Metadata storage; the default is Derby, changed here to MySQL
# Metadata storage
#

# For Derby server on your Druid Coordinator (only viable in a cluster with a single Coordinator, no fail-over):
#druid.metadata.storage.type=derby
#druid.metadata.storage.connector.connectURI=jdbc:derby://master.example.com:1527/var/druid/metadata.db;create=true
#druid.metadata.storage.connector.host=master.example.com
#druid.metadata.storage.connector.port=1527

# For MySQL:
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://192.168.112.43:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd

# For PostgreSQL:
#druid.metadata.storage.type=postgresql
#druid.metadata.storage.connector.connectURI=jdbc:postgresql://db.example.com:5432/druid
#druid.metadata.storage.connector.user=...
#druid.metadata.storage.connector.password=...

#
# Deep storage; the default is local, changed here to HDFS
# Deep storage
#

# For local disk (only viable in a cluster if this is a network mount):
#druid.storage.type=local
#druid.storage.storageDirectory=var/druid/segments

# For HDFS:
druid.storage.type=hdfs
druid.storage.storageDirectory=/user/druid/segments

# For S3:
#druid.storage.type=s3
#druid.storage.bucket=your-bucket
#druid.storage.baseKey=druid/segments
#druid.s3.accessKey=...
#druid.s3.secretKey=...

#
# Indexing service logs
#

# For local disk (only viable in a cluster if this is a network mount):
#druid.indexer.logs.type=file
#druid.indexer.logs.directory=var/druid/indexing-logs

# For HDFS:
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/user/druid/indexing-logs

# For S3:
#druid.indexer.logs.type=s3
#druid.indexer.logs.s3Bucket=your-bucket
#druid.indexer.logs.s3Prefix=druid/indexing-logs

#
# Service discovery
#

druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator

#
# Monitoring
#

druid.monitoring.monitors=["io.druid.java.util.metrics.JvmMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=debug

Since deep storage uses HDFS, copy the HDFS configuration files core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml into the imply-2.6.11/conf/druid/_common directory (see the commands sketched below).
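
A minimal sketch of the supporting steps, assuming MySQL listens on 192.168.112.43 and the Hadoop client configuration lives under /etc/hadoop/conf (that path and the MySQL root login are assumptions; the database name, user, and password match the connectURI above):

# Create the Druid metadata database and user (run once; MySQL 5.x GRANT syntax)
mysql -h 192.168.112.43 -u root -p -e "CREATE DATABASE druid DEFAULT CHARACTER SET utf8; GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd';"

# Copy the Hadoop client configuration next to common.runtime.properties
cp /etc/hadoop/conf/{core-site.xml,hdfs-site.xml,mapred-site.xml,yarn-site.xml} imply-2.6.11/conf/druid/_common/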

  • 4. Edit the coordinator configuration

cd imply-2.6.11/conf/druid/coordinator
vim runtime.properties

# IP address of the machine this service is deployed on
druid.service=druid/coordinator
druid.host=192.168.112.41
druid.port=8081

druid.coordinator.startDelay=PT30S
druid.coordinator.period=PT30S

  • 5. Edit the overlord configuration

cd imply-2.6.11/conf/druid/overlord
vim runtime.properties

# IP address of the machine this service is deployed on
druid.service=druid/overlord
druid.host=192.168.112.41
druid.port=8090

druid.indexer.queue.startDelay=PT30S

druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata

 

  • 6. Edit the broker configuration file

cd imply-2.6.11/conf/druid/broker
vim runtime.properties

# IP address of the machine this service is deployed on
druid.service=druid/broker
druid.host=192.168.112.42
druid.port=8082

# HTTP server threads
# Size of the connection pool the broker uses to talk to historical and real-time nodes
druid.broker.http.numConnections=5
# Number of threads for handling HTTP requests
druid.server.http.numThreads=20

# Processing threads and buffers
# Intermediate result buffer size per processing thread
druid.processing.buffer.sizeBytes=268435456
druid.processing.numMergeBuffers=2
# Number of processing threads
druid.processing.numThreads=7
druid.processing.tmpDir=var/druid/processing

# Query cache disabled -- push down caching and merging instead
druid.broker.cache.useCache=false
druid.broker.cache.populateCache=false

# SQL
druid.sql.enable=true

 

  • 7. Edit the middleManager configuration file

cd imply-2.6.11/conf/druid/middleManager
vim runtime.properties

# IP address of the machine this service is deployed on
druid.service=druid/middlemanager
druid.host=192.168.112.40
druid.port=8091

# Worker capacity: the maximum number of tasks this middleManager will accept (default is 3)
# Number of tasks per middleManager
druid.worker.capacity=3

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task
druid.indexer.task.restoreTasksOnRestart=true

# Number of threads for handling HTTP requests
# HTTP server threads
druid.server.http.numThreads=40

# Processing threads and buffers
# Intermediate result buffer size per processing thread
druid.processing.buffer.sizeBytes=100000000
druid.processing.numMergeBuffers=2
# Number of processing threads
druid.processing.numThreads=2
druid.processing.tmpDir=var/druid/processing

# Temporary working directory for Hadoop MR indexing tasks; set the Hadoop version to match your cluster
# Hadoop indexing 
druid.indexer.task.hadoopWorkingPath=/tmp/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"]

  • 8. Edit the historical configuration file

cd imply-2.6.11/conf/druid/historical
vim runtime.properties

# IP address of the machine this service is deployed on
druid.service=druid/historical
druid.host=192.168.112.40
druid.port=8083

# Number of threads for handling HTTP requests
# HTTP server threads
druid.server.http.numThreads=40

# Processing threads and buffers
# Intermediate result buffer size per processing thread
druid.processing.buffer.sizeBytes=536870912
druid.processing.numMergeBuffers=2
# Number of processing threads
druid.processing.numThreads=7
druid.processing.tmpDir=var/druid/processing

# Segment storage
# Local segment cache path and its maximum size
druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize"\:130000000000}]
# Maximum storage size; this value is only used by the coordinator when deciding how much segment data to assign to this node
druid.server.maxSize=130000000000

# Query cache
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=2000000000

 

  • 9. Edit the Pivot configuration file

cd imply-2.6.11/conf/pivot
vim config.yaml
# The port on which the imply-ui server will listen.
port: 9095

# runtime directory
varDir: var/pivot


# User management mode
# By default Imply will not show a login screen and anyone accessing it will automatically be treated as an 'admin'
# Uncomment the line below to enable user authentication, for more info see: https://docs.imply.io/on-premise/configure/config-api
#userMode: native-users


# Set the broker, overlord, and coordinator addresses
# The initial settings that will be loaded in, in this case a connection will be created for a Druid cluster that is running locally.
initialSettings:
  connections:
    - name: druid
      type: druid
      title: My Druid
      host: 192.168.112.42:8082
      coordinatorHosts: ["192.168.112.41:8081"]
      overlordHosts: ["192.168.112.41:8090"]

# The state store defaults to SQLite; changed here to MySQL
# imply-ui must have a state store in order to function
# The state (data cubes, dashboards, etc) can be stored in two ways.
# Choose just one option and comment out the other.
#
#  1) Stored in a sqlite file, editable at runtime with Settings View. Not suitable for running in a cluster.
#  2) Stored in a database, editable at runtime with Settings View. Works well with a cluster of Imply servers.
#

#
# 1) File-backed (sqlite) state (not suitable for running in a cluster)
#

#stateStore:
#  type: sqlite
#  connection: var/pivot/pivot-settings.sqlite

#
# 2) Database-backed state 'mysql' (MySQL) or 'pg' (Postgres)
#

stateStore:
   type: mysql
   connection: 'mysql://druid:[email protected]:3306/pivot'
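
A minimal sketch for preparing the Pivot state database, assuming the same MySQL instance and the same druid user as the metadata store (adjust privileges to your environment):

mysql -h 192.168.112.43 -u root -p -e "CREATE DATABASE pivot DEFAULT CHARACTER SET utf8; GRANT ALL ON pivot.* TO 'druid'@'%' IDENTIFIED BY 'diurd';"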

  • 10. Edit the data.conf supervise configuration file

cd imply-2.6.11/conf/supervise
vim data.conf

:verify bin/verify-java
:verify bin/verify-version-check

historical bin/run-druid historical conf
middleManager bin/run-druid middleManager conf

# Uncomment to use Tranquility Server
#!p95 tranquility-server bin/tranquility server -configFile conf/tranquility/server.json

# The tranquility-kafka line below is uncommented so that streaming data is ingested via Tranquility Kafka
# Uncomment to use Tranquility Kafka
!p95 tranquility-kafka bin/tranquility kafka -configFile conf/tranquility/kafka.json

# Uncomment to use Tranquility Clarity metrics server
#!p95 tranquility-metrics-server java -Xms4g -Xmx4g -cp "dist/tranquility/lib/*" com.metamx.tranquility.distribution.DistributionMain server -configFile conf/tranquility/server-for-metrics.yaml

  • 11. Service installation and startup

Single-node quickstart (all services on one host, here 192.168.112.43):

nohup bin/supervise -c conf/supervise/quickstart.conf > quickstart.log &
bin/service --down
bin/service --restart ${service_name}

Service endpoints:
coordinator         http://192.168.112.43:8081
broker              http://192.168.112.43:8082/druid/v2
historical          http://192.168.112.43:8083/druid/v2
overlord            http://192.168.112.43:8090
pivot               http://192.168.112.43:9095
tranquility-server  http://192.168.112.43:8200/v1/post/datasource

Cluster startup, one supervise config per host:

192.168.112.40 (historical, middleManager, tranquility-kafka):
nohup bin/supervise -c conf/supervise/data.conf > data.log &

192.168.112.41 (coordinator, overlord):
nohup bin/supervise -c conf/supervise/master-no-zk.conf > master-no-zk.log &

192.168.112.42 (broker, pivot):
nohup bin/supervise -c conf/supervise/query.conf > query.log &
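
To verify that each process came up, you can hit the /status endpoint that every Druid service exposes (a quick sanity check, not part of the Imply scripts):

curl http://192.168.112.41:8081/status   # coordinator
curl http://192.168.112.41:8090/status   # overlord
curl http://192.168.112.42:8082/status   # broker
curl http://192.168.112.40:8083/status   # historical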

  • 12. Ingest streaming data via Tranquility Kafka

cd imply-2.6.11/conf/tranquility
vim kafka.json

{
    "dataSources": [
        {
            "spec": {
                "dataSchema": {
                    "dataSource": "dh_ods_ods_dh_pay_server",
                    "parser": {
                        "type": "string",
                        "parseSpec": {
                            "timestampSpec": {
                                "column": "timestamp",
                                "format": "auto"
                            },
                            "dimensionsSpec": {
                                "dimensions": ["product_id","zid","account_id","pay_channel","record_time"],
                                "dimensionExclusions": ["timestamp","log","item_id","ip","log_level","sdk_name","pay_time","log_type","source_name","order_id"]
                            },
                            "format": "json"
                        }
                    },
                    "granularitySpec": {
                        "type": "uniform",
                        "segmentGranularity": "hour",
                        "queryGranularity": "MINUTE"
                    },
                    "metricsSpec": [
                        {
                            "name": "pay_amount_sum",
                            "type": "doubleSum",
                            "fieldName": "pay_amount"
                        }
                    ]
                },
                "ioConfig": {
                    "type": "realtime"
                },
                "tuningConfig": {
                    "type": "realtime",
                    "maxRowsInMemory": "50000",
                    "intermediatePersistPeriod": "PT10M",
                    "windowPeriod": "PT10M"
                }
            },
            "properties": {
                "task.partitions": "1",
                "task.replicants": "1",
                "topicPattern": "dh_ods_ods_dh_pay_server"
            }
        },
        {
            "spec": {
                "dataSchema": {
                    "dataSource": "dh_ods_ods_dh_pay_server_3",
                    "parser": {
                        "type": "string",
                        "parseSpec": {
                            "timestampSpec": {
                                "column": "timestamp",
                                "format": "auto"
                            },
                            "dimensionsSpec": {
                                "dimensions": ["product_id","zid","account_id","pay_channel","record_time"],
                                "dimensionExclusions": ["timestamp","log","item_id","ip","log_level","sdk_name","pay_time","log_type","source_name","order_id"]
                            },
                            "format": "json"
                        }
                    },
                    "granularitySpec": {
                        "type": "uniform",
                        "segmentGranularity": "hour",
                        "queryGranularity": "MINUTE"
                    },
                    "metricsSpec": [
                        {
                            "name": "pay_amount_sum",
                            "type": "doubleSum",
                            "fieldName": "pay_amount"
                        }
                    ]
                },
                "ioConfig": {
                    "type": "realtime"
                },
                "tuningConfig": {
                    "type": "realtime",
                    "maxRowsInMemory": "50000",
                    "intermediatePersistPeriod": "PT10M",
                    "windowPeriod": "PT10M"
                }
            },
            "properties": {
                "task.partitions": "1",
                "task.replicants": "1",
                "topicPattern": "dh_ods_ods_dh_pay_server_3"
            }
        }
    ],
    "properties": {
        "zookeeper.connect": "192.168.112.40,192.168.112.41,192.168.112.42",
        "druid.discovery.curator.path": "/druid/discovery",
        "druid.selectors.indexing.serviceName": "druid/overlord",
        "commit.periodMillis": "15000",
        "consumer.numThreads": "4",
        "kafka.zookeeper.connect": "192.168.112.40,192.168.112.41,192.168.112.42",
        "kafka.group.id": "tranquility-kafka-dh-pay-server",
        "serialization.format": "smile",
        "druidBeam.taskLocator": "overlord"
    }
}
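
Once tranquility-kafka is running, any JSON events published to the configured topics are ingested automatically. A minimal sketch of sending a test event, assuming a standard Kafka installation under /opt/kafka and a broker at 192.168.112.40:9092 (both are assumptions); note that Tranquility drops events whose timestamp falls outside the windowPeriod, so the timestamp must be close to the current time:

# Send one JSON event to the topic watched by tranquility-kafka
echo "{\"timestamp\":\"$(date -u +%FT%TZ)\",\"product_id\":\"p1\",\"zid\":\"z1\",\"account_id\":\"a1\",\"pay_channel\":\"wechat\",\"record_time\":\"$(date '+%F %T')\",\"pay_amount\":9.9}" \
  | /opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.112.40:9092 --topic dh_ods_ods_dh_pay_server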

 

  • 13. Ingest streaming data via the Kafka indexing service

curl -X POST -H 'Content-Type: application/json' -d @kafka.json http://192.168.112.41:8090/druid/indexer/v1/supervisor
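
The kafka.json posted to the overlord here is a Kafka supervisor spec, which is a different format from the Tranquility configuration in step 12. A minimal sketch for the same datasource, assuming Kafka brokers at 192.168.112.40:9092 (the bootstrap servers, task count, replicas, and task duration are assumptions to adapt):

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "dh_ods_ods_dh_pay_server",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {"column": "timestamp", "format": "auto"},
        "dimensionsSpec": {"dimensions": ["product_id","zid","account_id","pay_channel","record_time"]}
      }
    },
    "metricsSpec": [{"name": "pay_amount_sum", "type": "doubleSum", "fieldName": "pay_amount"}],
    "granularitySpec": {"type": "uniform", "segmentGranularity": "hour", "queryGranularity": "MINUTE"}
  },
  "tuningConfig": {"type": "kafka", "maxRowsInMemory": 50000},
  "ioConfig": {
    "topic": "dh_ods_ods_dh_pay_server",
    "consumerProperties": {"bootstrap.servers": "192.168.112.40:9092"},
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}

Running supervisors can then be listed with:
curl http://192.168.112.41:8090/druid/indexer/v1/supervisor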

  • 14. Check task status in the overlord console

http://192.168.112.41:8090

  • 15. View data in the Pivot UI

http://192.168.112.42:9095

  • 16. Query data over HTTP

http://192.168.112.42:8082/druid/v2/sql
Request method: POST
Request header: Content-Type: application/json
Request body:
{
"query":"SELECT * FROM dh_ods_ods_dh_pay_server WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY order by __time desc limit 10"
}
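
The same request as a single curl command (this simply wraps the endpoint, header, and body shown above):

curl -X POST -H 'Content-Type: application/json' \
  -d "{\"query\":\"SELECT * FROM dh_ods_ods_dh_pay_server WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY ORDER BY __time DESC LIMIT 10\"}" \
  http://192.168.112.42:8082/druid/v2/sql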