開篇
?這篇文章的目的是講解RM Executor模塊當(dāng)中一些通用的方法,這些方法在各個Executor的父類當(dāng)中實現(xiàn)的,各個子類Executor模塊都會復(fù)用,因此抽取出來統(tǒng)一的進(jìn)行講解。
?個人是認(rèn)為抽取通用的內(nèi)容放在一篇文章講解完后可以針對每類Executor講解特有的功能,這樣能夠有更好的理解。這篇文章講解Executor的實現(xiàn)類UpdateExecutor。
類依賴圖

說明:
- 著重講解UpdateExecutor實現(xiàn)類。
UpdateExecutor方法介紹
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
public UpdateExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
@Override
protected TableRecords beforeImage() throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
TableMeta tmeta = getTableMeta();
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
if (!tmeta.containsPK(updateColumns)) {
// PK should be included.
selectSQLAppender.append(getColumnNameInSQL(tmeta.getPkName()) + ", ");
}
for (int i = 0; i < updateColumns.size(); i++) {
selectSQLAppender.append(updateColumns.get(i));
if (i < (updateColumns.size() - 1)) {
selectSQLAppender.append(", ");
}
}
String whereCondition = null;
ArrayList<Object> paramAppender = new ArrayList<>();
if (statementProxy instanceof ParametersHolder) {
whereCondition = recognizer.getWhereCondition((ParametersHolder)statementProxy, paramAppender);
} else {
whereCondition = recognizer.getWhereCondition();
}
selectSQLAppender.append(" FROM " + getFromTableInSQL() + " WHERE " + whereCondition + " FOR UPDATE");
String selectSQL = selectSQLAppender.toString();
TableRecords beforeImage = null;
PreparedStatement ps = null;
Statement st = null;
ResultSet rs = null;
try {
if (paramAppender.isEmpty()) {
st = statementProxy.getConnection().createStatement();
rs = st.executeQuery(selectSQL);
} else {
ps = statementProxy.getConnection().prepareStatement(selectSQL);
for (int i = 0; i< paramAppender.size(); i++) {
ps.setObject(i + 1, paramAppender.get(i));
}
rs = ps.executeQuery();
}
beforeImage = TableRecords.buildRecords(tmeta, rs);
} finally {
if (rs != null) {
rs.close();
}
if (st != null) {
st.close();
}
if (ps != null) {
ps.close();
}
}
return beforeImage;
}
}
說明:
- UpdateExecutor需要保存SQL執(zhí)行前的鏡像。
- 執(zhí)行前鏡像的準(zhǔn)備整體思路是按照按照update的更新條件逆向拼接正向查詢SQL。
- 查詢字段當(dāng)中必須包含主鍵原因是因為afterImage()操作當(dāng)中需要根據(jù)主鍵進(jìn)行查詢。原因猜測因為update會把where條件中字段的值變更,保存主鍵最安全。
- 查詢的SQL的select的字段從update字段中解析并加上主鍵列。
- 查詢的SQL的條件通過解析update的條件生成。
- 根據(jù)TableRecords.buildRecords生成執(zhí)行前鏡像。
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
public UpdateExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
TableMeta tmeta = getTableMeta();
if (beforeImage == null || beforeImage.size() == 0) {
return TableRecords.empty(getTableMeta());
}
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
if (!tmeta.containsPK(updateColumns)) {
// PK should be included.
selectSQLAppender.append(getColumnNameInSQL(tmeta.getPkName()) + ", ");
}
for (int i = 0; i < updateColumns.size(); i++) {
selectSQLAppender.append(updateColumns.get(i));
if (i < (updateColumns.size() - 1)) {
selectSQLAppender.append(", ");
}
}
List<Field> pkRows = beforeImage.pkRows();
selectSQLAppender.append(
" FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(pkRows) + " FOR UPDATE");
String selectSQL = selectSQLAppender.toString();
TableRecords afterImage = null;
PreparedStatement pst = null;
ResultSet rs = null;
try {
pst = statementProxy.getConnection().prepareStatement(selectSQL);
int index = 0;
for (Field pkField : pkRows) {
index++;
pst.setObject(index, pkField.getValue(), pkField.getType());
}
rs = pst.executeQuery();
afterImage = TableRecords.buildRecords(tmeta, rs);
} finally {
if (rs != null) {
rs.close();
}
if (pst != null) {
pst.close();
}
}
return afterImage;
}
}
說明:
- UpdateExecutor需要保存SQL執(zhí)行后的鏡像。
- UpdateExecutor執(zhí)行后的鏡像通過查詢更新的字段字段生成的。
- 查詢更新后鏡像的SQL的select的字段從update字段中解析并加上主鍵列。
- 查詢更新后鏡像的SQL的條件是beforeImage()查詢得到的主鍵key。
- 根據(jù)TableRecords.buildRecords生成執(zhí)行前鏡像。
期待
??UpdateExecutor生成執(zhí)行前后鏡像的過程分析完后,接下去會分析生成鏡像日志的流程。