启动Tablestore Sink Connector时,您需要通过键值映射向Kafka Connect进程传递参数。通过本文您可以结合配置示例和配置参数说明了解Tablestore Sink Connector的相关配置。
配置示例
当从Kafka同步数据到数据表或者时序表时配置项不同,且不同工作模式下相应配置文件的示例不同。此处以同步数据到数据表中为例介绍配置示例。同步数据到时序表的配置示例中需要增加时序相关配置项。
- .properties格式配置文件的示例,适用于standalone模式。
# 设置连接器名称。 name=tablestore-sink # 指定连接器类。 connector.class=TableStoreSinkConnector # 设置最大任务数。 tasks.max=1 # 指定导出数据的Kafka的Topic列表。 topics=test # 以下为Tablestore连接参数的配置。 # Tablestore实例的Endpoint。 tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com # 填写AccessKey ID和AccessKey Secret。 tablestore.access.key.id=xxx tablestore.access.key.secret=xxx # Tablestore实例名称。 tablestore.instance.name=xxx # 以下为数据映射相关的配置。 # 指定Kafka Record的解析器。 # 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。 event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser # 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。 # topics.assign.tables配置的优先级更高,如果配置了topics.assign.tables,则忽略table.name.format的配置。 # 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。 table.name.format=<topic> # 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。 # 如果缺省,则采取table.name.format的配置。 # topics.assign.tables=test:test_kafka # 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。 # kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。 # record_key表示以Record Key中的字段作为数据表的主键。 # record_value表示以Record Value中的字段作为数据表的主键。 primarykey.mode=kafka # 定义导入数据表的主键列名和数据类型。 # 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。 # 其中<tablename>为数据表名称的占位符。 # 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。 # 当主键模式为record_key或record_value时,必须配置以下两个属性。 # tablestore.test.primarykey.name=A,B # tablestore.test.primarykey.type=string,integer # 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。 # 默认值为空,使用Record Value中的所有字段作为数据表的属性列。 # 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。 # 其中<tablename>为数据表名称的占位符。 # tablestore.test.columns.whitelist.name=A,B # tablestore.test.columns.whitelist.type=string,integer # 以下为写入Tablestore相关的配置。 # 指定写入模式,可选值包括put和update,默认值为put。 # put表示覆盖写。 # update表示更新写。 insert.mode=put # 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。 insert.order.enable=true # 是否自动创建目标表,默认值为false。 auto.create=false # 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。 # none表示不允许进行任何删除。 # row表示允许删除行。 # column表示允许删除属性列。 # row_and_column表示允许删除行和属性列。 delete.mode=none # 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。 buffer.size=1024 # 写入数据表时的回调线程数,默认值为核数+1。 # max.thread.count= # 写入数据表时的最大请求并发数,默认值为10。 max.concurrency=10 # 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。 bucket.count=3 # 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。 flush.Interval=10000 # 以下为脏数据处理相关配置。 # 在解析Kafka Record或者写入数据表时可能发生错误,您可以可通过以下配置进行处理。 # 指定容错能力,可选值包括none和all,默认值为none。 # none表示任何错误都将导致Sink Task立即失败。 # all表示跳过产生错误的Record,并记录该Record。 runtime.error.tolerance=none # 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。 # ignore表示忽略所有错误。 # kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。 # tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。 runtime.error.mode=ignore # 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。 # runtime.error.bootstrap.servers=localhost:9092 # runtime.error.topic.name=errors # 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。 # runtime.error.table.name=errors
- .json格式配置文件的示例,适用于distributed模式。
{ "name": "tablestore-sink", "config": { // 指定连接器类。 "connector.class":"TableStoreSinkConnector", // 设置最大任务数。 "tasks.max":"3", // 指定导出数据的Kafka的Topic列表。 "topics":"test", // 以下为Tablestore连接参数的配置。 // Tablestore实例的Endpoint。 "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com", // 填写AccessKey ID和AccessKey Secret。 "tablestore.access.key.id":"xxx", "tablestore.access.key.secret":"xxx", // Tablestore实例名称。 "tablestore.instance.name":"xxx", // 以下为数据映射相关的配置。 // 指定Kafka Record的解析器。 // 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。 "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser", // 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。 // topics.assign.tables配置的优先级更高。如果配置了topics.assign.tables,则忽略table.name.format的配置。 // 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。 "table.name.format":"<topic>", // 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。 // 如果缺省,则采取table.name.format的配置。 // "topics.assign.tables":"test:test_kafka", // 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。 // kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。 // record_key表示以Record Key中的字段作为数据表的主键。 // record_value表示以Record Value中的字段作为数据表的主键。 "primarykey.mode":"kafka", // 定义导入数据表的主键列名和数据类型。 // 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。 // 其中<tablename>为数据表名称的占位符。 // 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。 // 当主键模式为record_key或record_value时,必须配置以下两个属性。 // "tablestore.test.primarykey.name":"A,B", // "tablestore.test.primarykey.type":"string,integer", // 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。 // 默认值为空,使用Record Value中的所有字段作为数据表的属性列。 // 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。 // 其中<tablename>为数据表名称的占位符。 // "tablestore.test.columns.whitelist.name":"A,B", // "tablestore.test.columns.whitelist.type":"string,integer", // 以下为写入Tablestore相关的配置。 // 指定写入模式,可选值包括put和update,默认值为put。 // put表示覆盖写。 // update表示更新写。 "insert.mode":"put", // 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。 "insert.order.enable":"true", // 是否自动创建目标表,默认值为false。 "auto.create":"false", // 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。 // none表示不允许进行任何删除。 // row表示允许删除行。 // column表示允许删除属性列。 // row_and_column表示允许删除行和属性列。 "delete.mode":"none", // 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。 "buffer.size":"1024", // 写入数据表时的回调线程数,默认值为核数+1。 // "max.thread.count": // 写入数据表时的最大请求并发数,默认值为10。 "max.concurrency":"10", // 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。 "bucket.count":"3", // 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。 "flush.Interval":"10000", // 以下为脏数据处理相关配置。 // 在解析Kafka Record或者写入数据表时可能发生错误,您可以通过以下配置进行处理。 // 指定容错能力,可选值包括none和all,默认值为none。 // none表示任何错误都将导致Sink Task立即失败。 // all表示跳过产生错误的Record,并记录该Record。 "runtime.error.tolerance":"none", // 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。 // ignore表示忽略所有错误。 // kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。 // tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。 "runtime.error.mode":"ignore" // 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。 // "runtime.error.bootstrap.servers":"localhost:9092", // "runtime.error.topic.name":"errors", // 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。 // "runtime.error.table.name":"errors", }
配置项说明
配置文件中的配置项说明请参见下表。其中时序相关配置项只有同步数据到时序表时才需要配置。
分类 | 配置项 | 类型 | 是否必选 | 示例值 | 描述 |
---|---|---|---|---|---|
Kafka Connect常见配置 | name | string | 是 | tablestore-sink | 连接器(Connector)名称。连接器名称必须唯一。 |
connector.class | class | 是 | TableStoreSinkConnector | 连接器的Java类。
如果您要使用该连接器,请在connector.class配置项中指定Connector类的名称,支持配置为Connector类的全名(com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector)或别名(TableStoreSinkConnector)。
|
|
tasks.max | integer | 是 | 3 | 连接器支持创建的最大任务数。
如果连接器无法达到此并行度级别,则可能会创建较少的任务。 |
|
key.converter | string | 否 | org.apache.kafka.connect.json.JsonConverter | 覆盖worker设置的默认key转换器。 | |
value.converter | string | 否 | org.apache.kafka.connect.json.JsonConverter | 覆盖worker设置的默认value转换器。 | |
topics | list | 是 | test | 连接器输入的Kafka Topic列表,多个Topic之间以英文逗号(,)分隔。
您必须为连接器设置topics来控制连接器输入的Topic。 |
|
连接器Connection配置 | tablestore.endpoint | string | 是 | https://xxx.xxx.ots.aliyuncs.com | Tablestore实例的服务地址。更多信息,请参见服务地址。 |
tablestore.mode | string | 是 | timeseries | 根据数据同步到的表类型选择模式。取值范围如下:
|
|
tablestore.access.key.id | string | 是 | LTAn******************** | 登录账号的AccessKey ID和AccessKey Secret,获取方式请参见获取AccessKey。 | |
tablestore.access.key.secret | string | 是 | zbnK************************** | ||
tablestore.auth.mode | string | 是 | aksk | 设置认证方式。取值范围如下:
|
|
tablestore.instance.name | string | 是 | myotstest | Tablestore实例的名称。 | |
连接器Data Mapping配置 | event.parse.class | class | 是 | DefaultEventParser | 消息解析器的Java类,默认值为DefaultEventParser。解析器用于从Kafka Record中解析出数据表的主键列和属性列。
注意 Tablestore对列值大小有限制。string类型和binary类型的主键列列值限制均为1 KB,属性列列值限制均为2 MB。更多信息,请参见通用限制。
如果数据类型转换后列值超出对应限制,则将该Kafka Record作为脏数据处理。 如果使用默认的DefaultEventParser解析器,Kafka Record的Key或Value必须为Kafka Connect的Struct或Map类型。Struct中选择的字段必须为支持的数据类型,字段会根据数据类型映射表转换为Tablestore数据类型写入数据表。Map中的值类型也必须为支持的数据类型,支持的数据类型同Struct,最终会被转换为binary类型写入数据表。Kafka和Tablestore的数据类型映射关系请参见附录:Kafka和Tablestore数据类型映射。 如果Kafka Record为不兼容的数据格式,则您可以通过实现com.aliyun.tablestore.kafka.connect.parsers.EventParser定义的接口来自定义解析器。 |
table.name.format | string | 否 | kafka_<topic> | 目标数据表名称的格式字符串,默认值为<topic>。字符串中可包含<topic>作为原始Topic的占位符。例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则映射到Tablestore的表名为kafka_test。
此配置项的优先级低于topics.assign.tables配置项,如果配置了topics.assign.tables,则忽略table.name.format的配置。 |
|
topics.assign.tables | list | 是 | test:destTable | 指定topic与Tablestore表之间的映射关系,格式为<topic_1>:<tablename_1>,<topic_2>:<tablename_2> 。多个映射关系之间以英文逗号(,)分隔,例如test:destTable表示将Topic名为test的消息记录写入数据表destTable中。
此配置项的优先级高于table.name.format配置项,如果配置了topics.assign.tables,则忽略table.name.format的配置。 |
|
primarykey.mode | string | 否 | kafka | 数据表的主键模式。取值范围如下:
请配合tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type使用。此配置项不区分大小写。 |
|
tablestore.<tablename>.primarykey.name | list | 否 | A,B | 数据表的主键列名,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以英文逗号(,)分隔。
主键模式不同时,主键列名的配置不同。
Tablestore数据表的主键列是有顺序的,此属性的配置应注意主键列名顺序,例如PRIMARY KEY(A, B, C)与PRIMARY KEY(A, C, B)是不同的两个主键结构。 |
|
tablestore.<tablename>.primarykey.type | list | 否 | string, integer | 数据表的主键列数据类型,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以英文逗号(,)分隔,顺序必须与tablestore.<tablename>.primarykey.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binary和auto_increment。
主键模式不同时,主键数据类型的配置不同。
|
|
tablestore.<tablename>.columns.whitelist.name | list | 否 | A,B | 数据表的属性列白名单中属性列名称,其中<tablename>为数据表名称的占位符,以英文逗号(,)分隔。
如果配置为空,则使用Record Value中的所有字段(Struct类型)或者键(Map类型)作为数据表的属性列,否则用于过滤得到所需属性列。 |
|
tablestore.<tablename>.columns.whitelist.type | list | 否 | string, integer | 数据表的属性列白名单中属性列数据类型,其中<tablename>为数据表名称的占位符,以英文逗号(,)分隔,顺序必须与tablestore.<tablename>.columns.whitelist.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binary、boolean和double。 | |
连接器Write配置 | insert.mode | string | 否 | put | 写入模式。取值范围如下:
此属性配置不区分大小写。 |
insert.order.enable | boolean | 否 | true | 写入数据表时是否需要保序。取值范围如下:
|
|
auto.create | boolean | 否 | false | 是否需要自动创建目标表,支持自动创建数据表或者时序表。取值范围如下:
|
|
delete.mode | string | 否 | none | 删除模式,仅当同步数据到数据表且主键模式为record_key时才有效。取值范围如下:
此属性配置不区分大小写。 删除操作与insert.mode的配置相关。更多信息,请参见附录:删除语义。 |
|
buffer.size | integer | 否 | 1024 | 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须是2的指数。 | |
max.thread.count | integer | 否 | 3 | 写入数据表时的回调线程数,默认值为CPU核数+1 。
|
|
max.concurrency | integer | 否 | 10 | 写入数据表时的最大请求并发数。 | |
bucket.count | integer | 否 | 3 | 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。 | |
flush.Interval | integer | 否 | 10000 | 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。 | |
连接器Runtime Error配置 | runtime.error.tolerance | string | 否 | none | 解析Kafka Record或者写入表时产生错误的处理策略。取值范围如下:
此属性配置不区分大小写。 |
runtime.error.mode | string | 否 | ignore | 解析Kafka Record或者写入表时产生错误,对错误的Record的处理策略。取值范围如下:
kafka模式下需要对Kafka Record的Header、Key和Value进行序列化转换,tablestore模式下需要对Kafka Record的Key和Value进行序列化转换,此处默认使用org.apache.kafka.connect.json.JsonConverter,并且配置schemas.enable为true,您可以通过JsonConverter反序列化得到原始数据。关于Converter的更多信息,请参见Kafka Converter。 |
|
runtime.error.bootstrap.servers | string | 否 | localhost:9092 | 用于记录运行错误的Kafka集群地址。 | |
runtime.error.topic.name | string | 否 | errors | 用于记录运行错误的Kafka Topic名称。 | |
runtime.error.table.name | string | 否 | errors | 用于记录运行错误的Tablestore表名称。 | |
时序相关配置项 | tablestore.timeseries.<tablename>.measurement | string | 是 | mName | 将JSON中的key值为指定值对应的value值作为_m_name字段写入对应时序表中。
如果设置此配置项为<topic>,则将kafka记录的topic作为_m_name字段写入时序表中。 配置项名称中<tablename>为时序表名称的占位符,请根据实际修改,例如时序表名称为test,则配置项名称为tablestore.timeseries.test.measurement。 |
tablestore.timeseries.<tablename>.dataSource | string | 是 | ds | 将JSON中的key值为ds对应的value值作为_data_source字段写入对应时序表中。
配置项名称中<tablename>为时序表名称的占位符,请根据实际修改。 |
|
tablestore.timeseries.<tablename>.tags | list | 是 | region,level | 将JSON中key值为region和level所对应的value值作为tags字段写入对应时序表中。
配置项名称中<tablename>为时序表名称的占位符,请根据实际修改。 |
|
tablestore.timeseries.<tablename>.time | string | 是 | timestamp | 将JSON中key值为timestamp对应的value值作为_time字段写入对应时序表中。
配置项名称中<tablename>为时序表名称的占位符,请根据实际修改。 |
|
tablestore.timeseries.<tablename>.time.unit | string | 是 | MILLISECONDS | tablestore.timeseries.<tablename>.time值的时间戳单位。取值范围为SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。
配置项名称中<tablename>为时序表名称的占位符,请根据实际修改。 |
|
tablestore.timeseries.<tablename>.field.name | list | 否 | cpu,io | 将JSON中key值为cpu和io的键值对作为_field_name以及_field_name的值写入对应时序表。
配置项名称中<tablename>为时序表名称的占位符,请根据实际修改。 |
|
tablestore.timeseries.<tablename>.field.type | string | 否 | double,integer | tablestore.timeseries.<tablename>.field.name中字段对应的数据类型。取值范围为double、integer、string、binary、boolean。多个数据类型之间用半角冒号(,)分隔。
配置项名称中<tablename>为时序表名称的占位符,请根据实际修改。 |
|
tablestore.timeseries.mapAll | boolean | 否 | false | 将输入JSON中的非主键字段和时间字段都作为field存储到时序表中。
当配置项取值为false时,tablestore.timeseries.<tablename>.field.name和tablestore.timeseries.<tablename>.field.type必填。 |
|
tablestore.timeseries.toLowerCase | boolean | 否 | true | 将field中的key(输入数据中非主键或者时间的key,或者配置在tablestore.timeseries.<tablename>.field.name中的key)转为小写写入时序表。 | |
tablestore.timeseries.rowsPerBatch | integer | 否 | 50 | 写入tablestore时,一次请求支持写入的最大行数。最大值为200,默认值为200。 |
附录:Kafka和Tablestore数据类型映射
Kafka和Tablestore数据类型映射关系请参见下表。
Kafka Schema Type | Tablestore数据类型 |
---|---|
STRING | STRING |
INT8、INT16、INT32、INT64 | INTEGER |
FLOAT32、FLOAT64 | DOUBLE |
BOOLEAN | BOOLEAN |
BYTES | BINARY |
附录:删除语义
当同步数据到数据表且Kafka消息记录的value中存在空值时,根据写入模式(insert.mode)和删除模式(delete.mode)的不同设置,数据写入到表格存储数据表中的处理方式不同,详细说明请参见下表。
insert.mode | put | update | ||||||
---|---|---|---|---|---|---|---|---|
delete.mode | none | row | column | row_and_column | none | row | column | row_and_column |
value为空值 | 覆盖写 | 删行 | 覆盖写 | 删行 | 脏数据 | 删行 | 脏数据 | 删行 |
value所有字段值均为空值 | 覆盖写 | 覆盖写 | 覆盖写 | 覆盖写 | 脏数据 | 脏数据 | 删列 | 删列 |
value部分字段值为空值 | 覆盖写 | 覆盖写 | 覆盖写 | 覆盖写 | 忽略空值 | 忽略空值 | 删列 | 删列 |