開篇
?這篇文章的目的是講解RM Executor模塊當(dāng)中一些通用的方法,這些方法在各個(gè)Executor的父類當(dāng)中實(shí)現(xiàn)的,各個(gè)子類Executor模塊都會(huì)復(fù)用,因此抽取出來統(tǒng)一的進(jìn)行講解。
?個(gè)人是認(rèn)為抽取通用的內(nèi)容放在一篇文章講解完后可以針對(duì)每類Executor講解特有的功能,這樣能夠有更好的理解。這篇文章講解Executor的實(shí)現(xiàn)類SelectForUpdateExecutor。
類依賴圖

說明:
- 著重講解SelectForUpdateExecutor實(shí)現(xiàn)類。
-
SelectForUpdateExecutor的實(shí)現(xiàn)和其他的Executor從類圖實(shí)現(xiàn)上不同,因?yàn)閟elect不需要保存鏡像。
SelectForUpdateExecutor方法介紹
public class SelectForUpdateExecutor<S extends Statement> extends BaseTransactionalExecutor<ResultSet, S> {
public SelectForUpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<ResultSet, S> statementCallback, SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
@Override
public Object doExecute(Object... args) throws Throwable {
SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;
Connection conn = statementProxy.getConnection();
ResultSet rs = null;
Savepoint sp = null;
LockRetryController lockRetryController = new LockRetryController();
boolean originalAutoCommit = conn.getAutoCommit();
StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
selectSQLAppender.append(getColumnNameInSQL(getTableMeta().getPkName()));
selectSQLAppender.append(" FROM " + getFromTableInSQL());
String whereCondition = null;
ArrayList<Object> paramAppender = new ArrayList<>();
if (statementProxy instanceof ParametersHolder) {
whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
} else {
whereCondition = recognizer.getWhereCondition();
}
if (!StringUtils.isEmpty(whereCondition)) {
selectSQLAppender.append(" WHERE " + whereCondition);
}
selectSQLAppender.append(" FOR UPDATE");
String selectPKSQL = selectSQLAppender.toString();
try {
if (originalAutoCommit) {
conn.setAutoCommit(false);
}
sp = conn.setSavepoint();
rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
while (true) {
// Try to get global lock of those rows selected
Statement stPK = null;
PreparedStatement pstPK = null;
ResultSet rsPK = null;
try {
if (paramAppender.isEmpty()) {
stPK = statementProxy.getConnection().createStatement();
rsPK = stPK.executeQuery(selectPKSQL);
} else {
pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
for (int i = 0; i < paramAppender.size(); i++) {
pstPK.setObject(i + 1, paramAppender.get(i));
}
rsPK = pstPK.executeQuery();
}
TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);
break;
} catch (LockConflictException lce) {
conn.rollback(sp);
lockRetryController.sleep(lce);
} finally {
if (rsPK != null) {
rsPK.close();
}
if (stPK != null) {
stPK.close();
}
if (pstPK != null) {
pstPK.close();
}
}
}
} finally {
if (sp != null) {
conn.releaseSavepoint(sp);
}
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
}
return rs;
}
}
說明:
- 從類的依賴圖可以看出SelectForUpdateExecutor的實(shí)現(xiàn)方式和其他Executor不一樣。
- SelectForUpdateExecutor因?yàn)樯婕暗降牟樵儾僮?,所以沒有執(zhí)行前后鏡像的問題。
- SelectForUpdateExecutor的執(zhí)行邏輯在于設(shè)置回滾點(diǎn),conn.setSavepoint()。
- SelectForUpdateExecutor通過拼接查詢主鍵的SQL語句獲取查詢記錄的主鍵key。
- SelectForUpdateExecutor根據(jù)查詢主鍵的記錄判斷是否可以鎖checkLock,如果不能鎖則直接回滾,然后進(jìn)行重試。