datax批量oraclewriter同时支持insert & update

网友投稿 575 2022-11-22

datax批量oraclewriter同时支持insert & update

阿里开源抽数工具datax,oraclewriter源码只支持insert模式,update需要二次开发,下面将update二开代码记录一下。

(参考网友的代码进行测试,发现文档应该是不全,执行失败。且这部分代码支持了update模式后,insert模式又用不了。)

首先修改oraclewriter/src/main/java/com/alibaba/datax/plugin/writer/oraclewriter/OracleWriter.java

将红框中的代码注释掉。这部分代码是禁止调用oraclewriter plugin时在“.json”配置文件中配置“writeMode”配置项,默认配置是"insert"。

下面是需要注释的代码块

第二步修改plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java

复制以下代码

// Oracle update语句生成 if (dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update")) { writeDataSqlTemplate = new StringBuilder() .append(onMergeIntoDoString(writeMode, columnHolders, valueHolders)) .append("INSERT (") .append(StringUtils.join(columnHolders, ",")) .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) .append(")").toString(); } else { if (writeMode.trim().toLowerCase().startsWith("update")) { writeMode = "replace"; } writeDataSqlTemplate = new StringBuilder().append(writeMode) .append(" INTO %s (").append(StringUtils.join(columnHolders, ",")) .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) .append(")").toString(); }

第三步修改plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java

最下面增加两个方法

复制以下代码

// 新增方法,支持oracle update增量写数据,生成MERGE INTO语句 public static String onMergeIntoDoString(String merge, List columnHolders, List valueHolders) { String[] sArray = getStrings(merge); StringBuilder sb = new StringBuilder(); sb.append("MERGE INTO %s A USING ( SELECT "); boolean first = true; boolean first1 = true; StringBuilder str = new StringBuilder(); StringBuilder update = new StringBuilder(); for (String columnHolder : columnHolders) { if (Arrays.asList(sArray).contains(columnHolder)) { if (!first) { sb.append(","); str.append(" AND "); } else { first = false; } str.append("TMP.").append(columnHolder); sb.append("?"); str.append(" = "); sb.append(" AS "); str.append("A.").append(columnHolder); sb.append(columnHolder); } } for (String columnHolder : columnHolders) { if (!Arrays.asList(sArray).contains(columnHolder)) { if (!first1) { update.append(","); } else { first1 = false; } update.append(columnHolder); update.append(" = "); update.append("?"); } } sb.append(" FROM DUAL ) TMP ON ("); sb.append(str); sb.append(" ) WHEN MATCHED THEN UPDATE SET "); sb.append(update); sb.append(" WHEN NOT MATCHED THEN "); LOG.info("update sql => {}", sb.toString()); return sb.toString(); } // 解析".json"配置中"writeMode"参数 public static String[] getStrings(String merge) { merge = merge.replace("update", ""); merge = merge.replace("(", ""); merge = merge.replace(")", ""); merge = merge.replace(" ", ""); return merge.split(","); } // oracle update增量写数据新增方法。over

第四步修改plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java

复制以下代码

// 以下代码为支持oracl update增量写数据 List columns = new ArrayList<>(); List columnsOne = new ArrayList<>(); List columnsTwo = new ArrayList<>(); if (this.dataBaseType == DataBaseType.Oracle && this.writeMode.trim().toLowerCase().startsWith("update")) { String merge = this.writeMode; String[] sArray = WriterUtil.getStrings(merge); int size = this.columns.size(); int i = 0; for (int j = 0; j < size; j++) { if (Arrays.asList(sArray).contains(this.columns.get(j))) { columnsOne.add(this.columns.get(j)); } } for (int j = 0; j < size; j++) { if (!Arrays.asList(sArray).contains(this.columns.get(j))) { columnsTwo.add(this.columns.get(j)); } } for (String column : columnsOne) { columns.add(i, column); i++; } for (String column : columnsTwo) { columns.add(i, column); i++; } columns.addAll(this.columns); this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ",")); } else { this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(this.columns, ",")); } // oracal update增量写数据支持代码。over

第五步修改plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java

复制以下代码

// 以下代码为支持oracle update增量写数据 LOG.info("For {}",this.writeMode + "data to Oracle"); if (this.dataBaseType == DataBaseType.Oracle && this.writeMode.trim().toLowerCase().startsWith("update")) { String merge = this.writeMode; String[] sArray = WriterUtil.getStrings(merge); for (Record record : buffer) { List recordOne = new ArrayList<>(); for (int j = 0; j < this.columns.size(); j++) { if (Arrays.asList(sArray).contains(this.columns.get(j))) { recordOne.add(record.getColumn(j)); } } for (int j = 0; j < this.columns.size(); j++) { if (!Arrays.asList(sArray).contains(this.columns.get(j))) { recordOne.add(record.getColumn(j)); } } for (int j = 0; j < this.columns.size(); j++) { recordOne.add(record.getColumn(j)); } for (int j = 0; j < recordOne.size(); j++) { record.setColumn(j, recordOne.get(j)); } preparedStatement = fillPreparedStatement( preparedStatement, record, this.writeMode); preparedStatement.addBatch(); } } else { for (Record record : buffer) { preparedStatement = fillPreparedStatement( preparedStatement, record); preparedStatement.addBatch(); } } // 支持oracle update增量写数据代码。over

第六步修改plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java

新增方法

复制以下代码

// 为支持oracle update protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record, String writeMode) throws SQLException { for (int i = 0; i < record.getColumnNumber(); i++) { int columnSqltype = this.resultSetMetaData.getMiddle().get(i); preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, record.getColumn(i)); } return preparedStatement; }

以上是所有改动。编译时先编译"plugin-rdbms-util"模块,再编译"oraclewwriter"模块。用编译后的oraclewriter/target/datax/plugin/writer/oraclewriter目录直接替换线上同级目录。完成

提供一个job.json的配置文件:

{ "core": { "transport" : { "channel": { "speed": { "channal": 1, "record": 1000, "byte": 10240000 } } } }, "job": { "setting": { "speed": { "channel": 10, "record": 1000, "byte": 10240000 }, "errorLimit":{ "record": 10, "percentage": 0.05 } }, "content": [{ "reader": { "name": "mysqlreader", "parameter": { "username": "mysqlUserName", "password": "Password", "column": ["id","name","age","gender","create_time","change_time"], "splitPk": "id", "connection": [ { "jdbcUrl": [ "jdbc:mysql://ipaddress:3306/database?useUnicode=true&characterEncoding=utf8" ], "table": ["tablename"] }] } }, "writer": { "name": "oraclewriter", "parameter": { "username": "oracleUserName", "password": "Password", "column": [ "ID", "NAME", "AGE", "GENDER", "CREATE_TIME", "CHANGE_TIME", ], "writeMode": "update (ID)", "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@//ipaddress:1521", "table": [ "tablename" ] } ], } } }] }}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:关于Java Interface接口的简单练习题
下一篇:基于PXI总线的数据传输接口设计与实现方案
相关文章

 发表评论

暂时没有评论,来抢沙发吧~