LogHub(SLS)資料來源為您提供讀取和寫入LogHub(SLS)雙向通道的功能,本文為您介紹DataWorks的LogHub(SLS)資料同步的能力支援情況。
使用限制
Data Integration離線寫LogHub(SLS)時,由於LogHub(SLS)無法實現等冪,FailOver重跑任務時會引起資料重複。
支援的欄位類型
Data Integration支援讀寫的LogHub(SLS)欄位類型如下。
欄位類型 | 離線讀(LogHub(SLS) Reader) | 離線寫(LogHub(SLS) Writer) | 即時讀 |
STRING | 支援 | 支援 | 支援 |
其中:
離線寫LogHub(SLS)時
會將支援同步的各類型資料均轉換成STRING類型後寫入LogHub(SLS)。LogHub(SLS) Writer針對LogHub(SLS)類型的轉換列表,如下所示。
支援的Data Integration內部類型
寫入LogHub(SLS)時的資料類型
LONG
STRING
DOUBLE
STRING
STRING
STRING
DATE
STRING
BOOLEAN
STRING
BYTES
STRING
即時讀LogHub(SLS)時
會內建以下中繼資料欄位。
LogHub(SLS)即時同步欄位
資料類型
說明
__time__
STRING
SLS保留欄位:__time__寫入日誌資料時指定的日誌時間,unix時間戳記,單位為秒。
__source__
STRING
SLS保留欄位:__source__日誌來源裝置。
__topic__
STRING
SLS保留欄位:__topic__topic名稱。
__tag__:__receive_time__
STRING
日誌到達服務端的時間。開啟記錄外網IP功能後,服務端接收日誌時為原始日誌追加該欄位。unix時間戳記,單位為秒。
__tag__:__client_ip__
STRING
日誌來源裝置的公網IP。開啟記錄外網IP功能後,服務端接收日誌時為原始日誌追加該欄位。
__tag__:__path__
STRING
Logtail採集的記錄檔路徑,Logtail為日誌自動追加該欄位。
__tag__:__hostname__
STRING
Logtail採集資料的來源機器主機名稱,Logtail為日誌自動追加該欄位。
資料同步任務開發:LogHub(SLS)同步流程引導
LogHub(SLS)資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源。
單表離線同步任務配置指導
操作流程請參見通過嚮導模式配置離線同步任務、通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:LogHub(SLS)指令碼Demo與參數說明。
單表即時同步任務配置指導
操作流程請參見配置單表增量資料即時同步、DataStudio側即時同步任務配置。
整庫離線、整庫(即時)全增量、整庫(即時)分庫分表等整庫層級同步配置指導
操作流程請參見Data Integration側同步任務配置。
常見問題
更多其他Data Integration常見問題請參見Data Integration常見問題。
附錄:LogHub(SLS)指令碼Demo與參數說明
附錄:離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。
LogHub(SLS) Reader指令碼Demo
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"LogHub",//外掛程式名。
"parameter":{
"datasource":"",//資料來源。
"column":[//欄位。
"col0",
"col1",
"col2",
"col3",
"col4",
"C_Category",
"C_Source",
"C_Topic",
"C_MachineUUID", //日誌主題。
"C_HostName", //主機名稱。
"C_Path", //路徑。
"C_LogTime" //事件時間。
],
"beginDateTime":"",//資料消費的開始時間位點。
"batchSize":"",//一次從Log Service查詢的資料條數。
"endDateTime":"",//資料消費的結束時間位點。
"fieldDelimiter":",",//資料行分隔符號。
"logstore":""//:目標日誌庫的名字。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1 //作業並發數。
"mbps":"12",//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
LogHub(SLS) Reader指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
endPoint | Log Service入口endPoint是訪問一個專案(Project)及其內部日誌資料的URL。它和Project所在的阿里雲地區(Region)及Project名稱相關。各地區的服務入口請參見服務入口。 | 是 | 無 |
accessId | 訪問Log Service的存取金鑰,用於標識使用者。 | 是 | 無 |
accessKey | 訪問Log Service的存取金鑰,用來驗證使用者的密鑰。 | 是 | 無 |
project | 目標Log Service的專案名稱,是Log Service中的資源嵌入式管理單元,用於隔離和控制資源。 | 是 | 無 |
logstore | 目標日誌庫的名稱,logstore是Log Service中日誌資料的採集、儲存和查詢單元。 | 是 | 無 |
batchSize | 一次從Log Service查詢的資料條數。 | 否 | 128 |
column | 每條資料中的列名,此處可以配置Log Service中的中繼資料作為同步列。Log Service支援日誌主題、採集機器唯一標識、主機名稱、路徑和日誌時間等中繼資料。 說明 列名區分大小寫。中繼資料的寫法請參見Log Service機器組。 | 是 | 無 |
beginDateTime | 資料消費的開始時間位點,即日誌資料到達LogHub(SLS)的時間。該參數為時間範圍(左閉右開)的左邊界,yyyyMMddHHmmss格式的時間字串(例如20180111013000),可以和DataWorks的調度時間參數配合使用。 例如,您在節點編輯頁面右側的調度配置,在參數中配置 說明 beginDateTime和endDateTime需要互相組合配套使用。 | 是 | 無 |
endDateTime | 資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界,yyyyMMddHHmmss格式的時間字串(例如20180111013010),可以和DataWorks的調度時間參數配合使用。 例如,您在節點編輯頁面右側的調度配置,在參數中配置endDateTime=${yyyymmdd},則在日誌結束時間處配置為${endDateTime}000000,表示擷取的日誌結束時間為業務日期後一天的0點0分0秒。詳情請參見調度參數支援的格式。 說明 上一周期的endDateTime需要和下一周期的beginDateTime保持一致,或晚於下一周期的beginDateTime。否則,可能無法拉取部分地區的資料。 | 是 | 無 |
LogHub(SLS) writer指令碼Demo
{
"type": "job",
"version": "2.0",//版本號碼。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "LogHub",//外掛程式名。
"parameter": {
"datasource": "",//資料來源。
"column": [//欄位。
"col0",
"col1",
"col2",
"col3",
"col4",
"col5"
],
"topic": "",//選取topic。
"batchSize": "1024",//一次性批量提交的記錄數大小。
"logstore": ""//目標LogService LogStore的名稱。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""//錯誤記錄數。
},
"speed": {
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":3, //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
LogHub(SLS) Writer指令碼參數
LogHub(SLS) Writer通過Data Integration架構擷取Reader產生的資料,然後將Data Integration支援的類型通過逐一判斷轉換成STRING類型。當達到您指定的batchSize時,會使用LogService Java SDK一次性推送至LogHub(SLS)。
參數 | 描述 | 是否必選 | 預設值 |
endpoint | Log Service入口endPoint是訪問一個專案(Project)及其內部日誌資料的URL。它和Project所在的阿里雲地區(Region)及Project名稱相關。各地區的服務入口請參見:服務入口。 | 是 | 無 |
accessKeyId | 訪問Log Service的AccessKeyId。 | 是 | 無 |
accessKeySecret | 訪問Log Service的AccessKeySecret。 | 是 | 無 |
project | 目標Log Service的專案名稱。 | 是 | 無 |
logstore | 目標日誌庫的名稱,logstore是Log Service中日誌資料的採集、儲存和查詢單元。 | 是 | 無 |
topic | 目標Log Service的topic名稱。 | 否 | Null 字元串 |
batchSize | LogHub(SLS)一次同步的資料條數,預設1,024條,最大值為4,096。 說明 一次性同步至LogHub(SLS)的資料大小不要超過5M,請根據您的單條資料量大小調整一次性推送的條數。 | 否 | 1,024 |
column | 每條資料中的column名稱。 | 是 | 無 |