全部產品
Search
文件中心

DataWorks:MetaQ資料來源

更新時間:Jun 19, 2024

DataWorksData Integration支援使用MetaQ Reader讀取訊息佇列Message Queue(簡稱MQ)的資料,本文為您介紹DataWorks的MetaQ資料讀取能力。

支援的版本

MetaQ Reader通過訊息佇列服務的Java SDK訂閱MetaQ中的即時訊息資料,使用的Java SDK版本如下所示。
<dependency>
            <groupId>com.taobao.metaq.final</groupId>
            <artifactId>metaq-client</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-sdk</artifactId>
            <version>1.3.1</version>
        </dependency>

使用限制

支援的欄位類型

支援的欄位類型如下。
欄位類型離線讀(MetaQ Reader)
STRING支援
MetaQ Reader針對MetaQ類型的轉換列表,如下所示。
Data Integration資料類型訊息佇列資料類型
STRINGSTRING

資料同步任務開發

附錄:MetaQ 指令碼Demo與參數說明

附錄:離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。

MetaQ Reader指令碼Demo

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "metaqreader",
                    "parameter": {
                        "accessId": "<yourAccessKeyId>",
                        "accessKey": "<yourAccessKeySecret>",
                        "consumerId": "Test01",
                        "topicName": "test",
                        "subExpression": "*",
                        "onsChannel": "ALIYUN",
                        "domainName": "***.aliyun.com",
                        "contentType": "singlestringcolumn",
                        "beginOffset": "lastRead",
                        "nullCurrentOffset": "begin",
                        "fieldDelimiter": ",",
                        "column": [
                            "col0"
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}

MetaQ Reader指令碼參數

參數描述是否必選
accessId訪問訊息佇列的存取金鑰,用於標識使用者。
accessKey訪問訊息佇列的存取金鑰,用來驗證使用者的密鑰。
consumerIdConsumer是訊息的消費者,也稱為訊息訂閱者,負責接收並消費訊息。

consumerId是一類Consumer的標識,該類Consumer通常接收並消費一類訊息,且消費邏輯一致。

topicName訊息主題,一級訊息類型,通過topic對訊息進行分類。
subExpression訊息子主題。
onsChannel用於進行訊息佇列鑒權。
unitName接收訊息的目標單元。常用單元如下:
  • sh:中心
  • unsz:深圳單元
  • us:美國
  • en-us:歐洲
  • rg-ru:俄羅斯
  • zbyk:張北優酷
  • unzbyun:張北雲
  • unshyun:上海雲
  • lazada-sg:新加坡lazada
  • lazada-my:馬來西亞lazada
  • lazada-vn:越南lazada
  • lazada-ph:菲律賓lazada
  • lazada-th:泰國lazada
  • lazada-id:印尼lazada
instanceNameConsumer的執行個體名稱。
domainName訊息佇列的存取點。
contentType訊息的類型,支援singlestringcolumn(訊息為STRING類型)、text(訊息為文本類型)和json(訊息為JSON類型)。
beginOffset任務開始讀取的Offset,支援begin(從最開始),lastRead(上次讀取的Offset)。
nullCurrentOffset上次Offset為空白時,開始讀取的地方。支援begin(從最開始),current(當前offset)。
fieldDelimiter分隔字元模式下訊息字串的資料行分隔符號,例如逗號等。支援控制字元,例如\u0001
column讀取的欄位列表。
beginDateTime資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界。

beginDateTimeyyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。

說明 beginDateTimeendDateTime配合使用。
endDateTime資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界。

endDateTimeyyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。