goreplay 录制回放源码解析
前言
一般而言流量的录制与回放经常在测试过程中使用,自动化批量验证功能,或者用来做ABTest实验。流量在互联网公司就是数据,是用户,因而很多互联网公司就是流量公司,就是风口,本质上讲大数据分析就是分析流量,比如:杀熟。流量的回放仅仅是流量的一种用途,流量的录制是基础,只有录制好流量,可以有很多用途。
流量录制
流量的录制实际上有很多种方式,常见的有2种:代理和监听
- 代理
代理顾名思义,就是流量代理转发复制一份数据,nginx,sidecar等都是代理的表现。 - 监听
监听就是偏硬件驱动方式,就是网卡回调的时候,告诉驱动再回调一次监听程序,从而拿到流量。
流量回放
流量回放仅仅是流量的一种处理方式,流量用处极大,回放一般用于测试或者ABTest。流量的回放可以应用层回放,也可以传输层回放,应用层回放具有侵入性,但是能精细修改数据,数据友好可见;传输层回放直接发送tcp或者udp数据包即可。根据实际情况选择方式
数据模拟
数据模拟熟称mock,理论上流量的录制也可以是mock,录制流量实际上不仅仅是录制入口流量,包括数据库、缓存、mq的都需要录制,并提供mock能力,方便回放,这个就需要定制了。
goreplay的源码分析
上面说了很多概念,实际上录制选型2种方案,对应用影响最小的网卡监听,在go语言的gopacket有现成的抓包能力,goreplay封装了http处理能力。另外开源的有tcpcopy,也可以使用代理方案,代理方案对业务侵入性比较大,要考虑影响。
goreplay demo构造
随意写一个http demo,Spring boot 2.6.4版本,配置多个实例运行
package com.feng.boot.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
public class BootMain {
public static void main(String[] args) {
SpringApplication.run(BootMain.class, args);
}
@RequestMapping("/hello")
public String hello(){
return "ok";
}
}
通过配置yaml支持多个profile
通过参数激活不同的端口
启动服务,分别启动8080和8082, 8080作为被录制的服务,8082用来回放处理。
goreplay源码分析
go语言启动是依靠main包下的main方法,打开goreplay源码,按照GitHub的md文件,配置一个实时录制回放
这里配置的参数是–input-raw :8080 --output-http http://localhost:8082,其他扩展参数同理。
启动分析
var (
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write memory profile to this file")
)
优先初始化main包全局变量,flag包函数表示可以参数传入,默认空字符串,然后初始化init函数
func init() {
//结构体
var defaultServeMux http.ServeMux
http.DefaultServeMux = &defaultServeMux //获取指针,方便修改值
//注册了很多http url处理逻辑
http.HandleFunc("/debug/vars", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, "{\n")
first := true
expvar.Do(func(kv expvar.KeyValue) {
if kv.Key == "memstats" || kv.Key == "cmdline" {
return
}
if !first {
fmt.Fprintf(w, ",\n")
}
first = false
fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
})
fmt.Fprintf(w, "\n}\n")
})
http.HandleFunc("/debug/pprof/", httppptof.Index)
http.HandleFunc("/debug/pprof/cmdline", httppptof.Cmdline)
http.HandleFunc("/debug/pprof/profile", httppptof.Profile)
http.HandleFunc("/debug/pprof/symbol", httppptof.Symbol)
http.HandleFunc("/debug/pprof/trace", httppptof.Trace)
}
关键还是参数绑定的init函数,代码很简单,但是我们的自定义参数都来自于这里,也可以根据goreplay的设计自由的增加
settings.go
func init() {
flag.Usage = usage
flag.StringVar(&Settings.Pprof, "http-pprof", "", "Enable profiling. Starts http server on specified port, exposing special /debug/pprof endpoint. Example: `:8181`")
flag.IntVar(&Settings.Verbose, "verbose", 0, "set the level of verbosity, if greater than zero then it will turn on debug output")
……
flag.StringVar(&Settings.Middleware, "middleware", "", "Used for modifying traffic using external command")
flag.Var(&Settings.OutputHTTP, "output-http", "Forwards incoming requests to given http address.\n\t# Redirect all incoming requests to staging.com address \n\tgor --input-raw :80 --output-http http://staging.com")
……
// default values, using for tests
Settings.OutputFileConfig.SizeLimit = 33554432
Settings.OutputFileConfig.OutputFileMaxSize = 1099511627776
Settings.CopyBufferSize = 5242880
}
执行main
func main() {
//设置cpu
if os.Getenv("GOMAXPROCS") == "" {
runtime.GOMAXPROCS(runtime.NumCPU() * 2)
}
//参数获取,另一种方式,第一个参数默认是go编译参数,所以下标从1开始
args := os.Args[1:]
//插件结构体定义,插件是goreplay的核心,相当与扩展
var plugins *InOutPlugins
//文件server模式,创建http server,一般不用这种模式
if len(args) > 0 && args[0] == "file-server" {
if len(args) != 2 {
log.Fatal("You should specify port and IP (optional) for the file server. Example: `gor file-server :80`")
}
dir, _ := os.Getwd()
Debug(0, "Started example file server for current directory on address ", args[1])
log.Fatal(http.ListenAndServe(args[1], loggingMiddleware(args[1], http.FileServer(http.Dir(dir)))))
} else {
//绑定参数
flag.Parse()
//检查文件默认值
checkSettings()
//初始化插件,非常的关键;如果在settings.go文件自定义参数,那么这里插件需要对应增加,为啥不做成SPI机制,需要修改代码
plugins = NewPlugins()
}
log.Printf("[PPID %d and PID %d] Version:%s\n", os.Getppid(), os.Getpid(), VERSION)
//插件检查,说明必须插件才能运行
if len(plugins.Inputs) == 0 || len(plugins.Outputs) == 0 {
log.Fatal("Required at least 1 input and 1 output")
}
if *memprofile != "" {
profileMEM(*memprofile)
}
if *cpuprofile != "" {
profileCPU(*cpuprofile)
}
if Settings.Pprof != "" {
go func() {
log.Println(http.ListenAndServe(Settings.Pprof, nil))
}()
}
//创建关闭管道
closeCh := make(chan int)
//同步器和插件
emitter := NewEmitter()
//协程跑,核心代码,注册监听器就在这里
go emitter.Start(plugins, Settings.Middleware)
//定时退出,通过刚刚的管道
if Settings.ExitAfter > 0 {
log.Printf("Running gor for a duration of %s\n", Settings.ExitAfter)
time.AfterFunc(Settings.ExitAfter, func() {
log.Printf("gor run timeout %s\n", Settings.ExitAfter)
close(closeCh)
})
}
//信号管道
c := make(chan os.Signal, 1)
//Notify causes package signal to relay incoming signals to c,如果有中断信号
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
exit := 0
select {
case <-c:
exit = 1
case <-closeCh:
exit = 0
}
//先close插件,然后引用置空,同步器就是这里用来处理插件的
emitter.Close()
//终止进程
os.Exit(exit)
}
初始化插件代码NewPlugins();藕联很重啊,跟刚刚settings.go文件的绑定参数结合初始化插件,如果我们自定义一个插件,需要写一个参数settings.go绑定,写个文件通过结构体的继承方法函数,在这里注册
func NewPlugins() *InOutPlugins {
plugins := new(InOutPlugins)
……
for _, options := range Settings.InputRAW {
plugins.registerPlugin(NewRAWInput, options, Settings.RAWInputConfig)
}
……
for _, options := range Settings.OutputHTTP {
plugins.registerPlugin(NewHTTPOutput, options, &Settings.OutputHTTPConfig)
}
for _, options := range Settings.OutputBinary {
plugins.registerPlugin(NewBinaryOutput, options, &Settings.OutputBinaryConfig)
}
if Settings.OutputKafkaConfig.Host != "" && Settings.OutputKafkaConfig.Topic != "" {
plugins.registerPlugin(NewKafkaOutput, "", &Settings.OutputKafkaConfig, &Settings.KafkaTLSConfig)
}
if Settings.InputKafkaConfig.Host != "" && Settings.InputKafkaConfig.Topic != "" {
plugins.registerPlugin(NewKafkaInput, "", &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)
}
return plugins
}
注册就是反射调用使plugins增加插件结构体
RAWInput插件注册时,注册capture监听器
// NewRAWInput constructor for RAWInput. Accepts raw input config as arguments.
func NewRAWInput(address string, config RAWInputConfig) (i *RAWInput) {
i = new(RAWInput)
i.RAWInputConfig = config
i.quit = make(chan bool)
host, _ports, err := net.SplitHostPort(address)
if err != nil {
log.Fatalf("input-raw: error while parsing address: %s", err)
}
var ports []uint16
if _ports != "" {
portsStr := strings.Split(_ports, ",")
for _, portStr := range portsStr {
port, err := strconv.Atoi(strings.TrimSpace(portStr))
if err != nil {
log.Fatalf("parsing port error: %v", err)
}
ports = append(ports, uint16(port))
}
}
i.host = host
i.ports = ports
//前面处理参数,这里添加监听
i.listen(address)
return
}
i.listen(address)
func (i *RAWInput) listen(address string) {
var err error
//创建监听器
i.listener, err = capture.NewListener(i.host, i.ports, "", i.Engine, i.Protocol, i.TrackResponse, i.Expire, i.AllowIncomplete)
if err != nil {
log.Fatal(err)
}
i.listener.SetPcapOptions(i.PcapOptions)
//激活监听
err = i.listener.Activate()
if err != nil {
log.Fatal(err)
}
var ctx context.Context
//cancel
ctx, i.cancelListener = context.WithCancel(context.Background())
//后台监听
errCh := i.listener.ListenBackground(ctx)
<-i.listener.Reading
Debug(1, i)
go func() {
<-errCh // the listener closed voluntarily
i.Close()
}()
}
capture.NewListener,先找到网卡设备
en1就是我现在正在使用的网卡
WiFi无线网卡
i.listener.Activate()
func (l *Listener) activatePcap() error {
var e error
var msg string
for _, ifi := range l.Interfaces {
var handle *pcap.Handle
//设置bpffilter
handle, e = l.PcapHandle(ifi)
if e != nil {
msg += ("\n" + e.Error())
continue
}
//把pcap的handle传递给监听器,这样listener的handle就可以拿到pcap的数据了
l.Handles[ifi.Name] = packetHandle{
handler: handle,
ips: interfaceIPs(ifi),
}
}
if len(l.Handles) == 0 {
return fmt.Errorf("pcap handles error:%s", msg)
}
return nil
}

设置bpfFilter,BPF是Berkely Packet Filter(伯克利数据包过滤器),提升pcap过滤性能的,文件位于/dev下
后来扩展了BPF为ebpf(extended Berkeley Packet Filter),相当于动态编译代码到内核态,通过用户态执行内核态指令并拿到结果,在k8s中广泛用于替代iptables。
- 可以使用 LLVM 或者 GCC 工具将编写的 BPF 代码程序编译成 BPF 字节码
- 使用加载程序 Loader 将字节码加载至内核
- 内核中运行的 BPF 字节码程序可以将测量数据回传至用户空间

监听器启动
// ListenBackground is like listen but can run concurrently and signal error through channel
func (l *Listener) ListenBackground(ctx context.Context) chan error {
err := make(chan error, 1)
go func() {
defer close(err)
if e := l.Listen(ctx); err != nil {
err <- e
}
}()
return err
}
读取网卡,等待信号close handle
// Listen listens for packets from the handles, and call handler on every packet received
// until the context done signal is sent or there is unrecoverable error on all handles.
// this function must be called after activating pcap handles
func (l *Listener) Listen(ctx context.Context) (err error) {
l.read()
done := ctx.Done()
select {
case <-done:
close(l.quit) // signal close on all handles
<-l.closeDone // wait all handles to be closed
err = ctx.Err()
case <-l.closeDone: // all handles closed voluntarily
}
return
}
注册就在read函数
func (l *Listener) read() {
l.Lock()
defer l.Unlock()
//每个网卡一个handle,因为是协程,乱序执行
for key, handle := range l.Handles {
go func(key string, hndl packetHandle) {
runtime.LockOSThread()
defer l.closeHandles(key)
linkSize := 14
linkType := int(layers.LinkTypeEthernet)
if _, ok := hndl.handler.(*pcap.Handle); ok {
linkType = int(hndl.handler.(*pcap.Handle).LinkType())
linkSize, ok = pcapLinkTypeLength(linkType)
if !ok {
if os.Getenv("GORDEBUG") != "0" {
log.Printf("can not identify link type of an interface '%s'\n", key)
}
return // can't find the linktype size
}
}
//这里有个协程go parser.wait()
// 这里非常关键,l.messages就是messageParser的message,指针传递,**<font color="red">非常关键,后面数据回传就依靠这个message</font>**
messageParser := tcp.NewMessageParser(l.messages, l.ports, hndl.ips, l.expiry, l.allowIncomplete)
//这里只能处理http协议,因为start和end函数是http,要解析其他协议,必须增加协议支持
if l.protocol == tcp.ProtocolHTTP {
messageParser.Start = http1StartHint
messageParser.End = http1EndHint
}
timer := time.NewTicker(1 * time.Second)
//这里是一个类似死循环
for {
select {
case <-l.quit:
return
case <-timer.C:
if h, ok := hndl.handler.(PcapStatProvider); ok {
s, err := h.Stats()
if err == nil {
stats.Add("packets_received", int64(s.PacketsReceived))
stats.Add("packets_dropped", int64(s.PacketsDropped))
stats.Add("packets_if_dropped", int64(s.PacketsIfDropped))
}
}
default:
//**读包也是这里读取的,读取会触发管道parser.wait(),就是上面创建的**
data, ci, err := hndl.handler.ReadPacketData()
if err == nil {
if l.TimestampType == "go" {
ci.Timestamp = time.Now()
}
//触发管道,管道写parser.wait()
messageParser.PacketHandler(&tcp.PcapPacket{
Data: data,
LType: linkType,
LTypeLen: linkSize,
Ci: &ci,
})
continue
}
if enext, ok := err.(pcap.NextError); ok && enext == pcap.NextErrorTimeoutExpired {
continue
}
if eno, ok := err.(syscall.Errno); ok && eno.Temporary() {
continue
}
if enet, ok := err.(*net.OpError); ok && (enet.Temporary() || enet.Timeout()) {
continue
}
if err == io.EOF || err == io.ErrClosedPipe {
log.Printf("stopped reading from %s interface with error %s\n", key, err)
return
}
log.Printf("stopped reading from %s interface with error %s\n", key, err)
return
}
}
}(key, handle)
}
close(l.Reading)
}
messageParser := tcp.NewMessageParser(l.messages, l.ports, hndl.ips, l.expiry, l.allowIncomplete)
这句非常关键,因为抓取的packet,就是通过messages这个管道传输的,这里l.messages指针引用,就把messageParser和Listener的通过通信来共享内存打通了。
进一步看核心启动代码emitter.Start(plugins, Settings.Middleware)
// Start initialize loop for sending data from inputs to outputs
func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
if Settings.CopyBufferSize < 1 {
Settings.CopyBufferSize = 5 << 20
}
e.plugins = plugins
//中间件指令,是可以扩展的
if middlewareCmd != "" {
middleware := NewMiddleware(middlewareCmd)
//扩展输入
for _, in := range plugins.Inputs {
middleware.ReadFrom(in)
}
e.plugins.Inputs = append(e.plugins.Inputs, middleware)
e.plugins.All = append(e.plugins.All, middleware)
e.Add(1)
go func() {
defer e.Done()
if err := CopyMulty(middleware, plugins.Outputs...); err != nil {
Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
}
}()
} else {
//处理输入指令,demo是监听http 8080流量
for _, in := range plugins.Inputs {
e.Add(1)
//协程
go func(in PluginReader) {
defer e.Done()
//输入到输出
if err := CopyMulty(in, plugins.Outputs...); err != nil {
Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
}
}(in)
}
}
}
看看怎么处理的
func CopyMulty(src PluginReader, writers …PluginWriter) error
实际上就是Reader输入插件处理,然后使用Writer输出插件处理,细节先不管。
// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(src PluginReader, writers ...PluginWriter) error {
wIndex := 0
modifier := NewHTTPModifier(&Settings.ModifierConfig)
filteredRequests := make(map[string]int64)
filteredRequestsLastCleanTime := time.Now().UnixNano()
filteredCount := 0
//死循环
for {
// 管道通信,管道获取message
msg, err := src.PluginRead()
if err != nil {
if err == ErrorStopped || err == io.EOF {
return nil
}
return err
}
if msg != nil && len(msg.Data) > 0 {
if len(msg.Data) > int(Settings.CopyBufferSize) {
msg.Data = msg.Data[:Settings.CopyBufferSize]
}
meta := payloadMeta(msg.Meta)
if len(meta) < 3 {
Debug(2, fmt.Sprintf("[EMITTER] Found malformed record %q from %q", msg.Meta, src))
continue
}
requestID := byteutils.SliceToString(meta[1])
// start a subroutine only when necessary
if Settings.Verbose >= 3 {
Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src)
}
if modifier != nil {
Debug(3, "[EMITTER] modifier:", requestID, "from:", src)
if isRequestPayload(msg.Meta) {
msg.Data = modifier.Rewrite(msg.Data)
// If modifier tells to skip request
if len(msg.Data) == 0 {
filteredRequests[requestID] = time.Now().UnixNano()
filteredCount++
continue
}
Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src)
} else {
if _, ok := filteredRequests[requestID]; ok {
delete(filteredRequests, requestID)
filteredCount--
continue
}
}
}
if Settings.PrettifyHTTP {
msg.Data = prettifyHTTP(msg.Data)
if len(msg.Data) == 0 {
continue
}
}
if Settings.SplitOutput {
if Settings.RecognizeTCPSessions {
if !PRO {
log.Fatal("Detailed TCP sessions work only with PRO license")
}
hasher := fnv.New32a()
hasher.Write(meta[1])
wIndex = int(hasher.Sum32()) % len(writers)
if _, err := writers[wIndex].PluginWrite(msg); err != nil {
return err
}
} else {
// Simple round robin
if _, err := writers[wIndex].PluginWrite(msg); err != nil {
return err
}
wIndex = (wIndex + 1) % len(writers)
}
} else {
//write输出插件处理
for _, dst := range writers {
if _, err := dst.PluginWrite(msg); err != nil && err != io.ErrClosedPipe {
return err
}
}
}
}
// 每1000次主动GC
// Run GC on each 1000 request
if filteredCount > 0 && filteredCount%1000 == 0 {
// Clean up filtered requests for which we didn't get a response to filter
now := time.Now().UnixNano()
if now-filteredRequestsLastCleanTime > int64(60*time.Second) {
for k, v := range filteredRequests {
if now-v > int64(60*time.Second) {
delete(filteredRequests, k)
filteredCount--
}
}
filteredRequestsLastCleanTime = time.Now().UnixNano()
}
}
}
}
至此goreplay的启动代码就结束了,下面抓一个http试试
运行时监听
浏览器发起http://localhost:8080/hello
handle抓包,然后解析包
解析包
拿到data,这里的End
可以看到,http请求的URL,header,没有body
这里管道传递,不要通过共享内存来通信;而应通过通信来共享内存.
parser.messages在前面发现实际就是listener的messages,所以只需要listener的messages管道通信即可
在启动分析时就执行了这个代码,实际上是管道阻塞逻辑
极其关键的代码
// PluginRead reads meassage from this plugin
func (i *RAWInput) PluginRead() (*Message, error) {
var msgTCP *tcp.Message
var msg Message
select {
case <-i.quit:
return nil, ErrorStopped
case msgTCP = <-i.listener.Messages(): //管道通信,就是这里获取结果
msg.Data = msgTCP.Data()
}
var msgType byte = ResponsePayload
if msgTCP.Direction == tcp.DirIncoming {
msgType = RequestPayload
if i.RealIPHeader != "" {
msg.Data = proto.SetHeader(msg.Data, []byte(i.RealIPHeader), []byte(msgTCP.SrcAddr))
}
}
//goreplay自己拼装的meta 定义的msgType uuid 启动纳秒 解析packet耗时
msg.Meta = payloadHeader(msgType, msgTCP.UUID(), msgTCP.Start.UnixNano(), msgTCP.End.UnixNano()-msgTCP.Start.UnixNano())
// to be removed....
if msgTCP.Truncated {
Debug(2, "[INPUT-RAW] message truncated, increase copy-buffer-size")
}
// to be removed...
if msgTCP.TimedOut {
Debug(2, "[INPUT-RAW] message timeout reached, increase input-raw-expire")
}
if i.Stats {
stat := msgTCP.Stats
go i.addStats(stat)
}
msgTCP = nil
return &msg, nil //拿到包
}
然后执行write插件
writer插件
以outputhttp为例,这个插件表示直接把包转发到一个url
在注册插件时
for _, options := range Settings.OutputHTTP {
plugins.registerPlugin(NewHTTPOutput, options, &Settings.OutputHTTPConfig)
}
写好了通信逻辑
for i := 0; i < o.config.WorkersMin; i++ {
go o.startWorker()
}
定义了
func (o *HTTPOutput) startWorker() {
for {
select {
case <-o.stopWorker:
return
case msg := <-o.queue: //通信,发送请求,msg就是共享的数据
o.sendRequest(o.client, msg)
}
}
}
然后就是监听触发
//write输出插件处理
for _, dst := range writers {
if _, err := dst.PluginWrite(msg); err != nil && err != io.ErrClosedPipe {
return err
}
}
这个在上面提到,就是插件处理逻辑,先read然后多个插件write
// PluginWrite writes message to this plugin
func (o *HTTPOutput) PluginWrite(msg *Message) (n int, err error) {
//通过封装的meta检查payload,就是抓的包,是不是符合要求的
if !isRequestPayload(msg.Meta) {
return len(msg.Data), nil
}
select {
case <-o.stop:
return 0, ErrorStopped
case o.queue <- msg: // 通信传递,刚刚的startWorker就可以干活了
}
if o.config.Stats {
o.queueStats.Write(len(o.queue))
}
//startWorker如果干活忙不过来
if len(o.queue) > 0 {
// try to start a new worker to serve
if atomic.LoadInt32(&o.activeWorkers) < int32(o.config.WorkersMax) {
//启动多协程干活
go o.startWorker()
atomic.AddInt32(&o.activeWorkers, 1)
}
}
return len(msg.Data) + len(msg.Meta), nil
}
发送http的代码就很简单了
func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
if !isRequestPayload(msg.Meta) {
return
}
uuid := payloadID(msg.Meta)
start := time.Now()
//发送http,获取结果,这里可以加工处理,比如把req resp做其他加工,还能拿到耗时
resp, err := client.Send(msg.Data)
stop := time.Now()
if err != nil {
Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", err))
return
}
if resp == nil {
return
}
//Track刚刚发送的结果,这里的uuid就是标识,可以对应请求和结果,因为请求结果的返回不知道先后
//也可以修改代码把输入加进去,track顾名思义是继续作为输入
if o.config.TrackResponses {
o.responses <- &response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()}
}
//es这块处理很好,直接把data和resp放在一起
if o.elasticSearch != nil {
o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)
}
}
TrackResponses
client.Send(msg.Data)
func (c *HTTPClient) Send(data []byte) ([]byte, error) {
var req *http.Request
var resp *http.Response
var err error
//包装request,比如去掉Host
req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
if err != nil {
return nil, err
}
// we don't send CONNECT or OPTIONS request
if req.Method == http.MethodConnect {
return nil, nil
}
if !c.config.OriginalHost {
req.Host = c.config.url.Host
}
// fix #862
if c.config.url.Path == "" && c.config.url.RawQuery == "" {
req.URL.Scheme = c.config.url.Scheme
req.URL.Host = c.config.url.Host
} else {
req.URL = c.config.url
}
// force connection to not be closed, which can affect the global client
req.Close = false
// it's an error if this is not equal to empty string
req.RequestURI = ""
// 发送请求
resp, err = c.Client.Do(req)
if err != nil {
return nil, err
}
//track 才会dump,毕竟http是输出流,这里持久化,否则close后就拿不到结果
if c.config.TrackResponses {
return httputil.DumpResponse(resp, true)
}
_ = resp.Body.Close()
return nil, nil
}
至此源码分析结束,这里仅仅分析了一个输入一个输出参数,实际上是插件模式,可以配置很多参数,原理相同。
总结
这里仅仅使用goreplay为例,其他的流量录制可能会有很多区别,比如代理模式。而且goreplay此次示例使用应用层回放,实际上可以使用传输层回放,直接把packet处理了,扔回去,这样就很通用,但是非常不明确,很难读。吐槽go的管道,真的很绕,尤其是指针传递管道,开始笔者一头雾水,还是仔细阅读源码才明白。