Milvus Proxy startup source code analysis

Published: 2024-04-24

This post walks through the startup path of the Proxy component in the Milvus source code.

The Server struct

// Server is the Proxy Server
type Server struct {
	ctx                context.Context
	wg                 sync.WaitGroup
	// proxy is an interface type (types.ProxyComponent)
	proxy              types.ProxyComponent
	httpListener       net.Listener
	grpcListener       net.Listener
	tcpServer          cmux.CMux
	httpServer         *http.Server
	grpcInternalServer *grpc.Server
	grpcExternalServer *grpc.Server

	serverID atomic.Int64

	etcdCli          *clientv3.Client
	rootCoordClient  types.RootCoordClient
	dataCoordClient  types.DataCoordClient
	queryCoordClient types.QueryCoordClient
}

We will trace when the rootCoordClient, dataCoordClient, and queryCoordClient fields get their values.

proxy is an interface type; its implementation provides the proxy API functionality.

func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
	wg.Add(1)
	return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy)
}

// creator here is components.NewProxy
role, err = creator(ctx, factory)

components.NewProxy is a function; it is the creator passed into runComponent above.

NewProxy() creates the Proxy struct.

// NewProxy creates a new Proxy
func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
	var err error
	n := &Proxy{}

	svr, err := grpcproxy.NewServer(ctx, factory)
	if err != nil {
		return nil, err
	}
	n.svr = svr
	return n, nil
}
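
Here Proxy is a thin wrapper type defined under cmd\components. Judging from the assignment n.svr = svr, it does little more than hold the gRPC server; a minimal sketch of its shape (only the svr field is confirmed by the code above, anything more is an assumption):

type Proxy struct {
	svr *grpcproxy.Server
}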

grpcproxy.NewServer() returns the Server struct shown at the top of this post.

// NewServer create a Proxy server.
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {
	var err error
	server := &Server{
		ctx: ctx,
	}

	server.proxy, err = proxy.NewProxy(server.ctx, factory)
	if err != nil {
		return nil, err
	}
	return server, err
}

proxy.NewProxy() produces a struct that implements the types.ProxyComponent interface.

The code is located at internal\proxy\proxy.go.

Executing Run()

After the Server struct is created, its Run() method is called.

func runComponent[T component](ctx context.Context,
	localMsg bool,
	runWg *sync.WaitGroup,
	creator func(context.Context, dependency.Factory) (T, error),
	metricRegister func(*prometheus.Registry),
) component {
	var role T

	sign := make(chan struct{})
	go func() {
		factory := dependency.NewFactory(localMsg)
		var err error
		role, err = creator(ctx, factory)
		if localMsg {
			paramtable.SetRole(typeutil.StandaloneRole)
		} else {
			paramtable.SetRole(role.GetName())
		}
		if err != nil {
			panic(err)
		}
		close(sign)
		// Here the corresponding component's Run() method is invoked; in this case role is the Proxy struct
		if err := role.Run(); err != nil {
			panic(err)
		}
		runWg.Done()
	}()
    ......
}

runComponent is a generic wrapper function: it constructs the component in a goroutine, closes the sign channel once creation succeeds, and then calls the component's Run() method.
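
The component type constraint itself is easy to infer from the calls the wrapper makes on role: GetName() and Run() appear above, and a Stop() is presumably needed for shutdown. A plausible sketch (Stop() is an assumption):

type component interface {
	GetName() string
	Run() error
	Stop() error
}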

// Run starts service
func (n *Proxy) Run() error {
	if err := n.svr.Run(); err != nil {
		log.Error("Proxy starts error", zap.Error(err))
		return err
	}
	log.Info("Proxy successfully started")
	return nil
}

The Run() method calls n.svr.Run(). svr is the Server struct returned by grpcproxy.NewServer().

// Start start the Proxy Server
func (s *Server) Run() error {
	log.Debug("init Proxy server")
	if err := s.init(); err != nil {
		log.Warn("init Proxy server failed", zap.Error(err))
		return err
	}
	log.Debug("init Proxy server done")

	log.Debug("start Proxy server")
	if err := s.start(); err != nil {
		log.Warn("start Proxy server failed", zap.Error(err))
		return err
	}
	log.Debug("start Proxy server done")

	if s.tcpServer != nil {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			if err := s.tcpServer.Serve(); err != nil && err != cmux.ErrServerClosed {
				log.Warn("Proxy server for tcp port failed", zap.Error(err))
				return
			}
			log.Info("Proxy tcp server exited")
		}()
	}
	return nil
}
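
Note the tcpServer field: it is a cmux.CMux, which lets the gRPC and HTTP servers share one TCP port by sniffing the first bytes of each connection. The snippet below is a standalone sketch of the cmux pattern, not Milvus's actual wiring (port and handlers are illustrative):

package main

import (
	"net"
	"net/http"

	"github.com/soheilhy/cmux"
	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":19530")
	if err != nil {
		panic(err)
	}
	m := cmux.New(lis)
	// gRPC arrives as HTTP/2 with content-type application/grpc; match it first.
	grpcL := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
	// Everything else falls through to the HTTP server.
	httpL := m.Match(cmux.Any())

	grpcSrv := grpc.NewServer()
	httpSrv := &http.Server{Handler: http.DefaultServeMux}

	go grpcSrv.Serve(grpcL)
	go httpSrv.Serve(httpL)

	// Serve blocks, handing each accepted connection to the matching listener.
	if err := m.Serve(); err != nil && err != cmux.ErrServerClosed {
		panic(err)
	}
}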

Next, we analyze the s.init() and s.start() methods.

s.init()

func (s *Server) init() error {
	etcdConfig := &paramtable.Get().EtcdCfg
	Params := &paramtable.Get().ProxyGrpcServerCfg
	log.Debug("Proxy init service's parameter table done")
	HTTPParams := &paramtable.Get().HTTPCfg
	log.Debug("Proxy init http server's parameter table done")

	if !funcutil.CheckPortAvailable(Params.Port.GetAsInt()) {
		......
	}

	log.Debug("init Proxy's parameter table done", zap.String("internal address", Params.GetInternalAddress()), zap.String("external address", Params.GetAddress()))

	serviceName := fmt.Sprintf("Proxy ip: %s, port: %d", Params.IP, Params.Port.GetAsInt())
	log.Debug("init Proxy's tracer done", zap.String("service name", serviceName))

	etcdCli, err := etcd.GetEtcdClient(
		etcdConfig.UseEmbedEtcd.GetAsBool(),
		etcdConfig.EtcdUseSSL.GetAsBool(),
		etcdConfig.Endpoints.GetAsStrings(),
		etcdConfig.EtcdTLSCert.GetValue(),
		etcdConfig.EtcdTLSKey.GetValue(),
		etcdConfig.EtcdTLSCACert.GetValue(),
		etcdConfig.EtcdTLSMinVersion.GetValue())
	if err != nil {
		log.Debug("Proxy connect to etcd failed", zap.Error(err))
		return err
	}
	s.etcdCli = etcdCli
	s.proxy.SetEtcdClient(s.etcdCli)
	s.proxy.SetAddress(Params.GetInternalAddress())

	errChan := make(chan error, 1)
	{
		// Start the internal gRPC server; default port 19529, used for internal communication
		s.startInternalRPCServer(Params.InternalPort.GetAsInt(), errChan)
		if err := <-errChan; err != nil {
			......
		}
	}
	{
		log.Info("Proxy server listen on tcp", zap.Int("port", Params.Port.GetAsInt()))
		var lis net.Listener
		var listenErr error

		log.Info("Proxy server already listen on tcp", zap.Int("port", Params.Port.GetAsInt()))
		lis, listenErr = net.Listen("tcp", ":"+strconv.Itoa(Params.Port.GetAsInt()))
		if listenErr != nil {
			......
		}

		if HTTPParams.Enabled.GetAsBool() && Params.TLSMode.GetAsInt() == 0 &&
			(HTTPParams.Port.GetValue() == "" || HTTPParams.Port.GetAsInt() == Params.Port.GetAsInt()) {
			......
		} else {
			s.grpcListener = lis
		}

		if HTTPParams.Enabled.GetAsBool() && HTTPParams.Port.GetValue() != "" && HTTPParams.Port.GetAsInt() != Params.Port.GetAsInt() {
			if Params.TLSMode.GetAsInt() == 0 {
				......
				}
			} else if Params.TLSMode.GetAsInt() == 1 {
				......
			} else if Params.TLSMode.GetAsInt() == 2 {
				......
			}
		}
	}
	{
		// Start the external gRPC server; default port 19530, serving external clients
		s.startExternalRPCServer(Params.Port.GetAsInt(), errChan)
		if err := <-errChan; err != nil {
			......
		}
	}

	if HTTPParams.Enabled.GetAsBool() {
		registerHTTPHandlerOnce.Do(func() {
			log.Info("register Proxy http server")
			s.registerHTTPServer()
		})
	}
	// Create the rootCoordClient
	if s.rootCoordClient == nil {
		var err error
		log.Debug("create RootCoord client for Proxy")
		s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli)
		if err != nil {
			log.Warn("failed to create RootCoord client for Proxy", zap.Error(err))
			return err
		}
		log.Debug("create RootCoord client for Proxy done")
	}
	// Wait for RootCoord to become healthy
	log.Debug("Proxy wait for RootCoord to be healthy")
	if err := componentutil.WaitForComponentHealthy(s.ctx, s.rootCoordClient, "RootCoord", 1000000, time.Millisecond*200); err != nil {
		log.Warn("Proxy failed to wait for RootCoord to be healthy", zap.Error(err))
		return err
	}
	log.Debug("Proxy wait for RootCoord to be healthy done")

	log.Debug("set RootCoord client for Proxy")
	s.proxy.SetRootCoordClient(s.rootCoordClient)
	log.Debug("set RootCoord client for Proxy done")
	// Create the dataCoordClient
	if s.dataCoordClient == nil {
		var err error
		log.Debug("create DataCoord client for Proxy")
		s.dataCoordClient, err = dcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli)
		if err != nil {
			log.Warn("failed to create DataCoord client for Proxy", zap.Error(err))
			return err
		}
		log.Debug("create DataCoord client for Proxy done")
	}
	// Wait for DataCoord to become healthy
	log.Debug("Proxy wait for DataCoord to be healthy")
	if err := componentutil.WaitForComponentHealthy(s.ctx, s.dataCoordClient, "DataCoord", 1000000, time.Millisecond*200); err != nil {
		log.Warn("Proxy failed to wait for DataCoord to be healthy", zap.Error(err))
		return err
	}
	log.Debug("Proxy wait for DataCoord to be healthy done")

	log.Debug("set DataCoord client for Proxy")
	s.proxy.SetDataCoordClient(s.dataCoordClient)
	log.Debug("set DataCoord client for Proxy done")
	// Create the queryCoordClient
	if s.queryCoordClient == nil {
		var err error
		log.Debug("create QueryCoord client for Proxy")
		s.queryCoordClient, err = qcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli)
		if err != nil {
			log.Warn("failed to create QueryCoord client for Proxy", zap.Error(err))
			return err
		}
		log.Debug("create QueryCoord client for Proxy done")
	}
	// Wait for QueryCoord to become healthy
	log.Debug("Proxy wait for QueryCoord to be healthy")
	if err := componentutil.WaitForComponentHealthy(s.ctx, s.queryCoordClient, "QueryCoord", 1000000, time.Millisecond*200); err != nil {
		log.Warn("Proxy failed to wait for QueryCoord to be healthy", zap.Error(err))
		return err
	}
	log.Debug("Proxy wait for QueryCoord to be healthy done")

	log.Debug("set QueryCoord client for Proxy")
	s.proxy.SetQueryCoordClient(s.queryCoordClient)
	log.Debug("set QueryCoord client for Proxy done")

	log.Debug(fmt.Sprintf("update Proxy's state to %s", commonpb.StateCode_Initializing.String()))
	s.proxy.UpdateStateCode(commonpb.StateCode_Initializing)

	log.Debug("init Proxy")
	if err := s.proxy.Init(); err != nil {
		log.Warn("failed to init Proxy", zap.Error(err))
		return err
	}
	log.Debug("init Proxy done")
	// nolint
	// Intentionally print to stdout, which is usually a sign that Milvus is ready to serve.
	fmt.Println("---Milvus Proxy successfully initialized and ready to serve!---")

	return nil
}
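
One pattern worth calling out in init(): each coordinator client is created first and then polled until the component reports healthy, with up to 1000000 attempts at 200ms intervals. A simplified re-implementation of that polling loop (the StateClient interface below is a hypothetical stand-in for the real coordinator clients):

import (
	"context"
	"errors"
	"time"
)

// StateClient is a hypothetical stand-in for the coordinator clients
// whose health init() waits on.
type StateClient interface {
	IsHealthy(ctx context.Context) (bool, error)
}

// waitForHealthy polls until the component is healthy, mirroring the
// attempts/interval parameters of componentutil.WaitForComponentHealthy.
func waitForHealthy(ctx context.Context, c StateClient, attempts int, interval time.Duration) error {
	for i := 0; i < attempts; i++ {
		if healthy, err := c.IsHealthy(ctx); err == nil && healthy {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
	return errors.New("component not healthy after max attempts")
}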

From init() we can also see that an etcd client is obtained and assigned to s.etcdCli.
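
etcd.GetEtcdClient is a Milvus helper over the official clientv3 package; stripped of the TLS and embedded-etcd branches, it boils down to something like this (a minimal sketch, endpoints and timeout are illustrative):

import (
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func newEtcdClient(endpoints []string) (*clientv3.Client, error) {
	return clientv3.New(clientv3.Config{
		Endpoints:   endpoints, // e.g. []string{"localhost:2379"}
		DialTimeout: 5 * time.Second,
	})
}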

s.startInternalRPCServer() starts a gRPC server on port 19529, used for internal communication.

s.startExternalRPCServer() starts a gRPC server on port 19530, which serves external clients.
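
Both helpers follow the usual gRPC bootstrap: listen on the port, register the service implementations, signal readiness, and serve. A generic sketch of that pattern (service registration is elided; the function name and errChan protocol here are inferred from how init() consumes <-errChan, not copied from Milvus):

import (
	"fmt"
	"log"
	"net"

	"google.golang.org/grpc"
)

// startRPCServer is meant to run in a goroutine; init() blocks on <-errChan
// until the listener is up.
func startRPCServer(port int, errChan chan error) {
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		errChan <- err
		return
	}
	s := grpc.NewServer()
	// ... register the proxy's gRPC services on s here ...
	errChan <- nil // report successful startup
	if err := s.Serve(lis); err != nil {
		log.Println("grpc server exited:", err)
	}
}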

Finally, s.proxy.Init() is called to perform initialization; the code is at internal\proxy\proxy.go.

s.proxy has the interface type types.ProxyComponent; ProxyComponent embeds the Proxy interface, which in turn embeds Component.

// Proxy is the interface `proxy` package implements
type Proxy interface {
	Component
	proxypb.ProxyServer
	milvuspb.MilvusServiceServer
}

// Component is the interface all services implement
type Component interface {
	Init() error
	Start() error
	Stop() error
	Register() error
}

Interfaces are nested within interfaces:

RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component

The paths to each component's final Init() implementation:

internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()

Now back to the Proxy's Init().

// Init initialize proxy.
func (node *Proxy) Init() error {
	log.Info("init session for Proxy")
	if err := node.initSession(); err != nil {
		log.Warn("failed to init Proxy's session", zap.Error(err))
		return err
	}
	log.Info("init session for Proxy done")

	node.factory.Init(Params)

	accesslog.SetupAccseeLog(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
	log.Debug("init access log for Proxy done")

	err := node.initRateCollector()
	if err != nil {
		return err
	}
	log.Info("Proxy init rateCollector done", zap.Int64("nodeID", paramtable.GetNodeID()))

	idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, paramtable.GetNodeID())
	if err != nil {
		log.Warn("failed to create id allocator",
			zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
			zap.Error(err))
		return err
	}
	node.rowIDAllocator = idAllocator
	log.Debug("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))

	tsoAllocator, err := newTimestampAllocator(node.rootCoord, paramtable.GetNodeID())
	if err != nil {
		log.Warn("failed to create timestamp allocator",
			zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
			zap.Error(err))
		return err
	}
	node.tsoAllocator = tsoAllocator
	log.Debug("create timestamp allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))

	segAssigner, err := newSegIDAssigner(node.ctx, node.dataCoord, node.lastTick)
	if err != nil {
		log.Warn("failed to create segment id assigner",
			zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
			zap.Error(err))
		return err
	}
	node.segAssigner = segAssigner
	node.segAssigner.PeerID = paramtable.GetNodeID()
	log.Debug("create segment id assigner done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))

	dmlChannelsFunc := getDmlChannelsFunc(node.ctx, node.rootCoord)
	chMgr := newChannelsMgrImpl(dmlChannelsFunc, defaultInsertRepackFunc, node.factory)
	node.chMgr = chMgr
	log.Debug("create channels manager done", zap.String("role", typeutil.ProxyRole))

	replicateMsgChannel := Params.CommonCfg.ReplicateMsgChannel.GetValue()
	node.replicateMsgStream, err = node.factory.NewMsgStream(node.ctx)
	if err != nil {
		log.Warn("failed to create replicate msg stream",
			zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
			zap.Error(err))
		return err
	}
	node.replicateMsgStream.EnableProduce(true)
	node.replicateMsgStream.AsProducer([]string{replicateMsgChannel})

	node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory)
	if err != nil {
		log.Warn("failed to create task scheduler", zap.String("role", typeutil.ProxyRole), zap.Error(err))
		return err
	}
	log.Debug("create task scheduler done", zap.String("role", typeutil.ProxyRole))

	syncTimeTickInterval := Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond) / 2
	node.chTicker = newChannelsTimeTicker(node.ctx, Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)/2, []string{}, node.sched.getPChanStatistics, tsoAllocator)
	log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval))

	node.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
	log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole))

	if err := InitMetaCache(node.ctx, node.rootCoord, node.queryCoord, node.shardMgr); err != nil {
		log.Warn("failed to init meta cache", zap.String("role", typeutil.ProxyRole), zap.Error(err))
		return err
	}
	log.Debug("init meta cache done", zap.String("role", typeutil.ProxyRole))

	return nil
}

As the code shows, initialization is mostly about populating the Proxy struct's fields: the session, allocators, segment ID assigner, channels manager, task scheduler, time ticker, metrics cache manager, and meta cache.
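
For instance, the timestamp allocator created here ultimately delegates to a RootCoord RPC that hands out hybrid timestamps; every DML/DQL task later asks it for a timestamp before being scheduled. A schematic of that role (the tsoService interface and method names below are assumptions for illustration):

import "context"

// tsoService is a hypothetical abstraction of the RootCoord RPC that
// allocates monotonically increasing hybrid timestamps.
type tsoService interface {
	AllocTimestamp(ctx context.Context, count uint32) (start uint64, err error)
}

// timestampAllocator mirrors the role of node.tsoAllocator.
type timestampAllocator struct {
	peerID int64
	tso    tsoService
}

func (ta *timestampAllocator) AllocOne(ctx context.Context) (uint64, error) {
	return ta.tso.AllocTimestamp(ctx, 1)
}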

s.start()

This is the logic that starts the component.

func (s *Server) start() error {
	if err := s.proxy.Start(); err != nil {
		log.Warn("failed to start Proxy server", zap.Error(err))
		return err
	}

	if err := s.proxy.Register(); err != nil {
		log.Warn("failed to register Proxy", zap.Error(err))
		return err
	}

	if s.httpListener != nil {
		log.Info("start Proxy http server")
		errChan := make(chan error, 1)
		s.wg.Add(1)
		go s.startHTTPServer(errChan)
		if err := <-errChan; err != nil {
			log.Error("failed to create http rpc server", zap.Error(err))
			return err
		}
	}

	return nil
}

Note that Start() is called first, then Register(), which is the opposite order from rootcoord; the reason is left to be explored.

s.proxy satisfies the Component interface, which defines the methods Init(), Start(), Stop(), and Register().

Register(): registers the node with etcd, the metadata store.
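
In Milvus the session registration is built on an etcd lease plus keepalive: the session key lives only as long as the node keeps renewing the lease, so a dead proxy disappears from etcd automatically. A bare-bones sketch of that mechanism with clientv3 (key and TTL are illustrative):

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func registerSession(ctx context.Context, cli *clientv3.Client, key, payload string) error {
	// Grant a lease; if the node stops renewing it, the key expires.
	lease, err := cli.Grant(ctx, 10) // 10s TTL, illustrative
	if err != nil {
		return err
	}
	if _, err := cli.Put(ctx, key, payload, clientv3.WithLease(lease.ID)); err != nil {
		return err
	}
	// KeepAlive renews the lease in the background for the session's lifetime.
	ch, err := cli.KeepAlive(ctx, lease.ID)
	if err != nil {
		return err
	}
	go func() {
		for range ch { // drain keepalive responses
		}
	}()
	return nil
}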

Start(): starts the component.

// Start starts a proxy node.
func (node *Proxy) Start() error {
	if err := node.sched.Start(); err != nil {
		log.Warn("failed to start task scheduler", zap.String("role", typeutil.ProxyRole), zap.Error(err))
		return err
	}
	log.Debug("start task scheduler done", zap.String("role", typeutil.ProxyRole))

	if err := node.rowIDAllocator.Start(); err != nil {
		log.Warn("failed to start id allocator", zap.String("role", typeutil.ProxyRole), zap.Error(err))
		return err
	}
	log.Debug("start id allocator done", zap.String("role", typeutil.ProxyRole))

	if err := node.segAssigner.Start(); err != nil {
		log.Warn("failed to start segment id assigner", zap.String("role", typeutil.ProxyRole), zap.Error(err))
		return err
	}
	log.Debug("start segment id assigner done", zap.String("role", typeutil.ProxyRole))

	if err := node.chTicker.start(); err != nil {
		log.Warn("failed to start channels time ticker", zap.String("role", typeutil.ProxyRole), zap.Error(err))
		return err
	}
	log.Debug("start channels time ticker done", zap.String("role", typeutil.ProxyRole))

	node.sendChannelsTimeTickLoop()

	// Start callbacks
	for _, cb := range node.startCallbacks {
		cb()
	}

	log.Debug("update state code", zap.String("role", typeutil.ProxyRole), zap.String("State", commonpb.StateCode_Healthy.String()))
	node.UpdateStateCode(commonpb.StateCode_Healthy)

	return nil
}

The following parts are started:

node.sched.Start()
node.rowIDAllocator.Start()
node.segAssigner.Start()
node.chTicker.start()