goreplay Traffic Recording and Replay: Source Code Analysis

Published: 2022-07-27

Introduction

Traffic recording and replay are most often used during testing, for automated batch verification of functionality, or for A/B testing experiments. At internet companies traffic is data and users; many internet companies are essentially traffic companies, and much of big-data analysis ultimately boils down to analyzing traffic (price discrimination against existing customers is one example). Replay is only one use of traffic; recording is the foundation. Once traffic is well recorded, it can serve many purposes.

Traffic Recording

There are many ways to record traffic; the two most common are proxying and sniffing.

  1. Proxy
    A proxy, as the name suggests, forwards traffic and copies off a duplicate along the way; nginx, sidecars, and the like are all forms of proxying.
  2. Sniffing
    Sniffing sits closer to the hardware driver: when the NIC delivers a packet, the driver invokes an extra callback into the listening program, which thereby obtains the traffic.

Traffic Replay

Replay is just one way of processing traffic, which has many uses; replay is generally for testing or A/B testing. Traffic can be replayed at the application layer or at the transport layer. Application-layer replay is intrusive, but it allows fine-grained modification of the data and keeps the data visible and friendly; transport-layer replay simply resends the TCP or UDP packets. Choose based on your actual situation.

Data Mocking

Data simulation is commonly known as mocking. In principle recorded traffic can drive mocks too: recording covers more than inbound traffic; database, cache, and MQ traffic all need to be recorded as well, with mock capabilities provided to make replay convenient. That part requires custom development.

Analyzing goreplay

With the concepts above covered: of the two recording options, NIC sniffing has the least impact on the application. In Go, gopacket offers ready-made packet capture, and goreplay wraps it with HTTP processing. tcpcopy is another open-source option. A proxy-based scheme is also possible, but it is more intrusive to the business, so its impact must be weighed.

Building a goreplay Demo

Write a trivial HTTP demo on Spring Boot 2.6.4 and configure it to run as multiple instances.

package com.feng.boot.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class BootMain {
    public static void main(String[] args) {
        SpringApplication.run(BootMain.class, args);
    }

    @RequestMapping("/hello")
    public String hello(){
        return "ok";
    }
}

Multiple profiles are supported through YAML configuration.

Different ports are activated via launch parameters.
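A minimal application.yml along these lines would do; the profile name is arbitrary and chosen here for illustration:

```yaml
# default profile: the service being recorded
server:
  port: 8080
---
# activated with --spring.profiles.active=replay: the replay target
spring:
  config:
    activate:
      on-profile: replay
server:
  port: 8082
```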

Start the services on 8080 and 8082: 8080 is the service being recorded, and 8082 is the replay target.

goreplay Source Code Walkthrough

A Go program starts from the main function of the main package. Open the goreplay source and, following the README on GitHub, configure live recording and replay.

The parameters used here are --input-raw :8080 --output-http http://localhost:8082; other extension parameters work analogously.

Startup Analysis

var (
	cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
	memprofile = flag.String("memprofile", "", "write memory profile to this file")
)

The main package's global variables are initialized first; the flag functions declare command-line parameters, defaulting to empty strings. Then the init functions run:

func init() {
	// a fresh ServeMux struct
	var defaultServeMux http.ServeMux
	http.DefaultServeMux = &defaultServeMux // take a pointer so the value can be modified
	// register a number of HTTP URL handlers
	http.HandleFunc("/debug/vars", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json; charset=utf-8")
		fmt.Fprintf(w, "{\n")
		first := true
		expvar.Do(func(kv expvar.KeyValue) {
			if kv.Key == "memstats" || kv.Key == "cmdline" {
				return
			}

			if !first {
				fmt.Fprintf(w, ",\n")
			}
			first = false
			fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
		})
		fmt.Fprintf(w, "\n}\n")
	})

	http.HandleFunc("/debug/pprof/", httppprof.Index)
	http.HandleFunc("/debug/pprof/cmdline", httppprof.Cmdline)
	http.HandleFunc("/debug/pprof/profile", httppprof.Profile)
	http.HandleFunc("/debug/pprof/symbol", httppprof.Symbol)
	http.HandleFunc("/debug/pprof/trace", httppprof.Trace)
}

The key one is the parameter-binding init function. The code is simple, but all of our custom parameters originate here, and new ones can be added freely following goreplay's design.
settings.go

func init() {
	flag.Usage = usage
	flag.StringVar(&Settings.Pprof, "http-pprof", "", "Enable profiling. Starts  http server on specified port, exposing special /debug/pprof endpoint. Example: `:8181`")
	flag.IntVar(&Settings.Verbose, "verbose", 0, "set the level of verbosity, if greater than zero then it will turn on debug output")
	……


	flag.StringVar(&Settings.Middleware, "middleware", "", "Used for modifying traffic using external command")

	flag.Var(&Settings.OutputHTTP, "output-http", "Forwards incoming requests to given http address.\n\t# Redirect all incoming requests to staging.com address \n\tgor --input-raw :80 --output-http http://staging.com")

	……

	// default values, using for tests
	Settings.OutputFileConfig.SizeLimit = 33554432
	Settings.OutputFileConfig.OutputFileMaxSize = 1099511627776
	Settings.CopyBufferSize = 5242880

}

Then main executes:

func main() {
	// set CPU parallelism
	if os.Getenv("GOMAXPROCS") == "" {
		runtime.GOMAXPROCS(runtime.NumCPU() * 2)
	}

	// read the arguments; index 0 is the binary itself, so start from 1
	args := os.Args[1:]
	// plugin struct; plugins are the core of goreplay, effectively its extension points
	var plugins *InOutPlugins
	// file-server mode creates an HTTP file server; rarely used
	if len(args) > 0 && args[0] == "file-server" {
		if len(args) != 2 {
			log.Fatal("You should specify port and IP (optional) for the file server. Example: `gor file-server :80`")
		}
		dir, _ := os.Getwd()

		Debug(0, "Started example file server for current directory on address ", args[1])

		log.Fatal(http.ListenAndServe(args[1], loggingMiddleware(args[1], http.FileServer(http.Dir(dir)))))
	} else {
		// bind the parameters
		flag.Parse()
		// validate settings and fill defaults
		checkSettings()
		// initialize the plugins; crucial: if you add a custom parameter in settings.go,
		// the matching plugin must be added here (it is not an SPI mechanism, so the code has to change)
		plugins = NewPlugins()
	}

	log.Printf("[PPID %d and PID %d] Version:%s\n", os.Getppid(), os.Getpid(), VERSION)
	// plugin check: the program only runs with plugins configured
	if len(plugins.Inputs) == 0 || len(plugins.Outputs) == 0 {
		log.Fatal("Required at least 1 input and 1 output")
	}
	
	if *memprofile != "" {
		profileMEM(*memprofile)
	}

	if *cpuprofile != "" {
		profileCPU(*cpuprofile)
	}

	if Settings.Pprof != "" {
		go func() {
			log.Println(http.ListenAndServe(Settings.Pprof, nil))
		}()
	}
	// channel used to signal shutdown
	closeCh := make(chan int)
	// the emitter coordinates the plugins
	emitter := NewEmitter()
	// run in a goroutine; core code: the listeners are registered in here
	go emitter.Start(plugins, Settings.Middleware)
	// timed exit, via the channel created above
	if Settings.ExitAfter > 0 {
		log.Printf("Running gor for a duration of %s\n", Settings.ExitAfter)

		time.AfterFunc(Settings.ExitAfter, func() {
			log.Printf("gor run timeout %s\n", Settings.ExitAfter)
			close(closeCh)
		})
	}
	// signal channel
	c := make(chan os.Signal, 1)
	// Notify causes package signal to relay incoming signals to c (handle interrupts)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	exit := 0
	select {
	case <-c:
		exit = 1
	case <-closeCh:
		exit = 0
	}
	// close the plugins first, then drop the references; the emitter manages the plugins here
	emitter.Close()
	// terminate the process
	os.Exit(exit)
}

Plugin initialization lives in NewPlugins(), and the coupling is heavy: it builds plugins from the parameters bound in settings.go. To add a custom plugin you bind a parameter in settings.go, write a file whose struct implements the plugin methods, and register it here:

func NewPlugins() *InOutPlugins {
	plugins := new(InOutPlugins)

	……

	for _, options := range Settings.InputRAW {
		plugins.registerPlugin(NewRAWInput, options, Settings.RAWInputConfig)
	}

	……

	for _, options := range Settings.OutputHTTP {
		plugins.registerPlugin(NewHTTPOutput, options, &Settings.OutputHTTPConfig)
	}

	for _, options := range Settings.OutputBinary {
		plugins.registerPlugin(NewBinaryOutput, options, &Settings.OutputBinaryConfig)
	}

	if Settings.OutputKafkaConfig.Host != "" && Settings.OutputKafkaConfig.Topic != "" {
		plugins.registerPlugin(NewKafkaOutput, "", &Settings.OutputKafkaConfig, &Settings.KafkaTLSConfig)
	}

	if Settings.InputKafkaConfig.Host != "" && Settings.InputKafkaConfig.Topic != "" {
		plugins.registerPlugin(NewKafkaInput, "", &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)
	}

	return plugins
}

Registration reflectively instantiates the plugin struct and adds it to plugins.
When the RAWInput plugin is registered, it registers the capture listener:

// NewRAWInput constructor for RAWInput. Accepts raw input config as arguments.
func NewRAWInput(address string, config RAWInputConfig) (i *RAWInput) {
	i = new(RAWInput)
	i.RAWInputConfig = config
	i.quit = make(chan bool)

	host, _ports, err := net.SplitHostPort(address)
	if err != nil {
		log.Fatalf("input-raw: error while parsing address: %s", err)
	}

	var ports []uint16
	if _ports != "" {
		portsStr := strings.Split(_ports, ",")

		for _, portStr := range portsStr {
			port, err := strconv.Atoi(strings.TrimSpace(portStr))
			if err != nil {
				log.Fatalf("parsing port error: %v", err)
			}
			ports = append(ports, uint16(port))

		}
	}

	i.host = host
	i.ports = ports
	// parameters handled above; now attach the listener
	i.listen(address)

	return
}

i.listen(address)

func (i *RAWInput) listen(address string) {
	var err error
	// create the listener
	i.listener, err = capture.NewListener(i.host, i.ports, "", i.Engine, i.Protocol, i.TrackResponse, i.Expire, i.AllowIncomplete)
	if err != nil {
		log.Fatal(err)
	}
	i.listener.SetPcapOptions(i.PcapOptions)
	// activate capture
	err = i.listener.Activate()
	if err != nil {
		log.Fatal(err)
	}

	var ctx context.Context
	// cancellation context for shutdown
	ctx, i.cancelListener = context.WithCancel(context.Background())
	// listen in the background
	errCh := i.listener.ListenBackground(ctx)
	<-i.listener.Reading
	Debug(1, i)
	go func() {
		<-errCh // the listener closed voluntarily
		i.Close()
	}()
}

capture.NewListener first locates the network interface devices. On my machine that is en1, the WiFi adapter currently in use.

i.listener.Activate()

func (l *Listener) activatePcap() error {
	var e error
	var msg string
	for _, ifi := range l.Interfaces {
		var handle *pcap.Handle
		// set the BPF filter
		handle, e = l.PcapHandle(ifi)
		if e != nil {
			msg += ("\n" + e.Error())
			continue
		}
		// hand the pcap handle to the listener, so the listener's handle can read pcap's data
		l.Handles[ifi.Name] = packetHandle{
			handler: handle,
			ips:     interfaceIPs(ifi),
		}
	}
	if len(l.Handles) == 0 {
		return fmt.Errorf("pcap handles error:%s", msg)
	}
	return nil
}

Setting the BPF filter: BPF (Berkeley Packet Filter) speeds up pcap filtering; on BSD-style systems its devices live under /dev. BPF was later extended into eBPF (extended Berkeley Packet Filter), which effectively compiles code into the kernel at runtime, letting user space trigger kernel-side instructions and collect the results; in Kubernetes it is widely used as a replacement for iptables.

  1. BPF programs can be compiled into BPF bytecode with LLVM or GCC toolchains
  2. A loader program loads the bytecode into the kernel
  3. The BPF bytecode running in the kernel can pass measurement data back to user space
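For a feel of what PcapHandle hands to SetBPFFilter, here is a small helper that builds a pcap/BPF filter expression from a host and a port list. This only illustrates the filter syntax; goreplay's actual filter string is assembled differently:

```go
package main

import (
	"fmt"
	"strings"
)

// portsFilter builds a pcap/BPF filter expression matching traffic to or
// from the given TCP ports, optionally restricted to one host.
func portsFilter(host string, ports []uint16) string {
	clauses := make([]string, 0, len(ports))
	for _, p := range ports {
		clauses = append(clauses, fmt.Sprintf("tcp port %d", p))
	}
	filter := "(" + strings.Join(clauses, " or ") + ")"
	if host != "" {
		filter += fmt.Sprintf(" and host %s", host)
	}
	return filter
}

func main() {
	fmt.Println(portsFilter("", []uint16{8080}))                  // (tcp port 8080)
	fmt.Println(portsFilter("127.0.0.1", []uint16{80, 443}))      // (tcp port 80 or tcp port 443) and host 127.0.0.1
}
```

The kernel-side BPF program compiled from such an expression discards non-matching packets before they ever reach user space, which is where the performance win comes from.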

Starting the listener:

// ListenBackground is like listen but can run concurrently and signal error through channel
func (l *Listener) ListenBackground(ctx context.Context) chan error {
	err := make(chan error, 1)
	go func() {
		defer close(err)
		if e := l.Listen(ctx); err != nil {
			err <- e
		}
	}()
	return err
}

Read from the NICs, and wait for the signal to close the handles:

// Listen listens for packets from the handles, and call handler on every packet received
// until the context done signal is sent or there is unrecoverable error on all handles.
// this function must be called after activating pcap handles
func (l *Listener) Listen(ctx context.Context) (err error) {
	l.read()
	done := ctx.Done()
	select {
	case <-done:
		close(l.quit) // signal close on all handles
		<-l.closeDone // wait all handles to be closed
		err = ctx.Err()
	case <-l.closeDone: // all handles closed voluntarily
	}
	return
}

The registration happens in the read function:

func (l *Listener) read() {
	l.Lock()
	defer l.Unlock()
	// one handle per NIC; each runs in its own goroutine, so execution order is arbitrary
	for key, handle := range l.Handles {
		go func(key string, hndl packetHandle) {
			runtime.LockOSThread()

			defer l.closeHandles(key)
			linkSize := 14
			linkType := int(layers.LinkTypeEthernet)
			if _, ok := hndl.handler.(*pcap.Handle); ok {
				linkType = int(hndl.handler.(*pcap.Handle).LinkType())
				linkSize, ok = pcapLinkTypeLength(linkType)
				if !ok {
					if os.Getenv("GORDEBUG") != "0" {
						log.Printf("can not identify link type of an interface '%s'\n", key)
					}
					return // can't find the linktype size
				}
			}

			// NewMessageParser starts a goroutine internally (go parser.wait())
			// crucial: l.messages IS the messageParser's message channel, passed by reference; the captured data flows back through it
			messageParser := tcp.NewMessageParser(l.messages, l.ports, hndl.ips, l.expiry, l.allowIncomplete)
			// only HTTP is handled, because the Start/End hint functions are HTTP-specific; parsing another protocol requires adding support for it
			if l.protocol == tcp.ProtocolHTTP {
				messageParser.Start = http1StartHint
				messageParser.End = http1EndHint
			}

			timer := time.NewTicker(1 * time.Second)
			// effectively an infinite loop
			for {
				select {
				case <-l.quit:
					return
				case <-timer.C:
					if h, ok := hndl.handler.(PcapStatProvider); ok {
						s, err := h.Stats()
						if err == nil {
							stats.Add("packets_received", int64(s.PacketsReceived))
							stats.Add("packets_dropped", int64(s.PacketsDropped))
							stats.Add("packets_if_dropped", int64(s.PacketsIfDropped))
						}
					}
				default:
					// packets are read here; each read feeds the parser.wait() goroutine created above
					data, ci, err := hndl.handler.ReadPacketData()
					if err == nil {
						if l.TimestampType == "go" {
							ci.Timestamp = time.Now()
						}
						// push into the parser's channel; parser.wait() consumes it
						messageParser.PacketHandler(&tcp.PcapPacket{
							Data:     data,
							LType:    linkType,
							LTypeLen: linkSize,
							Ci:       &ci,
						})
						continue
					}
					if enext, ok := err.(pcap.NextError); ok && enext == pcap.NextErrorTimeoutExpired {
						continue
					}
					if eno, ok := err.(syscall.Errno); ok && eno.Temporary() {
						continue
					}
					if enet, ok := err.(*net.OpError); ok && (enet.Temporary() || enet.Timeout()) {
						continue
					}
					if err == io.EOF || err == io.ErrClosedPipe {
						log.Printf("stopped reading from %s interface with error %s\n", key, err)
						return
					}

					log.Printf("stopped reading from %s interface with error %s\n", key, err)
					return
				}
			}
		}(key, handle)
	}
	close(l.Reading)
}

messageParser := tcp.NewMessageParser(l.messages, l.ports, hndl.ips, l.expiry, l.allowIncomplete)
This line is crucial: captured packets are delivered through the messages channel. Since l.messages is passed by reference, the messageParser and the Listener are wired together: they share memory by communicating.
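That wiring can be reduced to a toy example: the listener owns the channel, the parser receives it by reference, and whatever the parser emits appears on the listener's side (illustrative types, not goreplay's):

```go
package main

import "fmt"

// parser receives the listener's channel by reference, so everything it
// emits lands directly in the listener's queue, as in
// tcp.NewMessageParser(l.messages, ...).
type parser struct {
	messages chan<- string
}

func (p *parser) packetHandler(data string) {
	p.messages <- "parsed:" + data
}

type listener struct {
	messages chan string
}

func newListener() *listener {
	return &listener{messages: make(chan string, 16)}
}

func main() {
	l := newListener()
	p := &parser{messages: l.messages} // same channel, two owners
	p.packetHandler("GET /hello")
	fmt.Println(<-l.messages) // parsed:GET /hello
}
```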

Now look deeper into the core startup call, emitter.Start(plugins, Settings.Middleware):

// Start initialize loop for sending data from inputs to outputs
func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
	if Settings.CopyBufferSize < 1 {
		Settings.CopyBufferSize = 5 << 20
	}
	e.plugins = plugins
	// the middleware command is an extension point
	if middlewareCmd != "" {
		middleware := NewMiddleware(middlewareCmd)
		// wrap each input with the middleware
		for _, in := range plugins.Inputs {
			middleware.ReadFrom(in)
		}

		e.plugins.Inputs = append(e.plugins.Inputs, middleware)
		e.plugins.All = append(e.plugins.All, middleware)
		e.Add(1)
		go func() {
			defer e.Done()
			if err := CopyMulty(middleware, plugins.Outputs...); err != nil {
				Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
			}
		}()
	} else {
		// handle each input; in this demo it captures HTTP traffic on :8080
		for _, in := range plugins.Inputs {
			e.Add(1)
			// one goroutine per input
			go func(in PluginReader) {
				defer e.Done()
				// copy input to outputs
				if err := CopyMulty(in, plugins.Outputs...); err != nil {
					Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
				}
			}(in)
		}
	}
}

Let's see how processing works:
func CopyMulty(src PluginReader, writers ...PluginWriter) error
In essence the Reader input plugin produces messages and the Writer output plugins consume them; details can wait.

// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(src PluginReader, writers ...PluginWriter) error {
	wIndex := 0
	modifier := NewHTTPModifier(&Settings.ModifierConfig)
	filteredRequests := make(map[string]int64)
	filteredRequestsLastCleanTime := time.Now().UnixNano()
	filteredCount := 0

	// infinite loop
	for {
		// channel communication: fetch a message from the input
		msg, err := src.PluginRead()
		if err != nil {
			if err == ErrorStopped || err == io.EOF {
				return nil
			}
			return err
		}
		if msg != nil && len(msg.Data) > 0 {
			if len(msg.Data) > int(Settings.CopyBufferSize) {
				msg.Data = msg.Data[:Settings.CopyBufferSize]
			}
			meta := payloadMeta(msg.Meta)
			if len(meta) < 3 {
				Debug(2, fmt.Sprintf("[EMITTER] Found malformed record %q from %q", msg.Meta, src))
				continue
			}
			requestID := byteutils.SliceToString(meta[1])
			// start a subroutine only when necessary
			if Settings.Verbose >= 3 {
				Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src)
			}
			if modifier != nil {
				Debug(3, "[EMITTER] modifier:", requestID, "from:", src)
				if isRequestPayload(msg.Meta) {
					msg.Data = modifier.Rewrite(msg.Data)
					// If modifier tells to skip request
					if len(msg.Data) == 0 {
						filteredRequests[requestID] = time.Now().UnixNano()
						filteredCount++
						continue
					}
					Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src)

				} else {
					if _, ok := filteredRequests[requestID]; ok {
						delete(filteredRequests, requestID)
						filteredCount--
						continue
					}
				}
			}

			if Settings.PrettifyHTTP {
				msg.Data = prettifyHTTP(msg.Data)
				if len(msg.Data) == 0 {
					continue
				}
			}

			if Settings.SplitOutput {
				if Settings.RecognizeTCPSessions {
					if !PRO {
						log.Fatal("Detailed TCP sessions work only with PRO license")
					}
					hasher := fnv.New32a()
					hasher.Write(meta[1])

					wIndex = int(hasher.Sum32()) % len(writers)
					if _, err := writers[wIndex].PluginWrite(msg); err != nil {
						return err
					}
				} else {
					// Simple round robin
					if _, err := writers[wIndex].PluginWrite(msg); err != nil {
						return err
					}

					wIndex = (wIndex + 1) % len(writers)
				}
			} else {
				// hand the message to every output (writer) plugin
				for _, dst := range writers {
					if _, err := dst.PluginWrite(msg); err != nil && err != io.ErrClosedPipe {
						return err
					}
				}
			}
		}

		// every 1000 filtered requests, clean up stale entries (upstream comment calls this GC)
		if filteredCount > 0 && filteredCount%1000 == 0 {
			// Clean up filtered requests for which we didn't get a response to filter
			now := time.Now().UnixNano()
			if now-filteredRequestsLastCleanTime > int64(60*time.Second) {
				for k, v := range filteredRequests {
					if now-v > int64(60*time.Second) {
						delete(filteredRequests, k)
						filteredCount--
					}
				}
				filteredRequestsLastCleanTime = time.Now().UnixNano()
			}
		}
	}
}
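The SplitOutput branch above picks a writer either by FNV-hashing the request ID (so one session sticks to one writer, the RecognizeTCPSessions path) or by simple round robin. Both selection rules in isolation, as a sketch:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashIndex pins a request ID to a writer, as the RecognizeTCPSessions path does.
func hashIndex(requestID []byte, writers int) int {
	h := fnv.New32a()
	h.Write(requestID)
	return int(h.Sum32()) % writers
}

// roundRobin returns the next writer index after wIndex.
func roundRobin(wIndex, writers int) int {
	return (wIndex + 1) % writers
}

func main() {
	id := []byte("req-42")
	// the same ID always maps to the same writer
	fmt.Println(hashIndex(id, 3) == hashIndex(id, 3)) // true
	fmt.Println(roundRobin(2, 3))                     // 0
}
```

Hashing keeps all packets of one TCP session on one writer at the cost of uneven load; round robin balances load but scatters sessions.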

That completes goreplay's startup code. Next, capture an HTTP request and see.

Capture at Runtime

Issue http://localhost:8080/hello from a browser. The handle captures the packets and parses them; the resulting data contains the HTTP request's URL and headers, but no body (the End hint applies here).

Note the channel passing: do not communicate by sharing memory; share memory by communicating.

As shown earlier, parser.messages is in fact the listener's messages channel, so communicating over the listener's messages channel is all that is needed. The code below was already set running during the startup analysis; it is essentially channel-blocking logic.

The key piece of code:

// PluginRead reads meassage from this plugin
func (i *RAWInput) PluginRead() (*Message, error) {
	var msgTCP *tcp.Message
	var msg Message
	select {
	case <-i.quit:
		return nil, ErrorStopped
	case msgTCP = <-i.listener.Messages(): // channel communication: the result is received here
		msg.Data = msgTCP.Data()
	}

	var msgType byte = ResponsePayload
	if msgTCP.Direction == tcp.DirIncoming {
		msgType = RequestPayload
		if i.RealIPHeader != "" {
			msg.Data = proto.SetHeader(msg.Data, []byte(i.RealIPHeader), []byte(msgTCP.SrcAddr))
		}
	}

	// meta assembled by goreplay itself: msgType, uuid, start time in nanoseconds, packet-parsing latency
	msg.Meta = payloadHeader(msgType, msgTCP.UUID(), msgTCP.Start.UnixNano(), msgTCP.End.UnixNano()-msgTCP.Start.UnixNano())

	// to be removed....
	if msgTCP.Truncated {
		Debug(2, "[INPUT-RAW] message truncated, increase copy-buffer-size")
	}
	// to be removed...
	if msgTCP.TimedOut {
		Debug(2, "[INPUT-RAW] message timeout reached, increase input-raw-expire")
	}
	if i.Stats {
		stat := msgTCP.Stats
		go i.addStats(stat)
	}
	msgTCP = nil
	return &msg, nil // the captured message
}

The write plugins then take over.

Writer Plugins

Take output-http as an example; this plugin forwards captured packets directly to a URL.
At plugin registration time:

for _, options := range Settings.OutputHTTP {
	plugins.registerPlugin(NewHTTPOutput, options, &Settings.OutputHTTPConfig)
}

the communication logic is already in place:

for i := 0; i < o.config.WorkersMin; i++ {
	go o.startWorker()
}

where startWorker is defined as:

func (o *HTTPOutput) startWorker() {
	for {
		select {
		case <-o.stopWorker:
			return
		case msg := <-o.queue: // communication: send the request; msg is the shared data
			o.sendRequest(o.client, msg)
		}
	}
}

It is then triggered from the capture loop:

// hand the message to every output (writer) plugin
for _, dst := range writers {
	if _, err := dst.PluginWrite(msg); err != nil && err != io.ErrClosedPipe {
		return err
	}
}

As mentioned above, this is the plugin-processing logic: read first, then let multiple plugins write.

// PluginWrite writes message to this plugin
func (o *HTTPOutput) PluginWrite(msg *Message) (n int, err error) {
	// use the assembled meta to check whether the payload (the captured packet) is one we want
	if !isRequestPayload(msg.Meta) {
		return len(msg.Data), nil
	}

	select {
	case <-o.stop:
		return 0, ErrorStopped
	case o.queue <- msg: // communicate: the startWorker loop can now pick it up
	}

	if o.config.Stats {
		o.queueStats.Write(len(o.queue))
	}
	// if the existing workers cannot keep up
	if len(o.queue) > 0 {
		// try to start a new worker to serve
		if atomic.LoadInt32(&o.activeWorkers) < int32(o.config.WorkersMax) {
			// spawn another worker goroutine
			go o.startWorker()
			atomic.AddInt32(&o.activeWorkers, 1)
		}
	}
	return len(msg.Data) + len(msg.Meta), nil
}

The code that actually sends the HTTP request is simple:

func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
	if !isRequestPayload(msg.Meta) {
		return
	}

	uuid := payloadID(msg.Meta)
	start := time.Now()
	// send the HTTP request and get the result; extra processing could hook in here (e.g. transforming req/resp), and the latency is available
	resp, err := client.Send(msg.Data)
	stop := time.Now()

	if err != nil {
		Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", err))
		return
	}
	if resp == nil {
		return
	}

	// track the response just sent; the uuid correlates request and response, since responses return in unknown order
	// the code could also be modified to feed these back in: track, as the name suggests, continues as input
	if o.config.TrackResponses {
		o.responses <- &response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()}
	}

	// the Elasticsearch handling is nice: it stores data and resp together
	if o.elasticSearch != nil {
		o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)
	}
}


client.Send(msg.Data)

func (c *HTTPClient) Send(data []byte) ([]byte, error) {
	var req *http.Request
	var resp *http.Response
	var err error

	// reconstruct the request from the raw bytes; Host handling follows
	req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
	if err != nil {
		return nil, err
	}
	// we don't send CONNECT or OPTIONS request
	if req.Method == http.MethodConnect {
		return nil, nil
	}

	if !c.config.OriginalHost {
		req.Host = c.config.url.Host
	}

	// fix #862
	if c.config.url.Path == "" && c.config.url.RawQuery == "" {
		req.URL.Scheme = c.config.url.Scheme
		req.URL.Host = c.config.url.Host
	} else {
		req.URL = c.config.url
	}

	// force connection to not be closed, which can affect the global client
	req.Close = false
	// it's an error if this is not equal to empty string
	req.RequestURI = ""
	// send the request
	resp, err = c.Client.Do(req)
	if err != nil {
		return nil, err
	}
	// only dump the response when tracking: the body is a stream, so persist it here or it is gone after Close
	if c.config.TrackResponses {
		return httputil.DumpResponse(resp, true)
	}
	_ = resp.Body.Close()
	return nil, nil
}

That concludes the source analysis. Only one input and one output parameter were covered, but this is a plugin architecture: many parameters can be configured, all on the same principle.

Summary

goreplay was just the example here; other traffic-recording tools can differ substantially, for instance proxy-based ones. This walkthrough also used application-layer replay; transport-layer replay is possible too (process the packet and send it straight back), which is more generic but far less explicit and harder to read. One gripe about Go channels: they are genuinely hard to follow, especially channels passed by pointer; I was lost at first and only understood after a careful read of the source.
