This topic describes how to use the ApsaraMQ for RocketMQ connector.
An ApsaraMQ for RocketMQ 4.x Standard Edition instance accepts up to 5,000 API calls per second. If a Realtime Compute for Apache Flink deployment that uses such an instance as its messaging middleware exceeds this limit, throttling is triggered and the deployment may become unstable. Evaluate this impact before you use, or continue to use, ApsaraMQ for RocketMQ 4.x Standard Edition with Realtime Compute for Apache Flink. If your business requirements allow, we recommend that you use another messaging middleware service instead, such as Kafka, Simple Log Service, or DataHub. If you must process a large number of messages by using ApsaraMQ for RocketMQ 4.x Standard Edition, submit a ticket to ask technical support to increase the number of messages that the instance can process per second.
Background information
ApsaraMQ for RocketMQ is a distributed messaging middleware service developed by Alibaba Cloud based on Apache RocketMQ. The service provides low latency, high concurrency, high availability, and high reliability. ApsaraMQ for RocketMQ provides the asynchronous decoupling and load shifting features for distributed application systems. The service also supports features for Internet applications, including message accumulation, high throughput, and reliable retry.
The following table describes the capabilities that are supported by the ApsaraMQ for RocketMQ connector.
Item | Description |
Table type | Source table and result table. |
Running mode | Streaming mode only. |
Data format | CSV and binary formats. |
Metric | For more information about the metrics, see Metrics. |
API type | DataStream API and SQL API. Only ApsaraMQ for RocketMQ 4.x instances support DataStream APIs. |
Data update or deletion in a result table | Data cannot be deleted from a result table. Data can only be updated in or inserted into a result table. |
Features
The following tables describe the fields that are supported for an ApsaraMQ for RocketMQ source table and an ApsaraMQ for RocketMQ result table.
Fields in an ApsaraMQ for RocketMQ source table
Note: Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.0.1 or later supports the following fields of ApsaraMQ for RocketMQ.
Field | Data type | Description |
topic | VARCHAR METADATA VIRTUAL | The topic for a type of ApsaraMQ for RocketMQ messages. |
queue-id | INT METADATA VIRTUAL | The ID of the queue in which an ApsaraMQ for RocketMQ message is stored. |
queue-offset | BIGINT METADATA VIRTUAL | The consumption offset of an ApsaraMQ for RocketMQ message. |
msg-id | VARCHAR METADATA VIRTUAL | The ID of an ApsaraMQ for RocketMQ message. |
store-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time at which an ApsaraMQ for RocketMQ message is stored. |
born-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time at which an ApsaraMQ for RocketMQ message is generated. |
keys | VARCHAR METADATA VIRTUAL | The keys of an ApsaraMQ for RocketMQ message. |
tags | VARCHAR METADATA VIRTUAL | The tags of an ApsaraMQ for RocketMQ message. |
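The metadata fields listed for source tables can be declared as columns next to the payload columns. The following is a minimal sketch for an ApsaraMQ for RocketMQ 5.x instance. It assumes that the standard Flink SQL METADATA FROM '<key>' VIRTUAL syntax is used to map hyphenated keys such as queue-id to column names that need no quoting. The table name and the mapped column names (queue_id, msg_id, and so on) are hypothetical.

```sql
CREATE TEMPORARY TABLE mq_source_with_meta (
  id VARCHAR,
  name VARCHAR,
  -- The column name equals the metadata key, so no FROM clause is needed.
  topic VARCHAR METADATA VIRTUAL,
  -- Hyphenated metadata keys are mapped to plain column names (assumed names).
  queue_id INT METADATA FROM 'queue-id' VIRTUAL,
  queue_offset BIGINT METADATA FROM 'queue-offset' VIRTUAL,
  msg_id VARCHAR METADATA FROM 'msg-id' VIRTUAL,
  store_ts TIMESTAMP(3) METADATA FROM 'store-timestamp' VIRTUAL,
  born_ts TIMESTAMP(3) METADATA FROM 'born-timestamp' VIRTUAL
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>',
  'fieldDelimiter' = ','
);
```

VIRTUAL columns are read-only, so they can be selected in queries but are not part of the schema when data is written.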
Fields in an ApsaraMQ for RocketMQ result table
Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.0 or later supports the following fields of ApsaraMQ for RocketMQ.
Field | Data type | Description |
keys | VARCHAR METADATA | The keys of an ApsaraMQ for RocketMQ message. |
tags | VARCHAR METADATA | The tags of an ApsaraMQ for RocketMQ message. |
Prerequisites
Resources are created in the ApsaraMQ for RocketMQ console. For more information, see Create resources.
Limits
Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the ApsaraMQ for RocketMQ connector.
Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports ApsaraMQ for RocketMQ 5.x instances.
In Realtime Compute for Apache Flink that uses a VVR version earlier than 6.0.2, the deployment parallelism for an ApsaraMQ for RocketMQ source table must be less than or equal to the number of partitions in the ApsaraMQ for RocketMQ topic. This limit is removed in VVR 6.0.2 and later: you can set the deployment parallelism in advance to a value that is greater than the number of partitions in the topic, and you do not need to manually adjust the deployment parallelism when the ApsaraMQ for RocketMQ instance is scaled in.
The ApsaraMQ for RocketMQ connector uses pull consumers to consume messages. All subtasks share data consumption.
Syntax
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);
Parameters in the WITH clause
Common parameters
Parameter
Description
Data type
Required
Default value
Remarks
connector
The type of the connector.
STRING
Yes
No default value
In ApsaraMQ for RocketMQ 4.x, set the value to mq.
In ApsaraMQ for RocketMQ 5.x, set the value to mq5.
endPoint
The endpoint of ApsaraMQ for RocketMQ.
STRING
Yes
No default value
ApsaraMQ for RocketMQ supports the following types of endpoints:
For deployments that run on VVR 3.0.1 or later, use Transmission Control Protocol (TCP) endpoints. To obtain the required endpoint, perform the following steps:
Internal endpoints of ApsaraMQ for RocketMQ that resides in the classic network or a virtual private cloud (VPC) of Alibaba Cloud: Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. You can view the endpoint that corresponds to Internal Access in the TCP Endpoint section.
Public endpoint of ApsaraMQ for RocketMQ: Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. You can view the endpoint that corresponds to Internet Access in the TCP Endpoint section.
Important: Due to changes in the network security policies of Alibaba Cloud, connection issues may occur when Realtime Compute for Apache Flink connects to the public ApsaraMQ for RocketMQ service. We recommend that you use the internal ApsaraMQ for RocketMQ service.
The ApsaraMQ for RocketMQ service that resides in the internal network does not support cross-region access. For example, if your Realtime Compute for Apache Flink service resides in the China (Hangzhou) region but your ApsaraMQ for RocketMQ service resides in the China (Shanghai) region, Realtime Compute for Apache Flink cannot access ApsaraMQ for RocketMQ.
If you want to access ApsaraMQ for RocketMQ over the Internet, you must configure network address translation (NAT). For more information, see Create and manage an Internet NAT gateway.
For Realtime Compute for Apache Flink deployments that run on VVR of a version earlier than 3.0.1, the old endpoint of ApsaraMQ for RocketMQ is no longer available. You must update your deployments to adapt to the VVR version.
Important: If you use the ApsaraMQ for RocketMQ connector with a VVR version earlier than 3.0.1, you must update your Realtime Compute for Apache Flink deployments to VVR 3.0.1 or later and change the value of the endPoint parameter to the new endpoint of ApsaraMQ for RocketMQ. This reduces the risks of instability or unavailability that are caused by the old endpoint of ApsaraMQ for RocketMQ. For more information, see Service notices of Realtime Compute for Apache Flink.
topic
The name of the topic.
STRING
Yes
No default value
N/A.
accessId
ApsaraMQ for RocketMQ 4.x: the AccessKey ID of your Alibaba Cloud account.
ApsaraMQ for RocketMQ 5.x: the username of the ApsaraMQ for RocketMQ instance.
STRING
ApsaraMQ for RocketMQ 4.x: yes
ApsaraMQ for RocketMQ 5.x: no
No default value
ApsaraMQ for RocketMQ 4.x: For more information about how to obtain the AccessKey ID of your Alibaba Cloud account, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Console operations topic.
Important: To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.
ApsaraMQ for RocketMQ 5.x: If you want to access an ApsaraMQ for RocketMQ instance over the Internet, set this parameter to the username of the ApsaraMQ for RocketMQ instance in the ApsaraMQ for RocketMQ console. If you want to access an ApsaraMQ for RocketMQ instance whose client is deployed on an Elastic Compute Service (ECS) instance over an internal network, you do not need to set this parameter.
accessKey
ApsaraMQ for RocketMQ 4.x: the AccessKey secret of your Alibaba Cloud account.
ApsaraMQ for RocketMQ 5.x: the password of the ApsaraMQ for RocketMQ instance.
STRING
ApsaraMQ for RocketMQ 4.x: yes
ApsaraMQ for RocketMQ 5.x: no
No default value
ApsaraMQ for RocketMQ 4.x: For more information about how to obtain the AccessKey secret of your Alibaba Cloud account, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Console operations topic.
Important: To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.
ApsaraMQ for RocketMQ 5.x: If you want to access an ApsaraMQ for RocketMQ instance over the Internet, set this parameter to the password of the ApsaraMQ for RocketMQ instance in the ApsaraMQ for RocketMQ console. If you want to access an ApsaraMQ for RocketMQ instance whose client is deployed on an ECS instance over an internal network, you do not need to set this parameter.
tag
The tag of the message that is subscribed to or written.
STRING
No
No default value
For an ApsaraMQ for RocketMQ source table, only a single tag can be read.
For an ApsaraMQ for RocketMQ result table, you can configure multiple tags. Separate multiple tags with commas (,).
Note: In a result table, this parameter is supported only for ApsaraMQ for RocketMQ 4.x instances. If you use an ApsaraMQ for RocketMQ 5.x instance, specify the tag of a message by using the tags field of the ApsaraMQ for RocketMQ result table.
nameServerSubgroup
The name server group.
STRING
No
No default value
For the ApsaraMQ for RocketMQ service that resides in the classic network or a VPC of Alibaba Cloud, you must set this parameter to nsaddr4client-internal.
For the ApsaraMQ for RocketMQ service that can be accessed over the Internet, you do not need to configure this parameter.
Note: This parameter is supported only in VVR versions from 2.1.1 to 3.0.0. In VVR 3.0.1 or later, this parameter is not supported.
encoding
The encoding format.
STRING
No
UTF-8
N/A.
instanceID
The ID of the ApsaraMQ for RocketMQ instance.
STRING
No
No default value
If the ApsaraMQ for RocketMQ instance does not use a separate namespace, the instanceID parameter cannot be configured.
If the ApsaraMQ for RocketMQ instance uses a separate namespace, you must configure the instanceID parameter.
Note: Only ApsaraMQ for RocketMQ 4.x instances support this parameter. If you use an ApsaraMQ for RocketMQ 5.x instance, you do not need to set this parameter.
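As an illustration of how the common parameters combine, the following sketch declares a source table on an ApsaraMQ for RocketMQ 4.x instance that uses a separate namespace and subscribes to a single tag. The table name, the <yourInstanceId> and <yourTag> placeholders, and the secret names ak_id and ak_secret are assumptions; replace them with your own values.

```sql
CREATE TEMPORARY TABLE mq_source_tagged (
  id VARCHAR,
  name VARCHAR
) WITH (
  'connector' = 'mq',                        -- 4.x instance
  'endpoint' = '<yourEndpoint>',             -- internal TCP endpoint recommended
  'topic' = '<yourTopicName>',
  'consumerGroup' = '<yourConsumerGroup>',
  'pullIntervalMs' = '1000',
  -- AccessKey pair referenced through key management instead of plaintext.
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Set instanceID only if the instance uses a separate namespace.
  'instanceID' = '<yourInstanceId>',
  -- A source table can read only a single tag.
  'tag' = '<yourTag>',
  'fieldDelimiter' = ','
);
```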
Parameters that are exclusive to source tables
Parameter
Description
Data type
Required
Default value
Remarks
consumerGroup
The name of the consumer group.
STRING
Yes
No default value
N/A.
pullIntervalMs
The wait interval of the source if no data can be consumed from the upstream storage system.
INT
Yes
No default value
Unit: milliseconds.
You cannot configure the rate at which ApsaraMQ for RocketMQ messages are read because no throttling mechanism is available.
Note: Only ApsaraMQ for RocketMQ 4.x instances support this parameter. If you use an ApsaraMQ for RocketMQ 5.x instance, you do not need to set this parameter.
timeZone
The time zone of the instance.
STRING
No
No default value
Example: Asia/Shanghai.
startTimeMs
The time to start reading data.
LONG
No
No default value
The timestamp. Unit: milliseconds.
startMessageOffset
The start offset to consume messages.
INT
No
No default value
This parameter is optional. If you set this parameter, messages are consumed from the offset specified by this parameter.
lineDelimiter
The row delimiter used when a message block is parsed.
STRING
No
\n
N/A.
fieldDelimiter
The field delimiter.
STRING
No
\u0001
The delimiter varies based on the mode in which the terminal of ApsaraMQ for RocketMQ works.
In read-only mode, the delimiter is \u0001. This is the default mode. In this mode, the delimiter is invisible.
In edit mode, the delimiter is ^A.
lengthCheck
The rule for checking the number of fields parsed from a row of data.
INT
No
NONE
Valid values:
NONE: This is the default value.
If the number of fields that are parsed from a row of data is greater than the specified number of fields, data is extracted from left to right based on the order of specified fields.
If the number of fields that are parsed from a row of data is smaller than the specified number of fields, this row of data is skipped.
SKIP: If the number of fields that are parsed from a row of data does not match the number of specified fields, this row of data is skipped.
EXCEPTION: If the number of fields that are parsed from a row of data is different from the specified number of fields, an exception is reported.
PAD: Data is padded from left to right based on the order of specified fields.
If the number of fields that are parsed from a row of data is greater than the specified number of fields, data is extracted from left to right based on the order of specified fields.
If the number of fields parsed from a row of data is smaller than the number of defined fields, the values of the missing fields are padded with null.
Note: SKIP, EXCEPTION, and PAD are optional values.
columnErrorDebug
Specifies whether to enable debugging.
BOOLEAN
No
false
If you set this parameter to true, a log entry that indicates a parsing exception is returned.
pullBatchSize
The maximum number of messages that can be pulled at a time.
INT
No
64
Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.
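The source-only parameters can be combined in one WITH clause. The following sketch reads CSV messages from an ApsaraMQ for RocketMQ 4.x instance starting at a given time, pads short rows with null, and logs parsing exceptions. The table name and the chosen values are illustrative assumptions; pullBatchSize additionally assumes VVR 8.0.7 or later.

```sql
CREATE TEMPORARY TABLE mq_source_tuned (
  id VARCHAR,
  name VARCHAR,
  gender VARCHAR
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'topic' = '<yourTopicName>',
  'consumerGroup' = '<yourConsumerGroup>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'pullIntervalMs' = '1000',          -- wait 1 second when no data is available
  'startTimeMs' = '1704067200000',    -- 2024-01-01 00:00:00 UTC, in milliseconds
  'lineDelimiter' = '\n',
  'fieldDelimiter' = ',',
  'lengthCheck' = 'PAD',              -- pad missing fields with null
  'columnErrorDebug' = 'true',        -- log parsing exceptions
  'pullBatchSize' = '128'             -- pull up to 128 messages at a time
);
```

With lengthCheck set to PAD, a row such as 1,name yields gender as null instead of being skipped, which is the behavior of the default value NONE.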
Parameters that are exclusive to result tables
Parameter
Description
Data type
Required
Default value
Remarks
producerGroup
The name of the producer group.
STRING
Yes
No default value
N/A.
retryTimes
The number of retries for writing data to the table.
INT
No
10
N/A.
sleepTimeMs
The retry interval.
LONG
No
5000
N/A.
partitionField
The partition key column. You can specify a field as a partition key column.
STRING
No
No default value
This parameter is required if the mode parameter is set to partition.
Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.
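The following sketch shows how the result-table parameters might be set together for an ApsaraMQ for RocketMQ 5.x instance. It assumes VVR 8.0.5 or later and takes the mode parameter and its value partition from the remark on partitionField, because mode is not described elsewhere in this topic. The table name, the choice of id as the partition key column, and the retry values are hypothetical.

```sql
CREATE TEMPORARY TABLE mq_sink_partitioned (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>',
  'retryTimes' = '5',         -- retry a failed write up to 5 times (default: 10)
  'sleepTimeMs' = '2000',     -- retry interval (default: 5000)
  -- Assumption: mode = partition enables the partition key column below.
  'mode' = 'partition',
  'partitionField' = 'id'
);
```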
Data type mappings
Data type of Realtime Compute for Apache Flink | Data type of ApsaraMQ for RocketMQ |
BOOLEAN | STRING |
VARBINARY | STRING |
VARCHAR | STRING |
TINYINT | STRING |
INTEGER | STRING |
BIGINT | STRING |
FLOAT | STRING |
DOUBLE | STRING |
DECIMAL | STRING |
Examples
Source table
CSV format
For example, a CSV message is recorded in the following format:
1,name,male
2,name,female
Note: An ApsaraMQ for RocketMQ message can contain zero or more data records. Multiple data records are separated by \n.
You can execute the following DDL statement to declare an ApsaraMQ for RocketMQ source table in a Realtime Compute for Apache Flink deployment:
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_source(
  id varchar,
  name varchar,
  gender varchar,
  topic varchar metadata virtual
) WITH (
  'connector' = 'mq5',
  'topic' = 'mq-test',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = 'mq-group',
  'fieldDelimiter' = ','
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_source(
  id varchar,
  name varchar,
  gender varchar,
  topic varchar metadata virtual
) WITH (
  'connector' = 'mq',
  'topic' = 'mq-test',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '1000',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'consumerGroup' = 'mq-group',
  'fieldDelimiter' = ','
);
Binary format
ApsaraMQ for RocketMQ 5.x
CREATE TEMPORARY TABLE source_table (
  mess varbinary
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = 'mq-test',
  'consumerGroup' = 'mq-group'
);

CREATE TEMPORARY TABLE out_table (
  commodity varchar
) WITH (
  'connector' = 'print'
);

INSERT INTO out_table
select cast(mess as varchar)
FROM source_table;
ApsaraMQ for RocketMQ 4.x
CREATE TEMPORARY TABLE source_table (
  mess varbinary
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '500',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'topic' = 'mq-test',
  'consumerGroup' = 'mq-group'
);

CREATE TEMPORARY TABLE out_table (
  commodity varchar
) WITH (
  'connector' = 'print'
);

INSERT INTO out_table
select cast(mess as varchar)
FROM source_table;
Result table
Create an ApsaraMQ for RocketMQ result table.
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector'='mq5',
  'endpoint'='<yourEndpoint>',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector'='mq',
  'endpoint'='<yourEndpoint>',
  'accessId'='${secret_values.ak_id}',
  'accessKey'='${secret_values.ak_secret}',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);
Note: If the messages of your ApsaraMQ for RocketMQ instance are in the binary format, only one field can be defined in the DDL statement and the field must be of the VARBINARY data type.
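For binary messages, a result table with a single VARBINARY field might look like the following sketch for an ApsaraMQ for RocketMQ 5.x instance. The datagen test source and the table names gen_source and mq_binary_sink are assumptions used only to make the example self-contained. ENCODE is the built-in Flink SQL function that converts a string to binary by using the given character set.

```sql
-- Hypothetical test source that generates random strings.
CREATE TEMPORARY TABLE gen_source (
  content VARCHAR
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Binary result table: exactly one field, of the VARBINARY data type.
CREATE TEMPORARY TABLE mq_binary_sink (
  mess VARBINARY
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);

-- Each row becomes one message whose body is the UTF-8 bytes of content.
INSERT INTO mq_binary_sink
SELECT ENCODE(content, 'UTF-8')
FROM gen_source;
```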
Create an ApsaraMQ for RocketMQ result table in which the values of the keys and tags fields are defined as the keys and tags of ApsaraMQ for RocketMQ messages.
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR,
  keys VARCHAR METADATA,
  tags VARCHAR METADATA
) WITH (
  'connector'='mq5',
  'endpoint'='<yourEndpoint>',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR,
  keys VARCHAR METADATA,
  tags VARCHAR METADATA
) WITH (
  'connector'='mq',
  'endpoint'='<yourEndpoint>',
  'accessId'='${secret_values.ak_id}',
  'accessKey'='${secret_values.ak_secret}',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);
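To show how the keys and tags fields are populated, the following sketch writes rows into the mq_sink table declared above. The datagen test source gen_orders and the tag value TagA are assumptions. Because keys and tags are declared without VIRTUAL, they are writable and are supplied as the last two columns of the INSERT statement, in the order in which the result table declares them.

```sql
-- Hypothetical test source that generates random rows.
CREATE TEMPORARY TABLE gen_orders (
  id INTEGER,
  content VARCHAR
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Column order follows mq_sink: id, len, content, keys, tags.
INSERT INTO mq_sink
SELECT
  id,
  CAST(CHAR_LENGTH(content) AS BIGINT),  -- len
  content,
  CAST(id AS VARCHAR),                   -- message keys
  'TagA'                                 -- message tags
FROM gen_orders;
```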