ApsaraMQ for Kafka: Create a Tablestore sink connector

Updated: Nov 07, 2024

This topic describes how to create a Tablestore sink connector and how to use it to export data from a data source topic in an ApsaraMQ for Kafka instance to Tablestore.

Prerequisites

Usage notes

  • You can export data from a data source topic in an ApsaraMQ for Kafka instance only to a Tablestore instance that resides in the same region as the ApsaraMQ for Kafka instance. For more information about the limits on connectors, see Limits.

  • When you create a connector, ApsaraMQ for Kafka automatically creates a service-linked role for you.

    • If the service-linked role does not exist, ApsaraMQ for Kafka automatically creates it so that the connector can export data from ApsaraMQ for Kafka to Tablestore.

    • If the service-linked role already exists, ApsaraMQ for Kafka does not create it again.

    For more information about service-linked roles, see Service-linked roles.

Procedure

This section describes how to export data from a data source topic in the ApsaraMQ for Kafka instance to Tablestore by using a Tablestore sink connector.

  1. Optional: Create the topics and consumer group that are required by a Tablestore sink connector.

    If you do not want to manually create the topics and consumer group, skip this step and set the Resource Creation Method parameter to Auto in the next step.

    Important

    Some topics that are required by a Tablestore sink connector must use a local storage engine. If the major version of your ApsaraMQ for Kafka instance is 0.10.2, you cannot manually create topics that use a local storage engine. In major version 0.10.2, these topics must be automatically created.

    1. Create the topics that are required by a Tablestore sink connector

    2. Create the consumer group that is required by a Tablestore sink connector

  2. Create and deploy a Tablestore sink connector

  3. Verify the result.

    1. Send a test message

    2. Query the data of a table

Create the topics that are required by a Tablestore sink connector

In the ApsaraMQ for Kafka console, you can manually create the five topics required by the Tablestore sink connector: the task offset topic, task configuration topic, task status topic, dead-letter queue topic, and error data topic. These topics have different requirements for partition count and storage engine. For more information, see Parameters in the Configure Source Service step.
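The rules for these five topics, listed in the Parameters in the Configure Source Service step table, can be turned into a quick pre-flight check before you create the topics by hand. The following Python sketch is illustrative only: the `TopicSpec` structure, the `check_topic` helper, and the role names are hypothetical, and recommendations (name prefixes, 6 partitions) are reported as warnings rather than errors. It does not model the 0.10.2 restriction, under which these topics must be created automatically.

```python
from dataclasses import dataclass

@dataclass
class TopicSpec:
    """A topic you plan to create manually (hypothetical structure)."""
    role: str            # "offset", "config", "status", "dead_letter", or "error"
    name: str
    partitions: int
    storage_engine: str  # "local" (Local Storage) or "cloud" (Cloud Storage)
    cleanup_policy: str  # "compact" or "delete"

# Recommended name prefixes from the Configure Source Service step.
PREFIXES = {
    "offset": "connect-offset",
    "config": "connect-config",
    "status": "connect-status",
    "dead_letter": "connect-error",
    "error": "connect-error",
}

def check_topic(spec: TopicSpec) -> tuple[list[str], list[str]]:
    """Return (errors, warnings): errors break a stated rule, warnings miss a recommendation."""
    errors: list[str] = []
    warnings: list[str] = []
    if not spec.name.startswith(PREFIXES[spec.role]):
        warnings.append(f"name should start with {PREFIXES[spec.role]}")
    if spec.role in ("offset", "config", "status"):
        # These three topics must use Local Storage and the Compact policy.
        if spec.storage_engine != "local":
            errors.append("storage engine must be Local Storage")
        if spec.cleanup_policy != "compact":
            errors.append("cleanup.policy must be Compact")
    if spec.role == "offset" and spec.partitions <= 1:
        errors.append("offset topic needs more than 1 partition")
    if spec.role == "config" and spec.partitions != 1:
        errors.append("config topic must have exactly 1 partition")
    if spec.role in ("status", "dead_letter", "error") and spec.partitions != 6:
        warnings.append("6 partitions are recommended")
    return errors, warnings
```

For example, `check_topic(TopicSpec("config", "connect-config-kafka-ots-sink", 1, "local", "compact"))` returns two empty lists, because the planned task configuration topic meets every stated rule and recommendation.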

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

    Important

    You must create topics in the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if the producers and consumers of messages run on an ECS instance that is deployed in the China (Beijing) region, the topic must also be created in the China (Beijing) region.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, click Create Topic.

  6. In the Create Topic panel, specify the properties of the topic and click OK.

    Parameter

    Description

    Example

    Name

    The topic name.

    demo

    Description

    The topic description.

    demo test

    Partitions

    The number of partitions in the topic.

    12

    Storage Engine

    Note

    You can specify the storage engine type only if you use a Professional Edition instance. If you use a Standard Edition instance, cloud storage is selected by default.

    The type of the storage engine that is used to store messages in the topic.

    ApsaraMQ for Kafka supports the following types of storage engines:

    • Cloud Storage: If you select this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine features low latency, high performance, high durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can set this parameter only to Cloud Storage.

    • Local Storage: If you select this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.

    Cloud Storage

    Message Type

    The message type of the topic. Valid values:

    • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which the messages are sent. If a broker in the cluster fails, the order of messages that are stored in the partitions may not be preserved. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.

    • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which the messages are sent. If a broker in the cluster fails, messages are still stored in the partitions in the order in which the messages are sent. Messages in some partitions cannot be sent until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.

    Normal Message

    Log Cleanup Policy

    The log cleanup policy that is used by the topic.

    If you set the Storage Engine parameter to Local Storage, you must configure the Log Cleanup Policy parameter. You can set the Storage Engine parameter to Local Storage only if you use an ApsaraMQ for Kafka Professional Edition instance.

    ApsaraMQ for Kafka provides the following log cleanup policies:

    • Delete: the default log cleanup policy. If sufficient storage space is available in the system, messages are retained based on the maximum retention period. After the storage usage exceeds 85%, the system deletes the earliest stored messages to ensure service availability.

    • Compact: the log compaction policy that is used in Apache Kafka. Log compaction ensures that the latest value is retained for each message key. This policy is suitable for scenarios such as restoring a failed system or reloading the cache after a system restarts. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the information about the system status and configurations in a log-compacted topic.

      Important

      You can use log-compacted topics only in specific cloud-native components, such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos.

    Compact

    Tag

    The tags that you want to attach to the topic.

    demo

    After a topic is created, you can view the topic on the Topics page.
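The Delete policy described in the Log Cleanup Policy row combines a retention period with an 85% storage threshold. The following Python sketch models that behavior on a list of messages. It is a simplified assumption, not the broker's implementation: real brokers delete whole log segments rather than single messages, and the sketch assumes that deletion stops as soon as usage is back at or below the threshold. The `StoredMessage` type and `apply_delete_policy` helper are hypothetical.

```python
from dataclasses import dataclass

@dataclass
class StoredMessage:
    offset: int
    timestamp_ms: int
    size_bytes: int

def apply_delete_policy(log: list[StoredMessage], now_ms: int, retention_ms: int,
                        capacity_bytes: int, threshold: float = 0.85) -> list[StoredMessage]:
    """Simplified model of the Delete policy; log must be ordered by offset."""
    # 1. Drop messages that are older than the maximum retention period.
    kept = [m for m in log if now_ms - m.timestamp_ms <= retention_ms]
    used = sum(m.size_bytes for m in kept)
    # 2. While storage usage exceeds the threshold, drop the earliest message.
    while kept and used > threshold * capacity_bytes:
        used -= kept.pop(0).size_bytes
    return kept
```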
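The task offset, configuration, and status topics must use the Compact policy because Kafka Connect only needs the latest record for each key. The following Python sketch is a simplified model of Apache Kafka log compaction: it keeps the latest record per key with its original offset, and treats a latest value of None (a tombstone) as a deletion of the key. Real compaction runs in the background on closed segments and keeps tombstones for a while before removing them; the `compact` function is hypothetical.

```python
from typing import Optional

def compact(log: list[tuple[str, Optional[str]]]) -> list[tuple[int, str, Optional[str]]]:
    """Keep only the latest record for each key, preserving offsets and order."""
    latest: dict[str, int] = {}
    for offset, (key, _value) in enumerate(log):
        latest[key] = offset  # a later offset replaces an earlier one
    return [
        (offset, key, value)
        for offset, (key, value) in enumerate(log)
        if latest[key] == offset and value is not None  # drop tombstoned keys
    ]
```

For example, compacting `[("task-1", "RUNNING"), ("task-2", "RUNNING"), ("task-1", "FAILED"), ("task-2", None)]` leaves only `(2, "task-1", "FAILED")`: the earlier task-1 status is superseded, and task-2 has been deleted by its tombstone.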

Create the consumer group that is required by a Tablestore sink connector

In the ApsaraMQ for Kafka console, you can manually create the consumer group that is required by a Tablestore sink connector. The name of the consumer group must be in the connect-<Task name> format. For more information, see Parameters in the Configure Source Service step.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Groups.

  5. On the Groups page, click Create Group.

  6. In the Create Group panel, enter a group name in the Group ID field and a group description in the Description field, attach tags to the group, and then click OK.

    After a consumer group is created, you can view the consumer group on the Groups page.
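Because the consumer group name is derived from the task name, it may help to validate the connector name against the rules in the Configure Basic Information step before you create the group. The following Python sketch encodes those rules as a regular expression and builds a group name in the connect-<Task name> format. It assumes that the task name equals the connector name (for example, kafka-ts-sink and connect-kafka-ts-sink); the documentation does not state this mapping, and the helper names are hypothetical.

```python
import re

# Rules from the Configure Basic Information step: 1 to 48 characters;
# digits, lowercase letters, and hyphens only; must not start with a hyphen.
CONNECTOR_NAME = re.compile(r"[0-9a-z][0-9a-z-]{0,47}")

def validate_connector_name(name: str, existing: set[str]) -> None:
    """Raise ValueError if the name breaks the rules or is already used in the instance."""
    if not CONNECTOR_NAME.fullmatch(name):
        raise ValueError(f"invalid connector name: {name!r}")
    if name in existing:
        raise ValueError(f"connector name already used in this instance: {name!r}")

def consumer_group_for(task_name: str) -> str:
    """Build a group name in the connect-<Task name> format."""
    return f"connect-{task_name}"
```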

Create and deploy a Tablestore sink connector

Create and deploy a Tablestore sink connector that you can use to export data from ApsaraMQ for Kafka to Tablestore.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. In the left-side navigation pane, click Connectors.

  4. On the Connectors page, select the instance to which the connector belongs from the Select Instance drop-down list and click Create Connector.

  5. Complete the Create Connector wizard.

    1. In the Configure Basic Information step, set the parameters that are described in the following table and click Next.

      Parameter

      Description

      Example

      Name

      The name of the connector. Specify a connector name based on the following rules:

      • The connector name must be 1 to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).

      • Each connector name must be unique within an ApsaraMQ for Kafka instance.

      The data synchronization task of the connector must use a consumer group whose name is in the connect-<Task name> format. If you have not created such a consumer group, ApsaraMQ for Kafka automatically creates one for you.

      kafka-ts-sink

      Instance

      The information about the ApsaraMQ for Kafka instance. By default, the name and ID of the instance are displayed.

      demo alikafka_post-cn-st21p8vj****

    2. In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, set the parameters that are described in the following table, and then click Next.

      Note

      If you have created the topics and consumer group in advance, set the Resource Creation Method parameter to Manual and enter the names of the created resources in the corresponding fields. Otherwise, set the Resource Creation Method parameter to Auto.

      Table 1. Parameters in the Configure Source Service step

      Parameter

      Description

      Example

      Data Source Topic

      The name of the source topic from which data is to be synchronized.

      ts-test-input

      Consumer Thread Concurrency

      The number of concurrent consumer threads that are used to synchronize data from the source topic. By default, six concurrent consumer threads are used. Valid values:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      Consumer Offset

      The consumer offset from which you want the specified consumer group to start to consume messages. Valid values:

      • Earliest Offset: The specified consumer group starts to consume messages from the earliest consumer offset.

      • Latest Offset: The specified consumer group starts to consume messages from the most recent consumer offset.

      Earliest Offset

      VPC ID

      The ID of the VPC in which the data synchronization task runs. Click Configure Runtime Environment to display the parameter. By default, the VPC ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed. You do not need to configure this parameter.

      vpc-bp1xpdnd3l***

      vSwitch ID

      The ID of the vSwitch in which the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the source ApsaraMQ for Kafka instance. The default value is the vSwitch ID that you specified when you deployed the ApsaraMQ for Kafka instance.

      vsw-bp1d2jgg81***

      Failure Handling Policy

      Specifies whether to retain the subscription to the partition in which an error occurs after the relevant message fails to be sent. Click Configure Runtime Environment to display the parameter. Valid values:

      • Continue Subscription: retains the subscription to the partition in which an error occurs and returns the logs.

      • Stop Subscription: stops the subscription to the partition in which an error occurs and returns the logs.


      Continue Subscription

      Resource Creation Method

      The method used to create the topics and consumer group that are required by the Tablestore sink connector. Click Configure Runtime Environment to display the parameter. Valid values:

      • Auto

      • Manual

      Auto

      Connector Consumer Group

      The consumer group that is used by the data synchronization task of the connector. Click Configure Runtime Environment to display the parameter. The name of this consumer group must be in the connect-<Task name> format.

      connect-cluster-kafka-ots-sink

      Task Offset Topic

      The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.

      • Topic: We recommend that you start the topic name with connect-offset.

      • Partitions: The number of partitions in the topic must be greater than 1.

      • Storage Engine: The storage engine of the topic must be set to Local Storage.

      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.

      connect-offset-kafka-ots-sink

      Task Configuration Topic

      The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter.

      • Topic: We recommend that you start the topic name with connect-config.

      • Partitions: The topic can contain only one partition.

      • Storage Engine: The storage engine of the topic must be set to Local Storage.

      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.

      connect-config-kafka-ots-sink

      Task Status Topic

      The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter.

      • Topic: We recommend that you start the topic name with connect-status.

      • Partitions: We recommend that you set the number of partitions in the topic to 6.

      • Storage Engine: The storage engine of the topic must be set to Local Storage.

      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.

      connect-status-kafka-ots-sink

      Dead-letter Queue Topic

      The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create one topic and use it as both the dead-letter queue topic and the error data topic.

      • Topic: We recommend that you start the topic name with connect-error.

      • Partitions: We recommend that you set the number of partitions in the topic to 6.

      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.

      connect-error-kafka-ots-sink

      Error Data Topic

      The topic that is used to store the error data of the connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create one topic and use it as both the dead-letter queue topic and the error data topic.

      • Topic: We recommend that you start the topic name with connect-error.

      • Partitions: We recommend that you set the number of partitions in the topic to 6.

      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.

      connect-error-kafka-ots-sink

    3. In the Configure Destination Service step, select Tablestore as the destination service, set the parameters that are shown in the following table, and then click Create.

      Parameter

      Description

      Example

      Instance Name

      The name of the Tablestore instance.

      k00eny67****

      Automatically Create Destination Table

      Specifies whether to automatically create a destination table in Tablestore.

      • Yes: A table is automatically created in Tablestore to store synchronized data. You can customize the table name.

      • No: An existing table is used to store synchronized data.

      Yes

      Destination Table Name

      The name of the table that stores the synchronized data. If you set Automatically Create Destination Table to No, ensure that the table name you enter is the same as that of an existing table in the Tablestore instance.

      kafka_table

      Tablestore

      The type of the table that stores the synchronized data.

      • Wide Column Model

      • TimeSeries Model

      Wide Column Model

      Message Key Format

      The format of the message key in ApsaraMQ for Kafka. You can set this parameter to String or JSON. The default value is JSON. This parameter is displayed when Tablestore is set to Wide Column Model.

      • String: The message key is parsed as a string.

      • JSON: The message key must be in the JSON format.

      String

      Message Value Format

      The format of the message value in ApsaraMQ for Kafka. You can set this parameter to String or JSON. The default value is JSON. This parameter is displayed when Tablestore is set to Wide Column Model.

      • String: The message value is parsed as a string.

      • JSON: The message value must be in the JSON format.

      String

      JSON Message Field Conversion

      How fields in JSON messages are processed. This parameter is displayed when Message Key Format or Message Value Format is set to JSON. Valid values:

      • Write All as String: All fields are converted into strings in Tablestore.

      • Automatically Identify Field Types: String and boolean fields are written to Tablestore as string and boolean values, respectively. Integer and floating-point fields are written to Tablestore as double values.

      Write All as String

      Primary Key Mode

      Specifies the primary key mode. The primary keys for a data table can be extracted from different parts of ApsaraMQ for Kafka message records, including the Coordinates (Topic, Partition, Offset), Key, and Value of the message records. This parameter is displayed when Tablestore is set to Wide Column Model. The default value is kafka.

      • kafka: uses <connect_topic>_<connect_partition> and <connect_offset> as the primary keys of the data table.

      • record_key: uses the fields in the message key as the primary keys of the data table.

      • record_value: uses the fields in the message value as the primary keys of the data table.

      kafka

      Primary Key Column Names

      The names of the primary key columns and the corresponding data types. A column name specifies the field extracted from the message key or message value. You can set the data type of the field to String or Integer.

      The parameter is displayed when Message Key Format is set to JSON and Primary Key Mode is set to record_key. This parameter is also displayed when Message Value Format is set to JSON and Primary Key Mode is set to record_value.

      Click Create to add a column name. You can configure a maximum of four column names.

      None

      Write Mode

      Specifies the write mode. You can set this parameter to put or update. The default value is put. This parameter is displayed when Tablestore is set to Wide Column Model.

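      • put: The new row overwrites the existing row that has the same primary key.

      • update: The new data is written to the row that has the same primary key, and existing attribute columns that are not included in the new data are retained.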

      put

      Delete Mode

      Specifies whether you can delete rows or attribute columns when the ApsaraMQ for Kafka message records contain empty values. This parameter is displayed when Primary Key Mode is set to record_key. Valid values:

      • none: The default value. Deletion is not allowed.

      • row: allows you to delete rows.

      • column: allows you to delete attribute columns.

      • row_and_column: allows you to delete rows and attribute columns.

      The deletion operation depends on the write mode.

      • When Write Mode is set to put, Delete Mode can be set to any value. Even if the message record contains empty values, the data of the message record is exported to the Tablestore data table in overwrite mode.

      • When Write Mode is set to update, the result depends on Delete Mode. If Delete Mode is set to none or row and all fields in the message record are empty, the message record is regarded as dirty data; if only some fields are empty, the empty values are skipped and the non-empty values are added to the Tablestore table. If Delete Mode is set to column or row_and_column and the message record contains empty values, the system deletes the attribute columns, or both the rows and the attribute columns, that correspond to the empty values, and then adds the remaining data to the Tablestore table.

      None

      Metric Name Field

      Maps a field as a measure name field (_m_name) to the TimeSeries model in Tablestore and indicates a physical quantity or monitoring metric that is measured, such as temperature or speed. Empty values are not supported. This parameter is displayed when Tablestore is set to TimeSeries Model.

      measurement

      Data Source Field

      Maps a field as a data source field (_data_source) to the TimeSeries model in Tablestore and indicates a source of data, such as device ID. Empty values are supported. This parameter is displayed when Tablestore is set to TimeSeries Model.

      source

      Tag Field

      Uses one or more fields as tag fields (_tags) in the TimeSeries model in Tablestore. Each tag is a key:value pair of the string type. The key is the configured field name, and the value is the field value. A tag is a part of timeline metadata. A timeline consists of metric names, data sources, and tags. Empty values are supported. This parameter is displayed when Tablestore is set to TimeSeries Model.

      tag1, tag2

      Timestamp Field

      Maps a field as a timestamp field (_time) to the TimeSeries model in Tablestore and indicates the point in time when the data is generated, for example, the time when a physical quantity is measured. When data is written to Tablestore, the timestamp is converted into a value in microseconds. This parameter is displayed when Tablestore is set to TimeSeries Model.

      time

      Timestamp Unit

      The unit of the values in the timestamp field. Set this parameter to match the timestamps in your messages. This parameter is displayed when Tablestore is set to TimeSeries Model. Valid values:

      • SECONDS: seconds.

      • MILLISECONDS: milliseconds.

      • MICROSECONDS: microseconds.

      • NANOSECONDS: nanoseconds.

      MILLISECONDS

      Whether to Map All Non-primary Key Fields

      Specifies whether to map all non-primary key fields as data fields. Primary key fields are fields that have been mapped as measure names, data sources, tags, or timestamps. This parameter is displayed when Tablestore is set to TimeSeries Model. Valid values:

      • Yes: The fields are automatically mapped and their data types are automatically determined. All numeric values are converted into the double type.

      • No: You must specify the fields to be mapped and the data types.

      Yes

      Configure Mapping for All Non-primary Key Fields

      Specifies the data type of each non-primary key field, by field name, in the TimeSeries model in Tablestore. The following five data types are supported: double, integer, string, binary, and boolean. This parameter is displayed when Whether to Map All Non-primary Key Fields is set to No.

      String

      After the connector is created, you can view the connector on the Connectors page.

  6. Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.

  7. Click OK.
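The JSON Message Field Conversion and Primary Key Mode parameters in the Configure Destination Service step determine how a message becomes a row in the Wide Column model. The following Python sketch models both for a flat JSON object. It is an illustrative approximation, not the connector's code: the function names, the way Write All as String renders non-string values (as their JSON text), and the primary key column names `topic_partition` and `offset` used for the kafka mode are assumptions.

```python
import json
from typing import Any

def convert_fields(obj: dict[str, Any], auto_types: bool) -> dict[str, Any]:
    """Model of JSON Message Field Conversion for a flat JSON object.

    auto_types=False models Write All as String; auto_types=True models
    Automatically Identify Field Types.
    """
    out: dict[str, Any] = {}
    for name, value in obj.items():
        if not auto_types:
            # Assumption: non-string values are written as their JSON text.
            out[name] = value if isinstance(value, str) else json.dumps(value)
        elif isinstance(value, bool):
            out[name] = value  # booleans stay booleans (checked first: bool subclasses int)
        elif isinstance(value, (int, float)):
            out[name] = float(value)  # integers and floats become doubles
        else:
            out[name] = value  # strings stay strings; nested values are not modeled
    return out

def primary_key(mode: str, topic: str, partition: int, offset: int,
                key: dict[str, Any], value: dict[str, Any],
                pk_columns: list[str]) -> dict[str, Any]:
    """Model of Primary Key Mode; returns primary key column -> value."""
    if mode == "kafka":
        # Hypothetical column names; the values follow <topic>_<partition> and <offset>.
        return {"topic_partition": f"{topic}_{partition}", "offset": offset}
    if mode == "record_key":
        source = key
    elif mode == "record_value":
        source = value
    else:
        raise ValueError(f"unknown primary key mode: {mode}")
    if not 1 <= len(pk_columns) <= 4:
        raise ValueError("configure 1 to 4 primary key column names")
    return {col: source[col] for col in pk_columns}
```

With the kafka mode, a message at offset 42 in partition 0 of ts-test-input gets the key values `ts-test-input_0` and `42`, so every message maps to a distinct row regardless of its content.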
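The interaction between Write Mode and Delete Mode can be modeled on an in-memory table. The following Python sketch is a hedged interpretation of the Write Mode and Delete Mode descriptions in the Configure Destination Service step, not the connector's code: the `write` function is hypothetical, a field whose value is None stands for an empty value, put is assumed to simply omit empty values from the new row, and row_and_column is assumed to delete the whole row only when every field is empty.

```python
from typing import Any, Optional

Row = dict[str, Any]

def write(table: dict[Any, Row], pk: Any, fields: dict[str, Optional[Any]],
          write_mode: str = "put", delete_mode: str = "none") -> bool:
    """Apply one record to an in-memory table; returns False for dirty data."""
    non_empty = {k: v for k, v in fields.items() if v is not None}
    empty = [k for k, v in fields.items() if v is None]
    if write_mode == "put":
        table[pk] = non_empty  # overwrite the whole row; empty values are not written
        return True
    if write_mode != "update":
        raise ValueError(f"unknown write mode: {write_mode}")
    if delete_mode in ("none", "row"):
        if not non_empty:
            return False  # every field is empty: dirty data, nothing is written
        table.setdefault(pk, {}).update(non_empty)  # empty values are skipped
        return True
    if delete_mode in ("column", "row_and_column"):
        if delete_mode == "row_and_column" and not non_empty:
            table.pop(pk, None)  # interpretation: an all-empty record deletes the row
            return True
        row = dict(table.get(pk, {}))
        for k in empty:
            row.pop(k, None)  # delete the attribute columns that carry empty values
        row.update(non_empty)
        table[pk] = row
        return True
    raise ValueError(f"unknown delete mode: {delete_mode}")
```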
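For the TimeSeries model, the Metric Name Field, Data Source Field, Tag Field, Timestamp Field, and Timestamp Unit parameters together turn a JSON message into a time series row whose timestamp is stored in microseconds. The following Python sketch models that mapping when Whether to Map All Non-primary Key Fields is set to Yes. The function names and the output dictionary layout are hypothetical, and truncating nanosecond timestamps is an assumption; the field roles (_m_name, _data_source, _tags, _time), the microsecond conversion, and the conversion of numbers to double come from the parameter descriptions.

```python
from typing import Any

# Multiplier from each Timestamp Unit value to microseconds.
TO_MICROS = {"SECONDS": 1_000_000, "MILLISECONDS": 1_000, "MICROSECONDS": 1}

def to_micros(ts: int, unit: str) -> int:
    """Convert a timestamp in the configured unit to microseconds."""
    if unit == "NANOSECONDS":
        return ts // 1_000  # assumption: sub-microsecond precision is truncated
    return ts * TO_MICROS[unit]

def to_timeseries_row(msg: dict[str, Any], metric_field: str, source_field: str,
                      tag_fields: list[str], time_field: str,
                      unit: str) -> dict[str, Any]:
    """Map a flat JSON message to a time series row, mapping all other fields."""
    metric = msg.get(metric_field)
    if metric is None or metric == "":
        raise ValueError("the metric name field must not be empty")
    source = msg.get(source_field)
    used = {metric_field, source_field, time_field, *tag_fields}
    fields: dict[str, Any] = {}
    for name, value in msg.items():
        if name in used:
            continue
        # Numbers become doubles; bool is excluded because it subclasses int.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            value = float(value)
        fields[name] = value
    return {
        "_m_name": str(metric),
        "_data_source": "" if source is None else str(source),  # empty values allowed
        "_tags": {t: str(msg[t]) for t in tag_fields if msg.get(t) is not None},
        "_time": to_micros(int(msg[time_field]), unit),
        "fields": fields,
    }
```

With the example settings (measurement, source, tag1, tag2, time, MILLISECONDS), a message whose time field is 1700000000123 is written with a _time value of 1700000000123000.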

Send a test message

After you deploy a Tablestore sink connector, you can send a message to the data source topic in ApsaraMQ for Kafka to test whether the message can be synchronized to Tablestore.

  1. On the Connectors page, find the connector that you want to manage and click Test in the Actions column.

  2. In the Send Message panel, configure the parameters to send a message for testing.

    • If you set the Sending Method parameter to Console, perform the following steps:

      1. In the Message Key field, enter the message key. Example: demo.

      2. In the Message Content field, enter the message content. Example: {"key": "test"}.

      3. Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.

        • If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field. Example: 0. For information about how to query partition IDs, see View partition status.

        • If you do not want to send the test message to a specific partition, click No.

    • If you set the Sending Method parameter to Docker, run the Docker command in the Run the Docker container to produce a sample message section to send the test message.

    • If you set the Sending Method parameter to SDK, select an SDK for the required programming language or framework and an access method to send and subscribe to the test message.
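If you choose the SDK sending method, the Console example above (key demo, content {"key": "test"}, optionally sent to partition 0) can be produced from code. The following Python sketch assumes the open source kafka-python client, whose `KafkaProducer.send()` accepts `key`, `value`, and `partition` arguments and returns a future, and whose `flush()` waits for buffered messages. The helper names are hypothetical, and `<your-bootstrap-servers>` is a placeholder: take the endpoint, and any SASL or SSL settings required by your access method, from the instance details and the SDK documentation.

```python
import json
from typing import Any, Optional

def build_test_message(key: str = "demo",
                       content: Optional[dict[str, Any]] = None) -> tuple[bytes, bytes]:
    """Encode the Console example (key demo, content {"key": "test"}) as UTF-8 bytes."""
    if content is None:
        content = {"key": "test"}
    return key.encode("utf-8"), json.dumps(content).encode("utf-8")

def send_test_message(producer: Any, topic: str, partition: Optional[int] = None) -> Any:
    """Send one test message; partition=None lets the client choose the partition."""
    key, value = build_test_message()
    # With kafka-python, send() returns a future and flush() blocks until
    # the buffered messages have been sent.
    future = producer.send(topic, key=key, value=value, partition=partition)
    producer.flush()
    return future

# Example usage (assumes kafka-python is installed; replace the placeholder and
# add the security settings that your access method requires):
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="<your-bootstrap-servers>")
#   send_test_message(producer, "ts-test-input", partition=0)
```

Passing the producer in as a parameter keeps the message construction testable without a running cluster, and makes it easy to swap in another client that offers the same `send` and `flush` methods.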

Query the data of a table

After you send a message to the data source topic in ApsaraMQ for Kafka, you can check whether the message is received in the Tablestore console. Perform the following steps to view the data in the Tablestore table:

  1. Log on to the Tablestore console.

  2. On the Overview page, click the name of the instance that you want to manage, or click Manage Instance in the Actions column of the instance that you want to manage.

  3. On the Instance Details tab, find the table that you want to manage in the Tables section.

  4. Click the table name. On the Query Data tab of the Manage Table page, view the data in the table.