ELK+springboot搭建分布式日志系统---strom流之kafka整合
废话不多说,直接开整!
业务场景:需要将一个服务的日志服务通过kafka整合到ES上
难点有2个:一个是如何将数据传递到kafka;一个是如何将数据抓取到ES分析
第一个难点:
我这里因为是个Netty服务结构,里面只有DI模型,只支持TCP,UDP传输,不支持spring-web(所以不支持MVC注解模式)。所以最新的KafkaTemplate(模板模式)别想了!
采取思路只有一个,查看kafka源码(org.apache.kafka的jar包)。
kafka模型设计在我前面的文章已经讲到过,这里之只谈我的解决思路:
遵循服务者消费者模型,把Netty服务当成生产者,logstash当成消费者,生产者建模关键代码如下(这里我将它整成一个Utils):
1创建KafkaProducer静态对象
private static KafkaProducer<String, String> producer;
2配置基本参数,放入静态代码块
static{
//生产者配置文件,具体配置可参考ProducerConfig类源码,或者参考官网介绍
Map<String,Object> config=new HashMap<String, Object>();
//kafka服务器地址
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka服务地址");
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG," org.apache.kafka.clients.producer.internals.DefaultPartitioner");
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5);
//往kafka服务器提交消息间隔时间,0则立即提交不等待
config.put(ProducerConfig.LINGER_MS_CONFIG,0);
producer=new KafkaProducer<String,String>(config);
}
3send静态方法
public static Future<RecordMetadata> send(String topic,String value){
try {
logger.info("发送的值是"+value);
Future<RecordMetadata> future=producer.send(new ProducerRecord<String,String>(topic,value));
return future;
}catch(Exception e){
logger.warn("kafka发送消息异常");
return null;
}
}
这里key和value值都是采用的String序列化的,test下
第一步问题解决成功!
第二步问题:logstack如何获取kafka上的数据
这里就直接给我已经配好点(基于6版本)
input{
kafka{
bootstrap_servers => [zookeeper地址]
client_id => "客户端名称"
auto_offset_reset => "latest"
consumer_threads => 1 #线程数
decorate_events => true
topics => ["els-server"] #topic节点
topics_pattern => "els-server.*" #节点分区(注意:配置此项则topics项配置无效)
codec=> json {
charset =>"UTF-8" #设置编码格式
}
type => "kafkaDTO" #主要为了处理
}
}
filter {
json {
source => "message"
target => "message"
}
}
output {
file {
path => "/config-dir/test.log"
flush_interval => 0
} #设置此项目的主要是前期检查json格式是否输入正确,有利于文件排错(保存到本地日志)
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "els-server-%{+YYYY.MM.dd}"
}
}
注意配置文件的注释部分,有利于初期对配置排错处理,数据处理等作用,如file中观察json格式如下:
导入指数参考搭建那篇文章,然后数据如下:
第二步完成!
尝试分布式部署ES,但单台主机修改配置无效(无法同时启动主从节点),还是希望有帮助的话,给个赞!