本文將以LogHub資料同步至MaxCompute為例,為您介紹如何通過Data Integration功能同步LogHub資料至Data Integration已支援的目的端資料來源(例如MaxCompute、OSS、Tablestore、RDBMS和DataHub等)。
前提條件
準備好相關的資料來源,詳情請參見建立MaxCompute資料來源。
準備需要同步的來源表與目標表。
背景資訊
Log Service支援以下資料同步情境:
跨地區的LogHub與MaxCompute等資料來源的資料同步。
不同阿里雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。
同一阿里雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。
公用雲與金融雲帳號下的LogHub與MaxCompute等資料來源間的資料同步。
以B帳號進入Data Integration配置同步任務,將A帳號的LogHub資料同步至B帳號的MaxCompute為例,跨阿里雲帳號的特別說明如下:
使用A帳號的AccessKey ID和AccessKey Secret建立LogHub資料來源。
此時B帳號可以同步A帳號下所有Log Service專案的資料。
使用A帳號下的RAM使用者A1的AccessKey ID和AccessKey Secret建立LogHub資料來源。
A帳號為RAM使用者A1賦予Log Service的通用許可權,即
AliyunLogFullAccess
和AliyunLogReadOnlyAccess
,詳情請參見建立RAM使用者及授權。說明為RAM帳號授予
AliyunLogFullAccess
和AliyunLogReadOnlyAccess
系統策略後,RAM帳號可以查詢主帳號下的所有Log Service。A帳號給RAM使用者A1賦予Log Service的自訂許可權。
主帳號A進入
頁面,單擊建立權限原則。根據下述策略進行授權後,B帳號通過RAM使用者A1隻能同步處理記錄服務project_name1以及project_name2的資料。
{ "Version": "1", "Statement": [ { "Action": [ "log:Get*", "log:List*", "log:CreateConsumerGroup", "log:UpdateConsumerGroup", "log:DeleteConsumerGroup", "log:ListConsumerGroup", "log:ConsumerGroupUpdateCheckPoint", "log:ConsumerGroupHeartBeat", "log:GetConsumerGroupCheckPoint" ], "Resource": [ "acs:log:*:*:project/project_name1", "acs:log:*:*:project/project_name1/*", "acs:log:*:*:project/project_name2", "acs:log:*:*:project/project_name2/*" ], "Effect": "Allow" } ] }
建立LogHub資料來源
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration。
單擊左側導覽列中的資料來源。
在資料來源列表頁面,單擊新增資料來源。
在新增資料來源對話方塊中,選擇資料來源類型為LogHub。
填寫新增LogHub資料來源對話方塊中的配置。
參數
描述
資料來源名稱
資料來源名稱必須以字母、數字、底線組合,且不能以數字和底線開頭。
資料來源描述
對資料來源進行簡單描述,不得超過80個字元。
LogHub Endpoint
LogHub的Endpoint,格式為
http://example.com
。詳情請參見服務入口。Project
輸入專案名稱。
AccessKey ID
存取金鑰中的AccessKey ID,您可以進入控制台的使用者資訊管理頁面進行複製。
AccessKey Secret
存取金鑰中的AccessKey Secret,相當於登入密碼。
單擊測試連通性。
連通性測試通過後,單擊完成。
建立離線同步節點
在資料來源頁面,單擊左上方的表徵圖,選擇 。
在資料開發頁面,滑鼠移至上方至表徵圖,單擊商務程序。
在建立商務程序對話方塊中,輸入商務程序名稱和描述,單擊建立。
展開商務程序,按右鍵Data Integration,選擇 。
在建立節點對話方塊中,輸入節點名稱,並選擇路徑。
單擊確認,進入離線節點編輯頁面。
通過嚮導模式配置同步任務
在離線節點編輯頁面,選擇資料來源和資料去向。
參數
描述
資料來源
選擇LogHub。
資料來源名稱
選擇以添加的Log Service資料來源名稱。
資源群組
選擇獨享Data Integration資源群組。
資料去向
選擇MaxCompute。
資料來源名稱
選擇以添加的MaxCompute資料來源名稱。
測試網路連通性,資料來源和資料去向均可連通後,點擊下一步。
配置資料來源與資料去向具體同步的表等資訊。
資料來源參數說明:
參數
描述
Logstore
目標日誌庫的名稱。
日誌開始時間
資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界,為yyyyMMddHHmmss格式的時間字串(例如20180111013000)。該參數可以和DataWorks的調度時間參數配合使用。
日誌結束時間
資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界,為yyyyMMddHHmmss格式的時間字串(例如20180111013010)。該參數可以和DataWorks的調度時間參數配合使用。
批量條數
一次讀取的資料條數,預設為256。
說明您可以進行資料預覽,此處僅選擇LogHub中的幾條資料展現在預覽框。由於您在進行同步任務時,會指定開始時間和結束時間,會導致預覽結果和實際的同步結果不一致。
選擇欄位的映射關係。
在通道控制中配置同步速率和髒資料策略等參數。
單擊右側調度配置,配置重跑屬性、調度資源群組以及依賴的上遊節點等參數。
說明依賴的上遊節點配置為使用工作空間根節點。
確認當前節點的配置無誤後,單擊左上方的。
運行離線同步節點。
您可以通過以下兩種方式運行離線同步節點:
直接運行(一次性運行)
單擊節點編輯頁面工具列中的表徵圖,直接在頁面運行。
說明運行之前需要配置自訂參數的具體取值。
調度運行
單擊右側調度配置,設定時間屬性,配置調度周期。
單擊節點編輯頁面工具列中的表徵圖,然後單擊表徵圖,提交離線同步節點至調度系統,調度系統會根據配置的屬性,從第2天開始自動定時運行。
通過指令碼模式配置離線同步節點
成功建立離線同步節點後,單擊工具列中的轉換指令碼。
單擊提示對話方塊中的確認,即可進入指令碼模式進行開發。
單擊工具列中的匯入模板。
在匯入模板對話方塊中,選擇從來源端的LogHub資料來源同步至目標端的ODPS資料來源的匯入模板,單擊確認。
匯入模板後,根據自身需求編輯代碼,樣本指令碼如下。
{ "type": "job", "version": "1.0", "configuration": { "reader": { "plugin": "loghub", "parameter": { "datasource": "loghub_lzz",//資料來源名,需要和您添加的資料來源名一致。 "logstore": "logstore-ut2",//目標日誌庫的名字,LogStore是Log Service中日誌資料的採集、儲存和查詢單元。 "beginDateTime": "${startTime}",//資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界。 "endDateTime": "${endTime}",//資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界。 "batchSize": 256,//一次讀取的資料條數,預設為256。 "splitPk": "", "column": [ "key1", "key2", "key3" ] } }, "writer": { "plugin": "odps", "parameter": { "datasource": "odps_source",//資料來源名,需要和您添加的資料來源名一致。 "table": "test",//目標表名。 "truncate": true, "partition": "",//分區資訊。 "column": [//目標列名。 "key1", "key2", "key3" ] } }, "setting": { "speed": { "mbps": 8,//作業速率上限,單位MB/s。 "concurrent": 7//並發數。 } } } }