PostgreSQL upsert (Insert 或者update或者放棄)

最近的業(yè)務(wù)有kafka消息處理,由于源頭那里無(wú)法控制冪等性和分區(qū)指定(因?yàn)榻?jīng)過(guò)了AWS Kinesis, 不是單純的kafka),于是過(guò)來(lái)的消息有幾個(gè)問(wèn)題:

  • 消息亂序
  • 幾條消息同時(shí)過(guò)來(lái)
  • 消息重復(fù)發(fā)到不同的partition. (如果是同一個(gè)partition, 可以用idempotent參數(shù)解決)

那么這個(gè)數(shù)據(jù)在入庫(kù)的時(shí)候,要做這件事:

  • 如果是全新的,insert進(jìn)去
  • 如果數(shù)據(jù)庫(kù)已經(jīng)存在:
    • 比較ts (timestamp)字段,如果ts更大(新),就更新
    • 如果ts 字段更小,就舍棄
    • 如果ts字段跟數(shù)據(jù)庫(kù)的相等:比較sequence_number, 如果更大,就更新數(shù)據(jù)庫(kù),否則舍棄

要做這些邏輯,有很多解決方案,比如在分布式環(huán)境下,可以通過(guò)引入分布式鎖來(lái)解決。而本文想要通過(guò)數(shù)據(jù)庫(kù)原生的辦法來(lái)解決這個(gè)問(wèn)題。旨在學(xué)習(xí)一下postgresql的知識(shí)。

先建個(gè)表來(lái)試試:(tracker_id加入唯一性約束,后面就試它)

create table test(id SERIAL primary key, tracker_id varchar(37), last_sync_date bigint, ts bigint, sequence_number bigint);
alter table test add constraint test_unique_tracker unique(tracker_id);

那這句upsert要怎么寫(xiě)呢?通過(guò)google, 一般只看到存在就update的例子,很難找到update之前還有條件的。通過(guò)查找文檔和試驗(yàn),以下SQL是可以達(dá)到我們的目的的:

注意:

  • set語(yǔ)句里面,等號(hào)左邊直接是字段名,右邊用excluded.字段名
  • where 語(yǔ)句里, 左邊要用表名引用字段名,右邊用excluded引用字段名
insert into test (tracker_id, ts, sequence_number, last_sync_date)
values ('324bf0d7-63db-40eb-a335-9a0a017b0e6c', 1644662828, 94875, 1644662860)
on conflict (tracker_id)
do 
  update
     set ts = EXCLUDED.ts,
         sequence_number = EXCLUDED.sequence_number,
         last_sync_date = EXCLUDED.last_sync_date
    where test.ts < EXCLUDED.ts
     or (test.ts = EXCLUDED.ts and test.sequence_number < EXCLUDED.sequence_number);

通過(guò)修改values里面的值,你可以觀(guān)察到,結(jié)果如我們所期望。

Java Springboot 里面試試

@Data
@Entity
@Table
public class Test {
    @Id
    private long id;
    private String trackerId;
    private long ts;
    private long sequenceNumber;
    private long lastSyncDate;
}

// DTO
@Data
public class TestDto {
    private String trackerId;
    private long ts;
    private long sequenceNumber;
    private long lastSyncDate;
}

public interface TestRepository extends CrudRepository<Test, Long> {
    @Modifying
    @Query(value = "insert into test(tracker_id, ts, sequence_number, last_sync_date) " +
            "values (:trackerId, :ts, :sequenceNumber, :lastSyncDate) " +
            "on conflict (tracker_id) " +
            "do update " +
            "set ts=EXCLUDED.ts, " +
            "    sequence_number=EXCLUDED.sequence_number, " +
            "    last_sync_date=EXCLUDED.last_sync_date " +
            "where (test.ts<EXCLUDED.ts) " +
            "   or (test.ts=EXCLUDED.ts " +
            "      and test.sequence_number<EXCLUDED.sequence_number);",
            nativeQuery = true)
    void upsert(@Param("trackerId") String trackerId,
                @Param("ts") long ts,
                @Param("sequenceNumber") long sequenceNumber,
                @Param("lastSyncDate") long lastSyncDate);
}

Controller 方法

    @PostMapping("/test")
    @Transactional
    public String upsert(@RequestBody TestDto dto) {
        repo.upsert(dto.getTrackerId(), dto.getTs(), dto.getSequenceNumber(), dto.getLastSyncDate());
        return "Done";
    }

試試看


Screen Shot 2022-02-12 at 7.13.14 PM.png

大功告成!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容