有這樣一個(gè)需求,生產(chǎn)者將消息存入數(shù)據(jù)庫(kù),并放入隊(duì)列中等待處理。消費(fèi)者獲取并處理消息,將更新后的消息存入數(shù)據(jù)庫(kù)。
主鍵策略為id自增
偽代碼如下
...
//生產(chǎn)者線程
email.setStatus(SANDING); //設(shè)置郵件的狀態(tài)為正在發(fā)送 ......1
email = emailRepository.save(email); //將郵件插入數(shù)據(jù)庫(kù),取得帶有id的實(shí)體 ......2
queue.add(email); //把郵件放入隊(duì)列 ......3
...
...
//消費(fèi)者線程
Email email = queue.poll(); //從隊(duì)列中取出郵件 ......1
try {
send(email); //發(fā)送郵件 ......2
email.setStatus(SUCCESS); //設(shè)置郵件的狀態(tài)為發(fā)送成功 ......3
} catch (Exception e) {
email.setStatus(FAIL); //失敗則設(shè)置郵件的狀態(tài)為發(fā)送失敗 ......4
e.printStackTrace()
}
emailRepository.save(email); //更新郵件狀態(tài) ......5
然而消費(fèi)者線程中的 5 并沒(méi)有更新記錄,而是插入一條新的記錄。
也許是兩次save間隔太短了吧,想起多線程的常用解決方案,不行就睡,于是將消費(fèi)者線程改造了一下
Thread.sleep(1000);
emailRepository.save(email); //更新郵件狀態(tài) ......5
好了,一切正常。
作為一個(gè)打破沙鍋問(wèn)到底的人,問(wèn)題雖然解決,但還是要弄清楚問(wèn)題發(fā)生的原因
繼續(xù)改造消費(fèi)者線程
// 實(shí)驗(yàn)一
...
System.out.println(email); // 輸出 Email{id=345 ...
Thread.sleep(1000);
email = emailRepository.save(email); //更新郵件狀態(tài) ......5
System.out.println(email); // 輸出 Email{id=345 ...
//實(shí)驗(yàn)二
..
System.out.println(email); // 輸出 Email{id=455 ...
email = emailRepository.save(email); //更新郵件狀態(tài) ......5
System.out.println(email); //輸出 Email{id=456 ...
生產(chǎn)者線程的 2 切切實(shí)實(shí)是先執(zhí)行于消費(fèi)者線程的 5 ,這就奇怪了,明明已經(jīng)保存好的數(shù)據(jù),再save時(shí)居然是插入操作。難道生產(chǎn)者線程的 2 執(zhí)行之后返回了email,但是沒(méi)有立即將數(shù)據(jù)插入數(shù)據(jù)庫(kù)嗎? 繼續(xù)我們的實(shí)驗(yàn)
//實(shí)驗(yàn)三
...
Email email = emailRepository.findById(email.getId()).orElse(new Email());
System.out.println(email.toString()); //輸出 Email{id=null ...
email = emailRepository.save(email); //更新郵件狀態(tài) ......5
問(wèn)題明確了,生產(chǎn)者線程事務(wù)結(jié)束后,消費(fèi)者線程沒(méi)有立即讀到生產(chǎn)者線程事務(wù)的結(jié)果,認(rèn)為email是一個(gè)新的數(shù)據(jù),插入到了數(shù)據(jù)庫(kù)中。
使用save()之后究竟發(fā)生了什么,看看源碼
save()的實(shí)現(xiàn)在SimpleJpaRepository中
@Transactional
public <S extends T> S save(S entity) {
if (entityInformation.isNew(entity)) { ...1
em.persist(entity); ...2
return entity;
} else {
return em.merge(entity); ...3
}
}
具體邏輯:
...1 這是一個(gè)新實(shí)例嗎,如何判斷這是一個(gè)新的實(shí)例呢。1.主鍵為空則為新實(shí)例。2.如果id是Number的子類,id == 0則為新實(shí)例
public boolean isNew(T entity) {
ID id = getId(entity);
Class<ID> idType = getIdType();
if (!idType.isPrimitive()) {
return id == null;
}
if (id instanceof Number) {
return ((Number) id).longValue() == 0L;
}
throw new IllegalArgumentException(String.format("Unsupported primitive id type %s!", idType));
}
...2 是新實(shí)例則persist
...3 不是則merge
merge執(zhí)行的sql是這樣的,先select看看數(shù)據(jù)庫(kù)中有沒(méi)有實(shí)例中的id對(duì)應(yīng)的記錄,有則update,無(wú)則insert
然而,jpa不保證save()之后返回的entity可以立即被find()發(fā)現(xiàn)(相當(dāng)于insert一條記錄后不能被立即select出來(lái))。經(jīng)實(shí)驗(yàn),save()執(zhí)行后需要20-70毫秒的時(shí)間將數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)
使用sleep睡100毫秒可以保證數(shù)據(jù)持久化的時(shí)間,但這無(wú)法將性能壓榨到極致,而且如果持久化的時(shí)間超過(guò)100毫秒,仍然find()不到save()的數(shù)據(jù)。
經(jīng)過(guò)千辛萬(wàn)苦的查找,在mysql文檔中找到了這個(gè),Locking Reads
(https://dev.mysql.com/doc/refman/8.0/en/innodb-locking-reads.html)
使用select ...for update或者select ...for share可以等待其它事務(wù)數(shù)據(jù)被提交(Commit)后才會(huì)執(zhí)行select
原文如下
If any of these rows were changed by another transaction that has not yet committed,
your query waits until that transaction ends and then uses the latest values.
jpa中@Lock可以在查詢時(shí)添加悲觀鎖,樂(lè)觀鎖。其原理是通過(guò)修改執(zhí)行的sql語(yǔ)句,在數(shù)據(jù)庫(kù)層面加鎖
于是我們重寫EmailRepository的findById()
@Override
@Lock(LockModeType.PESSIMISTIC_READ)
Optional<Email> findById(Long id);
繼續(xù)實(shí)驗(yàn)三的代碼
//實(shí)驗(yàn)三
...
Email email = emailRepository.findById(email.getId()).orElse(new Email());
System.out.println(email.toString()); //輸出 Email{id=455 ...
email = emailRepository.save(email); //更新郵件狀態(tài) ......5
這時(shí)候我們發(fā)現(xiàn)可以讀到最新更新的數(shù)據(jù),通過(guò)show-sql查看findById()執(zhí)行的sql為select... in share mode。
(ps: select ...for share和select ...in share mode功能相似)
于是如下修改消費(fèi)者線程
...
//消費(fèi)者線程
Email email = queue.poll(); //從隊(duì)列中取出郵件 ......1
try {
send(email); //發(fā)送郵件 ......2
email.setStatus(SUCCESS); //設(shè)置郵件的狀態(tài)為發(fā)送成功 ......3
} catch (Exception e) {
email.setStatus(FAIL); //失敗則設(shè)置郵件的狀態(tài)為發(fā)送失敗 ......4
e.printStackTrace()
}
emailRepository.findById(email.getId()); //保證自己能讀到最新的數(shù)據(jù),也同時(shí)保證了下面的save()執(zhí)行select語(yǔ)句時(shí)不會(huì)拿到空數(shù)據(jù)
emailRepository.save(email); //更新郵件狀態(tài) ......5
這樣生產(chǎn)者線程save()的數(shù)據(jù)就可以立即無(wú)等待地獲取了