全部產品
Search
文件中心

DataWorks:LogHub(SLS)資料來源

更新時間:Oct 26, 2024

LogHub(SLS)資料來源為您提供讀取和寫入LogHub(SLS)雙向通道的功能,本文為您介紹DataWorks的LogHub(SLS)資料同步的能力支援情況。

使用限制

Data Integration離線寫LogHub(SLS)時,由於LogHub(SLS)無法實現等冪,FailOver重跑任務時會引起資料重複。

支援的欄位類型

Data Integration支援讀寫的LogHub(SLS)欄位類型如下。

欄位類型

離線讀(LogHub(SLS) Reader)

離線寫(LogHub(SLS) Writer)

即時讀

STRING

支援

支援

支援

其中:

  • 離線寫LogHub(SLS)時

    會將支援同步的各類型資料均轉換成STRING類型後寫入LogHub(SLS)。LogHub(SLS) Writer針對LogHub(SLS)類型的轉換列表,如下所示。

    支援的Data Integration內部類型

    寫入LogHub(SLS)時的資料類型

    LONG

    STRING

    DOUBLE

    STRING

    STRING

    STRING

    DATE

    STRING

    BOOLEAN

    STRING

    BYTES

    STRING

  • 即時讀LogHub(SLS)時

    會內建以下中繼資料欄位。

    LogHub(SLS)即時同步欄位

    資料類型

    說明

    __time__

    STRING

    SLS保留欄位:__time__寫入日誌資料時指定的日誌時間,unix時間戳記,單位為秒。

    __source__

    STRING

    SLS保留欄位:__source__日誌來源裝置。

    __topic__

    STRING

    SLS保留欄位:__topic__topic名稱。

    __tag__:__receive_time__

    STRING

    日誌到達服務端的時間。開啟記錄外網IP功能後,服務端接收日誌時為原始日誌追加該欄位。unix時間戳記,單位為秒。

    __tag__:__client_ip__

    STRING

    日誌來源裝置的公網IP。開啟記錄外網IP功能後,服務端接收日誌時為原始日誌追加該欄位。

    __tag__:__path__

    STRING

    Logtail採集的記錄檔路徑,Logtail會自動為日誌追加該欄位。

    __tag__:__hostname__

    STRING

    Logtail採集資料的來源機器主機名稱,Logtail會自動為日誌追加該欄位。

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源詳細的配置參數解釋可在配置介面查看對應參數的文案提示

資料同步任務開發

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

說明

LogHub資料來源作為資料來源端,在進行任務配置同步時支援通過LogHub的查詢文法、SPL語句(SLS Processing Language是SLS處理日誌的文法)對LogHub內的資料進行過濾,具體文法說明請參見附錄二:LogHub SPL文法過濾說明

單表離線同步任務配置指導

單表即時同步任務配置指導

操作流程請參見配置單表增量資料即時同步DataStudio側即時同步任務配置

整庫離線、整庫(即時)全增量、整庫(即時)分庫分表等整庫層級同步配置指導

操作流程請參見Data Integration側同步任務配置

常見問題

更多其他Data Integration常見問題請參見Data Integration常見問題

附錄一:指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

{
 "type":"job",
 "version":"2.0",//版本號碼。
 "steps":[
     {
         "stepType":"LogHub",//外掛程式名。
         "parameter":{
             "datasource":"",//資料來源。
             "column":[//欄位。
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", //日誌主題。
                 "C_HostName", //主機名稱。
                 "C_Path", //路徑。
                 "C_LogTime" //事件時間。
             ],
             "beginDateTime":"",//資料消費的開始時間位點。
             "batchSize":"",//一次從Log Service查詢的資料條數。
             "endDateTime":"",//資料消費的結束時間位點。
             "fieldDelimiter":",",//資料行分隔符號。
             "logstore":""//:目標日誌庫的名字。
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"//錯誤記錄數。
     },
     "speed":{
         "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1 //作業並發數。
            "mbps":"12",//限流,此處1mbps = 1MB/s。
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}

Reader指令碼參數

參數

描述

是否必選

預設值

endPoint

Log Service入口endPoint是訪問一個專案(Project)及其內部日誌資料的URL。它和Project所在的阿里雲地區(Region)及Project名稱相關。各地區的服務入口請參見服務入口

accessId

訪問Log Service的存取金鑰,用於標識使用者。

accessKey

訪問Log Service的存取金鑰,用來驗證使用者的密鑰。

project

目標Log Service的專案名稱,是Log Service中的資源嵌入式管理單元,用於隔離和控制資源。

logstore

目標日誌庫的名稱,logstore是Log Service中日誌資料的採集、儲存和查詢單元。

batchSize

一次從Log Service查詢的資料條數。

128

column

每條資料中的列名,此處可以配置Log Service中的中繼資料作為同步列。Log Service支援日誌主題、採集機器唯一標識、主機名稱、路徑和日誌時間等中繼資料。

說明

列名區分大小寫。中繼資料的寫法請參見Log Service機器組

beginDateTime

資料消費的開始時間位點,即日誌資料到達LogHub(SLS)的時間。該參數為時間範圍(左閉右開)的左邊界,yyyyMMddHHmmss格式的時間字串(例如20180111013000),可以和DataWorks的調度時間參數配合使用。

例如,您在節點編輯頁面右側的調度配置,在參數中配置beginDateTime=${yyyymmdd-1},則在日誌開始時間處配置為${beginDateTime}000000,表示擷取的日誌開始時間為業務日期的0點0分0秒。詳情請參見調度參數支援的格式

說明

beginDateTimeendDateTime需要互相組合配套使用。

endDateTime

資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界,yyyyMMddHHmmss格式的時間字串(例如20180111013010),可以和DataWorks的調度時間參數配合使用。

例如,您在節點編輯頁面右側的調度配置,在參數中配置endDateTime=${yyyymmdd},則在日誌結束時間處配置為${endDateTime}000000,表示擷取的日誌結束時間為業務日期後一天的0點0分0秒。詳情請參見調度參數支援的格式

說明

上一周期的endDateTime需要和下一周期的beginDateTime保持一致,或晚於下一周期的beginDateTime。否則,可能無法拉取部分地區的資料。

Writer指令碼Demo

{
    "type": "job",
    "version": "2.0",//版本號碼。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "LogHub",//外掛程式名。
            "parameter": {
                "datasource": "",//資料來源。
                "column": [//欄位。
                    "col0",
                    "col1",
                    "col2",
                    "col3",
                    "col4",
                    "col5"
                ],
                "topic": "",//選取topic。
                "batchSize": "1024",//一次性批量提交的記錄數大小。
                "logstore": ""//目標LogService LogStore的名稱。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""//錯誤記錄數。
        },
        "speed": {
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":3, //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer指令碼參數

說明

LogHub(SLS) Writer通過Data Integration架構擷取Reader產生的資料,然後將Data Integration支援的類型通過逐一判斷轉換成STRING類型。當達到您指定的batchSize時,會使用LogService Java SDK一次性推送至LogHub(SLS)。

參數

描述

是否必選

預設值

endpoint

Log Service入口endPoint是訪問一個專案(Project)及其內部日誌資料的URL。它和Project所在的阿里雲地區(Region)及Project名稱相關。各地區的服務入口請參見:服務入口

accessKeyId

訪問Log Service的AccessKeyId

accessKeySecret

訪問Log Service的AccessKeySecret

project

目標Log Service的專案名稱。

logstore

目標日誌庫的名稱,logstore是Log Service中日誌資料的採集、儲存和查詢單元。

topic

目標Log Service的topic名稱。

Null 字元串

batchSize

LogHub(SLS)一次同步的資料條數,預設1,024條,最大值為4,096

說明

一次性同步至LogHub(SLS)的資料大小不要超過5M,請根據您的單條資料量大小調整一次性推送的條數。

1,024

column

每條資料中的column名稱。

附錄二:LogHub SPL文法過濾說明

LogHub資料來源作為資料來源端,在進行任務配置同步時支援通過LogHub的查詢文法、SPL語句(SLS Processing Language是SLS處理日誌的文法)對LogHub內的資料進行過濾,具體文法說明如下:

說明

SPL的更多詳細資料,請參見SPL概述

情境

SQL語句

SPL語句

資料過濾

SELECT * WHERE Type='write'
| where Type='write'

欄位處理與篩選

精確選擇欄位,並將其重新命名:

SELECT "__tag__:node" AS node, path
  • 精確選擇欄位,並重新命名。

    | project node="__tag__:node", path
  • 按模式選擇欄位。

    | project -wildcard "__tag__:*"
  • 重新命名部分欄位,不影響其他欄位。

    | project-rename node="__tag__:node"
  • 按模式排除欄位。

    | project-away -wildcard "__tag__:*"

資料規整

(調用SQL函數)

轉換資料類型、時間解析等:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time

轉換資料類型、時間解析等:

| extend Status=cast(Status as BIGINT), extend Time=date_parse(Time, '%Y-%m-%d %H:%i')

欄位提取

正則提取:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time

JSON提取:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time
  • 正則提取:一次性匹配。

    | parse-regexp protocol, '(\w+)/(\d+)' as scheme, version
  • JSON提取:全部展開。

    | parse-json -path='$.0' content
  • CSV提取。

    | parse-csv -delim='^_^' content as ip, time, host