You can use the kafka-connect-tablestore package to synchronize data from Apache Kafka to a time series table in Tablestore. This topic describes how to configure data synchronization from Kafka to time series tables in Tablestore.
Prerequisites
Apache Kafka is installed and started, and ZooKeeper is running. For more information, see Kafka documentation.
The Tablestore service is activated, and an instance and a time series table are created. For more information, see Use Tablestore.
Note: You can also use Tablestore Sink Connector to automatically create a destination time series table. To create this time series table, set auto.create to true.
An AccessKey pair is obtained. For more information, see Obtain an AccessKey pair.
Background information
Tablestore can store time series data and supports analytics on time series data. For more information, see Overview.
Step 1: Deploy Tablestore Sink Connector
Obtain the Tablestore Sink Connector package by using one of the following methods:
Download the source code from Tablestore Sink Connector source code on GitHub and compile the source code.
Run the following command to download the source code of Tablestore Sink Connector by using the Git tool:
git clone https://github.com/aliyun/kafka-connect-tablestore.git
Go to the directory where the source code that you downloaded is stored, and run the following command to package the source code by using Maven:
mvn clean package -DskipTests
After the compilation is complete, the generated package is stored in the target directory. The kafka-connect-tablestore-1.0.jar package is used as an example.
Download the compiled kafka-connect-tablestore package.
Copy the package to the $KAFKA_HOME/libs directory on each node.
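The following commands are a minimal sketch of this step. The JAR name matches the package compiled in the previous step, and the paths are examples that you may need to adjust for your environment:
cp target/kafka-connect-tablestore-1.0.jar $KAFKA_HOME/libs/
# In a multi-node cluster, copy the package to every node. For example:
# scp target/kafka-connect-tablestore-1.0.jar user@<node>:$KAFKA_HOME/libs/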
Step 2: Start Tablestore Sink Connector
Tablestore Sink Connector can work in the standalone or distributed mode. You can select a mode based on your business requirements.
To write time series data to Tablestore, message records in Kafka must be in the JSON format. Therefore, JsonConverter is required when you start Tablestore Sink Connector. You do not need to extract schemas or specify message keys, but you must configure the following items in connect-standalone.properties or connect-distributed.properties.
If message keys are specified, you must also configure key.converter and key.converter.schemas.enable based on the format of the keys, as shown in the sketch after the following configuration.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
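For example, if the message keys are also schemaless JSON, the key converter settings might look like the following sketch. This assumes a specific key format; adjust the values to match the actual keys.
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false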
This section describes how to synchronize data to a time series table in Tablestore in the standalone mode. The procedure in the distributed mode is similar to the procedure for synchronizing data to a data table in Tablestore in the distributed mode, except that you modify the preceding configuration items in the worker configuration file connect-distributed.properties and the time series-related configuration items in the connector configuration file connect-tablestore-sink-quickstart.json. For more information, see the configuration procedure in the distributed mode in Step 2: Start Tablestore Sink Connector.
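In the distributed mode, the connector configuration is submitted to a worker over the Kafka Connect REST interface instead of being passed on the command line. The following call is a minimal sketch that assumes the worker listens on the default port 8083 and that connect-tablestore-sink-quickstart.json contains the connector configuration in the JSON format that the REST interface expects:
curl -X POST -H "Content-Type: application/json" --data @connect-tablestore-sink-quickstart.json http://localhost:8083/connectors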
To use Tablestore Sink Connector in the standalone mode, perform the following steps:
Modify the worker configuration file connect-standalone.properties and the connector configuration file connect-tablestore-sink-quickstart.properties based on your requirements.
Example on how to modify the worker configuration file connect-standalone.properties
The worker configuration file contains configuration items such as the Kafka connection parameters, the serialization format, and the frequency at which offsets are committed. The following sample code is an example provided by Apache Kafka of how to modify the worker configuration file. For more information, see Kafka Connect.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
Example on how to modify the connector configuration file connect-tablestore-sink-quickstart.properties
The connector configuration file contains configuration items such as the connector class, the Tablestore connection parameters, and the data mapping. For more information, see Configuration description.
# Specify the connector name.
name=tablestore-sink
# Specify the connector class.
connector.class=TableStoreSinkConnector
# Specify the maximum number of tasks.
tasks.max=1
# Specify the list of Kafka topics from which you want to export data.
topics=test

# Specify values for the following Tablestore connection parameters:
# The endpoint of the Tablestore instance.
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# The authentication mode.
tablestore.auth.mode=aksk
# The AccessKey ID and the AccessKey secret. If tablestore.auth.mode is set to aksk, you must specify the AccessKey ID and the AccessKey secret.
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# The name of the Tablestore instance.
tablestore.instance.name=xxx

## The configuration items related to Security Token Service (STS) authentication. If STS authentication is used, the following configuration items must be specified. You must also specify ACCESS_ID and ACCESS_KEY in environment variables when STS authentication is used.
#sts.endpoint=
#region=
#account.id=
#role.name=

# Specify the format string for the name of the destination Tablestore table. You can use <topic> in the string as a placeholder for the topic from which you want to export data.
# topics.assign.tables takes precedence over table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored.
# For example, if table.name.format is set to kafka_<topic> and the name of the Kafka topic from which you want to export data is test, Kafka message records from the test topic are mapped to the table named kafka_test in Tablestore.
table.name.format=<topic>
# Specify the mapping between the Kafka topic and the destination Tablestore table in the <topic>:<tablename> format. The topic name and table name are separated by a colon (:). Separate multiple mappings with commas (,).
# If this mapping is not specified, the configuration of table.name.format is used.
# topics.assign.tables=test:test_kafka

# Specify whether to automatically create a destination table. Default value: false.
auto.create=true

# Specify how to process dirty data:
# An error may occur when the Kafka message records are parsed or written to the time series table. You can specify the following two parameters to determine how to handle the error:
# Specify the fault tolerance capability. Valid values: none and all. Default value: none.
# none: An error causes the data import task that uses Tablestore Sink Connector to fail.
# all: The message records for which errors are reported are skipped and logged.
runtime.error.tolerance=none
# Specify how dirty data is logged. Valid values: ignore, kafka, and tablestore. Default value: ignore.
# ignore: All errors are ignored.
# kafka: The message records for which errors are reported and the error messages are stored in a different Kafka topic.
# tablestore: The message records for which errors are reported and the error messages are stored in a Tablestore data table.
runtime.error.mode=ignore

# If you set runtime.error.mode to kafka, you must specify the Kafka cluster address and the topic.
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors
# If you set runtime.error.mode to tablestore, you must specify the name of the Tablestore data table.
# runtime.error.table.name=errors

## The following configuration items are specific to data synchronization from Apache Kafka to time series tables in Tablestore.
# The working mode of the connector. Default value: normal. Set this parameter to timeseries to synchronize data to a time series table.
tablestore.mode=timeseries
# Mappings of the primary key fields in the time series table.
tablestore.timeseries.test.measurement=m
tablestore.timeseries.test.dataSource=d
tablestore.timeseries.test.tags=region,level
# Mappings of the time field in the time series table.
tablestore.timeseries.test.time=timestamp
tablestore.timeseries.test.time.unit=MILLISECONDS
# Specify whether to convert the column names of the time series data fields to lowercase letters. Default value: true. Column names in time series tables in the TimeSeries model do not support uppercase letters. If tablestore.timeseries.toLowerCase is set to false and a column name contains uppercase letters, an error is reported when data is written to the time series table.
tablestore.timeseries.toLowerCase=true
# Specify whether to store fields other than the primary key fields and the time field as time series data fields in the time series table. Default value: true. If tablestore.timeseries.mapAll is set to false, only the fields that are specified by tablestore.timeseries.test.field.name are stored in the time series table as time series data fields.
tablestore.timeseries.mapAll=true
# Specify the names of the fields that are stored as time series data fields. Separate multiple field names with commas (,).
tablestore.timeseries.test.field.name=cpu
# Specify the types of the fields that are stored as time series data fields. Valid values: double, integer, string, binary, and boolean.
# If multiple fields are specified, the field types and the field names must be configured in pairs. Separate multiple field types with commas (,).
tablestore.timeseries.test.field.type=double
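Note that the topic name is part of the time series configuration keys (tablestore.timeseries.<topic>.measurement and so on); test in the preceding example is the name of the Kafka topic. As an illustrative sketch, if data were also exported from a hypothetical second topic named metrics, the mappings for that topic might look like the following:
tablestore.timeseries.metrics.measurement=m
tablestore.timeseries.metrics.dataSource=d
tablestore.timeseries.metrics.tags=region,level
tablestore.timeseries.metrics.time=timestamp
tablestore.timeseries.metrics.time.unit=MILLISECONDS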
Go to the $KAFKA_HOME directory and run the following command to enable the standalone mode:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-tablestore-sink-quickstart.properties
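After the worker starts, you can optionally verify that the connector is running by querying the Kafka Connect REST interface, which listens on port 8083 by default. This is a verification sketch, assuming default settings:
curl http://localhost:8083/connectors
# If the connector is loaded, the response contains ["tablestore-sink"], the name that is specified in the connector configuration file.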
Step 3: Generate message records
Go to the $KAFKA_HOME directory and run the following command to start a console producer client:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
The following table describes the parameters that you need to configure to start a console producer client.

| Parameter | Example | Description |
| --- | --- | --- |
| --broker-list | localhost:9092 | The address and port of the broker in the Kafka cluster. |
| --topic | test | The name of the topic. By default, a topic is automatically created when you start Tablestore Sink Connector. You can also manually create a topic, as shown in the command after this table. |
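If you want to create the topic manually, the following command is a minimal sketch for Kafka 2.2 and later. Earlier versions use the --zookeeper option instead of --bootstrap-server:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 1 --replication-factor 1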
Write messages to the topic named test.
Important: To import data to a time series table, you must write data in the JSON format to the Kafka topic.
{"m":"cpu","d":"127.0.0.1","region":"shanghai","level":1,"timestamp":1638868699090,"io":5.5,"cpu":"3.5"}
Log on to the Tablestore console to view the data.