全部產品
Search
文件中心

MaxCompute:通過DataWorksData Integration遷移日誌資料至MaxCompute

更新時間:Feb 28, 2024

本文為您介紹如何通過Data Integration功能同步LogHub資料至MaxCompute。

背景資訊

Log Service支援以下資料同步情境:

  • 跨地區的LogHub與MaxCompute等資料來源的資料同步。

  • 不同阿里雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。

  • 同一阿里雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。

  • 公用雲與金融雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。

以B帳號進入Data Integration配置同步任務,將A帳號的LogHub資料同步至B帳號的MaxCompute為例,跨阿里雲帳號的特別說明如下:

  1. 使用A帳號的AccessKey ID和AccessKey Secret建立LogHub資料來源。

    此時B帳號可以同步A帳號下所有Log Service專案的資料。

  2. 使用A帳號下的RAM使用者A1的AccessKey ID和AccessKey Secret建立LogHub資料來源。

    • A帳號為RAM使用者A1賦予Log Service的通用許可權,即AliyunLogFullAccessAliyunLogReadOnlyAccess,詳情請參見建立RAM使用者及授權

    • A帳號給RAM使用者A1賦予Log Service的自訂許可權。

      主帳號A進入RAM控制台 > 許可權管理 > 權限原則頁面,單擊建立權限原則

      相關的授權請參見簡介概覽

      根據下述策略進行授權後,B帳號通過RAM使用者A1隻能同步處理記錄服務project_name1以及project_name2的資料。

      {
      "Version": "1",
      "Statement": [
      {
      "Action": [
      "log:Get*",
      "log:List*",
      "log:CreateConsumerGroup",
      "log:UpdateConsumerGroup",
      "log:DeleteConsumerGroup",
      "log:ListConsumerGroup",
      "log:ConsumerGroupUpdateCheckPoint",
      "log:ConsumerGroupHeartBeat",
      "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": [
      "acs:log:*:*:project/project_name1",
      "acs:log:*:*:project/project_name1/*",
      "acs:log:*:*:project/project_name2",
      "acs:log:*:*:project/project_name2/*"
      ],
      "Effect": "Allow"
      }
      ]
      }

建立LogHub資料來源

  1. 登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration

  2. 單擊左側導覽列中的資料來源

  3. 資料來源列表頁面,單擊新增資料來源

  4. 新增資料來源對話方塊中,選擇資料來源類型為LogHub

  5. 填寫新增LogHub資料來源對話方塊中的配置。

    參數

    描述

    資料來源名稱

    資料來源名稱必須以字母、數字、底線組合,且不能以數字和底線開頭。

    資料來源描述

    對資料來源進行簡單描述,不得超過80個字元。

    LogHub Endpoint

    LogHub的Endpoint,格式為http://example.com。詳情請參見服務入口

    Project

    輸入專案名稱。

    AccessKey ID

    存取金鑰中的AccessKey ID,您可以進入控制台的使用者資訊管理頁面進行複製。

    AccessKey Secret

    存取金鑰中的AccessKey Secret,相當於登入密碼。

  6. 單擊測試連通性

  7. 連通性測試通過後,單擊完成

建立離線同步節點

  1. 資料來源頁面,單擊左上方的表徵圖,選擇全部產品 > DataStudio(資料開發)

  2. 資料開發頁面,滑鼠移至上方至建立表徵圖,單擊商務程序

  3. 建立商務程序對話方塊中,輸入商務程序名稱描述,單擊建立

  4. 展開商務程序,按右鍵Data Integration,選擇建立節點 > 離線同步

  5. 建立節點對話方塊中,輸入節點名稱,並選擇路徑

  6. 單擊確認,進入離線節點編輯頁面。

通過嚮導模式配置同步任務

  1. 在離線節點編輯頁面,選擇資料來源。

    資料來源

    參數

    描述

    資料來源

    輸入LogHub資料來源的名稱。

    Logstore

    目標日誌庫的名稱。

    日誌開始時間

    資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界,為yyyyMMddHHmmss格式的時間字串(例如20180111013000)。該參數可以和DataWorks的調度時間參數配合使用。

    日誌結束時間

    資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界,為yyyyMMddHHmmss格式的時間字串(例如20180111013010)。該參數可以和DataWorks的調度時間參數配合使用。

    批量條數

    一次讀取的資料條數,預設為256。

    說明

    您可以進行資料預覽,此處僅選擇LogHub中的幾條資料展現在預覽框。由於您在進行同步任務時,會指定開始時間和結束時間,會導致預覽結果和實際的同步結果不一致。

  2. 選擇MaxCompute資料來源及目標表。

  3. 選擇欄位的映射關係。

  4. 通道控制中配置作業速率上限和髒資料檢查規則。

  5. 確認當前節點的配置無誤後,單擊左上方的儲存

  6. 運行離線同步節點。

    您可以通過以下兩種方式運行離線同步節點:

    • 直接運行(一次性運行)

      單擊節點編輯頁面工具列中的運行表徵圖,直接在頁面運行。

      說明

      運行之前需要配置自訂參數的具體取值。

    • 調度運行

      單擊節點編輯頁面工具列中的提交表徵圖,提交離線同步節點至調度系統,調度系統會根據配置的屬性,從第2天開始自動定時運行。提交

      如上圖所示,設定開始時間為系統前10分鐘,結束時間為系統前5分鐘:startTime=$[yyyymmddhh24miss-10/24/60] endTime=$[yyyymmddhh24miss-5/24/60]分調度

      如上圖所示,設定離線同步節點的調度周期為分鐘,從00:00~23:59每5分鐘調度一次。

通過指令碼模式配置離線同步節點

  1. 成功建立離線同步節點後,單擊工具列中的轉換指令碼

    轉換指令碼

  2. 單擊提示對話方塊中的確認,即可進入指令碼模式進行開發。

  3. 單擊工具列中的匯入模板

    匯入模板

  4. 匯入模板對話方塊中,選擇從來源端的LogHub資料來源同步至目標端的ODPS資料來源的匯入模板,單擊確認

  5. 匯入模板後,根據自身需求編輯代碼,樣本指令碼如下。

    {
    "type": "job",
    "version": "1.0",
    "configuration": {
    "reader": {
    "plugin": "loghub",
    "parameter": {
    "datasource": "loghub_lzz",//資料來源名,需要和您添加的資料來源名一致。
    "logstore": "logstore-ut2",//目標日誌庫的名字,LogStore是Log Service中日誌資料的採集、儲存和查詢單元。
    "beginDateTime": "${startTime}",//資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界。
    "endDateTime": "${endTime}",//資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界。
    "batchSize": 256,//一次讀取的資料條數,預設為256。
    "splitPk": "",
    "column": [
    "key1",
    "key2",
    "key3"
    ]
    }
    },
    "writer": {
    "plugin": "odps",
    "parameter": {
    "datasource": "odps_first",//資料來源名,需要和您添加的資料來源名一致。
    "table": "ok",//目標表名。
    "truncate": true,
    "partition": "",//分區資訊。
    "column": [//目標列名。
    "key1",
    "key2",
    "key3"
    ]
    }
    },
    "setting": {
    "speed": {
    "mbps": 8,//作業速率上限,此處1mbps = 1MB/s。
    "concurrent": 7//並發數。
    }
    }
    }
    }