proxy启动源码分析
结构体
// Server is the Proxy Server.
// It wraps the proxy component together with its network front-ends
// (internal/external gRPC, optional HTTP, cmux-shared TCP listener).
type Server struct {
ctx context.Context
wg sync.WaitGroup
// proxy is an interface-typed field; the concrete value is created by
// proxy.NewProxy inside NewServer.
proxy types.ProxyComponent
httpListener net.Listener
grpcListener net.Listener
tcpServer cmux.CMux
httpServer *http.Server
// Internal gRPC server (intra-cluster, default port 19529 per init) and
// external gRPC server (client-facing, default port 19530).
grpcInternalServer *grpc.Server
grpcExternalServer *grpc.Server
serverID atomic.Int64
etcdCli *clientv3.Client
// Coordinator clients; created lazily in (*Server).init() when still nil.
rootCoordClient types.RootCoordClient
dataCoordClient types.DataCoordClient
queryCoordClient types.QueryCoordClient
}
分析变量rootCoordClient、dataCoordClient、queryCoordClient是何时赋予的值。
proxy字段是types.ProxyComponent接口类型,其具体实现提供proxy的API功能。
// runProxy registers the Proxy role with the wait group and hands the
// component constructor plus its metrics registration off to runComponent.
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
	wg.Add(1)
	c := runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy)
	return c
}
// creator用NewProxy替换
role, err = creator(ctx, factory)
components.NewProxy是一个函数。
NewProxy()用来创建Proxy结构体。
// NewProxy creates a new Proxy component that delegates its work to an
// inner grpcproxy.Server. It returns an error when that server cannot be
// constructed.
//
// The original body predeclared `var err error` even though err is freshly
// introduced by the short variable declaration below; the redundant
// declaration is removed.
func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
	n := &Proxy{}
	svr, err := grpcproxy.NewServer(ctx, factory)
	if err != nil {
		return nil, err
	}
	n.svr = svr
	return n, nil
}
grpcproxy.NewServer()产生的是本结构体Server。
// NewServer creates a Proxy server holding the given context and a freshly
// built proxy component (the types.ProxyComponent implementation).
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {
	server := &Server{
		ctx: ctx,
	}
	var err error
	server.proxy, err = proxy.NewProxy(server.ctx, factory)
	if err != nil {
		return nil, err
	}
	// err is necessarily nil once the check above passes; return an explicit
	// nil instead of the variable for clarity.
	return server, nil
}
proxy.NewProxy()产生结构体,是types.ProxyComponent接口的实现。
代码位于internal\proxy\proxy.go
执行Run()
Server结构体创建后,调用结构体的Run()方法。
// runComponent constructs a component via creator on a background goroutine,
// sets the process role name, then calls the component's Run() method.
// sign is closed once creation succeeds — presumably waited on by the elided
// remainder of this function (marked "......" below).
func runComponent[T component](ctx context.Context,
localMsg bool,
runWg *sync.WaitGroup,
creator func(context.Context, dependency.Factory) (T, error),
metricRegister func(*prometheus.Registry),
) component {
var role T
sign := make(chan struct{})
go func() {
factory := dependency.NewFactory(localMsg)
var err error
role, err = creator(ctx, factory)
if localMsg {
paramtable.SetRole(typeutil.StandaloneRole)
} else {
paramtable.SetRole(role.GetName())
}
if err != nil {
panic(err)
}
close(sign)
// Invoke the concrete component's Run() method — here, the Proxy struct.
if err := role.Run(); err != nil {
panic(err)
}
runWg.Done()
}()
......
}
runComponent是一个包裹函数。
// Run starts the Proxy by running its inner gRPC server and logs the outcome.
func (n *Proxy) Run() error {
	err := n.svr.Run()
	if err != nil {
		log.Error("Proxy starts error", zap.Error(err))
		return err
	}
	log.Info("Proxy successfully started")
	return nil
}
Run()方法调用n.svr.Run()方法。svr是grpcproxy.NewServer()返回的结构体。
// Run initializes the Proxy server, starts it, and — when a cmux TCP server
// was configured during init — serves that shared listener on a background
// goroutine tracked by the wait group.
// (The original header comment said "Start start the Proxy Server"; the
// method is actually named Run.)
func (s *Server) Run() error {
	log.Debug("init Proxy server")
	err := s.init()
	if err != nil {
		log.Warn("init Proxy server failed", zap.Error(err))
		return err
	}
	log.Debug("init Proxy server done")

	log.Debug("start Proxy server")
	err = s.start()
	if err != nil {
		log.Warn("start Proxy server failed", zap.Error(err))
		return err
	}
	log.Debug("start Proxy server done")

	if s.tcpServer == nil {
		return nil
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		serveErr := s.tcpServer.Serve()
		if serveErr != nil && serveErr != cmux.ErrServerClosed {
			log.Warn("Proxy server for tcp port failed", zap.Error(serveErr))
			return
		}
		log.Info("Proxy tcp server exited")
	}()
	return nil
}
接下来分析s.init()和s.start()方法。
s.init()
func (s *Server) init() error {
etcdConfig := ¶mtable.Get().EtcdCfg
Params := ¶mtable.Get().ProxyGrpcServerCfg
log.Debug("Proxy init service's parameter table done")
HTTPParams := ¶mtable.Get().HTTPCfg
log.Debug("Proxy init http server's parameter table done")
if !funcutil.CheckPortAvailable(Params.Port.GetAsInt()) {
......
}
log.Debug("init Proxy's parameter table done", zap.String("internal address", Params.GetInternalAddress()), zap.String("external address", Params.GetAddress()))
serviceName := fmt.Sprintf("Proxy ip: %s, port: %d", Params.IP, Params.Port.GetAsInt())
log.Debug("init Proxy's tracer done", zap.String("service name", serviceName))
etcdCli, err := etcd.GetEtcdClient(
etcdConfig.UseEmbedEtcd.GetAsBool(),
etcdConfig.EtcdUseSSL.GetAsBool(),
etcdConfig.Endpoints.GetAsStrings(),
etcdConfig.EtcdTLSCert.GetValue(),
etcdConfig.EtcdTLSKey.GetValue(),
etcdConfig.EtcdTLSCACert.GetValue(),
etcdConfig.EtcdTLSMinVersion.GetValue())
if err != nil {
log.Debug("Proxy connect to etcd failed", zap.Error(err))
return err
}
s.etcdCli = etcdCli
s.proxy.SetEtcdClient(s.etcdCli)
s.proxy.SetAddress(Params.GetInternalAddress())
errChan := make(chan error, 1)
{
// 启动grpc服务,开启服务端口,默认为19529端口,内部通讯端口
s.startInternalRPCServer(Params.InternalPort.GetAsInt(), errChan)
if err := <-errChan; err != nil {
......
}
}
{
log.Info("Proxy server listen on tcp", zap.Int("port", Params.Port.GetAsInt()))
var lis net.Listener
var listenErr error
log.Info("Proxy server already listen on tcp", zap.Int("port", Params.Port.GetAsInt()))
lis, listenErr = net.Listen("tcp", ":"+strconv.Itoa(Params.Port.GetAsInt()))
if listenErr != nil {
......
}
if HTTPParams.Enabled.GetAsBool() && Params.TLSMode.GetAsInt() == 0 &&
(HTTPParams.Port.GetValue() == "" || HTTPParams.Port.GetAsInt() == Params.Port.GetAsInt()) {
......
} else {
s.grpcListener = lis
}
if HTTPParams.Enabled.GetAsBool() && HTTPParams.Port.GetValue() != "" && HTTPParams.Port.GetAsInt() != Params.Port.GetAsInt() {
if Params.TLSMode.GetAsInt() == 0 {
......
}
} else if Params.TLSMode.GetAsInt() == 1 {
......
} else if Params.TLSMode.GetAsInt() == 2 {
......
}
}
}
{
// 启动grpc服务,开启服务端口,默认为19530端口,对外服务端口
s.startExternalRPCServer(Params.Port.GetAsInt(), errChan)
if err := <-errChan; err != nil {
......
}
}
if HTTPParams.Enabled.GetAsBool() {
registerHTTPHandlerOnce.Do(func() {
log.Info("register Proxy http server")
s.registerHTTPServer()
})
}
// 创建rootCoordClient
if s.rootCoordClient == nil {
var err error
log.Debug("create RootCoord client for Proxy")
s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli)
if err != nil {
log.Warn("failed to create RootCoord client for Proxy", zap.Error(err))
return err
}
log.Debug("create RootCoord client for Proxy done")
}
// 等待RootCoord状态正常
log.Debug("Proxy wait for RootCoord to be healthy")
if err := componentutil.WaitForComponentHealthy(s.ctx, s.rootCoordClient, "RootCoord", 1000000, time.Millisecond*200); err != nil {
log.Warn("Proxy failed to wait for RootCoord to be healthy", zap.Error(err))
return err
}
log.Debug("Proxy wait for RootCoord to be healthy done")
log.Debug("set RootCoord client for Proxy")
s.proxy.SetRootCoordClient(s.rootCoordClient)
log.Debug("set RootCoord client for Proxy done")
// 创建dataCoordClient
if s.dataCoordClient == nil {
var err error
log.Debug("create DataCoord client for Proxy")
s.dataCoordClient, err = dcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli)
if err != nil {
log.Warn("failed to create DataCoord client for Proxy", zap.Error(err))
return err
}
log.Debug("create DataCoord client for Proxy done")
}
// 等待DataCoord状态正常
log.Debug("Proxy wait for DataCoord to be healthy")
if err := componentutil.WaitForComponentHealthy(s.ctx, s.dataCoordClient, "DataCoord", 1000000, time.Millisecond*200); err != nil {
log.Warn("Proxy failed to wait for DataCoord to be healthy", zap.Error(err))
return err
}
log.Debug("Proxy wait for DataCoord to be healthy done")
log.Debug("set DataCoord client for Proxy")
s.proxy.SetDataCoordClient(s.dataCoordClient)
log.Debug("set DataCoord client for Proxy done")
// 创建queryCoordClient
if s.queryCoordClient == nil {
var err error
log.Debug("create QueryCoord client for Proxy")
s.queryCoordClient, err = qcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli)
if err != nil {
log.Warn("failed to create QueryCoord client for Proxy", zap.Error(err))
return err
}
log.Debug("create QueryCoord client for Proxy done")
}
// 等待QueryCoord状态正常
log.Debug("Proxy wait for QueryCoord to be healthy")
if err := componentutil.WaitForComponentHealthy(s.ctx, s.queryCoordClient, "QueryCoord", 1000000, time.Millisecond*200); err != nil {
log.Warn("Proxy failed to wait for QueryCoord to be healthy", zap.Error(err))
return err
}
log.Debug("Proxy wait for QueryCoord to be healthy done")
log.Debug("set QueryCoord client for Proxy")
s.proxy.SetQueryCoordClient(s.queryCoordClient)
log.Debug("set QueryCoord client for Proxy done")
log.Debug(fmt.Sprintf("update Proxy's state to %s", commonpb.StateCode_Initializing.String()))
s.proxy.UpdateStateCode(commonpb.StateCode_Initializing)
log.Debug("init Proxy")
if err := s.proxy.Init(); err != nil {
log.Warn("failed to init Proxy", zap.Error(err))
return err
}
log.Debug("init Proxy done")
// nolint
// Intentionally print to stdout, which is usually a sign that Milvus is ready to serve.
fmt.Println("---Milvus Proxy successfully initialized and ready to serve!---")
return nil
}
这段可以看出来,获取了etcdCli并赋予给了s.etcdCli。
s.startInternalRPCServer启动grpc端口服务,内部通信使用,端口为19529。
s.startExternalRPCServer()启动grpc端口服务,对外提供服务使用,端口为19530。
最终调用s.proxy.Init()进行初始化,代码位置:internal\proxy\proxy.go
s.proxy接口types.ProxyComponent,ProxyComponent继承于Component。
// Proxy is the interface the `proxy` package implements.
// It combines the generic component lifecycle with the proxy-internal gRPC
// service and the public Milvus gRPC service.
type Proxy interface {
Component
proxypb.ProxyServer
milvuspb.MilvusServiceServer
}
// Component is the interface all services implement — the lifecycle contract
// (initialize, start, stop, register) shared by every Milvus role.
type Component interface {
Init() error
Start() error
Stop() error
Register() error
}
接口套接口:
RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component
各组件最终的Init()初始化代码路径:
internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()
回过头来继续proxy的init。
// Init initialize proxy.
// Fills in the Proxy struct in a fixed order: session, factory, access log,
// rate collector, id/timestamp allocators, segment id assigner, channel
// manager, replicate msgstream, task scheduler, channels time ticker,
// metrics cache manager and the global meta cache. The order matters: the
// allocators need node.rootCoord, the segment assigner needs node.dataCoord,
// and the channels time ticker needs both the scheduler and the tsoAllocator.
func (node *Proxy) Init() error {
log.Info("init session for Proxy")
if err := node.initSession(); err != nil {
log.Warn("failed to init Proxy's session", zap.Error(err))
return err
}
log.Info("init session for Proxy done")
node.factory.Init(Params)
// NOTE(review): "SetupAccseeLog" (sic) matches the upstream function name —
// do not "correct" the spelling locally.
accesslog.SetupAccseeLog(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
log.Debug("init access log for Proxy done")
err := node.initRateCollector()
if err != nil {
return err
}
log.Info("Proxy init rateCollector done", zap.Int64("nodeID", paramtable.GetNodeID()))
// Row-id allocator, backed by RootCoord.
idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, paramtable.GetNodeID())
if err != nil {
log.Warn("failed to create id allocator",
zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
zap.Error(err))
return err
}
node.rowIDAllocator = idAllocator
log.Debug("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
// Timestamp (TSO) allocator, also backed by RootCoord.
tsoAllocator, err := newTimestampAllocator(node.rootCoord, paramtable.GetNodeID())
if err != nil {
log.Warn("failed to create timestamp allocator",
zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
zap.Error(err))
return err
}
node.tsoAllocator = tsoAllocator
log.Debug("create timestamp allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
// Segment id assigner, backed by DataCoord.
segAssigner, err := newSegIDAssigner(node.ctx, node.dataCoord, node.lastTick)
if err != nil {
log.Warn("failed to create segment id assigner",
zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
zap.Error(err))
return err
}
node.segAssigner = segAssigner
node.segAssigner.PeerID = paramtable.GetNodeID()
log.Debug("create segment id assigner done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
// DML channels manager.
dmlChannelsFunc := getDmlChannelsFunc(node.ctx, node.rootCoord)
chMgr := newChannelsMgrImpl(dmlChannelsFunc, defaultInsertRepackFunc, node.factory)
node.chMgr = chMgr
log.Debug("create channels manager done", zap.String("role", typeutil.ProxyRole))
// Replicate message stream (producer only).
replicateMsgChannel := Params.CommonCfg.ReplicateMsgChannel.GetValue()
node.replicateMsgStream, err = node.factory.NewMsgStream(node.ctx)
if err != nil {
log.Warn("failed to create replicate msg stream",
zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
zap.Error(err))
return err
}
node.replicateMsgStream.EnableProduce(true)
node.replicateMsgStream.AsProducer([]string{replicateMsgChannel})
// Task scheduler, driven by the tsoAllocator created above.
node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory)
if err != nil {
log.Warn("failed to create task scheduler", zap.String("role", typeutil.ProxyRole), zap.Error(err))
return err
}
log.Debug("create task scheduler done", zap.String("role", typeutil.ProxyRole))
// Channels time ticker fires at half the configured time-tick interval.
syncTimeTickInterval := Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond) / 2
node.chTicker = newChannelsTimeTicker(node.ctx, Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)/2, []string{}, node.sched.getPChanStatistics, tsoAllocator)
log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval))
node.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole))
// Global meta cache, fed by RootCoord and QueryCoord.
if err := InitMetaCache(node.ctx, node.rootCoord, node.queryCoord, node.shardMgr); err != nil {
log.Warn("failed to init meta cache", zap.String("role", typeutil.ProxyRole), zap.Error(err))
return err
}
log.Debug("init meta cache done", zap.String("role", typeutil.ProxyRole))
return nil
}
从代码可以看出初始化是在填充Proxy结构体。
s.start()
启动组件的逻辑。
// start launches the proxy component, registers it (per the surrounding
// analysis, registration is against etcd), and — when an HTTP listener was
// configured during init — brings up the HTTP server, returning its first
// reported error.
func (s *Server) start() error {
	err := s.proxy.Start()
	if err != nil {
		log.Warn("failed to start Proxy server", zap.Error(err))
		return err
	}
	err = s.proxy.Register()
	if err != nil {
		log.Warn("failed to register Proxy", zap.Error(err))
		return err
	}
	if s.httpListener == nil {
		return nil
	}
	log.Info("start Proxy http server")
	errChan := make(chan error, 1)
	s.wg.Add(1)
	go s.startHTTPServer(errChan)
	if httpErr := <-errChan; httpErr != nil {
		log.Error("failed to create http rpc server", zap.Error(httpErr))
		return httpErr
	}
	return nil
}
在这里先Start(),然后Register(),和rootcoord相反,原因待挖掘。
s.proxy是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。
Register():向元数据etcd注册。
Start():用来启动组件。
// Start starts a proxy node.
// It brings up the task scheduler, id allocator, segment id assigner and
// channels time ticker in that order, kicks off the time-tick loop, fires
// the registered start callbacks, and finally marks the node Healthy.
func (node *Proxy) Start() error {
	subsystems := []struct {
		name string
		run  func() error
	}{
		{"task scheduler", node.sched.Start},
		{"id allocator", node.rowIDAllocator.Start},
		{"segment id assigner", node.segAssigner.Start},
		{"channels time ticker", node.chTicker.start},
	}
	for _, sub := range subsystems {
		if err := sub.run(); err != nil {
			log.Warn("failed to start "+sub.name, zap.String("role", typeutil.ProxyRole), zap.Error(err))
			return err
		}
		log.Debug("start "+sub.name+" done", zap.String("role", typeutil.ProxyRole))
	}
	node.sendChannelsTimeTickLoop()
	// Fire any registered start callbacks.
	for _, cb := range node.startCallbacks {
		cb()
	}
	log.Debug("update state code", zap.String("role", typeutil.ProxyRole), zap.String("State", commonpb.StateCode_Healthy.String()))
	node.UpdateStateCode(commonpb.StateCode_Healthy)
	return nil
}
启动了如下部分:
node.sched.Start()
node.rowIDAllocator.Start()
node.segAssigner.Start()
node.chTicker.start()