frp 是一款高性能的反向代理应用,专注于内网穿透,支持多种协议和 P2P 通信功能,目前在 GitHub 上已有 80k 的 star。本文将深入探讨其源码,揭示其背后的实现原理。
1. 前言
frp 是一款高性能的反向代理应用,专注于内网穿透。它支持多种协议,包括 TCP、UDP、HTTP、HTTPS 等,并且具备 P2P 通信功能。使用 frp,您可以安全、便捷地将内网服务暴露到公网,通过拥有公网 IP 的节点进行中转,具体场景就是:将客户端部署到你的内网中,然后该客户端与你内网服务网络可达,当客户端与在公网的服务端连接后,我们就可以通过访问服务端的指定端口,去访问到内网服务。
目前 GitHub 已经有 80k 的 star,这么猛的项目,我决定阅读一番源码偷师一波。
2. pkg/auth
这个包负责客户端和服务端认证的代码,这里面一共用到了 2 种验证机制,一种是基于 token,就是预共享密钥,客户端和服务端实现配置一样的字符串密钥,第二种是 OAuth 2.0,依赖第三方授权服务器颁发的访问令牌,然后客户端带着令牌去访问服务端。
这里面有很多技巧值得学习:
2.1. 工厂函数
通过不同的配置生成对应的认证方式。
type Setter interface {
SetLogin(*msg.Login) error
SetPing(*msg.Ping) error
SetNewWorkConn(*msg.NewWorkConn) error
}
// 根据客户端配置创建认证提供者
func NewAuthSetter(cfg v1.AuthClientConfig) (authProvider Setter) {
switch cfg.Method {
// token 认证模式
case v1.AuthMethodToken:
authProvider = NewTokenAuth(cfg.AdditionalScopes, cfg.Token)
// openid 认证模式
case v1.AuthMethodOIDC:
authProvider = NewOidcAuthSetter(cfg.AdditionalScopes, cfg.OIDC)
default:
panic(fmt.Sprintf(「wrong method: 『%s』」, cfg.Method))
}
return authProvider
}
2.2. 常量时间的字符串比较
正常情况来说,token 模式下,两边比较一下字符串是不是相等就完了,但其实这个是有安全隐患的,第一个就是攻击者可以进行重放攻击,一直进行密码爆破,第二个就是攻击者可以进行定时攻击,比如普通比较(如 ==
)在发现第一个不匹配字节时会立即返回,攻击者可通过测量响应时间差异推断出匹配的字节位置,ConstantTimeCompare
始终遍历全部字节(即使已发现不匹配),使攻击者无法通过时间差获取敏感信息。
// token 和客户端上线的时间戳组成 key
func GetAuthKey(token string, timestamp int64) (key string) {
md5Ctx := md5.New()
md5Ctx.Write([]byte(token))
md5Ctx.Write([]byte(strconv.FormatInt(timestamp, 10)))
data := md5Ctx.Sum(nil)
return hex.EncodeToString(data)
}
// 全量匹配字节
func ConstantTimeCompare(x, y []byte) int {
if len(x) != len(y) {
return 0
}
var v byte
for i := 0; i < len(x); i++ {
v |= x[i] ^ y[i]
}
return ConstantTimeByteEq(v, 0)
}
// ConstantTimeByteEq returns 1 if x == y and 0 otherwise.
func ConstantTimeByteEq(x, y uint8) int {
return int((uint32(x^y) - 1) >> 31)
}
3. pkg/config
config 文件夹是 frp
配置管理的核心模块,涵盖了配置的加载、解析、验证、转换和命令行支持等功能。它确保了 frp
的灵活性和兼容性,同时为用户提供了多种配置方式。
3.1. 使用环境变量进行模板渲染
serverAddr = 「{{ .Envs.FRP_SERVER_ADDR }}」
serverPort = 7000
[[proxies]]
name = 「ssh」
type = 「tcp」
localIP = 「127.0.0.1」
localPort = 22
remotePort = {{ .Envs.FRP_SSH_REMOTE_PORT }}
export FRP_SERVER_ADDR=「x.x.x.x」
export FRP_SSH_REMOTE_PORT=「6000」
./frpc -C ./frpc.toml
这个实现是采用了 template 模板库,其中 Envs 前缀是由字段名 Envs
决定的:
type Values struct {
Envs map[string]string // 「{{ .Envs.FRP_SERVER_ADDR }}」 Envs 的由来
}
func RenderWithTemplate(in []byte, values *Values) ([]byte, error) {
tmpl, err := template.New(「frp」).Funcs(template.FuncMap{
「parseNumberRange」: parseNumberRange,
「parseNumberRangePair」: parseNumberRangePair,
}).Parse(string(in))
if err != nil {
return nil, err
}
buffer := bytes.NewBufferString(「」)
if err := tmpl.Execute(buffer, values); err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
// 将端口范围解析为 端口列表
func parseNumberRange(firstRangeStr string) ([]int64, error) {
... ...
}
这里面有一些自定义的解析函数,比如说:
ports = 「{{ parseNumberRange .Envs.PORT_RANGE }}」
export PORT_RANGE = 「1000-1005」
// 这样 ports 就会被 template 的 parseNumberRange 函数解析并渲染为
// ports = 1000, 1001, 1002, 1003, 1004, 1005
3.2. 配置拆分
通过 includes
参数可以在主配置中包含其他配置文件,从而实现将代理配置拆分到多个文件中管理
# frpc.toml
serverAddr = 「x.x.x.x」
serverPort = 7000
includes = [「./confd/*.toml」]
上述配置在 frpc.toml 中通过 includes 额外包含了 ./confd
目录下所有的 toml 文件的代理配置内容,效果等价于将这两个文件合并成一个文件。
这个实现是采用了,循环读取文件内容 + 模板渲染 + 配置合并+ toml 反序列化 的方法:
// 主文件配置,就是 frpc.toml
var content []byte
content, err = GetRenderedConfFromFile(filePath)
if err != nil {
return
}
configBuffer := bytes.NewBuffer(nil)
configBuffer.Write(content)
... ...
var buf []byte
// 循环读取 include 的文件
// getIncludeContents
// ->ReadFile
// ->RenderContent
// ->template.New(「frp」).Parse(string(in))
buf, err = getIncludeContents(cfg.IncludeConfigFiles)
if err != nil {
err = fmt.Errorf(「getIncludeContents error: %v」, err)
return
}
configBuffer.WriteString(「
」)
configBuffer.Write(buf)
// 将所有配置合并,然后将 toml 序列化为 type ClientCommonConf struct
代理 Cfgs, visitorCfgs, err = LoadAllProxyConfsFromIni(cfg.User, configBuffer.Bytes(), cfg.Start)
if err != nil {
return
}
return
3.3. 配置热加载
frpc reload -C ./frpc.toml
等待一段时间后,客户端将根据新的配置文件创建、更新或删除代理。
这里面也比较简单,主要逻辑在于配置校验,旧配置中与新配置里同名的且代理内容不一样的 proxy 停止,新增的配置的 proxy 再启动,也就是说老配置和新配置完全一样的是不动的
func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) {
xl := xlog.FromContextSafe(pm.ctx)
proxyCfgsMap := lo.KeyBy(proxyCfgs, func(C v1.ProxyConfigurer) string {
return C.GetBaseConfig().Name
})
pm.mu.Lock()
defer pm.mu.Unlock()
delPxyNames := make([]string, 0)
for name, pxy := range pm.proxies {
del := false
cfg, ok := proxyCfgsMap[name]
if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {
del = true
}
if del {
delPxyNames = append(delPxyNames, name)
delete(pm.proxies, name)
pxy.Stop()
}
}
if len(delPxyNames) > 0 {
xl.Infof(「proxy removed: %s」, delPxyNames)
}
addPxyNames := make([]string, 0)
for _, cfg := range proxyCfgs {
name := cfg.GetBaseConfig().Name
if _, ok := pm.proxies[name]; !ok {
pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter, pm.vnetController)
if pm.inWorkConnCallback != nil {
pxy.SetInWorkConnCallback(pm.inWorkConnCallback)
}
pm.proxies[name] = pxy
addPxyNames = append(addPxyNames, name)
pxy.Start()
}
}
if len(addPxyNames) > 0 {
xl.Infof(「proxy added: %s」, addPxyNames)
}
4. 监控
frps 服务端支持两种监控系统:指标存在内存中,和指标输出到 Prometheus。主要监控以下指标:
type serverMetrics struct {
// 记录当前连接到服务端的客户端数量。
clientCount Prometheus.Gauge
// 记录当前代理的数量,按代理类型(如 TCP、HTTP)分类。
proxyCount *Prometheus.GaugeVec
// 记录当前连接的数量,按代理类型(如 TCP、HTTP)分类。
connectionCount *Prometheus.GaugeVec
// 记录流入的总流量,按代理类型(如 TCP、HTTP)分类。
trafficIn *Prometheus.CounterVec
// 记录流出的总流量,按代理类型(如 TCP、HTTP)分类。
trafficOut *Prometheus.CounterVec
}
内存监控没啥,但统计的增删改,这里用到了原子操作的技巧:
func (C *StandardCounter) Count() int32 {
return atomic.LoadInt32(&C.count)
}
func (C *StandardCounter) Inc(count int32) {
atomic.AddInt32(&C.count, count)
}
func (C *StandardCounter) Dec(count int32) {
atomic.AddInt32(&C.count, -count)
}
对于不同类型的 proxy 的统计,frp 没有使用 syn map,而是用一把读写锁保平安:
m.mu.Lock()
defer m.mu.Unlock()
counter, ok := m.info.ProxyTypeCounts[proxyType]
if !ok {
counter = metric.NewCounter()
}
counter.Inc(1)
对于如何进行 Prometheus 监控,frp 的使用流程可以借鉴,整体来说分为以下几个步骤:
- 编码前,先定义指标,类似于:
Namespace: 「frp」,
Subsystem: 「server」,
Name: 「traffic_out」,
Help: 「The total out traffic」,
- frp 注册 Prometheus 指标
trafficOut: Prometheus.NewCounterVec(Prometheus.CounterOpts{
Namespace: namespace,
Subsystem: serverSubsystem,
Name: 「traffic_out」,
Help: 「The total out traffic」,
}, []string{「name」, 「type」}),
}
Prometheus.MustRegister(m.clientCount)
Prometheus.MustRegister(m.proxyCount)
Prometheus.MustRegister(m.connectionCount)
Prometheus.MustRegister(m.trafficIn)
Prometheus.MustRegister(m.trafficOut)
- frp 暴露 HTTP 服务,一般是/metric,promhttp 提供一个 HTTP 处理器,用于暴露所有注册的 Prometheus 指标。
if svr.cfg.EnablePrometheus {
subRouter.Handle(「/metrics」, promhttp.Handler())
}
- 配置 Prometheus 定时抓取这个 HTTP 路径,舒服了
全球:
scrape_interval: 15s # 每 15 秒抓取一次数据
scrape_configs:
- job_name: 「frp_server」
static_configs:
- targets: [「localhost:8080」] # 替换为 frp 服务端暴露的 /metrics 端点
5. 通信安全
当 frpc 和 frps 之间启用了 TLS 之后,流量会被全局加密,不再需要配置单个代理上的加密,新版本中已经默认启用。每一个代理都可以选择是否启用加密和压缩的功能。
在每一个代理的配置中使用如下参数指定:
[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.useEncryption = true
transport.useCompression = true
5.1. 加密
通过设置 transport.useEncryption = true
,将 frpc 与 frps 之间的通信内容加密传输,将会有效防止传输内容被截取。
这个加密它使用了装饰器模式,传入普通的 IO,WithEncryption 后就会得到一个可以加密的 IO
remote, err = libio.WithEncryption(remote, encKey)
if err != nil {
workConn.
xl.Errorf(「create encryption stream error: %v」, err)
return
}
我们接下来看如何加密的:
总体加密算法采用 aes-128-cfb,aes 是一个对称加密,主要靠 key 和 iv 两个值
// pbkdf2 会生成一个用于 aes 加密的 key
// 入参 key 为:配置的 token
// DefaultSalt 为字符串默认值
key = pbkdf2.Key(key, []byte(DefaultSalt), 64, aes.BlockSize, sha1.New)
// iv 是用 rand 函数生成的安全加密的随机数
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
return nil, err
}
// Reader is a global, shared instance of a cryptographically
// secure random number generator. It is safe for concurrent use.
//
// - On Linux, FreeBSD, Dragonfly, and Solaris, Reader uses getrandom(2).
// - On legacy Linux (< 3.17), Reader opens /dev/urandom on first use.
// - On macOS, iOS, and OpenBSD Reader, uses arc4random_buf(3).
// - On NetBSD, Reader uses the kern.arandom sysctl.
// - On Windows, Reader uses the ProcessPrng API.
// - On js/wasm, Reader uses the Web Crypto API.
// - On wasi/wasm, Reader uses random_get.
//
// In FIPS 140-3 mode, the output passes through an SP 800-90A Rev. 1
// Deterministic Random Bit Generator (DRBG).
var Reader io.Reader
这样后续的 IO 操作都会自带加密了。
5.2. 压缩
压缩也是同理,搞一个压缩的 IO 装饰器就好了。
如果传输的报文长度较长,通过设置 transport.useCompression = true
对传输内容进行压缩,可以有效减小 frpc 与 frps 之间的网络流量,加快流量转发速度,但是会额外消耗一些 CPU 资源。
压缩算法采用 snappy 库。
sr := snappy.NewReader(rwc)
sw := snappy.NewWriter(rwc)
return WrapReadWriteCloser(sr, sw, func() error {
_ = sw.Close()
return rwc.Close()
})
}
5.3. 自定义 TLS
这个其实就是使用自签发的 CA,去生成密钥和证书,然后客户端和服务端加载起来后,可以进行双向或者单向验证,进行 HTTPS 握手,后续流量也是 HTTPS 加密的。
客户端单向校验服务端:
# frpc.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」
# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」
服务端单向校验客户端:
# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」
# frps.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」
双向验证
# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」
# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」
介绍这个之前,我们先回顾以下 TLS 握手的过程,hhh:
okk,那我们看 frp 是如何实现 tls 的:
// 获取 TLS 配置,作为 dial 选项
// tlsConfig, err = transport.NewClientTLSConfig
// tlsConfig, err = transport.NewServerTLSConfig
dialOptions = append(dialOptions, libnet.WithTLSConfig(tlsConfig))
...
// dail tcp 本身就是 tls 的了
conn, err := libnet.DialContext(
C.ctx,
net.JoinHostPort(C.cfg.ServerAddr, strconv.Itoa(C.cfg.ServerPort)),
dialOptions...,
)
// 加载服务端的 ca,证书+key
// 核心是 tls 库 tls.LoadX509KeyPair(certfile, keyfile),去管理证书和 key
func NewServerTLSConfig(certPath, keyPath, caPath string) (*tls.Config, error) {
base := &tls.Config{}
if certPath == «» || keyPath == «» {
// server will generate tls conf by itself
cert := newRandomTLSKeyPair()
base.Certificates = []tls.Certificate{*cert}
} else {
// 调的是这个 tlsCert, err := tls.LoadX509KeyPair(certfile, keyfile)
cert, err := newCustomTLSKeyPair(certPath, keyPath)
if err != nil {
return nil, err
}
base.Certificates = []tls.Certificate{*cert}
}
if caPath != '' {
// ca 证书
pool, err := newCertPool(caPath)
if err != nil {
return nil, err
}
// 校验客户端
base.ClientAuth = tls.RequireAndVerifyClientCert
base.ClientCAs = pool
}
return base, nil
}
// 加载客户端的 ca,证书+key
func NewClientTLSConfig(certPath, keyPath, caPath, serverName string) (*tls.Config, error) {
base := &tls.Config{}
if certPath != '' && keyPath != '' {
cert, err := newCustomTLSKeyPair(certPath, keyPath)
if err != nil {
return nil, err
}
base.Certificates = []tls.Certificate{*cert}
}
base.ServerName = serverName
if caPath != '' {
pool, err := newCertPool(caPath)
if err != nil {
return nil, err
}
base.RootCAs = pool
// 校验服务端
base.InsecureSkipVerify = false
} else {
base.InsecureSkipVerify = true
}
return base, nil
}
// Only support one ca file to add
func newCertPool(caPath string) (*x509.CertPool, error) {
pool := x509.NewCertPool()
cacrt, err := os.ReadFile(caPath)
if err != nil {
return nil, err
}
pool.AppendCertsFromPEM(caCrt)
return pool, nil
}
6. 代理配置
6.1. proxy
代理是 frp 的核心,这里详细介绍一下它的流程。
frpc 和 frps 的整体流程,里面可以抽象为 3 种连接,整体我画了一张图:
- 用户连接 (User Connection):
- 这是外部用户连接到 FRP 服务端(frps)特定端口的连接,也就是说想要访问内网服务的,例如,当运维访问
frps.example.com:8080
时建立的连接就是用户连接,它实际访问的是客户侧某个管理平台 - 在 frps 端,这个连接由
handleUserTCPConnection
函数处理。
- 工作连接 (Work Connection):
- 这是 frps 和 frpc 之间预先建立的连接,用于传输用户连接的数据。
- frps 在需要处理用户连接时会从连接池中获取一个可用的工作连接。
- 如果池中没有可用的工作连接,frps 会通知 frpc 创建新的工作连接。
- 工作连接是 frps 和 frpc 之间的隧道,用户数据通过这个隧道在外部用户和内部服务之间传输。
- 本地连接 (Local Connection):
- 在 frp 的上下文中,远程连接通常指的是 frpc 连接到内部服务的连接。
- 例如,当 frpc 收到从工作连接传来的数据时,它会创建一个连接到配置中指定的本地服务(如 localhost:80),这个连接就是远程连接。
下面是 FRP 数据流的完整过程:
- 外部用户(用户连接) -> frps 监听端口
- frps 从工作连接池中获取一个 工作连接(frps <-> frpc)
- frps 将用户连接与工作连接绑定(通过双向数据转发)
- frpc 接收到来自工作连接的数据,然后建立一个 远程连接(frpc -> 内部服务)
- frpc 将工作连接与远程连接绑定(通过双向数据转发)
下面来看看关键代码实现:
// 用户连接 (User Connection):
// frps 侧
// tcp 代理启动
func (pxy *TCPProxy) Run() (string, error) {
if pxy.cfg.LoadBalancer.Group != «» {
// 获取组监听器(实际共享端口)
l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)
pxy.listeners = append(pxy.listeners, l)
// 启动连接处理器(最终调用 BaseProxy.startCommonTCPListenersHandler)
pxy.startCommonTCPListenersHandler()
}
// ...
}
// 用户链接处理
func (pxy *BaseProxy) startCommonTCPListenersHandler() {
for _, listener := range pxy.listeners {
Go func(l net.Listener) {
for {
conn, err := l.Accept() // 此处调用 TCPGroupListener.Accept()
Go pxy.handleUserTCPConnection(conn) // 处理连接
}
}(listener)
}
}
// 工作连接 (Work Connection):
// frps 侧
// 从连接池中获取一个已建立的到 FRP 客户端的连接
// 内部实现路径:pxy.GetWorkConn() → pxy.workConnManager.Get()
// 底层通过 FRP 协议发送 NewWorkConn 消息到客户端建立隧道,这部分就是内部服务不一样的地方
// -> GetWorkConn
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil {
return
}
defer workConn.Close()
var local io.ReadWriteCloser = workConn
// 启动双向数据转发
inCount, outCount, _ := libio.Join(local, userConn)
// 在取出工作连接后,frps 会立即向 frpc 发送 msg.ReqWorkConn 消息,请求新的工作连接。
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
// 如果连接池为空,frps 会等待 frpc 创建新的工作连接并发送过来。
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = pkgerr.ErrCtlClosed
xl.Warnf(「no work connections available, %v」, err)
return
}
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
err = fmt.Errorf(「timeout trying to get work connection」)
xl.Warnf(「%v」, err)
return
}
// 本地连接 (Local Connection):
// frpc 侧
// handleReqWorkConn
// HandleWorkConn
// HandleTCPWorkConnection
unc (ctl *Control) handleReqWorkConn(_ msg.Message) {
xl := ctl.xl
workConn, err := ctl.connectServer()
if err != nil {
xl.Warnf(「start new connection to server error: %v」, err)
return
}
m := &msg.NewWorkConn{
RunID: ctl.sessionCtx.RunID,
}
if err = ctl.sessionCtx.AuthSetter.SetNewWorkConn(m); err != nil {
xl.Warnf(「error during NewWorkConn authentication: %v」, err)
workConn.Close()
return
}
if err = msg.WriteMsg(workConn, m); err != nil {
xl.Warnf(「work connection write to server error: %v」, err)
workConn.Close()
return
}
var startMsg msg.StartWorkConn
if err = msg.ReadMsgInto(workConn, &startMsg); err != nil {
xl.Tracef(「work connection closed before response StartWorkConn message: %v」, err)
workConn.Close()
return
}
if startMsg.Error != 「」 {
xl.Errorf(「StartWorkConn contains error: %s」, startMsg.Error)
workConn.Close()
return
}
// dispatch this work connection to related proxy
ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg)
}
remote = workConn
... ...
localConn, err := libnet.Dial(
net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
libnet.WithTimeout(10*time.Second),
)
... ...
_, _, errs := libio.Join(localConn, remote)
双向转发的实现灰常简洁,值得学习:
func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64, errors []error) {
var wait sync.WaitGroup
recordErrs := make([]error, 2)
pipe := func(number int, to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {
defer wait.Done()
defer CosClose()
defer from.Close()
buf := pool.GetBuf(16 * 1024)
defer pool.PutBuf(buf)
*count, recordErrs[number] = io.CopyBuffer(to, from, buf)
}
wait.Add(2)
Go pipe(0, c1, c2, &inCount)
Go pipe(1, c2, c1, &outCount)
wait.Wait()
for _, e := range recordErrs {
if e != nil {
errors = append(errors, e)
}
}
return
}
6.2. 负载均衡
你可以将多个相同类型的代理加入到同一个 group
中,以实现负载均衡的能力,当用户连接 frps 服务器的 80 端口时,frps 会将接收到的用户连接随机分发给其中一个存活的代理。这可以确保即使一台 frpc 机器挂掉,仍然有其他节点能够提供服务。
# frpc.toml
[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 8080
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」
[[proxies]]
name = 「test2」
type = 「tcp」
localPort = 8081
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」
这个负载均衡的实现的关键结构体是 TCPGroupCtl *group.TCPGroupCtl:
// 管理 TCP 代理的分组逻辑,包括分组的创建、监听、连接分发等功能。
TCPGroupCtl *group.TCPGroupCtl
// 主要有三大功能
// 1. 分组管理:
// 将多个 TCP 代理分组到一起,形成一个逻辑组。
// 每个组可以共享一个端口,分发连接到组内的代理。
// 2. 负载均衡:
// 根据一定的规则随机分发,将链接分发到组内的代理。
// 3. 资源管理:
// 负责监听和关闭组内的连接。
// 管理组的生命周期。
// tcp 代理分组
// 分组内统一监听,共享一个 remote port 的 coon,这个我们叫 remote conn,就是用户 connection
func (tgc *TCPGroupCtl) Listen(proxyName string, group string, groupKey string, addr string, port int) (l net.Listener, realPort int, err error) {
tgc.mu.Lock()
tcpGroup, ok := tgc.groups[group]
if !ok {
tcpGroup = NewTCPGroup(tgc)
tgc.groups[group] = tcpGroup
}
tgc.mu.Unlock()
return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
}
// 代理加入组
func (tg *TCPGroup) Listen(proxyName, group, groupKey, addr string, port int) (*TCPGroupListener, int, error) {
tg.mu.Lock()
defer tg.mu.Unlock()
// 首次加入组:创建真实监听
if len(tg.lns) == 0 {
realPort, err := tg.ctl.portManager.Acquire(proxyName, port) // 申请端口
tcpLn, err := net.Listen(「tcp」, net.JoinHostPort(addr, strconv.Itoa(port)))
tg.realPort = realPort
tg.tcpLn = tcpLn
Go tg.worker() // 启动连接分发协程
...
}
}
// 当新连接到达共享端口时,会被放入全局通道(acceptCh),
// 组内所有代理通过竞争机制获取链接,实现负载均衡
func (tg *TCPGroup) worker() {
for {
conn, err := tg.tcpLn.Accept() // 接收新连接
tg.acceptCh <- conn // 放入全局通道
}
}
func (ln *TCPGroupListener) Accept() (net.Conn, error) {
select {
case <-ln.closeCh:
return nil, ErrListenerClosed
case conn := <-ln.group.acceptCh: // 从全局通道竞争获取连接
return conn, nil
}
}
// tcp 代理启动
func (pxy *TCPProxy) Run() (string, error) {
if pxy.cfg.LoadBalancer.Group != 「」 {
// 获取组监听器(实际共享端口)
l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)
pxy.listeners = append(pxy.listeners, l)
// 启动连接处理器(最终调用 BaseProxy.startCommonTCPListenersHandler)
pxy.startCommonTCPListenersHandler()
}
// ...
}
6.3. 健康检查
通过给代理配置健康检查参数,可以在要反向代理的服务出现故障时,将该服务从 frps 中摘除。结合负载均衡的功能,这可用于实现高可用架构,避免服务单点故障。
[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 22
remotePort = 6000
# 启用健康检查,类型为 tcp
healthCheck.type = 「tcp」
# 建立连接超时时间为 3 秒
healthCheck.timeoutSeconds = 3
# 连续 3 次检查失败,此 proxy 会被摘除
healthCheck.maxFailed = 3
# 每隔 10 秒进行一次健康检查
healthCheck.intervalSeconds = 10
这个配置被加载到 TCPProxyConfig-》ProxyBaseConfig-》HealthCheckConfig
type HealthCheckConfig struct {
// Type specifies what protocol to use for health checking.
// Valid values include 「tcp」, 「HTTP」, and 「」. If this value is 「」, health
// checking will not be performed.
//
// If the type is 「tcp」, a connection will be attempted to the target
// server. If a connection cannot be established, the health check fails.
//
// If the type is 「HTTP」, a GET request will be made to the endpoint
// specified by HealthCheckURL. If the response is not a 200, the health
// check fails.
Type string `json:「type」` // tcp | HTTP
// TimeoutSeconds specifies the number of seconds to wait for a health
// check attempt to connect. If the timeout is reached, this counts as a
// health check failure. By default, this value is 3.
TimeoutSeconds int `json:「timeoutSeconds,omitempty」`
// MaxFailed specifies the number of allowed failures before the
// is stopped. By default, this value is 1.
MaxFailed int `json:「maxFailed,omitempty」`
// IntervalSeconds specifies the time in seconds between health
// checks. By default, this value is 10.
IntervalSeconds int `json:「intervalSeconds」`
// Path specifies the path to send health checks to if the
// health check type is 「HTTP」.
Path string `json:「path,omitempty」`
// HTTPHeaders specifies the headers to send with the health request, if
// the health check type is 「HTTP」.
HTTPHeaders []HTTPHeader `json:「httpHeaders,omitempty」`
}
这部分代码非常独立,相当于起了一个定时的 monitor,去监控代理的 proxy 是否有效,连续检查失败,此 proxy 会被摘除
func (monitor *Monitor) checkWorker() {
for
err := monitor.doCheck(doCtx)
... ...
time.Sleep(monitor.interval)
}
}
func (monitor *Monitor) doCheck(ctx context.Context) error {
switch monitor.checkType {
case 「tcp」:
return monitor.doTCPCheck(ctx)
case 「HTTP」:
return monitor.doHTTPCheck(ctx)
default:
return ErrHealthCheckType
}
}
func (monitor *Monitor) doTCPCheck(ctx context.Context) error {
// if tcp address is not specified, always return nil
if monitor.addr == 「」 {
return nil
}
var d net.Dialer
conn, err := d.DialContext(ctx, 「tcp」, monitor.addr)
if err != nil {
return err
}
conn.Close()
return nil
}
6.4. 代理限速
# frpc.toml
[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.bandwidthLimit = 「1MB」
核心代码,依然是获取 tcp 连接时,加一个限速的装饰器:
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeClient {
limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}
if pxy.GetLimiter() != nil {
local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
return local.Close()
})
}
limit 使用的是原生的 rate 包:
func (r *Reader) Read(p []byte) (n int, err error) {
// 1. 获取令牌桶的突发容量
b := r.limiter.Burst()
// 2. 如果请求的读取量超过突发容量,调整读取大小
if b < len(p) {
p = p[:b]
}
// 3. 执行实际读取操作
n, err = r.r.Read(p)
if err != nil {
// 4. 如果读取过程中出错,直接返回
return
}
// 5. 根据实际读取的字节数消耗令牌
err = r.limiter.WaitN(context.Background(), n)
if err != nil {
return
}
return
}