全部產品
Search
文件中心

Realtime Compute for Apache Flink:ApsaraMQ for RocketMQ

更新時間:Oct 11, 2024

本文為您介紹ApsaraMQ for RocketMQ連接器。

重要

鑒於ApsaraMQ for RocketMQ 4.x標準版執行個體共用API調用彈性上限為每秒5000次,使用該版本的訊息中介軟體在與Realtime ComputeFlink版對接時,若超過上限會觸發限流機制,可能會導致Flink作業運行不穩定。因此,在選擇訊息中介軟體時,如果您正在或計劃通過標準版RocketMQ與Flink對接,請您謹慎評估。如果業務情境允許,請考慮使用Kafka、Log Service(SLS)或DataHub等其他中介軟體進行替代。如果您確實需要使用ApsaraMQ for RocketMQ 4.x標準版處理大規模的訊息,也請同時通過提交工單與RocketMQ產品取得聯絡申請提升限速上限。

背景資訊

雲訊息佇列 RocketMQ 版是阿里雲基於Apache RocketMQ構建的低延遲、高並發、高可用和高可靠的分布式訊息中介軟體。其既可為分布式應用系統提供非同步解耦和削峰填穀的能力,同時也具備互連網應用所需的海量訊息堆積、高吞吐和可靠重試等特性。

RocketMQ連接器支援的資訊如下。

類別

詳情

支援類型

源表和結果表

運行模式

僅支援流模式

資料格式

CSV和二進位格式

特有監控指標

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • 結果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream(僅支援RocketMQ 4.x)和SQL

是否支援更新或刪除結果表資料

不支援刪除結果表資料,只支援插入和更新資料。

特色功能

RocketMQ源表和結果表支援屬性欄位,具體如下。

  • 源表屬性欄位

    說明

    僅在VVR 3.0.1及以上版本支援擷取以下RocketMQ屬性欄位。

    欄位名

    欄位類型

    說明

    topic

    VARCHAR METADATA VIRTUAL

    訊息Topic。

    queue-id

    INT METADATA VIRTUAL

    訊息佇列ID。

    queue-offset

    BIGINT METADATA VIRTUAL

    訊息佇列的消費位點。

    msg-id

    VARCHAR METADATA VIRTUAL

    訊息ID。

    store-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    訊息儲存時間。

    born-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    訊息產生時間。

    keys

    VARCHAR METADATA VIRTUAL

    訊息Keys。

    tags

    VARCHAR METADATA VIRTUAL

    訊息Tags。

  • 結果表屬性欄位

    說明

    僅Realtime Compute引擎VVR 4.0.0及以上版本支援以下RocketMQ屬性欄位。

    欄位名

    欄位類型

    說明

    keys

    VARCHAR METADATA

    訊息Keys。

    tags

    VARCHAR METADATA

    訊息Tags。

前提條件

已建立了RocketMQ資源,詳情請參見建立資源

使用限制

  • 僅FlinkRealtime Compute引擎VVR 2.0.0及以上版本支援RocketMQ連接器。

  • 僅FlinkRealtime Compute引擎VVR 8.0.3及以上版本支援5.x版本的RocketMQ。

  • 在FlinkRealtime Compute引擎VVR 6.0.2以下版本,源表的並發度必須小於等於RocketMQ topic的分區數,在Realtime Compute引擎VVR 6.0.2及以上版本解除該限制。您可以提前設定大於分區數的並發度,不需要因RocketMQ的縮容而手動調整作業並發度。

  • RocketMQ連接器使用Pull Consumer消費,所有的子任務分擔消費。

文法結構

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    connector類型。

    String

    • RocketMQ 4.x固定值為mq

    • RocketMQ 5.x固定值為mq5

    endPoint

    EndPoint地址

    String

    ApsaraMQ for RocketMQ接入地址支援以下兩種類型:

    • VVR 3.0.1及以上版本的作業:需要使用TCP協議用戶端存取點,詳情請參見

      • 內網服務MQ(阿里雲傳統網路/VPC)接入地址:在MQ控制台目標執行個體詳情中,選擇存取點 > TCP協議用戶端存取點 > 內網訪問,擷取對應的EndPoint。

      • 公網服務MQ接入地址:在MQ控制台目標執行個體詳情中,選擇存取點 > TCP協議 > 用戶端存取點 > 公網訪問,擷取對應的EndPoint。

      重要

      由於阿里雲網路安全性原則動態變化,Realtime Compute串連公網服務MQ時可能會出現網路連接問題,推薦您使用內網服務MQ。

      • 內網服務無法跨域訪問。例如,您所購買的Realtime Compute服務的地區為華東1,但是購買的RocketMQ服務的地區為華東2(上海),則無法訪問。

      • 通過公網方式訪問RocketMQ,需要配置NAT,詳情請參見建立和管理公網NAT Gateway執行個體

    • VVR 3.0.1以下版本的作業:RocketMQ舊的存取點已不可用,您需要適配升級Realtime Compute作業。

      重要

      如果您已使用了VVR 3.0.1以下版本的RocketMQ連接器,則您需要將您的Realtime Compute作業升級至VVR 3.0.1及以上版本,並將作業中EndPoint參數取值更改為新的RocketMQ存取點,舊的存取點存在穩定性風險或停用問題,詳情請參見Realtime ComputeFlink版產品公告

    topic

    topic名稱。

    String

    無。

    accessId

    • 4.x:阿里雲帳號的AccessKey ID。

    • 5.x:

      RocketMQ執行個體使用者名稱

    String

    • RocketMQ 4.x:是

    • RocketMQ 5.x:否

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理

    • RocketMQ 5.x:如果是使用公網存取點訪問,需配置為RocketMQ控制台執行個體使用者名稱。如果是在阿里雲ECS內網訪問,無需填寫該配置。

    accessKey

    • 4.x: 阿里雲帳號的AccessKey Secret。

    • 5.x:執行個體密碼

    String

    • RocketMQ 4.x:是

    • RocketMQ 5.x:否

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理

    • RocketMQ 5.x:如果是使用公網存取點訪問,需配置為RocketMQ控制台執行個體密碼。如果是在阿里雲ECS內網訪問,無需填寫該配置。

    tag

    訂閱或寫入的標籤

    String

    • RocketMQ作為源表時,只能讀取單個tag。

    • RocketMQ作為結果表時,支援設定多個tag,以逗號(,)進行分隔。

    說明

    當作為結果表時,僅支援RocketMQ 4.x。RocketMQ 5.x請使用結果表屬性欄位來指定寫出訊息的 tag。

    nameServerSubgroup

    NameServer組。

    String

    • 內網服務(阿里雲傳統網路或VPC):必須配置'nameServerSubgroup' = 'nsaddr4client-internal'

    • 公網服務:無需配置nameServerSubgroup

    說明

    僅VVR 2.1.1-VVR 3.0.0版本支援該參數,VVR 3.0.1及以後版本不支援該參數。

    encoding

    編碼格式。

    String

    UTF-8

    無。

    instanceID

    RocketMQ執行個體ID。

    String

    • 如果RocketMQ執行個體無獨立命名空間,則不可以使用instanceID參數。

    • 如果RocketMQ執行個體有獨立命名空間,則instanceID參數必選。

    說明

    僅RocketMQ 4.x支援該參數,RocketMQ 5.x不需要配置該參數。

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    consumerGroup

    Consumer組名。

    String

    無。

    pullIntervalMs

    上遊沒有資料可供消費時,source的休眠時間。

    Int

    單位為毫秒。

    目前沒有限流機制,無法設定讀取RocketMQ的速率。

    說明

    僅RocketMQ 4.x支援該參數,RocketMQ 5.x不需要配置該參數。

    timeZone

    時區。

    String

    例如,Asia/Shanghai。

    startTimeMs

    啟動時間點。

    Long

    時間戳記,單位為毫秒。

    startMessageOffset

    訊息開始的位移量。

    Int

    如果填寫該參數,則優先以startMessageOffset的位點開始載入資料。

    lineDelimiter

    解析Block時,行分隔字元。

    String

    \n

    無。

    fieldDelimiter

    欄位分隔符號。

    String

    \u0001

    根據MQ終端的模式,分隔字元分別為:

    • 在唯讀模式下(預設模式),分隔字元為\u0001。該模式下,分隔字元不可見。

    • 在編輯模式下,分隔字元為^A

    lengthCheck

    單列欄位條數檢查策略。

    Int

    NONE

    取值如下:

    • NONE:預設值。

      • 解析出的欄位數大於定義欄位數時,按從左至右的順序,取定義欄位數量的資料。

      • 解析出的欄位數小於定義欄位數時,跳過這行資料。

    • SKIP:解析出的欄位數和定義欄位數不同時跳過資料。

    • EXCEPTION:解析出的欄位數和定義欄位數不同時提示異常。

    • PAD:按從左至右順序填充。

      • 解析出的欄位數大於定義欄位數時,按從左至右的順序,取定義欄位數量的資料。

      • 解析出的欄位數小於定義欄位數時,在行尾用Null填充缺少的欄位。

    說明

    SKIP、EXCEPTION和PAD為可選值。

    columnErrorDebug

    是否開啟調試開關。

    Boolean

    false

    如果設定為true,則列印解析異常的Log。

    pullBatchSize

    每次拉取訊息的最大數量。

    Int

    64

    僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    producerGroup

    寫入的群組。

    String

    無。

    retryTimes

    寫入的重試次數。

    Int

    10

    無。

    sleepTimeMs

    稍候再試時間。

    Long

    5000

    無。

    partitionField

    指定欄位名,將該欄位作為分區列。

    String

    如果modepartition,則該參數必填。

    說明

    僅Realtime Compute引擎VVR 8.0.5及以上版本支援該參數。

類型映射

Flink欄位類型

雲訊息佇列RocketMQ欄位類型

VARCHAR

STRING

程式碼範例

  • 源表示例

    • CSV格式

      假設您的一條CSV格式訊息記錄如下。

      1,name,male 
      2,name,female
      說明

      一條RocketMQ訊息可以包括零條到多條資料記錄,記錄之間使用\n分隔。

      Flink作業中,聲明RocketMQ資料來源表的DDL如下。

      • RocketMQ 5.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq5',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
      • RocketMQ 4.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '1000',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
    • 二進位格式

      • RocketMQ 5.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq5',
          'endpoint' = '<yourEndpoint>',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;

      • RocketMQ 4.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq',
          'endpoint' = '<yourEndpoint>',
          'pullIntervalMs' = '500',
          'accessId' = '${secret_values.ak_id}',
          'accessKey' = '${secret_values.ak_secret}',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;
  • 結果表示例

    • 建立結果表

      • RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
        說明

        如果您的MQ訊息為二進位格式,則DDL中只能定義一個欄位,且欄位類型必須為VARBINARY。

    • 建立將keystags欄位指定為RocketMQ訊息的key和tag的結果表

      • RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );

DataStream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法

Realtime Compute引擎VVR提供MetaQSource,用於讀取RocketMQ;提供OutputFormat的實作類別MetaQOutputFormat,用於寫入RocketMQ。讀取RocketMQ和寫入RocketMQ的樣本如下:

RocketMQ 4.x

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里雲Realtime ComputeFlink版之前刪除
        conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Converts message body to upper case.
                .map(RocketMQDataStreamDemo2::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consumer
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
    }
}

RocketMQ 5.x

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里雲Realtime ComputeFlink版之前刪除
        conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

XML

Maven中央庫中已經放置了MQ DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
說明

RocketMQ存取點Endpoint配置詳情請參見關於TCP內網存取點設定的公告

常見問題

RocketMQ Topic擴容時,RocketMQ如何感知Topic分區數變化?