MLK分布式日志处理

发布于:2022-10-29 ⋅ 阅读:(328) ⋅ 点赞:(0)

目录

场景以及优点

Java代码集合logstash,es进行检索

将信息通过logstash输入到es中

然后我们将es中的日志信息输出(做一个日志的接口)


 

场景以及优点

当系统发现故障时,那么我们需要登录服务器grep 等脚本工具去日志查找原因,在没有日志系统的情况下,如果服务器部署了多个实例,我们需要进每个实例的容器种去寻找日志文件(如果量毕比较大的话,比如每天产生一个新的logfile,那么多节点时间一长就会导致文件过多)

(50条消息) tail 命令详解_Coder杨公子的博客-CSDN博客_tail命令

优化:

我们可以将日志集中管理,并且提供集中检索的方法(ELK=elasticsearch+logstash+kibana),

logstash:负责数据的收集整理,相当于一个通道

Elasticsearch:数据的存储和分析

在这里插入图片描述

 logstash的工作原理:

logstash事件处理管道有三个阶段:1.输入->过滤器->输出,输入生成事件,然后过滤器去过滤修改他们将他们输出到其他地方

在这里插入图片描述

 1.首先docker pull一共logstash的tar包,进行解压得到镜像

 2.然后修改配置文件

 3.最后重启容器即可

Java代码集合logstash,es进行检索

将信息通过logstash输入到es中

1.logstash日志配置文件

<?xml version="1.0" encoding="UTF-8"?>
<!-- 该日志将日志级别不同的 log 信息保存到不同的文件中 -->
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml" />
    <springProperty scope="context" name="springAppName" source="spring.application.name" />
    <!-- 日志在工程中的输出位置 -->
    <property name="LOG_FILE" value="${BUILD_FOLDER:-build}/${springAppName}" />
    <!-- 控制台的日志输出样式 -->
    <property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}" />
    <!-- 控制台输出 -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <!-- 日志输出编码 -->
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- logstash 远程日志配置 -->
    <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>82.157.198.247:4560</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>

    <!-- 日志输出级别 -->
    <root level="DEBUG">
        <appender-ref ref="console" />
        <appender-ref ref="logstash" />
    </root>
</configuration>

2.启动类上利用LoggerFactory打印日志 

package com.wyh.logstash;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LogStashApplication {

    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(LogStashApplication.class);
        logger.error("自己写的bug");
        SpringApplication.run(LogStashApplication.class, args);
    }

}

3.logstash依赖

<dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>6.3</version>
        </dependency>

此时会发现日志通过logstash这通道输出到es中

然后我们将es中的日志信息输出(做一个日志的接口)

0.实体类

package com.wyh.logstashsys.pojo;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.util.Date;

@Data
@Document(indexName = "test_log")
public class LogPojo {

    @Id
    private String id;
    @Field(type = FieldType.Integer)
    private Integer port;
    @Field(type = FieldType.Text)
    private String message;
    @Field(name = "@version",type = FieldType.Keyword)
    private String version;
    //注意:如果不配置date_time
    @Field(name = "@timestamp",type = FieldType.Date,format = DateFormat.date_time)
    private Date timestamp;
    @Field(type = FieldType.Keyword)
    private String host;

}

1.接口

public interface LogService {
    /**
     * 1.查询es中的日志信息
     * @return
     */
    List<LogPojo> selectPage(Integer page,Integer rows);
}

2.业务实现类(指定时间范围得到hits,然后放到集合中进行返回)


@Service
public class LogServiceImpl implements LogService{

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    /**
     * 1.分页查询es中的分布式日志
     * @param page
     * @param rows
     * @return
     */
    @Override
    public List<LogPojo> selectPage(Integer page, Integer rows) {
        //1.设置查询参数:小于当前时间15分钟的时间对象
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE,-15);
        //2.查询最近15分钟以内的日志
        Query query = new NativeSearchQuery(QueryBuilders.rangeQuery("@timestamp").gte(calendar.getTime()));
        query.setPageable(PageRequest.of(page-1,rows));
        //3.es客户端进行查询
        SearchHits<LogPojo> searchHits = elasticsearchRestTemplate.search(query, LogPojo.class);

        //4.对结果进行循环遍历输出
        Vector<LogPojo> vector = new Vector<>();
        for (SearchHit<LogPojo> searchHit : searchHits) {
            vector.add(searchHit.getContent());
        }

        return vector;
    }
}

 3.接口层

 /**
     * 1.得到分布式事务的分页结果
     * @param page
     * @param rows
     * @return
     */
    @RequestMapping("/")
    public List<LogPojo> showLogs(Integer page,Integer rows){
        return logService.selectPage(page,rows);
    }

 

 

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到