MaxCompute Graph是一套面向迭代的圖計算處理架構。圖計算作業使用圖進行建模,圖由點(Vertex)和邊(Edge)組成,點和邊包含權值(Value)。
MaxCompute Graph支援以下圖編輯操作:
- 修改點或邊的權值。
- 增加、刪除點。
- 增加、刪除邊。
說明 編輯點和邊時,需要您同時在代碼中維護點與邊的關係。
通過迭代對圖進行編輯、演化,最終求解出結果。典型應用有PageRank、單源最短距離演算法、K-均值聚類演算法 等。您可以使用MaxCompute Graph提供的介面Java SDK編寫圖計算程式。
基本概念
- 圖(Graph):是用於表示對象之間關聯關係的一種抽象資料結構。使用頂點(Vertex)和邊(Edge)進行描述,頂點表示對象,邊表示對象之間的關係。可以抽象為用圖描述的資料即為圖資料。
- 點(Vertex):在圖模型中用於表示對象。
- 邊(Edge):在圖模型中用於表示對象之間的關係。由源ID、目標ID和與該邊緣關聯的資料群組成的單個定向邊緣。
- 有向圖:即邊有方向性的圖模型,一條邊的兩個頂點一般為不同的角色,例如頁面A串連向頁面B。有向圖中的邊分為出邊和入邊。
- 無向圖:即邊無方向性的圖模型,例如使用者組中的普通使用者。
- 出邊:指從當前頂點指向其它頂點的邊。
- 入邊:其它頂點指向當前頂點的邊。
- 度:度表示一個頂點的所有邊的數量。
- 出度:是一個頂點出邊的數量。
- 入度:是一個頂點入邊的數量。
- 超步(SuperStep):圖進行迭代計算時,一次迭代稱為一個超步。
Graph資料結構
MaxCompute Graph能夠處理的圖必須是一個由點(Vertex)和邊(Edge)組成的有向圖。由於MaxCompute僅提供二維表的儲存結構,因此需要您自行將圖資料分解為二維表格儲存體在MaxCompute中。您可以根據自身的業務情境進行分解。
在進行圖計算分析時,使用自訂的GraphLoader將二維表資料轉換為MaxCompute Graph引擎中的點和邊。
- 點的結構為<ID, Value, Halted, Edges>,參數分別表示:
- 點標識符(ID)
- 權值(Value)
- 狀態(Halted, 表示是否要停止迭代。)
- 出邊集合(Edges,以該點為起始點的所有邊列表。)
- 邊的結構為<DestVertexID,Value>,參數分別為:
- 目標點(DestVertexID)
- 權值(Value)
例如,上圖可以表述為以下二維表格式。
Vertex | <ID, Value, Halted, Edges> |
v0 | <0, 0, false, [<1, 5 >, <2, 10 >]> |
v1 | <1, 5, false, [<2, 3>, <3, 2>, <5, 9>]> |
v2 | <2, 8, false, [<1, 2>, <5, 1 >]> |
v3 | <3, Long.MAX_VALUE, false, [<0, 7>, <5, 6>]> |
v5 | <5, Long.MAX_VALUE, false, [<3, 4 >]> |
Graph程式邏輯
Graph程式主要包含圖載入、迭代計算、迭代終止等處理步驟。
- 圖載入
圖載入包含兩個步驟:
- 圖載入:架構調用您自訂的GraphLoader,將輸入表的記錄解析為點或邊。
- 分布式化:架構調用您自訂的Partitioner對點進行分區(預設的分區邏輯是,根據點ID的雜湊值對Worker個數模數分區),分配到相應的Worker。
- 迭代計算
一次迭代為一個超步(SuperStep),遍曆所有非結束狀態(Halted值為False)的點或者收到訊息的點(處於結束狀態的點收到資訊會被自動喚醒),並調用其
compute(ComputeContext context, Iterable messages)
方法。在您實現的compute(ComputeContext context, Iterable messages)
方法中:- 處理上一個超步發給當前點的訊息(Messages)。
- 根據需要對圖進行編輯:
- 修改點、邊的取值。
- 發送訊息給某些點。
- 增加、刪除點或邊。
- 通過Aggregator匯總資訊到全域資訊,詳情請參見Aggregator機制。
- 設定當前點狀態,結束或非結束狀態。
- 迭代進行過程中,架構會將訊息以非同步方式發送到對應Worker,並在下一個超步進行處理,無需人工幹預。
- 迭代終止
滿足以下任意一條,迭代即終止:
- 所有點處於結束狀態(Halted值為True)且沒有新訊息產生。
- 達到最大迭代次數。
- 某個Aggregator的
terminate
方法返回True。
Graph程式的虛擬碼描述如下所示。// 1. load for each record in input_table { GraphLoader.load(); } // 2. setup WorkerComputer.setup(); for each aggr in aggregators { aggr.createStartupValue(); } for each v in vertices { v.setup(); } // 3. superstep for (step = 0; step < max; step ++) { for each aggr in aggregators { aggr.createInitialValue(); } for each v in vertices { v.compute(); } } // 4. cleanup for each v in vertices { v.cleanup(); } WorkerComputer.cleanup();