目录标题
一、官方文档
二、安装
因为是Java 开发的,所以安装方式都是通用的,解压应用包就可以了,不区分win和linux版本。win运行.bat,Linux 运行.sh。
注意: jdk版本。
报这个错误就是jdk版本太低
三、快速生成迁移脚本
简单迁移,适合表结构没有变动的同构数据库迁移。
四、动态获取表,然后迁移数据
参考B站这个视频实现。kettle数据整库迁移方案
五、循环
这种连接头尾相连的就是循环
,一般搭配一个判断使用。如果不判断,会一直循环。
注意: kettle 本身就是分批获取数据的,不用分页查询也是可以的,可以一次性查询全部数据,就算数据量有一亿也不会有问题,内存不会溢出
。
(一)整体预览
(二)设置初始值
脚本后面的:ture; 是控制流程的,如果是ture;流向成功,如下:
如果是false; 流向失败,如下:
(三)循环判断
(四)核心业务
(五)日志,可选
(六)变量控制
里面可以用一些JS函数,应该是通用的。
六、转换中JS脚本的使用
使用非常简单,需要用什么直接双击就能用
了。如果不知道方法什么意思,怎么用,鼠标右键有示例
。
七、转换-过滤记录
正则表达式匹配的一个例子:
匹配日期
:
八、Java 脚本使用
技巧:先把kettle里面常用的jar导入开发工具里面,方便查看里面的方法。不然会有很多方法你不知道怎么调用。
导入idea
这种开发工具,然后在里面查看方法、类
- data 等价于 RowMeta;
- 获取输入Java 脚本组件的表数据和表头:表数据Object[] inputRow = getRow()。表头(表字段等信息):RowMetaInterface inputRowMeta = getInputRowMeta();
- 更改数据表结构和数据:克隆输入元数据作为输出元数据RowMetaInterface outputRowMeta = inputRowMeta.clone();
改变putRow(outputRowMeta, outputRow);
- 新增字段的简单例子
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.logging.LogChannel;
import cn.hutool.core.util.IdUtil;
private RowMetaInterface outputRowMeta; // 输出行元数据
private boolean isFirstRow = true; // 标记是否为第一行,用于初始化元数据
private String newFieldName = "ID"; // 新字段名
private int newFieldType = ValueMetaInterface.TYPE_STRING; // 新字段类型(字符串)
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
// 获取输入行
Object[] inputRow = getRow();
if (inputRow == null) {
setOutputDone();
return false;
}
// 初始化元数据(仅第一次执行)
if (isFirstRow) {
isFirstRow = false;
// 获取输入元数据
RowMetaInterface inputRowMeta = getInputRowMeta();
if (inputRowMeta == null) {
logError("输入行元数据为空,无法添加新字段");
return false;
}
// 克隆输入元数据作为输出元数据
outputRowMeta = inputRowMeta.clone();
// 定义新字段元数据
ValueMetaInterface newFieldMeta = new ValueMetaString("ID");
newFieldMeta.setLength(36);
// 添加新字段到元数据
outputRowMeta.addValueMeta(newFieldMeta);
logBasic("成功添加新字段: " + newFieldName + ",输出字段总数: " + outputRowMeta.size()+",原字段总数:"+inputRowMeta.size());
}
// 构建输出行
int inputRowLength = inputRow.length;
Object[] outputRow = new Object[inputRowLength + 1];
// 复制原始字段
System.arraycopy(inputRow, 0, outputRow, 0, inputRowLength);
int originalIdIndex = outputRowMeta.indexOfValue("ID");
String uuid = IdUtil.getSnowflake(0, 0).nextIdStr();
outputRow[originalIdIndex] = uuid; // 注意值的类型需与 newFieldType 匹配
// 输出新行outputRowMeta表头,outputRow 数据
putRow(outputRowMeta, outputRow);
return true;
}
- 如果只需要简单遍历里面的字段,然后处理一下
双击里面就有简单的示例
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.logging.LogChannel;
import cn.hutool.core.util.IdUtil;
import java.util.Date;
import org.pentaho.di.core.row.value.ValueMetaDate;
private RowMetaInterface outputRowMeta; // 输出行元数据
private boolean isFirstRow = true; // 标记是否为第一行,用于初始化元数据
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
// 获取输入行
Object[] inputRow = getRow();
if (inputRow == null) {
setOutputDone();
return false;
}
// 初始化元数据(仅第一次执行)
if (isFirstRow) {
isFirstRow = false;
// 获取输入元数据
RowMetaInterface inputRowMeta = getInputRowMeta();
if (inputRowMeta == null) {
logError("输入行元数据为空,无法添加新字段");
return false;
}
// 克隆输入元数据作为输出元数据
outputRowMeta = inputRowMeta.clone();
int originalIdIndex = outputRowMeta.indexOfValue("ID");
int originalUuidIndex = outputRowMeta.indexOfValue("UUID");
int originalCreateTimeIndex = outputRowMeta.indexOfValue("CREATE_TIME");
int originalUpdateTimeIndex = outputRowMeta.indexOfValue("UPDATE_TIME");
if(originalIdIndex==-1){
// 定义新字段元数据
ValueMetaInterface ID = new ValueMetaString("ID");
ID.setLength(20);
// 添加新字段到元数据
outputRowMeta.addValueMeta(ID);
}
if(originalUuidIndex==-1){
ValueMetaInterface UUID = new ValueMetaString("UUID");
UUID.setLength(20);
// 添加新字段到元数据
outputRowMeta.addValueMeta(UUID);
}
if(originalCreateTimeIndex==-1){
ValueMetaInterface createTime = new ValueMetaString("CREATE_TIME");
// 添加新字段到元数据
outputRowMeta.addValueMeta(createTime);
}
if(originalUpdateTimeIndex==-1){
ValueMetaInterface updateTime = new ValueMetaDate("UPDATE_TIME");
// 添加新字段到元数据
outputRowMeta.addValueMeta(updateTime);
}
}
// LogChannel.GENERAL.logBasic("字段个数:"+outputRowMeta.size());
// 构建输出行
int inputRowLength = outputRowMeta.size();
Object[] outputRow = new Object[inputRowLength];
// 复制原始字段
System.arraycopy(inputRow, 0, outputRow, 0, inputRowLength);
int originalIdIndex = outputRowMeta.indexOfValue("ID");
if(originalIdIndex!=-1){
Object fieldIdValue = outputRow[originalIdIndex];
if (fieldIdValue == null) {
String uuid = IdUtil.getSnowflake(0, 0).nextIdStr();
outputRow[originalIdIndex] = uuid;
}
}
int originalUuidIndex = outputRowMeta.indexOfValue("UUID");
if(originalIdIndex!=-1){
Object fieldUuidValue = outputRow[originalUuidIndex];
if (fieldUuidValue == null) {
String uuid = IdUtil.getSnowflake(0, 0).nextIdStr();
outputRow[originalUuidIndex] = uuid;
}
}
int originalCreateTimeIndex = outputRowMeta.indexOfValue("CREATE_TIME");
if(originalIdIndex!=-1){
Object fieldCreateTimeValue = outputRow[originalCreateTimeIndex];
if (fieldCreateTimeValue == null) {
outputRow[originalCreateTimeIndex] = new Date();
}
}
int originalUpdateTimeIndex = outputRowMeta.indexOfValue("UPDATE_TIME");
if(originalIdIndex!=-1){
Object fieldUpdateTimeValue = outputRow[originalUpdateTimeIndex];
if (fieldUpdateTimeValue == null) {
outputRow[originalUpdateTimeIndex] = new Date();
}
}
// 输出新行
putRow(outputRowMeta, outputRow);
return true;
}