啟動Tablestore Sink Connector時,您需要通過索引值映射向Kafka Connect進程傳遞參數。通過本文您可以結合配置樣本和配置參數說明瞭解Tablestore Sink Connector的相關配置。
配置樣本
當從Kafka同步資料到資料表或者時序表時配置項不同,且不同工作模式下相應設定檔的樣本不同。此處以同步資料到資料表中為例介紹配置樣本。同步資料到時序表的配置樣本中需要增加時序相關配置項。
- .properties格式設定檔的樣本,適用於standalone模式。
# 設定連接器名稱。 name=tablestore-sink # 指定連接器類。 connector.class=TableStoreSinkConnector # 設定最大任務數。 tasks.max=1 # 指定匯出資料的Kafka的Topic列表。 topics=test # 以下為Tablestore串連參數的配置。 # Tablestore執行個體的Endpoint。 tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com # 填寫AccessKey ID和AccessKey Secret。 tablestore.access.key.id=xxx tablestore.access.key.secret=xxx # Tablestore執行個體名稱。 tablestore.instance.name=xxx # 以下為資料對應相關的配置。 # 指定Kafka Record的解析器。 # 預設的DefaulteEventParser已支援Struct和Map類型,您也可以使用自訂的EventParser。 event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser # 定義目標表名稱的格式字串,字串中可包含<topic>作為原始Topic的預留位置。 # topics.assign.tables配置的優先順序更高,如果配置了topics.assign.tables,則忽略table.name.format的配置。 # 例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則將映射到Tablestore的表名為kafka_test。 table.name.format=<topic> # 指定Topic與目標表的映射關係,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之間的分隔字元為英文冒號(:),不同映射之間分隔字元為英文逗號(,)。 # 如果預設,則採取table.name.format的配置。 # topics.assign.tables=test:test_kafka # 指定主鍵模式,可選值包括kafka、record_key和record_value,預設值為kafka。 # kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作為資料表的主鍵。 # record_key表示以Record Key中的欄位作為資料表的主鍵。 # record_value表示以Record Value中的欄位作為資料表的主鍵。 primarykey.mode=kafka # 定義匯入資料表的主鍵列名和資料類型。 # 屬性名稱格式為tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。 # 其中<tablename>為資料表名稱的預留位置。 # 當主鍵模式為kafka時,無需配置該屬性,預設主鍵列名為{"topic_partition","offset"},預設主鍵列資料類型為{string, integer}。 # 當主鍵模式為record_key或record_value時,必須配置以下兩個屬性。 # tablestore.test.primarykey.name=A,B # tablestore.test.primarykey.type=string,integer # 定義屬性列白名單,用於過濾Record Value中的欄位擷取所需屬性列。 # 預設值為空白,使用Record Value中的所有欄位作為資料表的屬性列。 # 屬性名稱格式為tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。 # 其中<tablename>為資料表名稱的預留位置。 # tablestore.test.columns.whitelist.name=A,B # tablestore.test.columns.whitelist.type=string,integer # 以下為寫入Tablestore相關的配置。 # 指定寫入模式,可選值包括put和update,預設值為put。 # put表示覆蓋寫。 # update表示更新寫。 insert.mode=put # 是否需要保序,預設值為true。如果關閉保序模式,則有助於提升寫入效率。 insert.order.enable=true # 是否自動建立目標表,預設值為false。 auto.create=false # 指定刪除模式,可選值包括none、row、column和row_and_column,預設值為none。 # none表示不允許進行任何刪除。 # row表示允許刪除行。 # column表示允許刪除屬性列。 # row_and_column表示允許刪除行和屬性列。 delete.mode=none # 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須為2的指數。 buffer.size=1024 # 寫入資料表時的回調線程數,預設值為核心數+1。 # max.thread.count= # 寫入資料表時的最大請求並發數,預設值為10。 max.concurrency=10 # 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。 bucket.count=3 # 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。 flush.Interval=10000 # 以下為髒資料處理相關配置。 # 在解析Kafka Record或者寫入資料表時可能發生錯誤,您可以可通過以下配置進行處理。 # 指定容錯能力,可選值包括none和all,預設值為none。 # none表示任何錯誤都將導致Sink Task立即失敗。 # all表示跳過產生錯誤的Record,並記錄該Record。 runtime.error.tolerance=none # 指定髒資料記錄模式,可選值包括ignore、kafka和tablestore,預設值為ignore。 # ignore表示忽略所有錯誤。 # kafka表示將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中。 # tablestore表示將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中。 runtime.error.mode=ignore # 當髒資料記錄模式為kafka時,需要配置Kafka叢集地址和Topic。 # runtime.error.bootstrap.servers=localhost:9092 # runtime.error.topic.name=errors # 當髒資料記錄模式為tablestore時,需要配置Tablestore中資料表名稱。 # runtime.error.table.name=errors
- .json格式設定檔的樣本,適用於distributed模式。
{ "name": "tablestore-sink", "config": { // 指定連接器類。 "connector.class":"TableStoreSinkConnector", // 設定最大任務數。 "tasks.max":"3", // 指定匯出資料的Kafka的Topic列表。 "topics":"test", // 以下為Tablestore串連參數的配置。 // Tablestore執行個體的Endpoint。 "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com", // 填寫AccessKey ID和AccessKey Secret。 "tablestore.access.key.id":"xxx", "tablestore.access.key.secret":"xxx", // Tablestore執行個體名稱。 "tablestore.instance.name":"xxx", // 以下為資料對應相關的配置。 // 指定Kafka Record的解析器。 // 預設的DefaulteEventParser已支援Struct和Map類型,您也可以使用自訂的EventParser。 "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser", // 定義目標表名稱的格式字串,字串中可包含<topic>作為原始Topic的預留位置。 // topics.assign.tables配置的優先順序更高。如果配置了topics.assign.tables,則忽略table.name.format的配置。 // 例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則將映射到Tablestore的表名為kafka_test。 "table.name.format":"<topic>", // 指定Topic與目標表的映射關係,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之間的分隔字元為英文冒號(:),不同映射之間分隔字元為英文逗號(,)。 // 如果預設,則採取table.name.format的配置。 // "topics.assign.tables":"test:test_kafka", // 指定主鍵模式,可選值包括kafka、record_key和record_value,預設值為kafka。 // kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作為資料表的主鍵。 // record_key表示以Record Key中的欄位作為資料表的主鍵。 // record_value表示以Record Value中的欄位作為資料表的主鍵。 "primarykey.mode":"kafka", // 定義匯入資料表的主鍵列名和資料類型。 // 屬性名稱格式為tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。 // 其中<tablename>為資料表名稱的預留位置。 // 當主鍵模式為kafka時,無需配置該屬性,預設主鍵列名為{"topic_partition","offset"},預設主鍵列資料類型為{string, integer}。 // 當主鍵模式為record_key或record_value時,必須配置以下兩個屬性。 // "tablestore.test.primarykey.name":"A,B", // "tablestore.test.primarykey.type":"string,integer", // 定義屬性列白名單,用於過濾Record Value中的欄位擷取所需屬性列。 // 預設值為空白,使用Record Value中的所有欄位作為資料表的屬性列。 // 屬性名稱格式為tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。 // 其中<tablename>為資料表名稱的預留位置。 // "tablestore.test.columns.whitelist.name":"A,B", // "tablestore.test.columns.whitelist.type":"string,integer", // 以下為寫入Tablestore相關的配置。 // 指定寫入模式,可選值包括put和update,預設值為put。 // put表示覆蓋寫。 // update表示更新寫。 "insert.mode":"put", // 是否需要保序,預設值為true。如果關閉保序模式,則有助於提升寫入效率。 "insert.order.enable":"true", // 是否自動建立目標表,預設值為false。 "auto.create":"false", // 指定刪除模式,可選值包括none、row、column和row_and_column,預設值為none。 // none表示不允許進行任何刪除。 // row表示允許刪除行。 // column表示允許刪除屬性列。 // row_and_column表示允許刪除行和屬性列。 "delete.mode":"none", // 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須為2的指數。 "buffer.size":"1024", // 寫入資料表時的回調線程數,預設值為核心數+1。 // "max.thread.count": // 寫入資料表時的最大請求並發數,預設值為10。 "max.concurrency":"10", // 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。 "bucket.count":"3", // 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。 "flush.Interval":"10000", // 以下為髒資料處理相關配置。 // 在解析Kafka Record或者寫入資料表時可能發生錯誤,您可以通過以下配置進行處理。 // 指定容錯能力,可選值包括none和all,預設值為none。 // none表示任何錯誤都將導致Sink Task立即失敗。 // all表示跳過產生錯誤的Record,並記錄該Record。 "runtime.error.tolerance":"none", // 指定髒資料記錄模式,可選值包括ignore、kafka和tablestore,預設值為ignore。 // ignore表示忽略所有錯誤。 // kafka表示將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中。 // tablestore表示將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中。 "runtime.error.mode":"ignore" // 當髒資料記錄模式為kafka時,需要配置Kafka叢集地址和Topic。 // "runtime.error.bootstrap.servers":"localhost:9092", // "runtime.error.topic.name":"errors", // 當髒資料記錄模式為tablestore時,需要配置Tablestore中資料表名稱。 // "runtime.error.table.name":"errors", }
配置項說明
設定檔中的配置項說明請參見下表。其中時序相關配置項只有同步資料到時序表時才需要配置。
分類 | 配置項 | 類型 | 是否必選 | 樣本值 | 描述 |
---|---|---|---|---|---|
Kafka Connect常見配置 | name | string | 是 | tablestore-sink | 連接器(Connector)名稱。連接器名稱必須唯一。 |
connector.class | class | 是 | TableStoreSinkConnector | 連接器的Java類。
如果您要使用該連接器,請在connector.class配置項中指定Connector類的名稱,支援配置為Connector類的全名(com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector)或別名(TableStoreSinkConnector)。
|
|
tasks.max | integer | 是 | 3 | 連接器支援建立的最大任務數。
如果連接器無法達到此並行度層級,則可能會建立較少的任務。 |
|
key.converter | string | 否 | org.apache.kafka.connect.json.JsonConverter | 覆蓋worker設定的預設key轉換器。 | |
value.converter | string | 否 | org.apache.kafka.connect.json.JsonConverter | 覆蓋worker設定的預設value轉換器。 | |
topics | list | 是 | test | 連接器輸入的Kafka Topic列表,多個Topic之間以英文逗號(,)分隔。
您必須為連接器設定topics來控制連接器輸入的Topic。 |
|
連接器Connection配置 | tablestore.endpoint | string | 是 | https://xxx.xxx.ots.aliyuncs.com | Tablestore執行個體的服務地址。更多資訊,請參見服務地址。 |
tablestore.mode | string | 是 | timeseries | 根據資料同步到的表類型選擇模式。取值範圍如下:
|
|
tablestore.access.key.id | string | 是 | LTAn******************** | 登入帳號的AccessKey ID和AccessKey Secret,擷取方式請參見擷取AccessKey。 | |
tablestore.access.key.secret | string | 是 | zbnK************************** | ||
tablestore.auth.mode | string | 是 | aksk | 設定認證方式。取值範圍如下:
|
|
tablestore.instance.name | string | 是 | myotstest | Tablestore執行個體的名稱。 | |
連接器Data Mapping配置 | event.parse.class | class | 是 | DefaultEventParser | 訊息解析器的Java類,預設值為DefaultEventParser。解析器用於從Kafka Record中解析出資料表的主鍵列和屬性列。
注意 Tablestore對列值大小有限制。string類型和binary類型的主鍵列列值限制均為1 KB,屬性列列值限制均為2 MB。更多資訊,請參見通用限制。
如果資料類型轉換後列值超出對應限制,則將該Kafka Record作為髒資料處理。 如果使用預設的DefaultEventParser解析器,Kafka Record的Key或Value必須為Kafka Connect的Struct或Map類型。Struct中選擇的欄位必須為支援的資料類型,欄位會根據資料類型映射錶轉換為Tablestore資料類型寫入資料表。Map中的實值型別也必須為支援的資料類型,支援的資料類型同Struct,最終會被轉換為binary類型寫入資料表。Kafka和Tablestore的資料類型映射關係請參見附錄:Kafka和Tablestore資料類型映射。 如果Kafka Record為不相容的資料格式,則您可以通過實現com.aliyun.tablestore.kafka.connect.parsers.EventParser定義的介面來自訂解析器。 |
table.name.format | string | 否 | kafka_<topic> | 目標資料表名稱的格式字串,預設值為<topic>。字串中可包含<topic>作為原始Topic的預留位置。例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則映射到Tablestore的表名為kafka_test。
此配置項的優先順序低於topics.assign.tables配置項,如果配置了topics.assign.tables,則忽略table.name.format的配置。 |
|
topics.assign.tables | list | 是 | test:destTable | 指定topic與Tablestore表之間的映射關係,格式為<topic_1>:<tablename_1>,<topic_2>:<tablename_2> 。多個映射關係之間以英文逗號(,)分隔,例如test:destTable表示將Topic名為test的訊息記錄寫入資料表destTable中。
此配置項的優先順序高於table.name.format配置項,如果配置了topics.assign.tables,則忽略table.name.format的配置。 |
|
primarykey.mode | string | 否 | kafka | 資料表的主鍵模式。取值範圍如下:
請配合tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type使用。此配置項不區分大小寫。 |
|
tablestore.<tablename>.primarykey.name | list | 否 | A,B | 資料表的主鍵列名,其中<tablename>為資料表名稱的預留位置,包含1~4個主鍵列,以英文逗號(,)分隔。
主鍵模式不同時,主鍵列名的配置不同。
Tablestore資料表的主鍵列是有順序的,此屬性的配置應注意主鍵列名順序,例如PRIMARY KEY(A, B, C)與PRIMARY KEY(A, C, B)是不同的兩個主鍵結構。 |
|
tablestore.<tablename>.primarykey.type | list | 否 | string, integer | 資料表的主鍵列資料類型,其中<tablename>為資料表名稱的預留位置,包含1~4個主鍵列,以英文逗號(,)分隔,順序必須與tablestore.<tablename>.primarykey.name相對應。此屬性配置不區分大小寫。資料類型的可選值包括integer、string、binary和auto_increment。
主鍵模式不同時,主鍵資料類型的配置不同。
|
|
tablestore.<tablename>.columns.whitelist.name | list | 否 | A,B | 資料表的屬性列白名單中屬性列名稱,其中<tablename>為資料表名稱的預留位置,以英文逗號(,)分隔。
如果配置為空白,則使用Record Value中的所有欄位(Struct類型)或者鍵(Map類型)作為資料表的屬性列,否則用於過濾得到所需屬性列。 |
|
tablestore.<tablename>.columns.whitelist.type | list | 否 | string, integer | 資料表的屬性列白名單中屬性列資料類型,其中<tablename>為資料表名稱的預留位置,以英文逗號(,)分隔,順序必須與tablestore.<tablename>.columns.whitelist.name相對應。此屬性配置不區分大小寫。資料類型的可選值包括integer、string、binary、boolean和double。 | |
連接器Write配置 | insert.mode | string | 否 | put | 寫入模式。取值範圍如下:
此屬性配置不區分大小寫。 |
insert.order.enable | boolean | 否 | true | 寫入資料表時是否需要保序。取值範圍如下:
|
|
auto.create | boolean | 否 | false | 是否需要自動建立目標表,支援自動建立資料表或者時序表。取值範圍如下:
|
|
delete.mode | string | 否 | none | 刪除模式,僅當同步資料到資料表且主鍵模式為record_key時才有效。取值範圍如下:
此屬性配置不區分大小寫。 刪除操作與insert.mode的配置相關。更多資訊,請參見附錄:刪除語義。 |
|
buffer.size | integer | 否 | 1024 | 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須是2的指數。 | |
max.thread.count | integer | 否 | 3 | 寫入資料表時的回調線程數,預設值為CPU核心數+1 。
|
|
max.concurrency | integer | 否 | 10 | 寫入資料表時的最大請求並發數。 | |
bucket.count | integer | 否 | 3 | 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。 | |
flush.Interval | integer | 否 | 10000 | 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。 | |
連接器Runtime Error配置 | runtime.error.tolerance | string | 否 | none | 解析Kafka Record或者寫入表時產生錯誤的處理策略。取值範圍如下:
此屬性配置不區分大小寫。 |
runtime.error.mode | string | 否 | ignore | 解析Kafka Record或者寫入表時產生錯誤,對錯誤的Record的處理策略。取值範圍如下:
kafka模式下需要對Kafka Record的Header、Key和Value進行序列化轉換,tablestore模式下需要對Kafka Record的Key和Value進行序列化轉換,此處預設使用org.apache.kafka.connect.json.JsonConverter,並且配置schemas.enable為true,您可以通過JsonConverter還原序列化得到未經處理資料。關於Converter的更多資訊,請參見Kafka Converter。 |
|
runtime.error.bootstrap.servers | string | 否 | localhost:9092 | 用於記錄運行錯誤的Kafka叢集地址。 | |
runtime.error.topic.name | string | 否 | errors | 用於記錄運行錯誤的Kafka Topic名稱。 | |
runtime.error.table.name | string | 否 | errors | 用於記錄運行錯誤的Tablestore表名稱。 | |
時序相關配置項 | tablestore.timeseries.<tablename>.measurement | string | 是 | mName | 將JSON中的key值為指定值對應的value值作為_m_name欄位寫入對應時序表中。
如果設定此配置項為<topic>,則將kafka記錄的topic作為_m_name欄位寫入時序表中。 配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改,例如時序表名稱為test,則配置項名稱為tablestore.timeseries.test.measurement。 |
tablestore.timeseries.<tablename>.dataSource | string | 是 | ds | 將JSON中的key值為ds對應的value值作為_data_source欄位寫入對應時序表中。
配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。 |
|
tablestore.timeseries.<tablename>.tags | list | 是 | region,level | 將JSON中key值為region和level所對應的value值作為tags欄位寫入對應時序表中。
配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。 |
|
tablestore.timeseries.<tablename>.time | string | 是 | timestamp | 將JSON中key值為timestamp對應的value值作為_time欄位寫入對應時序表中。
配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。 |
|
tablestore.timeseries.<tablename>.time.unit | string | 是 | MILLISECONDS | tablestore.timeseries.<tablename>.time值的時間戳記單位。取值範圍為SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。
配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。 |
|
tablestore.timeseries.<tablename>.field.name | list | 否 | cpu,io | 將JSON中key值為cpu和io的索引值對作為_field_name以及_field_name的值寫入對應時序表。
配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。 |
|
tablestore.timeseries.<tablename>.field.type | string | 否 | double,integer | tablestore.timeseries.<tablename>.field.name中欄位對應的資料類型。取值範圍為double、integer、string、binary、boolean。多個資料類型之間用半形冒號(,)分隔。
配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。 |
|
tablestore.timeseries.mapAll | boolean | 否 | false | 將輸入JSON中的非主鍵欄位和時間欄位都作為field儲存到時序表中。
當配置項取值為false時,tablestore.timeseries.<tablename>.field.name和tablestore.timeseries.<tablename>.field.type必填。 |
|
tablestore.timeseries.toLowerCase | boolean | 否 | true | 將field中的key(輸入資料中非主鍵或者時間的key,或者配置在tablestore.timeseries.<tablename>.field.name中的key)轉為小寫寫入時序表。 | |
tablestore.timeseries.rowsPerBatch | integer | 否 | 50 | 寫入tablestore時,一次請求支援寫入的最大行數。最大值為200,預設值為200。 |
附錄:Kafka和Tablestore資料類型映射
Kafka和Tablestore資料類型映射關係請參見下表。
Kafka Schema Type | Tablestore資料類型 |
---|---|
STRING | STRING |
INT8、INT16、INT32、INT64 | INTEGER |
FLOAT32、FLOAT64 | DOUBLE |
BOOLEAN | BOOLEAN |
BYTES | BINARY |
附錄:刪除語義
當同步資料到資料表且Kafka訊息記錄的value中存在空值時,根據寫入模式(insert.mode)和刪除模式(delete.mode)的不同設定,資料寫入到Tablestore資料表中的處理方式不同,詳細說明請參見下表。
insert.mode | put | update | ||||||
---|---|---|---|---|---|---|---|---|
delete.mode | none | row | column | row_and_column | none | row | column | row_and_column |
value為空白值 | 覆蓋寫 | 刪行 | 覆蓋寫 | 刪行 | 髒資料 | 刪行 | 髒資料 | 刪行 |
value所有欄位值均為空白值 | 覆蓋寫 | 覆蓋寫 | 覆蓋寫 | 覆蓋寫 | 髒資料 | 髒資料 | 刪列 | 刪列 |
value部分欄位值為空白值 | 覆蓋寫 | 覆蓋寫 | 覆蓋寫 | 覆蓋寫 | 忽略空值 | 忽略空值 | 刪列 | 刪列 |