Logstash导入MySQL(或TIDB)数据到ElasticSearch

本文主要介绍通过Logstash导入MySQL或TIDB数据库的数据到ElasticSearch中,实现搜索引擎的功能及可能遇到的一些问题。(Windows环境下)

一、安装ElasticSearch

1.官网下载ElasticSearch,解压即可。(下载地址:https://www.elastic.co/downloads/elasticsearch)

2.进入安装目录下bin目录中,使用命令 ./elasticsearch 启动,启动输出信息结尾出现started 关键字,表示启动成功,使用./elasticsearch  -d 命令使服务后台启动,程序默认端口为9200。

3. 使用 http://localhost:9200 访问,结果如下则启动成功

Logstash导入MySQL(或TIDB)数据到ElasticSearch

二、安装logstash

1.  需要注意的是,logstash版本要和ElasticSearch版本保持一致,否则会出bug。(下载地址:https://www.elastic.co/downloads/logstash

2.  连接MySQL

首先,下载mysql JDBC驱动jar包,(下载地址:https://www.mysql.com/products/connector/)将jar包放在logstash的bin目录下。本人使用的是ElasticSearch及logstash都是7.3.2版本,jar包需要JDK8版本,否则会报错。版本如下: 

Logstash导入MySQL(或TIDB)数据到ElasticSearch

2.1  在 bin目录下新建文件 mysql.conf (或mysql.yml)这种格式的文件都可以,内容如下

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://localhost:3306/数据库名?characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true"
        jdbc_user => "用户名"
        jdbc_password => "密码"
        jdbc_driver_library => "mysql-connector-java-8.0.15.jar"  # 上一步下载的jar包路径
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # 待执行的sql语句文件地址
        statement_filepath => "mysql.sql" 
        # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
        use_column_value => true
        # 需要记录的字段,用于增量同步,需是数据库字段
        tracking_column => last_update_date
        # Value can be any of: numeric,timestamp,Default value is "numeric"
        tracking_column_type => timestamp # 若同步的是datetime字段需要用timestamp ,数字id用numeric
        # record_last_run上次数据存放位置;
        record_last_run => true
        #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
        last_run_metadata_path => "sql_last_update.txt"
        # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
        clean_run => false
        schedule => "* * * * *"
        type => "tidb_data"
        }
}

# 此filter是为了修正同步的时间戳与本机相同为东8区时间,否则时间戳会差8小时
filter {
date {
match => ["message","UNIX_MS"]
target => "@timestamp"
}
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.utc+8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
}
 
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "tidb2es"
        document_id => "%{id}"
        }
        stdout{
        codec => json_lines
        }
}

2.2  创建mysql.sql文件内容如下:
# 根据需求写入查询的sql语句,此为增量更新,last_update_date为跟踪的增量字段名
select * from DB_NOTICE_MENU_INFO_F where last_update_date>:sql_last_value

2.3   创建sql_last_update.txt空文件,用来记录增量更新的初始值。

以上3个文件创建完成后,启动logstash,进入bin目录,执行命令:

logstash -f mysql.yml

稍等片刻,若启动成功便会有不同的数据打印出来,若报错,需具体问题具体分析。

三、本人踩过的几个坑,插旗以告之

1.  在创建启动的配置文件的时候要使用utf-8无deom模式,及mysql.yml文件

    解决方案:使用notepad++设置成utf-8无deom模式创建文件,复制内容进去,再尝试。
2.  jdk环境要使用jdk8,大于8java会报错,也就是mysql-connector-java会报错,找不到相关的类
3.  在配置文件中output模块给ES设置index不能是大写