This topic describes how to use the ApsaraMQ for RocketMQ connector.
You can call API operations up to 5,000 times per second to access an ApsaraMQ for RocketMQ 4.x Standard Edition instance. If this limit is exceeded while ApsaraMQ for RocketMQ 4.x Standard Edition serves as the messaging middleware for Realtime Compute for Apache Flink, throttling is triggered and your Realtime Compute for Apache Flink deployments may become unstable. If you use or plan to use ApsaraMQ for RocketMQ 4.x Standard Edition in this way, evaluate the impact first. Depending on your business requirements, we recommend that you use another messaging middleware service instead, such as Kafka, Simple Log Service, or DataHub. If you need ApsaraMQ for RocketMQ 4.x Standard Edition to process a large number of messages, submit a ticket to technical support to increase the number of messages that it can process per second.
Background information
ApsaraMQ for RocketMQ is a distributed messaging middleware service developed by Alibaba Cloud based on Apache RocketMQ. The service provides low latency, high concurrency, high availability, and high reliability. ApsaraMQ for RocketMQ provides the asynchronous decoupling and load shifting features for distributed application systems. The service also supports features for Internet applications, including message accumulation, high throughput, and reliable retry.
The following table describes the capabilities that are supported by the ApsaraMQ for RocketMQ connector.
Item | Description |
Table type | Source table and result table. |
Running mode | Streaming mode only. |
Data format | CSV and binary formats. |
Metric | Note: For more information about the metrics, see Metrics. |
API type | DataStream API and SQL API. Only ApsaraMQ for RocketMQ 4.x instances support DataStream APIs. |
Data update or deletion in a result table | Data cannot be deleted from a result table. Data can only be updated in or inserted into a result table. |
Features
The following tables describe the fields that are supported for an ApsaraMQ for RocketMQ source table and an ApsaraMQ for RocketMQ result table.
Fields in an ApsaraMQ for RocketMQ source table
Note: Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.0.1 or later supports the following fields of ApsaraMQ for RocketMQ.
Field | Data type | Description |
topic | VARCHAR METADATA VIRTUAL | The topic for a type of ApsaraMQ for RocketMQ messages. |
queue-id | INT METADATA VIRTUAL | The ID of the queue in which an ApsaraMQ for RocketMQ message is stored. |
queue-offset | BIGINT METADATA VIRTUAL | The consumption offset of an ApsaraMQ for RocketMQ message. |
msg-id | VARCHAR METADATA VIRTUAL | The ID of an ApsaraMQ for RocketMQ message. |
store-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time at which an ApsaraMQ for RocketMQ message is stored. |
born-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time at which an ApsaraMQ for RocketMQ message is generated. |
keys | VARCHAR METADATA VIRTUAL | The keys of an ApsaraMQ for RocketMQ message. |
tags | VARCHAR METADATA VIRTUAL | The tags of an ApsaraMQ for RocketMQ message. |
Fields in an ApsaraMQ for RocketMQ result table
Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.0 or later supports the following fields of ApsaraMQ for RocketMQ.
Field | Data type | Description |
keys | VARCHAR METADATA | The keys of an ApsaraMQ for RocketMQ message. |
tags | VARCHAR METADATA | The tags of an ApsaraMQ for RocketMQ message. |
Prerequisites
Resources are created in the ApsaraMQ for RocketMQ console. For more information, see Create resources.
Limits
Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the ApsaraMQ for RocketMQ connector.
Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports ApsaraMQ for RocketMQ 5.x instances.
In Realtime Compute for Apache Flink that uses VVR of a version earlier than 6.0.2, the deployment parallelism for an ApsaraMQ for RocketMQ source table must be less than or equal to the number of partitions in a topic of ApsaraMQ for RocketMQ. This limit is removed from Realtime Compute for Apache Flink that uses VVR 6.0.2 or later. You can set the deployment parallelism to a value that is greater than the number of partitions in the topic of ApsaraMQ for RocketMQ in advance. You do not need to manually adjust the deployment parallelism when an ApsaraMQ for RocketMQ instance is scaled in.
The ApsaraMQ for RocketMQ connector uses pull consumers to consume messages. All subtasks share data consumption.
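To picture why a parallelism larger than the number of queues can be set in advance on VVR 6.0.2 or later, the following Java sketch spreads a topic's queues over source subtasks. It assumes a simple round-robin assignment (queue i goes to subtask i % parallelism); the connector's actual assignment strategy is not documented in this topic, and the class name QueueAssignmentSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical round-robin assignment of a topic's queues to source subtasks.
 * Illustration only; the connector's real assignment strategy may differ.
 */
final class QueueAssignmentSketch {

    /** Returns, for each subtask index, the queue IDs that subtask would read. */
    static List<List<Integer>> assign(int queueCount, int parallelism) {
        if (queueCount < 0 || parallelism <= 0) {
            throw new IllegalArgumentException("queueCount must be >= 0 and parallelism > 0");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            assignment.add(new ArrayList<>());
        }
        // Queue i is read by subtask i % parallelism.
        for (int queueId = 0; queueId < queueCount; queueId++) {
            assignment.get(queueId % parallelism).add(queueId);
        }
        return assignment;
    }

    /** Counts the subtasks that receive no queue in this model. */
    static int idleSubtasks(int queueCount, int parallelism) {
        int idle = 0;
        for (List<Integer> queues : assign(queueCount, parallelism)) {
            if (queues.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // 8 queues and a parallelism of 12: subtasks 8 to 11 receive no queue in this model.
        System.out.println(assign(8, 12));
        System.out.println(idleSubtasks(8, 12));
    }
}
```

In this model, subtasks without a queue simply wait, so the queue count can change without a change to the deployment parallelism.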
Syntax
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);
Parameters in the WITH clause
Common parameters
Parameter
Description
Data type
Required
Default value
Remarks
connector
The type of the connector.
STRING
Yes
No default value
In ApsaraMQ for RocketMQ 4.x, set the value to mq. In ApsaraMQ for RocketMQ 5.x, set the value to mq5.
endPoint
The endpoint of ApsaraMQ for RocketMQ.
STRING
Yes
No default value
ApsaraMQ for RocketMQ supports the following types of endpoints:
For deployments that run on VVR 3.0.1 or later, use Transmission Control Protocol (TCP) endpoints. To obtain the required endpoint, use one of the following methods:
Internal endpoints of ApsaraMQ for RocketMQ that resides in the classic network or a virtual private cloud (VPC) of Alibaba Cloud: Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. You can view the endpoint that corresponds to Internal Access in the TCP Endpoint section.
Public endpoint of ApsaraMQ for RocketMQ: Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. You can view the endpoint that corresponds to Internet Access in the TCP Endpoint section.
Important: Due to changes in the network security policies of Alibaba Cloud, connection issues may occur when Realtime Compute for Apache Flink connects to the public ApsaraMQ for RocketMQ service. We recommend that you use the internal ApsaraMQ for RocketMQ service.
The ApsaraMQ for RocketMQ service that resides in the internal network does not support cross-region access. For example, if your Realtime Compute for Apache Flink service resides in the China (Hangzhou) region but your ApsaraMQ for RocketMQ service resides in the China (Shanghai) region, Realtime Compute for Apache Flink cannot access ApsaraMQ for RocketMQ over the internal network.
If you want to access ApsaraMQ for RocketMQ over the Internet, you must configure network address translation (NAT). For more information, see Create and manage an Internet NAT gateway.
For Realtime Compute for Apache Flink deployments that run on a VVR version earlier than 3.0.1, the old endpoints of ApsaraMQ for RocketMQ are no longer available. You must upgrade these deployments to VVR 3.0.1 or later.
Important: If you use the ApsaraMQ for RocketMQ connector in a deployment that runs on a VVR version earlier than 3.0.1, upgrade the deployment to VVR 3.0.1 or later and change the value of the endPoint parameter to the new endpoint of ApsaraMQ for RocketMQ. This reduces the risk of instability or unavailability caused by the old endpoint. For more information, see Service notices of Realtime Compute for Apache Flink.
topic
The name of the topic.
STRING
Yes
No default value
N/A.
accessId
ApsaraMQ for RocketMQ 4.x: the AccessKey ID of your Alibaba Cloud account.
ApsaraMQ for RocketMQ 5.x: the username of the ApsaraMQ for RocketMQ instance.
STRING
ApsaraMQ for RocketMQ 4.x: yes
ApsaraMQ for RocketMQ 5.x: no
No default value
ApsaraMQ for RocketMQ 4.x: For more information about how to obtain the AccessKey ID of your Alibaba Cloud account, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Console operations topic.
Important: To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.
ApsaraMQ for RocketMQ 5.x: If you access the ApsaraMQ for RocketMQ instance over the Internet, set this parameter to the username of the instance, which you can obtain in the ApsaraMQ for RocketMQ console. If your client is deployed on an Elastic Compute Service (ECS) instance and accesses the ApsaraMQ for RocketMQ instance over an internal network, you do not need to set this parameter.
accessKey
ApsaraMQ for RocketMQ 4.x: the AccessKey secret of your Alibaba Cloud account.
ApsaraMQ for RocketMQ 5.x: the password of the ApsaraMQ for RocketMQ instance.
STRING
ApsaraMQ for RocketMQ 4.x: yes
ApsaraMQ for RocketMQ 5.x: no
No default value
ApsaraMQ for RocketMQ 4.x: For more information about how to obtain the AccessKey secret of your Alibaba Cloud account, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Console operations topic.
Important: To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.
ApsaraMQ for RocketMQ 5.x: If you access the ApsaraMQ for RocketMQ instance over the Internet, set this parameter to the password of the instance, which you can obtain in the ApsaraMQ for RocketMQ console. If your client is deployed on an ECS instance and accesses the ApsaraMQ for RocketMQ instance over an internal network, you do not need to set this parameter.
tag
The tag of the message that is subscribed to or written.
STRING
No
No default value
For an ApsaraMQ for RocketMQ source table, only a single tag can be read.
For an ApsaraMQ for RocketMQ result table, you can configure multiple tags. Separate multiple tags with commas (,).
Note: For an ApsaraMQ for RocketMQ result table, this parameter supports only ApsaraMQ for RocketMQ 4.x instances. If you use an ApsaraMQ for RocketMQ 5.x instance, specify the tags of a message by using the tags field of the ApsaraMQ for RocketMQ result table.
nameServerSubgroup
The name server group.
STRING
No
No default value
For the ApsaraMQ for RocketMQ service that resides in the classic network or a VPC of Alibaba Cloud, you must set this parameter to nsaddr4client-internal. For the ApsaraMQ for RocketMQ service that can be accessed over the Internet, you do not need to configure this parameter.
Note: This parameter is supported only in VVR 2.1.1 to 3.0.0. It is not supported in VVR 3.0.1 or later.
encoding
The encoding format.
STRING
No
UTF-8
N/A.
instanceID
The ID of the ApsaraMQ for RocketMQ instance.
STRING
No
No default value
If the ApsaraMQ for RocketMQ instance does not use a separate namespace, the instanceID parameter cannot be configured.
If the ApsaraMQ for RocketMQ instance uses a separate namespace, you must configure the instanceID parameter.
Note: Only ApsaraMQ for RocketMQ 4.x instances support this parameter. If you use an ApsaraMQ for RocketMQ 5.x instance, you do not need to set this parameter.
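The version-dependent rules in the common parameter table (the connector value mq or mq5, required AccessKey credentials for 4.x, and an optional username and password for 5.x) can be summarized in a small Java helper that assembles a WITH clause. This is a hypothetical helper, not part of the connector: MqWithClauseSketch and its methods are illustrative names, and it uses the lowercase endpoint key that the examples in this topic use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical helper (not part of the connector) that assembles the common
 * WITH-clause options according to the version rules in the parameter table.
 */
final class MqWithClauseSketch {

    static Map<String, String> commonOptions(int majorVersion, String endpoint, String topic,
                                             String accessId, String accessKey) {
        if (majorVersion != 4 && majorVersion != 5) {
            throw new IllegalArgumentException("Only 4.x and 5.x instances are covered");
        }
        Map<String, String> options = new LinkedHashMap<>();
        // 4.x instances use 'mq'; 5.x instances use 'mq5'.
        options.put("connector", majorVersion == 4 ? "mq" : "mq5");
        options.put("endpoint", requireNonEmpty("endpoint", endpoint));
        options.put("topic", requireNonEmpty("topic", topic));
        if (majorVersion == 4) {
            // 4.x: the AccessKey ID and AccessKey secret are required.
            options.put("accessId", requireNonEmpty("accessId", accessId));
            options.put("accessKey", requireNonEmpty("accessKey", accessKey));
        } else if (accessId != null && accessKey != null) {
            // 5.x: the instance username and password are optional (for example, Internet access).
            options.put("accessId", accessId);
            options.put("accessKey", accessKey);
        }
        return options;
    }

    /** Renders the options as a SQL WITH clause, doubling single quotes in values. */
    static String toWithClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            joiner.add("'" + option.getKey() + "' = '" + option.getValue().replace("'", "''") + "'");
        }
        return joiner.toString();
    }

    private static String requireNonEmpty(String name, String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " is required");
        }
        return value;
    }

    public static void main(String[] args) {
        // Credentials are referenced through secret variables instead of plaintext values.
        System.out.println(toWithClause(commonOptions(
                4, "<yourEndpoint>", "mq-test", "${secret_values.ak_id}", "${secret_values.ak_secret}")));
    }
}
```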
Parameters that are exclusive to source tables
Parameter | Description | Data type | Required | Default value | Remarks |
consumerGroup | The name of the consumer group. | STRING | Yes | No default value | N/A. |
pullIntervalMs | The wait interval of the source if no data can be consumed from the upstream storage system. | INT | Yes | No default value | Unit: milliseconds. You cannot configure the rate at which ApsaraMQ for RocketMQ messages are read because no throttling mechanism is available. Note: Only ApsaraMQ for RocketMQ 4.x instances support this parameter. If you use an ApsaraMQ for RocketMQ 5.x instance, you do not need to set this parameter. |
timeZone | The time zone of the instance. | STRING | No | No default value | Example: Asia/Shanghai. |
startTimeMs | The time from which data is read. | LONG | No | No default value | A timestamp in milliseconds. |
startMessageOffset | The start offset from which messages are consumed. | INT | No | No default value | If you set this parameter, messages are consumed starting from the specified offset. |
lineDelimiter | The row delimiter used to parse a message block. | STRING | No | \n | N/A. |
fieldDelimiter | The field delimiter. | STRING | No | \u0001 | The delimiter is displayed differently depending on the mode of the ApsaraMQ for RocketMQ terminal. In read-only mode, which is the default mode, the delimiter is \u0001 and is invisible. In edit mode, the same character is displayed as ^A. |
lengthCheck | The rule for checking the number of fields parsed from a row of data. | STRING | No | NONE | Valid values: NONE (default): if a row contains more fields than defined, fields are extracted from left to right in the order of the defined fields; if it contains fewer, the row is skipped. SKIP: a row whose number of fields does not match the number of defined fields is skipped. EXCEPTION: an exception is reported if the number of fields does not match. PAD: if a row contains more fields than defined, fields are extracted from left to right in the order of the defined fields; if it contains fewer, the missing fields are padded with null. |
columnErrorDebug | Specifies whether to enable debugging of parsing errors. | BOOLEAN | No | false | If you set this parameter to true, a log entry is printed when a parsing exception occurs. |
pullBatchSize | The maximum number of messages that can be pulled at a time. | INT | No | 64 | Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter. |
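The startTimeMs parameter expects a timestamp in milliseconds. The following Java sketch, which uses only java.time, converts a local date and time in a given time zone into such a value. The class name StartTimeMsSketch and the example zone Asia/Shanghai are illustrative choices, not part of the connector.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Sketch: computes a startTimeMs value (epoch milliseconds) from a local date-time. */
final class StartTimeMsSketch {

    /** Interprets localTime in the given zone and returns milliseconds since the Unix epoch. */
    static long startTimeMs(LocalDateTime localTime, String zoneId) {
        return localTime.atZone(ZoneId.of(zoneId)).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long value = startTimeMs(LocalDateTime.of(2024, 1, 1, 0, 0), "Asia/Shanghai");
        // Use the printed value in the WITH clause, for example 'startTimeMs' = '<value>'.
        System.out.println(value);
    }
}
```

For example, midnight on January 1, 2024 in Asia/Shanghai (UTC+8) corresponds to 1704038400000.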
Parameters that are exclusive to result tables
Parameter | Description | Data type | Required | Default value | Remarks |
producerGroup | The name of the producer group. | STRING | Yes | No default value | N/A. |
retryTimes | The number of retries for writing data to the table. | INT | No | 10 | N/A. |
sleepTimeMs | The retry interval. | LONG | No | 5000 | Unit: milliseconds. |
partitionField | The partition key column. You can specify a field as a partition key column. | STRING | No | No default value | This parameter is required if the mode parameter is set to partition. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter. |
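The retryTimes and sleepTimeMs parameters together bound how long a failing write is retried. The following Java sketch shows one reading of them: one initial attempt plus up to retryTimes retries, with sleepTimeMs milliseconds between attempts. The connector's internal retry logic is not described in this topic, so the loop, the class name RetrySketch, and the injected sleeper are assumptions for illustration.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

/**
 * Illustrative retry loop for retryTimes and sleepTimeMs. This is an assumed
 * reading of the two parameters, not the connector's own implementation.
 */
final class RetrySketch {

    /**
     * Runs send until it succeeds: one initial attempt plus at most retryTimes retries,
     * calling sleeper with sleepTimeMs between attempts. Returns the number of attempts used.
     */
    static int sendWithRetry(BooleanSupplier send, int retryTimes, long sleepTimeMs, LongConsumer sleeper) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (send.getAsBoolean()) {
                return attempts;
            }
            if (attempts > retryTimes) {
                throw new IllegalStateException("Write failed after " + attempts + " attempts");
            }
            sleeper.accept(sleepTimeMs);
        }
    }

    /** A real sleeper based on Thread.sleep. */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical write that fails twice and then succeeds; a short interval keeps the demo fast.
        int attempts = sendWithRetry(() -> ++calls[0] >= 3, 10, 100L, RetrySketch::sleep);
        System.out.println("Succeeded after " + attempts + " attempts");
    }
}
```

Under this reading, the default values (10 retries, 5000 ms) allow up to 11 attempts and 50 seconds of waiting before a write is reported as failed.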
Data type mappings
Data type of Realtime Compute for Apache Flink | Data type of ApsaraMQ for RocketMQ |
VARCHAR | STRING |
Examples
Source table
CSV format
For example, a CSV message is recorded in the following format:
1,name,male
2,name,female
Note: An ApsaraMQ for RocketMQ message can contain zero or more data records. Multiple data records are separated by \n.
You can execute the following DDL statement to declare an ApsaraMQ for RocketMQ source table in a Realtime Compute for Apache Flink deployment:
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_source(
  id varchar,
  name varchar,
  gender varchar,
  topic varchar metadata virtual
) WITH (
  'connector' = 'mq5',
  'topic' = 'mq-test',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = 'mq-group',
  'fieldDelimiter' = ','
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_source(
  id varchar,
  name varchar,
  gender varchar,
  topic varchar metadata virtual
) WITH (
  'connector' = 'mq',
  'topic' = 'mq-test',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '1000',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'consumerGroup' = 'mq-group',
  'fieldDelimiter' = ','
);
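The CSV examples rely on lineDelimiter to split a message into records and on fieldDelimiter to split each record into fields, and lengthCheck decides what happens when the number of fields differs from the table schema. The following Java sketch models these rules as described in the parameter table for source tables. It is not the connector's parser; CsvMessageSketch and LengthCheck are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical model of the lineDelimiter, fieldDelimiter, and lengthCheck rules.
 * Not the connector's actual parser.
 */
final class CsvMessageSketch {

    /** Mirrors the documented lengthCheck values. */
    enum LengthCheck { NONE, SKIP, EXCEPTION, PAD }

    /** Splits one message body into rows of exactly fieldCount fields. */
    static List<String[]> parse(String body, String lineDelimiter, String fieldDelimiter,
                                int fieldCount, LengthCheck mode) {
        List<String[]> rows = new ArrayList<>();
        // A message can hold zero or more records separated by lineDelimiter.
        for (String line : body.split(Pattern.quote(lineDelimiter))) {
            if (line.isEmpty()) {
                continue;
            }
            // Limit -1 keeps trailing empty fields, such as the last field of "1,name,".
            String[] fields = line.split(Pattern.quote(fieldDelimiter), -1);
            if (fields.length == fieldCount) {
                rows.add(fields);
                continue;
            }
            switch (mode) {
                case EXCEPTION:
                    throw new IllegalArgumentException(
                            "Expected " + fieldCount + " fields but found " + fields.length + ": " + line);
                case SKIP:
                    break; // The mismatched row is dropped.
                case PAD:
                    // Extra fields are cut from the right; missing fields become null.
                    rows.add(Arrays.copyOf(fields, fieldCount));
                    break;
                case NONE:
                default:
                    // Extra fields are cut from the right; rows with too few fields are skipped.
                    if (fields.length > fieldCount) {
                        rows.add(Arrays.copyOf(fields, fieldCount));
                    }
                    break;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> rows = parse("1,name,male\n2,name,female", "\n", ",", 3, LengthCheck.NONE);
        for (String[] row : rows) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

In this model, the sample message parsed with fieldDelimiter set to a comma and three columns yields two rows, and with PAD a record such as 1,name is emitted with a null value for the third column.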
Binary format
ApsaraMQ for RocketMQ 5.x
CREATE TEMPORARY TABLE source_table (
  mess varbinary
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = 'mq-test',
  'consumerGroup' = 'mq-group'
);
CREATE TEMPORARY TABLE out_table (
  commodity varchar
) WITH (
  'connector' = 'print'
);
INSERT INTO out_table SELECT CAST(mess AS varchar) FROM source_table;
ApsaraMQ for RocketMQ 4.x
CREATE TEMPORARY TABLE source_table (
  mess varbinary
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '500',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'topic' = 'mq-test',
  'consumerGroup' = 'mq-group'
);
CREATE TEMPORARY TABLE out_table (
  commodity varchar
) WITH (
  'connector' = 'print'
);
INSERT INTO out_table SELECT CAST(mess AS varchar) FROM source_table;
Result table
Create an ApsaraMQ for RocketMQ result table.
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);
Note: If the messages of your ApsaraMQ for RocketMQ instance are in binary format, only one field can be defined in the DDL statement, and the field must be of the VARBINARY data type.
Create an ApsaraMQ for RocketMQ result table in which the values of the keys and tags fields are used as the keys and tags of ApsaraMQ for RocketMQ messages.
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR,
  keys VARCHAR METADATA,
  tags VARCHAR METADATA
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR,
  keys VARCHAR METADATA,
  tags VARCHAR METADATA
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);
DataStream API
If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to access Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors.
The VVR engine of Realtime Compute for Apache Flink provides MetaQSource for reading data from ApsaraMQ for RocketMQ and MetaQOutputFormat, an implementation of OutputFormat, for writing data to ApsaraMQ for RocketMQ. For ApsaraMQ for RocketMQ 5.x, the sample code uses the corresponding RocketMQSource and RocketMQOutputFormat classes. The following sample code uses the ApsaraMQ for RocketMQ DataStream connector to read data from and write data to ApsaraMQ for RocketMQ.
ApsaraMQ for RocketMQ 4.x
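import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
// MetaQSource, MetaQOutputFormat, MessageExt, and the PROPERTY_* and NAMESRV_ADDR constants
// are provided by the ApsaraMQ for RocketMQ DataStream connector. Import them from the
// packages of the connector version that you use.
/**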
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQDataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String INSTANCE_ID = "<instanceID>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// The following two configurations are used only for local debugging. You need to delete the two configurations before you package the draft and upload it to Alibaba Cloud Realtime Compute for Apache Flink.
conf.setString("pipeline.classpaths", "file://" + "<Absolute path of the uber JAR package>");
conf.setString("classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// Creates and adds RocketMQ source.
env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
// Wraps each message in a singleton list for the sink. Add your own conversion logic here.
.map(RocketMQDataStreamDemo::convertMessages)
// Creates and adds RocketMQ sink.
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
.name(RocketMQDataStreamDemo.class.getSimpleName());
// Compiles and submits job.
env.execute("RocketMQ connector end-to-end DataStream demo");
}
private static MetaQSource<MessageExt> createRocketMQSource() {
Properties mqProperties = createMQProperties();
return new MetaQSource<>(SOURCE_TOPIC,
CONSUMER_GROUP,
null, // always null
null, // tag of the messages to consume
Long.MAX_VALUE, // stop timestamp in milliseconds
-1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
0, // Start offset.
300_000, // Partition discover interval.
mqProperties,
Boundedness.CONTINUOUS_UNBOUNDED,
new MyDeserializationSchema());
}
private static MetaQOutputFormat createRocketMQOutputFormat() {
return new MetaQOutputFormat.Builder()
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.setMqProperties(createMQProperties())
.build();
}
private static Properties createMQProperties() {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, ENDPOINT);
properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
return properties;
}
private static List<MessageExt> convertMessages(MessageExt messages) {
return Collections.singletonList(messages);
}
public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
@Override
public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
for (MessageExt messageExt : list) {
collector.collect(messageExt);
}
}
@Override
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
}
}
ApsaraMQ for RocketMQ 5.x
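import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
// RocketMQSource, RocketMQOutputFormat, and MessageExt are provided by the ApsaraMQ for
// RocketMQ 5.x DataStream connector. Import them from the packages of the connector version that you use.
/**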
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQ5DataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// The following two configurations are used only for local debugging. You need to delete the two configurations before you package the draft and upload it to Alibaba Cloud Realtime Compute for Apache Flink.
conf.setString("pipeline.classpaths", "file://" + "<Absolute path of the uber JAR package>");
conf.setString(
"classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
final DataStreamSource<String> ds =
env.fromSource(
RocketMQSource.<String>builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopic(SOURCE_TOPIC)
.setConsumerGroup(CONSUMER_GROUP)
.setDeserializationSchema(new MyDeserializer())
.setStartOffset(1)
.build(),
WatermarkStrategy.noWatermarks(),
"source");
ds.map(new ToMessage())
.addSink(
new OutputFormatSinkFunction<>(
new RocketMQOutputFormat.Builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.build()));
env.execute();
}
private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
@Override
public void deserialize(List<MessageExt> record, Collector<String> out) {
for (MessageExt messageExt : record) {
out.collect(new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8));
}
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
private static class ToMessage implements MapFunction<String, List<MessageExt>> {
public ToMessage() {
}
@Override
public List<MessageExt> map(String s) {
final MessageExt message = new MessageExt();
message.setBody(s.getBytes(java.nio.charset.StandardCharsets.UTF_8));
message.setWaitStoreMsgOK(true);
return Collections.singletonList(message);
}
}
}
The ApsaraMQ for RocketMQ DataStream connectors of different versions are available in the Maven central repository. To use a connector, add the following dependency to your pom.xml file:
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>
For more information about the endpoints of ApsaraMQ for RocketMQ, see Announcement on the settings of internal TCP endpoints.