This topic describes how to create a Tablestore sink connector to synchronize data from a source topic on an ApsaraMQ for Kafka instance to a table in Tablestore.
Prerequisites
Resources are created and the required policies are attached. For more information, see Prerequisites.
The AliyunOTSFullAccess policy is manually attached to the service role with which the function that is generated during connector creation is configured. This policy grants the service role the permissions to manage Tablestore. For more information, see Grant Function Compute permissions to access other Alibaba Cloud services.
Step 1: Create Tablestore resources
Activate Tablestore. For more information, see Step 1: Activate Tablestore.
Create a Tablestore instance. For more information, see Step 2: Create an instance.
Create a data table. For more information, see Step 3: Create a data table.
In this example, an instance named ots-sink and a data table named ots_sink_table are created. Primary keys pk1 and pk2 are specified when the data table is created.
Step 2: Create and start the Tablestore sink connector
Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.
In the left-side navigation pane, choose .
On the Tasks page, click Create Task.
On the Create Task page, configure the Task Name and Description parameters and follow the on-screen instructions to configure other parameters. Then, click Save. The following section describes the parameters:
Task Creation
In the Source step, set the Data Provider parameter to ApsaraMQ for Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.
Parameter
Description
Example
Region
The region where the ApsaraMQ for Kafka instance resides.
China (Beijing)
ApsaraMQ for Kafka Instance
The ApsaraMQ for Kafka instance in which the messages that you want to route are produced.
MQ_INST_115964845466****_ByBeUp3p
Topic
The topic on the ApsaraMQ for Kafka instance in which the messages that you want to route are produced.
topic
Group ID
The name of the consumer group on the ApsaraMQ for Kafka instance. Use a dedicated consumer group for this task. Do not reuse a consumer group that is already in use; otherwise, messages in that consumer group may fail to be sent and received.
GID_http_1
Consumer Offset
The offset from which messages are consumed.
Latest Offset
Network Configuration
The type of the network over which you want to route messages.
Basic Network
VPC
The ID of the virtual private cloud (VPC) in which the ApsaraMQ for Kafka instance is deployed. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet.
vpc-bp17fapfdj0dwzjkd****
vSwitch
The ID of the vSwitch with which the ApsaraMQ for Kafka instance is associated. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet.
vsw-bp1gbjhj53hdjdkg****
Security Group
The security group to which the ApsaraMQ for Kafka instance belongs. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet.
alikafka_pre-cn-7mz2****
Messages
The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000.
100
Interval (Unit: Seconds)
The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 specifies that messages are sent immediately after aggregation.
3
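The Messages and Interval parameters together describe a batch-and-flush policy: aggregated messages are dispatched either when the backlog reaches the message cap or when the interval elapses, and an interval of 0 means immediate dispatch. The following is a minimal, illustrative sketch of such a policy in Python; the connector's actual internal implementation is not public, and all names here are assumptions.

```python
import time

class BatchDispatcher:
    """Illustrative sketch: flush a backlog when it reaches max_messages
    or when max_interval seconds have elapsed since the last flush."""

    def __init__(self, max_messages=100, max_interval=3.0, send=print):
        self.max_messages = max_messages
        self.max_interval = max_interval  # 0 means send immediately
        self.send = send                  # callback invoked with each batch
        self.backlog = []
        self.last_flush = time.monotonic()

    def offer(self, message):
        self.backlog.append(message)
        if self.max_interval == 0 or len(self.backlog) >= self.max_messages:
            self.flush()

    def tick(self):
        # Called periodically; flushes if the interval has elapsed.
        if self.backlog and time.monotonic() - self.last_flush >= self.max_interval:
            self.flush()

    def flush(self):
        if self.backlog:
            self.send(self.backlog)
            self.backlog = []
        self.last_flush = time.monotonic()
```

With the example values above (Messages = 100, Interval = 3), a batch is sent as soon as 100 messages accumulate, or after 3 seconds, whichever comes first.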
In the Filtering step, define a data pattern in the Pattern Content code editor to filter requests. For more information, see Event patterns.
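An event pattern matches an event when every field listed in the pattern is present in the event and its value is one of the accepted values for that field. The following simplified sketch shows that exact-value matching semantics; real event patterns support additional operators (prefix, anyOf, and so on), and the field names used here are illustrative.

```python
def matches(pattern, event):
    """Return True if the event satisfies a simplified exact-match pattern.

    Each pattern key maps either to a nested pattern (dict) or to a list of
    accepted values. The event matches when every listed field exists and
    its value is one of the accepted values.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True
```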
In the Transformation step, specify a data cleansing method to implement data processing capabilities such as splitting, mapping, enrichment, and dynamic routing. For more information, see Data cleansing.
In the Sink step, set the Service Type parameter to Tablestore and follow the on-screen instructions to configure other parameters. The following table describes the parameters.
Parameter
Description
Example
Instance Name
The name of the Tablestore instance that you created.
ots-sink
Destination Table
The Tablestore data table that you created.
ots_sink_table
Primary Key
The method that you want to use to generate the primary keys of Tablestore. You must define a rule in JSONPath syntax to extract the content of each primary key.
For example, you can specify id as the primary key name and $.data.value.id as the value extraction rule.
Attribute Column
The method that you want to use to generate attribute columns. You must define a rule in JSONPath syntax to extract the content of each attribute column.
For example, you can specify key as the attribute column name and $.data.value.name as the value extraction rule.
Write Mode
The mode in which data is written to Tablestore. Valid values:
put: overwrites a row.
update: updates a row.
put
Network Settings
The type of the network over which messages are delivered to Tablestore. Valid values:
VPC: Messages in ApsaraMQ for Kafka are delivered to Tablestore in a VPC.
Internet: Messages in ApsaraMQ for Kafka are delivered to Tablestore over the Internet.
Internet
VPC
The VPC ID. This parameter is required only if you set the Network Settings parameter to VPC.
vpc-bp17fapfdj0dwzjkd****
vSwitch
The vSwitch ID. This parameter is required only if you set the Network Settings parameter to VPC.
vsw-bp1gbjhj53hdjdkg****
Security Group
The security group to which the Tablestore instance belongs. This parameter is required only if you set the Network Settings parameter to VPC.
test_group
Task Property
Configure the retry policy that you want to use when events fail to be pushed and the method that you want to use to handle faults. For more information, see Retry policies and dead-letter queues.
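The primary-key and attribute-column rules map each event to a Tablestore row by resolving JSONPath expressions against the event body. The following sketch illustrates how a simple dot-notation rule such as $.data.value.id could resolve; it handles only the dot-path subset of JSONPath, and the function and field names are assumptions for illustration.

```python
def extract(event, json_path):
    """Resolve a simple dot-notation JSONPath such as '$.data.value.id'."""
    node = event
    for part in json_path.lstrip("$.").split("."):
        node = node[part]
    return node

def build_row(event, pk_rules, attr_rules):
    """Map an event to Tablestore primary keys and attribute columns.

    pk_rules and attr_rules are lists of (column_name, json_path) pairs,
    mirroring the Primary Key and Attribute Column settings above.
    """
    primary_key = [(name, extract(event, rule)) for name, rule in pk_rules]
    attributes = [(name, extract(event, rule)) for name, rule in attr_rules]
    return primary_key, attributes
```

In put mode, the resulting row replaces any existing row with the same primary key; in update mode, the attribute columns are merged into the existing row.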
Go back to the Tasks page, find the Tablestore sink connector that you created, and then click Enable in the Actions column.
In the Note message, click OK.
The sink task requires 30 to 60 seconds to be enabled. You can view the progress in the Status column on the Tasks page.
Step 3: Test the Tablestore sink connector
On the Tasks page, find the Tablestore sink connector and click the name of the source topic in the Event Source column.
On the Topic Details page, click Send Message.
In the Start to Send and Consume Message panel, specify the message content and click OK.
On the Tasks page, find the Tablestore sink connector that you created and click the name of the destination table in the Event Target column.
On the Manage Table page, click the Query Data tab. On the Query Data tab, click Search. In the dialog box that appears, specify the query range and click OK.