Logstash的安装和Elasticsearch的整合
logstash
是常见的ELK组合的成员之一.
E:elasticsearch
L:logstash
K:kibana(比head强大在数据分析)
logstash是负责收集数据在给elasticsearch做索引使用,logstash可以收集 数据库、文件等多种数据源的数据。
1 上传解压(jdk运行环境)
2 运行测试(bin目录下)
#logstash -e ‘input{stdin{}}output{stdout{codec=>rubydebug}}’
这行命令的意思就是收集控制台打印内容,并且转化成固定格式的字符串在控制台输出
最终我们得到一个这样的结果,说明logstash运行成功
3 从MySql数据库抽取数据内容
连接mysql数据库。在其中创建一个product表,随便录入点数据,方便一会测试使用
4 实现数据库数据导入索引(ik分词器使用)
product表格的数据导入es集群中 index02 product
安装jdbc input插件logstash-input-jdbc的包
logstash连接数据库需要插件的支持。此处使用logstash-input-jdbc-4.2.4.tar.gz
上传插件,到logstash根目录下的plugins文件夹(如果没有手动创建)
上传包解压到当前目录
5 准备mysql连接jar包支持
linux服务器上具备这个jar包,就可以使用logstash从mysql导入数据到es;
mysql-connector-java-5.1.38-bin.jar
jar位置无要求,一会做配置文件的时候会指定jar的路径,
5.1 jdbc.conf :logstash启动时加载的文件
需要创建3个文件,来配置logstash,3个文件名可以自定义
1、jdbc.sql 查询出来需要保存到索引的数据
select * from product
2、template.json
修改"analyzer": “ik_max_word”
这里指定使用的分词器为我们安装在Elasticsearch的ik分词器,这个在我的其他文章中已经安装好了
{
"template": "*",
"version": 50001,
"settings": {
"index.refresh_interval": "5s"
},
"mappings": {
"_default_": {
"_all": {
"enabled": true,
"norms": false
},
"dynamic_templates": [
{
"message_field": {
"path_match": "message",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false
}
}
},
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false,
"analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
],
"properties": {
"@timestamp": {
"type": "date",
"include_in_all": false
},
"@version": {
"type": "keyword",
"include_in_all": false
}
}
}
}
}
3、jdbc.conf
导入的数据源指定,导入的属性指定(jar包,sql语句执行等) input标签
需要修改的位置有:
连接mysql地址和库,配置为刚刚创建的product表所在的库
jdbc_connection_string => "jdbc:mysql:///easydb"
登录名密码
jdbc_user => "root"
jdbc_password => "root"
刚刚上传的mysql-connector的jar包,确定路径
jdbc_driver_library => "/home/resources/mysql-connector-java-5.1.38-bin.jar"
连接数据库执行的sql语句,可以在文件中设定搜索,更新等
statement_filepath => "/home/resources/jdbc.sql"
设定监听事件间隔schedule的值为cron表达式,可自行百度一下
type为自定义的类型,此处直接使用表名
schedule => "* * * * *"
type => "product"
es的连接地址
hosts => "192.168.1.55:9200"
我之前创建好的索引
index => "index02"
使用搜出出来的那个字段设定docid,我们使用product_id,这个为数据库中表里面的ID字段
document_id => "%{product_id}"
刚刚创建的第2个配置文件
template => "/home/resources/template.json"
input {
stdin {
}
jdbc {
# 连接mysql地址和库
jdbc_connection_string => "jdbc:mysql:///easydb"
# 登录名密码
jdbc_user => "root"
jdbc_password => "root"
# 上传的mysql-connector的jar包,确定路径
jdbc_driver_library => "/home/resources/mysql-connector-java-5.1.38-bin.jar"
# 驱动类全路径名称
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 开启分页,和分页最大值,自定义设置
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#连接数据库执行的sql语句,可以在文件中设定搜索,更新等
statement_filepath => "/home/resources/jdbc.sql"
#设定监听事件间隔
schedule => "* * * * *"
type => "product"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
#设定收集数据的输出位置
elasticsearch {
#es的连接地址
hosts => "192.168.1.55:9200"
#新建的索引
index => "index02"
#使用搜出出来的那个字段设定docid,我们使用product_id
document_id => "%{product_id}"
template_overwrite => true
template => "/home/resources/template.json"
}
stdout {
codec => json_lines
}
}
创建好三个文件后。启动logstash
6 启动logstash 指定配置文件jdbc.conf
#logstash -f jdbc.conf
我的jdbc.conf 在logstash的bin目录下,如果在其他目录此处需要带上路径。例如/home/jdbc.conf
已经启动成功。稍等一会。会根据cron表达式进行执行。我配置的为 schedule => “* * * * *”
所以稍等一会就会打印执行日志
这个工具是可以给es实时更新数据的,。我们可以在数据库测试添加和修改数据测试