This topic describes how to synchronize data from an ApsaraMQ for Kafka instance to an ApsaraDB for ClickHouse cluster to meet your real-time data processing requirements.
ApsaraDB for ClickHouse clusters can synchronize data only from a self-managed Kafka cluster deployed on an Elastic Compute Service (ECS) instance or from an ApsaraMQ for Kafka instance. The following example describes how to synchronize data from an ApsaraMQ for Kafka instance to an ApsaraDB for ClickHouse cluster.
Prerequisites
A destination ApsaraDB for ClickHouse cluster is created. An ApsaraMQ for Kafka instance and the destination ApsaraDB for ClickHouse cluster are deployed in the same region and use the same virtual private cloud (VPC). For more information, see Create an ApsaraDB for ClickHouse cluster.
A database account that has the permissions to perform operations on the destination database is created in the ApsaraDB for ClickHouse cluster. For more information, see Create a database account and Grant permissions.
Procedure
Log on to the ApsaraDB for ClickHouse console.
In the top navigation bar, select the region where the cluster is deployed.
On the Clusters page, click the Default Instances tab, find the cluster that you want to manage, and then click the ID of the cluster.
In the upper-right corner of the Cluster Information page, click Log On to Database.
In the Log on to Database Instance dialog box of the Data Management (DMS) console, set the Database Account and Database Password parameters and click Login.
Create a table that uses the Kafka engine.
Note: The table that uses the Kafka engine is used only to consume data from ApsaraMQ for Kafka; it cannot be queried directly or used to store data. The consumed data is synchronized to tables in ApsaraDB for ClickHouse.
Use the following syntax to create a table:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host1:port1,host2:port2,host3:port3',
    kafka_topic_list = 'topic_name1,topic_name2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'
    [, kafka_row_delimiter = 'delimiter_symbol']
    [, kafka_num_consumers = N]
    [, kafka_thread_per_consumer = 1]
    [, kafka_max_block_size = 0]
    [, kafka_skip_broken_messages = N]
    [, kafka_commit_every_batch = 0]
    [, kafka_auto_offset_reset = N]
The following table describes the parameters in the preceding statement.
| Parameter | Required | Description |
| --- | --- | --- |
| kafka_broker_list | Yes | The endpoints that are used to connect to the ApsaraMQ for Kafka instance used as the data source. Separate multiple endpoints with commas (,). For more information about how to view endpoints, see View endpoints. |
| kafka_topic_list | Yes | The names of the topics in the ApsaraMQ for Kafka instance used as the data source. Separate multiple topic names with commas (,). For more information about how to view the name of a topic, see Step 1: Create a topic. |
| kafka_group_name | Yes | The name of the consumer group in the ApsaraMQ for Kafka instance used as the data source. For more information about how to create a consumer group, see Step 2: Create a group. |
| kafka_format | Yes | The format of the message body. Note: For more information about the message body formats supported by ApsaraDB for ClickHouse, see Formats for Input and Output Data. |
| kafka_row_delimiter | No | The delimiter that is used to separate rows. Default value: \n. You can also set this parameter based on the actual delimited format in which data is written. |
| kafka_num_consumers | No | The number of consumers that consume data from the table. Default value: 1. Note: If the throughput of one consumer is insufficient, specify more consumers. The number of consumers cannot exceed the number of partitions in a topic, because only one consumer can be assigned to each partition. |
| kafka_thread_per_consumer | No | Specifies whether each consumer starts an independent thread for consumption. Default value: 0. Valid values: 0 (all consumers share one thread) and 1 (each consumer starts an independent thread). For more information about how to improve the consumption speed, see Clusters and Performance. |
| kafka_max_block_size | No | The maximum size of the batch of Kafka messages that is written to the table in each write operation. Default value: 65536. Unit: bytes. |
| kafka_skip_broken_messages | No | The tolerance of the Kafka message parser for dirty data. Default value: 0. If you set this parameter to N, the Kafka engine skips up to N messages that cannot be parsed. One message is equivalent to one row of data. |
| kafka_commit_every_batch | No | Specifies how often a commit operation is performed. Default value: 0. Valid values: 0 (a commit is performed only after a whole block is written) and 1 (a commit is performed after each batch is written). |
| kafka_auto_offset_reset | No | The offset from which data in the ApsaraMQ for Kafka instance is consumed. Valid values: earliest (default; data is consumed from the earliest offset) and latest (data is consumed from the latest offset). Note: This parameter is not supported by ApsaraDB for ClickHouse clusters of version 21.8. |
Note: For more information about other parameters, see Kafka.
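The optional settings in the preceding table can be combined in one statement. The following sketch is for illustration only: the table name kafka_src_sketch and the broker endpoints are placeholders, and the values assume a topic with at least two partitions.

```sql
CREATE TABLE default.kafka_src_sketch ON CLUSTER default
(
    id Int32,
    name String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'broker1:9092,broker2:9092',  -- placeholder endpoints
    kafka_topic_list = 'test',
    kafka_group_name = 'test',
    kafka_format = 'CSV',
    kafka_num_consumers = 2,          -- must not exceed the number of partitions
    kafka_thread_per_consumer = 1,    -- each consumer runs in its own thread
    kafka_skip_broken_messages = 10;  -- tolerate up to 10 unparsable messages
```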
You can execute the following sample statement:
CREATE TABLE default.kafka_src_table ON CLUSTER default
(
    -- Fields that define the schema of the table.
    id Int32,
    name String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'alikafka-post-cn-tl32i5sc****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-3-vpc.alikafka.aliyuncs.com:9092',
    kafka_topic_list = 'test',
    kafka_group_name = 'test',
    kafka_format = 'CSV';
Create a table in ApsaraDB for ClickHouse.
NoteFor more information about the purpose of creating a local table or a distributed table, see Terms.
Create a local table.
CREATE TABLE default.kafka_table_local ON CLUSTER default
(
    id Int32,
    name String
)
ENGINE = MergeTree()
ORDER BY (id);
Create a distributed table.
NoteIf you want to synchronize data only to a local table, skip this step.
CREATE TABLE kafka_table_distributed ON CLUSTER default
AS default.kafka_table_local
ENGINE = Distributed(default, default, kafka_table_local, id);
Create a view to synchronize data that is consumed from the table that uses the Kafka engine to the distributed table in ApsaraDB for ClickHouse.
NoteIf you synchronize data to a local table, replace the name of the distributed table with the name of the local table in the statement.
You can use the following syntax to create a view:
CREATE MATERIALIZED VIEW [view.name] ON CLUSTER default TO [dest_table] AS SELECT * FROM [src_table];
You can execute the following sample statement:
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
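After the materialized view is created, consumption starts in the background. If you need to pause synchronization, for example to change the conversion logic, you can detach the view and re-attach it later; this is standard practice for the Kafka engine in ClickHouse. The following sketch uses the view name consumer from the preceding example:

```sql
-- Pause consumption from the table that uses the Kafka engine.
DETACH TABLE consumer ON CLUSTER default;
-- Resume consumption; reads continue from the committed offsets.
ATTACH TABLE consumer ON CLUSTER default;
```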
Verify the data synchronization result
You can use either of the following methods to verify the data synchronization result.
Query the distributed table in ApsaraDB for ClickHouse
Use the topic of ApsaraMQ for Kafka to send messages.
Log on to the ApsaraMQ for Kafka console.
On the Instances page, click the name of the instance that has the topic that you want to use to send messages.
On the Topics page, find the topic that you want to use to send messages, and then choose the option for sending messages in the Actions column.
In the Start to Send and Consume Message panel, specify the Message Content of the message that you want to send. In this example, the messages 1,a and 2,b are sent by using this topic.
Click OK.
Execute the following query statement to query the distributed table in ApsaraDB for ClickHouse to check whether the data is synchronized to the distributed table:
SELECT * FROM kafka_table_distributed;
NoteIf you synchronize data to a local table, replace the name of the distributed table with the name of the local table in the statement.
The following query result is returned:
┌─id─┬─name─┐
│  1 │ a    │
│  2 │ b    │
└────┴──────┘
Note: If this result is returned, the data has been synchronized from the ApsaraMQ for Kafka instance to the ApsaraDB for ClickHouse cluster.
Query the system table
Query the system table system.kafka to view the data consumption status of the table that uses the Kafka engine.
SELECT * FROM system.kafka;
The following query result is returned:
┌─database─┬─table────────────────────┬─topic─┬─consumer_group─┬─last_read_message_count─┬─status──────┬─exception─┐
│ default  │ kafka_table_distributed  │ test  │ test           │                       2 │ attach_view │           │
└──────────┴──────────────────────────┴───────┴────────────────┴─────────────────────────┴─────────────┴───────────┘
The following table describes the fields that are returned in the query result.
| Field | Description |
| --- | --- |
| database | The name of the database to which the table that uses the Kafka engine belongs. |
| table | The name of the table that uses the Kafka engine. |
| topic | The name of the topic that the table that uses the Kafka engine consumes. |
| consumer_group | The name of the consumer group that the table that uses the Kafka engine belongs to. |
| last_read_message_count | The number of messages that the table that uses the Kafka engine has read. |
| status | The consumption status of the table that uses the Kafka engine, such as attach_view (a materialized view is attached and data is being consumed) or error (an error occurred during consumption). |
| exception | The details about the consumption error. Note: If the value of the status field is error, the error details are returned in this field. |
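Based on the status and exception fields described above, you can filter system.kafka to surface only failing Kafka engine tables. This is a monitoring sketch; it assumes error is the status value returned on consumption failures, as noted in the table above:

```sql
-- List Kafka engine tables whose consumption has failed, with error details.
SELECT database, table, consumer_group, status, exception
FROM system.kafka
WHERE status = 'error';
```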
FAQ
For more information about frequently asked questions about synchronizing data from an ApsaraMQ for Kafka instance to an ApsaraDB for ClickHouse cluster and their solutions, see FAQ.