DataWorksData Integration支援使用MetaQ Reader讀取訊息佇列Message Queue(簡稱MQ)的資料,本文為您介紹DataWorks的MetaQ資料讀取能力。
支援的版本
MetaQ Reader通過訊息佇列服務的Java SDK訂閱MetaQ中的即時訊息資料,使用的Java SDK版本如下所示。
<dependency>
<groupId>com.taobao.metaq.final</groupId>
<artifactId>metaq-client</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-sdk</artifactId>
<version>1.3.1</version>
</dependency>
支援的欄位類型
支援的欄位類型如下。
欄位類型 | 離線讀(MetaQ Reader) |
STRING | 支援 |
MetaQ Reader針對MetaQ類型的轉換列表,如下所示。
Data Integration資料類型 | 訊息佇列資料類型 |
STRING | STRING |
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
附錄:MetaQ 指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
{
"job": {
"content": [
{
"reader": {
"name": "metaqreader",
"parameter": {
"accessId": "<yourAccessKeyId>",
"accessKey": "<yourAccessKeySecret>",
"consumerId": "Test01",
"topicName": "test",
"subExpression": "*",
"onsChannel": "ALIYUN",
"domainName": "***.aliyun.com",
"contentType": "singlestringcolumn",
"beginOffset": "lastRead",
"nullCurrentOffset": "begin",
"fieldDelimiter": ",",
"column": [
"col0"
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
Reader指令碼參數
參數 | 描述 | 是否必選 |
accessId | 訪問訊息佇列的存取金鑰,用於標識使用者。 | 是 |
accessKey | 訪問訊息佇列的存取金鑰,用來驗證使用者的密鑰。 | 是 |
consumerId | Consumer是訊息的消費者,也稱為訊息訂閱者,負責接收並消費訊息。 consumerId是一類Consumer的標識,該類Consumer通常接收並消費一類訊息,且消費邏輯一致。 | 是 |
topicName | 訊息主題,一級訊息類型,通過topic對訊息進行分類。 | 是 |
subExpression | 訊息子主題。 | 是 |
onsChannel | 用於進行訊息佇列鑒權。 | 是 |
unitName | 接收訊息的目標單元。常用單元如下: sh:中心 unsz:深圳單元 us:美國 en-us:歐洲 rg-ru:俄羅斯 zbyk:張北優酷 unzbyun:張北雲 unshyun:上海雲 lazada-sg:新加坡lazada lazada-my:馬來西亞lazada lazada-vn:越南lazada lazada-ph:菲律賓lazada lazada-th:泰國lazada lazada-id:印尼lazada
| 否 |
instanceName | Consumer的執行個體名稱。 | 否 |
domainName | 訊息佇列的存取點。 | 是 |
contentType | 訊息的類型,支援singlestringcolumn(訊息為STRING類型)、text(訊息為文本類型)和json(訊息為JSON類型)。 | 是 |
beginOffset | 任務開始讀取的Offset,支援begin(從最開始),lastRead(上次讀取的Offset)。 | 否 |
nullCurrentOffset | 上次Offset為空白時,開始讀取的地方。支援begin(從最開始),current(當前offset)。 | 是 |
fieldDelimiter | 分隔字元模式下訊息字串的資料行分隔符號,例如逗號等。支援控制字元,例如\u0001。 | 是 |
column | 讀取的欄位列表。 | 是 |
beginDateTime | 資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界。 beginDateTime是yyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。 | 否
說明 beginDateTime和endDateTime配合使用。 |
endDateTime | 資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界。 endDateTime是yyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。 |