Spring Boot项目整合ELK、Kafka

本篇文章主要介绍使用Spring Boot整合log4j,配合ELK(Elasticsearch , Logstash, Kibana)整合Kafka完成日志收集,应用场景比较多的是分布式项目,这样可以直接收集各个节点的日志到一起,便于错误日志查看和分析业务。

整个流程如下:

使用log4j的appender发送数据到kafka到topic,topic再发送到logstash,然后经过elasticsearch分析处理后到kibana页面做查询展示。

环境准备:

1、安装ELK

2、安装kafka

3、Spring Boot项目demo

简要步骤:

①、修改logj配置文件

②、加入日志打印代码

③、启动项目访问页面

④、使用kafka查看消费者打印

⑤、使用kibana建立日志索引

⑥、使用kibana查询日志、es语法查询

 

详细步骤:

一、修改logj配置文件

注:本项目demo是基于之前到Spring Boot入门教程的一个整合JPA 和 Thymeleaf示例。

Spring Boot2.0系列教程之 JPA 和 Thymeleaf 实践(五):Spring Boot2.0系列教程之 JPA 和 Thymeleaf 实践(五)

配置文件logback-spring.xml

Spring Boot项目整合ELK、Kafka

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
   <springProperty scope="context" name="LOG_HOME" source="logback.file.path" defaultValue="E:\a_hadoop\All_logs"/>
   <springProperty scope="context" name="LOG_LEVEL" source="logback.level" defaultValue="info"/>
   <springProperty scope="context" name="SERVER_NAME" source="spring.application.name" defaultValue="boot_demo"/>


   <!--<property name="LOG_HOME" value="D:/application/logs/enett" />-->

   <!--<property name="LOG_HOME" value="/Users/ailk/test/ysl" />-->
   <property name="SEF_Level" value="INFO" />
   <!--文件输出的格式设置 -->
   <appender name="FILE"
           class="ch.qos.logback.core.rolling.RollingFileAppender">
      <!-- 文件输出的日志 的格式 -->
      <encoder>
         <pattern>
            ${SERVER_NAME} ${NODE_FLAG} %level %date{yyyy-MM-dd HH:mm:ss.SSS} %logger[%line] %msg%n
         </pattern>
      </encoder>


      <!-- 配置日志所生成的目录以及生成文件名的规则 在logs/mylog-2016-10-31.0.log -->
      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
         <fileNamePattern>${LOG_HOME}/${SERVER_NAME}/info/info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
         <timeBasedFileNamingAndTriggeringPolicy
               class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
            <!-- 最大64MB 超过最大值,会重新建一个文件-->
            <maxFileSize>10 MB</maxFileSize>
         </timeBasedFileNamingAndTriggeringPolicy>
      </rollingPolicy>
   </appender>









   <!--控制台输出的格式设置 -->
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
      <!-- 控制台输出的日志 的格式 -->
      <encoder>
         <pattern>
            ${SERVER_NAME} ${NODE_FLAG} %level %date{yyyy-MM-dd HH:mm:ss.SSS} %logger[%line] %msg%n
         </pattern>
      </encoder>
      <!-- 只是DEBUG级别以上的日志才显示 -->
      <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
         <level>DEBUG</level>
      </filter>
   </appender>






    操作日志
   <appender name="DruidLog"
           class="ch.qos.logback.core.rolling.RollingFileAppender">
      <!-- 文件输出的日志 的格式 -->
      <encoder>
         <pattern>
            ${SERVER_NAME} ${NODE_FLAG} %level %date{yyyy-MM-dd HH:mm:ss.SSS} %logger[%line] %msg%n
         </pattern>
      </encoder>


      <!-- 配置日志所生成的目录以及生成文件名的规则 在logs/mylog-2016-10-31.0.log -->
      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
         <fileNamePattern>${LOG_HOME}/${SERVER_NAME}/Druid/DruidLog-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
         <timeBasedFileNamingAndTriggeringPolicy
               class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
            <!-- 最大64MB 超过最大值,会重新建一个文件-->
            <maxFileSize>10 MB</maxFileSize>
         </timeBasedFileNamingAndTriggeringPolicy>
      </rollingPolicy>
   </appender>





   <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">

      <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <pattern>${SERVER_NAME} ${NODE_FLAG} %level %date{yyyy-MM-dd HH:mm:ss.SSS} %logger[%line] %msg%n</pattern>
         <charset>utf8</charset>
      </encoder>
      <topic>boot_demo</topic>
      <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
      <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
           <!--注意此处应该是spring boot中的kafka配置属性-->
      <producerConfig>bootstrap.servers=192.168.234.156:9092</producerConfig>
           <producerConfig>retries=1</producerConfig>

          <producerConfig>batch-size=16384</producerConfig>
          <producerConfig>buffer-memory=33554432</producerConfig>
          <producerConfig>properties.max.request.size==2097152</producerConfig>
   </appender>






<!--   <logger name="com.alibaba.druid" additivity="false"  level="INFO">
      <appender-ref ref="DruidLog" />
   </logger>-->




   <logger name="org.springframework" level="${SEF_Level}" />
   <logger name="com.baomidou" level="${SEF_Level}" />
   <logger name="org.apache" level="${SEF_Level}" />
   <logger name="org.mybatis" level="${SEF_Level}" />
   <logger name="org.hibernate" level="${SEF_Level}" />
   <logger name="io.netty" level="${SEF_Level}" />
   <logger name="ch.qos" level="${SEF_Level}" />
   <logger name="org.eclipse" level="${SEF_Level}" />
   <logger name="org.thymeleaf" level="${SEF_Level}" />
   <logger name="io.lettuce" level="${SEF_Level}" />
   <logger name="com.xxl" level="${SEF_Level}" />
   <logger name="com.ctrip" level="${SEF_Level}" />
   <logger name="com.ulisesbocchio" level="${SEF_Level}" />
   <logger name="com.netflix" level="${SEF_Level}" />

   <!--myibatis log configure-->
   <logger name="com.apache.ibatis" level="${SEF_Level}"/>
   <logger name="java.sql.Connection" level="${SEF_Level}"/>
   <logger name="java.sql.Statement" level="${SEF_Level}"/>
   <logger name="java.sql.PreparedStatement" level="${SEF_Level}"/>
   <logger name="tk.mybatis.mapper" level="${SEF_Level}" />
   <logger name="com.github.pagehelperr" level="${SEF_Level}" />
   <logger name="org.apache.kafka" level="${SEF_Level}" />
   <logger name="c.u.jasyptspringboot" level="${SEF_Level}" />
   <logger name="com.ulisesbocchio.jasyptspringboot" level="${SEF_Level}" />
   <logger name="org.springframework.context.annotation" level="${SEF_Level}" />
   <logger name="org.springframework.beans.factory.annotation" level="${SEF_Level}" />
   <logger name="org.springframework.context.support" level="${SEF_Level}" />
   <logger name="com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver" level="${SEF_Level}" />



   <!--   <root level="${LOG_LEVEL}">
         <appender-ref ref="STDOUT" />
      </root>-->

      <root level="${LOG_LEVEL}">
         <appender-ref ref="FILE" />
         <appender-ref ref="STDOUT" />
         <appender-ref ref="kafkaAppender"  />
      </root>

</configuration>

 

二、加入日志打印代码

1、日志格式化类、json工具类

Spring Boot项目整合ELK、Kafka

2、日志打印方法调整,这里后续可以在kibana中看到日志

Spring Boot项目整合ELK、Kafka

3、json工具类代码

package com.boot.config;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;

import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

/**
 * JSON工具类
 *
 * @author zhoujh
 */
public class JSONUtil {

    private static Gson gson = null;

    static {
        gson = new Gson();// yyyy-MM-dd HH:mm:ss
    }

    public static synchronized Gson newInstance() {
        if (gson == null) {
            gson = new Gson();
        }
        return gson;
    }

    /**
     * 从对象生成JSON字符串
     *
     * @param obj 任意对象
     */
    public static String toJson( Object obj ) {
        return gson.toJson(obj);
    }

    public static String toJsonByFastJson( Object obj ) {
        return  JSON.toJSONString(obj);
    }

    /**
     * JSON字符串转为JavaBean
     *
     * @param json JSON字符串
     */
    public static <T> T jsonToBean( String json, Class<T> cls ) {
        return gson.fromJson(json, cls);
    }
    public static <T> T jsonToBeanByFastJson( String json, Class<T> cls ) {
        return JSON.parseObject(json, cls);
    }

    /**
     * JSON字符串转为JavaBean
     *
     * @param json JSON字符串
     */
    public static <T> List<T> jsonToList( String json, Type type ) {
        return gson.fromJson(json, type);
    }

    public static <T> List<T> jsonToListByFastJson( String json, Class<T> cls){
       return JSON.parseArray(json,cls);
    }
    /**
     * JSON字符串转为JavaBean
     *
     * @param json JSON字符串
     */
    public static <T> Map<String, T> jsonToMap( String json, Type type ) {
        return gson.fromJson(json, type);
    }

}

4、日志工具类代码

package com.boot.config;

import com.google.gson.Gson;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.HashMap;

/**
 * 基础日志类
 *
 * @author zhoujh
 * @version 20180709
 */
public class LogUtil {

    private static Log log = LogFactory.getLog(LogUtil.class);
    private static final String thisClassName = LogUtil.class.getName();
    //信息分隔符
    private static final String msgSplit = ":";
    //是否要定位服务
    private static boolean showLocSrc = true;
    //是否显示日志
    private static final boolean enabled = true;
    //显示等级
    private static int level = 1;
    private static final int debug = 1;
    private static final int info = 2;
    private static final int warn = 3;
    private static final int error = 4;




     /*
        日志记录具体使用规则如下:
        Error:所有异常捕获的Catch节点中异常内容用此级别进行记录(如:Exception对象的e.ToString())
        Warn:所有验证未通过时用此级别进行记录(暂时只用来记录接口请求或通知等验证未通过需要警告进行跟进的操作)(如:签名解析失败记录)
        Info:功能日常交互信息类用此级别进行记录(如:请求接口参数、接口响应参数等)
        Debug:业务代码交互或调试使用此级别进行记录,但只存在于开发及测试环境,生产环境将会禁用不会记录(如:xx对象信息或xx对象json等)
        日志记录格式如下:
        【传入参数(如:操作ID、订单ID、票号、金额等)】【具体内容】 /n Exception堆栈信息(Exception记录必须使用e.ToString()记录堆栈信息)
    */

    /* 日志格式化文本 */
    public static final String LOGFORMAT = "【%s】【%s】";

    public static final String getLogContent(String pContent, Object... pParameters) {

        String fParameters = "";
        if (null != pParameters && 0 < pParameters.length) {
            Gson gson = new Gson();
            fParameters = String.format("参数:%s", gson.toJson(pParameters));
        }

        return String.format(LOGFORMAT, pContent, fParameters);
    }



    /**
     * 记录Debug级日志[业务代码交互或调试使用此级别进行记录,但只存在于开发及测试环境,生产环境将会禁用不会记录(如:xx对象信息或xx对象json等)]
     *
     * @param content 信息综述
     * @param  parameters 详述
     */
    public static void debug(String content) {
        debug(content,"");
    }

    public static void debug(String content, Object parameters) {
        debug(content, JSONUtil.toJson(parameters));
    }

    public static void debug(String content, String parameters) {
       String message=String.format(LOGFORMAT, content, parameters);
        if (!enabled || debug < level)
            return;
        if (showLocSrc) {
            log(debug, message, Thread.currentThread().getStackTrace(),null);
        } else {
            log(debug, message, null,null);
        }
    }






    /**
     * 记录Info级日志[功能日常交互信息类用此级别进行记录(如:请求接口参数、接口响应参数等)]
     *
     * @param content 信息综述
     * @param  parameters 详述
     */
    public static void info(String content) {
        info(content,"");
    }

    public static void info(String content, Object parameters) {
        info(content, JSONUtil.toJson(parameters));
    }

    public static void info(String content, String parameters) {
        String message=String.format(LOGFORMAT, content, parameters);
        if (!enabled || info < level)
            return;
        if (showLocSrc) {
            log(info, message, Thread.currentThread().getStackTrace(),null);
        } else {
            log(info, message, null,null);
        }
    }




    /**
     * 记录Warn级日志[所有验证未通过时用此级别进行记录(暂时只用来记录接口请求或通知等验证未通过需要警告进行跟进的操作)(如:签名解析失败记录)]
     *
     * @param content 信息综述
     * @param  parameters 详述
     */
    public static void warn(String content) {
        warn(content,"");
    }

    public static void warn(String content, Object parameters) {
        warn(content, JSONUtil.toJson(parameters));
    }

    public static void warn(String content, String parameters) {
        String message=String.format(LOGFORMAT, content, parameters);
        if (!enabled || warn < level)
            return;
        if (showLocSrc) {
            log(warn, message, Thread.currentThread().getStackTrace(),null);
        } else {
            log(warn, message, null,null);
        }
    }





    /**
     * Error:所有异常捕获的Catch节点中异常内容用此级别进行记录(如:Exception对象的e.ToString())
     *
     * @param
     * @param content 信息综述
     * @param  parameters 详述
     * @param e          异常信息
     */
    public static void error(String content) {
        error(content,"");
    }

    public static void error(String content, Object parameters) {
        error(content, JSONUtil.toJson(parameters));
    }

    public static void error(String content, String parameters) {
        String message=String.format(LOGFORMAT, content, parameters);
        if (!enabled || error < level)
            return;
        if (showLocSrc) {
            log(error, message, Thread.currentThread().getStackTrace(),null);
        } else {
            log(error, message, null,null);
        }
    }

    public static void error(String content,Exception e) {
        error(content,"",e);
    }

    public static void error(String content, Object parameters,Exception e) {
        error(content, JSONUtil.toJson(parameters),e);
    }

    public static void error(String content, String parameters,Exception e) {
        String message=String.format(LOGFORMAT, content, parameters);
        if (!enabled || error < level)
            return;
        if (showLocSrc) {
            log(error, message, Thread.currentThread().getStackTrace(),e);
        } else {
            log(error, message, null,e);
        }
    }


    private static String getStackMsg(StackTraceElement[] ste) {
        if (ste == null) return "";

        boolean srcFlag = false;
        for (int i = 0; i < ste.length; i++) {
            StackTraceElement s = ste[i];

            if(s==null) continue;

            // 如果上一行堆栈代码是本类的堆栈,则该行代码则为源代码的最原始堆栈。
            if (srcFlag) {
                if(!thisClassName.equals(s.getClassName()))
                    return s.toString();
            }

            // 定位本类的堆栈
            if (thisClassName.equals(s.getClassName())) {
                srcFlag = true;
            }
        }
        return "";
    }


    private static void log(int level, Object message, StackTraceElement[] ste, Exception e) {
        if (ste != null) {
            message = getStackMsg(ste) + msgSplit + message;
        }

        switch (level) {
            case info:
                log.info(message);
                break;
            case debug:
                log.debug(message);
                break;
            case warn:
                log.warn(message);
                break;
            case error:
                log.error(message,e);
                break;
            default:
                log.debug(message);
        }
    }

    public static void main(String[] args) {
        debug("ss");
        debug("sdf","sdf");
        debug("sdf",new HashMap());

        info("ss");
        info("sdf","sdf");
        info("sdf",new HashMap());

        warn("ss");
        warn("sdf","sdf");
        warn("sdf",new HashMap());


        error("ss");
        error("sdf","sdf");
        error("sdf",new HashMap());

        error("ss",new Exception());
        error("sdf","sdf",new Exception());
        error("sdf",new HashMap(),new Exception());
    }
}

 

三、启动项目访问页面

Spring Boot项目整合ELK、Kafka

 

四、使用kafka查看消费者打印

Spring Boot项目整合ELK、Kafka

 

五、使用kibana建立日志索引

Spring Boot项目整合ELK、Kafka

已经看到日志 

Spring Boot项目整合ELK、Kafka

点击创建需要等待

Spring Boot项目整合ELK、Kafka

成功后可以看到创建的索引

 

Spring Boot项目整合ELK、Kafka

 

六、使用kibana查询日志、es语法查询

1、这里选择查询时间段

Spring Boot项目整合ELK、Kafka

 

可以查看到日志

Spring Boot项目整合ELK、Kafka

Spring Boot项目整合ELK、Kafka

2、 用es语法查询

Spring Boot项目整合ELK、Kafka

 

 

推荐笔者elk、kafka、Springboot相关博客:

ELK架构体系、ELK运行原理、ELK应用场景、ELK简单介绍(一):https://blog.****.net/zjh_746140129/article/details/86483318

Linux安装ELK、elasticsearch部署安装(二):https://blog.****.net/zjh_746140129/article/details/86483661

Linux安装ELK、logstash部署安装(三):https://blog.****.net/zjh_746140129/article/details/86484586

Linux安装ELK、kibana部署安装(四):https://blog.****.net/zjh_746140129/article/details/86484862

ELK启动报错:OpenJDK 64-Bit Server VM warning:https://blog.****.net/zjh_746140129/article/details/86601574

ELK常见错误问题、ELK的一些坑、Unable to connect to Elasticsearch at http://localhost:9200:https://blog.****.net/zjh_746140129/article/details/86601791

Centons7下安装配置Kafka、Linux下安装配置Kafka:Centons7下安装配置Kafka、Linux下安装配置Kafka

Spring Boot2.0系列教程之 JPA 和 Thymeleaf 实践(五):Spring Boot2.0系列教程之 JPA 和 Thymeleaf 实践(五)