一、分區(qū)和分片
分區(qū)
分區(qū)是表的分區(qū),是解決大數(shù)據(jù)存儲的常見解決方案,具體的DDL操作關(guān)鍵詞是 PARTITION BY,指的是一個表按照某一列數(shù)據(jù)(比如日期)進(jìn)行分區(qū),對應(yīng)到最終的結(jié)果就是不同分區(qū)的數(shù)據(jù)會寫入不同的文件中。功能會比較類似于mysql的索引,主要是為了解決,有時候我們查詢只關(guān)心表中的一部分?jǐn)?shù)據(jù),建表時引入partition概念,可以按照對應(yīng)的分區(qū)字段,找出對應(yīng)的文件進(jìn)行查詢展示,防止查詢中會掃描整個表內(nèi)容,消耗很多時間做沒必要的工作。
分片
clickhouse的分片。其實也就是一個視圖聚合的功能,復(fù)用了數(shù)據(jù)庫的分區(qū),相當(dāng)于在原有的分區(qū)下,作為第二層分區(qū), 是在不同節(jié)點/機器上的體現(xiàn)。clickhouse可以支持讀取每個分片上的內(nèi)容的集合。底層是通過Distributed這個引實現(xiàn)的。默認(rèn)是隨機寫到某個分片,但是也可以自己去指定寫到具體某臺機器上。
Distributed引擎的官網(wǎng)介紹
分區(qū)和分片的具體關(guān)系如下:

數(shù)據(jù)分區(qū)-允許查詢在指定了分區(qū)鍵的條件下,盡可能的少讀取數(shù)據(jù)
數(shù)據(jù)分片-允許多臺機器/節(jié)點同并行執(zhí)行查詢,實現(xiàn)了分布式并行計算
二、分區(qū)相關(guān)操作
2.1 創(chuàng)建分區(qū)表
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'], ...]
2.2 刪除分區(qū)
alter table [db.]table_name drop partition key [ON CLUSTER cluster]
2.3 查詢分區(qū)信息
select * from system.parts where table='table_name'
三、分片原理
在分布式模式下,ClickHouse會將數(shù)據(jù)分為多個分片,并且分布到不同節(jié)點上。不同的分片策略在應(yīng)對不同的SQL Pattern時,各有優(yōu)勢。ClickHouse提供了豐富的sharding策略,讓業(yè)務(wù)可以根據(jù)實際需求選用。
1) random隨機分片:寫入數(shù)據(jù)會被隨機分發(fā)到分布式集群中的某個節(jié)點上。
2) constant固定分片:寫入數(shù)據(jù)會被分發(fā)到固定一個節(jié)點上。
3) column value分片:按照某一列的值進(jìn)行hash分片。
4) 自定義表達(dá)式分片:指定任意合法表達(dá)式,根據(jù)表達(dá)式被計算后的值進(jìn)行hash分片。
數(shù)據(jù)分片,讓ClickHouse可以充分利用整個集群的大規(guī)模并行計算能力,快速返回查詢結(jié)果。
更重要的是,多樣化的分片功能,為業(yè)務(wù)優(yōu)化打開了想象空間。比如在hash sharding的情況下,JOIN計算能夠避免數(shù)據(jù)shuffle,直接在本地進(jìn)行l(wèi)ocal join;支持自定義sharding,可以為不同業(yè)務(wù)和SQL Pattern定制最適合的分片策略;利用自定義sharding功能,通過設(shè)置合理的sharding expression可以解決分片間數(shù)據(jù)傾斜問題等。
另外,sharding機制使得ClickHouse可以橫向線性拓展,構(gòu)建大規(guī)模分布式集群,從而具備處理海量數(shù)據(jù)的能力。
不過ClickHouse的集群的水平拓展目前是一個瓶頸,因為歷史數(shù)據(jù)的存在, 避免新增節(jié)點之后的數(shù)據(jù)傾斜是個難點。
四、ClickHouse數(shù)據(jù)分片
clickhouse中每個服務(wù)器節(jié)點都可以被稱為一個 shard(分片)。 假設(shè)有 N 臺服務(wù)器,每個服務(wù)器上都有一張數(shù)據(jù)表 A,且每個服務(wù)器上的 數(shù)據(jù)表 A 的數(shù)據(jù)不重復(fù),那么就可以說數(shù)據(jù)表 A 擁有 N 的分片。
對于一個完整的方案來說,還要考慮在數(shù)據(jù)寫入時如何被均勻低寫到各個分片中,以及數(shù)據(jù)在查詢時如何路由到每個分片,組合成結(jié)果集。
clickhouse 的數(shù)據(jù)分片需要結(jié)合 DIstributed 表引擎一起使用。DIstributed 表引擎本身不存儲任何數(shù)據(jù),它能夠作為分布式表的一層代理,在集群內(nèi)部自動展開數(shù)據(jù)寫入、分發(fā)、查詢、路由等工作。

4.1 集群的配置方式
在 clickhouse 中集群配置用shard 代表分配,replica 代表副本。
- 1 分片,0 副本配置
<shard> <!--分片-->
<replica> <!--副本-->
</replica>
</shard>
- 1 分片,1 副本配置
<shard> <!--分片-->
<replica> <!--副本-->
</replica>
<replica> <!--副本-->
</replica>
</shard>
clickhouse 集群有兩種配置方式
- 4.1.1 不包含副本的分片
如果直接使用 node 標(biāo)簽定義分配節(jié)點,那么該節(jié)點質(zhì)保函分配,不包含副本,配置如下
<yandex>
<!-- 自定義配置名稱,與 conf.xml 配置的 include 屬性相同即可-->
<clickhouse_remote_servers>
<shard_1> <!--自定義集群名稱-->
<node> <!--自定義 clickhouse 節(jié)點-->
<!--必填參數(shù)-->
<host>node3</host>
<port>9977</port>
<!--選填參數(shù)-->
<weight>1</weight>
<user>default</user>
<password></password>
<secure></secure>
<compression></compression>
</node>
<node>
<host>node2</host>
<port>9977</port>
</node>
</shard_1>
</clickhouse_remote_servers>
</yandex>
<!-- 配置定義了一個名為 shard_1 的集群,包含了兩個節(jié)點 node3、node2 -->
| 配置 | 說明 |
|---|---|
| shard_1 | 自定義集群名稱,全局唯一,是后續(xù)引用集群配置的唯一標(biāo)識 |
| node | 用于定義節(jié)點,不包含副本 |
| host | clickhouse 節(jié)點服務(wù)器地址 |
| port | clickhouse 服務(wù)的tcp 端口 |
| weight | 分片權(quán)重,默認(rèn)為 1 |
| user | clickhouse 用戶,默認(rèn)為 default |
| password | clickhouse 的用戶密碼,默認(rèn)為空字符 |
| secure | SSL 連接端口,默認(rèn) 9440 |
| conpression | 是否要開啟數(shù)據(jù)壓縮功能,默認(rèn) true |
4.1.2 自定義副本和分片
集群配置支持自定義分配和副本的數(shù)量,這種形式需要使用 shard 標(biāo)簽代替前面配置的 node標(biāo)簽,除此之外的配置完全相同。
配置自定義副本和分片時,副本和分片的數(shù)量完全交給由配置所決定。
其中 shard 表示邏輯上的數(shù)據(jù)分片,而物理上的分片則用 replica 表示
如果在一個 shard 標(biāo)簽下定義 N 組 replica,則該 shard 的語義表示 1 個分片和 N-1 個副本。
不包含副本的分片
<!-- 2 分片,0 副本-->
<sharding_simple> <!-- 集群自定義名稱 -->
<shard> <!-- 分片 -->
<replica> <!-- 副本 -->
<host>node3</host>
<port>9977</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9977</port>
</replica>
</shard>
</sharding_simple>
- N 分片和 N 副本
可以根據(jù)自己的需求,配置副本與分片的組合
<!-- 1 分片,1 副本-->
<sharding_simple> <!-- 集群自定義名稱 -->
<shard> <!-- 分片 -->
<replica> <!-- 副本 -->
<host>node3</host>
<port>9977</port>
</replica>
<replica>
<host>node2</host>
<port>9977</port>
</replica>
</shard>
</sharding_simple>
<!-- 2 分片,1 副本-->
<sharding_simple> <!-- 集群自定義名稱 -->
<shard> <!-- 分片 -->
<replica> <!-- 副本 -->
<host>node3</host>
<port>9977</port>
</replica>
<replica>
<host>node2</host>
<port>9977</port>
</replica>
</shard>
<shard> <!-- 分片 -->
<replica> <!-- 副本 -->
<host>node4</host>
<port>9977</port>
</replica>
<replica>
<host>node5</host>
<port>9977</port>
</replica>
</shard>
</sharding_simple>
<!-- 集群部署中,副本數(shù)量的上線是 clickhouse 節(jié)點的數(shù)量決定的 -->
在 clickhouse 中給我們配置了一些示例,可以打開配置文件看一下
<remote_servers>
<!-- Test only shard config for testing distributed storage -->
<test_shard_localhost>
<!-- Inter-server per-cluster secret for Distributed queries
default: no secret (no authentication will be performed)
If set, then Distributed queries will be validated on shards, so at least:
- such cluster should exist on the shard,
- such cluster should have the same secret.
And also (and which is more important), the initial_user will
be used as current user for the query.
Right now the protocol is pretty simple and it only takes into account:
- cluster name
- query
Also it will be nice if the following will be implemented:
- source hostname (see interserver_http_host), but then it will depends from DNS,
it can use IP address instead, but then the you need to get correct on the initiator node.
- target hostname / ip address (same notes as for source hostname)
- time-based security tokens
-->
<!-- <secret></secret> -->
<shard>
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
<!-- <internal_replication>false</internal_replication> -->
<!-- Optional. Shard weight when writing data. Default: 1. -->
<!-- <weight>1</weight> -->
<replica>
<host>localhost</host>
<port>9000</port>
<!-- Optional. Priority of the replica for load_balancing. Default: 1 (less value has more priority). -->
<!-- <priority>1</priority> -->
</replica>
</shard>
</test_shard_localhost>
<test_cluster_two_shards_localhost>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards_localhost>
<!-- 配置 2 個分配,0 副本 -->
<test_cluster_two_shards>
<shard>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards>
<!--2 分片,0 副本-->
<test_cluster_two_shards_internal_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards_internal_replication>
<!--1分片 0 副本,權(quán)重設(shè)為 1-->
<test_shard_localhost_secure>
<shard>
<replica>
<host>localhost</host>
<port>9440</port>
<secure>1</secure>
</replica>
</shard>
</test_shard_localhost_secure>
<test_unavailable_shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>1</port>
</replica>
</shard>
</test_unavailable_shard>
<!-- 手動添加新的集群 -->
<two_shard>
<shard>
<replica>
<host>node3</host>
<port>9977</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9977</port>
</replica>
</shard>
</two_shard>
</remote_servers>
-- 在 system.clusters 中查看配置情況
select cluster,host_name from system.clusters;
┌─cluster──────────────────────────────────────┬─host_name─┐
│ test_cluster_two_shards │ 127.0.0.1 │
│ test_cluster_two_shards │ 127.0.0.2 │
│ test_cluster_two_shards_internal_replication │ 127.0.0.1 │
│ test_cluster_two_shards_internal_replication │ 127.0.0.2 │
│ test_cluster_two_shards_localhost │ localhost │
│ test_cluster_two_shards_localhost │ localhost │
│ test_shard_localhost │ localhost │
│ test_shard_localhost_secure │ localhost │
│ test_unavailable_shard │ localhost │
│ test_unavailable_shard │ localhost │
└──────────────────────────────────────────────┴───────────┘
- 定義動態(tài)變量
在每個節(jié)點的 config 配置文件中 增加變量配置
# node3
vim /etc/clickhouse-server/config.xml
# 增加如下內(nèi)容
<macros>
<shard>01</shard>
<replica>node3</replica>
</macros>
# node2
vim /etc/clickhouse-server/config.xml
# 增加如下內(nèi)容
<macros>
<shard>02</shard>
<replica>node2</replica>
</macros>
-- 進(jìn)入 clickhouse 命令行查看變量是否配置成功
select * from system.macros;
-- 查看遠(yuǎn)端節(jié)點的數(shù)據(jù)
select * from remote('node2:9977','system','macros','default')
4.2 基于集群實現(xiàn)分布式 DDL
在默認(rèn)情況下,創(chuàng)建多張副本表需要在不同服務(wù)器上進(jìn)行創(chuàng)建,這是因為 create、drop、rename和 alter 等 ddl 語句不支持分布式執(zhí)行,而在假如集群配置后,就可以使用新的語法實現(xiàn)分布式DDL 執(zhí)行了。
create / drop / rename / alter table on cluster cluster_name
-- cluster_name 對應(yīng)為配置文件中的汲取名稱,clickhouse 會根據(jù)集群的配置,去各個節(jié)點執(zhí)行 DDL 語句
-- 在 two_shard 集群 創(chuàng)建測試表
CREATE TABLE t_shard ON CLUSTER two_shard
(
`id` UInt8,
`name` String,
`date` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/t_shard', '{replica}')
PARTITION BY toYYYYMM(date)
ORDER BY id
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node2 │ 9977 │ 0 │ │ 1 │ 1 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │ 0 │ │ 0 │ 0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
-- 表引擎可以使用其他任意引擎
-- {shard} 和 {replica} 兩個動態(tài)變量代替了前面的硬編碼方式
-- clickhouse 會根據(jù) shard_2 的配置在 node3 和 node2 中創(chuàng)建 t_shard 數(shù)據(jù)表
-- 刪除 t_shard 表
drop table t_shard on cluster shard_2;
4.2.1 數(shù)據(jù)結(jié)構(gòu)
- 1、zookeeper 內(nèi)的節(jié)點結(jié)構(gòu)
<!-- 在默認(rèn)情況下,分布式 DDL 在 zookeeper 內(nèi)使用的根路徑由config.xml distributed_ddl 標(biāo)簽配置 -->
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>
<!-- 默認(rèn)為 /clickhouse/task_queue/ddl-->
在此路徑之下,還有一些其他監(jiān)聽節(jié)點,包括 /query-[seq] 這是 DDL 操作日志,每執(zhí)行一次分布式 DDL 查詢,該節(jié)點下就會增加一條操作日志,記錄響應(yīng)操作。當(dāng)各個節(jié)點監(jiān)聽到有新的日志假如的時候,便會響應(yīng)執(zhí)行。
DDL 操作日志使用 zookeeper 持久化順序節(jié)點,每條指令的名稱以 query-[seq] 為前綴,后面的序號遞增,在 query-[seq] 操作日志下,還有兩個狀態(tài)節(jié)點:
- query-[seq]/active:用做監(jiān)控狀態(tài),在執(zhí)行任務(wù)的過程中,在該節(jié)點下會臨時保存當(dāng)前集群內(nèi)狀態(tài)為 active 的節(jié)點
- query-[seq]/finished:用于檢查任務(wù)完成情況,在任務(wù)執(zhí)行過程中,每當(dāng)集群內(nèi)的某個 host 節(jié)點執(zhí)行完成之后,就會在該節(jié)點下寫入記錄。
/query-000001/finished
node3 : 0
node2 : 0
# 表示 node3,node2 兩個節(jié)點已經(jīng)執(zhí)行完成

- 2、DDLLogEntry 日志對象的數(shù)據(jù)結(jié)構(gòu)
# 在 /query-[seq]下記錄的信息由 DDLLogEntry 承載,它的核心屬性有以下幾個:
version: 1
query: CREATE TABLE default.t_shard UUID \'d1679b02-9eae-4766-8032-8201a2746692\' ON CLUSTER two_shard (`id` UInt8, `name` String, `date` DateTime) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/t_shard\', \'{replica}\') PARTITION BY toYYYYMM(date) ORDER BY id
hosts: ['node3:9977','node2:9977']
initiator: node3%2Exy%2Ecom:9977
# query:記錄了 DDL 查詢的執(zhí)行語句
# host:記錄了指定集群的 hosts 主機列表,集群由分布式 DDL 語句中的 on cluster 指定,在分布式 DDL 執(zhí)行過程中,會根據(jù) hosts 列表逐個判斷它們的執(zhí)行狀態(tài)。
# initiator:記錄初始 host 主機的名稱,hosts 主機列表的取值來自于初始化 host 節(jié)點上的去集群
host主機列表的取值來源等同于下面的查詢
SELECT host_name
FROM system.clusters
WHERE cluster = 'two_shard'
┌─host_name─┐
│ node3 │
│ node2 │
└───────────┘
4.2.2 分布式 DDL 的執(zhí)行流程
以創(chuàng)建分布式表為例說明分布式 DDL 的執(zhí)行流程。
分布式 DDL 整個流程按照從上而下的時間順序執(zhí)行,大致分成 3 個步驟:
- 1、推送 DDL 日志:首先在 node3 節(jié)點執(zhí)行 create table on cluster ,同時 node3 也會創(chuàng)建 DDLLogEntry 日志 ,并將日志推送到 zookeeper 中,并監(jiān)控任務(wù)的執(zhí)行進(jìn)度
- 2、拉取日志并執(zhí)行:node3、node2 兩個節(jié)點分別監(jiān)控到 ddl/query-[seq] 日志的推送,分別拉取日志到本地,首先會判斷各自的 host 是否被包含在 DDLLogEntry 的 host 列表中,如果包含進(jìn)到執(zhí)行流程,執(zhí)行完畢后寫入 finished 節(jié)點,如果不包含,忽略
- 3、確認(rèn)執(zhí)行進(jìn)度:在第一步中執(zhí)行 DDL 語句后,客戶端會阻塞 180 秒,以期望所有 host 執(zhí)行完畢,如果等待時間大于 180 秒,則會轉(zhuǎn)入后臺線程繼續(xù)等待,等待時間由 distributed_ddl_task_timeout 參數(shù)設(shè)置,默認(rèn) 180
五、Distributed 原理解析
Distributed 表引擎是分布式表的代名詞,他自身不存儲任何數(shù)據(jù),而是作為數(shù)據(jù)分片的代理,能夠自動路由數(shù)據(jù)至集群中的各個節(jié)點,所以 DIstributed 表引擎需要和其他表引擎一起協(xié)同工作。

從上圖可以看出一張表分成了兩部分:
本地表:通常以 _local 后綴進(jìn)行命名。本地表是承接數(shù)據(jù)的載體,可以使用 非 Distributed 的任意表引擎,yi’zhang’ben
分布式表:通常以 _all 為后綴進(jìn)行命名,分布式表只能使用 Distributed 表引擎,他們與本地表形成一對多的映射關(guān)系,以后通過分布式表代理操作多張本地表。
對于分布式表與本地表之間表結(jié)構(gòu)的一致性檢查,Distributed 表引擎采用了讀時檢查的機制,這意味著如果他們的表結(jié)構(gòu)不兼容,需要在查詢時才會拋出異常,而在創(chuàng)建表引擎時不會進(jìn)行檢查,不同 clickhouse 節(jié)點上的本地表之間使用不同表引擎也是可行的,但是通常不會這么做,保持他們的結(jié)構(gòu)一致,有利于后期的維護(hù)避免造成不可預(yù)計的后果。
5.1 定義形式
Distributed 表引擎的定義形式
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name on cluster cluster_name(
name1 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
name2 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
...
) ENGINE = Distributed(cluster,database,table,[sharding_key])
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
- cluster:集群名稱,與集群配置中的自定義名稱相對應(yīng),在對分布式表執(zhí)行寫入和查詢過程中,它會使用集群的配置信息來找對應(yīng)的節(jié)點。
- database:對應(yīng)數(shù)據(jù)庫名稱
- table:對應(yīng)數(shù)據(jù)表名稱,
- sharding_key:分片鍵,選填參數(shù),在寫入數(shù)據(jù)的過程中,分布式表會依據(jù)分片鍵的規(guī)則,將數(shù)據(jù)分布到各個本地表所在的節(jié)點中。
-- 創(chuàng)建分布式表 t_shard_2_all 代理 two_shard 集群的 drfault.t_shard_2_local 表
CREATE TABLE t_shard_2_all ON CLUSTER two_shard
(
`id` UInt8,
`name` String,
`date` DateTime
)
ENGINE = Distributed(two_shard, default, t_shard_2_local, rand())
Query id: 83e4f090-0f7d-4892-bbf3-a094f97a6eea
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node2 │ 9977 │ 0 │ │ 1 │ 1 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │ 0 │ │ 0 │ 0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
-- 這里用的是 on cluster 分布式 DDL, 所以在 two_shard 集群中每個節(jié)點都會創(chuàng)建一張分布式表
-- 寫入數(shù)據(jù)時會根據(jù) rand() 隨機函數(shù)的取值決定寫入那個分片,
-- 當(dāng)這時還沒有創(chuàng)建 本地表,可以看出Distributed 是讀數(shù)據(jù)時才會進(jìn)行檢查。
-- 嘗試 查詢 t_shard_2_all 分布式表
SELECT *
FROM t_shard_2_all;
Received exception from server (version 21.4.3):
Code: 60. DB::Exception: Received from localhost:9977. DB::Exception: Table default.t_shard_2_local doesn t exist.
-- 使用分布式 DDL 創(chuàng)建本地表
CREATE TABLE t_shard_2_local ON CLUSTER two_shard
(
`id` UInt8,
`name` String,
`date` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/t_shard_2_local', '{replica}')
PARTITION BY toYYYYMM(date)
ORDER BY id
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │ 0 │ │ 1 │ 0 │
│ node2 │ 9977 │ 0 │ │ 0 │ 0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
-- 嘗試 查詢 t_shard_2_all 分布式表
SELECT *
FROM t_shard_2_all;
Query id: 5ae82696-0f07-469b-bff5-bd17dc513da7
Ok.
0 rows in set. Elapsed: 0.009 sec.
-- 到現(xiàn)在為止,擁有兩個數(shù)據(jù)分配的分布式表 t_shard_2_all 就創(chuàng)建好了

5.2 查詢的分類
分布式表的查詢操作可以分為以下幾類:
- 1、作用于本地表的查詢:對應(yīng) select 和 insert 分布式表會以分布式的方式作用于 local 本地表
- 2、只會影響分布式表自身,不會作用于本地表的查詢,分布式表支持部分元數(shù)據(jù)操作,包括 create、drop、rename 和 alter,其中 alter 并不包括分區(qū)的操作(attach partition 和 replace partition等)。這些操作只會修改分布式表自身,并不會修改 local 本地表,
- 3、如果想要徹底刪除一張分布式表,需要分別刪除分布式表和本地表
-- 刪除分布式表
drop table t_shard_2_all on cluster two_shard;
-- 刪除本地表
drop table t_shard_2_local on cluster two_shard;
- 4、不支持的操作,分布式表不支持任何的 mutation 類型的操作,包括 alter delete 和 alter update。
5.3 分片規(guī)則
關(guān)于分片的規(guī)則這里進(jìn)一步說明,分片鍵要求返回一個整型類型的取值,包括 Int和 UInt 類型的系列
-- 分片鍵可以是一個具體的整型字段
-- 按照用戶 ID 劃分
Distributed(cluster,database,table,userid)
-- 分片鍵也可以是返回整型的表達(dá)式
-- 按照隨機數(shù)劃分
Distributed(cluster,database,table,rand())
-- 按照用戶 ID 的散列值劃分
Distributed(cluster,database,table,intHash64(userid))
如果不聲明分片鍵,那么分布式表只能包含一個分片,這意味著只能映射一張表,否則寫入數(shù)據(jù)時將拋出異常。當(dāng)一個分布式表只包含一個分片的時候也就失去了分布式的意義,所以通常會按照業(yè)務(wù)需要設(shè)置分片鍵。
5.3.1 分片權(quán)重(weight)
在配置集群時,有一項 weight 的設(shè)置1
weight 默認(rèn)為 1,它可以被設(shè)置成任意整除,但是建議將其設(shè)置為比較小的值。分片權(quán)重會影響分片中的數(shù)據(jù)傾斜程度,分片權(quán)重越大,寫入的數(shù)據(jù)就會越多。
5.3.2 槽(slot)
slot 的數(shù)量等于所有分片權(quán)重之和,假設(shè)集群有兩個分片,第一個分片 weight 為 10,第二個 weight 為20 ,那么 slot 的數(shù)量為 30(10+20),slot 按照權(quán)重元素的取值區(qū)間,與對應(yīng)的分片形成映射關(guān)系,
- 如果 slot 取值區(qū)間在[0-10) 區(qū)間,則對應(yīng)第一個分片
- 如果 slot 取值區(qū)間在[10-20) 區(qū)間,則對應(yīng)第二個分片
5.3.3 選擇函數(shù)
選擇函數(shù)用于判斷一行待寫入的數(shù)據(jù)應(yīng)該被寫到哪個分片中,判斷過程大致分成兩個步驟:
-
1、找出 slot 取值,計算公式:slot = shard_value % sum_weight
- shard_value 為分片鍵的取值,
- sum_weight 為所有分片的權(quán)重之和
如果某行數(shù)據(jù) shard_value = 10,sum_weight = 30, 那么 30%10 = 10,即 slot = 10
2、基于 slot 值找到對應(yīng)的數(shù)據(jù)分片,當(dāng) slot 等于 10 的時候它屬于 [10,20) 區(qū)間,所以這行數(shù)據(jù)會被對應(yīng)到第二個分片。

5.4 分布式寫入的核心流程
向集群內(nèi)的分片寫入數(shù)據(jù)時,通常有兩種思路,
- 借助外部計算系統(tǒng),先將數(shù)據(jù)均勻分片,再借由計算系統(tǒng)直接將數(shù)據(jù)寫入 clickhouse 集群的各個本地表,這種方案通常有更好的寫入性能,因為分片數(shù)據(jù)時被并西恩點對點寫入的,但是這種方案主要依賴外部系統(tǒng),而不在于 clickhouse 自身。
- 通過Distributed 表引擎代理分片數(shù)據(jù)。下面詳細(xì)介紹這種方式的寫入流程。
為了便于理解,這里將分片寫入和副本復(fù)制拆分成兩個部分講解,使用一個擁有 2 個分片 0 個副本的集群講解分片寫入流程,使用一個擁有 1 個分片 1 個副本的集群講解分片副本復(fù)制流程。
5.4.1 將數(shù)據(jù)寫入分片的流程
在對 Distributed 表執(zhí)行 insert 操作的時候,會進(jìn)入數(shù)據(jù)寫入的執(zhí)行邏輯。整個過程大約分成 5 個步驟。
-
1、在第一個分片節(jié)點寫入本地分片數(shù)據(jù)
首先在 node3 節(jié)點對分布式表 t_shard_2_all 執(zhí)行 insert 操作,寫入 10,30,40,60 4 行數(shù)據(jù),執(zhí)行之后分布式表會做兩件事:- 根據(jù)分配規(guī)則劃分?jǐn)?shù)據(jù),在這個示例中,30,60 被劃分到第一個分片,10,40 被劃分到第二個分片
- 數(shù)據(jù)當(dāng)前分配的數(shù)據(jù)直接寫入本地表 t_shard_2_local
2、第一個分片建立遠(yuǎn)端連接,準(zhǔn)備發(fā)送遠(yuǎn)端分片數(shù)據(jù)
將需要放到遠(yuǎn)端分片的數(shù)據(jù)以分區(qū)為單位,分別寫入 t_shard_2_all 存儲目錄下的臨時 bin 文件,數(shù)據(jù)文件命名規(guī)則如下:
/database@host:port/[increase_num].bin
# 10,40 的兩條數(shù)據(jù)會寫入到這個臨時文件中
臨時數(shù)據(jù)寫完后會嘗試與 第二個分片的服務(wù)器進(jìn)行連接。
3、第一個分片向遠(yuǎn)端發(fā)送數(shù)據(jù)
這時會有另一組監(jiān)聽人會負(fù)責(zé)監(jiān)聽 t_shard_2_all 目錄下的文件變化,這些任務(wù)負(fù)責(zé)將目錄數(shù)據(jù)發(fā)送到遠(yuǎn)端分片,其中每份數(shù)據(jù)將由獨立的進(jìn)程負(fù)責(zé)發(fā)送,數(shù)據(jù)在傳輸之前會被壓縮。4、第二個分片接收數(shù)據(jù)并寫入本地
第二個分片與第一個分片的服務(wù)器建立連接后接受來自第一個分片的數(shù)據(jù),并將他們寫入本地表5、第一個分片確認(rèn)完成寫入
由數(shù)據(jù)發(fā)送方確認(rèn)所有數(shù)據(jù)發(fā)送完畢,至此數(shù)據(jù)寫入流程完畢。
由 Distributed 表負(fù)責(zé)向遠(yuǎn)端分片發(fā)送數(shù)據(jù)時,有異步和同步兩種模式:
- 異步:在 Distributed 表在寫完本地分片之后,insert 操作就會返回寫入成功信息
- 同步:在執(zhí)行 insert 操作之后,會等待所有分片完成寫入
由 insert_distributed_sync 參數(shù)控制使用何種模式,默認(rèn) false(異步),如果設(shè)置為 true ,還需要設(shè)置 insert_distributed_timeout 參數(shù)控制同步等待超時時間。
5.4.2 副本復(fù)制的流程

1、通過 Distributed 復(fù)制數(shù)據(jù)
在這種實現(xiàn)方式下,即使本地表不使用 ReplicatedMergeTree 表引擎,也能實現(xiàn)數(shù)據(jù)副本的功能,Distributed 會同時負(fù)責(zé)副本與分片的數(shù)據(jù)寫入工作,而副本的寫入流程與分片的寫入流程相同,這種情況下,Distributed 節(jié)點的寫入性能可能成為瓶頸。2、通過 ReplicatedMergeTree 復(fù)制數(shù)據(jù)
如果在集群的 shard 配置中增加設(shè)置 internal_replication = true,那么 Distributed 將在沒每個分片只寫一份數(shù)據(jù),不負(fù)責(zé)其副本的寫入,如果此時,本地表使用的是 ReplicatedMergeTree 表引擎,那么在 shard 內(nèi)的多個副本會由 ReplicatedMergeTree 自己處理。
Distributed 選擇 replica的算法大致是,clickhouse 服務(wù)器節(jié)點中擁有一個全局計數(shù)器 errors_count,當(dāng)服務(wù)器出現(xiàn)異常時計數(shù)器 +1,當(dāng)一個分片有多個副本時,選擇 errors_count 計數(shù)最小的服務(wù)器,進(jìn)行數(shù)據(jù)寫入。
5.5 分布式查詢的核心流程
與寫入數(shù)據(jù)有所不同,面向集群查詢數(shù)據(jù)的時候,只能通過Distributed 表引擎實現(xiàn),當(dāng)Distributed 表執(zhí)行查詢操作的時候,會依次查詢每個分片的數(shù)據(jù),然后再匯總返回。
5.5.1 多副本路由選擇
在查詢數(shù)據(jù)的時候,如果一個集群中有一個分片有多個副本,那么 Distributed 需要面臨副本選擇的問題,clickhouse 會使用負(fù)載均衡算法從眾多副本中選擇一個,而具體使用哪種算法由load_balancing參數(shù)控制。
# clickhouse 提供四種負(fù)載均衡算法
load_balancing=random/nearest_hostname/in_order/first_or_random
1、random
random 是默認(rèn)的負(fù)載均衡算法,clickhouse 服務(wù)器節(jié)點中有一個全局計數(shù)器 errors_count, 當(dāng)服務(wù)器發(fā)生異常時 計數(shù)器+1,random 就是選擇 errors_count 最少的節(jié)點,如果有多個計數(shù)最少的errors_count節(jié)點,那么隨機選擇一個。2、nearest_hostname
可以看做是 random 的變種,同樣選擇 errors_count 最少的節(jié)點,如果有多個計數(shù)最少的errors_count節(jié)點,那么選擇選擇與當(dāng)前配置的 hostname 最相似的一個。3、in_order
可以看做是 random 的變種,同樣選擇 errors_count 最少的節(jié)點,如果有多個計數(shù)最少的errors_count節(jié)點,那么根據(jù) replica 的配置順序逐個選擇。4、first_or_random
可以看做是 in_order 的變種,同樣選擇 errors_count 最少的節(jié)點,如果有多個計數(shù)最少的errors_count節(jié)點,選擇配置的第一個 replica 節(jié)點,如果第一個 replica 節(jié)點不可用,隨機選擇一個。
5.5.2 多分片查詢的核心流程
分布式查詢與分布式寫入類似,同樣是誰發(fā)起誰負(fù)責(zé),它會由接收 select 查詢的 Distributed 表,負(fù)責(zé)串聯(lián)起整個查詢。
首先針對分布式表的查詢SQL,按照分片數(shù)量將查詢根據(jù)分片拆分成若干個針對本地表查詢的子查詢,然后向各個表發(fā)起查詢,最后再匯總各個分片的結(jié)果。
-- 例如在分布式表執(zhí)行下面查詢,查看執(zhí)行計劃
EXPLAIN
SELECT count(1)
FROM t_shard_2_all;
┌─explain─────────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ MergingAggregated │
│ SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│ Union │
│ Expression (Convert block structure for query from local replica) │
│ ReadFromPreparedSource (Optimized trivial count) │
│ ReadFromPreparedSource (Read from remote replica) │
└─────────────────────────────────────────────────────────────────────────────┘
整個執(zhí)行計劃從上至下可分成兩個步驟
- 1、查詢各分片數(shù)據(jù)
其中 讀取本地數(shù)據(jù) 和 讀取遠(yuǎn)程數(shù)據(jù) 是并行的,他們分別負(fù)責(zé)執(zhí)行本地和遠(yuǎn)端分片的查詢動作。 - 2、返回合并結(jié)果
將返回結(jié)果合并
5.5.3 使用 global 優(yōu)化分布式子查詢
如果在分布式查詢中使用子查詢,可能會面臨兩難的局面,
先看下面一個示例:
-- 使用分布式 DDL 創(chuàng)建 分布式表
CREATE TABLE t_distributed_query_all ON CLUSTER two_shard
(
`id` UInt8, -- 用戶編號
`repo` UInt8 -- 倉庫編號
)
ENGINE = Distributed(two_shard, default, t_distributed_query_local, rand());
-- 使用分布式 DDL 創(chuàng)建本地表
CREATE TABLE t_distributed_query_local ON CLUSTER two_shard
(
`id` UInt8,
`repo` UInt8
)
ENGINE = TinyLog;
-- 在 node2 節(jié)點寫入數(shù)據(jù)
insert into t_distributed_query_local values (1,100),(2,100),(3,100);
-- 查詢數(shù)據(jù)
select * from t_distributed_query_local;
┌─id─┬─repo─┐
│ 1 │ 100 │
│ 2 │ 100 │
│ 3 │ 100 │
└────┴──────┘
-- 在 node3 節(jié)點寫入數(shù)據(jù)
insert into t_distributed_query_local values (3,200),(4,200);
-- 查詢數(shù)據(jù)
select * from t_distributed_query_local;
┌─id─┬─repo─┐
│ 3 │ 200 │
│ 4 │ 200 │
└────┴──────┘
-- 查詢?nèi)直頂?shù)據(jù)
select * from t_distributed_query_all;
┌─id─┬─repo─┐
│ 1 │ 100 │
│ 2 │ 100 │
│ 3 │ 100 │
└────┴──────┘
┌─id─┬─repo─┐
│ 3 │ 200 │
│ 4 │ 200 │
└────┴──────┘
要求找到同時擁有兩個倉庫的用戶,對于這種查詢可以使用 in 查詢子句,與此同時面臨的問題是 in 查詢使用分布式表還是本地表?
- 1、使用本地表的問題
如果在 in 查詢中使用本地表:
SELECT uniq(id)
FROM t_distributed_query_all
WHERE (repo = 100) AND (
id IN
(
SELECT id
FROM t_distributed_query_local
WHERE repo = 200
)
);
┌─uniq(id)─┐
│ 0 │
└──────────┘
-- 并沒有查詢出結(jié)果
-- 在分布式表在接收到查詢后,將上面 SQL 替換成本地表的形式再發(fā)送到每個分片進(jìn)行執(zhí)行
SELECT uniq(id)
FROM t_distributed_query_local
WHERE (repo = 100) AND (
id IN
(
SELECT id
FROM t_distributed_query_local
WHERE repo = 200
)
);
-- 單獨在分片 1 或分片 2 都無法找到滿足 同時等于 100 和 200 的數(shù)據(jù)
- 2、使用 global 優(yōu)化查詢
為了解決查詢問題,可以使用 global in 或者 join 進(jìn)行優(yōu)化
SELECT uniq(id)
FROM t_distributed_query_all
WHERE (repo = 100) AND (id GLOBAL IN
(
SELECT id
FROM t_distributed_query_all
WHERE repo = 200
))
Query id: 0a55d59d-c87b-4bc8-8985-dad26f0a39b9
┌─uniq(id)─┐
│ 1 │
└──────────┘
Global 查詢流程:
- 1、將 in 子句單獨提出,發(fā)起一次分布式查詢
- 2、將分布式表轉(zhuǎn)成 local 表后,分別在本地和遠(yuǎn)端分片執(zhí)行查詢
- 3、將 in 子查詢結(jié)果進(jìn)行匯總,放入一張臨時的內(nèi)存表進(jìn)行保存
- 4、將內(nèi)存表發(fā)送到遠(yuǎn)端分片節(jié)點
- 5、將分布式表轉(zhuǎn)為本地表后,開始執(zhí)行完整的 SQL 語句,in 子句直接使用臨時表的數(shù)據(jù)
在使用 global 修飾符之后,clickhouse 使用內(nèi)存表臨時保存了 in 子查詢到的數(shù)據(jù),并將其發(fā)送到遠(yuǎn)端分片節(jié)點,以此達(dá)到了數(shù)據(jù)共享的目的,從而避免了查詢放大的問題,in 或者 join 子句返回的數(shù)據(jù)不宜過大,如果內(nèi)存表存在重復(fù)數(shù)據(jù),可以實現(xiàn)在子句中增加 distinct 實現(xiàn)去重。