【obdiag共建之路】—— OceanBase 敏捷诊断工具“一键集群洞察“功能共建

发布于:2024-11-29 ⋅ 阅读:(23) ⋅ 点赞:(0)

作者:某上市公司DBA,obdiag SIG Committer

前段时间一年一度的双11终于落下了帷幕,终于有时间坐下来好好整理之前在obdiag diaplay共建过程中的点点滴滴,从8月初刚接到这个任务时候对obdiag内部架构的一无所知,再到后面慢慢熟悉和构建功能,再到后来10月功能开发完成顺利发版的喜悦。也是小小的满足了一下自己的成就感。在此,也是衷心的希望各位运维大牛和开发大佬可以加入obdiag sig,一起共建一款简单、高效和易用的OceanBase敏捷诊断工具。

obdiag display是obdiag的第五个一级功能,这个功能构建的初衷是将常用的一些命令和SQL集成到obdiag中,快速响应并展示结果,无需任何交互,不需要积累运维SQL资产,让小白用户也能像老鸟一样在OceanBase的海洋中遨游。另外obdiag display框架中支持自定义场景的集成,只需要编辑好我们yaml文件,就可以自助的添加我们的展示场景,无需修改任何源码,非常方便。

obdiag的功能介绍

目前obdiag display已经在obdiag2.5.0中集成,大家可以下载安装体验,obdiag display一期集成的功能如下:

功能 描述 执行步骤
display场景展示 展示所有的display场景 obdiag display scene list
集群信息展示 展示集群信息 obdiag display scene run --scene=observer.cluster_info
event信息展示 展示event分类信息 obdiag display scene run --scene=observer.event --env db_connect='-h127.0.0.1 -P2881 -utest@test -p****** -Dtest'
锁等待信息展示 展示锁阻塞信息 obdiag display scene run --scene=observer.lockholder --env db_connect='-h127.0.0.1 -P2881 -utest@test -p****** -Dtest'
SQL的执行计划展示 展示SQL的执行计划 obdiag display scene run --scene=observer.plan --env db_connect='-h127.0.0.1 -P2881 -utest@test -p****** -Dtest' --env sqlid=test
租户的processlist信息展示 展示processlist信息 obdiag display scene run --scene=observer.processlist --env db_connect='-h127.0.0.1 -P2881 -utest@test -p****** -Dtest'
rootservice信息展示 展示rootservice的信息 obdiag display scene run --scene=observer.rs
server信息展示 展示server的信息 obdiag display scene run --scene=observer.server_info
慢sql信息展示 展示慢sql的信息 obdiag display scene run --scene=observer.slowsql --env db_connect='-h127.0.0.1 -P2881 -utest@test -p****** -Dtest' --env mtime=10
表信息展示 展示表的信息 obdiag display scene run --scene=observer.table_info --env db_connect='-h127.0.0.1 -P2881 -utest@test -p****** -Dtest' --env database_name=test --env table_name=test
租户信息展示 展示租户的信息 obdiag display scene run --scene=observer.tenant_info --env tenant_name=test
topsql信息展示 展示topsql的信息 obdiag display scene run --scene=observer.topsql --env db_connect='-h127.0.0.1 -P2881 -utest@test -p****** -Dtest' --env mtime=10
unit信息展示 展示unit的信息 obdiag display scene run --scene=observer.unit_info
zone的信息展示 展示zone的信息 obdiag display scene run --scene=observer.zone_info

我们可以执行obdiag display scene list来查看obdiag display支持的所有场景。

1732505666

Tips:obdiag是支持tab补全的,对命令还不熟悉的同学可以输入obdiag命令然后一直按tab就可以选择自己想要的功能了。

obdiag display的场景共建

由于obdiag框架代码中已经实现了对场景配置文件的读取和识别功能,因此我们可以直接在~/.obdiag/display/tasks/observer/中通过新建yaml配置文件来新增展示场景

如processlist场景的配置如下

名称:processlist.yaml

info_en: "[processlist]"
info_cn: "[查看processlist]"
command: obdiag display scene run --scene=observer.processlist --env tenant_name=test
task:
  - version: "[4.0.0.0, *]"
    steps:
      - type: sql
        sql: "select * from oceanbase.gv$ob_processlist where tenant='{tenant_name}';"
        global: true
  - version: "[3.0.0.0, 4.0.0.0]"
    steps:
      - type: sql
        sql: "select * from oceanbase.__all_virtual_processlist where tenant='{tenant_name}';"
        global: true

info_en:场景的英文名称

info_cn:   场景的中文名称

command:运行场景的命令示例,此处--scene=observer后的内容需要与配置名称中的xxx.yaml保持一致,否则可能会有问题

task:任务操作

version:场景支持的版本,不同的版本步骤可能不同

steps:任务步骤

type:步骤操作的类型,此处为sql代表该步骤执行的是sql操作

sql:具体的sql内容,在obdiag diaplay中,如果需要在sql引入变量,并且在执行时进行变量传递,可以使用{变量名}的占位符,并且在执行时使用--env 变量名=变量值进行变量传递,目前有计划后续统改成#{变量名}的占位符,清大家留意后续的release列表

global:是否为全局步骤,如为true,则集群只执行一次,其余节点跳过,否则,所有节点都会执行

具体的场景共建内容,可以移步官方文档: https://www.oceanbase.com/docs/common-obdiag-cn-1000000001491192,其中obdiag check/gather/display scene场景新增逻辑是相同的。

在我们正确编辑好yaml文件之后,再运行obdiag display scene list就可以看到我们的新增场景了,无需修改任何代码。

obdiag 的代码结构解析

在我们对obdiag使用和场景等结构熟悉之后,有创意的小伙伴就可以开始着手修改源码并为obdiag贡献功能了,以obdiag display为例,obdiag的代码结构大致如下:

├── common/ ⼀些通⽤的功能抽取到common⽬录下,便于其他模块调⽤
├── conf/ obdiag的内置的配置⽂件,编译的rpm包安装的时候放在了/usr/local/oceanbase
-diagnostic-tool/conf
│ └── inner_config.yml
├── dependencies/ 依赖的包,⽐如obstack的⼆进制⽂件等存放的⽬录
├── docs/ ⽂档
├── example/ ⽤户侧配置样例,安装后会放在~/.obdiag/example/
├── handler
│ ├── display/ 信息展示的代码
│ │ ├── scenes
│ │ | ├── base.py 信息展示模块的代码⼊⼝,调⽤从这出发
│ │ | ├── list.py 信息展示场景列表展示的代码
│ │ | └── register.py 信息展示注册代码
│ │ ├── step/ 信息展示具体的逻辑处理
│ │ │ ├── base.py 
│ │ │ ├── sql.py
│ │ │ └── ssh.py
│ │ ├── tasks/ 实际的信息展示的yaml,开发者可以在这个⽬录下添加展示的场景
│ │ │ ├── obproxy
│ │ │ ├── observer
│ │ │ └── other
│ └── display_scenes.py 
├── main.py 整个项⽬的⼊⼝
├── diag_cmd.py 命令⾏注册解析
├── config.py ⽤户侧配置⽂件以及系统配置⽂件解析和⽣成
├── core.py 被main.py调⽤,调⽤对应诊断模块
├── context.py 封装context,透传到各个诊断模块的上下⽂全靠这部分能⼒来实现
├── stdio.py ⽇志打印、动画、进度条等打印相关
├── err.py 错误打印和建议
├── dev_init.sh 开发环境初始化脚本
├── init_obdiag_cmd.sh obdiag命令补全实现脚本
├── init.sh 安装obdiag时环境初始化脚本
├── resources/ ⼀些css资源
├── rpm/ 打rpm包的⽂件夹
├── telemetry/ 遥测功能
├── test/ 测试模块
└── update/ 热更新代码

obdiag的调用关系图如下:

1732505722

main.py的内容如下,main.py整个项⽬的⼊⼝,是官方的老师已经实现好的,在共建中我们不需要关心和修改这部分代码

import sys
from diag_cmd import MainCommand
from stdio import IO

ROOT_IO = IO(1)

if __name__ == '__main__':
    defaultencoding = 'utf-8'
    if sys.getdefaultencoding() != defaultencoding:
        try:
            from imp import reload
        except:
            pass
        reload(sys)
        sys.setdefaultencoding(defaultencoding)
    ROOT_IO.track_limit += 2
    if MainCommand().init(sys.argv[0], sys.argv[1:]).do_command():
        ROOT_IO.exit(0)
    else:
        ROOT_IO.exit(1)

我们需要在diag_cmd中注册我们的命令,以display为例,需要增加display scene list和display scene run两条命令,并在命令中添加一些参数,其中action=append行为代表参数可以指定多次,如--env key1=value1 --env key2=values2

class ObdiagDisplaySceneListCommand(ObdiagOriginCommand):

    def __init__(self):
        super(ObdiagDisplaySceneListCommand, self).__init__('list', 'display scene list')

    def init(self, cmd, args):
        super(ObdiagDisplaySceneListCommand, self).init(cmd, args)
        return self

    def _do_command(self, obdiag):
        return obdiag.display_scenes_list(self.opts)

class ObdiagDisplaySceneRunCommand(ObdiagOriginCommand):

    def __init__(self):
        super(ObdiagDisplaySceneRunCommand, self).__init__('run', 'display scene run')
        self.parser.add_option('--scene', type='string', help="Specify the scene to be display")
        self.parser.add_option('--from', type='string', help="specify the start of the time range. format: 'yyyy-mm-dd hh:mm:ss'")
        self.parser.add_option('--to', type='string', help="specify the end of the time range. format: 'yyyy-mm-dd hh:mm:ss'")
        self.parser.add_option('--since', type='string', help="Specify time range that from 'n' [d]ays, 'n' [h]ours or 'n' [m]inutes. before to now. format: <n> <m|h|d>. example: 1h.", default='30m')
        self.parser.add_option('--env', action="append", type='string', help='env options Format: --env key=value')
        self.parser.add_option('-c', type='string', help='obdiag custom config', default=os.path.expanduser('~/.obdiag/config.yml'))
        self.parser.add_option('--config', action="append", type="string", help='config options Format: --config key=value')

    def init(self, cmd, args):
        super(ObdiagDisplaySceneRunCommand, self).init(cmd, args)
        return self

    def _do_command(self, obdiag):
        return obdiag.display_function('display_scenes_run', self.opts)

在声明好命令和参数之后,我们需要在diag_cmd中使用下列代码把命令注册

class ObdiagDisplayCommand(MajorCommand):

    def __init__(self):
        super(ObdiagDisplayCommand, self).__init__('display', 'display oceanbase info')
        self.register_command(ObdiagDisplaySceneCommand())


class ObdiagDisplaySceneCommand(MajorCommand):

    def __init__(self):
        super(ObdiagDisplaySceneCommand, self).__init__('scene', 'Display scene diagnostic info')
        self.register_command(ObdiagDisplaySceneListCommand())
        self.register_command(ObdiagDisplaySceneRunCommand())

core.py主要是被main.py调用,并调用对应的诊断模块的,这块没有什么特别,和其它功能保持一致即可,如display功能是调用DisplaySceneHandler和DisplayScenesListHandler两个Handler

    def display_function(self, function_type, opt):
        config = self.config_manager
        if not config:
            self._call_stdio('error', 'No such custum config')
            return ObdiagResult(ObdiagResult.INPUT_ERROR_CODE, error_data='No such custum config')
        else:
            self.stdio.print("{0} start ...".format(function_type))
            self.update_obcluster_nodes(config)
            self.set_context(function_type, 'display', config)
            timestamp = TimeUtils.get_current_us_timestamp()
            self.context.set_variable('display_timestamp', timestamp)
            if function_type == 'display_scenes_run':
                handler = DisplaySceneHandler(self.context)
                return handler.handle()
            else:
                self._call_stdio('error', 'Not support display function: {0}'.format(function_type))
                return ObdiagResult(ObdiagResult.INPUT_ERROR_CODE, error_data='Not support display function: {0}'.format(function_type))

    def display_scenes_list(self, opt):
        self.set_offline_context('display_scenes_list', 'display')
        handler = DisplayScenesListHandler(self.context)
        return handler.handle()

handler的调用首先会进入display_scenes.py这个文件中,这里主要需要注意的逻辑有两处:

 #line 94
 #此处用于初始化数据库连接,如果通过--env 'db_connect = '-hxx -Pxx -uxx -pxx -Dxx'传入了数据库连接串,并且连接串是有效的,则以传入的连接串为准,
 #否则,使用sys租户的连接串,其中,sys租户的连接串的信息来自于配置文件
    def __init_db_conn(self, cli_connection_string):
        try:
            self.db_conn = StringUtils.parse_mysql_conn(cli_connection_string)
            if StringUtils.validate_db_info(self.db_conn):
                self.__init_db_connector()
            else:
                self.stdio.error("db connection information requird [db_connect = '-hxx -Pxx -uxx -pxx -Dxx'] but provided {0}, please check the --env {0}".format(env_dict))
                self.db_connector = self.sys_connector
        except Exception as e:
            self.stdio.exception("init db connector, error: {0}, please check --env option ")
            
#line 215
#判断--env是否有传值,使用StringUtils.parse_env_display方法解析出连接串信息,并初始化数据库连接串,主要是为了适用于需要连接到业务租户的场景
if env_option:
            env_dict = StringUtils.parse_env_display(env_option)
            self.env = env_dict
            cli_connection_string = self.env.get("db_connect")
            if cli_connection_string != None:
                self.__init_db_conn(cli_connection_string)
            else:
                self.db_connector = self.sys_connector
        else:
            self.db_connector = self.sys_connector

通过display_scenes.py中的方法对配置和变量等信息进行解析和初始化之后,我们会通过execute方法执行对应的任务,主要有两种__execute_yaml_task_one和__execute_code_task_one,在display中所有的任务都是yaml任务,这个执行方法最终会初始化一个SceneBase类,这个类位于obdiag/handler/display/scenes/base.py中

 #line 84
 for key, value in zip(self.yaml_tasks.keys(), self.yaml_tasks.values()):
     self.__execute_yaml_task_one(key, value)
 for task in self.code_tasks:
     self.__execute_code_task_one(task)

scenes/base.py:

 def execute(self):
        try:
            if self.mode == "yaml":
                if self.task_type == "observer":
                    self.__execute_yaml_mode(self.ob_nodes)
                elif self.task_type == "obproxy":
                    self.__execute_yaml_mode(self.obproxy_nodes)
                elif self.task_type == "other":
                    self.__execute_yaml_mode(self.ob_nodes)
                    self.__execute_yaml_mode(self.obproxy_nodes)
            elif self.mode == "code":
                self.__execute_code_mode()
            else:
                self.stdio.error("Unsupported mode. SKIP")
                raise Exception("Unsupported mode. SKIP")
        except Exception as e:
            raise Exception("execute failed, error: {0}".format(e))

base.py最终会执行__execute_yaml_mode方法,执行实际的step步骤,对于每一个node的和配置文件中的每一个step,都会初始化一个Base类来执行,这个类位于obdiag/handler/display/step/base.py中

        for node in nodes:
            # self.stdio.print("run scene excute yaml mode in node: {0} start".format(StringUtils.node_cut_passwd_for_log(node['ip'], self.stdio)))
            steps = self.scene[steps_nu]
            nu = 1
            node_number = node_number + 1
            for step in steps["steps"]:
                try:
                    self.stdio.verbose("step nu: {0}".format(nu))
                    if len(self.cluster) == 0:
                        self.stdio.error("cluster is not exist")
                        return
                    step_run = Base(self.context, step, node, self.cluster, self.scene_variable_dict, self.env, node_number, self.db_connector)
                    self.stdio.verbose("step nu: {0} initted, to execute".format(nu))
                    step_run.execute()
                    self.scene_variable_dict = step_run.update_task_variable_dict()

stop/base.py

def execute(self):
        self.stdio.verbose("step: {0}".format(self.step))
        no_cluster_name_msg = "(Please set ob_cluster_name or obproxy_cluster_name)"
        try:
            if "ip" in self.node:
                self.task_variable_dict["remote_ip"] = self.node["ip"]
            elif "ssh_type" in self.node and self.node["ssh_type"] == "docker":
                self.stdio.verbose("execute ssh_type is docker")
                ssh_client = SshClient(self.context, self.node)
                self.task_variable_dict["remote_ip"] = ssh_client.get_ip()
            self.task_variable_dict["remote_home_path"] = self.node["home_path"]

            if "type" not in self.step:
                self.stdio.error("Missing field :type")
            if (self.node_number > 1) and self.step.get("global") and (self.step.get("global") is True):
                self.stdio.verbose("step sets the value of the global is true and it is processing the {0} node, skipping display".format(self.node_number))
            else:
                if self.step["type"] == "sql":
                    handler = StepSQLHandler(self.context, self.step, self.cluster, self.task_variable_dict, self.env, self.db_connector)
                    handler.execute()
                else:
                    self.stdio.error("the type not support: {0}".format(self.step["type"]))

base主要有ssh和sql两个类型的执行,display目前只使用sql的方法, 这部分内容没有特别需要注意的,可以直接复用gather实现的方式。

最终的执行是使用StepSQLHandler类,核心的代码如下:

        sql = StringUtils.replace_parameters(sql, self.env)
            self.stdio.verbose("StepSQLHandler execute: {0}".format(sql))
            columns, data = self.db_connector.execute_sql_return_columns_and_data(sql)
            if data is None or len(data) == 0:
                self.stdio.verbose("excute sql: {0},  result is None".format(sql))
            table = PrettyTable(columns)
            for row in data:
                table.add_row(row)
            title = self.step.get("tittle")
            if title is not None:
                title = StringUtils.replace_parameters(title, self.env)
                formatted_title = f"\n[obdiag display]: {title} "
                self.stdio.print(formatted_title)
            self.stdio.print(table)          

使用StringUtils.replace_parameters方法把sql中的占位符替换为--env key=value中传入的值,并且执行sql,使用PrettyTable工具格式化输出,最后使用self.stdio.print(table)打印结果。

上述就是display功能实现的主要代码。其它还有一些周边和集成的功能:

common.tool的StringUtils通用方法:

 @staticmethod
    def parse_env_display(env_list):
        env_dict = {}
        for env_string in env_list:
            # 分割键和值
            key_value = env_string.split('=', 1)
            if len(key_value) == 2:
                key, value = key_value
                if value.startswith('"') and value.endswith('"'):
                    value = value[1:-1]
                elif value.startswith("'") and value.endswith("'"):
                    value = value[1:-1]
                env_dict[key.strip()] = value.strip()
        return env_dict

    @staticmethod
    def extract_parameters(query_template):
        # 使用正则表达式查找占位符
        pattern = re.compile(r'\{(\w+)\}')
        parameters = pattern.findall(query_template)
        return parameters

    @staticmethod
    def replace_parameters(query_template, params):
        # 使用正则表达式查找占位符
        pattern = re.compile(r'\{(\w+)\}')

        # 定义替换函数
        def replacer(match):
            key = match.group(1)
            return str(params.get(key, match.group(0)))

        # 替换占位符
        query = pattern.sub(replacer, query_template)
        return query

 init_diag_cmd.sh中的自定义补全规则脚本:

#line 20
display)
                    if [ "$COMP_CWORD" -eq 2 ]; then
                        type_list="scene"
                    elif [ "${COMP_WORDS[2]}" = "scene" ] && [ "$COMP_CWORD" -eq 3 ]; then
                        type_list="list run"
                    fi
                    ;;
#line 49
      elif [ "${COMP_WORDS[1]}" = "display" ] && [ "${COMP_WORDS[2]}" = "scene" ]; then
                type_list="list run"
                COMPREPLY=($(compgen -W "${type_list}" -- "${cur_word}"))

dev_init.sh中的开发环境初始化脚本:

#line 43
mkdir -p ${OBDIAG_HOME}/display
 
#line 53
if [ -d "${WORK_DIR}/handler/display/tasks" ]; then
        cp -rf ${WORK_DIR}/handler/display/tasks  ${OBDIAG_HOME}/display/
fi 

安装obdiag时的环境初始化脚本init.sh,主要作用是把相关yaml文件拷贝到.obdiag工作目录中:

#line 20
mkdir -p ${OBDIAG_HOME}/display

#line 37
if [ -d "${WORK_DIR}/display" ]; then
    cp -rf ${WORK_DIR}/display  ${OBDIAG_HOME}/
fi

从obdiag display的使用介绍到obdiag display场景共建再到obdiag display的源码共建,在这里算是小小的为各位社区大佬抛砖引玉了,期待各路大神的参与到obdiag的共建中来,也期待obdiag 第六乃至第N个一级功能的诞生,最终打造一款简易、易用和高效的OceanBase敏捷诊断工具。