【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理

发布于:2024-09-05 ⋅ 阅读:(12) ⋅ 点赞:(0)

Sharding-JDBC系列

1、Sharding-JDBC分库分表的基本使用

2、Sharding-JDBC分库分表之SpringBoot分片策略

3、Sharding-JDBC分库分表之SpringBoot主从配置

4、SpringBoot集成Sharding-JDBC-5.3.0分库分表

5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表

6、【源码】Sharding-JDBC源码分析之JDBC

7、【源码】Sharding-JDBC源码分析之SPI机制

8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理

9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)

10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)

11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理

12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理

13、【源码】Sharding-JDBC源码分析之ContextManager创建中mode分片配置信息的持久化存储的原理

14、【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理

前言

【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理-CSDN博客博文的3.1.2中介绍了ContextManager的创建,本文从源码角度分享一下ContextManager的创建过程中获取配置的数据源的元数据的实现原理。

StandaloneContextManagerBuilder回顾

【源码】Sharding-JDBC源码分析之ContextManager创建中mode分片配置信息的持久化存储的原理-CSDN博客博文中,介绍了ContextManager是通过StandaloneContextManagerBuilder的bulid()方法创建的。在创建ContextManager前,先结合mode的配置,通过JDBCRepository实现了配置信息的持久化存储。

StandaloneContextManagerBuilder的bulid()的源码如下:

package org.apache.shardingsphere.mode.manager.standalone;

/**
 * 单机上下文管理创建者。持久化配置信息到H2或Mysql
 */
public final class StandaloneContextManagerBuilder implements ContextManagerBuilder {
    
    @Override
    public ContextManager build(final ContextManagerBuilderParameter param) throws SQLException {
        // 获取配置的元数据持久化信息
        PersistRepositoryConfiguration repositoryConfig = param.getModeConfiguration().getRepository();
        // 获取持久化接口对象,默认为JDBCRepository对象
        StandalonePersistRepository repository = null == repositoryConfig
                ? RequiredSPIRegistry.getRegisteredService(StandalonePersistRepository.class)
                : TypedSPIRegistry.getRegisteredService(StandalonePersistRepository.class, repositoryConfig.getType(), repositoryConfig.getProps());
        // 新建元数据持久化service类
        MetaDataPersistService persistService = new MetaDataPersistService(repository);
        // 持久化配置信息,默认保存到h2数据库中
        persistConfigurations(persistService, param);
        InstanceContext instanceContext = buildInstanceContext(param);
        // 监听,处理Standalone的订阅者
        new ProcessStandaloneSubscriber(instanceContext.getEventBusContext());
        // 创建MetaDataContexts,创建ShardingSphereDatabase,保存到MetaDataContexts中
        MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(persistService, param, instanceContext);
        ContextManager result = new ContextManager(metaDataContexts, instanceContext);
        setContextManagerAware(result);
        return result;
    }
}

在build()方法中,持久化配置信息之后,继续执行如下:

1)执行buildInstanceContext(),创建InstanceContext;

2)实现Standalone模式的事件订阅,添加进程列表请求和终止进程列表的请求;

3)执行MetaDataContextsFactory.create()方法,创建MetadataContexts对象;

在创建MetadataContexts之前,先通过配置的数据源、分片规则及数据源默认的数据库类型,获取数据源中定义的元数据(如表、表的列、主键、索引等元数据信息),创建ShardingSphereDatabase对象;

4)创建ContextManager对象,保存了InstanceContext、MetadataContexts对象;

MetaDataContextsFactory

package org.apache.shardingsphere.mode.metadata;

/**
 * 元数据上下文工厂
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MetaDataContextsFactory {
    
    public static MetaDataContexts create(final MetaDataPersistService persistService, final ContextManagerBuilderParameter param, final InstanceContext instanceContext) throws SQLException {
        return create(persistService, param, instanceContext, Collections.emptyMap());
    }
    
    public static MetaDataContexts create(final MetaDataPersistService persistService, final ContextManagerBuilderParameter param,
                                          final InstanceContext instanceContext, final Map<String, StorageNodeDataSource> storageNodes) throws SQLException {
        // 获取数据库名称,默认为logic_db
        Collection<String> databaseNames = instanceContext.getInstance().getMetaData() instanceof JDBCInstanceMetaData
                ? param.getDatabaseConfigs().keySet()
                : persistService.getDatabaseMetaDataService().loadAllDatabaseNames();
        // 创建有效的数据库配置。从数据库中获取配置的信息。key为数据库名称(默认为logic_db),value为DataSourceProvidedDatabaseConfiguration
        Map<String, DatabaseConfiguration> effectiveDatabaseConfigs = createEffectiveDatabaseConfigurations(databaseNames, param.getDatabaseConfigs(), persistService);
        checkDataSourceStates(effectiveDatabaseConfigs, storageNodes, param.isForce());
        // 从数据库中获取全局规则配置信息
        Collection<RuleConfiguration> globalRuleConfigs = persistService.getGlobalRuleService().load();
        // 从数据库中获取配置的props信息
        ConfigurationProperties props = new ConfigurationProperties(persistService.getPropsService().load());
        // 创建ShardingSphereDatabase集合
        Map<String, ShardingSphereDatabase> databases = ShardingSphereDatabasesFactory.create(effectiveDatabaseConfigs, props, instanceContext);
        // 从repository表中重新加载对应数据库的信息
        databases.putAll(reloadDatabases(databases, persistService));
        // 创建规则元数据对象
        ShardingSphereRuleMetaData globalMetaData = new ShardingSphereRuleMetaData(GlobalRulesBuilder.buildRules(globalRuleConfigs, databases, props));
        return new MetaDataContexts(persistService, new ShardingSphereMetaData(databases, globalMetaData, props));
    }

    /**
     * 创建有效的数据库配置。根据数据库逻辑名,从持久化库中获取有效的DataSource、配置规则,封装成DataSourceProvidedDatabaseConfiguration。
     * key为逻辑名,value为DataSourceProvidedDatabaseConfiguration
     */
    private static Map<String, DatabaseConfiguration> createEffectiveDatabaseConfigurations(final Collection<String> databaseNames,
                                                                                            final Map<String, DatabaseConfiguration> databaseConfigs, final MetaDataPersistService persistService) {
        return databaseNames.stream().collect(
                Collectors.toMap(each -> each, each -> createEffectiveDatabaseConfiguration(each, databaseConfigs, persistService), (a, b) -> b, () -> new HashMap<>(databaseNames.size(), 1)));
    }
    
    private static DatabaseConfiguration createEffectiveDatabaseConfiguration(final String databaseName,
                                                                              final Map<String, DatabaseConfiguration> databaseConfigs, final MetaDataPersistService persistService) {
        // 从持久化库中获取有效的DataSource
        Map<String, DataSource> effectiveDataSources = persistService.getEffectiveDataSources(databaseName, databaseConfigs);
        // 从持久化库中获取规则配置
        Collection<RuleConfiguration> databaseRuleConfigs = persistService.getDatabaseRulePersistService().load(databaseName);
        // 封装成DataSourceProvidedDatabaseConfiguration对象
        return new DataSourceProvidedDatabaseConfiguration(effectiveDataSources, databaseRuleConfigs);
    }
    
    private static void checkDataSourceStates(final Map<String, DatabaseConfiguration> databaseConfigs, final Map<String, StorageNodeDataSource> storageNodes, final boolean force) {
        Map<String, DataSourceState> storageDataSourceStates = getStorageDataSourceStates(storageNodes);
        databaseConfigs.forEach((key, value) -> {
            if (!value.getDataSources().isEmpty()) {
                DataSourceStateManager.getInstance().initStates(key, value.getDataSources(), storageDataSourceStates, force);
            }
        });
    }
    
    private static Map<String, DataSourceState> getStorageDataSourceStates(final Map<String, StorageNodeDataSource> storageDataSourceStates) {
        Map<String, DataSourceState> result = new HashMap<>(storageDataSourceStates.size(), 1);
        storageDataSourceStates.forEach((key, value) -> {
            List<String> values = Splitter.on(".").splitToList(key);
            Preconditions.checkArgument(3 == values.size(), "Illegal data source of storage node.");
            String databaseName = values.get(0);
            String dataSourceName = values.get(2);
            result.put(databaseName + "." + dataSourceName, DataSourceState.valueOf(value.getStatus().toUpperCase()));
        });
        return result;
    }

    /**
     * 从repository中重新加载数据库信息。如果没有,则为原来的信息
     */
    private static Map<String, ShardingSphereDatabase> reloadDatabases(final Map<String, ShardingSphereDatabase> databases, final MetaDataPersistService persistService) {
        Map<String, ShardingSphereDatabase> result = new ConcurrentHashMap<>(databases.size(), 1);
        databases.forEach((key, value) -> {
            Map<String, ShardingSphereSchema> schemas = persistService.getDatabaseMetaDataService().loadSchemas(key);
            result.put(key.toLowerCase(), new ShardingSphereDatabase(value.getName(),
                    value.getProtocolType(), value.getResourceMetaData(), value.getRuleMetaData(), schemas.isEmpty() ? value.getSchemas() : schemas));
        });
        return result;
    }
}

在create()方法,主要执行如下:

1)获取数据库名称,默认为logic_db;

2)创建有效的数据库配置Map集合,key为数据库名称,value为DataSourceProvidedDatabaseConfiguration对象;

2.1)通过配置信息持久化MetaDataPersistService,从持久化存储库中获取对应数据库的配置信息。并通过YamlDataSourceConfigurationSwapper,转换为DataSourceProperties对象,然后通过DataSourcePoolCreator,采用反射,创建对应的DataSource对象;

详见:【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)-CSDN博客

2.2)通过配置信息持久化MetaDataPersistService,从持久化存储库中获取对应数据库的规则配置信息。并通过YamlRuleConfigurationSwapperEngine,转换为RuleConfiguration对象

2.3)创建DataSourceProvidedDatabaseConfiguration对象;

3)检测数据源的状态;

3.1)如果是分布式部署的,storageNodes为从分布式配置中心中间件中获取保存的数据源节点的状态信息;

3.2)遍历2)中创建的有效的数据库Map集合中的数据源,如果在storageNodes中,且定义为DISABLE,则记录当前的数据库的数据源的状态为DISABLE;否则,通过数据源的DataSource对象获取Connection对象,如果能够正常获取,则记录当前的数据源的状态为ENABLE,如果获取不了Connection,则抛异常;

4)从配置信息持久化存储库中获取全局规则配置RuleConfiguration集合;

5)从配置信息持久化存储库中获取props配置信息,封装为ConfigurationProperties对象;

6)执行ShardingSphereDatabasesFactory.create(),创建ShardingSphereDatabase集合;

7)创建ShardingSphereRuleMetaData对象,保存全局规则的副本;

8)创建MetaDataContexts对象,保存MetaDataPersistService、ShardingSphereMetaData对象,其中ShardingSphereMetaData保存了规则、ShardingSphereDatabase及其他信息;

ShardingSphereDatabasesFactory

ShardingSphereDatabasesFactory的源码如下:

package org.apache.shardingsphere.infra.metadata.database;

public final class ShardingSphereDatabasesFactory {

    public static ShardingSphereDatabase create(final String databaseName, final DatabaseConfiguration databaseConfig,
                                                final ConfigurationProperties props, final InstanceContext instanceContext) throws SQLException {
        DatabaseType protocolType = DatabaseTypeEngine.getProtocolType(databaseName, databaseConfig, props);
        Map<String, DatabaseType> storageTypes = DatabaseTypeEngine.getStorageTypes(databaseName, databaseConfig);
        return ShardingSphereDatabase.create(databaseName, protocolType, storageTypes, databaseConfig, props, instanceContext);
    }

    /**
     * 创建ShardingSphereDatabase集合
     * @param databaseConfigMap 从持久化存储库中获取的数据库的配置信息对象
     * @param props 从持久化存储库中获取的props配置信息
     * @param instanceContext 实例上下文
     */
    public static Map<String, ShardingSphereDatabase> create(final Map<String, DatabaseConfiguration> databaseConfigMap,
                                                             final ConfigurationProperties props, final InstanceContext instanceContext) throws SQLException {
        // 获取数据库类型,默认为MYSQLDatabaseType
        DatabaseType protocolType = DatabaseTypeEngine.getProtocolType(databaseConfigMap, props);
        // 定义一个Map,长度为配置的数据库个数加上对应数据源类型的系统库。如MySQL数据库,系统库有information_schema、mysql、sys等
        Map<String, ShardingSphereDatabase> result = new ConcurrentHashMap<>(databaseConfigMap.size() + protocolType.getSystemDatabaseSchemaMap().size(), 1);
        // 创建通用数据库
        result.putAll(createGenericDatabases(databaseConfigMap, protocolType, props, instanceContext));
        // 创建系统默认的数据库。如MySQL数据库,创建information_schema、performance_schema、mysql、sys、shardingsphere
        result.putAll(createSystemDatabases(databaseConfigMap, protocolType));
        return result;
    }

    /**
     * 创建通用数据库
     */
    private static Map<String, ShardingSphereDatabase> createGenericDatabases(final Map<String, DatabaseConfiguration> databaseConfigMap, final DatabaseType protocolType,
                                                                              final ConfigurationProperties props, final InstanceContext instanceContext) throws SQLException {
        Map<String, ShardingSphereDatabase> result = new HashMap<>(databaseConfigMap.size(), 1);
        for (Entry<String, DatabaseConfiguration> entry : databaseConfigMap.entrySet()) {
            // 默认为logic_db
            String databaseName = entry.getKey();
            if (!entry.getValue().getDataSources().isEmpty() || !protocolType.getSystemSchemas().contains(databaseName)) {
                // 获取存储类型。通过DataSource获取Connection,再通过Connection的url判断哪种存储类型。如MysqlDatabaseType、H2DatabaseType等
                // key为对应的逻辑数据库名,value为对应的数据类型。如order_ds:MYSQLDatabaseType
                Map<String, DatabaseType> storageTypes = DatabaseTypeEngine.getStorageTypes(entry.getKey(), entry.getValue());
                result.put(databaseName.toLowerCase(), ShardingSphereDatabase.create(databaseName, protocolType, storageTypes, entry.getValue(), props, instanceContext));
            }
        }
        return result;
    }
    
    private static Map<String, ShardingSphereDatabase> createSystemDatabases(final Map<String, DatabaseConfiguration> databaseConfigMap, final DatabaseType protocolType) {
        Map<String, ShardingSphereDatabase> result = new HashMap<>(protocolType.getSystemDatabaseSchemaMap().size(), 1);
        for (String each : protocolType.getSystemDatabaseSchemaMap().keySet()) {
            if (!databaseConfigMap.containsKey(each) || databaseConfigMap.get(each).getDataSources().isEmpty()) {
                result.put(each.toLowerCase(), ShardingSphereDatabase.create(each, protocolType));
            }
        }
        return result;
    }
}

create()方法执行如下:

1)根据配置的props或数据库信息,获取数据库的类型;

1.1)通过props的proxy-frontend-database-protocol-type配置数据库类型(如配置为mysql,则数据库类型为MySQLDatabaseType);

1.2)如果没有通过props配置,从配置的数据库中,查找第一个ENABLE的数据源,通过数据源的url信息,确定数据库类型(如jdbc:mysql,则数据库类型为MySQLDatabaseType,jdbc:oracle,则数据库类型为OracleDatabaseType);

1.3)如果以上都没有,则默认返回MySQLDatabaseType;

2)定义一个Map,长度为配置的数据库个数加上对应数据源类型的系统库;

如MySQL数据库,系统库有information_schema、performance_schema、mysql、sys和shardingsphere,长度要加上5;

3)执行createGenericDatabases(),创建通用的数据库ShardingSphereDatabase对象,返回Map集合,其中key为数据库名称(默认为logic_db),value为ShardingSphereDatabase对象;

遍历配置的database配置信息,如果对应的数据库有配置数据源,且该数据库不属于数据库默认的数据库,则执行如下:

3.1)获取当前数据库配置的数据源对应的数据库类型;

3.2)执行ShardingSphereDatabase.create(),创建ShardingSphereDatabase对象;

3.3)将创建的ShardingSphereDatabase对象放入到Map中;

4)创建系统默认的数据库对应的ShardingSphereDatabase对象;

也是通过ShardingSphereDatabase.create()进行创建。

5)返回创建的所有ShardingSphereDatabase对象;

ShardingSphereDatabase

ShardingSphereDatabase的源码如下:

package org.apache.shardingsphere.infra.metadata.database;

/**
 * 数据库信息
 */
@Getter
public final class ShardingSphereDatabase {

    // 默认logic_db
    private final String name;

    // 数据库类型。如MySQLDatabaseType
    private final DatabaseType protocolType;

    // 资源元数据。数据源、数据库类型、数据源的元数据(hostname、port等)
    private final ShardingSphereResourceMetaData resourceMetaData;

    // 配置的规则的元数据集合,线程安全
    private final ShardingSphereRuleMetaData ruleMetaData;

    // schema中定义的表和视图的信息。key默认为logic_db
    private final Map<String, ShardingSphereSchema> schemas;
    
    public ShardingSphereDatabase(final String name, final DatabaseType protocolType, final ShardingSphereResourceMetaData resourceMetaData,
                                  final ShardingSphereRuleMetaData ruleMetaData, final Map<String, ShardingSphereSchema> schemas) {
        this.name = name;
        this.protocolType = protocolType;
        this.resourceMetaData = resourceMetaData;
        this.ruleMetaData = ruleMetaData;
        this.schemas = new ConcurrentHashMap<>(schemas.size(), 1);
        schemas.forEach((key, value) -> this.schemas.put(key.toLowerCase(), value));
    }
    
    /**
     * 创建一个ShardingSphereDatabase
     * @param name 默认为logic_db
     * @param protocolType 默认为MySQLDatabaseType
     * @param storageTypes 对应逻辑数据源及协议类型。如:order_ds: MySQLDatabaseType
     * @param databaseConfig 数据源配置。默认为DataSourceProvidedDatabaseConfiguration对象,数据源提供数据库配置。从数据库中获取的数据库配置信息
     * @param props 配置的props
     * @param instanceContext
     * @return
     * @throws SQLException
     */
    public static ShardingSphereDatabase create(final String name, final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes,
                                                final DatabaseConfiguration databaseConfig, final ConfigurationProperties props, final InstanceContext instanceContext) throws SQLException {
        // 获取配置的数据源规则,如ShardingRule(分片的规则。将ShardingRuleConfiguration中的规则配置项转化成对应规则的对象)等
        // 创建数据源的规则。根据配置的规则对象(xxxConfiguration)转化成对应规则对象(xxxRule)
        Collection<ShardingSphereRule> databaseRules = DatabaseRulesBuilder.build(name, databaseConfig, instanceContext);
        Map<String, ShardingSphereSchema> schemas = new ConcurrentHashMap<>();
        // 从数据源中组装数据库元数据(表元数据、表中列元数据)
        schemas.putAll(GenericSchemaBuilder.build(new GenericSchemaBuilderMaterial(protocolType, storageTypes,
                // 获取可用的DataSource的Map
                DataSourceStateManager.getInstance().getEnabledDataSourceMap(name, databaseConfig.getDataSources()), databaseRules, props,
                DatabaseTypeEngine.getDefaultSchemaName(protocolType, name))));
        schemas.putAll(SystemSchemaBuilder.build(name, protocolType));
        return create(name, protocolType, databaseConfig, databaseRules, schemas);
    }

    public static ShardingSphereDatabase create(final String name, final DatabaseType protocolType) {
        DatabaseConfiguration databaseConfig = new DataSourceProvidedDatabaseConfiguration(new LinkedHashMap<>(), new LinkedList<>());
        return create(name, protocolType, databaseConfig, new LinkedList<>(), SystemSchemaBuilder.build(name, protocolType));
    }

    /**
     * 创建ShardingSphereDatabase
     * @param name 默认logic_name
     * @param protocolType MySQLDatabaseType
     */
    private static ShardingSphereDatabase create(final String name, final DatabaseType protocolType, final DatabaseConfiguration databaseConfig,
                                                 final Collection<ShardingSphereRule> rules, final Map<String, ShardingSphereSchema> schemas) {
        // 资源元数据。数据源、数据库类型、数据源的元数据(hostname、port等)
        ShardingSphereResourceMetaData resourceMetaData = createResourceMetaData(name, databaseConfig.getDataSources());
        // 配置的规则的元数据集合,线程安全
        ShardingSphereRuleMetaData ruleMetaData = new ShardingSphereRuleMetaData(rules);
        return new ShardingSphereDatabase(name, protocolType, resourceMetaData, ruleMetaData, schemas);
    }
    
    private static ShardingSphereResourceMetaData createResourceMetaData(final String databaseName, final Map<String, DataSource> dataSourceMap) {
        return new ShardingSphereResourceMetaData(databaseName, dataSourceMap);
    }
    
    public ShardingSphereSchema getSchema(final String schemaName) {
        return schemas.get(schemaName.toLowerCase());
    }

    public void putSchema(final String schemaName, final ShardingSphereSchema schema) {
        schemas.put(schemaName.toLowerCase(), schema);
    }
    
    public void removeSchema(final String schemaName) {
        schemas.remove(schemaName.toLowerCase());
    }
    
    public boolean containsSchema(final String schemaName) {
        return schemas.containsKey(schemaName.toLowerCase());
    }

    public boolean isComplete() {
        return !ruleMetaData.getRules().isEmpty() && !resourceMetaData.getDataSources().isEmpty();
    }

    public boolean containsDataSource() {
        return !resourceMetaData.getDataSources().isEmpty();
    }

    public synchronized void reloadRules(final Class<? extends ShardingSphereRule> ruleClass) {
        Collection<? extends ShardingSphereRule> toBeReloadedRules = ruleMetaData.findRules(ruleClass);
        RuleConfiguration ruleConfig = toBeReloadedRules.stream().map(ShardingSphereRule::getConfiguration).findFirst().orElse(null);
        Collection<ShardingSphereRule> databaseRules = new LinkedList<>(ruleMetaData.getRules());
        toBeReloadedRules.stream().findFirst().ifPresent(optional -> {
            databaseRules.removeAll(toBeReloadedRules);
            databaseRules.add(((MutableDataNodeRule) optional).reloadRule(ruleConfig, name, resourceMetaData.getDataSources(), databaseRules));
        });
        ruleMetaData.getRules().clear();
        ruleMetaData.getRules().addAll(databaseRules);
    }
}

5.1 通用ShardingSphereDatabase的创建

通过create(final String name, final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes,final DatabaseConfiguration databaseConfig, final ConfigurationProperties props, final InstanceContext instanceContext)创建通用的ShardingSphereDatabase对象。该方法执行如下:

1)执行DatabaseRulesBuilder.build()方法,将配置的数据库规则转换为ShardingSphereRule集合对象;

1.1)根据配置的规则,获取能够创建该配置规则的DatabaseRuleBuilder生成器;

1.2)如果没有配置Single,则默认添加Single的生成器;

1.3)通过DatabaseRuleBuilder生成器的build()方法,将RuleConfiguration,转换为ShardingSphereRule对象;

说明:分片规则信息在Yaml中配置,解析成对应YamlXxxRuleConfiguration。通过对用的YamlXxxRuleConfigurationSwapper,和XxxRuleConfiguration互转。通过对应的DatabaseRuleBuilder生成器,将XxxRuleConfiguration转换为对应的ShardingSphereRule对象;

2)添加配置的数据源的数据库元数据ShardingSphereSchema对象;

2.1)获取配置的分片表名;

2.2)通过配置的数据源,获取Connection连接,查找对应表的元数据信息。如字段、字段类型、主键、外键等,封装成SchemaMetaData对象;

2.3)将SchemaMetaData对象转换为ShardingSphereTable对象,封装到ShardingSphereSchema对象;

3)生成系统的schema。获取databaseType定义的databaseName的默认表集合,从包中获取对应表的yaml文件,解析成ShardingSphereSchema;

3.1)通过databaseType,获取对应DatabaseType定义的系统数据库及默认表信息;

3.2)从包的schema目录下找到对应表的yaml定义。yaml文件中定义了对应表的字段等信息,将信息解析为YamlShardingSpehreTable对象;

3.3)通过YamlTableSwapper转换器,将YamlShardingSpehreTable对象转换为ShardingSphereTable对象;

3.4)将ShardingSphereTable对象封装到ShardingSphereSchema对象;

4)执行同名的create()方法,创建ShardingSphereDatabase对象;

即如下的5.2的create()方法。

5.2 系统ShardingSphereDatabase的创建

通过create(final String name, final DatabaseType protocolType, final DatabaseConfiguration databaseConfig, final Collection<ShardingSphereRule> rules, final Map<String, ShardingSphereSchema> schemas)创建ShardingSphereDatabase对象。方法执行如下:

1)创建ShardingSphereResouceMetaData对象,用于保存配置的数据源信息;

2)创建ShardingSphereRuleMetaData对象,用于保存配置的规则信息;

3)创建ShardingSphereDatabase对象,传入上面的两个对象;

小结

限于篇幅,本篇先分享到这里。结合上一篇,以下做一个小结:

ShardingSphere通过ContextManagerBuilder的build()方法创建ContextManager对象。ContextManagerBuilder是一个接口类,只有build()一个接口方法。有两个实现类,分别为ClusterContextManagerBuilder和StandaloneContextManagerBuilder,分别对应集群和单机两种模式。

本篇以Standalone为例,在StandaloneContextManagerBuilder的build()方法中,执行流程如下:

1)通过mode的配置信息,获取对应的持久化配置对象。对于Standalone,此处为StandalonePersistRepositoryConfiguration对象;

本篇以Standalone为例,对于Cluster,实现思路类似,只是对应的持久化存储由MySQL、H2换为分布式配置中心中间件,如Nacos、Zookeeper、Etc、Consul等。

2)创建持久化StandalonePersistRepository对象,此处为JDBCRepository对象;

2.1)JDBCRepository根据mode配置中的props的provider(默认为H2),获取对应的JDBCRepositoryProvider。如MySQL的MySQLJDBCRepositoryProvider,其提供了repository表的创建、删除,以及repository表的增删改查操作的SQL语句;

2.2)根据mode配置中的props中的jdbc_url、username、password,创建HikariDataSource对象;

2.3)获取JDBCRepositoryProvider中创建repository表的SQL语句,通过HikariDataSource对象获取连接,执行SQL语句,实现持久化存储库的表的创建;

2.4)提供了repository表的插入、修改、删除方法;

3)创建元数据持久化service类,MetaDataPersistService对象;

3.1)该对象保存了数据库、数据库规则、全局规则、属性等对应的持久化存储Service对象,对应的Service持久化存储对象提供了保存对应配置信息的接口;

3.2)Service类中对应的持久化方法是通过 2)中的JDBCRepository对象,实现了对应配置信息持久化;

3.3)配置项以多路径的方式作为key存储在repository表中;

4)执行persistConfigurations()方法,调用MetaDataPersistService的持久化方法,持久化存储配置信息;

5)创建InstanceContext对象;

创建StandaloneModeContextManager、GlobalLockContext,传入InstanceContext构造器。

6)添加监听,处理Standalone模式的订阅者;

7)创建MetaDataContexts,创建ShardingSphereDatabase,保存到MetaDataContexts中;

7.1)通过配置信息持久化MetaDataPersistService,从持久化存储库中获取对应数据库的配置信息。并通过YamlDataSourceConfigurationSwapper,转换为DataSourceProperties对象,然后通过DataSourcePoolCreator,采用反射,创建对应的DataSource对象;

详见:【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)-CSDN博客

7.2)通过配置信息持久化MetaDataPersistService,从持久化存储库中获取对应数据库的规则配置信息。并通过YamlRuleConfigurationSwapperEngine,转换为RuleConfiguration对象

7.3)创建DataSourceProvidedDatabaseConfiguration对象,每个数据库一个,包含多个数据源;

7.4)遍历DataSourceProvidedDatabaseConfiguration对象,执行如下:

7.4.1)获取能够创建该配置规则的DatabaseRuleBuilder生成器。如果没有配置Single,则默认添加Single的生成器。通过DatabaseRuleBuilder生成器的build()方法,将RuleConfiguration,转换为ShardingSphereRule对象;

说明:分片规则信息在Yaml中配置,解析成对应YamlXxxRuleConfiguration。通过对用的YamlXxxRuleConfigurationSwapper,和XxxRuleConfiguration互转。通过对应的DatabaseRuleBuilder生成器,将XxxRuleConfiguration转换为对应的ShardingSphereRule对象;

详见:【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理-CSDN博客

7.4.2)获取配置的分片表、数据源中其他单表(没有设置分片的表)的元数据(如表名、表的列、主键、外键等),封装成ShardingSphereSchema对象;

7.4.3)通过DatabaseType,获取其定义的系统默认的数据库及库表名称,从包中获取对应表的yaml文件(文件中定义了对应表的表名、表的列等),解析成ShardingSphereSchema对象;

7.4.4)创建ShardingSphereDatabase对象;

8)创建ContextManager对象;

9)将ContextManager对象赋值给StandaloneModeContextManager;

10)返回创建的ContextManager对象;

关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。


网站公告

今日签到

点亮在社区的每一天
去签到