全部產品
Search
文件中心

DataWorks:Log Service通過Data Integration投遞資料

更新時間:Jun 19, 2024

本文將以LogHub資料同步至MaxCompute為例,為您介紹如何通過Data Integration功能同步LogHub資料至Data Integration已支援的目的端資料來源(例如MaxCompute、OSS、Tablestore、RDBMS和DataHub等)。

前提條件

背景資訊

Log Service支援以下資料同步情境:

  • 跨地區的LogHub與MaxCompute等資料來源的資料同步。

  • 不同阿里雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。

  • 同一阿里雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。

  • 公用雲與金融雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。

以B帳號進入Data Integration配置同步任務,將A帳號的LogHub資料同步至B帳號的MaxCompute為例,跨阿里雲帳號的特別說明如下:

  1. 使用A帳號的AccessKey ID和AccessKey Secret建立LogHub資料來源。

    此時B帳號可以同步A帳號下所有Log Service專案的資料。

  2. 使用A帳號下的RAM使用者A1的AccessKey ID和AccessKey Secret建立LogHub資料來源。

    • A帳號為RAM使用者A1賦予Log Service的通用許可權,即AliyunLogFullAccessAliyunLogReadOnlyAccess,詳情請參見建立RAM使用者及授權

      說明

      為RAM帳號授予AliyunLogFullAccessAliyunLogReadOnlyAccess系統策略後,RAM帳號可以查詢主帳號下的所有Log Service。

    • A帳號給RAM使用者A1賦予Log Service的自訂許可權。

      主帳號A進入RAM控制台 > 許可權管理 > 權限原則頁面,單擊建立權限原則

      相關的授權請參見簡介概覽

      根據下述策略進行授權後,B帳號通過RAM使用者A1隻能同步處理記錄服務project_name1以及project_name2的資料。

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "log:Get*",
                      "log:List*",
                      "log:CreateConsumerGroup",
                      "log:UpdateConsumerGroup",
                      "log:DeleteConsumerGroup",
                      "log:ListConsumerGroup",
                      "log:ConsumerGroupUpdateCheckPoint",
                      "log:ConsumerGroupHeartBeat",
                      "log:GetConsumerGroupCheckPoint"
                  ],
                  "Resource": [
                      "acs:log:*:*:project/project_name1",
                      "acs:log:*:*:project/project_name1/*",
                      "acs:log:*:*:project/project_name2",
                      "acs:log:*:*:project/project_name2/*"
                  ],
                  "Effect": "Allow"
              }
          ]
      }

建立LogHub資料來源

  1. 登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration

  2. 單擊左側導覽列中的資料來源

  3. 資料來源列表頁面,單擊新增資料來源

  4. 新增資料來源對話方塊中,選擇資料來源類型為LogHub

  5. 填寫新增LogHub資料來源對話方塊中的配置。

    參數

    描述

    資料來源名稱

    資料來源名稱必須以字母、數字、底線組合,且不能以數字和底線開頭。

    資料來源描述

    對資料來源進行簡單描述,不得超過80個字元。

    LogHub Endpoint

    LogHub的Endpoint,格式為http://example.com。詳情請參見服務入口

    Project

    輸入專案名稱。

    AccessKey ID

    存取金鑰中的AccessKey ID,您可以進入控制台的使用者資訊管理頁面進行複製。

    AccessKey Secret

    存取金鑰中的AccessKey Secret,相當於登入密碼。

  6. 單擊測試連通性

  7. 連通性測試通過後,單擊完成

建立離線同步節點

  1. 資料來源頁面,單擊左上方的image表徵圖,選擇全部產品 > DataStudio(資料開發)

  2. 資料開發頁面,滑鼠移至上方至建立表徵圖,單擊商務程序

  3. 建立商務程序對話方塊中,輸入商務程序名稱描述,單擊建立

  4. 展開商務程序,按右鍵Data Integration,選擇建立節點 > 離線同步

  5. 建立節點對話方塊中,輸入節點名稱,並選擇路徑

  6. 單擊確認,進入離線節點編輯頁面。

通過嚮導模式配置同步任務

  1. 在離線節點編輯頁面,選擇資料來源和資料去向。

    資料來源

    參數

    描述

    資料來源

    選擇LogHub

    資料來源名稱

    選擇以添加的Log Service資料來源名稱。

    資源群組

    選擇獨享Data Integration資源群組。

    資料去向

    選擇MaxCompute。

    資料來源名稱

    選擇以添加的MaxCompute資料來源名稱。

  2. 測試網路連通性,資料來源和資料去向均可連通後,點擊下一步

  3. 配置資料來源與資料去向具體同步的表等資訊。

    image

    資料來源參數說明:

    參數

    描述

    Logstore

    目標日誌庫的名稱。

    日誌開始時間

    資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界,為yyyyMMddHHmmss格式的時間字串(例如20180111013000)。該參數可以和DataWorks的調度時間參數配合使用。

    日誌結束時間

    資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界,為yyyyMMddHHmmss格式的時間字串(例如20180111013010)。該參數可以和DataWorks的調度時間參數配合使用。

    批量條數

    一次讀取的資料條數,預設為256。

    說明

    您可以進行資料預覽,此處僅選擇LogHub中的幾條資料展現在預覽框。由於您在進行同步任務時,會指定開始時間和結束時間,會導致預覽結果和實際的同步結果不一致。

  4. 選擇欄位的映射關係。

  5. 通道控制中配置同步速率髒資料策略等參數。

  6. 單擊右側調度配置,配置重跑屬性調度資源群組以及依賴的上遊節點等參數。

    說明

    依賴的上遊節點配置為使用工作空間根節點

  7. 確認當前節點的配置無誤後,單擊左上方的image

  8. 運行離線同步節點。

    您可以通過以下兩種方式運行離線同步節點:

    • 直接運行(一次性運行)

      單擊節點編輯頁面工具列中的image表徵圖,直接在頁面運行。

      說明

      運行之前需要配置自訂參數的具體取值。

    • 調度運行

      1. 單擊右側調度配置,設定時間屬性,配置調度周期。

        image

      2. 單擊節點編輯頁面工具列中的image表徵圖,然後單擊image表徵圖,提交離線同步節點至調度系統,調度系統會根據配置的屬性,從第2天開始自動定時運行。

通過指令碼模式配置離線同步節點

  1. 成功建立離線同步節點後,單擊工具列中的轉換指令碼

    轉換指令碼

  2. 單擊提示對話方塊中的確認,即可進入指令碼模式進行開發。

  3. 單擊工具列中的匯入模板

    匯入模板

  4. 匯入模板對話方塊中,選擇從來源端的LogHub資料來源同步至目標端的ODPS資料來源的匯入模板,單擊確認

  5. 匯入模板後,根據自身需求編輯代碼,樣本指令碼如下。

    {
        "type": "job",
        "version": "1.0",
        "configuration": {
            "reader": {
                "plugin": "loghub",
                "parameter": {
                    "datasource": "loghub_lzz",//資料來源名,需要和您添加的資料來源名一致。
                    "logstore": "logstore-ut2",//目標日誌庫的名字,LogStore是Log Service中日誌資料的採集、儲存和查詢單元。
                    "beginDateTime": "${startTime}",//資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界。
                    "endDateTime": "${endTime}",//資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界。
                    "batchSize": 256,//一次讀取的資料條數,預設為256。
                    "splitPk": "",
                    "column": [
                        "key1",
                        "key2",
                        "key3"
                    ]
                }
            },
            "writer": {
                "plugin": "odps",
                "parameter": {
                    "datasource": "odps_source",//資料來源名,需要和您添加的資料來源名一致。
                    "table": "test",//目標表名。
                    "truncate": true,
                    "partition": "",//分區資訊。
                    "column": [//目標列名。
                        "key1",
                        "key2",
                        "key3"
                    ]
                }
            },
            "setting": {
                "speed": {
                    "mbps": 8,//作業速率上限,單位MB/s。
                    "concurrent": 7//並發數。
                }
            }
        }
    }