啟動Tablestore Sink Connector時,您需要通過索引值映射向Kafka Connect進程傳遞參數。通過本文您可以結合配置樣本和配置參數說明瞭解Tablestore Sink Connector的相關配置。
配置樣本
當從Kafka同步資料到資料表或者時序表時配置項不同,且不同工作模式下相應設定檔的樣本不同。此處以同步資料到資料表中為例介紹配置樣本。同步資料到時序表的配置樣本中需要增加時序相關配置項。
standalone模式配置樣本
如果使用的是standalone模式,您需要通過.properties格式檔案進行配置。配置樣本如下:
# 設定連接器名稱。
name=tablestore-sink
# 指定連接器類。
connector.class=TableStoreSinkConnector
# 設定最大任務數。
tasks.max=1
# 指定匯出資料的Kafka的Topic列表。
topics=test
# 以下為Tablestore串連參數的配置。
# Tablestore執行個體的Endpoint。
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填寫AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore執行個體名稱。
tablestore.instance.name=xxx
# 以下為資料對應相關的配置。
# 指定Kafka Record的解析器。
# 預設的DefaulteEventParser已支援Struct和Map類型,您也可以使用自訂的EventParser。
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
# 定義目標表名稱的格式字串,字串中可包含<topic>作為原始Topic的預留位置。
# topics.assign.tables配置的優先順序更高,如果配置了topics.assign.tables,則忽略table.name.format的配置。
# 例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則將映射到Tablestore的表名為kafka_test。
table.name.format=<topic>
# 指定Topic與目標表的映射關係,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之間的分隔字元為英文冒號(:),不同映射之間分隔字元為英文逗號(,)。
# 如果預設,則採取table.name.format的配置。
# topics.assign.tables=test:test_kafka
# 指定主鍵模式,可選值包括kafka、record_key和record_value,預設值為kafka。
# kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作為資料表的主鍵。
# record_key表示以Record Key中的欄位作為資料表的主鍵。
# record_value表示以Record Value中的欄位作為資料表的主鍵。
primarykey.mode=kafka
# 定義匯入資料表的主鍵列名和資料類型。
# 屬性名稱格式為tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
# 其中<tablename>為資料表名稱的預留位置。
# 當主鍵模式為kafka時,無需配置該屬性,預設主鍵列名為{"topic_partition","offset"},預設主鍵列資料類型為{string, integer}。
# 當主鍵模式為record_key或record_value時,必須配置以下兩個屬性。
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer
# 定義屬性列白名單,用於過濾Record Value中的欄位擷取所需屬性列。
# 預設值為空白,使用Record Value中的所有欄位作為資料表的屬性列。
# 屬性名稱格式為tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
# 其中<tablename>為資料表名稱的預留位置。
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer
# 以下為寫入Tablestore相關的配置。
# 指定寫入模式,可選值包括put和update,預設值為put。
# put表示覆蓋寫。
# update表示更新寫。
insert.mode=put
# 是否需要保序,預設值為true。如果關閉保序模式,則有助於提升寫入效率。
insert.order.enable=true
# 是否自動建立目標表,預設值為false。
auto.create=false
# 指定刪除模式,可選值包括none、row、column和row_and_column,預設值為none。
# none表示不允許進行任何刪除。
# row表示允許刪除行。
# column表示允許刪除屬性列。
# row_and_column表示允許刪除行和屬性列。
delete.mode=none
# 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須為2的指數。
buffer.size=1024
# 寫入資料表時的回調線程數,預設值為核心數+1。
# max.thread.count=
# 寫入資料表時的最大請求並發數,預設值為10。
max.concurrency=10
# 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。
bucket.count=3
# 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。
flush.Interval=10000
# 以下為髒資料處理相關配置。
# 在解析Kafka Record或者寫入資料表時可能發生錯誤,您可以可通過以下配置進行處理。
# 指定容錯能力,可選值包括none和all,預設值為none。
# none表示任何錯誤都將導致Sink Task立即失敗。
# all表示跳過產生錯誤的Record,並記錄該Record。
runtime.error.tolerance=none
# 指定髒資料記錄模式,可選值包括ignore、kafka和tablestore,預設值為ignore。
# ignore表示忽略所有錯誤。
# kafka表示將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中。
# tablestore表示將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中。
runtime.error.mode=ignore
# 當髒資料記錄模式為kafka時,需要配置Kafka叢集地址和Topic。
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors
# 當髒資料記錄模式為tablestore時,需要配置Tablestore中資料表名稱。
# runtime.error.table.name=errorsdistributed模式配置樣本
如果使用的是distributed模式,您需要通過JSON格式檔案進行配置。配置樣本如下:
{
"name": "tablestore-sink",
"config": {
// 指定連接器類。
"connector.class":"TableStoreSinkConnector",
// 設定最大任務數。
"tasks.max":"3",
// 指定匯出資料的Kafka的Topic列表。
"topics":"test",
// 以下為Tablestore串連參數的配置。
// Tablestore執行個體的Endpoint。
"tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
// 填寫AccessKey ID和AccessKey Secret。
"tablestore.access.key.id":"xxx",
"tablestore.access.key.secret":"xxx",
// Tablestore執行個體名稱。
"tablestore.instance.name":"xxx",
// 以下為資料對應相關的配置。
// 指定Kafka Record的解析器。
// 預設的DefaulteEventParser已支援Struct和Map類型,您也可以使用自訂的EventParser。
"event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",
// 定義目標表名稱的格式字串,字串中可包含<topic>作為原始Topic的預留位置。
// topics.assign.tables配置的優先順序更高。如果配置了topics.assign.tables,則忽略table.name.format的配置。
// 例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則將映射到Tablestore的表名為kafka_test。
"table.name.format":"<topic>",
// 指定Topic與目標表的映射關係,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之間的分隔字元為英文冒號(:),不同映射之間分隔字元為英文逗號(,)。
// 如果預設,則採取table.name.format的配置。
// "topics.assign.tables":"test:test_kafka",
// 指定主鍵模式,可選值包括kafka、record_key和record_value,預設值為kafka。
// kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作為資料表的主鍵。
// record_key表示以Record Key中的欄位作為資料表的主鍵。
// record_value表示以Record Value中的欄位作為資料表的主鍵。
"primarykey.mode":"kafka",
// 定義匯入資料表的主鍵列名和資料類型。
// 屬性名稱格式為tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
// 其中<tablename>為資料表名稱的預留位置。
// 當主鍵模式為kafka時,無需配置該屬性,預設主鍵列名為{"topic_partition","offset"},預設主鍵列資料類型為{string, integer}。
// 當主鍵模式為record_key或record_value時,必須配置以下兩個屬性。
// "tablestore.test.primarykey.name":"A,B",
// "tablestore.test.primarykey.type":"string,integer",
// 定義屬性列白名單,用於過濾Record Value中的欄位擷取所需屬性列。
// 預設值為空白,使用Record Value中的所有欄位作為資料表的屬性列。
// 屬性名稱格式為tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
// 其中<tablename>為資料表名稱的預留位置。
// "tablestore.test.columns.whitelist.name":"A,B",
// "tablestore.test.columns.whitelist.type":"string,integer",
// 以下為寫入Tablestore相關的配置。
// 指定寫入模式,可選值包括put和update,預設值為put。
// put表示覆蓋寫。
// update表示更新寫。
"insert.mode":"put",
// 是否需要保序,預設值為true。如果關閉保序模式,則有助於提升寫入效率。
"insert.order.enable":"true",
// 是否自動建立目標表,預設值為false。
"auto.create":"false",
// 指定刪除模式,可選值包括none、row、column和row_and_column,預設值為none。
// none表示不允許進行任何刪除。
// row表示允許刪除行。
// column表示允許刪除屬性列。
// row_and_column表示允許刪除行和屬性列。
"delete.mode":"none",
// 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須為2的指數。
"buffer.size":"1024",
// 寫入資料表時的回調線程數,預設值為核心數+1。
// "max.thread.count":
// 寫入資料表時的最大請求並發數,預設值為10。
"max.concurrency":"10",
// 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。
"bucket.count":"3",
// 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。
"flush.Interval":"10000",
// 以下為髒資料處理相關配置。
// 在解析Kafka Record或者寫入資料表時可能發生錯誤,您可以通過以下配置進行處理。
// 指定容錯能力,可選值包括none和all,預設值為none。
// none表示任何錯誤都將導致Sink Task立即失敗。
// all表示跳過產生錯誤的Record,並記錄該Record。
"runtime.error.tolerance":"none",
// 指定髒資料記錄模式,可選值包括ignore、kafka和tablestore,預設值為ignore。
// ignore表示忽略所有錯誤。
// kafka表示將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中。
// tablestore表示將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中。
"runtime.error.mode":"ignore"
// 當髒資料記錄模式為kafka時,需要配置Kafka叢集地址和Topic。
// "runtime.error.bootstrap.servers":"localhost:9092",
// "runtime.error.topic.name":"errors",
// 當髒資料記錄模式為tablestore時,需要配置Tablestore中資料表名稱。
// "runtime.error.table.name":"errors",
}配置項說明
設定檔中的配置項說明請參見下表。其中時序相關配置項只有同步資料到時序表時才需要配置。
Kafka Connect常見配置
配置項 | 類型 | 是否必選 | 樣本值 | 描述 |
name | string | 是 | tablestore-sink | 連接器(Connector)名稱。連接器名稱必須唯一。 |
connector.class | class | 是 | TableStoreSinkConnector | 連接器的Java類。 如果您要使用該連接器,請在 |
tasks.max | integer | 是 | 3 | 連接器支援建立的最大任務數。 如果連接器無法達到此並行度層級,則可能會建立較少的任務。 |
key.converter | string | 否 | org.apache.kafka.connect.json.JsonConverter | 覆蓋worker設定的預設key轉換器。 |
value.converter | string | 否 | org.apache.kafka.connect.json.JsonConverter | 覆蓋worker設定的預設value轉換器。 |
topics | list | 是 | test | 連接器輸入的Kafka Topic列表,多個Topic之間以半形逗號(,)分隔。 您必須為連接器設定topics來控制連接器輸入的Topic。 |
連接器Connection配置
配置項 | 類型 | 是否必選 | 樣本值 | 描述 |
tablestore.endpoint | string | 是 | https://xxx.xxx.ots.aliyuncs.com | Tablestore執行個體的服務地址。 |
tablestore.mode | string | 是 | timeseries | 根據資料同步到的表類型選擇模式。取值範圍如下:
|
tablestore.access.key.id | string | 是 | LTAn******************** | 登入帳號的AccessKey ID和AccessKey Secret,擷取方式請參見建立AccessKey。 |
tablestore.access.key.secret | string | 是 | zbnK************************** | |
tablestore.auth.mode | string | 是 | aksk | 設定認證方式。取值範圍如下:
|
tablestore.instance.name | string | 是 | myotstest | Tablestore執行個體的名稱。 |
連接器Data Mapping配置
配置項 | 類型 | 是否必選 | 樣本值 | 描述 |
event.parse.class | class | 是 | DefaultEventParser | 訊息解析器的Java類,預設值為DefaultEventParser。解析器用於從Kafka Record中解析出資料表的主鍵列和屬性列。 重要 Tablestore對列值大小有限制。string類型和binary類型的主鍵列值限制均為1 KB,屬性列列值限制均為2 MB。更多資訊,請參見使用限制。 如果資料類型轉換後列值超出對應限制,則將該Kafka Record作為髒資料處理。 如果使用預設的DefaultEventParser解析器,Kafka Record的Key或Value必須為Kafka Connect的Struct或Map類型。 Struct中選擇的欄位必須為支援的資料類型,欄位會根據資料類型映射表轉換為Tablestore資料類型寫入資料表;Map中的實值型別也必須為支援的資料類型,支援的資料類型與Struct相同,最終會被轉換為binary類型寫入資料表。 如果Kafka Record為不相容的資料格式,則您可以通過實現 |
table.name.format | string | 否 | kafka_<topic> | 目標資料表名稱的格式字串,預設值為 此配置項的優先順序低於 |
topics.assign.tables | list | 是 | test:destTable | 指定topic與Tablestore表之間的映射關係,格式為 此配置項的優先順序高於 |
primarykey.mode | string | 否 | kafka | 資料表的主鍵模式。取值範圍如下:
請配合 |
tablestore.<tablename>.primarykey.name | list | 否 | A,B | 資料表的主鍵列名,其中 主鍵模式不同時,主鍵列名的配置不同。
Tablestore資料表的主鍵列是有順序的,此屬性的配置應注意主鍵列名順序,例如PRIMARY KEY(A、B、C)與PRIMARY KEY(A、C、B)是不同的兩個主鍵結構。 |
tablestore.<tablename>.primarykey.type | list | 否 | string, integer | 資料表的主鍵列資料類型,其中 主鍵資料類型的配置與主鍵模式相關。
|
tablestore.<tablename>.columns.whitelist.name | list | 否 | A,B | 資料表的屬性列白名單中屬性列名稱,其中 如果配置為空白,則使用Record Value中的所有欄位(Struct類型)或者鍵(Map類型)作為資料表的屬性列,否則用於過濾得到所需屬性列。 |
tablestore.<tablename>.columns.whitelist.type | list | 否 | string, integer | 資料表的屬性列白名單中屬性列資料類型,其中 |
連接器Write配置
配置項 | 類型 | 是否必選 | 樣本值 | 描述 |
insert.mode | string | 否 | put | 寫入模式。取值範圍如下:
此屬性配置不區分大小寫。 |
insert.order.enable | boolean | 否 | true | 寫入資料表時是否需要保持順序。取值範圍如下:
|
auto.create | boolean | 否 | false | 是否需要自動建立目標表,支援自動建立資料表或時序表。取值範圍如下:
|
delete.mode | string | 否 | none | 刪除模式,僅當同步資料到資料表且主鍵模式為record_key時才有效。取值範圍如下:
此屬性配置不區分大小寫。 刪除操作與 |
buffer.size | integer | 否 | 1024 | 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須是2的指數。 |
max.thread.count | integer | 否 | 3 | 寫入資料表時的回調線程數,預設值為 |
max.concurrency | integer | 否 | 10 | 寫入資料表時的最大請求並發數,預設值為10。 |
bucket.count | integer | 否 | 3 | 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。 |
flush.Interval | integer | 否 | 10000 | 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。 |
連接器Runtime Error配置
配置項 | 類型 | 是否必選 | 樣本值 | 描述 |
runtime.error.tolerance | string | 否 | none | 解析Kafka Record或者寫入表時產生錯誤的處理策略。取值範圍如下:
此屬性配置不區分大小寫。 |
runtime.error.mode | string | 否 | ignore | 解析Kafka Record或者寫入表時產生錯誤,對錯誤的Record的處理策略。取值範圍如下:
kafka模式下需要對Kafka Record的Header、Key和Value進行序列化轉換,tablestore模式下需要對Kafka Record的Key和Value進行序列化轉換,此處預設使用 |
runtime.error.bootstrap.servers | string | 否 | localhost:9092 | 用於記錄運行錯誤的Kafka叢集地址。 |
runtime.error.topic.name | string | 否 | errors | 用於記錄運行錯誤的Kafka Topic名稱。 |
runtime.error.table.name | string | 否 | errors | 用於記錄運行錯誤的Tablestore表名稱。 |
時序相關配置
配置項 | 類型 | 是否必選 | 樣本值 | 描述 |
tablestore.timeseries.<tablename>.measurement | string | 是 | mName | 將JSON中的key值為指定值對應的value值作為_m_name欄位寫入對應時序表中。 如果設定此配置項為 配置項名稱中 |
tablestore.timeseries.<tablename>.dataSource | string | 是 | ds | 將JSON中的key值為ds對應的value值作為_data_source欄位寫入對應時序表中。 配置項名稱中 |
tablestore.timeseries.<tablename>.tags | list | 是 | region,level | 將JSON中key值為region和level所對應的value值作為tags欄位寫入對應時序表中。 配置項名稱中 |
tablestore.timeseries.<tablename>.time | string | 是 | timestamp | 將JSON中key值為timestamp對應的value值作為_time欄位寫入對應時序表中。 配置項名稱中 |
tablestore.timeseries.<tablename>.time.unit | string | 是 | MILLISECONDS |
配置項名稱中 |
tablestore.timeseries.<tablename>.field.name | list | 否 | cpu,io | 將JSON中key值為cpu和io的索引值對作為_field_name以及_field_name的值寫入對應時序表。 配置項名稱中 |
tablestore.timeseries.<tablename>.field.type | string | 否 | double,integer |
配置項名稱中 |
tablestore.timeseries.mapAll | boolean | 否 | false | 將輸入JSON中的非主鍵欄位和時間欄位都作為field儲存到時序表中。 當配置項取值為false時, |
tablestore.timeseries.toLowerCase | boolean | 否 | true | 將field中的key(輸入資料中非主鍵或者時間的key,或配置在 |
tablestore.timeseries.rowsPerBatch | integer | 否 | 50 | 寫入tablestore時,一次請求支援寫入的最大行數。最大值為200,預設值為200。 |
附錄:Kafka和Tablestore資料類型映射
Kafka和Tablestore資料類型映射關係請參見下表。
Kafka Schema Type | Tablestore資料類型 |
STRING | STRING |
INT8、INT16、INT32、INT64 | INTEGER |
FLOAT32、FLOAT64 | DOUBLE |
BOOLEAN | BOOLEAN |
BYTES | BINARY |
附錄:刪除語義
只有同步資料到資料表時才支援此功能。
當同步資料到資料表且Kafka訊息記錄的value中存在空值時,根據寫入模式(insert.mode)和刪除模式(delete.mode)的不同設定,資料寫入到Table Store資料表的處理方式不同,詳細說明請參見下表。
insert.mode | put | update | ||||||
delete.mode | none | row | column | row_and_column | none | row | column | row_and_column |
value為空白值 | 覆蓋寫 | 刪行 | 覆蓋寫 | 刪行 | 髒資料 | 刪行 | 髒資料 | 刪行 |
value所有欄位值均為空白值 | 覆蓋寫 | 覆蓋寫 | 覆蓋寫 | 覆蓋寫 | 髒資料 | 髒資料 | 刪列 | 刪列 |
value部分欄位值為空白值 | 覆蓋寫 | 覆蓋寫 | 覆蓋寫 | 覆蓋寫 | 忽略空值 | 忽略空值 | 刪列 | 刪列 |