Logstash导入MySQL(或TIDB)数据到ElasticSearch
本文主要介绍通过Logstash导入MySQL或TIDB数据库的数据到ElasticSearch中,实现搜索引擎的功能及可能遇到的一些问题。(Windows环境下)
一、安装ElasticSearch
1.官网下载ElasticSearch,解压即可。(下载地址:https://www.elastic.co/downloads/elasticsearch)
2.进入安装目录下bin目录中,使用命令 ./elasticsearch 启动,启动输出信息结尾出现started 关键字,表示启动成功,使用./elasticsearch -d 命令使服务后台启动,程序默认端口为9200。
3. 使用 http://localhost:9200 访问,结果如下则启动成功
二、安装logstash
1. 需要注意的是,logstash版本要和ElasticSearch版本保持一致,否则会出bug。(下载地址:https://www.elastic.co/downloads/logstash)
2. 连接MySQL
首先,下载mysql JDBC驱动jar包,(下载地址:https://www.mysql.com/products/connector/)将jar包放在logstash的bin目录下。本人使用的是ElasticSearch及logstash都是7.3.2版本,jar包需要JDK8版本,否则会报错。版本如下:
2.1 在 bin目录下新建文件 mysql.conf (或mysql.yml)这种格式的文件都可以,内容如下
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/数据库名?characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true"
jdbc_user => "用户名"
jdbc_password => "密码"
jdbc_driver_library => "mysql-connector-java-8.0.15.jar" # 上一步下载的jar包路径
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 待执行的sql语句文件地址
statement_filepath => "mysql.sql"
# 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
use_column_value => true
# 需要记录的字段,用于增量同步,需是数据库字段
tracking_column => last_update_date
# Value can be any of: numeric,timestamp,Default value is "numeric"
tracking_column_type => timestamp # 若同步的是datetime字段需要用timestamp ,数字id用numeric
# record_last_run上次数据存放位置;
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "sql_last_update.txt"
# 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
clean_run => false
schedule => "* * * * *"
type => "tidb_data"
}
}
# 此filter是为了修正同步的时间戳与本机相同为东8区时间,否则时间戳会差8小时
filter {
date {
match => ["message","UNIX_MS"]
target => "@timestamp"
}
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.utc+8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "tidb2es"
document_id => "%{id}"
}
stdout{
codec => json_lines
}
}
2.2 创建mysql.sql文件内容如下:
# 根据需求写入查询的sql语句,此为增量更新,last_update_date为跟踪的增量字段名
select * from DB_NOTICE_MENU_INFO_F where last_update_date>:sql_last_value
2.3 创建sql_last_update.txt空文件,用来记录增量更新的初始值。
以上3个文件创建完成后,启动logstash,进入bin目录,执行命令:
logstash -f mysql.yml
稍等片刻,若启动成功便会有不同的数据打印出来,若报错,需具体问题具体分析。
三、本人踩过的几个坑,插旗以告之
1. 在创建启动的配置文件的时候要使用utf-8无deom模式,及mysql.yml文件
解决方案:使用notepad++设置成utf-8无deom模式创建文件,复制内容进去,再尝试。
2. jdk环境要使用jdk8,大于8java会报错,也就是mysql-connector-java会报错,找不到相关的类
3. 在配置文件中output模块给ES设置index不能是大写