Incremental import from MySQL into Elasticsearch with Logstash's logstash-input-jdbc plugin

Version: 6.5.4

This version of Logstash already bundles the jdbc input plugin by default.

Official plugin documentation:

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#plugins-inputs-jdbc
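To confirm the plugin is present, you can list the installed plugins from the Logstash installation directory (the grep filter is just for convenience):

    bin/logstash-plugin list | grep jdbc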

 

1. Configuration file

mysql-logstash.conf

input {
    stdin { }
    jdbc {
        # Database to sync from
        jdbc_connection_string => "jdbc:mysql://192.168.0.20:3306/ds_qa"
        jdbc_user => "root"
        jdbc_password => "123456"
        # Local JDBC driver jar
        jdbc_driver_library => "/home/es/ext/mysql-connector-java-5.1.29-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # SQL statement used to fetch the records
        statement => "SELECT * FROM user_acct"
        # The SQL can also be kept in a file (handy when the statement is complex)
        #statement_filepath => "jdbc.sql"
        # Cron-style schedule; fields from left to right are minute, hour, day of month, month, day of week.
        # All asterisks means the query runs every minute.
        schedule => "* * * * *"
        # Whether to lowercase column names; the default is true
        lowercase_column_names => "false"
    }
}

output {
    stdout {
        codec => json_lines
    }
    elasticsearch {
        # Elasticsearch host and port
        hosts => "192.168.0.43:9200"
        # ES index name (chosen by you)
        index => "ds"
        # Document type
        document_type => "user_acct"
        # Document id: %{sn} takes the sn value from each row and maps it to the _id field in ES
        document_id => "%{sn}"
    }
}

See the official documentation for detailed descriptions of every option:

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#plugins-inputs-jdbc-options
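The statement above re-imports the whole table on every scheduled run. For a genuinely incremental import, the plugin's tracking options can be combined with the :sql_last_value placeholder. The snippet below is only a sketch, assuming the table has an auto-increment id column (swap in your own tracking column):

    jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.0.20:3306/ds_qa"
        jdbc_user => "root"
        jdbc_password => "123456"
        jdbc_driver_library => "/home/es/ext/mysql-connector-java-5.1.29-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # Fetch only rows newer than the value recorded on the previous run
        statement => "SELECT * FROM user_acct WHERE id > :sql_last_value"
        # Track the numeric id column instead of the last run timestamp
        use_column_value => true
        tracking_column => "id"
        tracking_column_type => "numeric"
        # File where the last value is persisted between runs
        last_run_metadata_path => "/home/es/.logstash_jdbc_last_run"
        schedule => "* * * * *"
    }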

 

The configuration above handles a single table.

A multi-table configuration looks like this:

mysql-logstash-mutitables.conf

input {
    stdin { }
    jdbc {
        type => "user_acct"
        # Database to sync from
        jdbc_connection_string => "jdbc:mysql://192.168.0.20:3306/ds_qa"
        jdbc_user => "root"
        jdbc_password => "123456"
        # Local JDBC driver jar
        jdbc_driver_library => "/home/es/ext/mysql-connector-java-5.1.29-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # SQL statement used to fetch the records
        statement => "SELECT * FROM user_acct"
        # The SQL can also be kept in a file (handy when the statement is complex)
        #statement_filepath => "jdbc.sql"
        # Cron-style schedule; fields from left to right are minute, hour, day of month, month, day of week.
        # All asterisks means the query runs every minute.
        schedule => "* * * * *"
        # Whether to lowercase column names; the default is true
        lowercase_column_names => "false"
    }
    jdbc {
        type => "user_acct2"
        # Database to sync from
        jdbc_connection_string => "jdbc:mysql://192.168.0.20:3306/ds_qa"
        jdbc_user => "root"
        jdbc_password => "123456"
        # Local JDBC driver jar
        jdbc_driver_library => "/home/es/ext/mysql-connector-java-5.1.29-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # SQL statement used to fetch the records
        statement => "SELECT * FROM user_acct2"
        # The SQL can also be kept in a file (handy when the statement is complex)
        #statement_filepath => "jdbc.sql"
        # Cron-style schedule; fields from left to right are minute, hour, day of month, month, day of week.
        # All asterisks means the query runs every minute.
        schedule => "* * * * *"
        # Whether to lowercase column names; the default is true
        lowercase_column_names => "false"
    }
}

output {
    stdout {
        codec => json_lines
    }
    elasticsearch {
        # Elasticsearch host and port
        hosts => "192.168.0.43:9200"
        # ES index name (chosen by you)
        index => "ds"
        # Document type: %{type} picks up the type set in each jdbc input above
        document_type => "%{type}"
        # Document id: %{sn} takes the sn value from each row and maps it to the _id field in ES
        document_id => "%{sn}"
    }
}
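As the startup log in the next section warns, document_type is deprecated in Elasticsearch 6.x and removed in 7.0. An alternative is to route each type to its own index with conditionals in the output; this is only a sketch, and the index names here are made up:

    output {
        stdout { codec => json_lines }
        if [type] == "user_acct" {
            elasticsearch {
                hosts => "192.168.0.43:9200"
                index => "ds_user_acct"
                document_id => "%{sn}"
            }
        } else if [type] == "user_acct2" {
            elasticsearch {
                hosts => "192.168.0.43:9200"
                index => "ds_user_acct2"
                document_id => "%{sn}"
            }
        }
    }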


2. Start Logstash

./logstash -f /home/es/logstash-6.5.4/config/mysql-logstash.conf
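To catch configuration errors before launching, the file can first be validated without starting the pipeline (the --config.test_and_exit flag only checks the config and then exits):

    ./logstash -f /home/es/logstash-6.5.4/config/mysql-logstash.conf --config.test_and_exit

Running the command for real produces output like the following: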

 

[[email protected] bin]$ ./logstash -f /home/es/logstash-6.5.4/config/mysql-logstash.conf

Sending Logstash logs to /home/es/logstash-6.5.4/logs which is now configured via log4j2.properties

[2019-01-05T23:01:39,808][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified

[2019-01-05T23:01:39,829][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.5.4"}

[2019-01-05T23:01:42,141][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch index=>"gds", id=>"e4ab79fee7bfe9282d0a0faf7263025a18046025c58ebfbf38882060bb5030cb", document_id=>"%{sn}", hosts=>[//192.168.0.43:9200], document_type=>"user_acct", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_6e5b0bd7-5c93-4cd1-8764-9a83c94236ad", enable_metric=>true, charset=>"UTF-8">, workers=>1, manage_template=>true, template_name=>"logstash", template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, action=>"index", ssl_certificate_verification=>true, sniffing=>false, sniffing_delay=>5, timeout=>60, pool_max=>1000, pool_max_per_route=>100, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}

[2019-01-05T23:01:42,172][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}

[2019-01-05T23:01:42,504][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://192.168.0.43:9200/]}}

[2019-01-05T23:01:42,661][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.0.43:9200/"}

[2019-01-05T23:01:42,704][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}

[2019-01-05T23:01:42,707][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}

[2019-01-05T23:01:42,732][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//192.168.0.43:9200"]}

[2019-01-05T23:01:42,742][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}

[2019-01-05T23:01:42,755][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}

[2019-01-05T23:01:43,061][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x504e63d9 run>"}

The stdin plugin is now waiting for input:

[2019-01-05T23:01:43,140][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

[2019-01-05T23:01:43,398][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

 

If the startup succeeded you will see the "Pipeline started successfully" and "Pipelines running" messages above; if not, an error is logged and the configuration should be corrected according to that error.

 

3. Verify the import

After the import succeeds, verify it with the Kibana Dev Tools console:

GET /ds/user_acct/_count


The document count should match the number of rows in the source table.
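For illustration only (the count depends on your data), the response is shaped roughly like this:

    {
      "count": 1000,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      }
    }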

Then run a search:

GET /ds/user_acct/_search


 

4. A few more words

This way of syncing MySQL into ES has a drawback: when the volume of data to sync is large, the scheduled polling queries put real load on MySQL itself.

For large data volumes, canal is recommended instead:

https://github.com/alibaba/canal

For a concrete approach, see the source code of this project on GitHub:

https://github.com/starcwang/canal_mysql_elasticsearch_sync

Building on canal plus spring-data-elasticsearch makes this even simpler:

https://spring.io/projects/spring-data-elasticsearch