go语言并发实战——日志收集系统(九) 基于etcd的代码重构思考与初步实现

发布于:2024-04-26 ⋅ 阅读:(26) ⋅ 点赞:(0)

前言

在之前我们j基于sarama,tail还有go-ini实现了日志收集系统客户端的编写,但是我们梳理一下可以发现,该客户端还存在一些问题:

  • 客户端一次只能读取一个日志文件,无法同时读取多个分区
  • 无法管理日志存放的分区(topic)
    那我们一个如何去解决这个问题呢?在前两篇文章中我们介绍了etcd,它通过可以存储键值对并且通过watch操作来实现对键值对的实时监控,那我们能不能尝试用`etcd``来储存日志文件信息与对应分区信息?这就是我们今天这篇文章所探究的主题.

备注: etcd博文地址:
go语言并发实战——日志收集系统(七) etcd的介绍与简单使用
go语言并发实战——日志收集系统(八) go语言操作etcd以及利用watch实现对键值的监控

初步实现的流程

存储数据格式

这里为了存储数据方便,我们利用json格式来存储数据,示例如下:

[
    {
        "path": "G:/goproject/-goroutine-/log-agent/log/log1",
        "topic": "web.log"
    },
    {
        "path": "G:/goproject/-goroutine-/log-agent/log/log2",
        "topic": "s4.log"
    }
]

etcd初始化的编写

在之前有段etcd的博文中,我们已经介绍过etcd的基本使用,这里不做赘述,首先我们在log-agent文件夹下创建etcd文件夹,创建etcd.go文件,编写etcd的初始化:

func Init(address []string) (err error) {
	client, err = clientv3.New(clientv3.Config{
		Endpoints:   address,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logrus.Error("etcd client connect failed,err:%v", err)
		return
	}
	return
}

然后 在main.go文件中调用:

err = etcd.Init(ConfigObj.Etcdaddress.Addr)
	if err != nil {
		logrus.Error("InitEtcd failed, err:%v", err)
		return
	}
	logrus.Infof("InitEtcd success")

通过etcd拉取要收集文件的配置项

在初始化etcd后,我们就要通过etcd来拉取要收集文件的配置项了,首先定义一个结构体来接收信息:

type collectEntry struct {
	Path  string `json:"path"`
	Topic string `json:"topic"`
}

然后创建拉取配置项的函数:

func GetConf(key string) (err error, collectEntryList []collectEntry) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	response, err := client.Get(ctx, key)
	if err != nil {
		logrus.Error("get conf from etcd failed,err:%v", err)
		return
	}
	if len(response.Kvs) == 0 {
		logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)
		return
	}
	fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串
	err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList
	if err != nil {
		logrus.Error("json unmarshal failed,err:%v", err)
		return
	}
	return
}

然后在main.go里面调用一下就可以了:

	//拉取要收集日志文件的配置项
	err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
	if err != nil {
		logrus.Error("GetConf failed, err:%v", err)
		return
	}
	fmt.Println(collectEntryList)

尝试用之前的demo设置一下配置文件中的key对应的value:

package main

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"time"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, //服务端通信端口
		DialTimeout: 5 * time.Second,            //连接超时时间
	})
	//put
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	str := "[{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log1\",\"topic\":\"web.log\"},{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log2\",\"topic\":\"s4.log\"}]"
	_, err = cli.Put(ctx, "collect_log_conf", str)
	cancel()
	if err != nil {
		fmt.Println("put failed,err:%v", err)
		return
	}

}

运行就可以看到我们接收到了配置项了:
在这里插入图片描述

涉及改动处的源代码

  • 配置文件
[kafka]
address=127.0.0.1:9092
topic=test1.log
chan_size=100000

[etcd]
address=127.0.0.1:2379
collect_key=collect_log_conf

[collect]
logfile_path:G:\goproject\-goroutine-\log-agent\log\log1

  • main.go
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/go-ini/ini"
	"log-agent/Kafka"
	"log-agent/etcd"
	"log-agent/tailFile"
	"strings"
	"time"
)

type Config struct {
	Kafakaddress Kafkaddress `ini:"kafka"`
	LogFilePath  LogFilePath `ini:"collect"`
	Etcdaddress  EtcdAddress `ini:"etcd"`
}

type Kafkaddress struct {
	Addr        []string `ini:"address"`
	Topic       string   `ini:"topic"`
	MessageSize int64    `ini:"chan_size"`
}

type LogFilePath struct {
	Path string `ini:"logfile_path"`
}

type EtcdAddress struct {
	Addr []string `ini:"address"`
	Key  string   `ini:"collect_key"`
}

func run(config *Config) (err error) {
	for {
		line, ok := <-tailFile.TailObj.Lines
		if !ok {
			logrus.Error("read from tail failed,err:", err)
			time.Sleep(2 * time.Second)
			continue
		}
		if len(strings.Trim(line.Text, "\r")) == 0 {
			continue
		}
		msg := &sarama.ProducerMessage{}
		msg.Topic = config.Kafakaddress.Topic
		msg.Value = sarama.StringEncoder(line.Text)
		Kafka.MesChan(msg)
	}
}

func main() {
	//读取配置文件,获取配置信息
	filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
	ConfigObj := new(Config)
	err := ini.MapTo(ConfigObj, filename)
	if err != nil {
		logrus.Error("%s Load failed,err:", filename, err)
	}

	//初始化Kafka
	err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
	if err != nil {
		logrus.Error("InitKafka failed, err:%v", err)
		return
	}
	logrus.Infof("InitKafka success")

	//初始化etcd
	err = etcd.Init(ConfigObj.Etcdaddress.Addr)
	if err != nil {
		logrus.Error("InitEtcd failed, err:%v", err)
		return
	}
	logrus.Infof("InitEtcd success")

	//拉取要收集日志文件的配置项
	err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
	if err != nil {
		logrus.Error("GetConf failed, err:%v", err)
		return
	}
	fmt.Println(collectEntryList)
	//初始化tail
	err = tailFile.InitTail(ConfigObj.LogFilePath.Path)
	if err != nil {
		logrus.Error("InitTail failed, err:%v", err)
		return
	}
	logrus.Infof("InitTail success")

	//利用sarama报发送消息到Kafka中
	err = run(ConfigObj)
	if err != nil {
		logrus.Error("run failed, err:%v", err)
		return
	}
}

  • etcd.go
package etcd

import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
	"golang.org/x/net/context"
	"time"
)

var client *clientv3.Client

type collectEntry struct {
	Path  string `json:"path"`
	Topic string `json:"topic"`
}

func Init(address []string) (err error) {
	client, err = clientv3.New(clientv3.Config{
		Endpoints:   address,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logrus.Error("etcd client connect failed,err:%v", err)
		return
	}
	return
}

func GetConf(key string) (err error, collectEntryList []collectEntry) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	response, err := client.Get(ctx, key)
	cancel()
	if err != nil {
		logrus.Error("get conf from etcd failed,err:%v", err)
		return
	}
	if len(response.Kvs) == 0 {
		logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)
		return
	}
	fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串
	err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList
	if err != nil {
		logrus.Error("json unmarshal failed,err:%v", err)
		return
	}
	return
}