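A recent project of mine involved processing Kafka messages. The source cannot guarantee idempotence or control partition assignment (the messages pass through AWS Kinesis rather than plain Kafka), so the incoming messages have several problems:
- Messages arrive out of order
- Several messages arrive at the same time
- Duplicate messages are sent to different partitions. (Duplicates within a single partition could be handled with the producer's idempotence setting.)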
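So when this data is written to the database, the following has to happen:
- If the record is brand new, insert it
- If the record already exists in the database:
  - Compare the ts (timestamp) field; if the incoming ts is larger (newer), update the row
  - If the incoming ts is smaller, discard the message
  - If the incoming ts equals the one in the database, compare sequence_number: if the incoming value is larger, update the row; otherwise discard the message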
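The rule above can be sketched in plain Java, with an in-memory map standing in for the table. This is only an illustration of the comparison logic, not the database solution itself; the names `UpsertRule`, `Version`, `isNewer`, and `apply` are hypothetical and chosen for this example.

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertRule {
    /** Hypothetical stand-in for a stored row: only the two ordering fields. */
    public static final class Version {
        public final long ts;
        public final long sequenceNumber;

        public Version(long ts, long sequenceNumber) {
            this.ts = ts;
            this.sequenceNumber = sequenceNumber;
        }
    }

    /** True when the incoming message should overwrite the stored row. */
    public static boolean isNewer(Version stored, Version incoming) {
        if (incoming.ts != stored.ts) {
            return incoming.ts > stored.ts;          // newer timestamp wins
        }
        // Same timestamp: fall back to the sequence number.
        return incoming.sequenceNumber > stored.sequenceNumber;
    }

    /** Applies one message to the in-memory "table"; returns true if it changed. */
    public static boolean apply(Map<String, Version> table, String trackerId, Version incoming) {
        Version stored = table.get(trackerId);
        if (stored == null || isNewer(stored, incoming)) {
            table.put(trackerId, incoming);          // insert or update
            return true;
        }
        return false;                                // stale or duplicate: discard
    }

    public static void main(String[] args) {
        Map<String, Version> table = new HashMap<>();
        System.out.println(apply(table, "t1", new Version(100, 5))); // brand-new row
        System.out.println(apply(table, "t1", new Version(100, 5))); // exact duplicate
        System.out.println(apply(table, "t1", new Version(90, 99))); // older timestamp
        System.out.println(apply(table, "t1", new Version(100, 6))); // same ts, higher sequence
    }
}
```

Because the outcome depends only on the (ts, sequence_number) pair, the stored row ends up the same no matter in which order the messages arrive. Note that this sketch is not safe under concurrency; that is exactly what the lock or the database-native approach has to provide.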
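There are many ways to implement this logic. In a distributed environment, for example, you could introduce a distributed lock. This post instead solves the problem with the database's native features, partly as a way to learn a bit more PostgreSQL.
First, create a table to experiment with (tracker_id gets a unique constraint, which is what the upsert will test against later):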
create table test(id SERIAL primary key, tracker_id varchar(37), last_sync_date bigint, ts bigint, sequence_number bigint);
alter table test add constraint test_unique_tracker unique(tracker_id);
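So how should the upsert be written? Searching Google mostly turns up examples that update whenever the row exists; examples with a condition on the update are hard to find. After reading the documentation and experimenting, I found that the SQL below does what we want.
Note:
- In the SET clause, the left side of the equals sign is the bare column name, and the right side uses excluded.column_name
- In the WHERE clause, the existing row's columns must be qualified with the table name (test.ts), and the incoming row's columns are referenced through excluded (EXCLUDED.ts)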
insert into test (tracker_id, ts, sequence_number, last_sync_date)
values ('324bf0d7-63db-40eb-a335-9a0a017b0e6c', 1644662828, 94875, 1644662860)
on conflict (tracker_id)
do
update
set ts = EXCLUDED.ts,
sequence_number = EXCLUDED.sequence_number,
last_sync_date = EXCLUDED.last_sync_date
where test.ts < EXCLUDED.ts
or (test.ts = EXCLUDED.ts and test.sequence_number < EXCLUDED.sequence_number);
By changing the values in the VALUES clause and rerunning the statement, you can see that the results are what we expect.
Trying it in Java Spring Boot
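// javax.persistence applies to Spring Boot 2.x; use jakarta.persistence on Spring Boot 3+
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.Data;

@Data
@Entity
@Table
public class Test {
    @Id
    // The id column is SERIAL, so the database generates the value
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private long id;
    private String trackerId;
    private long ts;
    private long sequenceNumber;
    private long lastSyncDate;
}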
// DTO
@Data
public class TestDto {
    private String trackerId;
    private long ts;
    private long sequenceNumber;
    private long lastSyncDate;
}
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;

public interface TestRepository extends CrudRepository<Test, Long> {
    // Native PostgreSQL upsert: insert, or update only when the incoming row is newer
    @Modifying
    @Query(value = "insert into test(tracker_id, ts, sequence_number, last_sync_date) " +
            "values (:trackerId, :ts, :sequenceNumber, :lastSyncDate) " +
            "on conflict (tracker_id) " +
            "do update " +
            "set ts=EXCLUDED.ts, " +
            "    sequence_number=EXCLUDED.sequence_number, " +
            "    last_sync_date=EXCLUDED.last_sync_date " +
            "where (test.ts<EXCLUDED.ts) " +
            "   or (test.ts=EXCLUDED.ts " +
            "       and test.sequence_number<EXCLUDED.sequence_number);",
            nativeQuery = true)
    void upsert(@Param("trackerId") String trackerId,
                @Param("ts") long ts,
                @Param("sequenceNumber") long sequenceNumber,
                @Param("lastSyncDate") long lastSyncDate);
}
Controller method
@PostMapping("/test")
@Transactional
public String upsert(@RequestBody TestDto dto) {
    repo.upsert(dto.getTrackerId(), dto.getTs(), dto.getSequenceNumber(), dto.getLastSyncDate());
    return "Done";
}
Let's try it out:

[Screenshot: Screen Shot 2022-02-12 at 7.13.14 PM.png]

And that's it!