indexnode启动源码分析
结构体
// cmd\components\index_node.go
// IndexNode implements IndexNode grpc server
type IndexNode struct {
svr *grpcindexnode.Server
}
// internal\distributed\indexnode\service.go
// Server is the grpc wrapper of IndexNode.
type Server struct {
indexnode types.IndexNodeComponent
grpcServer *grpc.Server
grpcErrChan chan error
serverID atomic.Int64
loopCtx context.Context
loopCancel func()
loopWg sync.WaitGroup
etcdCli *clientv3.Client
}
indexnode是一个接口,实现indexnode api功能。
func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
wg.Add(1)
rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
indexDataLocalPath := filepath.Join(rootPath, typeutil.IndexNodeRole)
cleanLocalDir(indexDataLocalPath)
return runComponent(ctx, localMsg, wg, components.NewIndexNode, metrics.RegisterIndexNode)
}
// creator用NewIndexNode替换
role, err = creator(ctx, factory)
components.NewIndexNode是一个函数。
NewIndexNode()用来创建IndexNode结构体。
// NewIndexNode creates a new IndexNode
func NewIndexNode(ctx context.Context, factory dependency.Factory) (*IndexNode, error) {
var err error
n := &IndexNode{}
svr, err := grpcindexnode.NewServer(ctx, factory)
if err != nil {
return nil, err
}
n.svr = svr
return n, nil
}
grpcindexnode.NewServer()产生的是本结构体Server。
// NewServer create a new IndexNode grpc server.
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
node := indexnode.NewIndexNode(ctx1, factory)
return &Server{
loopCtx: ctx1,
loopCancel: cancel,
indexnode: node,
grpcErrChan: make(chan error),
}, nil
}
indexnode.NewIndexNode()返回一个结构体,是 types.IndexNodeComponent接口的一个实现
执行Run()
Server结构体创建后,调用结构体的Run()方法。
func runComponent[T component](ctx context.Context,
localMsg bool,
runWg *sync.WaitGroup,
creator func(context.Context, dependency.Factory) (T, error),
metricRegister func(*prometheus.Registry),
) component {
var role T
sign := make(chan struct{})
go func() {
factory := dependency.NewFactory(localMsg)
var err error
role, err = creator(ctx, factory)
if localMsg {
paramtable.SetRole(typeutil.StandaloneRole)
} else {
paramtable.SetRole(role.GetName())
}
if err != nil {
panic(err)
}
close(sign)
// 在这里调用对应组件结构体的Run()方法,这里是IndexNode结构体
if err := role.Run(); err != nil {
panic(err)
}
runWg.Done()
}()
......
}
runComponent是一个包裹函数。
// Run starts service
func (n *IndexNode) Run() error {
if err := n.svr.Run(); err != nil {
log.Error("IndexNode starts error", zap.Error(err))
return err
}
log.Debug("IndexNode successfully started")
return nil
}
Run()方法调用n.svr.Run()方法。srv是grpcindexnode.NewServer()返回的结构体。
进入Run()方法:
// Run initializes and starts IndexNode's grpc service.
func (s *Server) Run() error {
if err := s.init(); err != nil {
return err
}
log.Debug("IndexNode init done ...")
if err := s.start(); err != nil {
return err
}
log.Debug("IndexNode start done ...")
return nil
}
接下来分析s.init()和s.start()方法。
s.init()
// init initializes IndexNode's grpc service.
func (s *Server) init() error {
etcdConfig := ¶mtable.Get().EtcdCfg
Params := ¶mtable.Get().IndexNodeGrpcServerCfg
var err error
if !funcutil.CheckPortAvailable(Params.Port.GetAsInt()) {
paramtable.Get().Save(Params.Port.Key, fmt.Sprintf("%d", funcutil.GetAvailablePort()))
log.Warn("IndexNode get available port when init", zap.Int("Port", Params.Port.GetAsInt()))
}
defer func() {
if err != nil {
err = s.Stop()
if err != nil {
log.Error("IndexNode Init failed, and Stop failed")
}
}
}()
s.loopWg.Add(1)
// 启动grpc服务,默认端口21121
go s.startGrpcLoop(Params.Port.GetAsInt())
// wait for grpc server loop start
err = <-s.grpcErrChan
if err != nil {
log.Error("IndexNode", zap.Any("grpc error", err))
return err
}
etcdCli, err := etcd.GetEtcdClient(
etcdConfig.UseEmbedEtcd.GetAsBool(),
etcdConfig.EtcdUseSSL.GetAsBool(),
etcdConfig.Endpoints.GetAsStrings(),
etcdConfig.EtcdTLSCert.GetValue(),
etcdConfig.EtcdTLSKey.GetValue(),
etcdConfig.EtcdTLSCACert.GetValue(),
etcdConfig.EtcdTLSMinVersion.GetValue())
if err != nil {
log.Debug("IndexNode connect to etcd failed", zap.Error(err))
return err
}
s.etcdCli = etcdCli
s.indexnode.SetEtcdClient(etcdCli)
s.indexnode.SetAddress(Params.GetAddress())
// 真正执行初始化
err = s.indexnode.Init()
if err != nil {
log.Error("IndexNode Init failed", zap.Error(err))
return err
}
return nil
}
这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。
s.startGrpcLoop()启动grpc端口服务。
最终调用s.indexnode.Init()进行初始化,代码位置:internal\indexnode\indexnode.go
s.indexnode是接口类型types.IndexNodeComponent,IndexNodeComponent继承于Component。
type IndexNodeComponent interface {
IndexNode
SetAddress(address string)
GetAddress() string
SetEtcdClient(etcdClient *clientv3.Client)
UpdateStateCode(stateCode commonpb.StateCode)
}
// IndexNode is the interface `indexnode` package implements
type IndexNode interface {
Component
indexpb.IndexNodeServer
}
// Component is the interface all services implement
type Component interface {
Init() error
Start() error
Stop() error
Register() error
}
接口套接口:
RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component
各组件最终的Init()初始化代码路径:
internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()
回过头来继续querynode的init。
// Init initializes the IndexNode component.
func (i *IndexNode) Init() error {
var initErr error
i.initOnce.Do(func() {
i.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("IndexNode init", zap.String("state", i.lifetime.GetState().String()))
err := i.initSession()
if err != nil {
log.Error("failed to init session", zap.Error(err))
initErr = err
return
}
log.Info("IndexNode init session successful", zap.Int64("serverID", i.session.ServerID))
i.initSegcore()
})
log.Info("Init IndexNode finished", zap.Error(initErr))
return initErr
}
从代码可以看出初始化是在填充IndexNode结构体。
s.start()
启动组件的逻辑。
// start starts IndexNode's grpc service.
func (s *Server) start() error {
err := s.indexnode.Start()
if err != nil {
return err
}
err = s.indexnode.Register()
if err != nil {
log.Error("IndexNode Register etcd failed", zap.Error(err))
return err
}
log.Debug("IndexNode Register etcd success")
return nil
}
s.indexnode是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。
Register():向元数据etcd注册。
Start():用来启动组件。
// Start starts the IndexNode component.
func (i *IndexNode) Start() error {
var startErr error
i.once.Do(func() {
startErr = i.sched.Start()
i.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("IndexNode", zap.Any("State", i.lifetime.GetState().String()))
})
log.Info("IndexNode start finished", zap.Error(startErr))
return startErr
}
node节点都没有standby,coord节点有standby。