全部產品
Search
文件中心

DataWorks:MetaQ資料來源

更新時間:Oct 24, 2024

DataWorksData Integration支援使用MetaQ Reader讀取訊息佇列Message Queue(簡稱MQ)的資料,本文為您介紹DataWorks的MetaQ資料讀取能力。

支援的版本

MetaQ Reader通過訊息佇列服務的Java SDK訂閱MetaQ中的即時訊息資料,使用的Java SDK版本如下所示。

<dependency>
            <groupId>com.taobao.metaq.final</groupId>
            <artifactId>metaq-client</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-sdk</artifactId>
            <version>1.3.1</version>
        </dependency>

使用限制

支援的欄位類型

支援的欄位類型如下。

欄位類型

離線讀(MetaQ Reader)

STRING

支援

MetaQ Reader針對MetaQ類型的轉換列表,如下所示。

Data Integration資料類型

訊息佇列資料類型

STRING

STRING

資料同步任務開發

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

附錄:MetaQ 指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "metaqreader",
                    "parameter": {
                        "accessId": "<yourAccessKeyId>",
                        "accessKey": "<yourAccessKeySecret>",
                        "consumerId": "Test01",
                        "topicName": "test",
                        "subExpression": "*",
                        "onsChannel": "ALIYUN",
                        "domainName": "***.aliyun.com",
                        "contentType": "singlestringcolumn",
                        "beginOffset": "lastRead",
                        "nullCurrentOffset": "begin",
                        "fieldDelimiter": ",",
                        "column": [
                            "col0"
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}

Reader指令碼參數

參數

描述

是否必選

accessId

訪問訊息佇列的存取金鑰,用於標識使用者。

accessKey

訪問訊息佇列的存取金鑰,用來驗證使用者的密鑰。

consumerId

Consumer是訊息的消費者,也稱為訊息訂閱者,負責接收並消費訊息。

consumerId是一類Consumer的標識,該類Consumer通常接收並消費一類訊息,且消費邏輯一致。

topicName

訊息主題,一級訊息類型,通過topic對訊息進行分類。

subExpression

訊息子主題。

onsChannel

用於進行訊息佇列鑒權。

unitName

接收訊息的目標單元。常用單元如下:

  • sh:中心

  • unsz:深圳單元

  • us:美國

  • en-us:歐洲

  • rg-ru:俄羅斯

  • zbyk:張北優酷

  • unzbyun:張北雲

  • unshyun:上海雲

  • lazada-sg:新加坡lazada

  • lazada-my:馬來西亞lazada

  • lazada-vn:越南lazada

  • lazada-ph:菲律賓lazada

  • lazada-th:泰國lazada

  • lazada-id:印尼lazada

instanceName

Consumer的執行個體名稱。

domainName

訊息佇列的存取點。

contentType

訊息的類型,支援singlestringcolumn(訊息為STRING類型)、text(訊息為文本類型)和json(訊息為JSON類型)。

beginOffset

任務開始讀取的Offset,支援begin(從最開始),lastRead(上次讀取的Offset)。

nullCurrentOffset

上次Offset為空白時,開始讀取的地方。支援begin(從最開始),current(當前offset)。

fieldDelimiter

分隔字元模式下訊息字串的資料行分隔符號,例如逗號等。支援控制字元,例如\u0001

column

讀取的欄位列表。

beginDateTime

資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界。

beginDateTimeyyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。

說明

beginDateTimeendDateTime配合使用。

endDateTime

資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界。

endDateTimeyyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。