The Tablestore Sink Connector polls messages from the subscribed Kafka topics, parses the message records, and then writes the data to a Tablestore data table in batches.
Prerequisites
Kafka is installed, and ZooKeeper and Kafka are started. For more information, see the official Kafka documentation.
The Tablestore service is activated, and an instance and a data table are created. For more information, see the usage workflow.
Note: You can also have the Tablestore Sink Connector create the destination data table automatically. To do so, set auto.create to true.
An AccessKey pair is obtained. For more information, see Obtain an AccessKey pair.
Step 1: Deploy the Tablestore Sink Connector
Obtain the Tablestore Sink Connector in either of the following ways:
Download the source code from GitHub and compile it. The source code is hosted in the Tablestore Sink Connector repository on GitHub.
Run the following command with Git to clone the Tablestore Sink Connector source code:
git clone https://github.com/aliyun/kafka-connect-tablestore.git
Change to the directory that contains the downloaded source code and run the following command to build the package with Maven:
mvn clean package -DskipTests
After compilation, the generated package (for example, kafka-connect-tablestore-1.0.jar) is placed in the target directory.
Alternatively, download the precompiled kafka-connect-tablestore package directly.
Copy the package to the $KAFKA_HOME/libs directory on each node.
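For example, on a multi-node deployment the copy can be done with cp for the local node and scp for remote nodes. The JAR file name, the remote hostname, and the remote Kafka path below are placeholders; replace them with your own values.
# Copy the connector JAR into the libs directory of the local Kafka installation.
cp target/kafka-connect-tablestore-1.0.jar $KAFKA_HOME/libs/
# Copy the JAR to each remaining node, for example with scp (hostname and path are placeholders).
scp target/kafka-connect-tablestore-1.0.jar user@worker-node:/path/to/kafka/libs/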
Step 2: Start the Tablestore Sink Connector
The Tablestore Sink Connector can run in standalone mode or distributed mode. Choose the mode that suits your scenario.
To configure standalone mode, follow these steps:
Modify the worker configuration file connect-standalone.properties and the connector configuration file connect-tablestore-sink-quickstart.properties as needed.
Example of the worker configuration file connect-standalone.properties
The worker configuration includes the Kafka connection parameters, the serialization format, the offset commit interval, and other settings. The official Kafka example is used here. For more information, see Kafka Connect.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
Example of the connector configuration file connect-tablestore-sink-quickstart.properties
The connector configuration includes the connector class, the Tablestore connection parameters, the data mapping, and other settings. For more information, see Configuration description.
# The connector name.
name=tablestore-sink
# The connector class.
connector.class=TableStoreSinkConnector
# The maximum number of tasks.
tasks.max=1
# The list of Kafka topics from which data is exported.
topics=test

# The following parameters configure the Tablestore connection.
# The endpoint of the Tablestore instance.
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# The AccessKey ID and AccessKey Secret.
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# The name of the Tablestore instance.
tablestore.instance.name=xxx

# A format string for the name of the destination Tablestore table, in which <topic> is a placeholder for the original topic. Default value: <topic>.
# Examples:
# table.name.format=kafka_<topic> writes the records of the topic test to the data table named kafka_test.
# table.name.format=
# The primary key mode. Default value: kafka.
# Records are written with <topic>_<partition> (the Kafka topic and partition, joined by "_") and <offset> (the offset of the record within the partition) as the primary key of the Tablestore data table.
# primarykey.mode=
# Whether to create the destination table automatically. Default value: false.
auto.create=true
Change to the $KAFKA_HOME directory and run the following command to start the connector in standalone mode:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-tablestore-sink-quickstart.properties
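After the worker process starts, you can optionally verify that the connector is registered through the Kafka Connect REST interface, which listens on port 8083 by default. The following check is a sketch; adjust the address and port to your environment.
curl http://localhost:8083/connectors
# The response is a JSON array that should contain "tablestore-sink" once the connector is running.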
To configure distributed mode, follow these steps:
Modify the worker configuration file connect-distributed.properties as needed.
The worker configuration includes the Kafka connection parameters, the serialization format, the offset commit interval, and other settings. It also specifies the topics that store information about the connectors; we recommend that you create these topics manually in advance, as shown in the example after this list. The official Kafka example is used here. For more information, see Kafka Connect.
offset.storage.topic: the compacted topic that stores the offsets of the connectors.
config.storage.topic: the compacted topic that stores connector and task configurations. This topic must have exactly one partition.
status.storage.topic: the compacted topic that stores Kafka Connect status information.
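For example, the three topics can be created in advance with the kafka-topics.sh tool. The following commands are a sketch that matches the single-broker example configuration (replication factor 1, compacted topics); the partition counts and the --bootstrap-server address are illustrative, and older Kafka versions use --zookeeper instead of --bootstrap-server.
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-offsets --partitions 25 --replication-factor 1 --config cleanup.policy=compact
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-configs --partitions 1 --replication-factor 1 --config cleanup.policy=compact
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-status --partitions 5 --replication-factor 1 --config cleanup.policy=compact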
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
Change to the $KAFKA_HOME directory and run the following command to start the connector in distributed mode.
Important: You must start a worker process on every node.
bin/connect-distributed.sh config/connect-distributed.properties
Manage connectors through the REST API. For more information, see REST API.
Create a file named connect-tablestore-sink-quickstart.json in the config directory and fill in the following sample content.
The connector configuration file specifies parameter key-value pairs in JSON format, including the connector class, the Tablestore connection parameters, the data mapping, and other settings. For more information, see Configuration description.
{ "name": "tablestore-sink", "config": { "connector.class":"TableStoreSinkConnector", "tasks.max":"1", "topics":"test", "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com", "tablestore.access.key.id":"xxx", "tablestore.access.key.secret":"xxx", "tablestore.instance.name":"xxx", "table.name.format":"<topic>", "primarykey.mode":"kafka", "auto.create":"true" } }
Run the following command to start a Tablestore Sink Connector:
curl -i -k -H "Content-type: application/json" -X POST -d @config/connect-tablestore-sink-quickstart.json http://localhost:8083/connectors
In this command, http://localhost:8083/connectors is the address of the Kafka REST service. Change it as needed.
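The same REST interface can also be used to inspect and manage the running connector. The following commands are illustrative examples that assume the default address http://localhost:8083.
# List all connectors in the Connect cluster.
curl http://localhost:8083/connectors
# Check the status of the tablestore-sink connector and its tasks.
curl http://localhost:8083/connectors/tablestore-sink/status
# Delete the connector.
curl -X DELETE http://localhost:8083/connectors/tablestore-sink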
Step 3: Produce new records
Change to the $KAFKA_HOME directory and run the following command to start a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
The configuration options are described below.
--broker-list (example value: localhost:9092): the address and port of a broker in the Kafka cluster.
--topic (example value: test): the topic name. By default, the topic is created automatically when the Tablestore Sink Connector starts; you can also create it manually.
Write some new messages to the topic test.
Struct message
{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" }, { "type":"string", "optional":false, "field":"product" }, { "type":"int64", "optional":false, "field":"quantity" }, { "type":"double", "optional":false, "field":"price" } ], "optional":false, "name":"record" }, "payload":{ "id":1, "product":"foo", "quantity":100, "price":50 } }
Map message
{ "schema":{ "type":"map", "keys":{ "type":"string", "optional":false }, "values":{ "type":"int32", "optional":false }, "optional":false }, "payload":{ "id":1 } }
Log on to the Tablestore console to view the data.
A data table named test is automatically created in the Tablestore instance. The first row of data in the table is the result of importing the Map message, and the second row is the result of importing the Struct message.