全部產品
Search
文件中心

Elasticsearch:通過DataWorks將MaxCompute資料同步到阿里雲ES

更新時間:Jun 30, 2024

如果您需要對MaxCompute(ODPS)中的海量資料進行資訊檢索、多維查詢、統計分析等操作,可藉助Elasticsearch實現。本文通過DataWorks的Data Integration服務,實現最快分鐘級,將海量MaxCompute資料離線同步到阿里雲ES中。

背景資訊

DataWorks是一個基於巨量資料引擎,整合資料開發、任務調度、資料管理等功能的全鏈路巨量資料開發治理平台。您可以通過DataWorks的同步任務,快速的將各種資料來源中的資料同步到阿里雲ES。

  • 支援同步的資料來源包括:

    • 阿里雲雲資料庫(MySQL、PostgreSQL、SQL Server、MongoDB、HBase)

    • 阿里雲PolarDB-X(原DRDS升級版)

    • 阿里雲MaxCompute

    • 阿里雲OSS

    • 阿里雲Tablestore

    • 自建HDFS、Oracle、FTP、DB2及以上資料庫類型的自建版本

  • 適用情境:

前提條件

說明
  • 僅支援將資料同步到阿里雲ES,不支援自建Elasticsearch。

  • MaxCompute專案、ES執行個體和DataWorks工作空間所在地區需保持一致。

  • ES執行個體、MaxCompute和DataWorks工作空間需要建立在同一時區下,否則同步與時間相關的資料時,同步前後的資料可能存在時區差。

費用說明

操作步驟

步驟一:準備來源資料

建立MaxCompute表並匯入測試資料。具體操作,請參見建立表匯入資料

本文使用的表結構和表資料如下所示:

  • 表結構表結構

  • 部分表資料表資料

步驟二:購買並配置獨享資源群組

購買一個Data Integration獨享資源群組,並為該資源群組綁定專用網路和工作空間。獨享資源群組可以保證資料快速、穩定地傳輸。

  1. 登入DataWorks控制台

  2. 在頂部功能表列選擇相應地區後,在左側導覽列單擊資源群組列表

  3. 獨享資源群組頁簽下,單擊建立整合資源群組

  4. DataWorks獨享資源購買頁面,獨享資源類型選擇獨享Data Integration資源,輸入資源群組名稱,單擊立即購買,購買獨享資源群組。

    更多配置資訊,請參見購買資源群組

  5. 在已建立的獨享資源群組的操作列,單擊網路設定,為該獨享資源群組綁定專用網路。具體操作,請參見綁定專用網路

    說明

    本文以獨享Data Integration資源群組通過VPC內網同步資料為例。關於通過公網同步資料,請參見使用獨享Data Integration資源群組執行任務需要在資料庫添加的IP白名單

    獨享資源需要與Elasticsearch執行個體的專用網路連通才能同步資料。因此需要綁定Elasticsearch執行個體所在的專用網路可用性區域交換器。查看Elasticsearch執行個體所在的專用網路、可用性區域和交換器,請參見查看Elasticsearch執行個體的基本資料

    重要

    綁定專用網路後,您需要將專用網路的交換器網段加入到Elasticsearch執行個體的VPC私網訪問白名單中。具體操作,請參見配置Elastic search執行個體公網或私網訪問白名單

  6. 在頁面左上方,單擊返回表徵圖,返回資源群組列表頁面。

  7. 在已建立的獨享資源群組的操作列,單擊修改歸屬工作空間,為該獨享資源群組繫結目標工作空間。

    具體操作,請參見綁定歸屬工作空間

步驟三:添加資料來源

將MaxCompute和Elasticsearch資料來源接入DataWorks的Data Integration服務中。

  1. 進入DataWorks的Data Integration頁面。

    1. 登入DataWorks控制台

    2. 在左側導覽列,單擊工作空間列表

    3. 在目標工作空間的操作列,選擇快速進入 Data Integration

  2. 在左側導覽列,單擊資料來源

  3. 新增MaxCompute資料來源。

    1. 資料來源列表頁面,單擊新增資料來源

    2. 新增資料來源頁面,搜尋並選擇MaxCompute資料來源。

    3. 新增MaxCompute資料來源對話方塊,在基礎資訊地區配置資料來源參數。

      配置詳情,請參見配置MaxCompute資料來源

    4. 串連配置地區,單擊測試連通性,連通狀態顯示為可連通時,表示連通成功。

    5. 單擊完成

  4. 使用同樣的方式添加Elasticsearch資料來源。配置詳情,請參見配置Elasticsearch資料來源

步驟四:配置並運行資料同步任務

資料同步任務將獨享資源群組作為一個可以執行任務的資源,獨享資源群組將擷取Data Integration服務中資料來源的資料,並將資料寫入Elasticsearch。

說明

有兩種方式可以配置離線同步任務,文本以嚮導模式配置離線同步任務為例。您也可以通過指令碼模式配置離線同步任務,詳情請參見通過指令碼模式配置離線同步任務Elasticsearch Writer

  1. 進入DataWorks的資料開發頁面。

    1. 登入DataWorks控制台

    2. 在左側導覽列,單擊工作空間列表

    3. 在目標工作空間的操作列,選擇快速進入 資料開發

  2. 建立一個離線同步任務。

    1. 在左側導覽列,選擇建立 > 建立商務程序,建立一個商務程序。

    2. 按右鍵建立的商務程序,選擇建立節點 > 離線同步

    3. 建立節點對話方塊中,輸入節點名稱,單擊確認

  3. 配置網路與資源

    1. 資料來源地區,資料來源選擇MaxCompute(ODPS),資料來源名稱選擇待同步的資料來源名稱。

    2. 我的資源群組地區,選擇獨享資源群組。

    3. 資料去向地區,資料去向選擇Elasticsearch,資料來源名稱選擇待同步的資料來源名稱。

  4. 單擊下一步。

  5. 配置任務。

    1. 資料來源地區,選擇待同步的表。

    2. 資料去向地區,配置資料去向的各參數。

    3. 欄位對應地區中,設定來源欄位目標欄位的映射關係。

    4. 通道控制地區,配置通道參數。

    詳細配置資訊,請參見通過嚮導模式配置離線同步任務

  6. 運行任務。

    1. (可選)配置任務調度屬性。在頁面右側,單擊調度配置,按照需求配置相應的調度參數。各配置的詳細說明,請參見調度配置

    2. 在節點地區的右上方,單擊儲存表徵圖,儲存任務。

    3. 在節點地區的右上方,單擊提交表徵圖,提交任務。

      如果您配置了任務調度屬性,任務會定期自動執行。您還可以在節點地區的右上方,單擊運行表徵圖,立即運行任務。

      作業記錄中出現Shell run successfully!表明任務運行成功。部分任務作業記錄如下所示:

      2023-10-31 16:52:35 INFO Exit code of the Shell command 0
      2023-10-31 16:52:35 INFO --- Invocation of Shell command completed ---
      2023-10-31 16:52:35 INFO Shell run successfully!
      2023-10-31 16:52:35 INFO Current task status: FINISH
      2023-10-31 16:52:35 INFO Cost time is: 33.106s

步驟五:驗證資料同步結果

在Kibana控制台中,查看同步成功的資料,並按條件查詢資料。

  1. 登入目標Elasticsearch執行個體的Kibana控制台。

    具體操作,請參見登入Kibana控制台

  2. 單擊Kibana頁面左上方的菜單.png表徵圖,選擇Dev Tools(開發工具)。

  3. Console(控制台)中,執行如下命令查看同步的資料。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {}}
    }
    說明

    odps_index為您在資料同步指令碼中設定的index欄位的值。

    資料同步成功後,返回如下結果。查看同步的資料

  4. 執行如下命令,搜尋文檔中的categorybrand欄位。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {} },
    "_source": ["category", "brand"]
    }
  5. 執行如下命令,搜尋category生鮮的文檔。

    POST /odps_index/_search?pretty
    {
    "query": { "match": {"category":"生鮮"} }
    }
  6. 執行如下命令,按照trans_num欄位對文檔進行排序。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {} },
    "sort": { "trans_num": { "order": "desc" } }
    }

    更多命令和訪問方式,請參見Elastic.co官方協助中心