1.数据源管理
1.1 添加Gravitino数据源
添加成功之后,会在Gravitino中创建一个名为配置的中的meatalake
1.2. 添加Paimon数据源
属性gravitinoId
可以关联前面创建的Gravitino数据源,关联后,会在gravitino下创建一个该数据源的catalog
。
2. 集成演示
2.1 创建任务
入口:通过顶部菜单栏选择 任务开发,或通过快捷入口 快速创建任务。
任务类型:选择
FlinkPipeline
。
2.2 配置任务
点击任务名称,进入任务详情页。任务节点如下
功能说明:
使用DataGen
节点生成100条测试数据,使用JDBCWrite
将数据写入mysql的user表,再通过JDBCRead
读取mysql的user表,使用SQLExecute
写入到Paimon。通过SQLQuery
读取Paimon,最后通过ShowData
节点输入读取的数据。
{
"flow": {
"engineType": "flink",
"name": "flink_gravition_paimon",
"paths": [
{
"from": "Gravitino_tuky5",
"inport": "",
"outport": "",
"to": "DataGen_r6QRQ"
},
{
"from": "DataGen_r6QRQ",
"inport": "",
"outport": "",
"to": "JDBCWrite__yYGc"
},
{
"from": "JDBCWrite__yYGc",
"inport": "",
"outport": "",
"to": "JDBCRead_WREvh"
},
{
"from": "JDBCRead_WREvh",
"inport": "",
"outport": "",
"to": "SQLExecute_bMeVm"
},
{
"from": "SQLExecute_bMeVm",
"inport": "",
"outport": "",
"to": "SQLQuery_8jwT7"
},
{
"from": "SQLQuery_8jwT7",
"inport": "",
"outport": "",
"to": "ShowData_aMIL3"
}
],
"runMode": "DEBUG",
"stops": [
{
"bundle": "cn.piflow.bundle.flink.catalog.Gravitino",
"customizedProperties": {
},
"name": "Gravitino_tuky5",
"properties": {
"gravitinoUri": "http://127.0.0.1:8092/",
"metalake": "metalake"
},
"uuid": "Gravitino_tuky5"
},
{
"bundle": "cn.piflow.bundle.flink.common.DataGen",
"customizedProperties": {
},
"name": "DataGen_r6QRQ",
"properties": {
"count": "100",
"ratio": "10",
"registerTableName": "datagen_source",
"schema": "[{\"id\":\"323868\",\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":\"1\",\"end\":\"1000\"},{\"id\":\"479324\",\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":\"5\",\"index\":1}]"
},
"uuid": "DataGen_r6QRQ"
},
{
"bundle": "cn.piflow.bundle.flink.jdbc.JDBCWrite",
"customizedProperties": {
},
"name": "JDBCWrite__yYGc",
"properties": {
"driver": "com.mysql.jdbc.Driver",
"password": "123456",
"properties": "{}",
"tableDefinition": "{\"tableBaseInfo\":{},\"physicalColumnDefinition\":[{\"columnName\":\"id\",\"columnType\":\"INT\",\"length\":null,\"precision\":null,\"scale\":null,\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"\"},{\"columnName\":\"name\",\"columnType\":\"STRING\",\"length\":null,\"precision\":null,\"scale\":null,\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"\"}],\"metadataColumnDefinition\":[],\"computedColumnDefinition\":[],\"watermarkDefinition\":{}}",
"tableName": "user",
"url": "jdbc:mysql://127.0.0.1:3306/test2?characterEncoding=utf8&autoReconnect=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai",
"username": "root"
},
"uuid": "JDBCWrite__yYGc"
},
{
"bundle": "cn.piflow.bundle.flink.jdbc.JDBCRead",
"customizedProperties": {
},
"name": "JDBCRead_WREvh",
"properties": {
"driver": "com.mysql.jdbc.Driver",
"fetchSize": "10",
"password": "123456",
"properties": "{}",
"tableDefinition": "{\"tableBaseInfo\":{\"catalogName\":\"\",\"dbname\":\"\",\"schema\":\"\",\"registerTableName\":\"t_user_source\",\"registerTableComment\":\"\",\"ifNotExists\":true,\"selectStatement\":\"\",\"likeStatement\":\"\"},\"physicalColumnDefinition\":[{\"columnName\":\"id\",\"columnType\":\"INT\",\"length\":null,\"precision\":null,\"scale\":null,\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"\"},{\"columnName\":\"name\",\"columnType\":\"STRING\",\"length\":null,\"precision\":null,\"scale\":null,\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"\"}],\"metadataColumnDefinition\":[],\"computedColumnDefinition\":[],\"watermarkDefinition\":{}}",
"tableName": "user",
"url": "jdbc:mysql://127.0.0.1:3306/test2?characterEncoding=utf8&autoReconnect=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai",
"username": "root",
"useTableEnv": "true"
},
"uuid": "JDBCRead_WREvh"
},
{
"bundle": "cn.piflow.bundle.flink.common.SQLQuery",
"customizedProperties": {
},
"name": "SQLQuery_8jwT7",
"properties": {
"registerResultViewName": "",
"registerSourceViewName": "",
"sql": "select id, name from paimon.test.t_user",
"useTableEnv": "true"
},
"uuid": "SQLQuery_8jwT7"
},
{
"bundle": "cn.piflow.bundle.flink.common.SQLExecute",
"customizedProperties": {
},
"name": "SQLExecute_bMeVm",
"properties": {
"sql": "create table if not exists paimon.test.t_user (\r\n id int,\r\n name string\r\n);\r\n\r\ninsert into paimon.test.t_user select * from t_user_source;",
"useTableEnv": "true"
},
"uuid": "SQLExecute_bMeVm"
},
{
"bundle": "cn.piflow.bundle.flink.common.ShowData",
"customizedProperties": {
},
"name": "ShowData_aMIL3",
"properties": {
"changeLog": "false",
"showNumber": "100"
},
"uuid": "ShowData_aMIL3"
}
],
"uuid": "111"
}
}
2.3 运行任务
- 点击 运行 按钮启动任务。
🔗 平台体验地址:DataStudio (http://1.94.182.15:8090)