ClickHouse——數(shù)據(jù)分片

一、分區(qū)和分片

分區(qū)

分區(qū)是表的分區(qū),是解決大數(shù)據(jù)存儲的常見解決方案,具體的DDL操作關(guān)鍵詞是 PARTITION BY,指的是一個表按照某一列數(shù)據(jù)(比如日期)進(jìn)行分區(qū),對應(yīng)到最終的結(jié)果就是不同分區(qū)的數(shù)據(jù)會寫入不同的文件中。功能會比較類似于mysql的索引,主要是為了解決,有時候我們查詢只關(guān)心表中的一部分?jǐn)?shù)據(jù),建表時引入partition概念,可以按照對應(yīng)的分區(qū)字段,找出對應(yīng)的文件進(jìn)行查詢展示,防止查詢中會掃描整個表內(nèi)容,消耗很多時間做沒必要的工作。

分片

clickhouse的分片。其實也就是一個視圖聚合的功能,復(fù)用了數(shù)據(jù)庫的分區(qū),相當(dāng)于在原有的分區(qū)下,作為第二層分區(qū), 是在不同節(jié)點/機器上的體現(xiàn)。clickhouse可以支持讀取每個分片上的內(nèi)容的集合。底層是通過Distributed這個引實現(xiàn)的。默認(rèn)是隨機寫到某個分片,但是也可以自己去指定寫到具體某臺機器上。
Distributed引擎的官網(wǎng)介紹

分區(qū)和分片的具體關(guān)系如下:


數(shù)據(jù)分區(qū)-允許查詢在指定了分區(qū)鍵的條件下,盡可能的少讀取數(shù)據(jù)
數(shù)據(jù)分片-允許多臺機器/節(jié)點同并行執(zhí)行查詢,實現(xiàn)了分布式并行計算

二、分區(qū)相關(guān)操作

2.1 創(chuàng)建分區(qū)表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
    INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
    INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'], ...]

2.2 刪除分區(qū)

alter table [db.]table_name drop partition  key [ON CLUSTER cluster]

2.3 查詢分區(qū)信息

select * from system.parts where table='table_name'

三、分片原理

在分布式模式下,ClickHouse會將數(shù)據(jù)分為多個分片,并且分布到不同節(jié)點上。不同的分片策略在應(yīng)對不同的SQL Pattern時,各有優(yōu)勢。ClickHouse提供了豐富的sharding策略,讓業(yè)務(wù)可以根據(jù)實際需求選用。

  • 1) random隨機分片:寫入數(shù)據(jù)會被隨機分發(fā)到分布式集群中的某個節(jié)點上。

  • 2) constant固定分片:寫入數(shù)據(jù)會被分發(fā)到固定一個節(jié)點上。

  • 3) column value分片:按照某一列的值進(jìn)行hash分片。

  • 4) 自定義表達(dá)式分片:指定任意合法表達(dá)式,根據(jù)表達(dá)式被計算后的值進(jìn)行hash分片。

數(shù)據(jù)分片,讓ClickHouse可以充分利用整個集群的大規(guī)模并行計算能力,快速返回查詢結(jié)果。

更重要的是,多樣化的分片功能,為業(yè)務(wù)優(yōu)化打開了想象空間。比如在hash sharding的情況下,JOIN計算能夠避免數(shù)據(jù)shuffle,直接在本地進(jìn)行l(wèi)ocal join;支持自定義sharding,可以為不同業(yè)務(wù)和SQL Pattern定制最適合的分片策略;利用自定義sharding功能,通過設(shè)置合理的sharding expression可以解決分片間數(shù)據(jù)傾斜問題等。
另外,sharding機制使得ClickHouse可以橫向線性拓展,構(gòu)建大規(guī)模分布式集群,從而具備處理海量數(shù)據(jù)的能力。

不過ClickHouse的集群的水平拓展目前是一個瓶頸,因為歷史數(shù)據(jù)的存在, 避免新增節(jié)點之后的數(shù)據(jù)傾斜是個難點。

四、ClickHouse數(shù)據(jù)分片

  • clickhouse中每個服務(wù)器節(jié)點都可以被稱為一個 shard(分片)。 假設(shè)有 N 臺服務(wù)器,每個服務(wù)器上都有一張數(shù)據(jù)表 A,且每個服務(wù)器上的 數(shù)據(jù)表 A 的數(shù)據(jù)不重復(fù),那么就可以說數(shù)據(jù)表 A 擁有 N 的分片。

  • 對于一個完整的方案來說,還要考慮在數(shù)據(jù)寫入時如何被均勻低寫到各個分片中,以及數(shù)據(jù)在查詢時如何路由到每個分片,組合成結(jié)果集。

  • clickhouse 的數(shù)據(jù)分片需要結(jié)合 DIstributed 表引擎一起使用。DIstributed 表引擎本身不存儲任何數(shù)據(jù),它能夠作為分布式表的一層代理,在集群內(nèi)部自動展開數(shù)據(jù)寫入、分發(fā)、查詢、路由等工作。

4.1 集群的配置方式

在 clickhouse 中集群配置用shard 代表分配,replica 代表副本。

  • 1 分片,0 副本配置
<shard> <!--分片-->
  <replica> <!--副本-->
  </replica>
</shard>
  • 1 分片,1 副本配置
<shard> <!--分片-->
  <replica> <!--副本-->
  </replica>
    <replica> <!--副本-->
  </replica>
</shard>
clickhouse 集群有兩種配置方式
  • 4.1.1 不包含副本的分片
    如果直接使用 node 標(biāo)簽定義分配節(jié)點,那么該節(jié)點質(zhì)保函分配,不包含副本,配置如下
<yandex>
  <!-- 自定義配置名稱,與 conf.xml 配置的 include 屬性相同即可-->
  <clickhouse_remote_servers>
    <shard_1> <!--自定義集群名稱-->
      <node> <!--自定義 clickhouse 節(jié)點-->
        <!--必填參數(shù)-->
        <host>node3</host>
        <port>9977</port>

        <!--選填參數(shù)-->
        <weight>1</weight>
        <user>default</user>
        <password></password>
        <secure></secure>
        <compression></compression>
      </node>
      <node>
        <host>node2</host>
        <port>9977</port>
      </node>
    </shard_1>
  </clickhouse_remote_servers>
</yandex>
<!-- 配置定義了一個名為 shard_1 的集群,包含了兩個節(jié)點 node3、node2 -->
配置 說明
shard_1 自定義集群名稱,全局唯一,是后續(xù)引用集群配置的唯一標(biāo)識
node 用于定義節(jié)點,不包含副本
host clickhouse 節(jié)點服務(wù)器地址
port clickhouse 服務(wù)的tcp 端口
weight 分片權(quán)重,默認(rèn)為 1
user clickhouse 用戶,默認(rèn)為 default
password clickhouse 的用戶密碼,默認(rèn)為空字符
secure SSL 連接端口,默認(rèn) 9440
conpression 是否要開啟數(shù)據(jù)壓縮功能,默認(rèn) true
  • 4.1.2 自定義副本和分片

  • 集群配置支持自定義分配和副本的數(shù)量,這種形式需要使用 shard 標(biāo)簽代替前面配置的 node標(biāo)簽,除此之外的配置完全相同。

  • 配置自定義副本和分片時,副本和分片的數(shù)量完全交給由配置所決定。

  • 其中 shard 表示邏輯上的數(shù)據(jù)分片,而物理上的分片則用 replica 表示

  • 如果在一個 shard 標(biāo)簽下定義 N 組 replica,則該 shard 的語義表示 1 個分片和 N-1 個副本。

  • 不包含副本的分片

<!-- 2 分片,0 副本-->
<sharding_simple> <!-- 集群自定義名稱 -->
  <shard> <!-- 分片 -->
    <replica> <!-- 副本 -->
      <host>node3</host>
      <port>9977</port>
    </replica>
  </shard>
    <shard>
    <replica>
      <host>node2</host>
      <port>9977</port>
    </replica>
  </shard>
</sharding_simple>
  • N 分片和 N 副本
    可以根據(jù)自己的需求,配置副本與分片的組合
<!-- 1 分片,1 副本-->
<sharding_simple> <!-- 集群自定義名稱 -->
  <shard> <!-- 分片 -->
    <replica> <!-- 副本 -->
      <host>node3</host>
      <port>9977</port>
    </replica>
    <replica>
      <host>node2</host>
      <port>9977</port>
    </replica>
  </shard>
</sharding_simple>

<!-- 2 分片,1 副本-->
<sharding_simple> <!-- 集群自定義名稱 -->
  <shard> <!-- 分片 -->
    <replica> <!-- 副本 -->
      <host>node3</host>
      <port>9977</port>
    </replica>
    <replica>
      <host>node2</host>
      <port>9977</port>
    </replica>
  </shard>
  <shard> <!-- 分片 -->
    <replica> <!-- 副本 -->
      <host>node4</host>
      <port>9977</port>
    </replica>
    <replica>
      <host>node5</host>
      <port>9977</port>
    </replica>
  </shard>
</sharding_simple>
<!-- 集群部署中,副本數(shù)量的上線是 clickhouse 節(jié)點的數(shù)量決定的 -->

在 clickhouse 中給我們配置了一些示例,可以打開配置文件看一下

 <remote_servers>
        <!-- Test only shard config for testing distributed storage -->
        <test_shard_localhost>
            <!-- Inter-server per-cluster secret for Distributed queries
                 default: no secret (no authentication will be performed)

                 If set, then Distributed queries will be validated on shards, so at least:
                 - such cluster should exist on the shard,
                 - such cluster should have the same secret.

                 And also (and which is more important), the initial_user will
                 be used as current user for the query.

                 Right now the protocol is pretty simple and it only takes into account:
                 - cluster name
                 - query

                 Also it will be nice if the following will be implemented:
                 - source hostname (see interserver_http_host), but then it will depends from DNS,
                   it can use IP address instead, but then the you need to get correct on the initiator node.
                 - target hostname / ip address (same notes as for source hostname)
                 - time-based security tokens
            -->
            <!-- <secret></secret> -->

            <shard>
                <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
                <!-- <internal_replication>false</internal_replication> -->
                <!-- Optional. Shard weight when writing data. Default: 1. -->
                <!-- <weight>1</weight> -->
                <replica>
                    <host>localhost</host>
                    <port>9000</port>
                    <!-- Optional. Priority of the replica for load_balancing. Default: 1 (less value has more priority). -->
                    <!-- <priority>1</priority> -->
                </replica>
            </shard>
        </test_shard_localhost>
        <test_cluster_two_shards_localhost>
             <shard>
                 <replica>
                     <host>localhost</host>
                     <port>9000</port>
                 </replica>
             </shard>
             <shard>
                 <replica>
                     <host>localhost</host>
                     <port>9000</port>
                 </replica>
             </shard>
        </test_cluster_two_shards_localhost>


      <!-- 配置 2 個分配,0 副本 -->
        <test_cluster_two_shards>
            <shard>
                <replica>
                    <host>127.0.0.1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>127.0.0.2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster_two_shards>

       <!--2 分片,0 副本-->
        <test_cluster_two_shards_internal_replication>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>127.0.0.1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>127.0.0.2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster_two_shards_internal_replication>

       <!--1分片 0 副本,權(quán)重設(shè)為 1-->
        <test_shard_localhost_secure>
            <shard>
                <replica>
                    <host>localhost</host>
                    <port>9440</port>
                    <secure>1</secure>
                </replica>
            </shard>
        </test_shard_localhost_secure>


        <test_unavailable_shard>
            <shard>
                <replica>
                    <host>localhost</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>localhost</host>
                    <port>1</port>
                </replica>
            </shard>
        </test_unavailable_shard>

      <!-- 手動添加新的集群 -->
      <two_shard>
              <shard>
                <replica>
                    <host>node3</host>
                    <port>9977</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node2</host>
                    <port>9977</port>
                </replica>
            </shard>
      </two_shard>


    </remote_servers>
-- 在 system.clusters 中查看配置情況
select cluster,host_name from system.clusters;

┌─cluster──────────────────────────────────────┬─host_name─┐
│ test_cluster_two_shards                      │ 127.0.0.1 │
│ test_cluster_two_shards                      │ 127.0.0.2 │
│ test_cluster_two_shards_internal_replication │ 127.0.0.1 │
│ test_cluster_two_shards_internal_replication │ 127.0.0.2 │
│ test_cluster_two_shards_localhost            │ localhost │
│ test_cluster_two_shards_localhost            │ localhost │
│ test_shard_localhost                         │ localhost │
│ test_shard_localhost_secure                  │ localhost │
│ test_unavailable_shard                       │ localhost │
│ test_unavailable_shard                       │ localhost │
└──────────────────────────────────────────────┴───────────┘
  • 定義動態(tài)變量
    在每個節(jié)點的 config 配置文件中 增加變量配置
# node3
vim /etc/clickhouse-server/config.xml
# 增加如下內(nèi)容
<macros>
  <shard>01</shard>
  <replica>node3</replica>
</macros>

# node2
vim /etc/clickhouse-server/config.xml
# 增加如下內(nèi)容
<macros>
  <shard>02</shard>
  <replica>node2</replica>
</macros>
-- 進(jìn)入 clickhouse 命令行查看變量是否配置成功

select * from system.macros;

-- 查看遠(yuǎn)端節(jié)點的數(shù)據(jù)
select * from remote('node2:9977','system','macros','default')

4.2 基于集群實現(xiàn)分布式 DDL

在默認(rèn)情況下,創(chuàng)建多張副本表需要在不同服務(wù)器上進(jìn)行創(chuàng)建,這是因為 create、drop、rename和 alter 等 ddl 語句不支持分布式執(zhí)行,而在假如集群配置后,就可以使用新的語法實現(xiàn)分布式DDL 執(zhí)行了。

create / drop / rename / alter table on cluster cluster_name

-- cluster_name 對應(yīng)為配置文件中的汲取名稱,clickhouse 會根據(jù)集群的配置,去各個節(jié)點執(zhí)行 DDL 語句

-- 在 two_shard 集群 創(chuàng)建測試表

CREATE TABLE t_shard ON CLUSTER two_shard
(
    `id` UInt8,
    `name` String,
    `date` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/t_shard', '{replica}')
PARTITION BY toYYYYMM(date)
ORDER BY id

┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node2 │ 9977 │      0 │       │                   1 │                1 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │      0 │       │                   0 │                0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

-- 表引擎可以使用其他任意引擎
-- {shard} 和 {replica} 兩個動態(tài)變量代替了前面的硬編碼方式
-- clickhouse 會根據(jù) shard_2 的配置在 node3 和 node2 中創(chuàng)建 t_shard 數(shù)據(jù)表

-- 刪除 t_shard 表
drop table t_shard on cluster shard_2;

4.2.1 數(shù)據(jù)結(jié)構(gòu)

  • 1、zookeeper 內(nèi)的節(jié)點結(jié)構(gòu)
<!-- 在默認(rèn)情況下,分布式 DDL 在 zookeeper 內(nèi)使用的根路徑由config.xml distributed_ddl 標(biāo)簽配置 -->

<distributed_ddl>
  <path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>

<!-- 默認(rèn)為 /clickhouse/task_queue/ddl-->

在此路徑之下,還有一些其他監(jiān)聽節(jié)點,包括 /query-[seq] 這是 DDL 操作日志,每執(zhí)行一次分布式 DDL 查詢,該節(jié)點下就會增加一條操作日志,記錄響應(yīng)操作。當(dāng)各個節(jié)點監(jiān)聽到有新的日志假如的時候,便會響應(yīng)執(zhí)行。

DDL 操作日志使用 zookeeper 持久化順序節(jié)點,每條指令的名稱以 query-[seq] 為前綴,后面的序號遞增,在 query-[seq] 操作日志下,還有兩個狀態(tài)節(jié)點:

  • query-[seq]/active:用做監(jiān)控狀態(tài),在執(zhí)行任務(wù)的過程中,在該節(jié)點下會臨時保存當(dāng)前集群內(nèi)狀態(tài)為 active 的節(jié)點
  • query-[seq]/finished:用于檢查任務(wù)完成情況,在任務(wù)執(zhí)行過程中,每當(dāng)集群內(nèi)的某個 host 節(jié)點執(zhí)行完成之后,就會在該節(jié)點下寫入記錄。
/query-000001/finished
node3 : 0
node2 : 0

# 表示 node3,node2 兩個節(jié)點已經(jīng)執(zhí)行完成
  • 2、DDLLogEntry 日志對象的數(shù)據(jù)結(jié)構(gòu)
# 在 /query-[seq]下記錄的信息由 DDLLogEntry 承載,它的核心屬性有以下幾個:
version: 1

query: CREATE TABLE default.t_shard UUID \'d1679b02-9eae-4766-8032-8201a2746692\' ON CLUSTER two_shard (`id` UInt8, `name` String, `date` DateTime) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/t_shard\', \'{replica}\') PARTITION BY toYYYYMM(date) ORDER BY id

hosts: ['node3:9977','node2:9977']

initiator: node3%2Exy%2Ecom:9977

# query:記錄了 DDL 查詢的執(zhí)行語句
# host:記錄了指定集群的 hosts 主機列表,集群由分布式 DDL 語句中的 on cluster 指定,在分布式 DDL 執(zhí)行過程中,會根據(jù) hosts 列表逐個判斷它們的執(zhí)行狀態(tài)。
# initiator:記錄初始 host 主機的名稱,hosts 主機列表的取值來自于初始化 host 節(jié)點上的去集群

host主機列表的取值來源等同于下面的查詢

SELECT host_name
FROM system.clusters
WHERE cluster = 'two_shard'

┌─host_name─┐
│ node3     │
│ node2     │
└───────────┘

4.2.2 分布式 DDL 的執(zhí)行流程

以創(chuàng)建分布式表為例說明分布式 DDL 的執(zhí)行流程。

分布式 DDL 整個流程按照從上而下的時間順序執(zhí)行,大致分成 3 個步驟:

  • 1、推送 DDL 日志:首先在 node3 節(jié)點執(zhí)行 create table on cluster ,同時 node3 也會創(chuàng)建 DDLLogEntry 日志 ,并將日志推送到 zookeeper 中,并監(jiān)控任務(wù)的執(zhí)行進(jìn)度
  • 2、拉取日志并執(zhí)行:node3、node2 兩個節(jié)點分別監(jiān)控到 ddl/query-[seq] 日志的推送,分別拉取日志到本地,首先會判斷各自的 host 是否被包含在 DDLLogEntry 的 host 列表中,如果包含進(jìn)到執(zhí)行流程,執(zhí)行完畢后寫入 finished 節(jié)點,如果不包含,忽略
  • 3、確認(rèn)執(zhí)行進(jìn)度:在第一步中執(zhí)行 DDL 語句后,客戶端會阻塞 180 秒,以期望所有 host 執(zhí)行完畢,如果等待時間大于 180 秒,則會轉(zhuǎn)入后臺線程繼續(xù)等待,等待時間由 distributed_ddl_task_timeout 參數(shù)設(shè)置,默認(rèn) 180

五、Distributed 原理解析

Distributed 表引擎是分布式表的代名詞,他自身不存儲任何數(shù)據(jù),而是作為數(shù)據(jù)分片的代理,能夠自動路由數(shù)據(jù)至集群中的各個節(jié)點,所以 DIstributed 表引擎需要和其他表引擎一起協(xié)同工作。

從上圖可以看出一張表分成了兩部分:

  • 本地表:通常以 _local 后綴進(jìn)行命名。本地表是承接數(shù)據(jù)的載體,可以使用 非 Distributed 的任意表引擎,yi’zhang’ben

  • 分布式表:通常以 _all 為后綴進(jìn)行命名,分布式表只能使用 Distributed 表引擎,他們與本地表形成一對多的映射關(guān)系,以后通過分布式表代理操作多張本地表。

對于分布式表與本地表之間表結(jié)構(gòu)的一致性檢查,Distributed 表引擎采用了讀時檢查的機制,這意味著如果他們的表結(jié)構(gòu)不兼容,需要在查詢時才會拋出異常,而在創(chuàng)建表引擎時不會進(jìn)行檢查,不同 clickhouse 節(jié)點上的本地表之間使用不同表引擎也是可行的,但是通常不會這么做,保持他們的結(jié)構(gòu)一致,有利于后期的維護(hù)避免造成不可預(yù)計的后果。

5.1 定義形式

Distributed 表引擎的定義形式

CREATE TABLE [IF NOT EXISTS] [db_name.]table_name on cluster cluster_name(
  name1 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
  name2 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
  ...
) ENGINE = Distributed(cluster,database,table,[sharding_key])
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
  • cluster:集群名稱,與集群配置中的自定義名稱相對應(yīng),在對分布式表執(zhí)行寫入和查詢過程中,它會使用集群的配置信息來找對應(yīng)的節(jié)點。
  • database:對應(yīng)數(shù)據(jù)庫名稱
  • table:對應(yīng)數(shù)據(jù)表名稱,
  • sharding_key:分片鍵,選填參數(shù),在寫入數(shù)據(jù)的過程中,分布式表會依據(jù)分片鍵的規(guī)則,將數(shù)據(jù)分布到各個本地表所在的節(jié)點中。
-- 創(chuàng)建分布式表 t_shard_2_all 代理 two_shard 集群的 drfault.t_shard_2_local 表

CREATE TABLE t_shard_2_all ON CLUSTER two_shard
(
    `id` UInt8,
    `name` String,
    `date` DateTime
)
ENGINE = Distributed(two_shard, default, t_shard_2_local, rand())

Query id: 83e4f090-0f7d-4892-bbf3-a094f97a6eea

┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node2 │ 9977 │      0 │       │                   1 │                1 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │      0 │       │                   0 │                0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

-- 這里用的是 on cluster 分布式 DDL, 所以在 two_shard 集群中每個節(jié)點都會創(chuàng)建一張分布式表
-- 寫入數(shù)據(jù)時會根據(jù) rand() 隨機函數(shù)的取值決定寫入那個分片,
-- 當(dāng)這時還沒有創(chuàng)建 本地表,可以看出Distributed 是讀數(shù)據(jù)時才會進(jìn)行檢查。

-- 嘗試 查詢 t_shard_2_all 分布式表 
SELECT *
FROM t_shard_2_all;


Received exception from server (version 21.4.3):
Code: 60. DB::Exception: Received from localhost:9977. DB::Exception: Table default.t_shard_2_local doesn t exist.


-- 使用分布式 DDL 創(chuàng)建本地表
CREATE TABLE t_shard_2_local ON CLUSTER two_shard
(
    `id` UInt8,
    `name` String,
    `date` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/t_shard_2_local', '{replica}')
PARTITION BY toYYYYMM(date)
ORDER BY id

┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │      0 │       │                   1 │                0 │
│ node2 │ 9977 │      0 │       │                   0 │                0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

-- 嘗試 查詢 t_shard_2_all 分布式表 
SELECT *
FROM t_shard_2_all;

Query id: 5ae82696-0f07-469b-bff5-bd17dc513da7

Ok.

0 rows in set. Elapsed: 0.009 sec.

-- 到現(xiàn)在為止,擁有兩個數(shù)據(jù)分配的分布式表 t_shard_2_all 就創(chuàng)建好了

5.2 查詢的分類

分布式表的查詢操作可以分為以下幾類:

  • 1、作用于本地表的查詢:對應(yīng) select 和 insert 分布式表會以分布式的方式作用于 local 本地表
  • 2、只會影響分布式表自身,不會作用于本地表的查詢,分布式表支持部分元數(shù)據(jù)操作,包括 create、drop、rename 和 alter,其中 alter 并不包括分區(qū)的操作(attach partition 和 replace partition等)。這些操作只會修改分布式表自身,并不會修改 local 本地表,
  • 3、如果想要徹底刪除一張分布式表,需要分別刪除分布式表和本地表
-- 刪除分布式表
drop table t_shard_2_all on cluster two_shard;

-- 刪除本地表
drop table t_shard_2_local on cluster two_shard;
  • 4、不支持的操作,分布式表不支持任何的 mutation 類型的操作,包括 alter delete 和 alter update。

5.3 分片規(guī)則

關(guān)于分片的規(guī)則這里進(jìn)一步說明,分片鍵要求返回一個整型類型的取值,包括 Int和 UInt 類型的系列

-- 分片鍵可以是一個具體的整型字段
-- 按照用戶 ID 劃分
Distributed(cluster,database,table,userid)

-- 分片鍵也可以是返回整型的表達(dá)式
-- 按照隨機數(shù)劃分
Distributed(cluster,database,table,rand())

-- 按照用戶 ID 的散列值劃分
Distributed(cluster,database,table,intHash64(userid))

如果不聲明分片鍵,那么分布式表只能包含一個分片,這意味著只能映射一張表,否則寫入數(shù)據(jù)時將拋出異常。當(dāng)一個分布式表只包含一個分片的時候也就失去了分布式的意義,所以通常會按照業(yè)務(wù)需要設(shè)置分片鍵。

5.3.1 分片權(quán)重(weight)

在配置集群時,有一項 weight 的設(shè)置1

weight 默認(rèn)為 1,它可以被設(shè)置成任意整除,但是建議將其設(shè)置為比較小的值。分片權(quán)重會影響分片中的數(shù)據(jù)傾斜程度,分片權(quán)重越大,寫入的數(shù)據(jù)就會越多。

5.3.2 槽(slot)

slot 的數(shù)量等于所有分片權(quán)重之和,假設(shè)集群有兩個分片,第一個分片 weight 為 10,第二個 weight 為20 ,那么 slot 的數(shù)量為 30(10+20),slot 按照權(quán)重元素的取值區(qū)間,與對應(yīng)的分片形成映射關(guān)系,

  • 如果 slot 取值區(qū)間在[0-10) 區(qū)間,則對應(yīng)第一個分片
  • 如果 slot 取值區(qū)間在[10-20) 區(qū)間,則對應(yīng)第二個分片

5.3.3 選擇函數(shù)

選擇函數(shù)用于判斷一行待寫入的數(shù)據(jù)應(yīng)該被寫到哪個分片中,判斷過程大致分成兩個步驟:

  • 1、找出 slot 取值,計算公式:slot = shard_value % sum_weight

    • shard_value 為分片鍵的取值,
    • sum_weight 為所有分片的權(quán)重之和
      如果某行數(shù)據(jù) shard_value = 10,sum_weight = 30, 那么 30%10 = 10,即 slot = 10
  • 2、基于 slot 值找到對應(yīng)的數(shù)據(jù)分片,當(dāng) slot 等于 10 的時候它屬于 [10,20) 區(qū)間,所以這行數(shù)據(jù)會被對應(yīng)到第二個分片。

5.4 分布式寫入的核心流程

向集群內(nèi)的分片寫入數(shù)據(jù)時,通常有兩種思路,

  • 借助外部計算系統(tǒng),先將數(shù)據(jù)均勻分片,再借由計算系統(tǒng)直接將數(shù)據(jù)寫入 clickhouse 集群的各個本地表,這種方案通常有更好的寫入性能,因為分片數(shù)據(jù)時被并西恩點對點寫入的,但是這種方案主要依賴外部系統(tǒng),而不在于 clickhouse 自身。
  • 通過Distributed 表引擎代理分片數(shù)據(jù)。下面詳細(xì)介紹這種方式的寫入流程。

為了便于理解,這里將分片寫入和副本復(fù)制拆分成兩個部分講解,使用一個擁有 2 個分片 0 個副本的集群講解分片寫入流程,使用一個擁有 1 個分片 1 個副本的集群講解分片副本復(fù)制流程。

5.4.1 將數(shù)據(jù)寫入分片的流程

在對 Distributed 表執(zhí)行 insert 操作的時候,會進(jìn)入數(shù)據(jù)寫入的執(zhí)行邏輯。整個過程大約分成 5 個步驟。

  • 1、在第一個分片節(jié)點寫入本地分片數(shù)據(jù)
    首先在 node3 節(jié)點對分布式表 t_shard_2_all 執(zhí)行 insert 操作,寫入 10,30,40,60 4 行數(shù)據(jù),執(zhí)行之后分布式表會做兩件事:

    • 根據(jù)分配規(guī)則劃分?jǐn)?shù)據(jù),在這個示例中,30,60 被劃分到第一個分片,10,40 被劃分到第二個分片
    • 數(shù)據(jù)當(dāng)前分配的數(shù)據(jù)直接寫入本地表 t_shard_2_local
  • 2、第一個分片建立遠(yuǎn)端連接,準(zhǔn)備發(fā)送遠(yuǎn)端分片數(shù)據(jù)

將需要放到遠(yuǎn)端分片的數(shù)據(jù)以分區(qū)為單位,分別寫入 t_shard_2_all 存儲目錄下的臨時 bin 文件,數(shù)據(jù)文件命名規(guī)則如下:

/database@host:port/[increase_num].bin

# 10,40 的兩條數(shù)據(jù)會寫入到這個臨時文件中

臨時數(shù)據(jù)寫完后會嘗試與 第二個分片的服務(wù)器進(jìn)行連接。

  • 3、第一個分片向遠(yuǎn)端發(fā)送數(shù)據(jù)
    這時會有另一組監(jiān)聽人會負(fù)責(zé)監(jiān)聽 t_shard_2_all 目錄下的文件變化,這些任務(wù)負(fù)責(zé)將目錄數(shù)據(jù)發(fā)送到遠(yuǎn)端分片,其中每份數(shù)據(jù)將由獨立的進(jìn)程負(fù)責(zé)發(fā)送,數(shù)據(jù)在傳輸之前會被壓縮。

  • 4、第二個分片接收數(shù)據(jù)并寫入本地
    第二個分片與第一個分片的服務(wù)器建立連接后接受來自第一個分片的數(shù)據(jù),并將他們寫入本地表

  • 5、第一個分片確認(rèn)完成寫入
    由數(shù)據(jù)發(fā)送方確認(rèn)所有數(shù)據(jù)發(fā)送完畢,至此數(shù)據(jù)寫入流程完畢。

由 Distributed 表負(fù)責(zé)向遠(yuǎn)端分片發(fā)送數(shù)據(jù)時,有異步和同步兩種模式:

  • 異步:在 Distributed 表在寫完本地分片之后,insert 操作就會返回寫入成功信息
  • 同步:在執(zhí)行 insert 操作之后,會等待所有分片完成寫入

由 insert_distributed_sync 參數(shù)控制使用何種模式,默認(rèn) false(異步),如果設(shè)置為 true ,還需要設(shè)置 insert_distributed_timeout 參數(shù)控制同步等待超時時間。

5.4.2 副本復(fù)制的流程

  • 1、通過 Distributed 復(fù)制數(shù)據(jù)
    在這種實現(xiàn)方式下,即使本地表不使用 ReplicatedMergeTree 表引擎,也能實現(xiàn)數(shù)據(jù)副本的功能,Distributed 會同時負(fù)責(zé)副本與分片的數(shù)據(jù)寫入工作,而副本的寫入流程與分片的寫入流程相同,這種情況下,Distributed 節(jié)點的寫入性能可能成為瓶頸。

  • 2、通過 ReplicatedMergeTree 復(fù)制數(shù)據(jù)
    如果在集群的 shard 配置中增加設(shè)置 internal_replication = true,那么 Distributed 將在沒每個分片只寫一份數(shù)據(jù),不負(fù)責(zé)其副本的寫入,如果此時,本地表使用的是 ReplicatedMergeTree 表引擎,那么在 shard 內(nèi)的多個副本會由 ReplicatedMergeTree 自己處理。

Distributed 選擇 replica的算法大致是,clickhouse 服務(wù)器節(jié)點中擁有一個全局計數(shù)器 errors_count,當(dāng)服務(wù)器出現(xiàn)異常時計數(shù)器 +1,當(dāng)一個分片有多個副本時,選擇 errors_count 計數(shù)最小的服務(wù)器,進(jìn)行數(shù)據(jù)寫入。

5.5 分布式查詢的核心流程

與寫入數(shù)據(jù)有所不同,面向集群查詢數(shù)據(jù)的時候,只能通過Distributed 表引擎實現(xiàn),當(dāng)Distributed 表執(zhí)行查詢操作的時候,會依次查詢每個分片的數(shù)據(jù),然后再匯總返回。

5.5.1 多副本路由選擇

在查詢數(shù)據(jù)的時候,如果一個集群中有一個分片有多個副本,那么 Distributed 需要面臨副本選擇的問題,clickhouse 會使用負(fù)載均衡算法從眾多副本中選擇一個,而具體使用哪種算法由load_balancing參數(shù)控制。

# clickhouse 提供四種負(fù)載均衡算法
load_balancing=random/nearest_hostname/in_order/first_or_random
  • 1、random
    random 是默認(rèn)的負(fù)載均衡算法,clickhouse 服務(wù)器節(jié)點中有一個全局計數(shù)器 errors_count, 當(dāng)服務(wù)器發(fā)生異常時 計數(shù)器+1,random 就是選擇 errors_count 最少的節(jié)點,如果有多個計數(shù)最少的errors_count節(jié)點,那么隨機選擇一個。

  • 2、nearest_hostname
    可以看做是 random 的變種,同樣選擇 errors_count 最少的節(jié)點,如果有多個計數(shù)最少的errors_count節(jié)點,那么選擇選擇與當(dāng)前配置的 hostname 最相似的一個。

  • 3、in_order
    可以看做是 random 的變種,同樣選擇 errors_count 最少的節(jié)點,如果有多個計數(shù)最少的errors_count節(jié)點,那么根據(jù) replica 的配置順序逐個選擇。

  • 4、first_or_random
    可以看做是 in_order 的變種,同樣選擇 errors_count 最少的節(jié)點,如果有多個計數(shù)最少的errors_count節(jié)點,選擇配置的第一個 replica 節(jié)點,如果第一個 replica 節(jié)點不可用,隨機選擇一個。

5.5.2 多分片查詢的核心流程

分布式查詢與分布式寫入類似,同樣是誰發(fā)起誰負(fù)責(zé),它會由接收 select 查詢的 Distributed 表,負(fù)責(zé)串聯(lián)起整個查詢。

首先針對分布式表的查詢SQL,按照分片數(shù)量將查詢根據(jù)分片拆分成若干個針對本地表查詢的子查詢,然后向各個表發(fā)起查詢,最后再匯總各個分片的結(jié)果。

-- 例如在分布式表執(zhí)行下面查詢,查看執(zhí)行計劃

EXPLAIN
SELECT count(1)
FROM t_shard_2_all;

┌─explain─────────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY))                                 │
│   MergingAggregated                                                         │
│     SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│       Union                                                                 │
│         Expression (Convert block structure for query from local replica)   │
│           ReadFromPreparedSource (Optimized trivial count)                  │
│         ReadFromPreparedSource (Read from remote replica)                   │
└─────────────────────────────────────────────────────────────────────────────┘

整個執(zhí)行計劃從上至下可分成兩個步驟

  • 1、查詢各分片數(shù)據(jù)
    其中 讀取本地數(shù)據(jù) 和 讀取遠(yuǎn)程數(shù)據(jù) 是并行的,他們分別負(fù)責(zé)執(zhí)行本地和遠(yuǎn)端分片的查詢動作。
  • 2、返回合并結(jié)果
    將返回結(jié)果合并

5.5.3 使用 global 優(yōu)化分布式子查詢

如果在分布式查詢中使用子查詢,可能會面臨兩難的局面,

先看下面一個示例:

-- 使用分布式 DDL 創(chuàng)建 分布式表
CREATE TABLE t_distributed_query_all ON CLUSTER two_shard
(
    `id` UInt8, -- 用戶編號
    `repo` UInt8 -- 倉庫編號
)
ENGINE = Distributed(two_shard, default, t_distributed_query_local, rand());

-- 使用分布式 DDL 創(chuàng)建本地表
CREATE TABLE t_distributed_query_local ON CLUSTER two_shard
(
    `id` UInt8,
    `repo` UInt8
)
ENGINE = TinyLog;

-- 在 node2 節(jié)點寫入數(shù)據(jù)
insert into t_distributed_query_local values (1,100),(2,100),(3,100);
-- 查詢數(shù)據(jù)
select * from t_distributed_query_local;
┌─id─┬─repo─┐
│  1 │  100 │
│  2 │  100 │
│  3 │  100 │
└────┴──────┘

-- 在 node3 節(jié)點寫入數(shù)據(jù)
insert into t_distributed_query_local values (3,200),(4,200);
-- 查詢數(shù)據(jù)
select * from t_distributed_query_local;
┌─id─┬─repo─┐
│  3 │  200 │
│  4 │  200 │
└────┴──────┘

-- 查詢?nèi)直頂?shù)據(jù)
select * from t_distributed_query_all;

┌─id─┬─repo─┐
│  1 │  100 │
│  2 │  100 │
│  3 │  100 │
└────┴──────┘
┌─id─┬─repo─┐
│  3 │  200 │
│  4 │  200 │
└────┴──────┘

要求找到同時擁有兩個倉庫的用戶,對于這種查詢可以使用 in 查詢子句,與此同時面臨的問題是 in 查詢使用分布式表還是本地表?

  • 1、使用本地表的問題
    如果在 in 查詢中使用本地表:
SELECT uniq(id)
FROM t_distributed_query_all
WHERE (repo = 100) AND (
  id IN
    (
      SELECT id
      FROM t_distributed_query_local
      WHERE repo = 200
    )
);

┌─uniq(id)─┐
│        0 │
└──────────┘
-- 并沒有查詢出結(jié)果

-- 在分布式表在接收到查詢后,將上面 SQL 替換成本地表的形式再發(fā)送到每個分片進(jìn)行執(zhí)行

SELECT uniq(id)
FROM t_distributed_query_local
WHERE (repo = 100) AND (
  id IN
    (
      SELECT id
      FROM t_distributed_query_local
      WHERE repo = 200
    )
);
-- 單獨在分片 1 或分片 2 都無法找到滿足 同時等于 100 和 200 的數(shù)據(jù)
  • 2、使用 global 優(yōu)化查詢
    為了解決查詢問題,可以使用 global in 或者 join 進(jìn)行優(yōu)化
SELECT uniq(id)
FROM t_distributed_query_all
WHERE (repo = 100) AND (id GLOBAL IN
(
    SELECT id
    FROM t_distributed_query_all
    WHERE repo = 200
))

Query id: 0a55d59d-c87b-4bc8-8985-dad26f0a39b9

┌─uniq(id)─┐
│        1 │
└──────────┘

Global 查詢流程:

  • 1、將 in 子句單獨提出,發(fā)起一次分布式查詢
  • 2、將分布式表轉(zhuǎn)成 local 表后,分別在本地和遠(yuǎn)端分片執(zhí)行查詢
  • 3、將 in 子查詢結(jié)果進(jìn)行匯總,放入一張臨時的內(nèi)存表進(jìn)行保存
  • 4、將內(nèi)存表發(fā)送到遠(yuǎn)端分片節(jié)點
  • 5、將分布式表轉(zhuǎn)為本地表后,開始執(zhí)行完整的 SQL 語句,in 子句直接使用臨時表的數(shù)據(jù)

在使用 global 修飾符之后,clickhouse 使用內(nèi)存表臨時保存了 in 子查詢到的數(shù)據(jù),并將其發(fā)送到遠(yuǎn)端分片節(jié)點,以此達(dá)到了數(shù)據(jù)共享的目的,從而避免了查詢放大的問題,in 或者 join 子句返回的數(shù)據(jù)不宜過大,如果內(nèi)存表存在重復(fù)數(shù)據(jù),可以實現(xiàn)在子句中增加 distinct 實現(xiàn)去重。

參考:
https://pushkin.blog.csdn.net/article/details/127020068

https://blog.csdn.net/aizhupo1314/article/details/120016988

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容