Before you start Tablestore Sink Connector, you need to specify key-value pairs to pass parameters to the Kafka Connect process. This topic provides configuration examples and parameter descriptions to show how to configure Tablestore Sink Connector.
Configuration examples
The configuration items vary depending on whether data is synchronized from Kafka to a data table or to a time series table in Tablestore, and the format of the configuration file varies based on the working mode of Kafka Connect (standalone or distributed). The following examples show how to configure data synchronization from Kafka to a data table in Tablestore. To synchronize data to a time series table, you must also add the time series-specific configuration items that are described in the Parameters section. A sketch of these additional items is provided after the examples.
- The following sample code provides an example on how to configure the configuration file in the .properties format for Tablestore Sink Connector in standalone mode:
      # Specify the connector name.
      name=tablestore-sink
      # Specify the connector class.
      connector.class=TableStoreSinkConnector
      # Specify the maximum number of tasks.
      tasks.max=1
      # Specify the list of Kafka topics from which data is exported.
      topics=test
      # Specify values for the following Tablestore connection parameters:
      # The endpoint of the Tablestore instance.
      tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
      # The AccessKey pair, which consists of an AccessKey ID and an AccessKey secret.
      tablestore.access.key.id=xxx
      tablestore.access.key.secret=xxx
      # The name of the Tablestore instance.
      tablestore.instance.name=xxx
      # Specify the following data mapping parameters:
      # Specify the parser that is used to parse Kafka message records.
      # The DefaultEventParser of Tablestore Sink Connector supports the Struct and Map classes of Kafka Connect. You can also use a custom EventParser.
      event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
      # Specify the format string for the name of the destination Tablestore table. <topic> can be used in the string as a placeholder for the topic from which you want to export data.
      # topics.assign.tables is assigned a higher priority than table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored.
      # For example, if table.name.format is set to kafka_<topic> and the name of the Kafka topic from which you want to export data is test, Kafka message records from the test topic are mapped to the table named kafka_test in Tablestore.
      table.name.format=<topic>
      # Specify the mapping between the Kafka topic and the destination Tablestore table. The value must be in the <topic>:<tablename> format. The topic name and table name are separated with a colon (:). If you want to specify multiple mappings, separate them with commas (,).
      # If the mapping is not specified, the configuration of table.name.format is used.
      # topics.assign.tables=test:test_kafka
      # Specify the primary key mode. Valid values: kafka, record_key, and record_value. Default value: kafka.
      # kafka: <connect_topic>_<connect_partition> and <connect_offset> are used as the primary key of the data table.
      # record_key: Fields in the record keys are used as the primary key of the data table.
      # record_value: Fields in the record values are used as the primary key of the data table.
      primarykey.mode=kafka
      # Specify the name and data type of the primary key column in the destination Tablestore data table.
      # The format of the primary key column name is tablestore.<tablename>.primarykey.name. The format of the data type of the primary key column is tablestore.<tablename>.primarykey.type.
      # <tablename> is a placeholder for the data table name.
      # If the primary key mode is kafka, you do not need to specify the name and data type of the primary key column. The default primary key column names {"topic_partition","offset"} and the default data types {string, integer} of the primary key columns are used.
      # If the primary key mode is record_key or record_value, you must specify the name and data type of the primary key column.
      # tablestore.test.primarykey.name=A,B
      # tablestore.test.primarykey.type=string,integer
      # Specify an attribute column whitelist to filter the fields in the record values to obtain the required attribute columns.
      # By default, the attribute column whitelist is empty. All fields in the record values are used as the attribute columns of the data table.
      # The format of the attribute column name is tablestore.<tablename>.columns.whitelist.name. The format of the data type of the attribute column is tablestore.<tablename>.columns.whitelist.type.
      # <tablename> is a placeholder for the data table name.
      # tablestore.test.columns.whitelist.name=A,B
      # tablestore.test.columns.whitelist.type=string,integer
      # Specify how to write Kafka message records to the destination Tablestore table:
      # Specify the write mode. Valid values: put and update. Default value: put.
      # put: Data in the destination table is overwritten by Kafka message records.
      # update: Data in the destination table is updated by Kafka message records.
      insert.mode=put
      # Specify whether to write data in the sequence that data is read. Default value: true. You can disable this option to improve the write performance.
      insert.order.enable=true
      # Specify whether to automatically create a destination table. Default value: false.
      auto.create=false
      # Specify the delete mode. Valid values: none, row, column, and row_and_column. Default value: none.
      # none: No delete operations can be performed.
      # row: Rows can be deleted.
      # column: Attribute columns can be deleted.
      # row_and_column: Rows and attribute columns can be deleted.
      delete.mode=none
      # Specify the maximum number of rows that can be included in the buffer queue in the memory when data is written to the data table. Default value: 1024. The value of this parameter must be a power of 2.
      buffer.size=1024
      # Specify the number of callback threads that are used when data is written to the data table. Default value = Number of vCPUs + 1.
      # max.thread.count=
      # Specify the maximum number of concurrent write requests that can be sent to write data to the data table. Default value: 10.
      max.concurrency=10
      # Specify the number of buckets to which data is written. Default value: 3. If you increase the value of this parameter, the concurrent write capability can be increased. However, you cannot set the value of this parameter to a value greater than the maximum number of concurrent write requests that you specified.
      bucket.count=3
      # Specify the interval at which the buffer queue is flushed when data is written to the data table. Unit: milliseconds. Default value: 10000.
      flush.Interval=10000
      # Specify how to process dirty data:
      # An error may occur when the Kafka message records are parsed or written to the data table. You can specify the following two parameters to determine how to handle the error:
      # Specify the fault tolerance capability. Valid values: none and all. Default value: none.
      # none: An error causes the data import task that uses Tablestore Sink Connector to fail.
      # all: The message records for which errors are reported are skipped and logged.
      runtime.error.tolerance=none
      # Specify how dirty data is logged. Valid values: ignore, kafka, and tablestore. Default value: ignore.
      # ignore: All errors are ignored.
      # kafka: The message records for which errors are reported and the error messages are stored in a different Kafka topic.
      # tablestore: The message records for which errors are reported and the error messages are stored in a different Tablestore data table.
      runtime.error.mode=ignore
      # If you set runtime.error.mode to kafka, you must specify the Kafka cluster address and the topic.
      # runtime.error.bootstrap.servers=localhost:9092
      # runtime.error.topic.name=errors
      # If you set runtime.error.mode to tablestore, you must specify the name of the Tablestore data table.
      # runtime.error.table.name=errors
- The following sample code provides an example on how to configure the configuration file in the .json format for Tablestore Sink Connector in distributed mode:
{ "name": "tablestore-sink", "config": { // Specify the connector class. "connector.class":"TableStoreSinkConnector", // Specify the maximum number of tasks. "tasks.max":"3", // Specify the list of Kafka topics from which you want to export data. "topics":"test", // Specify values for the following Tablestore connection parameters: // The endpoint of the Tablestore instance. "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com", // The AccessKey pair which consists of an AccessKey ID and an AccessKey secret. "tablestore.access.key.id":"xxx", "tablestore.access.key.secret":"xxx", // The name of the Tablestore instance. "tablestore.instance.name":"xxx", // Specify the following data mapping parameters: // Specify the parser that is used to parse Kafka message records. // The DefaultEventParser of Tablestore Sink Connector supports the Struct and Map classes of Kafka Connect. You can also use a custom EventParser. "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser", // Specify the format string for the name of the destination Tablestore table. <topic> can be used in the string as a placeholder for the topic from which you want to export data. // topics.assign.tables is assigned a higher priority than table.name.format. If topics.assign.tables is specified, ignore the configuration of table.name.format. // For example, if table.name.format is set to kafka_<topic> and the name of the Kafka topic from which you want to export data is test, Kafka message records from the test topic are mapped to the table named kafka_test in Tablestore. "table.name.format":"<topic>", // Specify the mapping between the Kafka topic and the destination Tablestore table. The value must be in the <topic>:<tablename> format. The topic name and table name are separated with a colon (:). If you want to specify multiple mappings, separate them with commas (,). // If the mapping is not specified, the configuration of table.name.format is used. // "topics.assign.tables":"test:test_kafka", // Specify the primary key mode. Valid values: kafka, record_key, and record_value. Default value: kafka. // kafka: <connect_topic>_<connect_partition> and <connect_offset> are used as the primary key of the data table. // record_key: Fields in the record keys are used as the primary key of the data table. // record_value: Fields in the record values are used as the primary key of the data table. "primarykey.mode":"kafka", // Specify the name and data type of the primary key column in the destination Tablestore data table. // The format of the primary key column name is tablestore.<tablename>.primarykey.name. The format of the data type of the primary key column is tablestore.<tablename>.primarykey.type. // <tablename> is a placeholder for the data table name. // If the primary key mode is kafka, you do not need to specify the name and data type of the primary key column. The default primary key column names {"topic_partition","offset"} and the default data types {string, integer} of the primary key columns are used. // If the primary key mode is record_key or record_value, you must specify the name and data type of the primary key column. // "tablestore.test.primarykey.name":"A,B", // "tablestore.test.primarykey.type":"string,integer", // Specify an attribute column whitelist to filter the fields in the record values to obtain the required attribute columns. // By default, the attribute column whitelist is empty. All fields in the record values are used as the attribute columns of the data table. 
// The format of the attribute column name is tablestore.<tablename>.columns.whitelist.name. The format of the data type of the attribute column is tablestore.<tablename>.columns.whitelist.type. // <tablename> is a placeholder for the data table name. // "tablestore.test.columns.whitelist.name":"A,B", // "tablestore.test.columns.whitelist.type":"string,integer", // Specify how to write Kafka message records to the destination Tablestore table: // Specify the write mode. Valid values: put and update. Default value: put. // put: Data in the table is overwritten by Kafka message records. // update: Data in the table is updated by Kafka message records. "insert.mode":"put", // Specify whether to write data in the sequence that data is read. Default value: true. You can disable this option to improve the write performance. "insert.order.enable":"true", // Specify whether to automatically create a destination table. Default value: false. "auto.create":"false", // Specify the delete mode. Valid values: none, row, column, and row_and_column. Default value: none. // none: No delete operations can be performed. // row: Rows can be deleted. // column: Attribute columns can be deleted. // row_and_column: Rows and attribute columns can be deleted. "delete.mode":"none", // Specify the maximum number of rows that can be included in the buffer queue in the memory when data is written to the data table. Default value: 1024. The value of this parameter must be an exponent of 2. "buffer.size":"1024", // Specify the number of callback threads that are used when data is written to the data table. Default value = Number of vCPUs + 1. // "max.thread.count": // Specify the maximum number of concurrent write requests that can be sent to write data to the data table. Default value: 10. "max.concurrency":"10", // Specify the number of buckets to which data is written. Default value: 3. You can increase the value of this parameter to increase the concurrent write capability. However, you cannot set the value of this parameter to a value greater than the maximum number of concurrent write requests that you specified. "bucket.count":"3", // Specify the interval at which the buffer queue is refreshed when data is written to the data table. Unit: milliseconds. Default value: 10000. "flush.Interval":"10000", // Specify how to process dirty data: // An error may occur when the Kafka message records are parsed or written to the data table. You can specify the following two parameters to determine how to fix the error: // Specify the fault tolerance capability. Valid values: none and all. Default value: none. // none: An error causes the data import task that uses Tablestore Sink Connector to fail. // all: The message records for which errors are reported are skipped and logged. "runtime.error.tolerance":"none", // Specify how dirty data is logged. Valid values: ignore, kafka, and tablestore. Default value: ignore. // ignore: All errors are ignored. // kafka: The message records for which errors are reported and the error messages are stored in a different Kafka topic. // tablestore: The message records for which errors are reported and the error messages are stored in a different Tablestore data table. "runtime.error.mode":"ignore" // If you set runtime.error.mode to kafka, you must specify the Kafka cluster address and the topic. 
// "runtime.error.bootstrap.servers":"localhost:9092", // "runtime.error.topic.name":"errors", // If you set runtime.error.mode to tablestore, you must specify the name of the Tablestore data table. // "runtime.error.table.name":"errors", }
Parameters
The following table describes the parameters in the configuration file. You need to configure time series-related parameters only when you synchronize data from Kafka to a time series table in Tablestore.
Category | Parameter | Type | Required | Example | Description |
---|---|---|---|---|---|
Kafka Connect parameters | name | string | Yes | tablestore-sink | The name of the connector. The connector name must be unique. |
Kafka Connect parameters | connector.class | class | Yes | TableStoreSinkConnector | The Java class of the connector. To use the connector, set connector.class to the full name or alias of the connector class. The full name of the connector class is com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector, and the alias is TableStoreSinkConnector. |
Kafka Connect parameters | tasks.max | integer | Yes | 3 | The maximum number of tasks that can be created for the connector. If the maximum number of tasks cannot be created, fewer tasks may be created. |
Kafka Connect parameters | key.converter | string | No | org.apache.kafka.connect.json.JsonConverter | The key converter that is used to replace the default key converter that is specified in the worker configuration file. |
Kafka Connect parameters | value.converter | string | No | org.apache.kafka.connect.json.JsonConverter | The value converter that is used to replace the default value converter that is specified in the worker configuration file. |
Kafka Connect parameters | topics | list | Yes | test | The list of Kafka topics from which the connector consumes data. Separate multiple Kafka topics with commas (,). You must specify topics to manage the topics that are consumed by the connector. |
Connector connection parameters | tablestore.endpoint | string | Yes | https://xxx.xxx.ots.aliyuncs.com | The endpoint of the Tablestore instance. For more information, see Endpoint. |
Connector connection parameters | tablestore.mode | string | Yes | timeseries | The type of the destination table. Default value: normal. Valid values: normal (data is synchronized to a data table) and timeseries (data is synchronized to a time series table). |
Connector connection parameters | tablestore.access.key.id | string | Yes | LTAn******************** | The AccessKey ID of your account. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair. |
Connector connection parameters | tablestore.access.key.secret | string | Yes | zbnK************************** | The AccessKey secret of your account. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair. |
Connector connection parameters | tablestore.auth.mode | string | Yes | aksk | The authentication mode. Default value: aksk. |
Connector connection parameters | tablestore.instance.name | string | Yes | myotstest | The name of the Tablestore instance. |
Data mapping parameters of the connector | event.parse.class | class | Yes | DefaultEventParser | The Java class of the EventParser. Default value: DefaultEventParser. The parser parses Kafka message records to obtain the primary key columns and attribute columns of the data table. Notice: Tablestore imposes limits on the size of column values. The value of a primary key column of the string or binary type cannot exceed 1 KB, and the value of an attribute column cannot exceed 2 MB. For more information, see General limits. If a column value exceeds these limits after data type conversion, the Kafka message record is processed as dirty data. To use DefaultEventParser, the keys or values of the Kafka message records must be of the Struct or Map class of Kafka Connect. The selected fields in a Struct must be of data types that are supported by Tablestore Sink Connector. The fields are converted to Tablestore data types based on the data type mapping table and then written to the data table. The values in a Map must also be of data types that are supported by Tablestore Sink Connector. Tablestore Sink Connector supports the same data types in Struct and Map. The values in a Map are converted to the binary type and then written to the data table. For more information about the data type mappings between Kafka and Tablestore, see Appendix: Data type mappings between Kafka and Tablestore. If the data types of Kafka message records are incompatible with Tablestore Sink Connector, you can implement the interface that is defined by com.aliyun.tablestore.kafka.connect.parsers.EventParser to write a custom parser. |
Data mapping parameters of the connector | table.name.format | string | No | kafka_<topic> | The format string for the name of the destination Tablestore data table. Default value: <topic>. <topic> can be used in the string as a placeholder for the topic from which you want to export data. For example, if table.name.format is set to kafka_<topic> and the name of the Kafka topic from which you want to export data is test, Kafka message records from the test topic are mapped to the table named kafka_test in Tablestore. topics.assign.tables takes precedence over table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored. |
Data mapping parameters of the connector | topics.assign.tables | list | Yes | test:destTable | The mapping between the Kafka topics and the destination Tablestore tables, in the <topic_1>:<tablename_1>,<topic_2>:<tablename_2> format. Separate multiple mappings with commas (,). For example, test:destTable specifies that message records from the topic named test are written to the data table named destTable. topics.assign.tables takes precedence over table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored. |
Data mapping parameters of the connector | primarykey.mode | string | No | kafka | The primary key mode of the data table. Default value: kafka. Valid values: kafka (<connect_topic>_<connect_partition> and <connect_offset> are used as the primary key of the data table), record_key (fields in the record keys are used as the primary key of the data table), and record_value (fields in the record values are used as the primary key of the data table). Configure this parameter together with tablestore.<tablename>.primarykey.name and tablestore.<tablename>.primarykey.type. The value of this parameter is not case-sensitive. |
Data mapping parameters of the connector | tablestore.<tablename>.primarykey.name | list | No | A,B | The names of the primary key columns of the data table. <tablename> is a placeholder for the data table name. The value contains one to four primary key column names that are separated with commas (,). The primary key column names vary based on the primary key mode. If the primary key mode is kafka, this parameter is optional and the default primary key column names {"topic_partition","offset"} are used. If the primary key mode is record_key or record_value, you must specify this parameter. The primary key columns of a Tablestore data table are sequential. Take note of the order of the primary key columns when you configure tablestore.<tablename>.primarykey.name. For example, PRIMARY KEY (A, B, C) and PRIMARY KEY (A, C, B) have different schemas. |
Data mapping parameters of the connector | tablestore.<tablename>.primarykey.type | list | No | string, integer | The data types of the primary key columns of the data table. <tablename> is a placeholder for the data table name. The value contains one to four data types that are separated with commas (,). The order of the data types must correspond to the order of the primary key column names that are specified by tablestore.<tablename>.primarykey.name. The value of this parameter is not case-sensitive. Valid values: integer, string, binary, and auto_increment. The data types of the primary key columns vary based on the primary key mode. If the primary key mode is kafka, this parameter is optional and the default data types {string, integer} are used. |
Data mapping parameters of the connector | tablestore.<tablename>.columns.whitelist.name | list | No | A,B | The names of the attribute columns in the attribute column whitelist. <tablename> is a placeholder for the data table name. Separate attribute column names with commas (,). If you do not configure this parameter, all fields of the Struct class or all keys of the Map class in the record values are used as the attribute columns of the data table. If you configure this parameter, the fields in the record values are filtered based on the specified attribute column whitelist to obtain the required attribute columns. |
Data mapping parameters of the connector | tablestore.<tablename>.columns.whitelist.type | list | No | string, integer | The data types of the attribute columns in the attribute column whitelist. <tablename> is a placeholder for the data table name. Separate data types with commas (,). The order of the data types must correspond to the order of the attribute column names that are specified by tablestore.<tablename>.columns.whitelist.name. The value of this parameter is not case-sensitive. Valid values: integer, string, binary, boolean, and double. |
Connector write parameters | insert.mode | string | No | put | The write mode. Default value: put. Valid values: put (data in the destination table is overwritten by Kafka message records) and update (data in the destination table is updated by Kafka message records). The value of this parameter is not case-sensitive. |
Connector write parameters | insert.order.enable | boolean | No | true | Specifies whether data is written to the data table in the order in which the data is read. Default value: true. Valid values: true (data is written in the order in which it is read) and false (data is not written in order, which can improve the write performance). |
Connector write parameters | auto.create | boolean | No | false | Specifies whether to automatically create the destination table. A data table or a time series table can be automatically created. Default value: false. Valid values: true and false. |
Connector write parameters | delete.mode | string | No | none | The delete mode. This parameter takes effect only when data is synchronized to a data table and the primary key mode is record_key. Default value: none. Valid values: none (no delete operations can be performed), row (rows can be deleted), column (attribute columns can be deleted), and row_and_column (rows and attribute columns can be deleted). The value of this parameter is not case-sensitive. Whether delete operations take effect also depends on the value of the insert.mode parameter. For more information, see Appendix: Delete syntax. |
Connector write parameters | buffer.size | integer | No | 1024 | The maximum number of rows that can be included in the buffer queue in the memory when data is written to the data table. Default value: 1024. The value of this parameter must be a power of 2. |
Connector write parameters | max.thread.count | integer | No | 3 | The number of callback threads that are used when data is written to the data table. Default value: number of vCPUs + 1. |
Connector write parameters | max.concurrency | integer | No | 10 | The maximum number of concurrent write requests that can be sent to write data to the data table. Default value: 10. |
Connector write parameters | bucket.count | integer | No | 3 | The number of buckets to which data is written. Default value: 3. You can increase the value of this parameter to increase write concurrency. However, the value cannot be greater than the maximum number of concurrent write requests that is specified by max.concurrency. |
Connector write parameters | flush.Interval | integer | No | 10000 | The interval at which the buffer queue is flushed when data is written to the data table. Unit: milliseconds. Default value: 10000. |
Connector Runtime Error parameters | runtime.error.tolerance | string | No | none | The error handling policy that is used if an error occurs when Kafka message records are parsed or written to the table. Default value: none. Valid values: none (an error causes the data import task that uses Tablestore Sink Connector to fail) and all (the message records for which errors are reported are skipped and logged). The value of this parameter is not case-sensitive. |
Connector Runtime Error parameters | runtime.error.mode | string | No | ignore | Specifies how to process the message records for which errors are reported when Kafka message records are parsed or written to the table. Default value: ignore. Valid values: ignore (all errors are ignored), kafka (the message records for which errors are reported and the error messages are stored in a different Kafka topic), and tablestore (the message records for which errors are reported and the error messages are stored in a different Tablestore data table). If runtime.error.mode is set to kafka, the headers, keys, and values of the Kafka message records must be serialized. If runtime.error.mode is set to tablestore, the keys and values of the Kafka message records must be serialized. By default, org.apache.kafka.connect.json.JsonConverter is used to serialize data and schemas.enable is set to true. You can use JsonConverter to deserialize the data and obtain the original data. For more information about converters, see Kafka Converter. |
Connector Runtime Error parameters | runtime.error.bootstrap.servers | string | No | localhost:9092 | The address of the Kafka cluster where the message records for which errors are reported and the error messages are stored. |
Connector Runtime Error parameters | runtime.error.topic.name | string | No | errors | The name of the Kafka topic that stores the message records for which errors are reported and the error messages. |
Connector Runtime Error parameters | runtime.error.table.name | string | No | errors | The name of the Tablestore data table that stores the message records for which errors are reported and the error messages. |
Time series-related parameters | tablestore.timeseries.<tablename>.measurement | string | Yes | mName | Specifies that the values of the specified key in the JSON-formatted data are written to the time series table as the values of the _m_name field. If tablestore.timeseries.<tablename>.measurement is set to <topic>, the topic of the Kafka message record is written to the time series table as the value of the _m_name field. <tablename> in the parameter name is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements. For example, if the name of the time series table is test, the parameter name is tablestore.timeseries.test.measurement. |
Time series-related parameters | tablestore.timeseries.<tablename>.dataSource | string | Yes | ds | Specifies that the values of the ds key in the JSON-formatted data are written to the time series table as the values of the _data_source field. <tablename> in the parameter name is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements. |
Time series-related parameters | tablestore.timeseries.<tablename>.tags | list | Yes | region,level | Specifies that the values of the region and level keys in the JSON-formatted data are written to the time series table as the values of the tags field. <tablename> in the parameter name is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements. |
Time series-related parameters | tablestore.timeseries.<tablename>.time | string | Yes | timestamp | Specifies that the values of the timestamp key in the JSON-formatted data are written to the time series table as the values of the _time field. <tablename> in the parameter name is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements. |
Time series-related parameters | tablestore.timeseries.<tablename>.time.unit | string | Yes | MILLISECONDS | The unit of the values that are specified by tablestore.timeseries.<tablename>.time. Valid values: SECONDS, MILLISECONDS, MICROSECONDS, and NANOSECONDS. <tablename> in the parameter name is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements. |
Time series-related parameters | tablestore.timeseries.<tablename>.field.name | list | No | cpu,io | Specifies that the cpu and io keys in the JSON-formatted data are written to the time series table as the names of the _field_name fields, and that the values of the cpu and io keys are written to the time series table as the values of the _field_name fields. <tablename> in the parameter name is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements. |
Time series-related parameters | tablestore.timeseries.<tablename>.field.type | string | No | double,integer | The data types of the fields that are specified by tablestore.timeseries.<tablename>.field.name. Valid values: double, integer, string, binary, and boolean. Separate multiple data types with commas (,). <tablename> in the parameter name is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements. |
Time series-related parameters | tablestore.timeseries.mapAll | boolean | No | false | Specifies whether fields other than the primary key fields and time fields in the JSON-formatted data are written to the time series table as fields. If tablestore.timeseries.mapAll is set to false, you must configure the tablestore.timeseries.<tablename>.field.name and tablestore.timeseries.<tablename>.field.type parameters. |
Time series-related parameters | tablestore.timeseries.toLowerCase | boolean | No | true | Specifies whether to convert the keys of the fields to lowercase before the fields are written to the time series table. The keys of the fields are the keys that are not primary key or time keys, or the keys that are specified by tablestore.timeseries.<tablename>.field.name. |
Time series-related parameters | tablestore.timeseries.rowsPerBatch | integer | No | 50 | The maximum number of rows that can be written to Tablestore in a single request. Default and maximum value: 200. |
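To show how the time series-related parameters map message content to a time series table, the following JSON message value assumes the example configuration used throughout this topic: the measurement key mName, the data source key ds, the tags region and level, the time key timestamp in MILLISECONDS, and the fields cpu and io. The concrete values are hypothetical.

```json
{
  "mName": "cpu_metrics",
  "ds": "host-01",
  "region": "hangzhou",
  "level": "1",
  "timestamp": 1672531200000,
  "cpu": 3.5,
  "io": 10
}
```

Based on these parameters, mName is written to the _m_name field, ds to the _data_source field, region and level to the tags field, timestamp to the _time field in milliseconds, and cpu and io as fields of the double and integer types.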
Appendix: Data type mappings between Kafka and Tablestore
The following table describes the mappings between the data types of Kafka and Tablestore.
Kafka schema type | Tablestore data type |
---|---|
STRING | STRING |
INT8, INT16, INT32, and INT64 | INTEGER |
FLOAT32 and FLOAT64 | DOUBLE |
BOOLEAN | BOOLEAN |
BYTES | BINARY |
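For example, when org.apache.kafka.connect.json.JsonConverter is used with schemas.enable set to true, each message value is a JSON envelope that contains a schema part and a payload part. A record value such as the following sketch (the field names A, B, and C are hypothetical) is parsed by DefaultEventParser as a Struct, and its fields are converted based on the mappings in the preceding table: A is written as STRING, B as INTEGER, and C as BOOLEAN.

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "A", "type": "string", "optional": false },
      { "field": "B", "type": "int64", "optional": false },
      { "field": "C", "type": "boolean", "optional": true }
    ]
  },
  "payload": {
    "A": "hello",
    "B": 100,
    "C": true
  }
}
```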
Appendix: Delete syntax
The following table describes how data is written to a Tablestore data table when message records contain empty values and data is synchronized from Kafka to the data table, based on the configurations of the write mode (insert.mode) and delete mode (delete.mode).
insert.mode | put | put | put | put | update | update | update | update |
---|---|---|---|---|---|---|---|---|
delete.mode | none | row | column | row_and_column | none | row | column | row_and_column |
Empty values | Overwrite | Delete rows | Overwrite | Delete rows | Dirty data | Delete rows | Dirty data | Delete rows |
All empty fields in values | Overwrite | Overwrite | Overwrite | Overwrite | Dirty data | Dirty data | Delete columns | Delete columns |
Some empty fields in values | Overwrite | Overwrite | Overwrite | Overwrite | Ignore empty values | Ignore empty values | Delete columns | Delete columns |
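As an illustration of the table, assume that insert.mode is set to update, delete.mode is set to row_and_column, and primarykey.mode is set to record_key. The following simplified record values correspond to the three rows of the table; the field names are hypothetical, and "empty" is assumed to mean a null value.

```json
// Empty value: the entire record value is null, so the row that is identified by the record key is deleted.
null

// All fields in the value are empty: the attribute columns A and B are deleted from the row.
{ "A": null, "B": null }

// Some fields in the value are empty: the attribute column B is deleted, and A is updated as usual.
{ "A": "hello", "B": null }
```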