第20節(jié):從庫MTS多線程并行回放(二)


本節(jié)包含一個筆記如下:
http://www.itdecent.cn/p/e920a6d33005


這一節(jié)會先描述MTS的工作線程執(zhí)行Event的大概流程。然后重點描述一下MTS中檢查點的概念。在后面的第25節(jié)我們可以看到,MTS的異常恢復很多情況下需要依賴這個檢查點,從檢查點位置開始掃描relay log做恢復操作,但是在GTID AUTO_POSITION MODE模式且設置了recovery_relay_log=1的情況下這種依賴將會弱化。

一、工作線程執(zhí)行Event

前面我們已經(jīng)討論了協(xié)調(diào)線程分發(fā)Event的規(guī)則,實際上協(xié)調(diào)線程只是將Event分發(fā)到了工作線程的執(zhí)行隊列中。那么工作線程執(zhí)行Event就需要從執(zhí)行隊列中拿出這些Event,然后進行執(zhí)行。整個過程可以參考函數(shù)slave_worker_exec_job_group。因為這個流程比較簡單,因此就不需要畫圖了,但是我們需要關注一些點如下:

(1)從執(zhí)行隊列中讀取Event。注意這里如果執(zhí)行隊列中沒有Event那么就進入空閑等待,也就是工作線程處于無事可做的狀態(tài),等待狀態(tài)為‘Waiting for an event from Coordinator’。

(2)如果執(zhí)行到XID_EVENT那么說明事務已經(jīng)結(jié)束了那么需要完成內(nèi)存信息更新操作??蓞⒖糞lave_worker::slave_worker_exec_event和Xid_apply_log_event::do_apply_event_worker函數(shù)。更新內(nèi)存相關信息可參考函數(shù)commit_positions函數(shù)。下面是一些更新的信息,我們可以看到和slave_worker_info表中的信息基本一致,如下:

1、更新當前信息
strmake(group_relay_log_name, ptr_g->group_relay_log_name,
sizeof(group_relay_log_name) - 1);
group_relay_log_pos= ev->future_event_relay_log_pos;
set_group_master_log_pos(ev->common_header->log_pos);
set_group_master_log_name(c_rli->get_group_master_log_name());
                  
2、將檢查點信息進行寫入:
strmake(checkpoint_relay_log_name, ptr_g-
>checkpoint_relay_log_name,sizeof(checkpoint_relay_log_name) - 1);
checkpoint_relay_log_pos= ptr_g->checkpoint_relay_log_pos;
strmake(checkpoint_master_log_name, ptr_g-
>checkpoint_log_name,sizeof(checkpoint_master_log_name) - 1);
checkpoint_master_log_pos= ptr_g->checkpoint_log_pos;
                  
3、設置GAQ序號:
 checkpoint_seqno= ptr_g->checkpoint_seqno;
更新整個BITMAP,可能已經(jīng)由檢查點進行GAQ出隊:
for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++) 
//重新設置位圖 因為checkpoint已經(jīng) 
{                                                                     
//ptr_g->shifted是GAQ中出隊的事務個數(shù)
if (bitmap_is_set(&group_shifted, pos))                            
//這里就需要偏移掉出隊的事務,恢復已經(jīng)不需要了
bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
}
4、設置位圖:
bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);
//在本次事務相應的位置設置為1

(3)如果執(zhí)行到XID_EVENT那么說明事務已經(jīng)結(jié)束了那么需要完成內(nèi)存信息的持久化,即強制刷內(nèi)存信息持久化到slave_worker_info表中(relay_log_info_repository設置為TABLE)??蓞⒖己瘮?shù)commit_positions函數(shù),如下:

if ((error= w->commit_positions(this, ptr_group,
w->is_transactional())))  

(4)如果執(zhí)行到XID_EVENT還需要進行事務的提交操作,也就是進行Innodb層事務的提交。

從上面我們可以看到MTS中每次事務的提交并不會更新slave_relay_log_info表,而是進行slave_worker_info表的更新,將最新的信息寫入到slave_worker_info表中。
我們前面也說過SQL線程已經(jīng)蛻變?yōu)閰f(xié)調(diào)線程,那么slave_relay_log_info表什么時候更新呢?下面我們就能看到slave_relay_log_info表的更新實際上由協(xié)調(diào)線程在做完檢查點之后更新。

二、MTS中檢查點中的重要概念

總的說來MTS中的檢查點是MTS進行異常恢復的起點。實際上就是代表到這個位置之前(包含自身)事務都是已經(jīng)在從庫執(zhí)行過了,但之后的事務可能執(zhí)行完成了也可能沒有執(zhí)行完成。檢查點由協(xié)調(diào)線程進行。

(1)協(xié)調(diào)線程的GAQ隊列

前面我們已經(jīng)知道MTS中為每個工作線程維護了一個Event的分發(fā)隊列。除此之外協(xié)調(diào)線程還維護了一個非常的重要的隊列GAQ,它是一個環(huán)形隊列。下面是源碼中的定義:

  /*
    master-binlog ordered queue of Slave_job_group descriptors of groups
    that are under processing. The queue size is @c checkpoint_group. Group assigned
  */
  Slave_committed_queue *gaq;

每次協(xié)調(diào)線程分發(fā)事務的時候都會將事務記錄到GAQ隊列中,因此GAQ中事務的順序總是和relay log文件中事務的順序一致的。檢查點正是作用在GAQ隊列上的,每次檢查點的位置稱為LWM,還記得上一節(jié)我叫大家先忽略的LWM嗎?就是這個。源碼中定義也正是如此,它在GAQ隊列中進行維護。如下:

  /*
     The last checkpoint time Low-Water-Mark
  */
  Slave_job_group lwm;

在GAQ隊列中還維護有一個叫做checkpoint_seqno的序號,它是最后一次檢查點以來每個分配事務的序號,下面是源碼中的定義:

uint checkpoint_seqno;  // counter of groups executed after the most recent CP

在協(xié)調(diào)線程讀取到GTID_LOG_EVENT后為其分配序號,記做checkpoint_seqno,如下:

rli->checkpoint_seqno++;//增加seqno

當協(xié)調(diào)線程進行檢查點的時候checkpoint_seqno序號會減去出隊的事務數(shù)量,如下:

checkpoint_seqno= checkpoint_seqno - shift; //這里減去出隊的事務

在MTS異?;謴偷臅r候也會用到這個序號,每個工作線程會通過這個序號來確認本工作線程執(zhí)行事務的上限,如下:

      for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt,
                 j= 0; i <= w->checkpoint_seqno; i++, j++)
            {
              if (bitmap_is_set(&w->group_executed, i))
//如果這一位 已經(jīng)設置
              {
                DBUG_PRINT("mts", ("Setting bit %u.", j));
                bitmap_fast_test_and_set(groups, j);
//那么GTOUPS 這個 bitmap中應該設置,最終GTOUPS會包含全的需要恢復的事務
              }
            } 

關于詳細的異?;謴土鞒虒⒃诘?5節(jié)描述。

(2)工作線程的Bitmap

有了GAQ隊列和檢查點就知道異?;謴烷_始的位置了。但是我們并不知道每一個工作線程都完成了哪些事務,哪些又沒有執(zhí)行完成,因此就不能確認哪些事務需要恢復。在MTS中并行回放事務的提交并不是按分發(fā)順序的進行的,某些大事務(或者其他原因比鎖堵塞)可能遲遲不能提交,而一些小事務卻會很快提交完成。這些遲遲不能提交的事務就成為了所謂的'gap',如果使用了GTID那么在查看已經(jīng)執(zhí)行GTID SET的時候可能出現(xiàn)一些‘空洞’,為了防止'gap'的發(fā)生通常需要設置參數(shù)slave_preserve_commit_order。下一節(jié)我們將會看到這種‘空洞’以及slave_preserve_commit_order的作用。但是如果要設置了slave_preserve_commit_order參數(shù)就需要開啟從庫記錄binary log的功能,因此必須開啟log_slave_updates參數(shù)。下面是源碼的判斷:

  if (opt_slave_preserve_commit_order && rli->opt_slave_parallel_workers > 0 &&
      opt_bin_log && opt_log_slave_updates)
    commit_order_mngr= new Commit_order_manager(rli->opt_slave_parallel_workers);
//order commit 管理器

這里先提前說一下MTS恢復的會有兩個關鍵階段:

  • 掃描階段

通過掃描檢查點以后的relay log。通過每個工作線程的Bitmap區(qū)分出哪些事務已經(jīng)執(zhí)行完成,哪些事務沒有執(zhí)行完成,并且匯總形成恢復Bitmap,同時得到需要恢復的事務總量。

  • 執(zhí)行階段

通過這個匯總的恢復Bitmap,將這些沒有執(zhí)行完成事務讀取relay log再次執(zhí)行。

這個Bitmap位圖和GAQ中的事務一一對應。當執(zhí)行XID_EVENT完成提交后這一位將會被設置為‘1’。

(3)協(xié)調(diào)線程信息的持久化

這個已經(jīng)在前面提到過,實際上每次進行檢查點的時候都需要將檢查點的位置固化到slave_relay_log_info表中(relay_log_info_repository設置為TABLE)。因此slave_relay_log_info中存儲的實際上不是實時的信息而是檢查點的信息。下面就是slave_relay_log_info表的表結(jié)構:

mysql> desc slave_relay_log_info;
+-------------------+---------------------+------+-----+---------+-------+
| Field             | Type                | Null | Key | Default | Extra |
+-------------------+---------------------+------+-----+---------+-------+
| Number_of_lines   | int(10) unsigned    | NO   |     | NULL    |       |
| Relay_log_name    | text                | NO   |     | NULL    |       |
| Relay_log_pos     | bigint(20) unsigned | NO   |     | NULL    |       |
| Master_log_name   | text                | NO   |     | NULL    |       |
| Master_log_pos    | bigint(20) unsigned | NO   |     | NULL    |       |
| Sql_delay         | int(11)             | NO   |     | NULL    |       |
| Number_of_workers | int(10) unsigned    | NO   |     | NULL    |       |
| Id                | int(10) unsigned    | NO   |     | NULL    |       |
| Channel_name      | char(64)            | NO   | PRI | NULL    |       |
+-------------------+---------------------+------+-----+---------+-------+

與此同時show slave status中的某些信息也是檢查點的內(nèi)存信息。下面的信息將是來自檢查點:

  • Relay_Log_File :最新一次檢查點的relay log文件名。
  • Relay_Log_Pos :最新一次檢查點的relay log位點。
  • Relay_Master_Log_File:最新一次檢查點的主庫binary log文件名。
  • Exec_Master_Log_Pos:最新一次檢查點的主庫binary log位點。
  • Seconds_Behind_Master:根據(jù)檢查點指向事務的提交時間計算的延遲。

需要注意的是我們的GTID模塊獨立在這一套理論之外,在第3節(jié)我們講GTID模塊的初始化的時候我們就說過GTID模塊的初始化是在從庫信息初始化之前就完成了。因此在做MTS異?;謴偷臅r候使用GTID AUTO_POSITION MODE模式將會變得更加簡單和安全,細節(jié)將在第25節(jié)描述。

(4)工作線程信息的持久化

工作線程的信息就持久化在slave_worker_info 表中,前面我們描述工作線程執(zhí)行Event注意點的時候已經(jīng)做了相應的描述。執(zhí)行XID_EVENT完成事務提交之后會將信息寫入到slave_worker_info 表中(relay_log_info_repository設置為TABLE)。其中包括信息:

  • Relay_log_name:工作線程最后一個提交事務的relay log文件名。

  • Relay_log_pos:工作線程最后一個提交事務的relay log位點。

  • Master_log_name:工作線程最后一個提交事務的主庫binary log文件名。

  • Master_log_pos:工作線程最后一個提交事務的主庫binary log文件位點。

  • Checkpoint_relay_log_name:工作線程最后一個提交事務對應檢查點的relay log文件名。

  • Checkpoint_relay_log_pos:工作線程最后一個提交事務對應檢查點的relay log位點。

  • Checkpoint_master_log_name:工作線程最后一個提交事務對應檢查點的主庫binary log文件名。

  • Checkpoint_master_log_pos:工作線程最后一個提交事務對應檢查點的主庫binary log位點。

  • Checkpoint_seqno:工作線程最后一個提交事務對應checkpoint_seqno序號。

  • Checkpoint_group_size:工作線程的Bitmap字節(jié)數(shù),約等于 GAQ隊列大小/8,因為1個字節(jié)為8位。

  • Checkpoint_group_bitmap:工作線程對應的Bitmap位圖信息。

關于Checkpoint_group_size的換算參考函數(shù)Slave_worker::write_info。

(5)兩個參數(shù)
  • slave_checkpoint_group:GAQ隊列大小。
  • slave_checkpoint_period:多久執(zhí)行一次檢查點,默認300毫秒。
(6)檢查點執(zhí)行的時機
  • 超過slave_checkpoint_period配置。可參考next_event函數(shù)如下:
if (rli->is_parallel_exec() && (opt_mts_checkpoint_period != 0 || force))
{
ulonglong period= static_cast<ulonglong>(opt_mts_checkpoint_period * 1000000ULL);
...
(void) mts_checkpoint_routine(rli, period, force, true/*need_data_lock=true*/);
...
      }
  • 達到GAQ隊列已滿,如下:
 //如果達到了 GAQ的大小 設置為force 強制checkpoint 
bool force= (rli->checkpoint_seqno > (rli->checkpoint_group - 1));
  • 正常stop slave。
(7)一個列子

通常有壓力的情況下的slave_worker_info中的所有工作線程最大的Checkpoint_master_log_pos應該和slave_relay_log_info中的Master_log_pos 相等,因為這是最后一個檢查點的位點信息,如下:

三、MTS中的檢查點的流程

這一部分將詳細描述一下檢查點的步驟,關于檢查點可以參考函數(shù)mts_checkpoint_routine。

假設現(xiàn)在有7個事務是可以并行執(zhí)行的,工作線程數(shù)量為4個。當前協(xié)調(diào)線程已經(jīng)分發(fā)了5個,前面4個事務都已經(jīng)執(zhí)行完成,其中第5的一個事務是大事務。那么可能當前的狀態(tài)圖如下(圖20-1):

20-1.png

前面4個事務每個工作線程都分到一個,最后一個大事務這里假設由工作線程2進行執(zhí)行,圖中用紅色部分表示。

(1)判斷是超過了slave_checkpoint_period設置的大小,如果超過需要進行檢查點。
  if (!force && diff < period)
//是否需要進行檢查點是否超過了slave_checkpoint_period的設置
  {
    /*
      We do not need to execute the checkpoint now because
      the time elapsed is not enough.
    */
    DBUG_RETURN(FALSE);
  }
(2)掃描GAQ隊列進行出隊操作,直到第一個沒有提交的事務為止。圖中紅色部分就是一個大事務,檢查點只能停留在它之前。
cnt= rli->gaq->move_queue_head(&rli->workers); 
//work數(shù)組 返回出隊的個數(shù)

move_queue_head部分代碼如下:

    if (ptr_g->worker_id == MTS_WORKER_UNDEF ||
        my_atomic_load32(&ptr_g->done) == 0) 
//當前GROUP是否已經(jīng)執(zhí)行完成 如果沒有執(zhí)行完成就需要 停止本次檢查點
      break; /* 'gap' at i'th */
(3)更新內(nèi)存和relay_log_info_repository表的信息為本次檢查點指向的位置。

先更新內(nèi)存信息,也就是我們show slave status中看到的信息:

  rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
  rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
  rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);

然后強制寫入表slave_relay_log_info中:

error= rli->flush_info(TRUE); 
//將本次檢查點信息 寫入到relay_log_info_repository表中
(4)更新last_master_timestamp信息為檢查點位置事務的XID_EVENT的timstamp值

這個值在第27節(jié)中會詳細描述,它是計算Seconds_behind_master的一個因素:

/*
    Update the rli->last_master_timestamp for reporting correct Seconds_behind_master.
    If GAQ is empty, set it to zero.
    Else, update it with the timestamp of the first job of the Slave_job_queue
    which was assigned in the Log_event::get_slave_worker() function.
  */
ts= rli->gaq->empty()? 0 : reinterpret_cast<Slave_job_group*>(rli->gaq->head_queue())->ts;
//rli->gaq->head_queue 檢查點位置的GROUP的時間
rli->reset_notified_checkpoint(cnt, ts, need_data_lock, true);
reset_notified_checkpoint函數(shù)中有:
last_master_timestamp= new_ts;

因此MTS中Seconds_behind_master的計算和檢查點息息相關。

(5)最后還會將前面GAQ出隊的事務數(shù)量進行統(tǒng)計,因為每個工作線程需要根據(jù)這個值來進行Bitmap位圖的偏移。并且還會維護我們前面說的GAQ的checkpoint_seqno值。

這個操作也是在函數(shù)Relay_log_info::reset_notified_checkpoint中完成的,實際上很簡單部分代碼如下:

for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
//循環(huán)每個woker
w->bitmap_shifted= w->bitmap_shifted + shift; 
//每個worker線程都會增加 這個偏移量
checkpoint_seqno= checkpoint_seqno - shift; 
//這里減去 移動的個數(shù)

到這里整個檢查點的基本操作就完成了。我們看到實際上步驟并不多,拿到Bitmap偏移量后每個工作線程就會在隨后的第一個事務提交的時候進行位圖的偏移,checkpoint_seqno 計數(shù)也會更新。

我們前面的假設環(huán)境中,如果觸發(fā)了一次檢查點,并且協(xié)調(diào)線程將后兩個可以并行的事務發(fā)給了工作線程1和3進行處理并且處理完成。那么我們的圖會變成如下(圖20-2):

20-2.png

這張圖中我用不同樣色表示了不同線條,因為它們交叉比較多。GAQ中的紅色事務就是我們假設的大事務它仍然沒有執(zhí)行完成,它也是我們所謂的‘gap’。如果這個時候MySQL實例異常重啟,那么這個紅色‘gap’就是我們啟動后需要找到的事務,方式就是通過Bitmap位圖進行比對,后面說異?;謴偷臅r候再詳細討論。如果是開啟了GTID,這種‘gap’很容易就能觀察到,下一節(jié)將進行測試。
同時我們需要注意這個時候工作線程2并沒有分發(fā)新的事務執(zhí)行,因為工作線程2沒有執(zhí)行完大事務, 因此在slave_woker_info表中它的信息仍然顯示為上一次提交事務的信息。而工作線程4因為沒有分配到新的事務,因此slave_woker_info表中它的信息也顯示為上一次提交事務的信息。因此在slave_woker_info中工作線程2和工作線程4的檢查點信息、Bitmap信息、checkpoint_seqno都是老的信息。

總結(jié)

好了到這里我已經(jīng)說明了MTS中三個關鍵點

  • 協(xié)調(diào)線程是根據(jù)什么規(guī)則進行事務分發(fā)的。
  • 工作線程如何拿到分發(fā)的事務。
  • MTS中的檢查點是如何進行的。

但是還有一個關鍵點沒有說,就是前面多次提到的異?;謴?,第25節(jié)將重點解釋。


第20節(jié)結(jié)束

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容