Apache Doris Colocate Join 原理實踐教程
What Colocate Join
我們都知道Join的常見連接類型如下:
INNER JOINOUTER JOINCROSS JOINSEMI JOINANTI JOINJoin的常見算法實現包括:
Nested Loop JoinSort Merge JoinHash Join分布式系統實現連接數據分布的常用策略包括:
Shuffle JoinBroadcast JoinColocate/Local JoinColocate/Local Join是指多個節點加入時,沒有數據移動和網絡傳輸,每個節點只在本地加入。本地連接的前提是同一個連接鍵的數據分布在同一個節點。
Why Colocate Join
與混洗連接和廣播連接相比,同位連接具有更高的性能,因為在查詢期間沒有數據的網絡傳輸。在Doris的具體實現中,Colocate Join可以擁有比Shuffle Join更高的并發粒度,也可以顯著提高Join的性能,后面會有說明。
How Colocate Join
核心思維
對于協同定位表,在任何情況下都應該保證數據的局部性。
具體包括:數據導入時保證數據本地性查詢調度時保證數據本地性數據 balance 后保證數據本地性最復雜的實現是第三點:處理協同定位表的平衡。
術語定義
共生群
我們稱一組具有相同共置屬性的表為Group,下圖中t1和t2具有相同的共置組。
協同定位父表
我們把決定一個組的數據分布的表叫做父表,下圖中的t1就是同位父表。
協同定位子表
我們把一個組中除了父表以外的表都叫做子表,下圖中的t2就是共置子表。
時段序列
如下所示,如果一個表有n個分區,那么每個分區的第m個桶的桶序列是m。
1導入數據時確保本地性。
Doris的分區如下:首先按照分區字段范圍進行分區,然后按照指定的分布式Key Hash劃分成桶:
所以導入數據時保證本地性的核心思想是兩次映射。對于協同定位表,我們確保相同分布式鍵的數據映射到相同的Bucket Seq,然后確保相同Bucket Seq的Bucket映射到相同的BE。
具體來說,第一步:我們計算分布式密鑰的哈希值,并對bucket num取模,以確保相同分布式密鑰的數據映射到相同的Bucket Seq。
步驟2:將同一協同定位組下同一時段序列的所有時段映射到同一BE,如下所示:
Group 中所有 Table 的 Bucket Seq 和 BE 節點的映射關系和 Parent Table 一致Parent Table 中所有 Partition 的 Bucket Seq 和 BE 節點的映射關系和第一個 Partition 一致Parent Table 第一個 Partition 的 Bucket Seq 和 BE 節點的映射關系利用原生的 Round Robin 算法決定2協同定位連接查詢計劃
關于HashJoinFragment,由于Join的很多表都有數據局部性保證,所以可以去掉交換節點,避免網絡傳輸,直接設置ScanNode為Hash Join節點的子節點。
3協同定位連接查詢計劃
調度目標:在協同定位連接中,所有ScanNode中具有相同存儲桶序列的所有存儲桶都被調度到同一個BE。
調度策略:第一個ScanNode的桶隨機選擇BE,其余scannode與第一個scan node一致。
4在時段序列級別的協同定位連接
目前,Doris的散列連接是服務器粒度的:
關于協同定位連接,由于同一個協同定位組下的同一個Bucket Seq的Bucket分布在同一個BE中,我們將連接的粒度從服務器粒度降低到Bucket Seq粒度:
5協同定位連接元數據維護
對于協同定位連接,我們需要維護以下核心元數據:
代碼中,colocate group id 就是 colocate parent table idgroup2BackendsPerBucketSeq 代表每個 colocate group 中每個 bucket seq 映射到哪些 BE為了支持 balance,以及保證元數據的一致性,這些元數據都需要持久化6如何決定一個查詢可以共存連接
Join 的 tables 是 colocate ableThe colocate group 是 stable 狀態,沒有 balancingJoin 的 Key 包含分桶的 Distributed Key7協同定位連接支持平衡
核心理念:
添加一個守護線程來處理并置表的平衡,讓普通的平衡線程不處理并置表的平衡。
平衡時:
當添加、刪除或關閉BE節點時。
天平的粒度:
正常余額的粒度是桶,但是對于同宿表,我們必須保證所有桶在同一個同宿組下的數據局部性,所以我們余額的單位是同宿組。
余額對查詢的影響:
當共存組處于平衡狀態時,共存連接將退化為原始的隨機連接或廣播連接。
平衡過程:
為需要復制或遷移的 Bucket 選擇目標 BE標記 colocate group 的轉態為 balancing對于需要復制或遷移的 Bucket,發起 Clone Job,Clone Job 會從 Bucket 的現有副本復制一個新副本目標 BE更新 backendsPerBucketSeq(維護 Bucket Seq 到 BE 映射關系的元數據)當一個 colocate group 下的所有 Clone Job 都完成時,標記 colocate group 的轉態為 stable刪除冗余的副本當一個BE節點被刪除或長時間掛起時,選擇目標BE的策略:
選擇策略與正常平衡中的相同。考慮到集群的整體負載,盡量選擇負載較低的BE。
添加BE節點時,選擇目標BE的策略:
對于當前 colocate group,計算每個新增 BE 需要增加的 bucket seqs 個數:假如我們有 3 個 BE,8 個 bucket,每個 bucket 是 3 副本,則每個 BE 負責 8 個 bucket 副本,我們新增 1 個 BE 后,可以計算出每個 BE 負責的平均 bucket 副本數應該是 3 * 8 / 4 = 6,每個新增 BE 需要增加的 bucket seqs 個數為 6 / 1 = 6.對于每個 bucket seqs, 隨機選擇從哪個舊的 BE 遷移副本到新增的 BE。Colocate Join Performance
測試數據:
表A、B、C、B、C都有10天的數據,每天一個分區,每個分區有570萬個數據。
測試集群:
4臺低級物理機,每臺24個CPU和96個內存
測試SQL:
SQL1:
select count(*) FROM A t1INNER JOIN [shuffle] B t5 ON ((t1.dt = t5.dt) AND (t1.id = t5.id))INNER JOIN [shuffle] C t6 ON ((t1.dt = t6.dt) AND (t1.id = t6.id))where t1.dt in (xxx days);SQL2:
select t1.dt, t1.id, t1.name, t1.second_id,t1.second_name,t5.id, t5.weight_time,t5.list,t6.ord_id, t6._idFROM A t1INNER JOIN B t5 ON ((t1.dt = t5.dt) AND (t1.id = t5.id))INNER JOIN C t6 ON ((t1.dt = t6.dt) AND (t1.id = t6.id))where t1.dt in (xxx days)limit 10000;SQL1的測試結果:
SQL2的測試結果:
可以看出,協同定位連接相對于混洗連接有明顯的性能提升,而且集群規模越大,連接的數據量越多,協同定位連接的優勢也會越明顯。
How To Use Colocate Join
最新的社區代碼已經支持Colocate Join,但默認情況下是關閉的。您只需在FE配置中將disable_colocate_join設置為false,即可啟用Colocate Join。
使用時,只需要在創建表時添加colocate _ with屬性即可。colocate_with的值可以設置為同一組colocate表中的任何一個,但是您需要確保首先創建colocate_with屬性中的表。
如果需要共同定位連接表t1和t2,可以根據以下語句構建一個表:
CREATE TABLE `t1` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 10PROPERTIES ("colocate_with" = "t1");CREATE TABLE `t2` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 10PROPERTIES ("colocate_with" = "t1");協同定位連接目前受到限制
Colocate Table 必須是 OLAP 類型的表colocate_with 屬性相同表的 BUCKET 數必須一樣colocate_with 屬性相同表的 副本數必須一樣 (這個限制之后可能會去掉,但對用戶應該沒有實際影響)colocate_with 屬性相同表的 DISTRIBUTED Columns 的數據類型必須一樣協同定位連接適用場景
Colocate Join非常適合幾個表按照同一個字段劃分為桶,按照同一個字段進行高頻連接的場景。比如很多電商的應用都是按照商家Id分桶,按照商家Id高頻加入。
協同定位連接常見問題
總之,任何不能共址加入的場景都會自動退化為原來的洗牌加入或者廣播加入。
Q1:你支持多表共存嗎?
答:支持
Q2:你支持共存表和普通表連接嗎?
答:支持
Q3:共存表支持用Join-bucket鍵連接嗎?
答:是:隨機加入或廣播加入將用于不滿足共存加入條件的加入。
Q4:如何根據協同定位連接來確定連接是否被執行?
在A: explain的結果中,如果Hash Join的子節點直接是OlapScanNode,并且沒有交換節點,則說明是Colocate Join。
Q5:如何修改colocate_with屬性?
a:更改表example_db.my_table集合(& quot與& quot= & quot目標表& quot);
問:如何禁用共存連接?
a:設置disable _ colocate _ join = true查詢時,您可以禁用共存聯接,并使用隨機聯接或廣播聯接。
總結
大部分支持Join的OLAP系統都會考慮支持Colocate Join,比如MemSQL、SnappyData、Ali AnalyticDB等。Ali AnalyticDB甚至在其數據模型中引入了表組的概念。一般來說,通過在數據導入、查詢計劃、查詢調度和數據平衡中保證和考慮數據的局部性,同位連接可以在特定場景下顯著地加快下行連接查詢的速度,是一個非常有用的特性。
以上是Apache Doris Colocate Join原理練習教程的詳細內容。關于Apache Doris co locate Join principle的更多信息,請關注腳本之家的其他相關文章!
如果您的問題還未解決可以聯系站長付費協助。

有問題可以加入技術QQ群一起交流學習
本站vip會員 請加入無憂模板網 VIP群(50604020) PS:加入時備注用戶名或昵稱
普通注冊會員或訪客 請加入無憂模板網 技術交流群(50604130)
客服微信號:15898888535
聲明:本站所有文章資源內容,如無特殊說明或標注,均為采集網絡資源。如若內容侵犯了原著者的合法權益,可聯系站長刪除。